Demand backpressure with Flow.from_enumerable and Stream

Hello, I’m feeding a flow into a stream, and I’ve noticed that the flow floods the stream’s message queue when the stream is slow. I thought lowering `max_demand` on the flow would limit how many messages each stage produces at a time, but that doesn’t seem to be the case.

In my specific case, this results in the stream consuming large amounts of memory because each item produced by the flow is quite large.

Here’s the test case I’m using:

def test(sleep_ms, flow_opts) do
  Stream.repeatedly(fn -> nil end)
  |> Flow.from_enumerable(flow_opts)
  # Each item is ~1 MB, so a flooded mailbox gets expensive fast.
  |> Flow.map(fn _ -> String.duplicate("0", 1024 * 1024) end)
  |> Stream.map(fn _ ->
    # Count the GenStage messages queued in this process's mailbox,
    # grouped by the stage pid that sent them.
    {:messages, messages} = :erlang.process_info(self(), :messages)

    messages
    |> Enum.reduce(%{}, fn
      {:"$gen_consumer", {pid, _}, _}, acc -> Map.update(acc, pid, 1, &(&1 + 1))
      _, acc -> acc
    end)
    |> IO.inspect()

    :timer.sleep(sleep_ms)
  end)
  |> Stream.filter(fn _ -> false end)
  |> Stream.run()
end

# Slow Stream
test(500, max_demand: 1, stages: 2) # => %{#PID<0.1348.0> => 987, #PID<0.1349.0> => 996}

# Fast Stream
test(0, max_demand: 1, stages: 2) # => %{}
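As an aside, the mailbox-counting trick in the test case works without Flow at all. Here is a dependency-free sketch of the same technique (the `{:item, pid, payload}` message shape is made up for the example):

# Dependency-free sketch of the mailbox-counting technique:
# several processes each send this process one message, and we
# tally the queued messages per sender pid without consuming them.
parent = self()

for _ <- 1..5 do
  spawn(fn -> send(parent, {:item, self(), :payload}) end)
end

# Give the spawned processes a moment to deliver their messages.
Process.sleep(50)

{:messages, messages} = :erlang.process_info(self(), :messages)

counts =
  Enum.reduce(messages, %{}, fn
    {:item, pid, _}, acc -> Map.update(acc, pid, 1, &(&1 + 1))
    _, acc -> acc
  end)

IO.inspect(map_size(counts)) # => 5

Because `:erlang.process_info(pid, :messages)` only inspects the queue, the messages stay queued afterwards, which is why it is safe to call from inside the stream above.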

I expected `max_demand: 1` to cause each stage to pull one item from the input stream only after it has delivered its current item to the receiving process (the process running the stream).

Instead, it looks to me like each stage pulls 1000 items from the input stream each time a demand is made and sends the results as messages to the stream’s process.
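To check my understanding of the demand mechanics, here is a hand-rolled simulation of demand-based flow control, using plain processes rather than GenStage or Flow: the consumer asks for up to `max_demand` events, and the producer immediately pushes that many messages into the consumer's mailbox.

# Hand-rolled simulation of demand-based flow control (not GenStage):
# the consumer asks for up to `max_demand` events and the producer
# immediately sends that many, so they queue in the consumer's mailbox.
defmodule DemandSim do
  def producer do
    receive do
      {:ask, from, n} ->
        for i <- 1..n, do: send(from, {:event, i})
        producer()
    end
  end

  # Ask a fresh producer for `max_demand` events and report how many
  # event messages end up queued in our mailbox before we consume any.
  def run(max_demand) do
    flush()
    prod = spawn(fn -> producer() end)
    send(prod, {:ask, self(), max_demand})
    Process.sleep(50)
    {:messages, messages} = :erlang.process_info(self(), :messages)
    Process.exit(prod, :kill)
    Enum.count(messages, &match?({:event, _}, &1))
  end

  defp flush do
    receive do
      {:event, _} -> flush()
    after
      0 -> :ok
    end
  end
end

IO.inspect(DemandSim.run(1))    # => 1
IO.inspect(DemandSim.run(1000)) # => 1000

Under this model, the ~1000 messages per stage I see in the slow case would correspond to an effective demand of about 1000 between the flow's stages and the process enumerating the stream, regardless of the `max_demand` I pass to `Flow.from_enumerable` — which is the behavior I'm trying to pin down.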

What’s the proper way to limit the number of items each stage processes at a time?
