How can I make Task.async_stream return results as soon as they are ready?
I have the following code:
```elixir
defmodule Demo do
  def run do
    # The child process runs the pipeline; the parent feeds it messages.
    child =
      spawn(fn ->
        # Endless source: each {:msg, msg} received becomes one stream element.
        Stream.resource(
          fn -> :ok end,
          fn _ ->
            receive do
              {:msg, msg} ->
                IO.puts("# received #{inspect(msg)}")
                {[msg], nil}

              _ ->
                {[], nil}
            end
          end,
          fn _ -> :ok end
        )
        |> Stream.each(fn x ->
          IO.puts("## before #{inspect(x)}")
          x
        end)
        |> Task.async_stream(
          fn x ->
            IO.puts("### during #{inspect(x)}")
            x
          end,
          ordered: false,
          max_concurrency: 2
        )
        |> Stream.each(fn x ->
          IO.puts("#### after #{inspect(x)}")
          x
        end)
        |> Stream.run()
      end)

    1..3
    |> Enum.each(fn x -> send(child, {:msg, x}) end)
  end
end
```
When run, I get the following output:
```
iex(6)> Demo.run
# received 1
## before 1
# received 2
### during 1
## before 2
#### after {:ok, 1}
### during 2
# received 3
## before 3
### during 3
#### after {:ok, 2}
```
The Stream.each after Task.async_stream is never called for the last message (3). Task.async_stream seems to withhold output until it has max_concurrency results ready.
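A likely explanation, based on a reading of the Task.Supervised module that implements Task.async_stream (worth re-checking against your Elixir version): while fewer than max_concurrency tasks are running, async_stream asks the input enumerable for its next element before it looks at finished tasks. It only waits for task results once every slot is busy or the input has ended. In the pipeline above, asking for the next element means blocking in the receive inside Stream.resource, so the result for 3 stays unread until a fourth message arrives. The same options over a finite input (a plain range, used here purely for illustration) deliver every result, because the input ends:

```elixir
# Same Task.async_stream options as the question, but over a finite range
# instead of the receive-based Stream.resource.
results =
  1..3
  |> Task.async_stream(fn x -> x end, ordered: false, max_concurrency: 2)
  |> Enum.to_list()

# Completion order can vary with ordered: false, so sort before inspecting.
IO.inspect(Enum.sort(results))
# prints [ok: 1, ok: 2, ok: 3]
```

If that reading is right, what holds back the output is the blocking input rather than a count of finished results.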
Is there any way to avoid this buffering?
Background: I'm building a data processing pipeline that processes an endless stream of data, and the cleanup for each task has to happen immediately after that task is done. I could do it in the task itself, but that would break encapsulation and force me to complicate things with process messaging. I previously used GenStage, which worked well at small scale, but since no backpressure is needed I'm looking for something faster.
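One possible alternative, sketched under assumptions rather than offered as a drop-in fix: skip Task.async_stream and handle both input messages and task replies in a single receive loop. That way a finished task is handled the moment its reply arrives instead of when the next input shows up. `EagerPool`, `run/3`, the `on_done` callback and the `:stop` message are hypothetical names for this illustration. The sketch relies on the documented behaviour that a `Task.async` task replies to its owner with `{ref, result}` (the pattern the Task docs show for use inside OTP behaviours) and on `is_map_key/2` in guards (Elixir 1.10+). The `on_done` callback keeps the cleanup step outside the work function.

```elixir
defmodule EagerPool do
  # Hypothetical helper, not a library API: processes {:msg, x} messages sent
  # to the calling process, runs at most `max` tasks at once, and calls
  # on_done.(x, result) as soon as each task replies.
  def run(work, on_done, max) when is_integer(max) and max > 0 do
    loop(work, on_done, max, %{})
  end

  defp loop(work, on_done, max, running) do
    receive do
      # A Task.async task replies to its owner with {ref, result}.
      {ref, result} when is_reference(ref) and is_map_key(running, ref) ->
        # Drop the monitor and discard the :DOWN message that follows the reply.
        Process.demonitor(ref, [:flush])
        {input, running} = Map.pop(running, ref)
        on_done.(input, result)
        loop(work, on_done, max, running)

      # Accept new input only while a slot is free; otherwise the message
      # stays in the mailbox until a running task finishes.
      {:msg, x} when map_size(running) < max ->
        %Task{ref: ref} = Task.async(fn -> work.(x) end)
        loop(work, on_done, max, Map.put(running, ref, x))

      # Hypothetical shutdown message: return once nothing is in flight.
      :stop when map_size(running) == 0 ->
        :ok
    end
  end
end

# Usage: send three messages without ever ending the input.
parent = self()

pool =
  spawn(fn ->
    EagerPool.run(fn x -> x * 10 end, fn x, r -> send(parent, {:done, x, r}) end, 2)
    send(parent, :finished)
  end)

Enum.each(1..3, fn x -> send(pool, {:msg, x}) end)
```

Because the loop uses `Task.async`, tasks are linked to it, so a crashing task also takes the loop down; `Task.Supervisor.async_nolink` plus handling the `:DOWN` message would be one way to isolate failures. Messages that arrive while all slots are busy wait in the mailbox, which bounds concurrency but not mailbox size, and a slow `on_done` delays accepting new input because it runs in the loop process.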