Calling a GenStage and awaiting the response

This may be a simple question, but how can I make a call to a GenStage pipeline (with a producer and a consumer) and get back the actual result of the computation? If I call the producer, it doesn’t do the actual work; it just buffers the event, so it cannot return what I need.
If I call the consumer, where the actual work happens, I’m not sure whether it would be a bad idea to have handle_call invoke the same functions that handle_events invokes. I am also not sure whether calling the consumer directly interferes with the min/max demand limits.

I think a better option is to use the from somehow, but I don’t think I understand how this works. How can my code receive a response from the GenStage consumer when it is done processing my message so I can know the response?
Sorry if I am not using the correct words.

As you suspect, you can capture the from in the producer, dispatch it alongside the event, and, once the consumer is done processing the event, use GenStage.reply/2 to send the reply.

For example:

# .iex.exs
Mix.install([:gen_stage])

defmodule MyProducer do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    {:producer, :ok, []}
  end

  def handle_call({:notify, event}, from, state) do
    {:noreply, [{event, from}], state}
  end

  def handle_demand(_demand, state) do
    {:noreply, [], state}
  end
end

defmodule MyConsumer do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    {:consumer, :ok, subscribe_to: [MyProducer]}
  end

  def handle_events(events, _from, state) do
    Enum.each(events, fn {event, from} ->
      IO.inspect(event, label: "handled event")

      GenStage.reply(from, :event_handled)
    end)
    {:noreply, [], state}
  end
end

Notice how the producer’s handle_call/3 callback dispatches the event together with the from, so the consumer has access to it. The callback also returns {:noreply, events, state} instead of {:reply, reply, events, state}, because we don’t want to reply as soon as handle_call/3 returns. The caller stays blocked until the consumer calls GenStage.reply/2 (or until the call times out).

If we run this in IEx, we’ll see:

$ iex
iex> MyProducer.start_link()
iex> MyConsumer.start_link()
iex> GenStage.call(MyProducer, {:notify, :my_event})
handled event: :my_event # this part is from the IO.inspect/2
:event_handled
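
To get the actual result of the computation back rather than a fixed atom, the same pattern can reply with whatever the consumer computed. Here is a minimal sketch of that idea, assuming it is saved as a script and run with elixir (Mix.install needs Elixir 1.12 or later). The module names SquareProducer and SquareConsumer, the compute/2 client function, and the squaring "work" are all hypothetical stand-ins, not part of the original example.

```elixir
# Sketch only: module names and the squaring "work" are hypothetical.
Mix.install([:gen_stage])

defmodule SquareProducer do
  use GenStage

  def start_link(_opts \\ []) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  # Client API: blocks until a consumer replies or the timeout expires.
  def compute(number, timeout \\ 5_000) do
    GenStage.call(__MODULE__, {:compute, number}, timeout)
  end

  @impl true
  def init(:ok), do: {:producer, :ok}

  @impl true
  def handle_call({:compute, number}, from, state) do
    # Dispatch the event together with `from`; the reply is deferred.
    {:noreply, [{number, from}], state}
  end

  @impl true
  def handle_demand(_demand, state), do: {:noreply, [], state}
end

defmodule SquareConsumer do
  use GenStage

  def start_link(_opts \\ []) do
    GenStage.start_link(__MODULE__, :ok)
  end

  @impl true
  def init(:ok), do: {:consumer, :ok, subscribe_to: [SquareProducer]}

  @impl true
  def handle_events(events, _from, state) do
    Enum.each(events, fn {number, from} ->
      # Reply to the original caller with the computed result.
      GenStage.reply(from, {:ok, number * number})
    end)

    {:noreply, [], state}
  end
end

{:ok, _producer} = SquareProducer.start_link()
{:ok, _consumer} = SquareConsumer.start_link()

# The caller receives the value computed in the consumer.
{:ok, 49} = SquareProducer.compute(7)
```

Wrapping GenStage.call/3 in a client function keeps the message shape private to the producer and makes the timeout explicit. If the consumer crashes before replying, the caller exits once the timeout expires, so callers may want to handle that case.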

Wow, thank you this is very helpful! The trick for me was the :noreply in the call. I missed that – I kept thinking that it should instead use :reply
