Looking at what you described you are likely looking at a :producer_consumer
that buffers new events coming in with handle_events
and dispatches the events that are ready in the reply from that same handle_events
call. Here is the base scenario I came up with:
# File a.ex
defmodule A do
  # Demand-driven producer.
  #
  # State is `{name, {left, current, step}}`: `left` events remain to be
  # emitted, the next event value is `current`, and values advance by `step`.
  # Every event is emitted as `{name, value}`.
  use GenStage

  def start_link(name, config) do
    GenStage.start_link(__MODULE__, {name, config}, name: name)
  end

  def init(state), do: {:producer, state}

  # Nothing left to emit: the next demand stops the stage normally.
  def handle_demand(_demand, {_name, {left, _current, _step}} = state) when left < 1 do
    {:stop, :normal, state}
  end

  # Answers the demand with at most `demand` events after a 50 ms pause.
  def handle_demand(demand, state) when demand > 0 do
    :timer.sleep(50)
    {batch, next_state} = gen_events(state, demand)
    {:noreply, batch, next_state}
  end

  # Builds `min(left, demand)` events and returns them together with the
  # state advanced past those events.
  defp gen_events({name, {left, current, step}}, demand) do
    count = min(left, demand)
    batch = for offset <- 0..(count - 1), do: {name, offset * step + current}
    {batch, {name, {left - count, count * step + current, step}}}
  end
end
# File b.ex
defmodule B do
  # Producer-consumer that merges the events of several named producers.
  #
  # State is a map of producer name => `:queue` of buffered `{name, value}`
  # events. Each `handle_events/3` call buffers the incoming events and then
  # releases the same number of events from every queue, merged and sorted
  # by value.
  use GenStage

  def start_link(producer_names) do
    GenStage.start_link(__MODULE__, producer_names)
  end

  # One empty queue per expected producer name.
  @impl true
  def init(producer_names) do
    {:producer_consumer, Map.new(producer_names, &{&1, :queue.new()})}
  end

  @impl true
  def handle_events(events, _from, state) do
    dispatch_events(state, events)
  end

  # Buffers `new_events`, then emits whatever every queue can contribute.
  defp dispatch_events(queue_map, new_events) do
    # add new events to the correct queues
    queue_map = Enum.reduce(new_events, queue_map, &queue_event/2)
    # extract the same number of events from each queue; merge and sort them
    {ready_events, queue_map} = extract_events(queue_map, ready_count(queue_map))
    {:noreply, ready_events, queue_map}
  end

  # Appends `event` to its producer's queue. `Map.update!/3` raises a
  # KeyError for a producer name that was not passed to `init/1`.
  defp queue_event({name, _value} = event, queue_map) do
    Map.update!(queue_map, name, fn queue -> :queue.in(event, queue) end)
  end

  # How many events **per each and every queue** are ready to go, i.e. the
  # length of the shortest queue. The lengths are computed for all queues
  # before taking the minimum so that no queue is left out of the count;
  # with no queues at all nothing is ready.
  defp ready_count(queue_map) do
    case Map.values(queue_map) do
      [] -> 0
      queues -> queues |> Enum.map(&:queue.len/1) |> Enum.min()
    end
  end

  # Takes `quantity` events off the front of every queue and returns
  # `{sorted_events, remaining_queue_map}`.
  defp extract_events(queue_map, quantity) when quantity < 1 do
    {[], queue_map}
  end

  defp extract_events(queue_map, quantity) do
    {fronts, rests} =
      queue_map
      |> Enum.map(fn {name, queue} ->
        {front, rest} = :queue.split(quantity, queue)
        {:queue.to_list(front), {name, rest}}
      end)
      |> Enum.unzip()

    events = fronts |> Enum.concat() |> Enum.sort(&event_order/2)
    {events, Map.new(rests)}
  end

  # Orders events by their value, ignoring the producer name.
  defp event_order({_, lhs}, {_, rhs}), do: lhs <= rhs
end
# File c.ex
defmodule C do
  # Terminal consumer: pauses 200 ms per batch, prints the batch and keeps
  # no state.
  use GenStage

  def start_link(), do: GenStage.start_link(__MODULE__, :ok)

  def init(:ok), do: {:consumer, :no_state}

  def handle_events(events, _from, _state) do
    :timer.sleep(200)
    IO.inspect(events)
    # consumers never emit events
    {:noreply, [], :no_state}
  end
end
# File: trial.ex
defmodule Trial do
  # Starts three `A` producers, one `B` merger and one `C` consumer and
  # subscribes them into a pipeline.
  def run do
    quota = 500
    demand_cap = 50

    # the producers interleave: start offsets 0, 1, 2 with a common step of 3
    {:ok, a1} = A.start_link(:p1, {quota, 0, 3})
    {:ok, a2} = A.start_link(:p2, {quota, 1, 3})
    {:ok, a3} = A.start_link(:p3, {quota, 2, 3})
    {:ok, b} = B.start_link([:p1, :p2, :p3])
    {:ok, c} = C.start_link()

    Enum.each([a1, a2, a3], fn producer ->
      GenStage.sync_subscribe(b, to: producer, max_demand: demand_cap, cancel: :temporary)
    end)

    GenStage.sync_subscribe(c, to: b)
    # ... and wait in iex for things to finish
  end
end
You still need to decide how you are going to shutdown the pipeline gracefully - in this example I had to specify cancel: :temporary
when subscribing to prevent the exhausted/terminating producers from tearing down the rest of the chain before all of the events were processed - when everything is said and done B
and C
are just sitting there idly (most likely a job for a supervisor).
The other thing is that demand isn’t “distributed” which is why I specified it at the subscription level (otherwise each of them would see the default of 1000). All producers first see the maximum demand of 50 and are then throttled back to 25 for the remainder of the run.
Addendum: Also note that a producer doesn’t have to fulfill the entire demand in handle_demand
's return value - but it is obligated to store the pending demand and deliver it as it becomes available. The following A
and Trial
module demonstrate this approach. In order to dispatch more events this producer casts a :more
message to itself as long as there is pending demand - this could be done one event at a time, i.e. casting a message for each event to be generated (as long as there is pending demand). This modification allowed me to remove the :max_demand
subscribe option because “demand” no longer directly drives the number of events that are returned by handle_demand
.
# File a.ex
defmodule A do
  # Producer that tracks pending demand instead of answering each demand in
  # full.
  #
  # State is `{left, current, pending, {step, name}, {emit, wait}}`:
  #   left    - events still to be generated
  #   current - value of the next event
  #   pending - demand received but not yet served
  #   step    - distance between consecutive event values
  #   name    - tag put on every event (`{name, value}`)
  #   emit    - maximum number of events emitted per round
  #   wait    - milliseconds slept before each round
  use GenStage

  ## callbacks

  def start_link(name, {{left, start, step}, simulate}) do
    GenStage.start_link(__MODULE__, {left, start, {step, name}, simulate}, name: name)
  end

  @impl true
  def init({left, start, config, simulate}) do
    # start with no pending demand
    {:producer, {left, start, 0, config, simulate}}
  end

  # dole out some more events
  @impl true
  def handle_cast(:more, state), do: gen_events(state)

  @impl true
  def handle_demand(demand, state) when demand > 0, do: gen_events(state, demand)

  ## event generation

  # add the new demand to the pending demand
  defp gen_events({left, current, pending, config, simulate}, demand),
    do: gen_events({left, current, pending + demand, config, simulate})

  # Nothing left to generate: stop the stage.
  defp gen_events({left, _, _, _config, _simulate} = state) when left < 1 do
    {:stop, :normal, state}
  end

  # A `:more` cast can arrive after an intervening demand has already drained
  # the pending demand; there is nothing to serve then.
  defp gen_events({_left, _current, pending, _config, _simulate} = state) when pending < 1 do
    {:noreply, [], state}
  end

  defp gen_events({left, current, pending, {step, name} = config, {emit, wait} = simulate}) do
    # pretend we're working hard
    :timer.sleep(wait)

    count = left |> min(pending) |> min(emit) |> max(0)
    events = build_events(name, current, step, count)
    remain = pending - count

    # dispatch more later while demand is still outstanding
    if remain > 0, do: GenStage.cast(self(), :more)

    {:noreply, events, {left - count, count * step + current, max(remain, 0), config, simulate}}
  end

  # Returns `count` events starting at `current`. A count of zero must yield
  # no events: the range `0..-1` would be descending and contain two elements.
  defp build_events(_name, _current, _step, count) when count < 1, do: []

  defp build_events(name, current, step, count),
    do: Enum.map(0..(count - 1), &{name, &1 * step + current})
end
# File: trial.ex
defmodule Trial do
  # Starts three `A` producers, one `B` merger and one `C` consumer and
  # subscribes them into a pipeline using the default demand settings.
  def run do
    quota = 500
    # 9 events at a time after 10 ms
    simulate = {9, 10}

    {:ok, a1} = A.start_link(:p1, {{quota, 0, 3}, simulate})
    {:ok, a2} = A.start_link(:p2, {{quota, 1, 3}, simulate})
    {:ok, a3} = A.start_link(:p3, {{quota, 2, 3}, simulate})
    {:ok, b} = B.start_link([:p1, :p2, :p3])
    {:ok, c} = C.start_link()

    Enum.each([a1, a2, a3], fn producer ->
      GenStage.sync_subscribe(b, to: producer, cancel: :temporary)
    end)

    GenStage.sync_subscribe(c, to: b)
    # ... and wait in iex for things to finish
  end
end
Finally here is a version were the producer casts itself a :next
message to generate a single event. Once generated in handle_cast
the event is dispatched if there is pending demand, otherwise it is buffered.
handle_demand
increases pending demand and dispatches any buffered events within the limit of the pending demand.
# File a.ex
defmodule A do
  # Producer that generates one event per `:next` cast and buffers events
  # until there is demand for them.
  #
  # State is `{left, current, events, pending, {step, name}, wait}`:
  #   left    - events still to be generated
  #   current - value of the next event
  #   events  - buffered events, newest first
  #   pending - demand received but not yet served
  #   step    - distance between consecutive event values
  #   name    - tag put on every event (`{name, value}`)
  #   wait    - milliseconds slept before generating an event
  use GenStage

  # Splits the buffer into what can be dispatched now and what stays
  # buffered. Returns `{dispatch, buffered, pending}` where `dispatch` is
  # oldest-first and `pending` is the demand left over after dispatching.
  defp get_dispatch(events, pending) when pending < 1 or events == [] do
    # no pending demand or buffered events to dispatch
    {[], events, pending}
  end

  defp get_dispatch(events, pending) do
    buffered = length(events)
    sent = min(buffered, pending)
    # the buffer is newest-first, so the oldest `sent` events are at its tail
    {keep, oldest} = Enum.split(events, buffered - sent)
    # only the events actually sent reduce the pending demand, so it never
    # drops below zero
    {Enum.reverse(oldest), keep, pending - sent}
  end

  # nothing left to generate and no events buffered, therefore terminate
  defp dispatch_events({left, _current, [], _pending, _config, _simulate} = state)
       when left < 1 do
    {:stop, :normal, state}
  end

  defp dispatch_events({left, current, events, pending, config, simulate}) do
    {dispatch, events, pending} = get_dispatch(events, pending)
    {:noreply, dispatch, {left, current, events, pending, config, simulate}}
  end

  # add the new demand to the pending demand, then dispatch
  defp dispatch_events({left, current, events, pending, config, simulate}, demand),
    do: dispatch_events({left, current, events, pending + demand, config, simulate})

  # no more events left to generate
  defp gen_event({left, _current, _events, _pending, _config, _wait} = state) when left < 1 do
    state
  end

  defp gen_event({left, current, events, pending, {step, name} = config, wait}) do
    # pretend we're working hard
    :timer.sleep(wait)
    # add new event
    events = [{name, current} | events]
    remain = max(left - 1, 0)
    # generate next event
    if remain > 0, do: GenStage.cast(self(), :next)
    {remain, current + step, events, pending, config, wait}
  end

  ## callbacks

  def start_link(name, {{left, start, step}, simulate}) do
    GenStage.start_link(__MODULE__, {left, start, {step, name}, simulate}, name: name)
  end

  @impl true
  def init({left, start, config, simulate}) do
    # generate first event
    GenStage.cast(self(), :next)
    # start with no events and no pending demand
    {:producer, {left, start, [], 0, config, simulate}}
  end

  # generate next event and dispatch pending demand
  @impl true
  def handle_cast(:next, state) do
    state
    |> gen_event()
    |> dispatch_events()
  end

  @impl true
  def handle_demand(demand, state) when demand > 0, do: dispatch_events(state, demand)
end
# File: trial.ex
defmodule Trial do
  # Starts three `A` producers, one `B` merger and one `C` consumer and
  # subscribes them into a pipeline using the default demand settings.
  def run do
    quota = 500
    step = 3
    # 1 event every 10 ms
    wait = 10

    {:ok, a1} = A.start_link(:p1, {{quota, 0, step}, wait})
    {:ok, a2} = A.start_link(:p2, {{quota, 1, step}, wait})
    {:ok, a3} = A.start_link(:p3, {{quota, 2, step}, wait})
    {:ok, b} = B.start_link([:p1, :p2, :p3])
    {:ok, c} = C.start_link()

    Enum.each([a1, a2, a3], fn producer ->
      GenStage.sync_subscribe(b, to: producer, cancel: :temporary)
    end)

    GenStage.sync_subscribe(c, to: b)
    # ... and wait in iex for things to finish
  end
end
And I guess continually spinning of some tasks might be another option:
# File a.ex
defmodule A do
  # Producer whose events are the results of concurrently running tasks.
  #
  # Up to `max_tasks` tasks run at a time. Tasks are kept in a queue in start
  # order and a result only becomes an event once every task queued before it
  # has finished, so events are buffered in task order. Buffered events are
  # dispatched as demand allows.
  use GenStage

  ## Dispatch available buffered events if there is demand

  # Splits the buffer into what can be dispatched now and what stays
  # buffered. Returns `{dispatch, buffered, pending}` where `dispatch` is
  # oldest-first and `pending` is the demand left over after dispatching.
  defp get_dispatch(events, pending) when pending < 1 or events == [] do
    # no pending demand or buffered events to dispatch
    {[], events, pending}
  end

  defp get_dispatch(events, pending) do
    buffered = length(events)
    sent = min(buffered, pending)
    # the buffer is newest-first, so the oldest `sent` events are at its tail
    {keep, oldest} = Enum.split(events, buffered - sent)
    # only the events actually sent reduce the pending demand, so it never
    # drops below zero
    {Enum.reverse(oldest), keep, pending - sent}
  end

  # Once generation is done and the buffer is empty, ask the stage to stop
  # via a cast so that the current reply is still delivered first.
  defp stage_status(:done, []) do
    GenStage.cast(self(), :stop)
    :ok
  end

  defp stage_status(_status, _events), do: :ok

  # all events generated and dispatched already
  defp dispatch_events(%{status: :done, events: []} = state) do
    {:stop, :normal, state}
  end

  defp dispatch_events(%{status: status, events: events, pending: pending} = state) do
    {dispatch, events, pending} = get_dispatch(events, pending)
    stage_status(status, events)
    {:noreply, dispatch, %{state | events: events, pending: pending}}
  end

  # add the new demand to the pending demand, then dispatch
  defp dispatch_events(%{pending: pending} = state, demand),
    do: dispatch_events(%{state | pending: pending + demand})

  ## Manage task queue and buffer events

  # Moves results to the event buffer for as long as the task at the front of
  # the queue has a stored result; stops at the first unfinished task.
  defp dequeue_results(queue, results, events) do
    with {:value, task} <- :queue.peek(queue),
         {:ok, result} <- Map.fetch(results, task.ref) do
      # transfer event from results
      dequeue_results(:queue.drop(queue), Map.delete(results, task.ref), [result | events])
    else
      # no pending tasks in queue, or no result with matching ref yet
      _ -> {queue, results, events}
    end
  end

  # Starts a task that sleeps between 1 and `wait` ms and returns `value`.
  defp new_task(value, wait) do
    Task.async(fn ->
      # pretend to work
      :timer.sleep(:rand.uniform(wait))
      # then just return the value given
      value
    end)
  end

  # Queues one task for each value in `start..stop-1`.
  defp add_tasks(queue, start, stop, _wait) when start >= stop, do: queue

  defp add_tasks(queue, start, stop, wait),
    do: add_tasks(:queue.in(new_task(start, wait), queue), start + 1, stop, wait)

  # Starts as many new tasks as the remaining work and the `max_tasks` limit
  # allow. Returns `{queue, remain, current, running}`.
  defp queue_tasks(queue, remain, current, running, max_tasks, simulate) do
    new_quantity = min(remain, max_tasks - running)

    if new_quantity > 0 do
      stop = current + new_quantity

      {add_tasks(queue, current, stop, simulate), remain - new_quantity, stop,
       running + new_quantity}
    else
      {queue, remain, current, running}
    end
  end

  # `:done` means done **generating** events (though some might still be
  # buffered).
  defp events_status(remain, queue) do
    if remain < 1 and :queue.len(queue) < 1, do: :done, else: :running
  end

  # no more events left to generate
  defp next_event(%{status: :done} = state), do: state

  defp next_event(
         %{
           remain: remain,
           current: current,
           running: running,
           queue: queue,
           results: results,
           events: events,
           max_tasks: max_tasks,
           simulate: simulate
         } = state
       ) do
    # convert results to events, ready for dispatch
    {queue, results, events} = dequeue_results(queue, results, events)

    # fill up task queue
    {queue, remain, current, running} =
      queue_tasks(queue, remain, current, running, max_tasks, simulate)

    %{
      state
      | status: events_status(remain, queue),
        remain: remain,
        current: current,
        running: running,
        queue: queue,
        results: results,
        events: events
    }
  end

  ## callbacks

  def start_link(config) do
    GenStage.start_link(__MODULE__, config)
  end

  @impl true
  def init({left, max_tasks, simulate}) do
    # spin up tasks later
    GenStage.cast(self(), :load)

    # start with no events and no pending demand
    {:producer,
     %{
       # prevent dispatch events from stopping process
       status: :init,
       remain: left,
       current: 0,
       # running tasks
       running: 0,
       # queued tasks, some possibly finished
       queue: :queue.new(),
       # results of finished tasks in the queue
       results: %{},
       # events ready for dispatch
       events: [],
       # pending demand
       pending: 0,
       max_tasks: max_tasks,
       simulate: simulate
     }}
  end

  @impl true
  def terminate(_reason, _state), do: :ok

  @impl true
  def handle_cast(:load, state) do
    state
    # spin up the tasks
    |> next_event()
    # mostly for the reply
    |> dispatch_events()
  end

  def handle_cast(:stop, state) do
    {:stop, :normal, state}
  end

  # handle Task completion
  @impl true
  def handle_info({ref, result}, %{results: results, running: running} = state)
      when is_reference(ref) do
    # the result has arrived, so the task's monitor is no longer needed
    Process.demonitor(ref, [:flush])

    %{state | results: Map.put(results, ref, result), running: running - 1}
    |> next_event()
    |> dispatch_events()
  end

  @impl true
  def handle_demand(demand, state) when demand > 0, do: dispatch_events(state, demand)
end
# File: trial.ex
defmodule Trial do
  # Starts one task-based `A` producer and subscribes a `C` consumer to it.
  def run do
    # {quantity, parallel, wait}: 1500 events, 50 tasks at a time,
    # up to 100ms fake work for each event
    config = {1500, 50, 100}

    {:ok, a} = A.start_link(config)
    {:ok, c} = C.start_link()
    GenStage.sync_subscribe(c, to: a, cancel: :temporary)
    # ... and wait in iex for things to finish
  end
end