Executing tasks that generate additional items for a Stream

Hi everyone! I’m doing my first commercial project in Elixir and I stumbled upon an interesting challenge.

ChatGPT is asked to generate a JSON-formatted response, and I'm consuming it as a stream where each chunk is a response token. I managed to turn it into a stream of Map updates, something along the lines of:

stream
|> Stream.map(&IO.inspect/1)
|> Stream.run()

# {:updated, {"image_prompt_1", ""}}
# {:updated, {"image_prompt_1", "A"}}
# {:updated, {"image_prompt_1", "A beautiful"}}
# {:finished, {"image_prompt_1", "A beautiful ocean"}}

# {:updated, {"image_prompt_2", ""}}
# {:updated, {"image_prompt_2", "A"}}
# {:updated, {"image_prompt_2", "A dark"}}
# {:finished, {"image_prompt_2", "A dark forest"}}
# ...
# {:finished, {"some_other_key", "Some value"}}

That stream can be consumed to create a final Map like this:

content = for {:finished, data} <- stream, into: %{}, do: data

# %{
#   "image_prompt_1" => "A beautiful ocean",
#   "image_prompt_2" => "A dark forest",
#   "some_other_key" => "Some value"
# }

And here comes my challenge. For some keys, I'd like to generate additional data asynchronously. E.g. for image_prompt_1 and image_prompt_2 I'd like to ask Midjourney to generate images and put them under image_1 and image_2 keys. So something like Task.async_stream, but one that applies only to a subset of stream elements and emits new items rather than altering existing ones.
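
To illustrate, the final map should end up looking something like this (the image values are just placeholders):

# %{
#   "image_prompt_1" => "A beautiful ocean",
#   "image_1" => <image generated from "A beautiful ocean">,
#   "image_prompt_2" => "A dark forest",
#   "image_2" => <image generated from "A dark forest">,
#   "some_other_key" => "Some value"
# }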

This is what I came up with so far:

defmodule StreamUtils do
  def stream_emit_async(stream, create_tasks, timeout \\ 60_000) do
    stream
    |> Stream.concat([:exit]) # I had to add :exit at the end of the stream to await pending tasks
    |> Stream.transform([], fn
      :exit, tasks -> {Task.await_many(tasks, timeout), []}
      item, tasks -> stream_inject_tasks_transform(item, tasks, create_tasks)
    end)
  end

  defp stream_inject_tasks_transform(item, current_tasks, create_tasks) do
    # create_tasks can return a single function, a list of functions, or nil
    functions =
      case create_tasks.(item) do
        nil -> []
        data when is_list(data) -> data
        fun -> [fun]
      end

    {items, tasks} =
      functions
      |> Enum.map(&Task.async/1)
      |> Enum.concat(current_tasks)
      |> check_finished_tasks()

    {[item | items], tasks}
  end

  defp check_finished_tasks(tasks) do
    # timeout 0 to check and return instantly
    tasks_with_results = Task.yield_many(tasks, 0)
    # crashed tasks come back as {:exit, reason} and are silently
    # dropped here -- one of the corner cases still to handle
    items = for {_task, {:ok, result}} <- tasks_with_results, do: result
    tasks = for {task, nil} <- tasks_with_results, do: task
    {items, tasks}
  end
end

Usage:

object_delta_stream
|> StreamUtils.stream_emit_async(fn {event, {key, value}} ->
  if event == :finished && String.contains?(key, "image_prompt") do
    fn -> {:finished, {String.replace(key, "image_prompt", "image"), make_api_call(value)}} end
  end
end)
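
With that in place, the new image items get interleaved into the output stream once their tasks complete (the image value below is just a placeholder):

# {:finished, {"image_prompt_1", "A beautiful ocean"}}
# ... later, once the task is done and another item flushes it:
# {:finished, {"image_1", <result of make_api_call("A beautiful ocean")>}}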

and… it kinda works. Not all corner cases are covered yet (e.g. task crashes). My question: is there maybe a better solution? Something built-in? Also, this only checks whether any task has finished when we receive the next item, not straight away. I tried playing with Stream.resource but I don't know if we can take just one item from a stream and resume it later…?
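
The closest I could get to "take one item and resume later" is driving the Enumerable protocol by hand with :suspend — a rough sketch, and I'm not sure it's the intended use:

defmodule PullDemo do
  # suspend after every element, handing back a continuation
  # that can be resumed later
  def pull_one(stream) do
    step(Enumerable.reduce(stream, {:cont, nil}, fn item, _acc -> {:suspend, item} end))
  end

  def resume(continuation), do: step(continuation.({:cont, nil}))

  defp step({:suspended, item, continuation}), do: {:ok, item, continuation}
  defp step({:done, _acc}), do: :done
end

# {:ok, 1, cont} = PullDemo.pull_one(Stream.map(1..3, & &1))
# {:ok, 2, cont} = PullDemo.resume(cont)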

You might really be interested in the book Concurrent Data Processing in Elixir, which addresses exactly this sort of issue. It sounds like you want a supervisor that takes in your stream and dispatches messages to the appropriate GenServers to process each item in the stream. That covers you for the situation in which tasks crash, and the caller automatically receives a message when a job is completed.

So something like

Stream -> JobSupervisor
              |_ ImgJobServer
              |_ OtherJobServer
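
If a full GenServer setup feels heavy, even just moving the tasks under a Task.Supervisor already gets you the crash reporting — a minimal sketch (MyApp.JobSupervisor is a made-up name, make_api_call is the helper from the question):

{:ok, _sup} = Task.Supervisor.start_link(name: MyApp.JobSupervisor)

task =
  Task.Supervisor.async_nolink(MyApp.JobSupervisor, fn ->
    {:finished, {"image_1", make_api_call("A beautiful ocean")}}
  end)

# with async_nolink a crashing task doesn't take the caller down;
# Task.yield_many/2 then reports it as {task, {:exit, reason}},
# which check_finished_tasks/1 could match on
Task.yield_many([task], 0)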

I'd second the previous answer. Streams are probably the wrong abstraction here. You're not looking for lazy enumerables; you're looking for a message-ingestion pipeline with data hydration. That kind of thing is usually better handled by GenStage or Flow (maybe Broadway).
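
Just for a taste, a rough Flow version of the same hydration step (Flow needs to be added as a dependency, make_api_call is the helper from the question, and result ordering is not preserved):

object_delta_stream
|> Flow.from_enumerable()
|> Flow.flat_map(fn
  # for every finished prompt, also emit the hydrated image item
  {:finished, {"image_prompt" <> suffix, prompt}} = item ->
    [item, {:finished, {"image" <> suffix, make_api_call(prompt)}}]

  item ->
    [item]
end)
|> Enum.to_list()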