Comparing two GenStage Designs

After hearing @josevalim’s keynote at ElixirConf, I was inspired to rewrite our application’s data-processing pipeline (which works OK but is pretty inefficient) to use GenStage. I initially imagined I’d build it something like this:
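
A rough sketch of that topology, reconstructed from the parts listed below (sub-task results also flow back to the ShardWorker stages):

[StaleShardIDProducer] -> [ShardWorker xN] -> [SubTaskWorker xM]
                                ^                     |
                                +------ results ------+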

Here are the parts:

  • The StaleShardIDProducer periodically reads from an external datasource to see what shards are stale due to newly available data snapshots, and produces stale shard IDs.
  • The ShardWorker stages are producer-consumers that receive stale shard IDs from the StaleShardIDProducer and produce a list of sub-tasks that must be performed to finish building the shard.
  • The SubTaskWorker stages perform the sub-tasks, and send the results back to the ShardWorker to be ingested into the shard. When all sub-tasks have completed, the built shard is persisted.

I got started prototyping this and quickly realized there’s a problem: when a producer-consumer (such as my ShardWorker) emits events from handle_events/3, as I was planning to have it do, GenStage treats that as “finishing” the events it was given, and the stage turns around and sends more demand upstream even though it hasn’t finished building the shard (it needs to wait for all sub-tasks to complete for that). The effect would be that it “moves on” to the next shard, when we really want it to wait until the shard is complete before requesting a new one from the StaleShardIDProducer.

From my understanding of GenStage at the time, I couldn’t figure out how to make this design work, so I tweaked it slightly:
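
Again roughly sketched from the description below, there are now two separate flows:

Flow 1: [StaleShardIDProducer] -> [ShardWorker xN (consumers)]
Flow 2: [SubTaskBuffer] -> [SubTaskWorker xM]
        (the ShardWorkers push sub-tasks into the SubTaskBuffer
         and receive the results back directly)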

This design has a couple differences:

  • The ShardWorker stages are consumers, which allows each of them to hold off on finishing an event until all of the sub-tasks are complete and it can persist the shard.
  • There are actually two GenStage flows here instead of just one. The SubTaskBuffer is a new producer that the SubTaskWorker stages subscribe to. The ShardWorker stages send sub-tasks directly to the buffer so they will get worked on. Essentially, the SubTaskBuffer and SubTaskWorker stages form a worker pool, with the buffer as the entry point for clients to send work to (see the sketch below).
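
A minimal sketch of what such a SubTaskBuffer could look like, following the queue-backed producer pattern from the GenStage documentation (the enqueue/1 API and its internals are assumptions; the thread doesn’t show the real buffer):

defmodule SubTaskBuffer do
  use GenStage

  def start_link(_opts) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  # ShardWorkers call this to push a sub-task into the pool.
  def enqueue(sub_task) do
    GenStage.cast(__MODULE__, {:enqueue, sub_task})
  end

  def init(:ok) do
    # State: the queue of buffered sub-tasks plus unsatisfied demand.
    {:producer, {:queue.new(), 0}}
  end

  def handle_cast({:enqueue, sub_task}, {queue, pending_demand}) do
    dispatch(:queue.in(sub_task, queue), pending_demand, [])
  end

  def handle_demand(demand, {queue, pending_demand}) do
    dispatch(queue, pending_demand + demand, [])
  end

  # Emit queued sub-tasks until demand is satisfied or the queue is empty.
  defp dispatch(queue, 0, events) do
    {:noreply, Enum.reverse(events), {queue, 0}}
  end

  defp dispatch(queue, demand, events) do
    case :queue.out(queue) do
      {{:value, event}, queue} -> dispatch(queue, demand - 1, [event | events])
      {:empty, queue} -> {:noreply, Enum.reverse(events), {queue, demand}}
    end
  end
end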

I prototyped this, got it working, and was pretty happy with it. I haven’t gotten around to implementing a production version of it yet, though. And today I was re-reading the GenStage docs and noticed something I hadn’t noticed before: the optional handle_subscribe/4 callback allows you to implement a :manual mode, where demand is not automatically sent upstream. Instead, you send demand upstream when appropriate by manually calling GenStage.ask/3. If I’m understanding this correctly, this means my initial design is possible: I just have to make the ShardWorker stages use :manual mode, so that they only demand a new stale shard ID after finishing and persisting a shard.
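
For reference, a minimal sketch of those :manual mode mechanics, written as if ShardWorker handled a whole shard inline (build_and_persist_shard/1 is a hypothetical stand-in; it doesn’t capture the full design, where ShardWorker would also emit sub-task events and collect the results):

def handle_subscribe(:producer, _opts, from, state) do
  # Switch to manual demand: GenStage will no longer ask upstream for us.
  GenStage.ask(from, 1) # kick things off by demanding the first shard ID
  {:manual, Map.put(state, :producer_from, from)}
end

def handle_events([stale_shard_id], _from, state) do
  build_and_persist_shard(stale_shard_id) # hypothetical; blocks until done
  GenStage.ask(state.producer_from, 1)    # only now demand the next shard
  {:noreply, [], state}
end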

So now I’m wondering which direction to go. What are the tradeoffs between these designs? I easily understand how the worker pool works (in fact, I’ve built a productionized version of it) but I’m a lot fuzzier on how things work when you have N consumers subscribed to M producers. I’m hoping @josevalim can weigh in with a recommendation :).

4 Likes

@myronmarston first let me clarify that, although the producer_consumer sends demand as soon as handle_events is done, those events are not consumed until demand is received from downstream. This is a form of pre-fetching to ensure we always have data in flux.

That said, the issue with your second design is that you no longer have back-pressure all the way. It won’t be a problem if the ShardWorker is the slowest layer in your pipeline, but it also means you are not gaining anything by having two layers of GenStage.

For example, instead of a second pipeline, you could start the shard worker children directly in a supervisor:

          /[consumer]\
[producer]-[consumer]-[supervisor]
          \[consumer]/

It would work similarly to what you have designed: the shard worker will start multiple children in the supervisor and wait for those children to reply back.

However, my preferred solution would be to simply not do any asynchronous work in the stages. The question is: does the stale shard id producer provide enough events to make all shard workers busy, using 100% of your machine’s resources, without the need to start subworkers?

Imagine the stale shard id producer can provide events faster than they can be consumed. In this case, you have enough work on each shard worker to use all cores without needing to break each shard worker into subtask workers. In this scenario, if you have 4 cores and 4 shard workers, that will be enough. Breaking the work into smaller tasks won’t buy you anything, because all of the tasks will still be working towards the same 4 shard workers.

However, this may not be the case. You may process events faster than the producer can emit them, or maybe the subtask workers are IO-bound. In this case, you can still keep a single stale shard id producer and multiple workers, except each worker starts multiple tasks (Task.async or Task.Supervisor.async) and awaits them inside handle_events/3. This way you keep the synchronicity and can still break the work apart.
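
A sketch of that approach (subtasks_for/1, run_subtask/1, and persist_shard/2 are hypothetical helpers):

def handle_events([stale_shard_id], _from, state) do
  results =
    stale_shard_id
    |> subtasks_for() # hypothetical: list the sub-tasks for this shard
    |> Enum.map(fn subtask -> Task.async(fn -> run_subtask(subtask) end) end)
    |> Enum.map(&Task.await(&1, :timer.minutes(10))) # block until all done

  persist_shard(stale_shard_id, results) # hypothetical persistence step
  {:noreply, [], state}
end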

3 Likes

@myronmarston first let me clarify that, although the producer_consumer sends demand as soon as handle_events is done, those events are not consumed until demand is received from downstream. This is a form of pre-fetching to ensure we always have data in flux.

Thanks, that’s a nice clarification. That said, I’m not sure it changes anything about my design, because the downstream stages will be triggered to send more demand based on min_demand/max_demand, rather than waiting until the shard has finished being built. (At least, that’s my understanding.)

That said, the issue with your second design is that you no longer have back-pressure all the way. It won’t be a problem if the ShardWorker is the slowest layer in your pipeline

It is the slowest layer; after all, it waits until all N sub-tasks are complete before finishing.

The question is: does the stale shard id producer provide enough events to make all shard workers busy, using 100% of your machine’s resources, without the need to start subworkers?

Sometimes, but not always. I think there’s some important context here I forgot to mention that would help explain why I was considering these two designs instead of something simpler like what you suggest.

  • The sub-tasks fetch data snapshots from S3 and process them. Therefore, they are IO-bound and there is a significant benefit to having more sub-tasks running than the number of cores.
  • The stale shard ID producer is very “bursty” due to the scheduled nature of the external system that collects new S3 snapshots and notifies (via the “external data source” in the diagrams above) the stale shard ID producer. There are portions of the day where there are hundreds of stale shards to build. At other times of day, there is only a trickle of individual stale shards.
  • The S3 snapshots can get quite large (say, 1-5 GB for the largest ones).
  • Oftentimes, building a stale shard only involves fetching and processing 1 S3 snapshot (as when there is an existing persisted shard to start from and we have been notified of one new S3 snapshot). At other times, building a stale shard can involve fetching and processing 200+ snapshots (as when a user activates an old archived campaign). Sometimes, all shard builds must be done from scratch due to schema changes in the shard data structure, and those builds will each involve fetching and processing N S3 snapshots (where N is the age in weeks of the campaign).

Given the size of the S3 snapshots and the number of them involved in building some shards, we have to carefully limit the total number of in-flight snapshots being processed to prevent the BEAM from crashing due to memory exhaustion. As a point of comparison, the first version of our current build pipeline (which builds only one shard at a time, but uses a naive parallel map to fetch and process S3 snapshots in parallel) was unable to build shards for the largest, oldest campaigns. Once we put some concurrency limits in place (limiting it to process 20 S3 snapshots at a time), it was able to successfully build all shards.

When I set out to build a new version of our build pipeline using GenStage, my goal was to build a system that would concurrently build up to N shards and concurrently process up to M S3 snapshots. For example, my prototype built 10 shards concurrently and processed 20 S3 snapshots concurrently. With the approach you’ve suggested (using Task.async for the sub-tasks), I don’t see a way to achieve the goal of processing 20 S3 snapshots at a time. I can either allow each ShardWorker to process a small number of concurrent snapshots (say, 2) to ensure that N * 2 <= M, in which case builds for shards that have 200+ snapshots will take a very long time (very undesirable in those situations where it’s the only shard being built), or I can allow each to process a larger number of concurrent snapshots and risk crashing the BEAM due to memory exhaustion. Having a set number of ShardWorker stages and SubTaskWorker stages allows me to carefully limit the concurrency of each, while ensuring that they are generally saturated.

Given what I’ve said above, does one of my two original designs make sense? Or is there something I’m not seeing that would make you still recommend what you outlined above?

I have one last question then: in your second design, how do you avoid the same issue you had in the first one, but this time in the consumer? The consumer will also request more items as soon as handle_events/3 is done. Are you waiting in handle_events/3, then? Because otherwise you could use the :manual mode in either the producer_consumer or the consumer.

Given everything you said, it seems both designs are fine. You can also consider a third and a fourth design, which are similar to the two you proposed, except the last step is a DynamicSupervisor. The DynamicSupervisor will allow you to spawn one child process per subtask, which may give better results (unless you are connecting to S3 in the init function of each SubTaskWorker). The amount of currently running tasks in this case will be, on average, (max_demand - min_demand) / 2. I would recommend using master, though, since we have recently fixed a bug to make sure min_demand is respected (instead of always asking). 0.6.0 should be out this week.
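
A minimal sketch of that last step (in today’s gen_stage this stage is called ConsumerSupervisor; it was DynamicSupervisor in the 0.x releases discussed here). SubTaskWorker.start_link/1 is assumed to run a single sub-task and exit normally when done:

defmodule SubTaskConsumer do
  use ConsumerSupervisor

  def start_link(_opts) do
    ConsumerSupervisor.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    # One :transient child per event; a child exiting normally tells the
    # supervisor the event is done, which feeds demand back upstream.
    children = [
      %{
        id: SubTaskWorker,
        start: {SubTaskWorker, :start_link, []},
        restart: :transient
      }
    ]

    opts = [
      strategy: :one_for_one,
      subscribe_to: [{SubTaskBuffer, min_demand: 10, max_demand: 20}]
    ]

    ConsumerSupervisor.init(children, opts)
  end
end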

I am still studying the GenStage mechanism, but from my still-limited understanding, each stage is its own unidirectional transformation, yes? I understand that signaling still occurs backwards for demand, kind of like back-propagation in a neural network, but the actual transformation flows from the producer >> consumer. So the “send the results back to the ShardWorker to be ingested into the shard” part seems to me to indicate that the “second stage” (see Note 1) is not complete. This would seem to me to be the fundamental reason the initial design acts as it does. Is this not the case?

This thought, I believe, is a symptom exposing an underlying misinterpretation of the system. The ShardWorker as first conceived is not the “slowest stage”, as it is only compiling the list of work that needs to be done (since it isn’t actually waiting on the subtask workers’ completion; see Note 2). Conceptually to us, yes, it seems like the slowest “stage” in the general sense of the term, but within the GenStage definition of “stage”, it is not the slowest…did I word that correctly? :thinking:

So you could then have the naive parallelization in the implementation of the ShardWorker so that it blocks, but this would only give you back-pressure on the N shards, without addressing your overall goal of also limiting M concurrent snapshots. This is how I read what @josevalim is saying about not having back-pressure “all the way”.

So I would think overall that it’s the age-old problem of “naming things” here with the GenStage paradigm, and we would have to be very careful about our naming of the stages.

One GenStage To Rule Them All…

Rename ShardWorker to ShardWorkJobCompiler (or whatever) to indicate that its job is to create the list of work to consume.

This “list” information is the “job state” event that this stage produces. It would contain both the overall list of jobs to be completed and the target subtask’s list item to be executed. But it doesn’t produce a single event; it produces multiple events, one for each subtask to be executed.

Create a ShardWorkerSubTask stage that processes each of these events. It knows from the job state which particular subtask is its responsibility, but it also contains the global job’s information. It does its designated snapshot subtask, and it produces the same job state event it received, except modified to indicate that its subtask is complete. It then emits the modified job state event to the next stage; call it ShardWorkerAggregator.

The ShardWorkerAggregator is the consumer. It consumes the events, and it knows whether the job is complete or not, because it receives a completion notice for each subtask. It is called an aggregator because it reduces all of the subtask events to a single “job done” event.
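
To make that concrete, the job state event might be shaped something like this (field names are purely illustrative, not from the original post):

# Purely illustrative shape for the "job state" event described above:
def job_state(shard_id, all_subtasks, my_subtask, completed) do
  %{
    shard_id: shard_id,     # which shard this job is building
    subtasks: all_subtasks, # the overall list of work to complete
    subtask: my_subtask,    # the single sub-task this event targets
    completed: completed    # MapSet of sub-tasks finished so far
  }
end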

Or…I could just be talking out of my ::cough::…

So, that’s my take on things, and I’m going ahead and posting this in case it either helps you, or I (and anyone who reads this) could learn what I’m totally missing here. :smile:

Notes

*1) Does the producer count as a stage when saying stage 1, 2, …, n?
*2) This is akin, in my experience, to spawning a thread and continuing execution, rather than blocking for the spawned thread to complete.

Yep. In my prototype, my handle_events/3 was defined as:

def handle_events([stale_shard_id], _from, state) do
  # Blocks until the shard is fully built and persisted, so no new
  # demand is sent upstream until then.
  ShardProcess.perform_build(stale_shard_id)
  {:noreply, [], state}
end

Where ShardProcess implements a GenServer. Its perform_build function is defined like so:

def perform_build(stale_shard_id) do
  {:ok, pid} = start_link(stale_shard_id)
  # Wait (up to 10 minutes) for the build process to finish.
  GenServer.call(pid, :await_build_completion, 600_000)
end

Besides the fact that the ShardProcess made it easy to have ShardWorker.handle_events/3 block while it waits for the shard build to complete, I thought this would avoid some of the garbage collection problems I’ve read about when you have large ref-counted binaries touched by multiple processes. As we all know, the most efficient garbage collector is a process exiting, and I thought it would work best to limit the data touched by the long-running processes (such as the ShardWorker stages) to only the build pipeline metadata, and not any of the data used by an individual shard.

The DynamicSupervisor will allow you to spawn one child process per subtask, which may give better results (unless you are connecting to S3 in the init function of each SubTaskWorker). The amount of currently running tasks in this case will be, on average, (max_demand - min_demand) / 2. I would recommend using master, though, since we have recently fixed a bug to make sure min_demand is respected (instead of always asking). 0.6.0 should be out this week.

I hadn’t really considered the DynamicSupervisor yet; until I re-read the docs, I thought of it just as a replacement for :simple_one_for_one, but now that I’ve read the current docs, it sounds like exactly what I want. I was actually worried about one thing with the worker pool design I had prototyped: if the min_demand/max_demand used by the SubTaskWorker consumers is 10/20, then they will ask for 10 to 20 subtasks at a time to work on. If there is only one shard build being worked on by a ShardWorker (due to the lack of stale shards at a particular time of day), we could then end up with the sub-tasks for the only in-flight shard build processing serially in a single SubTaskWorker. Since DynamicSupervisor takes care of spawning a separate process for each event, such a problem should not occur with it, if I’m understanding it correctly.

So I think I will look into using DynamicSupervisor. Thanks for the suggestion!

@ibgib – thanks for the reply. I don’t have time to respond right now but will try to reply later.

I would say that GenStage, as a library, only provides a unidirectional flow, but ultimately, each stage is a process and can of course do normal Elixir/Erlang stuff like send messages. While it’s perhaps a bit non-standard to send results back, there are certainly valid reasons for doing so, as I’ll explain below.

While I can see why you would think this, the ShardWorker stage does in fact wait on the sub task workers. In my prototype, it did so by calling this, as I showed in my answer to @josevalim above:

ShardProcess.perform_build(stale_shard_id)

ShardProcess here is a gen server, so this did the following:

  • Started the gen server
  • Waited for the gen server to completely finish building the shard and exit

The ShardProcess was responsible for sending sub-tasks into the worker pool, keeping track of pending tasks, and receiving results until it got all of them back, at which point it would persist the shard and exit.
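
A condensed sketch of how such a ShardProcess might be put together (subtasks_for/1, ingest/3, and persist_shard/1 are hypothetical helpers, SubTaskBuffer.enqueue/1 is the assumed pool entry point from earlier, and a real version would need to handle results arriving before the :await_build_completion call):

defmodule ShardProcess do
  use GenServer

  def start_link(stale_shard_id) do
    GenServer.start_link(__MODULE__, stale_shard_id)
  end

  def perform_build(stale_shard_id) do
    {:ok, pid} = start_link(stale_shard_id)
    GenServer.call(pid, :await_build_completion, 600_000)
  end

  def init(stale_shard_id) do
    subtasks = subtasks_for(stale_shard_id) # hypothetical
    # Tag each sub-task with our pid so the SubTaskWorkers know where
    # to send the results.
    Enum.each(subtasks, &SubTaskBuffer.enqueue({&1, self()}))
    {:ok, %{shard: %{}, pending: MapSet.new(subtasks), caller: nil}}
  end

  def handle_call(:await_build_completion, from, state) do
    # Don't reply yet; the reply happens when the last result arrives.
    {:noreply, %{state | caller: from}}
  end

  def handle_info({:subtask_result, subtask, result}, state) do
    shard = ingest(state.shard, subtask, result) # hypothetical: compress + merge
    pending = MapSet.delete(state.pending, subtask)

    if MapSet.size(pending) == 0 do
      persist_shard(shard) # hypothetical
      GenServer.reply(state.caller, :ok)
      {:stop, :normal, state}
    else
      {:noreply, %{state | shard: shard, pending: pending}}
    end
  end
end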

The design you’ve proposed would probably work, and might be conceptually simpler, but I don’t think it’s the one we will go with. The shard data structure we are building can get quite large (in the worst cases, multiple GB), and as such, we really only want it to exist in one process. Including it in a message to another process would involve copying the entire data structure, which would be quite slow.

In addition, we have designed the shard so that individual pieces (such as individual values in a map within the shard data structure) can be compressed individually. We get a pretty good compression ratio (about 20:1), so by having each sub-task send its results back to the ShardProcess, the ShardProcess can compress the results and put them into the shard as sub-tasks complete, which works out nicely to keep things from using more memory than needed. I don’t think we could get the same kind of benefits out of the design you have proposed.
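
For instance, each result could be compressed as it lands; a sketch using :erlang.term_to_binary/2 with the :compressed option (the thread doesn’t say which compression is actually used, and ingest/3 is a hypothetical name):

defp ingest(shard, subtask_key, result) do
  # Compress the result before merging it in, so only the compressed
  # form is retained in the (potentially multi-GB) shard.
  Map.put(shard, subtask_key, :erlang.term_to_binary(result, compressed: 9))
end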

1 Like

Thanks for taking the time to explain your feedback to me! :smile:

I’m using your use case as something to test GenStage and your additional info should be very helpful. Thanks again!