I’ve just released Pachka - a library that buffers messages coming from different processes and delivers them together in a single batch. A classic optimization that trades a bit of latency for a significant increase in throughput.
Features
- Configurable batch sizes and timeouts
- Customizable message sinks for different delivery targets
- Automatic retries with customizable backoff
- Overload protection with queue size limits
- Graceful shutdown with message draining
Quick example
```elixir
# Define a sink module
defmodule MyApp.MessageSink do
  @behaviour Pachka.Sink

  @impl true
  def send_batch(messages, _server_value) do
    # Process messages in batch
    :ok
  end
end

# Start a Pachka server
{:ok, _pid} =
  Pachka.start_link(
    name: MyPachka,
    sink: MyApp.MessageSink,
    max_batch_size: 100,
    max_batch_delay: :timer.seconds(1)
  )

# Send messages
:ok = Pachka.send_message(MyPachka, %{event: "user_login"})
:ok = Pachka.send_message(MyPachka, %{event: "page_view"})
```
You can find concrete examples in the benchmarks folder.
Good points, let me give you my thoughts on these:
As I understand it, GenStage is driven from the bottom up by demand from consumers. This approach works great when producers have a “read me x events” operation at their disposal - think of a database or a message queue, for example.
However, in cases like processing HTTP events from multiple clients, you don’t have that.
In that case you need to implement some kind of buffer to store these messages, then split them into demand-sized chunks, and then also decide what to do with the buffer when there are still events left and no one is reading them. And that’s basically what Pachka is.
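To make the bookkeeping concrete, here is a minimal sketch of that kind of buffer in plain Elixir (the module name and functions are mine for illustration, not part of Pachka or GenStage): events are pushed in as they arrive, and demand-sized chunks are cut off in arrival order.

```elixir
defmodule BufferSketch do
  # A push-based buffer: events arrive whenever producers feel like it,
  # and consumers later ask for a fixed-size chunk.
  # Internally the list is kept newest-first so push is O(1).

  # Store an incoming event.
  def push(buffer, event), do: [event | buffer]

  # Hand out up to `demand` events in arrival order,
  # returning {chunk, remaining_buffer}.
  def take_demand(buffer, demand) do
    {chunk, rest} = buffer |> Enum.reverse() |> Enum.split(demand)
    {chunk, Enum.reverse(rest)}
  end
end
```

Even this toy version shows the open questions Pachka answers for you: when to flush a partially filled buffer (timeouts) and what to do when it grows faster than it drains (overload limits).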
Speaking of the throttler, as I understand it you are referring to the way the server checks critical_queue_size on receiving new messages and responds with {:error, :overloaded} from Pachka.send_message/2?
I wouldn’t really call that a throttler by itself. The user is of course expected to react to the error in some way, be it retrying immediately (probably not a good idea), throttling, or signaling the condition to the client (sending 429 or 503 HTTP errors, for example). But Pachka does nothing by itself to throttle the flow.
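As a sketch of the "signal it to the client" option, a caller might translate the send_message result into an HTTP status (the module and function here are hypothetical; only :ok and {:error, :overloaded} come from Pachka’s documented behaviour):

```elixir
defmodule OverloadResponse do
  # Map a Pachka.send_message/2 result to an HTTP status code.
  # 202: message accepted for batched delivery.
  # 503: queue is over critical_queue_size; tell the client to back off.
  def to_status(:ok), do: 202
  def to_status({:error, :overloaded}), do: 503
end
```

A handler would then call something like `send_resp(conn, OverloadResponse.to_status(result), "")`, letting well-behaved clients retry with their own backoff.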
If you’re worried that the responsiveness of the Pachka process will degrade if it’s flooded with messages, then yes, it certainly can - but that’s true for any individual BEAM process.
There are two things in Pachka that try to prevent this:
- The actual batch export is done in a separate process.
- The overload check is very lightweight and is the first thing the code does when it receives a new message.
There are also a couple of things a programmer can do to prevent this:
- Impose limits upstream to exclude the possibility of a message flood. See Thousand Island’s num_acceptors and num_connections options if you’re using Bandit, for example.
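For instance, such an upstream cap might look like the following Bandit child spec, assuming Bandit’s thousand_island_options passthrough (option names are from Thousand Island; the numbers are illustrative, not recommendations):

```elixir
# Hypothetical supervision-tree entry capping inbound concurrency
# before messages ever reach Pachka.
{Bandit,
 plug: MyApp.Router,
 port: 4000,
 thousand_island_options: [
   # Number of acceptor processes for the listener.
   num_acceptors: 10,
   # Connection limit enforced per acceptor.
   num_connections: 1_000
 ]}
```

With connections bounded at the socket layer, a flood turns into refused connections instead of an ever-growing Pachka queue.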