Hi everyone!
I’m building quite a sophisticated pipeline.
It starts with a list of links to gzipped jsonl files,
provides them to ProducerConsumers (by number of CPU cores).
Files are huge, so ProducerConsumer reads them as a stream and generates a stream of maps.
There are several consumers served by BroadcastDispatcher (I need every record to be tried with every consumer, many will be rejected though).
The problem is (and here most probably comes my lack of understanding how GenStage works), I never see handle_demand
callback called on ProducerConsumer. Is it an expected behavior? Is there a way to make it work?
Processing the full file in handle_events
callback is not an option, since it is 1GB .jsonl.gz
.
I would greatly appreciate any advice. I also would be happy to pay for a short 1-to-1 mentorship from an expert in GenStage data processing, if you’re interested.
Here is an oversimplified version, showing the problem:
defmodule Simple.Application do
@moduledoc false
use Application
def start(_type, _args) do
children = [
Simple.Producer,
Simple.ProducerConsumer,
Simple.Consumer
]
opts = [strategy: :one_for_one, name: Simple.Supervisor]
Supervisor.start_link(children, opts)
end
end
defmodule Simple.Producer do
use GenStage
require Logger
def start_link(_number, _opts \\ []) do
GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
end
def init(:ok) do
{:producer, []}
end
def perform do
source = [0..999, 1000..1999, 2000..2999] |> Enum.map(&Enum.to_list/1)
GenServer.cast(__MODULE__, {:source, source})
end
def handle_cast({:source, source}, state) do
{:noreply, source, state}
end
@doc """
Logic doesn't matter here, since all events are sent directly into the queue
"""
def handle_demand(demand, state) when demand > 0 do
Logger.info("#{__MODULE__} demand #{demand}")
{:noreply, [], state}
end
end
defmodule Simple.ProducerConsumer do
use GenStage
require Logger
defmodule State do
defstruct queue: [], pending_demand: 0
end
def start_link(_) do
GenStage.start_link(__MODULE__, %State{}, name: __MODULE__)
end
def init(initial_state) do
subscription = [{Simple.Producer, max_demand: 1}]
{:producer_consumer, initial_state,
subscribe_to: subscription, dispatcher: GenStage.BroadcastDispatcher}
end
def handle_demand(demand, %{queue: queue, pending_demand: pending_demand} = state)
when demand > 0 do
Logger.info("Simple.ProducerConsumer demand: #{demand}")
Logger.info(inspect(state))
{events, pending_demand, queue} = try_dispatch(queue, demand + pending_demand)
{:noreply, events, %{state | pending_demand: pending_demand, queue: queue}}
end
def handle_events([events], _from, %{pending_demand: pending_demand, queue: queue} = state) do
{events, pending_demand, queue} = try_dispatch(events ++ queue, pending_demand)
{:noreply, events, %{state | pending_demand: pending_demand, queue: queue}}
end
defp try_dispatch(events, demand) do
available = length(events)
cond do
available == demand ->
{events, 0, []}
available < demand ->
{events, demand - available, []}
available > demand ->
{events, queue} = Enum.split(events, demand)
{events, 0, queue}
end
end
end
defmodule Simple.Consumer do
use GenStage
def start_link(_opts \\ []) do
GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
end
def init(:ok) do
opts = [
subscribe_to: [
{Simple.ProducerConsumer,
[min_demand: 5, max_demand: 10, selector: fn event -> rem(event, 10) == 3 end]}
]
]
{:consumer, :the_state_does_not_matter, opts}
end
def handle_events(events, _from, state) do
IO.inspect(events,
charlists: :as_lists,
label: :consumer
)
{:noreply, [], state}
end
end