Using GenStage to batch events

I am working with an API that is external to the Elixir server, and I want to minimize the number of calls to a particular endpoint. GenStage feels like a good fit for this use case, but I’ve had some trouble getting everything to line up.

I want to:

  • Collect all events every 100ms
  • Batch the events into batches of up to 150
  • Have three workers running in parallel to send these batched requests to the remote API
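For reference, the batching and bounded parallelism in this list can be expressed with the standard library alone, before any GenStage is involved. This sketch assumes the events for one 100ms window have already been collected into a list. BatchSketch and send_batch/1 are hypothetical names, and send_batch/1 is only a placeholder for the real API call. Task.async_stream/3 with max_concurrency: 3 stands in for the three workers.

```elixir
defmodule BatchSketch do
  # Hypothetical placeholder for the real request to the external API.
  # It only reports how many events the batch contained.
  def send_batch(batch), do: {:sent, length(batch)}

  # Split one window's events into batches of at most `batch_size` and send
  # them with at most `workers` concurrent tasks. Results come back in batch
  # order (ordered: true), even though the calls themselves run concurrently.
  def flush(events, batch_size \\ 150, workers \\ 3) do
    events
    |> Enum.chunk_every(batch_size)
    |> Task.async_stream(&send_batch/1, max_concurrency: workers, ordered: true)
    |> Enum.map(fn {:ok, result} -> result end)
  end
end
```

For example, flushing 400 events yields three calls: two full batches of 150 and one partial batch of 100.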

These seem to fit into three GenStage stages:

  • Collector - GenStage :producer that collects events from other parts of the Elixir system in a FIFO queue (based on :queue) and sends the events only in handle_demand/2
  • Batcher - GenStage :producer_consumer that asks the Collector for 1000 events every 100ms and splits them into batches of up to 150 (runs in :manual mode)
  • RequestSupervisor - GenStage ConsumerSupervisor that requests the batched events from the Batcher and starts workers that call the external API

The code I have seems to work, but I feel like I may be going against the ethos of GenStage because I’m not really propagating demand all the way up the chain. Specifically, the Batcher and the Collector both mostly ignore demand: the Batcher runs in :manual mode and asks for a static 1000 events every 100ms.
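One common way for a buffering producer like the Collector to respect demand, rather than ignore it, is to remember any demand it could not satisfy yet and serve it as soon as events arrive; the GenStage documentation shows a queue-based producer built along these lines. The sketch below keeps only that bookkeeping, as plain functions, so it runs without the gen_stage dependency. DemandBuffer, push/2 and demand/2 are hypothetical names. In an actual producer, push/2 would be called from something like handle_cast/2 and demand/2 from handle_demand/2, with the returned events placed in {:noreply, events, state}.

```elixir
defmodule DemandBuffer do
  # State: {fifo_queue_of_events, demand_owed_to_consumers}
  def new, do: {:queue.new(), 0}

  # New events arrived. Any demand left over from earlier requests is
  # served right away instead of waiting for the next request.
  def push({queue, pending}, events) do
    queue = Enum.reduce(events, queue, &:queue.in/2)
    dispatch(queue, pending, [])
  end

  # Consumers asked for `incoming` more events.
  def demand({queue, pending}, incoming) do
    dispatch(queue, pending + incoming, [])
  end

  # Pop up to `demand` events in FIFO order; whatever cannot be served
  # yet is remembered as pending demand.
  defp dispatch(queue, 0, acc), do: {Enum.reverse(acc), {queue, 0}}

  defp dispatch(queue, demand, acc) do
    case :queue.out(queue) do
      {{:value, event}, queue} -> dispatch(queue, demand - 1, [event | acc])
      {:empty, queue} -> {Enum.reverse(acc), {queue, demand}}
    end
  end
end
```

For example, asking for 3 events on an empty buffer returns nothing but records 3 pending; pushing [:a, :b] afterwards emits both immediately and leaves 1 pending.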

Another issue is that this setup always adds up to 100ms of latency to every event, even when more than 150 events are added to the Collector at once.

Keep in mind, though, that I only expect about 5-10 events every 100ms (maybe even fewer). Still, I want a good base for future scaling if necessary.
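Working through the numbers from the post, assuming steady arrivals and one flush per 100ms tick, shows how much headroom the batch size of 150 leaves:

```latex
\begin{aligned}
r &= \frac{5 \text{ to } 10 \text{ events}}{0.1\,\text{s}} = 50 \text{ to } 100 \text{ events/s} \\
\text{API calls without batching} &= r = 50 \text{ to } 100 \text{ per second} \\
\text{API calls with one flush per tick} &\le \frac{1}{0.1\,\text{s}} = 10 \text{ per second} \\
r_{\text{full batch}} &= \frac{150 \text{ events}}{0.1\,\text{s}} = 1500 \text{ events/s}
\end{aligned}
```

So at the expected load every batch is partial, and the 100ms timer, not the 150-event cap, decides when requests go out; the cap only starts to matter somewhere around 1500 events/s.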

Any thoughts on this architecture? Is there anything I haven’t considered but should?


Or perhaps this setup would be better:

  • Collector - GenStage :producer that collects events from other parts of the Elixir system in a FIFO queue (based on :queue).
    • When it receives events and now holds more than 150, it emits them immediately.
    • In handle_demand/2, it emits any full and partial batches.
  • Delayer - GenStage :producer_consumer that asks the Collector for 1000 events every 100ms
    • Does this still need to run in :manual mode?
  • RequestSupervisor - GenStage ConsumerSupervisor that requests the batched events from the Delayer and starts workers that call the external API

I could probably find a better name than Delayer. Does this setup seem better? I think it might be, because previously the Batcher and the Collector had to work together for the Batcher to actually batch events, whereas now batching is solely the Collector's responsibility. Demand can also be handled a little better in the Collector instead of just being ignored. I still need to prototype this to make sure the batching works the way I want.
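A minimal sketch of the batching logic this second layout moves into the Collector, again written as plain functions over a :queue so it can be tried without GenStage. BatchQueue, push/2 and drain/1 are hypothetical names, and the sketch assumes a batch is ready as soon as 150 events (one full batch) are queued. push/2 would run when events arrive; drain/1 would run when the periodic demand from the Delayer reaches handle_demand/2.

```elixir
defmodule BatchQueue do
  @batch_size 150

  def new, do: :queue.new()

  # Enqueue events, then pop every complete batch immediately.
  # Returns {full_batches, queue_holding_the_remainder}.
  def push(queue, events) do
    queue = Enum.reduce(events, queue, &:queue.in/2)
    take_full(queue, [])
  end

  # Emit all full batches plus the trailing partial batch, if any.
  def drain(queue) do
    {full, rest} = take_full(queue, [])

    case :queue.to_list(rest) do
      [] -> {full, rest}
      partial -> {full ++ [partial], :queue.new()}
    end
  end

  # :queue.split/2 takes the first @batch_size items in FIFO order.
  defp take_full(queue, acc) do
    if :queue.len(queue) >= @batch_size do
      {batch, rest} = :queue.split(@batch_size, queue)
      take_full(rest, [:queue.to_list(batch) | acc])
    else
      {Enum.reverse(acc), queue}
    end
  end
end
```

Pushing 200 events returns one full batch of 150 straight away and keeps the other 50 queued until the next drain/1, which removes the fixed 100ms wait whenever a full batch is already available.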

How about just using a plain GenServer with a FIFO queue and a timer?
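A rough sketch of that suggestion, assuming the requirements from the original post (flush every 100ms, batches of at most 150, three concurrent requests) plus an early flush once a full batch is queued. BatchingServer is a hypothetical module and send_fun stands in for the external API call. One trade-off: flush/1 runs the requests inside the GenServer, so new events wait in the mailbox until the calls return, and unlike a GenStage pipeline there is no back-pressure on callers. Handing batches to a Task.Supervisor would avoid the blocking, but then concurrency has to be bounded some other way.

```elixir
defmodule BatchingServer do
  use GenServer

  @interval 100
  @batch_size 150

  # send_fun: a 1-arity function that receives one batch (list of events).
  def start_link(send_fun), do: GenServer.start_link(__MODULE__, send_fun)

  def push(server, event), do: GenServer.cast(server, {:push, event})

  @impl true
  def init(send_fun) do
    schedule_tick()
    {:ok, %{queue: :queue.new(), send_fun: send_fun}}
  end

  @impl true
  def handle_cast({:push, event}, state) do
    state = %{state | queue: :queue.in(event, state.queue)}

    # Don't wait for the timer once a full batch is available.
    if :queue.len(state.queue) >= @batch_size do
      {:noreply, flush(state)}
    else
      {:noreply, state}
    end
  end

  @impl true
  def handle_info(:tick, state) do
    schedule_tick()
    {:noreply, flush(state)}
  end

  defp schedule_tick, do: Process.send_after(self(), :tick, @interval)

  # Send everything queued in batches of at most @batch_size, with up to
  # three concurrent calls. Blocks this server until all calls finish.
  defp flush(state) do
    case :queue.to_list(state.queue) do
      [] ->
        state

      events ->
        events
        |> Enum.chunk_every(@batch_size)
        |> Task.async_stream(state.send_fun, max_concurrency: 3)
        |> Stream.run()

        %{state | queue: :queue.new()}
    end
  end
end
```

Usage would look like `BatchingServer.start_link(&MyApi.send_batch/1)` followed by `BatchingServer.push(pid, event)` from anywhere in the system, where MyApi.send_batch/1 is again a hypothetical stand-in.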