Hey everyone,
I have a GenStage producer that receives real-time events from a third-party websocket API and emits them as GenStage events. I want to process these events, then broadcast them via a Phoenix channel and also persist them to the DB.
For the persistence part, I wonder whether there is a way to both:
- Create chunks with a max size, e.g. ~20,000 events
- Create chunks of smaller sizes when a timeout occurs before the chunk is “full”
Both types of chunks should be persisted immediately.
I already have a working GenStage consumer that uses Process.send_after/4 to schedule a flush when the first event of a chunk is processed. However, I hope that a stream-based implementation could be terser.
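For reference, here is a trimmed-down sketch of what I currently have (module names like `MyApp.EventBatcher`, `MyApp.EventProducer`, `MyApp.Repo`, the 5-second timeout, and the `insert_all/2` call are all placeholders; the real consumer also does the channel broadcast):

```elixir
defmodule MyApp.EventBatcher do
  # Buffers events and flushes either when the buffer reaches
  # @max_chunk_size or when the timeout fires, whichever happens first.
  use GenStage

  @max_chunk_size 20_000
  @flush_timeout 5_000

  def start_link(opts) do
    GenStage.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    {:consumer, %{buffer: [], count: 0, timer: nil},
     subscribe_to: [MyApp.EventProducer]}
  end

  @impl true
  def handle_events(events, _from, state) do
    # Schedule a flush when the first event of a fresh chunk arrives.
    timer = state.timer || Process.send_after(self(), :flush, @flush_timeout)

    state = %{
      state
      | buffer: Enum.reverse(events, state.buffer),
        count: state.count + length(events),
        timer: timer
    }

    if state.count >= @max_chunk_size do
      {:noreply, [], flush(state)}
    else
      {:noreply, [], state}
    end
  end

  @impl true
  def handle_info(:flush, state), do: {:noreply, [], flush(state)}

  # Nothing buffered (e.g. a stale :flush message arrived after a
  # size-based flush), so there is nothing to persist.
  defp flush(%{count: 0} = state), do: state

  defp flush(state) do
    if state.timer, do: Process.cancel_timer(state.timer)

    # Placeholder for the real persistence + broadcast logic.
    MyApp.Repo.insert_all("events", Enum.reverse(state.buffer))

    %{state | buffer: [], count: 0, timer: nil}
  end
end
```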
Is this at all possible with Streams? AFAIK, Stream.interval/1 blocks the caller and is therefore not an option, and I'd also need to receive these events inside Stream.chunk_while/4.
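To make the question a bit more concrete, this is roughly the receive-plus-timeout behaviour I don't see how to express with Stream.chunk_while/4 alone; everything below is made up for illustration and not code I'm actually running:

```elixir
defmodule ChunkedMailboxStream do
  @max_chunk_size 20_000
  @flush_timeout 5_000

  # Infinite stream of chunks built from {:event, payload} messages sent
  # to the process running the stream. A chunk is emitted once it reaches
  # @max_chunk_size or once @flush_timeout ms pass without a new event.
  def chunks do
    Stream.repeatedly(fn -> collect_chunk([], 0) end)
  end

  defp collect_chunk(buffer, count) when count >= @max_chunk_size do
    Enum.reverse(buffer)
  end

  defp collect_chunk(buffer, count) do
    receive do
      {:event, payload} -> collect_chunk([payload | buffer], count + 1)
    after
      @flush_timeout -> Enum.reverse(buffer)
    end
  end
end
```

Even this sketch emits empty chunks when no events arrive within the timeout, so I'd have to filter those out before inserting. If there is a terser, idiomatic way to get this out of the Stream module, I'd love to hear it.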