Confusion about how GenStage demand mechanism works

Hi,

I’ve been struggling for a few days with a dysfunctional piece of code using GenStage.
I don’t think I fully grasp its demand mechanism, which is why I’d like some insight into how I should write my code. Even though I know I could use Broadway to do what I want, I thought it would be better to get a solid understanding of the underlying layer first.

When I want to produce events, my code uses handle_cast/2, returning {:noreply, events, state}. The events are dispatched, and also put in an internal buffer in the producer. But sometimes, typically when I reach max_demand events, handle_demand/2 is also called, which returns {:noreply, [List.first(state.buffer)], state}. So the same message is sent twice, which brings my pipeline down.

Here is the code:

defmodule Sandbox.Producer do
  use GenStage

  require Logger

  alias __MODULE__, as: State

  defstruct buffer: [],
            counter: 0

  def start_link(_) do
    GenStage.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_) do
    {:producer, %State{}}
  end

  def handle_cast({:publish, payload}, %State{} = state) do
    message = {payload, state.counter}
    {:noreply, [message], %State{buffer: List.insert_at(state.buffer, -1, message)}}
  end

  def handle_demand(_, %State{buffer: []} = state), do: {:noreply, [], state}

  def handle_demand(_, state) do
    {:noreply, [List.first(state.buffer)], state}
  end

  def handle_info({:ack, id}, state) do
    message = {_, expected_id} = List.first(state.buffer)

    # If the acked id we just got is the one we expected,
    # then we publish the next message from the buffer.
    # Otherwise we send the last message again (???)
    if id == expected_id do
      new_buffer = Enum.drop(state.buffer, 1)
      next_event = if Enum.empty?(new_buffer), do: [], else: [List.first(new_buffer)]
      {:noreply, next_event, %State{state | buffer: new_buffer, counter: state.counter + 1}}
    else
      {:noreply, [message], state}
    end
  end
end
defmodule Sandbox.Consumer do
  use GenStage

  require Logger

  alias __MODULE__, as: State

  defstruct last_id: nil

  def start_link(_) do
    GenStage.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_) do
    {:consumer, %State{}, subscribe_to: [{Sandbox.Producer, []}]}
  end

  def handle_events([{_, id}], _from, %State{last_id: nil} = state) do
    Logger.info("Processing message ##{id}")
    # Since last_id can be nil either at startup or after a crash,
    # we accept all values and proceed as if it was normal
    ack_message(id)
    {:noreply, [], %State{state | last_id: id}}
  end

  def handle_events([{_, id}], _from, state) do
    Logger.info("Processing message ##{id}")
    if id == state.last_id + 1 do
      # Ok, we have the events in strict order
      ack_message(id)
      {:noreply, [], %State{state | last_id: id}}
    else
      # We miss a message
      {:stop, :missing_message, state}
    end
  end

  defp ack_message(id) do
    send(Sandbox.Producer, {:ack, id})
  end
end

I’m sure it’s the wrong way to do it, but if I return {:noreply, [], state} from handle_cast/2 in the Producer, the demand loop stops, so nothing is asked for anymore.
I’m quite confused.

Can someone enlighten me on this?

Edit: maybe I should show an IEx session to illustrate what’s going on:

iex(1)> Sandbox.publish("Message")
:ok
iex(2)> 
16:06:27.577 [info]  Processing message #0
iex(2)> Sandbox.publish("Message")
:ok

16:06:29.594 [info]  Processing message #1
iex(3)> Sandbox.publish("Message")
:ok
iex(4)> 
16:06:30.953 [info]  Processing message #1
 
16:06:30.965 [error] GenServer Sandbox.Consumer terminating
** (stop) :missing_message
Last message: {:"$gen_consumer", {#PID<0.180.0>, #Reference<0.1998684319.2482241540.37822>}, [{"Message", 1}]}

Welcome to the forum!

Just from a quick scan of your code I don’t see the Producer managing demand.

I recommend reviewing this topic:


(Thanks for the welcome!)

I don’t use the demand value, as it is always 1 in this case, in order to keep things as simple as possible.

Edit: I’m reading your recommendations. Thanks.
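
For reference, a demand of 1 per ask is itself something the consumer has to opt into; with GenStage’s defaults a consumer asks for up to 1000 events at a time (max_demand: 1000, min_demand: 500). A hypothetical variant of the consumer’s init/1 with explicit bounds would look like this (this is a sketch, not your actual code):

```elixir
def init(_) do
  # Cap demand at one event per ask; min_demand: 0 means re-ask
  # as soon as the single outstanding event has been processed.
  {:consumer, %State{},
   subscribe_to: [{Sandbox.Producer, max_demand: 1, min_demand: 0}]}
end
```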

The consumer doesn’t reissue demand.

So if the consumer demand cannot be satisfied at the time of demand the Producer has to “store that demand” and release the demanded events once they become available.

So in your producer’s handle_cast/2 you should only release events if you have “stored demand” and deduct the released events from the stored demand.

Right now you are releasing the event immediately in handle_cast/2, but you also store it to be released again when handle_demand/2 is invoked.
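
To make that concrete, here is a hedged sketch of the “only release against stored demand” rule, written as plain functions so it can be exercised outside GenStage (the :demand field and the ProducerSketch name are mine, not from your code):

```elixir
defmodule ProducerSketch do
  # Hypothetical state: same shape as yours, plus a :demand
  # counter that accumulates demand the producer could not satisfy.
  defstruct buffer: [], counter: 0, demand: 0
  alias __MODULE__, as: State

  # Stored demand available: release the event and deduct one.
  def handle_cast({:publish, payload}, %State{demand: demand} = state)
      when demand > 0 do
    {:noreply, [{payload, state.counter}],
     %State{state | demand: demand - 1, counter: state.counter + 1}}
  end

  # No stored demand: buffer the event instead of releasing it.
  def handle_cast({:publish, payload}, %State{} = state) do
    {:noreply, [],
     %State{state | buffer: state.buffer ++ [{payload, state.counter}],
            counter: state.counter + 1}}
  end
end
```

The key point is that an event is emitted exactly once: either immediately (consuming one unit of stored demand) or later from the buffer, never both.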


Also you may like

http://erlang.org/doc/man/queue.html

Example: https://stackoverflow.com/questions/35102899/simple-efficient-fifo-queue-with-elixir#answer-35116873

instead of using the List module.
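
For illustration, a minimal :queue round trip (it is part of the Erlang standard library, so no extra dependencies, and all FIFO operations are amortized O(1)):

```elixir
q = :queue.new()
q = :queue.in("first", q)            # enqueue at the rear
q = :queue.in("second", q)
{{:value, item}, q} = :queue.out(q)  # dequeue from the front
# item is "first"; q now contains only "second"
```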


I use the :queue module in another project, but this is just a sandbox project reduced to a minimum. Thanks for pointing it out.

Given your advice, here’s a quick (and dirty) attempt at a solution:

defmodule Sandbox.Producer do
  use GenStage

  require Logger

  alias __MODULE__, as: State

  defstruct buffer: [],
            counter: 0,
            to_ack: 0

  def start_link(_) do
    GenStage.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_) do
    {:producer, %State{}}
  end

  def handle_cast({:publish, payload}, %State{} = state) do
    message = {payload, state.counter}
    {next_event, buffer} =
      if Enum.empty?(state.buffer) do
        {[message], []}
      else
        {[], List.insert_at(state.buffer, -1, message)}
      end

    {:noreply, next_event, %State{state | buffer: buffer, counter: state.counter + 1}}
  end

  def handle_demand(_, %State{buffer: []} = state), do: {:noreply, [], state}

  def handle_demand(_, state) do
    {:noreply, [List.first(state.buffer)], state}
  end

  def handle_info({:ack, id}, state) do
    # If the acked id we just got is the one we expected,
    # then we publish the next message from the buffer.
    # Otherwise we send the last message again (???)
    if id == state.to_ack do
      new_buffer = Enum.drop(state.buffer, 1)
      {:noreply, [], %State{state | buffer: new_buffer, to_ack: state.to_ack + 1}}
    else
      {:noreply, [], state}
    end
  end
end

Seems to work pretty well so far.
Am I on the right track?

My GenStage is a bit rusty, but this:

def handle_demand(_, %State{buffer: []} = state), do: {:noreply, [], state}

is going to be a problem. What’s going to happen is that if a consumer asks for more events when the producer (temporarily) has an empty buffer, that consumer will never work again. As @peerreynders said, consumers don’t reissue demand: if they ask the producer for more work, they will never ask again until they get more work.

To fix this, you need to buffer demand in your producer (see https://hexdocs.pm/gen_stage/GenStage.html#module-buffering-demand). Basically, if the producer is asked for events but has none to send, it must remember this unfulfilled demand. Then, when the producer does get more events, it should send as many as necessary to satisfy the unfulfilled demand, and add any extra to the internal buffer.

Here’s a small example to give you the idea (I know you mentioned the consumer only asks for 1 event at a time in your case, but this will make it clearer, I hope):

Producer has 5 events in buffer
Consumer asks for 3 events
Producer emits 3 events

Producer has 2 events in buffer
Consumer asks for 3 events
Producer emits 2 events, increments buffered demand by 1

Producer has 0 events in buffer, buffered demand is 1
Consumer asks for 3 events
Producer increments buffered demand by 3

Producer has 0 events in buffer, buffered demand is 4

<Producer gets/creates 5 new events>

Producer emits 4 events (i.e. the buffered demand)
Producer has 1 event in buffer, buffered demand is back down to 0
Consumer asks for 3 events
Producer emits 1 event, increments buffered demand by 2

Producer has 0 events in buffer, buffered demand is 2

In summary, since consumers won’t ask for work more than once if they don’t receive any, the producer must remember how many events are “owed” downstream and emit them when possible. Hope this helps!
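
The bookkeeping in the walkthrough above boils down to one small pure function; a sketch (the module and function names are mine):

```elixir
defmodule DemandSketch do
  # Fill as much of the outstanding demand as the buffer allows.
  # Returns {events_to_emit, remaining_buffer, remaining_demand}.
  def emit(buffer, demand) do
    {events, rest} = Enum.split(buffer, demand)
    {events, rest, demand - length(events)}
  end
end
```

For example, emit([1, 2, 3, 4, 5], 3) emits three events and leaves no buffered demand, while emit([1, 2], 3) emits two and carries a demand of 1 forward, matching the trace above.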


Just from the hip here - but it should get my meaning across:

def split(demand, buffer) do
  length = length(buffer)

  cond do
    demand < 1 ->
      {[], buffer, 0}

    demand < length ->
      {ready, rest} = Enum.split(buffer, demand)
      {ready, rest, 0}

    true ->
      # demand >= length
      {buffer, [], demand - length}
  end
end

# NB: assumes the State struct also defines a :demand field (default 0)
def handle_cast({:publish, payload}, %State{} = state) do
  new_buffer = List.insert_at(state.buffer, -1, {payload, state.counter})
  {events, pending, demand} = split(state.demand, new_buffer)
  {:noreply, events, %State{state | buffer: pending, demand: demand, counter: state.counter + 1}}
end

def handle_demand(demand, %State{buffer: []} = state) do
  {:noreply, [], Map.update(state, :demand, demand, &(&1 + demand))}
end

def handle_demand(demand, state) do
  {events, pending, demand} = split(demand + state.demand, state.buffer)
  {:noreply, events, %{state | buffer: pending, demand: demand}}
end

I think I’m starting to understand the whole mechanism, thanks to both of you!

I wrote about this recently: https://blog.jola.dev/push-based-genstage.

It seems like it’s been mostly explained though!
