Hi,
I’ve been struggling for a few days with a malfunctioning piece of code that uses GenStage.
I think I don’t fully grasp its demand-handling mechanism, so I’d like some insights about how I should write my code. Even though I know I could use Broadway to do what I want, I thought it would be better to get a solid understanding of the underlying layer first.
When I want to produce events, my code uses handle_cast/2 and returns {:noreply, events, state}. The events are dispatched, and they are also put in an internal buffer in the producer. But sometimes, typically when I reach max_demand events, handle_demand/2 gets called as well, and it returns {:noreply, [List.first(state.buffer)], state}. So the same message is sent twice, which brings my pipeline down.
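The double dispatch described above can be reproduced without GenStage. The sketch below is a simplified model, not the real callbacks: `DuplicateSketch` and its functions are hypothetical names, and each function returns `{events, buffer}` in place of a GenStage reply tuple.

```elixir
defmodule DuplicateSketch do
  # Hypothetical stand-in for the producer callbacks: each function returns
  # {events_to_dispatch, new_buffer} instead of a GenStage reply tuple.

  # Mirrors handle_cast/2: the message is emitted AND kept in the buffer.
  def publish(buffer, message), do: {[message], buffer ++ [message]}

  # Mirrors handle_demand/2: the head of the buffer is emitted but not removed.
  def demand([]), do: {[], []}
  def demand([head | _] = buffer), do: {[head], buffer}
end

{first, buffer} = DuplicateSketch.publish([], {"Message", 0})
{second, _buffer} = DuplicateSketch.demand(buffer)

# A consumer would see both dispatches, so the same message shows up twice.
IO.inspect(first ++ second, label: "dispatched")
# dispatched: [{"Message", 0}, {"Message", 0}]
```

In this model, any demand that arrives while a message is still waiting for its ack re-emits that message, which matches the duplicate described above.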
Here is the code:
defmodule Sandbox.Producer do
  use GenStage
  require Logger
  alias __MODULE__, as: State

  defstruct buffer: [],
            counter: 0

  def start_link(_) do
    GenStage.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_) do
    {:producer, %State{}}
  end

  def handle_cast({:publish, payload}, %State{} = state) do
    message = {payload, state.counter}
    {:noreply, [message], %State{buffer: List.insert_at(state.buffer, -1, message)}}
  end

  def handle_demand(_, %State{buffer: []} = state), do: {:noreply, [], state}

  def handle_demand(_, state) do
    {:noreply, [List.first(state.buffer)], state}
  end

  def handle_info({:ack, id}, state) do
    message = {_, expected_id} = List.first(state.buffer)

    # If the acked id we just got is the one we expected,
    # then we publish the next message from the buffer.
    # Otherwise we send the last message again (???)
    if id == expected_id do
      new_buffer = Enum.drop(state.buffer, 1)
      next_event = if Enum.empty?(new_buffer), do: [], else: [List.first(new_buffer)]
      {:noreply, next_event, %State{state | buffer: new_buffer, counter: state.counter + 1}}
    else
      {:noreply, [message], state}
    end
  end
end
defmodule Sandbox.Consumer do
  use GenStage
  require Logger
  alias __MODULE__, as: State

  defstruct last_id: nil

  def start_link(_) do
    GenStage.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_) do
    {:consumer, %State{}, subscribe_to: [{Sandbox.Producer, []}]}
  end

  def handle_events([{_, id}], _from, %State{last_id: nil} = state) do
    Logger.info("Processing message ##{id}")
    # Since last_id can be nil either at startup or after a crash,
    # we accept all values and proceed as if it was normal
    ack_message(id)
    {:noreply, [], %State{state | last_id: id}}
  end

  def handle_events([{_, id}], _from, state) do
    Logger.info("Processing message ##{id}")

    if id == state.last_id + 1 do
      # Ok, we have the events in strict order
      ack_message(id)
      {:noreply, [], %State{state | last_id: id}}
    else
      # We miss a message
      {:stop, :missing_message, state}
    end
  end

  defp ack_message(id) do
    send(Sandbox.Producer, {:ack, id})
  end
end
I’m sure emitting events from handle_cast/2 is the wrong way to do it, but if I return {:noreply, [], state} from handle_cast/2 in the Producer, the demand loop stops, so nothing is asked for anymore.
I’m quite confused.
Can someone enlighten me on this?
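One common way to avoid both the duplicate and the stalled loop is to remember unmet demand in the producer state and to dispatch only as many queued events as that demand allows. The stall itself is consistent with how demand flows: handle_demand/2 runs when a consumer asks for events, and a consumer that has received nothing has no reason to ask again. The sketch below models only the accounting, in pure Elixir and without GenStage. `DemandSketch`, its function names, and the `{queue, pending_demand}` state shape are assumptions made for illustration. The GenStage documentation has a QueueBroadcaster example built along similar lines.

```elixir
defmodule DemandSketch do
  # Hypothetical pure model: state is {queue, pending_demand}.
  # Each function returns {events_to_dispatch, new_state}.

  def new, do: {:queue.new(), 0}

  # What handle_cast/2 would do: enqueue, then dispatch what demand allows.
  def publish({queue, pending}, message) do
    dispatch(:queue.in(message, queue), pending, [])
  end

  # What handle_demand/2 would do: record the new demand, then dispatch.
  def demand({queue, pending}, incoming) when incoming > 0 do
    dispatch(queue, pending + incoming, [])
  end

  # No demand left: keep whatever is still queued for later.
  defp dispatch(queue, 0, acc), do: {Enum.reverse(acc), {queue, 0}}

  # Demand left: pop one event, or stop and remember the unmet demand.
  defp dispatch(queue, pending, acc) do
    case :queue.out(queue) do
      {{:value, event}, rest} -> dispatch(rest, pending - 1, [event | acc])
      {:empty, queue} -> {Enum.reverse(acc), {queue, pending}}
    end
  end
end

state = DemandSketch.new()
{[], state} = DemandSketch.demand(state, 2)     # nothing queued, demand is remembered
{[:a], state} = DemandSketch.publish(state, :a) # dispatched against stored demand
{[:b], state} = DemandSketch.publish(state, :b) # stored demand is now used up
{[], state} = DemandSketch.publish(state, :c)   # no demand left, :c stays queued
{[:c], _state} = DemandSketch.demand(state, 1)  # new demand drains the queue
```

Each event leaves the queue exactly once, so it cannot be emitted again by a later demand. The ack-based redelivery in the original producer would need its own tracking on top of this.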
Edit: maybe I should show an IEx session to illustrate what’s going on:
iex(1)> Sandbox.publish("Message")
:ok
iex(2)>
16:06:27.577 [info] Processing message #0
iex(2)> Sandbox.publish("Message")
:ok
16:06:29.594 [info] Processing message #1
iex(3)> Sandbox.publish("Message")
:ok
iex(4)>
16:06:30.953 [info] Processing message #1
16:06:30.965 [error] GenServer Sandbox.Consumer terminating
** (stop) :missing_message
Last message: {:"$gen_consumer", {#PID<0.180.0>, #Reference<0.1998684319.2482241540.37822>}, [{"Message", 1}]}
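Tracing this session by hand suggests a second possible cause for the repeated #1, separate from demand handling. In handle_cast/2 the new state is built with `%State{buffer: ...}`, a struct literal, so `counter` falls back to its default of 0 on every publish. The update syntax `%State{state | ...}` would keep it. The sketch below shows the difference on a hypothetical `CounterSketch` struct with the same two fields; it is an illustration, not the original module.

```elixir
defmodule CounterSketch do
  # Hypothetical struct with the same fields as the producer state.
  defstruct buffer: [], counter: 0

  # Struct literal: fields that are not listed fall back to their defaults.
  def rebuild(%__MODULE__{} = state, message) do
    %__MODULE__{buffer: state.buffer ++ [message]}
  end

  # Update syntax: fields that are not listed keep their current values.
  def update(%__MODULE__{} = state, message) do
    %__MODULE__{state | buffer: state.buffer ++ [message]}
  end
end

state = struct!(CounterSketch, counter: 1)
IO.inspect(CounterSketch.rebuild(state, :msg).counter, label: "literal")
# literal: 0
IO.inspect(CounterSketch.update(state, :msg).counter, label: "update")
# update: 1
```

With the literal form, the second publish would reset the counter to 0, the ack would bump it back to 1, and the third message would again be numbered 1, which is what the log shows.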