Guarantees provided by GenStage?

In general, the docs for GenStage seem to have very minimal documentation around how to deal with errors, what to expect in the presence of errors, at most once vs at least once delivery of events, etc. I’m having a hard time understanding the guarantees that GenStage provides in the presence of errors during stages.

For example, a textbook implementation of GenStage.BroadcastDispatcher (seen here: https://gist.github.com/stavro/15308d54b41c6feab371ee09de198b55) seems to have pretty lackluster behavior in the face of errors during a consumer stage. In a simple test, if I enqueue 10 events, and a single consumer has an error on the first one, the next 9 appear to never even be attempted.

Since the consumer is supervised independently, and only received a single event to process (as seen in the debug logs), why are the other nine events not handled by the consumer after its supervisor restarts it?

1 Like

That’s because it does not give any guarantees. :slight_smile: If there is an error, you lose everything that is in the message queue and you didn’t persist elsewhere yourself, as in any other Elixir process or abstraction, such as a GenServer.

1 Like

Got it - I misunderstood and thought that the message queue was kept outside of the consumer process.

Hmm. Ok. Thanks!

Are there any recommendations for an at-least once delivery of messages?

Eg if you had a pipeline with:

RabbitMQ -> GenStage -> GenStage -> GenStage -> Database

And you want to ensure that every message from the RabbitMQ is processed by the GenStage pipeline and written to the database.

Seems like you would need to keep track of the original message ID through the pipeline and only send the ACK to RabbitMQ it after writing to the database.

1 Like

The solution I’m using is to have the delivery_tag as part of the event and acknowledging to RabbitMQ only after the last stage: https://github.com/pma/wabbit/blob/master/examples/direct_exchange.ex#L98

4 Likes

Did you come up with a solution for this? I am in the same situation, although with Kafka

1 Like

Haven’t had to connect a multi-step GenStage pipeline to Kafka or Rabbit yet. But I would approach it the same way, updating the consumer offsets in the last stage of the pipeline.
Which Kafka client are use using?

2 Likes

@mbuhot Sorry for the late reply. I am using the Brod erlang library

1 Like