If I understand correctly:
- I can use a GenStage as a
:producer
- I can add events to the GenStage by something as simple as a
handle_cast
that appends whatever we pass it to some list in the state - The key function I need implemented in the GenStage producer is the
handle_demand
callback which will return both the events to dispatch and the events left in the state - I can create a Flow that will draw from GenStage, and I can use a simple periodic window to partition the events and process them at regular intervals
If this is true, then I think this should work:
defmodule MyProducer do
use GenStage
def start_link do
GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
end
def init(:ok) do
{:producer, []}
end
def handle_demand(demand, state) do
{to_dispatch, remaining} = Enum.split(state, demand)
{:noreply, to_dispatch, remaining}
end
def handle_cast({:add_events, events}, state) do
{:noreply, [], state ++ events}
end
end
defmodule MyFlow do
def start_link do
window = Flow.Window.periodic(1, :minute)
x =
Flow.from_stage(MyProducer, window: window)
|> Flow.map(fn(x) -> x * 2 end)
|> Flow.emit(:events)
|> IO.inspect
Flow.start_link(x)
end
end
MyProducer.start_link
MyFlow.start_link
GenServer.cast(MyProducer, {:add_events, Enum.to_list(1..100000)})
I would expect this to consume from the producer at some rate, with one minute windows, and … Printing to the console a list every minute or so.
But no. Nothing. What am I doing wrong?