Controlled Demand for GenStage's ProducerConsumer

Hi everyone!
I’m building quite a sophisticated pipeline.
It starts with a list of links to gzipped jsonl files,
provides them to ProducerConsumers (by number of CPU cores).
Files are huge, so ProducerConsumer reads them as a stream and generates a stream of maps.
There are several consumers served by BroadcastDispatcher (I need every record to be tried with every consumer, many will be rejected though).

The problem is (and here most probably comes my lack of understanding how GenStage works), I never see handle_demand callback called on ProducerConsumer. Is it an expected behavior? Is there a way to make it work?

Processing the full file in handle_events callback is not an option, since it is 1GB .jsonl.gz.

I would greatly appreciate any advice. I also would be happy to pay for a short 1-to-1 mentorship from an expert in GenStage data processing, if you’re interested.

Here is an oversimplified version, showing the problem:

defmodule Simple.Application do
  @moduledoc false

  use Application

  def start(_type, _args) do
    children = [
      Simple.Producer,
      Simple.ProducerConsumer,
      Simple.Consumer
    ]

    opts = [strategy: :one_for_one, name: Simple.Supervisor]
    Supervisor.start_link(children, opts)
  end
end


defmodule Simple.Producer do
  use GenStage
  require Logger

  def start_link(_number, _opts \\ []) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    {:producer, []}
  end

  def perform do
    source = [0..999, 1000..1999, 2000..2999] |> Enum.map(&Enum.to_list/1)
    GenServer.cast(__MODULE__, {:source, source})
  end

  def handle_cast({:source, source}, state) do
    {:noreply, source, state}
  end

  @doc """
  Logic doesn't matter here, since all events are sent directly into the queue
  """
  def handle_demand(demand, state) when demand > 0 do
    Logger.info("#{__MODULE__} demand #{demand}")

    {:noreply, [], state}
  end
end


defmodule Simple.ProducerConsumer do
  use GenStage
  require Logger

  defmodule State do
    defstruct queue: [], pending_demand: 0
  end

  def start_link(_) do
    GenStage.start_link(__MODULE__, %State{}, name: __MODULE__)
  end

  def init(initial_state) do
    subscription = [{Simple.Producer, max_demand: 1}]

    {:producer_consumer, initial_state,
     subscribe_to: subscription, dispatcher: GenStage.BroadcastDispatcher}
  end

  def handle_demand(demand, %{queue: queue, pending_demand: pending_demand} = state)
      when demand > 0 do
    Logger.info("Simple.ProducerConsumer demand: #{demand}")
    Logger.info(inspect(state))

    {events, pending_demand, queue} = try_dispatch(queue, demand + pending_demand)

    {:noreply, events, %{state | pending_demand: pending_demand, queue: queue}}
  end

  def handle_events([events], _from, %{pending_demand: pending_demand, queue: queue} = state) do
    {events, pending_demand, queue} = try_dispatch(events ++ queue, pending_demand)
    {:noreply, events, %{state | pending_demand: pending_demand, queue: queue}}
  end

  defp try_dispatch(events, demand) do
    available = length(events)

    cond do
      available == demand ->
        {events, 0, []}

      available < demand ->
        {events, demand - available, []}

      available > demand ->
        {events, queue} = Enum.split(events, demand)
        {events, 0, queue}
    end
  end
end

defmodule Simple.Consumer do
  use GenStage

  def start_link(_opts \\ []) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    opts = [
      subscribe_to: [
        {Simple.ProducerConsumer,
         [min_demand: 5, max_demand: 10, selector: fn event -> rem(event, 10) == 3 end]}
      ]
    ]

    {:consumer, :the_state_does_not_matter, opts}
  end

  def handle_events(events, _from, state) do
    IO.inspect(events,
      charlists: :as_lists,
      label: :consumer
    )

    {:noreply, [], state}
  end
end