Hi everyone! I’m working on my first commercial project in Elixir and have stumbled upon an interesting challenge.
I ask ChatGPT to generate a JSON-formatted response and consume it as a stream, where each chunk is a response token. I managed to turn that into a stream of map updates, something along these lines:
```elixir
stream
|> Stream.map(&IO.inspect/1)
|> Stream.run()
# {:updated, {"image_prompt_1", ""}}
# {:updated, {"image_prompt_1", "A"}}
# {:updated, {"image_prompt_1", "A beautiful"}}
# {:finished, {"image_prompt_1", "A beautiful ocean"}}
# {:updated, {"image_prompt_2", ""}}
# {:updated, {"image_prompt_2", "A"}}
# {:updated, {"image_prompt_2", "A dark"}}
# {:finished, {"image_prompt_2", "A dark forest"}}
# ...
# {:finished, {"some_other_key", "Some value"}}
```
That stream can be consumed to build the final map like this:

```elixir
content = for {:finished, data} <- stream, into: %{}, do: data
# %{
#   "image_prompt_1" => "A beautiful ocean",
#   "image_prompt_2" => "A dark forest",
#   "some_other_key" => "Some value"
# }
```
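To experiment without calling the API, the event stream can be faked with a plain list in the same shape as the output above (the events here are made up for illustration). The generator pattern `{:finished, ...}` in the comprehension silently skips every `:updated` event, so only the final values reach the map:

```elixir
defmodule FakeEvents do
  # Hypothetical stand-in for the real ChatGPT-backed stream of map updates.
  @events [
    {:updated, {"image_prompt_1", ""}},
    {:updated, {"image_prompt_1", "A"}},
    {:finished, {"image_prompt_1", "A beautiful ocean"}},
    {:updated, {"image_prompt_2", "A"}},
    {:finished, {"image_prompt_2", "A dark forest"}},
    {:finished, {"some_other_key", "Some value"}}
  ]

  # Identity map just to get a lazy %Stream{} like the real source.
  def stream, do: Stream.map(@events, & &1)

  # Elements that don't match {:finished, {key, value}} are skipped, not errors.
  def final_map(stream) do
    for {:finished, {key, value}} <- stream, into: %{}, do: {key, value}
  end
end
```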
And here comes my challenge. For some keys, I’d like to generate additional data asynchronously. E.g. for `image_prompt_1` and `image_prompt_2` I’d like to ask Midjourney to generate images and put them under the `image_1` and `image_2` keys. So something like `Task.async_stream`, but one that runs only for a subset of the stream’s elements and emits new items instead of altering existing ones.
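For comparison, this is roughly what the built-in `Task.async_stream/3` gives you if you are willing to wait for the whole ChatGPT stream first: collect the finished values, run the image calls concurrently for the matching keys, and add their results. `fake_image_call/1` is a hypothetical stand-in for the Midjourney request. The obvious downside is that no image request starts until the last token has arrived, which is what the question is trying to avoid.

```elixir
defmodule TwoPass do
  # Hypothetical stand-in for the real Midjourney call.
  def fake_image_call(prompt), do: "image for: " <> prompt

  def run(events) do
    finished = for {:finished, {key, value}} <- events, do: {key, value}

    extra =
      finished
      |> Enum.filter(fn {key, _value} -> String.starts_with?(key, "image_prompt") end)
      |> Task.async_stream(
        fn {key, prompt} ->
          {String.replace_prefix(key, "image_prompt", "image"), fake_image_call(prompt)}
        end,
        timeout: 60_000
      )
      # async_stream yields {:ok, result} for each task that completed successfully
      |> Enum.map(fn {:ok, pair} -> pair end)

    Map.new(finished ++ extra)
  end
end
```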
This is what I’ve come up with so far:
```elixir
defmodule StreamUtils do
  def stream_emit_async(stream, create_tasks, timeout \\ 60_000) do
    stream
    # I had to add :exit at the end of the stream to await pending tasks
    |> Stream.concat([:exit])
    |> Stream.transform([], fn
      :exit, tasks -> {Task.await_many(tasks, timeout), []}
      item, tasks -> stream_inject_tasks_transform(item, tasks, create_tasks)
    end)
  end

  defp stream_inject_tasks_transform(item, current_tasks, create_tasks) do
    # create_tasks can return a single function, a list of functions, or nil
    functions =
      case create_tasks.(item) do
        nil -> []
        data when is_list(data) -> data
        fun -> [fun]
      end

    {items, tasks} =
      functions
      |> Enum.map(&Task.async/1)
      |> Enum.concat(current_tasks)
      |> check_finished_tasks()

    {[item | items], tasks}
  end

  defp check_finished_tasks(tasks) do
    # timeout 0 to check and return instantly
    tasks_with_results = Task.yield_many(tasks, 0)
    # tasks that exited ({:exit, reason}) match neither pattern and are dropped
    items = for {_task, {:ok, result}} <- tasks_with_results, do: result
    tasks = for {task, nil} <- tasks_with_results, do: task
    {items, tasks}
  end
end
```
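On recent Elixir versions, `Stream.transform/5` takes a `last_fun` that runs once after the source is exhausted, so the `:exit` sentinel isn't needed (and a source element that happens to be the atom `:exit` can no longer be mistaken for it). I believe the 5-arity form arrived in Elixir 1.14; `h Stream.transform/5` in IEx will confirm for your version. Below is a sketch of the same idea under that assumption. The module name `AsyncEmit` is made up, and crashing tasks still take the caller down because `Task.async/1` links. The `after_fun` shuts down leftover tasks if the consumer stops early (e.g. `Enum.take/2`).

```elixir
defmodule AsyncEmit do
  # Emits every source item unchanged, plus the results of tasks spawned by
  # `create_tasks` as they are noticed to be finished. Assumes Elixir >= 1.14.
  def stream_emit_async(stream, create_tasks, timeout \\ 60_000) do
    Stream.transform(
      stream,
      fn -> [] end,
      fn item, tasks ->
        # List.wrap/1 turns nil into [], a single fun into [fun], keeps lists as-is
        new_tasks = item |> create_tasks.() |> List.wrap() |> Enum.map(&Task.async/1)
        {done, pending} = collect_finished(new_tasks ++ tasks)
        {[item | done], pending}
      end,
      # Runs once after the source is exhausted: wait for whatever is still pending.
      fn tasks -> {Task.await_many(tasks, timeout), []} end,
      # Cleanup: kill tasks left over if the consumer halted early.
      fn tasks -> Enum.each(tasks, &Task.shutdown(&1, :brutal_kill)) end
    )
  end

  # Non-blocking poll (timeout 0). Returns {results, still_pending_tasks}.
  defp collect_finished(tasks) do
    Enum.reduce(Task.yield_many(tasks, 0), {[], []}, fn
      {_task, {:ok, result}}, {done, pending} -> {[result | done], pending}
      {task, nil}, {done, pending} -> {done, [task | pending]}
      # The task exited; drop it, as the original version does.
      {_task, {:exit, _reason}}, acc -> acc
    end)
  end
end
```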
Usage:

```elixir
object_delta_stream
|> StreamUtils.stream_emit_async(fn {event, {key, value}} ->
  if event == :finished && String.contains?(key, "image_prompt") do
    fn -> {:finished, {String.replace(key, "image_prompt", "image"), make_api_call(value)}} end
  end
end)
```
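A small stylistic alternative for the callback: a named function with pattern-matching clauses instead of `if` plus `String.contains?/2`. Note that `"image_prompt" <> suffix` matches only as a prefix, which is slightly stricter than `String.contains?/2`. `make_api_call/1` is stubbed here as a hypothetical placeholder for the real request.

```elixir
defmodule ImageTasks do
  # Hypothetical stub for the real Midjourney request.
  def make_api_call(prompt), do: {:image_for, prompt}

  # A finished image prompt yields a zero-arity fun to run as a task.
  def create({:finished, {"image_prompt" <> suffix, prompt}}) do
    fn -> {:finished, {"image" <> suffix, make_api_call(prompt)}} end
  end

  # Every other event spawns nothing.
  def create(_event), do: nil
end
```

It would then be passed as `StreamUtils.stream_emit_async(object_delta_stream, &ImageTasks.create/1)`.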
…and it kind of works. Not all corner cases are covered yet (e.g. task crashes). So, my question: is there a better solution? Something built-in, maybe? Also, this only checks whether a task has finished when a new item arrives from the stream, not as soon as the task completes. I tried playing with `Stream.resource`, but I don’t know whether it’s possible to take just one item from a stream and resume the stream later.
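On the last point: a stream can be paused after one element and resumed later by calling `Enumerable.reduce/3` directly and returning `{:suspend, acc}` from the reducer. The call then returns `{:suspended, acc, continuation}`, and invoking the continuation with `{:cont, acc}` picks up where it left off. A minimal sketch follows (the `StreamStep` name is made up). One caveat: if you abandon a suspended stream, call the continuation with `{:halt, acc}` so that resources opened by `Stream.resource/3` get cleaned up.

```elixir
defmodule StreamStep do
  # Wraps an enumerable so it can be pulled one element at a time.
  # A "stepper" is a function that takes a reduce command ({:cont, _} or {:halt, _}).
  def start(enum) do
    fn command -> Enumerable.reduce(enum, command, fn x, _acc -> {:suspend, x} end) end
  end

  # Pulls exactly one element. Returns {:ok, element, next_stepper} or :done.
  def next(stepper) do
    case stepper.({:cont, nil}) do
      {:suspended, element, continuation} -> {:ok, element, continuation}
      {:done, _acc} -> :done
      {:halted, _acc} -> :done
    end
  end

  # Tells the underlying enumerable we are finished so it can release resources.
  def stop(stepper) do
    stepper.({:halt, nil})
    :ok
  end
end
```

In principle, this is the piece a `Stream.resource/3`-based consumer could use to alternate between pulling one source item and checking for task replies.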