After hearing @josevalim’s keynote at ElixirConf, I got inspired to re-write our application’s data-processing pipeline (which works OK but is pretty inefficient) to use GenStage. I initially imagined I’d build it doing something like this:
Here are the parts:
- The `StaleShardIDProducer` periodically reads from an external datasource to see which shards are stale due to newly available data snapshots, and produces stale shard IDs.
- The `ShardWorker` stages are producer-consumers that receive stale shard IDs from the `StaleShardIDProducer` and produce a list of sub-tasks that must be performed to finish building the shard.
- The `SubTaskWorker` stages perform the sub-tasks and send the results back to the `ShardWorker` to be ingested into the shard. When all sub-tasks have completed, the built shard is persisted.
I got started prototyping this and quickly realized that there’s a problem: when a producer-consumer (such as my `ShardWorker`) emits events from `handle_events/3` (as I was planning on having it do), that is treated as “finishing” the events it was given, and it will turn around and send more demand upstream even though it hasn’t finished building the shard (it needs to wait for all sub-tasks to complete first). That would cause it to “move on” to the next shard, when we really want it to wait until the shard is complete before requesting a new one from the `StaleShardIDProducer`.
From my understanding of GenStage at the time, I couldn’t figure out how to make this design work, so I tweaked it slightly:
This design has a couple of differences:
- The `ShardWorker` stages are consumers, which allows them to wait to finish an event until all the sub-tasks are complete and the shard can be persisted.
- There are actually two GenStage flows here instead of just one. The `SubTaskBuffer` is a new producer that the `SubTaskWorker` stages subscribe to. The `ShardWorker` stages send sub-tasks directly to the buffer so they will get worked on. Essentially, the `SubTaskBuffer` and `SubTaskWorker` stages form a worker pool with the buffer as the entry point for clients to send work to.
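To make the worker-pool idea concrete, here is a minimal sketch of what a buffer like this could look like. It is not my actual prototype: the `enqueue/2` function, the `{queue, pending_demand}` state shape, and the private `dispatch/3` helper are assumptions for illustration, and the `Mix.install/1` line is only there so the snippet can run as a standalone script.

```elixir
# Only needed when running this as a standalone script.
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule SubTaskBuffer do
  use GenStage

  def start_link(opts \\ []), do: GenStage.start_link(__MODULE__, :ok, opts)

  # Hypothetical client API: ShardWorker stages push sub-tasks in here.
  def enqueue(buffer, sub_tasks), do: GenStage.cast(buffer, {:enqueue, sub_tasks})

  @impl true
  def init(:ok) do
    # State: queued sub-tasks plus demand we have not been able to satisfy yet.
    {:producer, {:queue.new(), 0}}
  end

  @impl true
  def handle_cast({:enqueue, sub_tasks}, {queue, pending_demand}) do
    queue = Enum.reduce(sub_tasks, queue, &:queue.in/2)
    dispatch(queue, pending_demand, [])
  end

  @impl true
  def handle_demand(incoming, {queue, pending_demand}) do
    dispatch(queue, pending_demand + incoming, [])
  end

  # Emit queued sub-tasks until either the demand or the queue runs out.
  defp dispatch(queue, 0, events) do
    {:noreply, Enum.reverse(events), {queue, 0}}
  end

  defp dispatch(queue, demand, events) do
    case :queue.out(queue) do
      {{:value, sub_task}, queue} -> dispatch(queue, demand - 1, [sub_task | events])
      {:empty, queue} -> {:noreply, Enum.reverse(events), {queue, demand}}
    end
  end
end
```

Each `SubTaskWorker` would then subscribe with something like `GenStage.sync_subscribe(worker, to: buffer, max_demand: 1)`, so an idle worker picks up the next sub-task as soon as one is enqueued.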
I prototyped this, got it working, and was pretty happy with it. I haven’t gotten to implement a production version of this, though. And today I was re-reading the GenStage docs and noticed something I hadn’t noticed before: the optional `handle_subscribe/4` callback allows you to implement a `:manual` mode, where demand is not automatically sent upstream. Instead, you send demand upstream when appropriate by manually calling `GenStage.ask/3`. If I’m understanding this correctly, I think this means that my initial design is possible: I just have to make the `ShardWorker` stages implement `:manual` mode, where they only demand a new stale shard ID after finishing and persisting a shard.
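Here is a rough sketch of how I imagine that would look, based on my reading of the docs rather than on anything I’ve run in production. The `{:shard_persisted, shard_id}` message and the `sub_tasks_for/1` helper are hypothetical stand-ins for however the worker learns that all sub-tasks are done and the shard has been saved; the `Mix.install/1` line is only for running the snippet as a script.

```elixir
# Only needed when running this as a standalone script.
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule ShardWorker do
  use GenStage

  def start_link(opts \\ []), do: GenStage.start_link(__MODULE__, :ok, opts)

  @impl true
  def init(:ok), do: {:producer_consumer, %{upstream: nil}}

  # Subscribing to the StaleShardIDProducer: switch to :manual mode and
  # ask for exactly one stale shard ID to get started.
  @impl true
  def handle_subscribe(:producer, _opts, from, state) do
    GenStage.ask(from, 1)
    {:manual, %{state | upstream: from}}
  end

  # SubTaskWorker stages subscribing to us keep the normal behaviour.
  def handle_subscribe(:consumer, _opts, _from, state) do
    {:automatic, state}
  end

  # We only ever ask for one shard ID at a time, so we get one event here.
  # Emitting sub-tasks no longer triggers new upstream demand in :manual mode.
  @impl true
  def handle_events([shard_id], _from, state) do
    {:noreply, sub_tasks_for(shard_id), state}
  end

  # Hypothetical message: sent once every sub-task result has been ingested
  # and the shard has been persisted. Only now do we ask for the next shard.
  @impl true
  def handle_info({:shard_persisted, _shard_id}, state) do
    GenStage.ask(state.upstream, 1)
    {:noreply, [], state}
  end

  # Hypothetical helper: the real sub-task list depends on the shard.
  defp sub_tasks_for(shard_id) do
    for n <- 1..3, do: {:sub_task, shard_id, n}
  end
end
```

The key point is that demand goes upstream in only two places: once on subscription and once per persisted shard, so each `ShardWorker` holds at most one shard at a time.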
So now I’m wondering which direction to go. What are the tradeoffs between these designs? I easily understand how the worker pool works (in fact, I’ve built a productionized version of it) but I’m a lot fuzzier on how things work when you have N consumers subscribed to M producers. I’m hoping @josevalim can weigh in with a recommendation :).