I am wondering if anyone has any suggestions or ideas regarding the best way to handle flow control in a real-time streaming app I am building. I apologize for the long post.
Generally, I believe I can solve at least some of my problems using a combination of 1 or more GenStage, worker pools, tasks, brokers/ready signaling. I have some other concerns such as GC and minimizing sends that I can save for responses.
Overall, I’m looking for a suggestion on the best way to organize processes for my pipeline.
I have a dynamic graph of processors possibly on multiple BEAM nodes that perform computations. Event data is read from a
ring buffer per-processor stored in
ETS. Each event has a corresponding
sequence number. Work is performed in order of sequence number and only if an event is interesting to a particular processor.
Output is sent downstream to a writer in the order it is processed - by sequence number. Each writer process inserts data into its own ring buffer and signals upstream whenever there is a given amount of minimum space in its ring buffer. This is a lot like GenStage except a bit more decoupled, optimized more for my specific use-cases, and supports load balancing/queueing across many different producers.
The output produced by computations can be 0 to n items in size, therefore the data is wrapped into a stream when possible. Data is batched in general - 1 or more batches per sequence number and per send to the writer. A sequence number is
done when its output has been fully written to the destination ring buffer. This may or may not include ACKing back asynchronously depending on config (as well as journaling) after each item or after n items.
My overall goal is I want to make sure that there are always computation results in-flight to be delivered downstream to the writer. A single sequence number may produce huge amounts of items, 0 items, or only 1 item; therefore I cannot split on sequence number alone. In other words, solutions that deliver all results eagerly to a process mailbox like using
Task.async_nolink without somehow splitting up things further won’t work.
Moreover, since the results of each sequence number are sent to a writer in order, I have a form of head of the line blocking. I experimented with delivering results out of order and re-ordering, but since it is hard to tell how many items might be produced, this both bloats memory/buffers and makes bookkeeping/rewinds much more tricky to the point where on average it is usually easier and often faster to just use head of the line blocking. The best I can do is deliver the results created by a single sequence number out of order which the stream already handles. Note that even though there is head of the line blocking, the system is otherwise hugely parallel in terms of many processes sending to writers - the ordering is a business requirement.
Here’s a simplified version of the steps taken:
- Get the next sequence number
- Use the sequence number to lookup an event’s data by sequence number in ETS and check if it is interesting. If not interesting, publish the sequence number.
- Perform work on the event. If there is no output, publish the sequence number and move on to the next.
- If the writer has already signaled it is ready, send at most the amount of items the writer asked, otherwise, wait.
- Repeat step 4 until the batch produced by the current sequence number(s) is drained.
- Publish the sequence number once an ACK is received. If a NACK is received, respond accordingly.
The key here is sending as much as possible to the writer in a single batch, but not loading in too much into memory or being forced to wait on a potentially blocking computation. The data should ideally be ready for the writer at all times without
realizing more than is needed. Simply put - be at least 1 output batch ahead of the writer when possible and dispatch realized results.
Ideally, whatever process is sending to the writer should have already received the ready signal before knowing it has new data so it may send ASAP without waiting for “ready” confirmation from the writer. The obvious exception is when the writer is actually backlogged.
My basic components, some of which are optional include:
- Seq number producer - Usually a GenStage producer or GenServer that just sends an integer representing the next sequence number. It can also be used to filter out uninteresting events earlier in the pipeline.
- Seq number consumer - Performs or launches computation (via task supervisor or worker pool) and usually a GenStage consumer or consumer supervisor.
- Broker - Used for any kind of async notification of 2 parties to signal ready + some request data
- Writer - Performs the write to the destination buffer. May or may not be on the same node and typically would use a broker to signal ready + max event count, but I will omit that here for brevity.
- Dispatcher - GenServer process used to just peform sending work and receive writer ready notifications typically. Some options will be used to pull data from another process, other times receive data.
Here are some rough options using GenStage in whole or part that can be discussed more later:
seq producer -> seq consumer -> writer
seq producer -> seq consumer -> dispatcher -> writer
seq producer -> seq consumer <-> broker <-> dispatcher -> writer
seq producer -> seq consumer -> task <-> broker <-> dispatcher -> writer
seq producer -> seq consumer <-> broker <-> dispatch prod.-> dispatcher cons. -> writer
seq producer -> seq consumer -> task -> writer
seq producer -> seq producer consumer (worker) -> seq consumer -> writer
seq producer -> worker(s) from pool of standby workers -> writer
Since this post is already too long, I’ll summarize some basic thoughts:
- Use GenStage to signal upstream when to do work/emit results
- Use a worker pool with 1 worker active or otherwise ensure there’s a sort of “send” mutex via message/ETS lookup.
- Use a dispatcher so we have a process always ready to send to the writer and can handle the back/forth with the writer + bookkeeping like monitors.
- Use ETS to shovel data to avoid send overhead. Seems redundant since destination is eventually a different ETS table. Plus side is it makes tracking progress easy + restart survival/resume. Already have a lot of ETS tables so want to avoid more.
- Use intermediate processes/GenStages for the pre-fetch and/or compressing data using
term_to_binaryto cut down on mem size/send overhead potentially.
- Use a broker to signal to parties when they are ready, and to add probes/dropping/request queueing to some degree if/when necessary. Ex:
- Use Tasks/Pooled Workers that can be destroyed each run or after some given time/hibernated to deal with GC and avoid blocking if necessary.
- Avoid polling
- Avoid giving workers too much responsibility/stuff to do.
- Try to send directly from what created the data without extra message passing.
- Minimize number of processes when possible.
My current solution is essentially a more complicated version of the last one listed. The gist is that output is sent directly to the writer from the worker that is computing the results. In this case, I’m using GenServers/gen_statem to monitor workers I grab from a pool, sending each 1 or more sequence numbers (or alternatively I can fetch the event first from ETS). I’ve also experimented with having each worker itself be a GenStage to ensure there’s data in flight.
My other major alternative is one mainly using manual asking and multiple GenStages. My original design used a consumer supervisor, but I found this to be a bit obtuse with regard to restarts/worker control. Overall, a lot of the GenStage solutions mandate more message passing of output than I’d like unless I send to the writer from a task/worker directly.
At this point I’m a little unsure if there’s a better way. I suspect there are simpler or more performant ways. Suggestions? Thoughts? Thanks.