Chunk stream by count OR by timeout, whichever comes first?

Hey everyone,

I have a GenStage producer that receives real-time events from a third-party WebSocket API and emits them as GenStage events. I want to process these events, broadcast them via a Phoenix channel, and persist them to the DB.

For the latter, I wonder whether there is a way to both:

  1. Create chunks with a max size, e.g. ~20,000 events
  2. Create chunks of smaller sizes when a timeout occurs before the chunk is “full”

Both types of chunks should be persisted immediately.

I already have a working GenStage consumer that uses Process.send_after/4 to schedule a flush when the first event of a chunk is processed. However, I hope that a stream-based implementation could be terser.
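A rough sketch of the bookkeeping behind such a timer-based consumer, for illustration only. `ChunkBuffer`, its fields and the 1-second default are assumptions, not the actual consumer from this post. A GenStage consumer would call `add/2` for each event in `handle_events/3` and `flush/1` from `handle_info(:flush, state)`, persisting whatever chunk comes back.

```elixir
defmodule ChunkBuffer do
  # Hypothetical helper: buffers events and arms a flush timer on the
  # first event of each chunk. The defaults are illustrative only.
  defstruct items: [], count: 0, timer: nil, max_size: 20_000, flush_after_ms: 1_000

  def new(opts \\ []), do: struct!(__MODULE__, opts)

  # Adds one event. Returns {:flush, chunk, buffer} when the chunk is full,
  # otherwise {:ok, buffer}.
  def add(%__MODULE__{count: 0} = buf, event) do
    # First event of a chunk: schedule a :flush message to the calling process.
    timer = Process.send_after(self(), :flush, buf.flush_after_ms)
    check(%{buf | items: [event], count: 1, timer: timer})
  end

  def add(%__MODULE__{} = buf, event) do
    check(%{buf | items: [event | buf.items], count: buf.count + 1})
  end

  # Returns {chunk, empty_buffer}. Call this when :flush arrives.
  def flush(%__MODULE__{count: 0} = buf), do: {[], buf}

  def flush(%__MODULE__{} = buf) do
    # A :flush that already fired may still sit in the mailbox; it would
    # only cause an early (smaller) flush of the next chunk.
    Process.cancel_timer(buf.timer)
    {Enum.reverse(buf.items), %{buf | items: [], count: 0, timer: nil}}
  end

  defp check(%{count: count, max_size: max} = buf) when count >= max do
    {chunk, buf} = flush(buf)
    {:flush, chunk, buf}
  end

  defp check(buf), do: {:ok, buf}
end
```

Keeping the buffer separate from the process means the count and timeout rules can be exercised without starting any stages.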

Is this possible with Streams at all? AFAIK, Stream.interval/1 blocks the caller and is therefore not an option, and I’d also need to receive its ticks inside Stream.chunk_while/4.
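One stream-based direction that sidesteps Stream.interval/1 is to build the stream with Stream.resource/3 and let `receive ... after` provide the timeout. The following is only a sketch under assumptions: the module name `MailboxChunks`, the `{:event, term}` message shape and the `:done` terminator are all made up for illustration, and the events must be sent to the process that enumerates the stream.

```elixir
defmodule MailboxChunks do
  # Hypothetical helper: turns `{:event, term}` messages in the caller's
  # mailbox into a stream of lists. A `:done` message ends the stream.
  def stream(max_size, timeout_ms) do
    Stream.resource(
      fn -> :open end,
      fn
        :closed -> {:halt, :closed}
        :open -> next_chunk(max_size, timeout_ms)
      end,
      fn _state -> :ok end
    )
  end

  defp next_chunk(max_size, timeout_ms) do
    # Block until the first event of a chunk arrives; the clock starts there.
    receive do
      {:event, event} ->
        deadline = System.monotonic_time(:millisecond) + timeout_ms
        fill([event], 1, max_size, deadline)

      :done ->
        {:halt, :closed}
    end
  end

  # Chunk is full: emit it and keep the stream open.
  defp fill(acc, count, max_size, _deadline) when count >= max_size do
    {[Enum.reverse(acc)], :open}
  end

  defp fill(acc, count, max_size, deadline) do
    remaining = max(deadline - System.monotonic_time(:millisecond), 0)

    receive do
      {:event, event} -> fill([event | acc], count + 1, max_size, deadline)
      # Emit what we have, then halt on the next step.
      :done -> {[Enum.reverse(acc)], :closed}
    after
      # Deadline reached before the chunk filled up: emit a smaller chunk.
      remaining -> {[Enum.reverse(acc)], :open}
    end
  end
end
```

Usage would look like `MailboxChunks.stream(20_000, 1_000) |> Enum.each(&persist/1)`, where `persist/1` stands in for whatever writes a chunk to the DB. Note that the enumerating process blocks while it waits, so this would normally run in its own process.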

Broadway can batch both by size and by timeout.

IIUC, you’re referring to the batch_size and batch_timeout options.

Right now, Broadway would add too much complexity to the project. I’d like to avoid adding services like Redis or Kafka just to get some convenience for batching events.

WDYT, is this feasible with Streams? In the meantime, I also found Flow.Window, but I’m not sure how to combine periodic windows with count windows.

Broadway can consume GenStage producers. You seem to be using GenStage already, so it shouldn’t be a large change.
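To make that concrete, here is a hedged sketch of what such a pipeline might look like. It assumes the `broadway` dependency; `MyApp.EventProducer` and `MyApp.Events.persist/1` are placeholder names, and the 1-second timeout is an arbitrary choice. Broadway starts the producer itself, so an existing producer would have to be handed over to it rather than started separately.

```elixir
defmodule MyApp.EventPipeline do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        # Placeholder: the existing GenStage producer module.
        module: {MyApp.EventProducer, []},
        # Wraps each raw GenStage event in a %Broadway.Message{}.
        transformer: {__MODULE__, :transform, []},
        concurrency: 1
      ],
      processors: [default: [concurrency: 1]],
      batchers: [
        # A batch is flushed when it reaches batch_size or when
        # batch_timeout (in ms) elapses, whichever comes first.
        default: [batch_size: 20_000, batch_timeout: 1_000, concurrency: 1]
      ]
    )
  end

  def transform(event, _opts) do
    %Broadway.Message{data: event, acknowledger: Broadway.NoopAcknowledger.init()}
  end

  @impl true
  def handle_message(_processor, message, _context), do: message

  @impl true
  def handle_batch(:default, messages, _batch_info, _context) do
    # Placeholder persistence call; replace with the real DB write.
    messages |> Enum.map(& &1.data) |> MyApp.Events.persist()
    messages
  end
end
```

No Redis or Kafka is involved in a setup like this; the only new dependency is Broadway.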

Gotcha. Just curious, how would you solve it without Broadway?