The general problem I face is that I have N producers emitting events which must be processed in order with respect to each producer, but may be interleaved with events from the other producers in the final consumer step. In addition, events from each producer must be buffered up to a configurable, per-producer limit, and that limit must be enforced in such a way that code pushing events into the pipeline can detect when the buffer is full and take appropriate action. I’m also writing this code to be consumed by another application.
In the general approach I’m considering, the application using mine as a dependency will start a Producer per connected user, and my application will start a Buffer (a :producer_consumer stage) per Producer to buffer the input. Each buffering stage will forward its input to the final Consumer stage, which will run a number of consumers equal to some multiple of the number of cores on the machine and will process the input from all of the user-specific stages.
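To make the topology concrete, here is a minimal sketch of the per-user buffering stage I have in mind. The module name, the `:producer`/`:limit` options, and the default limit are all placeholders, not a working implementation:

```elixir
defmodule MyLib.Buffer do
  use GenStage

  # One Buffer per user; it subscribes to that user's Producer and
  # forwards events unchanged to the shared Consumer stage.
  def start_link(opts), do: GenStage.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    producer = Keyword.fetch!(opts, :producer)
    limit = Keyword.get(opts, :limit, 100)

    # max_demand caps how many events this stage will pull from the
    # Producer at once, which is what enforces the per-user limit.
    {:producer_consumer, %{},
     subscribe_to: [{producer, max_demand: limit}]}
  end

  @impl true
  def handle_events(events, _from, state) do
    {:noreply, events, state}
  end
end
```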
My requirement of being able to buffer the input seems easy to achieve. As long as I set max_demand correctly in the Buffer’s subscription, and the Producer only acknowledges events once they are emitted to the Buffer, code submitting events into the pipeline should be able to detect and handle the case where the buffer is full.
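By "only acknowledges events when they are emitted", I mean something like the following sketch: the Producer holds the caller’s reply until demand arrives, so a full buffer surfaces to the submitter as a timeout. The `submit/3` API and module name are just illustrations of the idea:

```elixir
defmodule MyLib.Producer do
  use GenStage

  def start_link(opts), do: GenStage.start_link(__MODULE__, opts)

  # The caller blocks until the event is actually dispatched downstream;
  # if the Buffer has stopped demanding (i.e. it is full), this times out.
  def submit(producer, event, timeout \\ 5_000) do
    GenStage.call(producer, {:submit, event}, timeout)
  end

  @impl true
  def init(_opts), do: {:producer, %{queue: :queue.new(), demand: 0}}

  @impl true
  def handle_call({:submit, event}, from, state) do
    dispatch(%{state | queue: :queue.in({from, event}, state.queue)})
  end

  @impl true
  def handle_demand(incoming, state) do
    dispatch(%{state | demand: state.demand + incoming})
  end

  defp dispatch(%{queue: queue, demand: demand} = state) do
    {events, queue, demand} = take(queue, demand, [])
    {:noreply, events, %{state | queue: queue, demand: demand}}
  end

  defp take(queue, 0, acc), do: {Enum.reverse(acc), queue, 0}

  defp take(queue, demand, acc) do
    case :queue.out(queue) do
      {{:value, {from, event}}, rest} ->
        # Acknowledge only now, as the event leaves the Producer.
        GenStage.reply(from, :ok)
        take(rest, demand - 1, [event | acc])

      {:empty, rest} ->
        {Enum.reverse(acc), rest, demand}
    end
  end
end
```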
One assumption I’m working on is that, given a pipeline chaining single producers to single consumers, any series of events will be processed in the order it was produced. On that assumption, all that remains is to wire the stages up so that no further events are emitted past the Buffer until the last emitted event has been processed. The trouble is that I can’t figure out how to do this without ending the pipeline at the Buffer, making it a consumer, and having it submit work one-by-one to a worker pool for processing. So rather than N producers feeding a single set of consumers in one large pipeline, I’d have N separate pipelines plus a separate worker pool.
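For clarity, the fallback I’m describing would look roughly like this: the Buffer terminates the pipeline as a :consumer subscribed with max_demand: 1, min_demand: 0, so it receives one event at a time and awaits its completion before the next is demanded. `MyLib.WorkerPool` (a Task.Supervisor) and `process/1` are stand-ins:

```elixir
defmodule MyLib.SerialConsumer do
  use GenStage

  def start_link(opts), do: GenStage.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    producer = Keyword.fetch!(opts, :producer)

    # max_demand: 1 with min_demand: 0 means events arrive singly.
    {:consumer, %{},
     subscribe_to: [{producer, max_demand: 1, min_demand: 0}]}
  end

  @impl true
  def handle_events([event], _from, state) do
    # Awaiting the task means no new demand is issued until this
    # event is fully processed, serializing work per producer.
    MyLib.WorkerPool
    |> Task.Supervisor.async(fn -> process(event) end)
    |> Task.await()

    {:noreply, [], state}
  end

  defp process(event), do: event
end
```

This works, but it gives me N independent pipelines plus a worker pool rather than one GenStage topology, which is exactly what I’m hoping to avoid.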
Is there a way to fulfill my requirements using GenStage for the entire process, or does my need to serialize the processing of each event on a per-producer basis throw too big of a wrench in the works?