GenStage: Bounded buffering and serialization

The general problem I face is that I have N producers emitting events which must be processed in order with respect to each producer, but which can be interleaved with events from the other producers in the final consumer step. In addition, the events emitted from each producer must be buffered up to a configurable, per-producer limit. This upper limit must be enforced in such a way that code pushing events into the pipeline can detect when the buffer is full and take appropriate action. I’m also writing this code to be consumed by another application.

In the general approach I’m considering, the application using mine as a dependency will start a Producer per connected user, and my application will start a Buffer (:producer_consumer) per Producer for the purpose of buffering the input. These buffering stages will then forward the input to the final Consumer stage, which will run a number of consumers equal to some multiple of the number of cores on the machine and will process the input from all of the user-specific stages.

My requirement of being able to buffer the input seems like an easy one to achieve. As long as I set the max_demand in my Buffer correctly, and as long as the Producer only acknowledges events when they are emitted to the Buffer, code submitting events into the pipeline should be able to correctly handle the case where the buffer is full.
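To make the submission side concrete, here is a rough sketch of a bounded producer (for simplicity the bound lives in the producer itself rather than a separate Buffer stage; the module and `submit/2` are names I made up). A synchronous call means the caller sees `{:error, :full}` and can back off:

```elixir
defmodule BoundedProducer do
  use GenStage

  def start_link(max), do: GenStage.start_link(__MODULE__, max)

  # Returns :ok, or {:error, :full} when `max` events are already pending,
  # so the code pushing events can detect the full buffer and back off.
  def submit(pid, event), do: GenStage.call(pid, {:submit, event})

  def init(max), do: {:producer, %{queue: :queue.new(), count: 0, max: max, demand: 0}}

  def handle_call({:submit, _event}, _from, %{count: count, max: max} = state)
      when count >= max do
    {:reply, {:error, :full}, [], state}
  end

  def handle_call({:submit, event}, _from, state) do
    state = %{state | queue: :queue.in(event, state.queue), count: state.count + 1}
    {events, state} = dispatch(state)
    {:reply, :ok, events, state}
  end

  def handle_demand(incoming, state) do
    {events, state} = dispatch(%{state | demand: state.demand + incoming})
    {:noreply, events, state}
  end

  # Emit as many queued events as there is outstanding consumer demand.
  defp dispatch(state) do
    take = min(state.demand, state.count)
    {now, later} = :queue.split(take, state.queue)
    {:queue.to_list(now),
     %{state | queue: later, count: state.count - take, demand: state.demand - take}}
  end
end
```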

One assumption I’m working from is that, in a pipeline built as a chain of single producers/consumers, any series of events will be processed in the order it was produced. Given that assumption, all that’s left is to wire the stages up so that no further events can be emitted past the Buffer until the last emitted event has been processed. The trouble is that I can’t figure out how to do this without ending the pipeline at the Buffer, making it a consumer, and having it submit work one-by-one to a worker pool. So rather than N producers piping into a single set of consumers in one large pipeline, I’d have N separate pipelines and a separate worker pool.
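For reference, the serialization assumption can at least be expressed at subscription time: with max_demand: 1 and min_demand: 0, a consumer holds at most one event in flight per subscription, because it only asks for the next event after handle_events/3 returns. A sketch (SerialConsumer and process/1 are illustrative names, not anything from GenStage itself):

```elixir
defmodule SerialConsumer do
  use GenStage

  def start_link(buffer), do: GenStage.start_link(__MODULE__, buffer)

  def init(buffer) do
    # max_demand: 1 / min_demand: 0 keeps at most one event in flight:
    # new demand is only sent upstream after handle_events/3 returns.
    {:consumer, :ok, subscribe_to: [{buffer, max_demand: 1, min_demand: 0}]}
  end

  def handle_events([event], _from, state) do
    # The next event is not delivered until this returns.
    process(event)
    {:noreply, [], state}
  end

  # Placeholder for the real per-event work.
  defp process(event), do: IO.inspect(event)
end
```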

Is there a way to fulfill my requirements using GenStage for the entire process, or does my need to serialize the processing of each event on a per-producer basis throw too big of a wrench in the works?

2 Likes

Consider each stage as a unit of concurrency / fault tolerance / back-pressure rather than of code organization. As an extreme example, imagine you need to perform three operations, a, b and c, on every event in the system. Putting each operation in a separate GenStage and connecting them together is not the way to go. If your goal is code isolation, then Elixir modules and functions are the proper tool.

The reason I am saying this is that I am not sure I would have the buffer as a separate stage. It could simply be part of the producer. I also wouldn’t necessarily mix all producers and consumers together, especially because items from the same producer need to be processed in order.

From your description, the simplest solution seems to be N GenServers, one per producer, each keeping its own buffer and processing the events as they come. The processing itself can happen in a Task. Alternatively, you can keep N pairs of GenStages, a single producer and a single consumer connected directly, with each pair isolated from the others.
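A rough sketch of that first suggestion, assuming a synchronous submit/2 and a caller-supplied process fun (all names here are illustrative): a bounded queue plus strictly in-order, one-at-a-time processing offloaded to a Task.

```elixir
defmodule UserWorker do
  use GenServer

  # One process per producer/user. `:process` is the per-event work,
  # supplied by the caller; `:max` bounds the queue.
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  # Returns :ok, or {:error, :full} when the queue already holds `max` events.
  def submit(pid, event), do: GenServer.call(pid, {:submit, event})

  def init(opts) do
    {:ok,
     %{queue: :queue.new(), count: 0, task: nil,
       max: Keyword.get(opts, :max, 100),
       process: Keyword.fetch!(opts, :process)}}
  end

  def handle_call({:submit, _event}, _from, %{count: count, max: max} = s)
      when count >= max do
    {:reply, {:error, :full}, s}
  end

  def handle_call({:submit, event}, _from, s) do
    s = %{s | queue: :queue.in(event, s.queue), count: s.count + 1}
    {:reply, :ok, maybe_start(s)}
  end

  # The current Task finished; discard its result and start the next event.
  def handle_info({ref, _result}, %{task: %Task{ref: ref}} = s) do
    Process.demonitor(ref, [:flush])
    {:noreply, maybe_start(%{s | task: nil})}
  end

  # Start the next event only when nothing is in flight, preserving order.
  # Note: Task.async/1 links the task to this server, so a crash while
  # processing takes the worker down with it (its supervisor can restart it).
  defp maybe_start(%{task: nil, count: count} = s) when count > 0 do
    {{:value, event}, rest} = :queue.out(s.queue)
    %{s | queue: rest, count: count - 1, task: Task.async(fn -> s.process.(event) end)}
  end

  defp maybe_start(s), do: s
end
```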

3 Likes

Splitting the final Consumer stage and having a Buffer/Consumer pair as a pipeline for each connected user would absolutely work, but at that point I don’t feel like I’m gaining anything over the N-GenServer approach. It was something I’d considered, but I was hoping to find a way to pipe everything into a single bounded set of consumers while still meeting my requirements.

I’m very excited by all of the fantastic software being built with and around Elixir, and I’m looking forward to finding a use case where GenStage makes sense and can be leveraged, but this doesn’t seem to be it. It just feels like it would be a case of using something because it is flashy and new rather than because it provides a concrete benefit for the specific use case. I have some thoughts around doing some log processing, though…

1 Like