I apologize if this has been covered elsewhere in the forum or in the GenStage documentation. I have a workaround, but I’m primarily interested in understanding why GenStage behaves the way it does here.
My sample project is at https://github.com/corybuecker/elixir-genstage-test. All of the code is in a single script there.
I have a very simple manual consumer that demands one event every 100 milliseconds. The producer stores the incoming events in a queue. The events stored in the queue are the integers from 1 to 50. The consumer is configured to throw an error when it processes the number “25”.
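For readers who don’t want to open the repository, the setup described above looks roughly like the sketch below. This is not the code from the repository: the module names `QueueProducer` and `ManualConsumer`, the `push/1` helper, and the `gen_stage` version passed to `Mix.install` are assumptions made for illustration. The `throw/1` is a guess based on the “bad return value: 25” message in the logs that follow.

```elixir
# Hypothetical sketch of the setup; names and the dependency version are assumptions.
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule QueueProducer do
  use GenStage
  require Logger

  def start_link(events), do: GenStage.start_link(__MODULE__, events, name: __MODULE__)

  # Appends events to the queue without emitting them (hypothetical helper).
  def push(events), do: GenStage.cast(__MODULE__, {:push, events})

  @impl true
  def init(events), do: {:producer, :queue.from_list(events)}

  @impl true
  def handle_cast({:push, events}, queue) do
    {:noreply, [], :queue.join(queue, :queue.from_list(events))}
  end

  # Emits at most one queued event per demand, or an empty list when the queue is empty.
  @impl true
  def handle_demand(_demand, queue) do
    Logger.info("producer: new demand")

    case :queue.out(queue) do
      {{:value, event}, rest} ->
        {:noreply, [event], rest}

      {:empty, queue} ->
        Logger.info("producer: empty queue")
        {:noreply, [], queue}
    end
  end
end

defmodule ManualConsumer do
  use GenStage
  require Logger

  def start_link(interval), do: GenStage.start_link(__MODULE__, interval)

  @impl true
  def init(interval), do: {:consumer, {interval}, subscribe_to: [QueueProducer]}

  # Switch the subscription to manual mode and schedule the first ask.
  @impl true
  def handle_subscribe(:producer, _opts, from, state) do
    send(self(), {:ask, from})
    {:manual, state}
  end

  # Ask for one event, then schedule the next ask after `interval` milliseconds.
  @impl true
  def handle_info({:ask, from}, {interval} = state) do
    Logger.info("consumer: asking #{inspect(from)} for 1 event")
    GenStage.ask(from, 1)
    Process.send_after(self(), {:ask, from}, interval)
    {:noreply, [], state}
  end

  # Print each event and fail deliberately on 25.
  @impl true
  def handle_events(events, _from, state) do
    Enum.each(events, fn event ->
      IO.inspect(event)
      if event == 25, do: throw(event)
    end)

    {:noreply, [], state}
  end
end
```

To try it, both stages can be started under a `:one_for_one` supervisor with the children `{QueueProducer, Enum.to_list(1..50)}` and `{ManualConsumer, 100}`, so that the consumer is restarted after it fails. Calling `QueueProducer.push(Enum.to_list(1..50))` once the queue has drained stands in for pushing new events from another node.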
The consumer restarts normally after the failing event, and continues to request and process events normally.
09:51:15.734 [info] consumer: asking {#PID<0.153.0>, #Reference<0.3357723273.3345219585.142640>} for 1 event
09:51:15.734 [info] producer: new demand
25
09:51:15.736 [info] consumer: asking {#PID<0.153.0>, #Reference<0.3357723273.3345219585.142763>} for 1 event
09:51:15.742 [error] GenServer #PID<0.154.0> terminating
** (stop) bad return value: 25
Last message: {:"$gen_consumer", {#PID<0.153.0>, #Reference<0.3357723273.3345219585.142640>}, [25]}
State: {100}
09:51:15.837 [info] consumer: asking {#PID<0.153.0>, #Reference<0.3357723273.3345219585.142763>} for 1 event
26
09:51:15.837 [info] producer: new demand
09:51:15.938 [info] consumer: asking {#PID<0.153.0>, #Reference<0.3357723273.3345219585.142763>} for 1 event
09:51:15.938 [info] producer: new demand
27
After the queue is empty, the consumer continues to demand events and the producer replies with an empty list.
09:51:20.786 [info] consumer: asking {#PID<0.153.0>, #Reference<0.3357723273.3345219585.142763>} for 1 event
09:51:20.786 [info] producer: new demand
09:51:20.786 [info] producer: empty queue
09:51:20.887 [info] consumer: asking {#PID<0.153.0>, #Reference<0.3357723273.3345219585.142763>} for 1 event
09:51:20.887 [info] producer: new demand
09:51:20.887 [info] producer: empty queue
The problem occurs after this point if I try to push new events to the producer. For example, assume I name the producer process, connect to the node from a separate process, and push 50 new integers to the producer’s queue. The consumer processes events normally and throws an error when it processes the number “25”. However, after it restarts and demands new events, the producer no longer logs “new demand” and the consumer never receives another event.
10:03:19.110 [info] consumer: asking {#PID<0.153.0>, #Reference<0.3357723273.3345219585.142763>} for 1 event
10:03:19.110 [info] producer: new demand
25
10:03:19.110 [error] GenServer #PID<0.155.0> terminating
** (stop) bad return value: 25
Last message: {:"$gen_consumer", {#PID<0.153.0>, #Reference<0.3357723273.3345219585.142763>}, [25]}
State: {100}
10:03:19.110 [info] consumer: asking {#PID<0.153.0>, #Reference<0.3357723273.3345219585.148409>} for 1 event
10:03:19.212 [info] consumer: asking {#PID<0.153.0>, #Reference<0.3357723273.3345219585.148409>} for 1 event
10:03:19.313 [info] consumer: asking {#PID<0.153.0>, #Reference<0.3357723273.3345219585.148409>} for 1 event
10:03:19.414 [info] consumer: asking {#PID<0.153.0>, #Reference<0.3357723273.3345219585.148409>} for 1 event
10:03:19.515 [info] consumer: asking {#PID<0.153.0>, #Reference<0.3357723273.3345219585.148409>} for 1 event
10:03:19.616 [info] consumer: asking {#PID<0.153.0>, #Reference<0.3357723273.3345219585.148409>} for 1 event
...
I have discovered that if the producer sends anything other than an empty list to the consumer when its queue is empty, such as a list containing a false value, [false], then this problem does not occur after the consumer restarts. Additionally, if no error is thrown when processing the number “25” and the consumer never restarts, the problem does not occur.
Again, I apologize if this has been covered elsewhere. I’m pretty new to Elixir, so I’m sure I’m just missing something obvious.