I’m writing a Flow that processes each item from a database cursor (Mongo in my case). Once that cursor is exhausted, I’d like the flow to shut down automatically.
Here’s the code I have so far:
```elixir
defmodule CursorProducer do
  use GenStage

  def start_link(opts) do
    {topology, opts} = opts |> Keyword.pop!(:topology)
    {collection, opts} = opts |> Keyword.pop!(:collection)
    {filter, opts} = opts |> Keyword.pop(:filter, %{})
    cursor = Mongo.find(topology, collection, filter)
    GenStage.start_link(__MODULE__, cursor, opts)
  end

  def init(cursor), do: {:producer, cursor}

  def handle_info(:stop, cursor) do
    IO.inspect("genstage stop")
    {:stop, :normal, cursor}
  end

  def handle_demand(demand, cursor) when demand > 0 do
    batch = cursor |> Enum.take(demand)

    if Enum.count(batch) < demand do
      # There may well be a better way of determining when the cursor is exhausted, but this is what I have for now
      IO.inspect("shutdown in theory")
      GenStage.async_info(self(), :stop)
    end

    {:noreply, batch, cursor}
  end
end

defmodule Processor do
  use Flow, restart: :transient

  def start_link(opts) do
    Flow.from_specs([{CursorProducer, opts}])
    |> Flow.map(fn doc -> IO.inspect(doc) end)
    |> Flow.start_link(opts)
  end
end
```
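A side note on detecting exhaustion: `Enum.take/2` on a lazy enumerable starts a fresh enumeration on every call (calling `Enum.take(stream, 2)` twice on the same stream returns the same two elements both times). Assuming the Mongo cursor is re-enumerated the same way, which I have not verified, each `handle_demand/2` call would re-run the query from the beginning. The sketch below keeps the suspended continuation returned by `Enumerable.reduce/3` between calls, so each batch resumes where the previous one stopped, and it reports `:done` once the source runs out. `Resumable` is a hypothetical helper, not part of Elixir, GenStage, or the Mongo driver.

```elixir
defmodule Resumable do
  @moduledoc """
  Hypothetical helper (not part of Elixir, GenStage or the Mongo driver):
  pulls items from an enumerable in batches without restarting it, by
  keeping the suspended continuation returned by Enumerable.reduce/3.
  """

  @typedoc "A suspended enumeration, or :done once the source is exhausted."
  @type t :: Enumerable.continuation() | :done

  @doc "Suspends `enum` before its first element; nothing is read yet."
  @spec new(Enumerable.t()) :: t()
  def new(enum) do
    # The reducer suspends after every element, handing that element back as the acc.
    reducer = fn item, _acc -> {:suspend, item} end
    {:suspended, nil, cont} = Enumerable.reduce(enum, {:suspend, nil}, reducer)
    cont
  end

  @doc "Takes up to `n` items. Returns `{items, rest}`; `rest` is `:done` once exhausted."
  @spec take(t(), non_neg_integer()) :: {list(), t()}
  def take(:done, _n), do: {[], :done}
  def take(cont, n), do: do_take(cont, n, [])

  defp do_take(cont, 0, acc), do: {Enum.reverse(acc), cont}

  defp do_take(cont, n, acc) do
    # Resume the enumeration for exactly one more element.
    case cont.({:cont, nil}) do
      {:suspended, item, next} -> do_take(next, n - 1, [item | acc])
      # The source finished on its own: report exhaustion.
      {:done, _} -> {Enum.reverse(acc), :done}
      {:halted, _} -> {Enum.reverse(acc), :done}
    end
  end

  @doc "Stops early so resource-backed enumerables can run their cleanup."
  @spec halt(t()) :: :ok
  def halt(:done), do: :ok

  def halt(cont) do
    cont.({:halt, nil})
    :ok
  end
end
```

Under that assumption, the producer's state would hold the continuation instead of the cursor, and `rest == :done` could take the place of the `Enum.count(batch) < demand` check.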
What I’m expecting is:

- `GenStage.async_info/2` gets called with `:stop` (this appears to be working).
- `CursorProducer.handle_info/2` gets called with `:stop` (this never seems to happen, and I’m not sure why).
- The producer stage shuts itself down with reason `:normal`.
- This propagates into Flow, which shuts itself down too.
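To check the second and third steps in isolation, here is a minimal sketch that uses a plain `GenServer` as a stand-in for the producer. This is an assumption and a simplification: it does not model `GenStage.async_info/2`, which queues the message to be delivered after any currently buffered events rather than sending it straight to the mailbox. It shows that a message sent to `self()` from inside a callback reaches `handle_info/2` once that callback returns, and that returning `{:stop, :normal, state}` ends the process with reason `:normal`. `StopDemo` and `:last_batch` are hypothetical names.

```elixir
defmodule StopDemo do
  # Hypothetical stand-in for the producer's stop path; not GenStage.
  use GenServer

  @impl true
  def init(state), do: {:ok, state}

  # Plays the role of handle_demand/2: schedules :stop, then returns normally.
  @impl true
  def handle_call(:last_batch, _from, state) do
    # The message lands in this process's own mailbox and is only
    # handled after the current callback has returned.
    send(self(), :stop)
    {:reply, :ok, state}
  end

  # Mirrors CursorProducer.handle_info/2: a :normal stop is a clean shutdown.
  @impl true
  def handle_info(:stop, state), do: {:stop, :normal, state}
end

# Start unlinked and monitor the process so its exit reason can be observed.
{:ok, pid} = GenServer.start(StopDemo, nil)
ref = Process.monitor(pid)
:ok = GenServer.call(pid, :last_batch)

reason =
  receive do
    {:DOWN, ^ref, :process, ^pid, reason} -> reason
  after
    1_000 -> :timeout
  end

IO.inspect(reason, label: "exit reason")
```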
As noted above, `GenStage.async_info/2` seems to be called as expected; however, the message is never received and `handle_info/2` is never called. Does anyone know why? I must be missing something. Any help is appreciated.