Flow shutdown once GenStage complete

I’m writing a Flow that receives processes each item from a database cursor (Mongo in my case). Once that cursor is exhausted I’d like the flow to shut down automatically.

Here’s the code I have so far:

defmodule CursorProducer do
  use GenStage

  def start_link(opts) do
    {topology, opts} = opts |> Keyword.pop!(:topology)
    {collection, opts} = opts |> Keyword.pop!(:collection)
    {filter, opts} = opts |> Keyword.pop(:filter, %{})

    cursor = Mongo.find(topology, collection, filter)

    GenStage.start_link(__MODULE__, cursor, opts)
  end

  def init(cursor), do: {:producer, cursor}

  def handle_info(:stop, cursor) do
    IO.inspect("genstage stop")
    {:stop, :normal, cursor}
  end

  def handle_demand(demand, cursor) when demand > 0 do
    batch = cursor |> Enum.take(demand)

    if Enum.count(batch) < demand do
      # There may well be a better way of determining when the cursor is exhausted but this is what I have for now
      IO.inspect("shutdown in theory")
      GenStage.async_info(self(), :stop)
    end

    {:noreply, batch, cursor}
  end
end

defmodule Processor do
  use Flow, restart: :transient

  def start_link(opts) do
    Flow.from_specs([{CursorProducer, opts}])
    |> Flow.map(fn doc -> IO.inspect(doc) end)
    |> Flow.start_link(opts)
  end
end

What I’m expecting is:

  1. GenStage.async_info/2 gets called with :stop (this appears to be working)
  2. CursorProducer.handle_info gets called with :stop (this appears to never happen, I’m not sure why)
  3. The producer stage shuts itself down with reason :normal
  4. This propagates into Flow which shuts itself down too

As I suggest above it seems like GenStage.async_info is called as expected, however a message is never received and handle_info/2 is never called. Does anyone know why? I must be missing something. Any help appreciated.