GenStage with Multiple Producers with ordering

I’m trying to build a pipeline with GenStage and have a situation where my producers are a bottleneck. I’d like to run multiple producers and have found several examples of how to do this, but I have a twist. The producers produce independent results, but the results must be processed in order.

For example, a producer that counts from some starting number and returns some number of counts. If my start is 0 and I want 1500 counts, I might spin up three of these producers, and give them starting values of 0, 500, and 1000. I have no guarantee of what order they will answer, but I’d like to process the numbers in order.

I started out with an intermediate GenStage between the consumer and producers. The goal was to have it kick off the producers and order their results before sending them to the consumer. My only problem is getting the results back to the consumer.

# Fans incoming demand out to the worker producers: every key of `workers` is
# passed to start_prod/2 together with a {counter, split, results} accumulator,
# and the reduced counter/results are stored back into the stage state.
def handle_demand(demand, _state = {counter, workers, results} ) when demand > 0 do
    # `/` always returns a float in Elixir, so `split` is e.g. 5.0 for a demand of 10.
    split = demand/2
    {new_counter, _, new_results} = Map.keys(workers)
    |> Enum.reduce({counter,split,results},&start_prod/2)
    
    # Not sure what to return in events?
    # NOTE(review): `events` is never bound in this clause, so the snippet does
    # not compile as written - this is the open question of the post.
    {:noreply, events, {new_counter, workers, new_results} }
  end

  # Reducer used by handle_demand/2: asks `producer` for the chunk that starts at
  # `next_start`, registers an empty result slot under that start value and
  # advances the start by one chunk for the next producer.
  defp start_prod(producer, {next_start, chunk, slots}) do
    GenStage.ask(producer, {next_start, chunk})
    slots = Map.put(slots, next_start, [])
    {next_start + chunk, chunk, slots}
  end

I would use handle_events/3 to get the replies and assemble them in order, but handle_demand/2 needs to return something to the consumer.

Thoughts?

2 Likes

Looking at what you described you are likely looking at a :producer_consumer that buffers new events coming in with handle_events and dispatches the events that are ready in the reply from that same handle_events call. Here is the base scenario I came up with:

# File a.ex
defmodule A do
  @moduledoc """
  Named producer that emits an arithmetic sequence of `{name, value}` events.

  The state is `{name, {left, current, step}}`: `left` events remain to be
  generated, `current` is the next value and `step` the distance between values.
  """
  use GenStage

  def start_link(name, config) do
    GenStage.start_link(__MODULE__, {name, config}, name: name)
  end

  def init(state), do: {:producer, state}

  # Nothing left to generate: stop the stage as soon as more demand arrives.
  def handle_demand(_demand, {_name, {left, _current, _step}} = state) when left < 1 do
    {:stop, :normal, state}
  end

  # Answer the demand directly, after a short pause that simulates work.
  def handle_demand(demand, state) when demand > 0 do
    :timer.sleep(50)
    {batch, next_state} = build_batch(state, demand)
    {:noreply, batch, next_state}
  end

  # Builds up to `demand` events (bounded by what is left) and the follow-up state.
  defp build_batch({name, {left, current, step}}, demand) do
    count = min(left, demand)
    batch = for offset <- 0..(count - 1), do: {name, current + offset * step}
    {batch, {name, {left - count, current + count * step, step}}}
  end
end

# File b.ex
defmodule B do
  @moduledoc """
  Producer-consumer that merges the events of several named producers back
  into one ordered stream.

  The state is a map of `producer_name => :queue`. Incoming events are
  `{name, value}` tuples and are appended to the queue of their producer.
  After every delivery the same number of events is taken from *each* queue -
  as many as the shortest queue currently holds - and that batch is emitted
  sorted by `value`.

  Note that a queue that stays empty (for example because its producer has
  terminated) holds back the events of all other queues, since the shortest
  queue decides how much is released.
  """
  use GenStage

  def start_link(producer_names) do
    GenStage.start_link(__MODULE__, producer_names)
  end

  # One empty queue per expected producer name.
  def init(producer_names) do
    state = Map.new(producer_names, &{&1, :queue.new()})
    {:producer_consumer, state}
  end

  def handle_events(events, _from, state) do
    dispatch_events(state, events)
  end

  # Buffers `new_events` and replies with whatever is ready to go downstream.
  defp dispatch_events(queue_map, new_events) do
    # add new events to the correct queues
    queue_map = Enum.reduce(new_events, queue_map, &queue_event/2)

    # Extract the same number of events from each queue; merge and sort them
    {ready_events, queue_map} = extract_events(queue_map, ready_count(queue_map))
    {:noreply, ready_events, queue_map}
  end

  # Raises (via Map.update!/3) when an event carries an unknown producer name.
  defp queue_event({name, _value} = event, queue_map),
    do: Map.update!(queue_map, name, &:queue.in(event, &1))

  # How many events **per each and every queue** are ready to go?
  #
  # The lengths are computed first and only then reduced with min. Reducing the
  # queues directly without an initial accumulator would use the first *queue*
  # as the accumulator, so its length would never be considered and
  # :queue.split/2 could later be asked for more events than that queue holds.
  # With no queues at all nothing is ready.
  defp ready_count(queue_map) do
    queue_map
    |> Map.values()
    |> Enum.map(&:queue.len/1)
    |> Enum.min(fn -> 0 end)
  end

  defp extract_events(queue_map, quantity) when quantity < 1 do
    {[], queue_map}
  end

  # Takes `quantity` events off the front of every queue and returns them
  # sorted by value together with the map of the remaining queues.
  defp extract_events(queue_map, quantity) do
    {batches, queue_map} =
      Enum.map_reduce(queue_map, queue_map, fn {name, queue}, acc ->
        {front, rest} = :queue.split(quantity, queue)
        {:queue.to_list(front), Map.put(acc, name, rest)}
      end)

    ready_events =
      batches
      |> Enum.concat()
      |> Enum.sort_by(fn {_name, value} -> value end)

    {ready_events, queue_map}
  end
end

# File c.ex
defmodule C do
  @moduledoc """
  Deliberately slow consumer: pauses 200 ms per batch and prints the batch.
  """
  use GenStage

  def start_link(), do: GenStage.start_link(__MODULE__, :ok)

  # The consumer keeps no meaningful state.
  def init(:ok), do: {:consumer, :no_state}

  # Consumers never emit events, hence the empty list in the reply.
  def handle_events(events, _from, _state) do
    :timer.sleep(200)
    IO.inspect(events)
    {:noreply, [], :no_state}
  end
end

# File: trial.ex
defmodule Trial do
  @moduledoc """
  Wires three `A` producers through `B` into `C` with a per-subscription
  `max_demand`.
  """

  def run do
    events_per_producer = 500
    max_demand = 50
    step = 3
    names = [:p1, :p2, :p3]

    # Producer number k starts at value k, so the three sequences interleave.
    producers =
      for {name, start} <- Enum.with_index(names) do
        {:ok, pid} = A.start_link(name, {events_per_producer, start, step})
        pid
      end

    {:ok, b} = B.start_link(names)
    {:ok, c} = C.start_link()

    # cancel: :temporary keeps B alive when a producer terminates.
    for producer <- producers do
      GenStage.sync_subscribe(b, to: producer, max_demand: max_demand, cancel: :temporary)
    end

    GenStage.sync_subscribe(c, to: b)

    # ... and wait in iex for things to finish
  end
end

You still need to decide how you are going to shut down the pipeline gracefully - in this example I had to specify cancel: :temporary when subscribing to prevent the exhausted/terminating producers from tearing down the rest of the chain before all of the events were processed - when everything is said and done B and C are just sitting there idly (most likely a job for a supervisor).

The other thing is that demand isn’t “distributed” which is why I specified it at the subscription level (otherwise each of them would see the default of 1000). All producers first see the maximum demand of 50 and are then throttled back to 25 for the remainder of the run.

Addendum: Also note that a producer doesn’t have to fulfill the entire demand in handle_demand's return value - but it is obligated to store the pending demand and deliver it as it becomes available. The following A and Trial module demonstrate this approach. In order to dispatch more events this producer casts a :more message to itself as long as there is pending demand - this could be done one event at a time, i.e. casting a message for each event to be generated (as long as there is pending demand). This modification allowed me to remove the :max_demand subscribe option because “demand” no longer directly drives the number of events that are returned by handle_demand.

# File a.ex
defmodule A do
  @moduledoc """
  Named producer that keeps track of pending demand and serves it in small
  batches instead of answering each demand in full.

  The state is `{left, current, pending, {step, name}, {emit, wait}}`:

    * `left`    - events still to be generated
    * `current` - value of the next event
    * `pending` - demand received but not yet served
    * `step`    - distance between two values, `name` tags every event
    * `emit`    - maximum batch size, `wait` - simulated work time in ms

  As long as demand is pending the stage casts `:more` to itself to produce
  the next batch.
  """
  use GenStage

  ## callbacks
  def start_link(name, {{left, start, step}, simulate}) do
    GenStage.start_link(__MODULE__, {left, start, {step, name}, simulate}, name: name)
  end

  def init({left, start, config, simulate}) do
    # start with no pending demand
    {:producer, {left, start, 0, config, simulate}}
  end

  # dole out some more events
  def handle_cast(:more, state), do: gen_events(state)

  def handle_demand(demand, state) when demand > 0, do: gen_events(state, demand)

  ## helpers

  # add the new demand to the pending demand
  defp gen_events({left, current, pending, config, simulate}, demand),
    do: gen_events({left, current, pending + demand, config, simulate})

  # Everything has been generated: stop the stage.
  defp gen_events({left, _current, _pending, _config, _simulate} = state) when left < 1 do
    {:stop, :normal, state}
  end

  # Every handle_demand/2 call starts its own chain of :more casts, so a :more
  # can arrive after another chain has already served all pending demand.
  # There is nothing to do then; in particular no events must be emitted.
  defp gen_events({_left, _current, pending, _config, _simulate} = state) when pending < 1 do
    {:noreply, [], state}
  end

  defp gen_events({left, current, pending, {step, name} = config, {emit, wait} = simulate}) do
    :timer.sleep(wait)  # pretend we're working hard

    # Never negative: Enum.take/2 with a negative count would not terminate
    # on the infinite stream below.
    count =
      left
      |> min(pending)
      |> min(emit)
      |> max(0)

    # Iterating from `current` yields exactly `count` values. A range such as
    # 0..(count - 1) would turn into the descending range 0..-1 for a count
    # of 0 and produce two bogus events.
    events =
      current
      |> Stream.iterate(&(&1 + step))
      |> Enum.take(count)
      |> Enum.map(&{name, &1})

    pending = pending - count

    # dispatch more later while demand is still outstanding
    if pending > 0, do: GenStage.cast(self(), :more)

    {:noreply, events, {left - count, current + count * step, pending, config, simulate}}
  end
end

# File: trial.ex
defmodule Trial do
  @moduledoc """
  Wires three batch-emitting `A` producers through `B` into `C`, using the
  default demand settings.
  """

  def run do
    events_per_producer = 500
    simulate = {9, 10} # 9 events at a time after 10 ms

    # {name, first value}; every producer counts in steps of 3.
    specs = [p1: 0, p2: 1, p3: 2]

    producers =
      Enum.map(specs, fn {name, start} ->
        {:ok, pid} = A.start_link(name, {{events_per_producer, start, 3}, simulate})
        pid
      end)

    {:ok, b} = B.start_link(Keyword.keys(specs))
    {:ok, c} = C.start_link()

    # cancel: :temporary keeps B alive when a producer terminates.
    Enum.each(producers, fn producer ->
      GenStage.sync_subscribe(b, to: producer, cancel: :temporary)
    end)

    GenStage.sync_subscribe(c, to: b)

    # ... and wait in iex for things to finish
  end
end

Finally here is a version where the producer casts itself a :next message to generate a single event. Once generated in handle_cast the event is dispatched if there is pending demand, otherwise it is buffered.
handle_demand increases pending demand and dispatches any buffered events within the limit of the pending demand.

# File a.ex
defmodule A do
  @moduledoc """
  Named producer that generates one event per `:next` cast, independently of
  demand, and buffers events until demand for them exists.

  The state is `{left, current, events, pending, {step, name}, wait}`:

    * `left`    - events still to be generated
    * `current` - value of the next event
    * `events`  - buffered events, newest first
    * `pending` - demand received but not yet served
    * `step`    - distance between two values, `name` tags every event
    * `wait`    - simulated work time per event in ms
  """
  use GenStage

  ## callbacks
  def start_link(name, {{left, start, step}, simulate}) do
    GenStage.start_link(__MODULE__, {left, start, {step, name}, simulate}, name: name)
  end

  def init({left, start, config, simulate}) do
    GenStage.cast(self(), :next)  # generate first event
    # start with no events and no pending demand
    {:producer, {left, start, [], 0, config, simulate}}
  end

  # generate next event and dispatch pending demand
  def handle_cast(:next, state) do
    state
    |> gen_event()
    |> dispatch_events()
  end

  def handle_demand(demand, state) when demand > 0, do: dispatch_events(state, demand)

  ## dispatching

  # Returns {events_to_dispatch, remaining_buffer, remaining_pending_demand}.
  # The buffer is newest-first, so the dispatched part is reversed to restore
  # generation order.
  defp get_dispatch(events, pending) when pending < 1 or events == [] do
    {[], events, pending} # no pending demand or buffered events to dispatch
  end

  defp get_dispatch(events, pending) do
    quantity = length(events)

    if pending >= quantity do
      # dispatch all buffered events
      {Enum.reverse(events), [], pending - quantity}
    else
      # More buffered than demanded: the oldest `pending` events sit at the
      # tail of the buffer. The demand is then fully served, so the remaining
      # pending demand is 0 - not `pending - quantity`, which is negative and
      # would swallow part of the next demand.
      {keep, dispatch} = Enum.split(events, quantity - pending)
      {Enum.reverse(dispatch), keep, 0}
    end
  end

  # nothing left to generate and no events buffered, therefore terminate
  defp dispatch_events({left, _current, [], _pending, _config, _simulate} = state)
       when left < 1 do
    {:stop, :normal, state}
  end

  defp dispatch_events({left, current, events, pending, config, simulate}) do
    {dispatch, events, pending} = get_dispatch(events, pending)
    {:noreply, dispatch, {left, current, events, pending, config, simulate}}
  end

  # add the new demand to the pending demand, then dispatch
  defp dispatch_events({left, current, events, pending, config, simulate}, demand),
    do: dispatch_events({left, current, events, pending + demand, config, simulate})

  ## generating

  defp gen_event({left, _current, _events, _pending, _config, _wait} = state) when left < 1 do
    state # no more events left to generate
  end

  defp gen_event({left, current, events, pending, {step, name} = config, wait}) do
    :timer.sleep(wait) # pretend we're working hard

    events = [{name, current} | events] # add new event
    left = max(left - 1, 0)

    # generate next event
    if left > 0, do: GenStage.cast(self(), :next)

    {left, current + step, events, pending, config, wait}
  end
end

# File: trial.ex
defmodule Trial do
  @moduledoc """
  Wires three `A` producers that generate one event at a time through `B`
  into `C`.
  """

  def run do
    events_per_producer = 500
    step = 3
    wait = 10 # 1 event every 10 ms
    names = [:p1, :p2, :p3]

    # Start the producers in order; the index doubles as the first value.
    started =
      names
      |> Enum.with_index()
      |> Enum.map(fn {name, start} ->
        A.start_link(name, {{events_per_producer, start, step}, wait})
      end)

    [{:ok, a1}, {:ok, a2}, {:ok, a3}] = started
    {:ok, b} = B.start_link(names)
    {:ok, c} = C.start_link()

    # cancel: :temporary keeps B alive when a producer terminates.
    subscription = [cancel: :temporary]
    GenStage.sync_subscribe(b, [to: a1] ++ subscription)
    GenStage.sync_subscribe(b, [to: a2] ++ subscription)
    GenStage.sync_subscribe(b, [to: a3] ++ subscription)
    GenStage.sync_subscribe(c, to: b)

    # ... and wait in iex for things to finish
  end
end

And I guess continually spinning off some tasks might be another option:

# File a.ex
defmodule A do
  @moduledoc """
  Producer that computes its events in parallel tasks but emits them in the
  order in which the tasks were started.

  The state is a map:

    * `status`    - `:init`, `:running` or `:done` (nothing left to generate)
    * `remain`    - events for which no task has been started yet
    * `current`   - value handed to the next task
    * `running`   - number of tasks still running
    * `queue`     - tasks in start order, some possibly finished
    * `results`   - `task.ref => result` of finished tasks still in the queue
    * `events`    - events ready for dispatch, newest first
    * `pending`   - demand received but not yet served
    * `max_tasks` - upper bound for concurrently running tasks
    * `simulate`  - upper bound in ms for the fake work of one task
  """
  use GenStage

  ## Dispatch available buffered events if there is demand

  # Returns {events_to_dispatch, remaining_buffer, remaining_pending_demand}.
  # The buffer is newest-first, so the dispatched part is reversed to restore
  # generation order.
  defp get_dispatch(events, pending) when pending < 1 or events == [] do
    {[], events, pending} # no pending demand or buffered events to dispatch
  end

  defp get_dispatch(events, pending) do
    quantity = length(events)

    if pending >= quantity do
      # dispatch all buffered events
      {Enum.reverse(events), [], pending - quantity}
    else
      # More buffered than demanded: the oldest `pending` events sit at the
      # tail of the buffer. The demand is then fully served, so the remaining
      # pending demand is 0 - not `pending - quantity`, which is negative and
      # would swallow part of the next demand.
      {keep, dispatch} = Enum.split(events, quantity - pending)
      {Enum.reverse(dispatch), keep, 0}
    end
  end

  # Schedules the shutdown once everything is generated and dispatched.
  # Written as a do/end block: with `do: cast(...); :ok` the trailing `:ok`
  # is a separate module-level expression, not part of the function body.
  defp stage_status(:done, []) do
    GenStage.cast(self(), :stop)
    :ok
  end

  defp stage_status(_status, _events), do: :ok

  defp dispatch_events(%{status: :done, events: []} = state) do
    {:stop, :normal, state} # all events generated and dispatched already
  end

  defp dispatch_events(%{status: status, events: events, pending: pending} = state) do
    {dispatch, events, pending} = get_dispatch(events, pending)
    stage_status(status, events)
    {:noreply, dispatch, %{state | events: events, pending: pending}}
  end

  # add the new demand to the pending demand, then dispatch
  defp dispatch_events(%{pending: pending} = state, demand),
    do: dispatch_events(%{state | pending: pending + demand})

  ## Manage task queue and buffer events

  # Moves results to the event buffer for as long as the task at the front of
  # the queue has finished; stops at the first task without a result so the
  # start order is preserved.
  defp dequeue_results(queue, results, events) do
    case :queue.peek(queue) do
      {:value, task} ->
        case Map.fetch(results, task.ref) do
          {:ok, result} -> # transfer event from results
            dequeue_results(:queue.drop(queue), Map.delete(results, task.ref), [result | events])

          :error -> # no result with matching ref
            {queue, results, events}
        end

      :empty -> # no pending tasks in queue
        {queue, results, events}
    end
  end

  defp new_task(value, wait) do
    Task.async(fn ->
      :timer.sleep(:rand.uniform(wait)) # pretend to work
      value                             # then just return the value given
    end)
  end

  # Starts one task for every value in start..(stop - 1) and enqueues it.
  defp add_tasks(queue, start, stop, _wait) when start >= stop,
    do: queue

  defp add_tasks(queue, start, stop, wait),
    do: add_tasks(:queue.in(new_task(start, wait), queue), start + 1, stop, wait)

  # Starts as many new tasks as `max_tasks` and the remaining work allow.
  defp queue_tasks(queue, remain, current, running, max_tasks, simulate) do
    new_quantity = min(remain, max_tasks - running)

    if new_quantity > 0 do
      stop = current + new_quantity
      queue = add_tasks(queue, current, stop, simulate)
      {queue, remain - new_quantity, stop, running + new_quantity}
    else
      {queue, remain, current, running}
    end
  end

  defp events_status(remain, queue) do
    if remain < 1 and :queue.len(queue) < 1 do
      :done # done **generating** events (though some might still be buffered)
    else
      :running
    end
  end

  defp next_event(%{status: :done} = state) do
    state # no more events left to generate
  end

  defp next_event(
         %{remain: remain, current: current, running: running,
           queue: queue, results: results, events: events, max_tasks: max_tasks,
           simulate: simulate} = state
       ) do
    # convert results to events, ready for dispatch
    {queue, results, events} = dequeue_results(queue, results, events)

    # fill up task queue
    {queue, remain, current, running} =
      queue_tasks(queue, remain, current, running, max_tasks, simulate)

    %{state | status: events_status(remain, queue), remain: remain, current: current,
      running: running, queue: queue, results: results, events: events}
  end

  ## callbacks
  def start_link(config) do
    GenStage.start_link(__MODULE__, config)
  end

  def init({left, max_tasks, simulate}) do
    GenStage.cast(self(), :load)  # spin up tasks later

    # start with no events and no pending demand
    {:producer,
     %{status: :init,       # prevent dispatch events from stopping process
       remain: left,
       current: 0,
       running: 0,          # running tasks
       queue: :queue.new(), # queued tasks, some possibly finished
       results: %{},        # results of finished tasks in the queue
       events: [],          # events ready for dispatch
       pending: 0,          # pending demand
       max_tasks: max_tasks,
       simulate: simulate
     }
    }
  end

  def terminate(_reason, _state) do
    :ok
  end

  def handle_cast(:load, state) do
    state
    |> next_event()      # spin up the tasks
    |> dispatch_events() # mostly for the reply
  end

  def handle_cast(:stop, state) do
    {:stop, :normal, state}
  end

  # handle Task completion
  def handle_info(
        {ref, result},
        %{results: results, running: running} = state
      ) when is_reference(ref) do
    # The task has replied; drop its monitor and any :DOWN message already queued.
    Process.demonitor(ref, [:flush])

    %{state | results: Map.put(results, ref, result), running: running - 1}
    |> next_event()
    |> dispatch_events()
  end

  def handle_demand(demand, state) when demand > 0,
    do: dispatch_events(state, demand)
end

# File: trial.ex
defmodule Trial do
  @moduledoc """
  Connects a single task-based `A` producer directly to the `C` consumer.
  """

  def run do
    # {number of events, maximum parallel tasks, up to 100ms fake work for each event}
    config = {1500, 50, 100}

    {:ok, producer} = A.start_link(config)
    {:ok, consumer} = C.start_link()

    # cancel: :temporary keeps the consumer alive when the producer terminates.
    GenStage.sync_subscribe(consumer, to: producer, cancel: :temporary)

    # ... and wait in iex for things to finish
  end
end
9 Likes

It’s criminal the guy didn’t respond to you, that’s awesome, and helpful… thanks

6 Likes