How to make Task.async_stream return results as they are ready?

I have the following code:

defmodule Demo do
  def run do
    child =
      spawn(fn ->
        Stream.resource(
          fn -> :ok end,
          fn _ ->
            receive do
              {:msg, msg} ->
                IO.puts("# received #{inspect(msg)}")
                {[msg], nil}

              _ ->
                {[], nil}
            end
          end,
          fn _ -> :ok end
        )
        |> Stream.each(fn x ->
          IO.puts("## before #{inspect(x)}")
          x
        end)
        |> Task.async_stream(
          fn x ->
            IO.puts("### during #{inspect(x)}")
            x
          end,
          ordered: false,
          max_concurrency: 2
        )
        |> Stream.each(fn x ->
          IO.puts("#### after #{inspect(x)}")
          x
        end)
        |> Stream.run()
      end)

    1..3
    |> Enum.each(fn x -> send(child, {:msg, x}) end)
  end
end

When run, I get the following output:

iex(6)> Demo.run
# received 1
## before 1
# received 2
### during 1
## before 2
#### after {:ok, 1}
### during 2
# received 3
## before 3
### during 3
#### after {:ok, 2}

The Stream.each after Task.async_stream is never called with the last message. Task.async_stream seems to withhold output until it has max_concurrency results ready.

Is there any way to avoid the buffering?

Background: I’m building a data processing pipeline that processes an endless stream of data, and cleanup after each task has to happen immediately after it’s done. I could do it in the task itself, but that would break encapsulation and force me to complicate things with process messaging. I previously used GenStage, which worked well at small scale but could use a performance improvement, as no backpressure is needed.

Try passing ordered: false to async_stream; the docs specifically mention “This is also useful when you’re using the tasks only for the side effects”.

Replace the Stream.each + Task.async_stream with Enum.each + Task.async, collecting the handles to all tasks and then just Task.await_many them (note that this succeeds and does not block even if the tasks are already completed).
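A minimal sketch of that suggestion, assuming a finite batch of inputs (it does not directly fit an endless stream). The `work` function is a hypothetical stand-in for the real per-item processing, and Enum.map is used instead of Enum.each so that the task handles are actually kept:

```elixir
# Hypothetical stand-in for the real per-item work.
work = fn x -> x * 2 end

# Enum.map (rather than Enum.each) so the task handles are collected.
tasks = Enum.map(1..3, fn x -> Task.async(fn -> work.(x) end) end)

# Waits until every task has replied; results come back in task order.
results = Task.await_many(tasks)
IO.inspect(results)
```

Task.await_many uses a default timeout of 5000 ms for the whole batch, so longer-running work would need an explicit timeout argument.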

That, or @al2o3cr’s approach.

I don’t get that. Sounds like you are complicating things by not doing it in the task itself from where I am standing.
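For comparison, doing the cleanup in the task itself might look roughly like the sketch below. `process` and `cleanup` are hypothetical placeholders, not functions from the original post; the point is only that try/after runs the cleanup inside the task process as soon as the work finishes, without waiting for the result to flow further down the stream:

```elixir
# Hypothetical helpers standing in for the real processing and cleanup.
process = fn x -> x * 2 end
cleanup = fn x -> IO.puts("cleaned up #{inspect(x)}") end

results =
  1..3
  |> Task.async_stream(
    fn x ->
      try do
        process.(x)
      after
        # Runs in the task process as soon as process.(x) returns or raises.
        cleanup.(x)
      end
    end,
    ordered: false,
    max_concurrency: 2
  )
  |> Enum.to_list()
```

With ordered: false the results may arrive in any order, so downstream code should not rely on their position.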