Best way to create a buffer and routinely/batch flush to database?

I currently have a worker subscribed to a RabbitMQ (RMQ) queue.

Each message in RMQ is a result, and there can be thousands of messages coming through every second. I want to move this data into long-term storage (a Postgres DB). There may be databases purpose-built for this kind of stream (open to suggestions), but I’m more interested in solving this particular problem with my current infrastructure.

I’m speaking to PG through Ecto. Running an insert query for each result won’t keep up, so I want to batch them (let’s say into batches of 500 results).
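
As I understand it, the batch write itself would be Ecto’s `Repo.insert_all`, which inserts a whole list of rows in a single statement. A minimal sketch, where `MyApp.Repo`, the `"results"` table, and the row shape are placeholder assumptions:

```elixir
# batch is assumed to be a list of decoded RMQ messages.
# insert_all with a bare table name does not autogenerate timestamps,
# so inserted_at is set by hand here (assuming the column exists).
rows =
  Enum.map(batch, fn result ->
    %{payload: result, inserted_at: NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)}
  end)

MyApp.Repo.insert_all("results", rows)
```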

The rules would be:

  1. when the buffer reaches n results, flush to the DB
  2. even if the buffer hasn’t hit the flush threshold, flush to the DB at a regular interval so results never sit in memory for long (see the sketch below)
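
To make those two rules concrete, here is roughly the shape I have in mind: a single GenServer that accumulates results and flushes on whichever trigger fires first, a full buffer or a timer. This is only a sketch; `MyApp.Repo`, the `"results"` table, and the row shape are placeholder assumptions:

```elixir
defmodule MyApp.ResultBuffer do
  use GenServer

  @flush_count 500
  @flush_interval :timer.seconds(5)

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  # Called once per RMQ message by the consumer.
  def add(result), do: GenServer.cast(__MODULE__, {:add, result})

  @impl true
  def init(:ok) do
    schedule_flush()
    {:ok, {0, []}}
  end

  # Rule 1: flush as soon as the buffer holds @flush_count results.
  @impl true
  def handle_cast({:add, result}, {count, buffer}) do
    state = {count + 1, [result | buffer]}

    if count + 1 >= @flush_count do
      {:noreply, flush(state)}
    else
      {:noreply, state}
    end
  end

  # Rule 2: flush whatever has accumulated every @flush_interval ms.
  @impl true
  def handle_info(:flush, state) do
    schedule_flush()
    {:noreply, flush(state)}
  end

  defp flush({0, []}), do: {0, []}

  defp flush({_count, buffer}) do
    rows = buffer |> Enum.reverse() |> Enum.map(&to_row/1)
    MyApp.Repo.insert_all("results", rows)
    {0, []}
  end

  # Hypothetical mapping from a decoded message to a row; insert_all
  # does not autogenerate timestamps, so they are set by hand.
  defp to_row(result) do
    %{payload: result, inserted_at: NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)}
  end

  defp schedule_flush, do: Process.send_after(self(), :flush, @flush_interval)
end
```

One thing this doesn’t give me is backpressure: a cast-based buffer will accept messages faster than Postgres can absorb them.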

I’ve heard GenStage is perhaps the way to go here?
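
From the GenStage docs, I imagine the consumer side would look something like the sketch below, assuming a producer that wraps the RMQ subscription (`MyApp.ResultProducer` is hypothetical). `max_demand: 500` caps each `handle_events/3` batch at 500 but doesn’t guarantee full batches, and there is no built-in time trigger, so rule 2 would still need a timer somewhere:

```elixir
defmodule MyApp.ResultConsumer do
  use GenStage

  def start_link(_opts), do: GenStage.start_link(__MODULE__, :ok)

  @impl true
  def init(:ok) do
    # Demand-based backpressure: ask the producer for at most 500 events at a time.
    {:consumer, :ok, subscribe_to: [{MyApp.ResultProducer, max_demand: 500}]}
  end

  @impl true
  def handle_events(events, _from, state) do
    # events is already a list, so it maps straight onto a bulk insert.
    MyApp.Repo.insert_all("results", Enum.map(events, &to_row/1))
    {:noreply, [], state}
  end

  # Hypothetical event-to-row mapping, as in the sketches above.
  defp to_row(result) do
    %{payload: result, inserted_at: NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)}
  end
end
```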

Here is my inspiration, taken from another part of my app (using streams):

items |> Task.async_stream(&do_something/1) |> Stream.chunk_every(500, 500, [])
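
For completeness, here’s how I imagine that shape mapping onto the batched inserts. Again a sketch: `results_stream`, `do_something/1` returning a row map, and the `"results"` table are assumptions, and as far as I can tell it only covers rule 1, since `Stream.chunk_every` flushes on count, not on time:

```elixir
results_stream
|> Task.async_stream(&do_something/1)
# Task.async_stream wraps each result in an {:ok, _} tuple.
|> Stream.map(fn {:ok, row} -> row end)
|> Stream.chunk_every(500)
|> Stream.each(&MyApp.Repo.insert_all("results", &1))
|> Stream.run()
```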

I’m still fleshing out the high-level concept, so I’m just looking for advice on how this might be approached, even at a generic level.
