GenStage: manual consumer does not receive events after restart

I apologize if this has been covered elsewhere in the forum or in the GenStage documentation. I have a workaround, but I’m primarily interested in understanding why GenStage behaves this way.

This is my sample project, https://github.com/corybuecker/elixir-genstage-test. All of the code is in this script.

I have a very simple manual consumer that demands one event every 100 milliseconds. The producer stores the incoming events in a queue. The events stored in the queue are integers from a range of one to 50. The consumer is configured to throw an error if it processes the number “25”.

The consumer restarts normally after the failing event, and continues to request and process events normally.

09:51:15.734 [info]  consumer: asking {#PID<0.153.0>, #Reference<0.3357723273.3345219585.142640>} for 1 event
09:51:15.734 [info]  producer: new demand
25
09:51:15.736 [info]  consumer: asking {#PID<0.153.0>, #Reference<0.3357723273.3345219585.142763>} for 1 event
09:51:15.742 [error] GenServer #PID<0.154.0> terminating
** (stop) bad return value: 25
Last message: {:"$gen_consumer", {#PID<0.153.0>, #Reference<0.3357723273.3345219585.142640>}, [25]}
State: {100}
09:51:15.837 [info]  consumer: asking {#PID<0.153.0>, #Reference<0.3357723273.3345219585.142763>} for 1 event
26
09:51:15.837 [info]  producer: new demand
09:51:15.938 [info]  consumer: asking {#PID<0.153.0>, #Reference<0.3357723273.3345219585.142763>} for 1 event
09:51:15.938 [info]  producer: new demand
27

After the queue is empty, the consumer continues to demand events and the producer replies with an empty list.

09:51:20.786 [info]  consumer: asking {#PID<0.153.0>, #Reference<0.3357723273.3345219585.142763>} for 1 event
09:51:20.786 [info]  producer: new demand
09:51:20.786 [info]  producer: empty queue
09:51:20.887 [info]  consumer: asking {#PID<0.153.0>, #Reference<0.3357723273.3345219585.142763>} for 1 event
09:51:20.887 [info]  producer: new demand
09:51:20.887 [info]  producer: empty queue

The problem occurs after this point if I try to push new events to the producer. For example, assume I name the process, connect to the node from a separate process and push 50 new integers to the producer’s queue. The consumer will process events normally and throw an error when it processes the number “25”. However, after it restarts and demands new events, it doesn’t receive an event from the producer.

10:03:19.110 [info]  consumer: asking {#PID<0.153.0>, #Reference<0.3357723273.3345219585.142763>} for 1 event
10:03:19.110 [info]  producer: new demand
25
10:03:19.110 [error] GenServer #PID<0.155.0> terminating
** (stop) bad return value: 25
Last message: {:"$gen_consumer", {#PID<0.153.0>, #Reference<0.3357723273.3345219585.142763>}, [25]}
State: {100}
10:03:19.110 [info]  consumer: asking {#PID<0.153.0>, #Reference<0.3357723273.3345219585.148409>} for 1 event
10:03:19.212 [info]  consumer: asking {#PID<0.153.0>, #Reference<0.3357723273.3345219585.148409>} for 1 event
10:03:19.313 [info]  consumer: asking {#PID<0.153.0>, #Reference<0.3357723273.3345219585.148409>} for 1 event
10:03:19.414 [info]  consumer: asking {#PID<0.153.0>, #Reference<0.3357723273.3345219585.148409>} for 1 event
10:03:19.515 [info]  consumer: asking {#PID<0.153.0>, #Reference<0.3357723273.3345219585.148409>} for 1 event
10:03:19.616 [info]  consumer: asking {#PID<0.153.0>, #Reference<0.3357723273.3345219585.148409>} for 1 event
...

I have discovered that if the producer sends anything other than an empty list to the consumer when its queue is empty, such as a list containing a false value, [false], then this problem does not occur after the consumer restarts. Additionally, if no error is thrown on processing the number “25” and the consumer never restarts, the problem does not occur.

Again, I apologize if this has been covered elsewhere. I’m pretty new to Elixir, so I’m sure I’m just missing something obvious.

When you receive demand from a consumer and you cannot fulfill that demand, it is your responsibility to store that demand in the producer and emit events as soon as new ones arrive.

So what is happening is that the consumer is asking for items and you are losing this demand, which puts all of GenStage’s buffers off balance.
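
To make the idea of "storing demand" concrete, here is a minimal sketch of the bookkeeping as a pure model, independent of GenStage. The module name `PendingDemand` and the functions `ask/2` and `push/2` are hypothetical names chosen for illustration, not GenStage API; the state is assumed to be a `{queue, pending_demand}` tuple.

```elixir
defmodule PendingDemand do
  # Hypothetical pure model of a producer state: {queue, pending_demand}.
  # Every function returns {events_to_emit, new_state}.

  # A consumer asks for `n` events: add to the pending demand, then flush.
  def ask({queue, pending}, n) when n > 0, do: flush({queue, pending + n}, [])

  # A new event arrives from outside: enqueue it, then flush.
  def push({queue, pending}, event), do: flush({:queue.in(event, queue), pending}, [])

  # Nothing is owed to the consumer, so stop emitting.
  defp flush({queue, 0}, acc), do: {Enum.reverse(acc), {queue, 0}}

  # Demand is outstanding: emit queued events until either side runs out.
  defp flush({queue, pending}, acc) do
    case :queue.out(queue) do
      {{:value, event}, rest} -> flush({rest, pending - 1}, [event | acc])
      {:empty, rest} -> {Enum.reverse(acc), {rest, pending}}
    end
  end
end

state = {:queue.new(), 0}

# The queue is empty, so nothing is emitted, but the demand is remembered.
{events, state} = PendingDemand.ask(state, 1)
IO.inspect(events) # prints []

# The event that arrives later goes straight out against the stored demand.
{events, _state} = PendingDemand.push(state, 25)
IO.inspect(events) # prints [25]
```

In this model the demand from the first call is never lost: it stays in the state until an event shows up to satisfy it.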

Btw, feel free to ask and welcome!

Without looking too deeply into the specifics of the problem that you are describing, I immediately noticed this in your code:

  def handle_call(event, _from, {queue}) do
    {:reply, :ok, [], {:queue.in(event, queue)}}
  end

  def handle_demand(_new_demand, {queue}) do
    Logger.info("producer: new demand")
    case :queue.out(queue) do
      {{:value, event}, queue} ->
        {:noreply, [event], {queue}}
      {:empty, queue} ->
        Logger.info("producer: empty queue")
        {:noreply, [], {queue}}
    end
  end

Your handle_demand ignores the demand quantity - effectively breaking the contract that allows the back-pressure mechanism to work. When you receive demand:

  • You are obliged to release as many events as you can while not exceeding the demand.
  • If you cannot satisfy the entire demand you are obliged to store the unmet demand - something you are clearly not doing. The consumer will not issue a new demand until at least some of the previous demand (:min_demand) has been received. So when you return that empty event list because the producer queue is empty, your producer state should also reflect that you have outstanding demand.
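
As a rough sketch of a handle_demand that honors both obligations, assuming the producer state becomes a `{queue, pending_demand}` tuple: the module below is a plain module (the name `ProducerCallbacks` is hypothetical) so that it runs on its own. In a real producer the function would live in the module that calls `use GenStage`, with an init/1 that starts the pending demand at 0.

```elixir
defmodule ProducerCallbacks do
  # Plain module so the snippet runs without dependencies; in a real
  # producer this callback sits next to `use GenStage` and an init/1
  # returning {:producer, {:queue.new(), 0}}.

  def handle_demand(incoming, {queue, pending}) when incoming > 0 do
    # Total owed to the consumer: the new demand plus anything still unmet.
    total = incoming + pending

    # Never emit more than is owed, and never more than is queued.
    count = min(total, :queue.len(queue))
    {ready, rest} = :queue.split(count, queue)

    # Emit `count` events and remember the part of the demand still unmet.
    {:noreply, :queue.to_list(ready), {rest, total - count}}
  end
end
```

With three queued events and a demand of five, this returns the three events and keeps a pending demand of two in the state, instead of dropping it.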

That way, when you finally receive events in handle_call, you can immediately release them to the consumer against the outstanding demand, while only storing events that exceed the stored demand. Essentially:

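  # No outstanding demand: queue the event for a later handle_demand.
  def handle_call(event, _from, {queue, 0}),
    do: {:reply, :ok, [], {:queue.in(event, queue), 0}}

  # Outstanding demand: emit the event immediately and decrement the count.
  def handle_call(event, _from, {queue, demand}),
    do: {:reply, :ok, [event], {queue, demand - 1}}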

That makes perfect sense, thank you!