Pachka - a message batching library

Hi everyone!

I’ve just released Pachka - a library that buffers messages coming from different processes and delivers them together in a single batch. It’s a classic optimization that trades a bit of latency for a significant throughput increase :slight_smile:

Features

  • Configurable batch sizes and timeouts
  • Customizable message sinks for different delivery targets
  • Automatic retries with customizable backoff
  • Overload protection with queue size limits
  • Graceful shutdown with message draining

Quick example

# Define a Sink module
defmodule MyApp.MessageSink do
  @behaviour Pachka.Sink
  
  @impl true
  def send_batch(messages, _server_value) do
    # Process messages in batch
    :ok
  end
end

# Start Pachka server
{:ok, _pid} = Pachka.start_link(
  name: MyPachka,
  sink: MyApp.MessageSink,
  max_batch_size: 100,
  max_batch_delay: :timer.seconds(1)
)

# Send messages
:ok = Pachka.send_message(MyPachka, %{event: "user_login"})
:ok = Pachka.send_message(MyPachka, %{event: "page_view"})

You can find concrete examples in the benchmarks folder.

Let me know if you have any questions!

Repo: GitHub - vegris/pachka: Message batching library for Elixir applications
Hex: pachka | Hex
HexDocs: Pachka v1.0.0 — Documentation


Looks great!

It would be great to highlight what problem it solves that GenStage doesn’t.

Also, the throttler built on top of the process might quickly become a bottleneck.


Thank you, @mudasobwa

Good points, let me give you my thoughts on these:

As I understand it, GenStage is driven from the bottom up by demand from consumers. This approach works great when producers have a “read me x events” operation at their disposal, as with a database or a message queue.

However, in cases like processing HTTP events from multiple clients, you don’t have that.
In that case you need to implement some kind of buffer to store these messages, then split them into demand-sized chunks, and then decide what to do with that buffer when there are still events left and no one is reading them. And that’s basically what Pachka is :smile:

There is an example in gen_stage that implements such an in-producer buffer. I’ll leave a link to illustrate my point: gen_stage/examples/gen_event.exs at v1.2.1 · elixir-lang/gen_stage · GitHub
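To make the buffering problem concrete, here is a minimal sketch of the "store events, then hand them out in demand-sized chunks" part. DemandBuffer and its functions are hypothetical names made up for this illustration; this is neither Pachka's nor gen_stage's actual code, and it leaves out the harder part (what to do with leftovers when nobody asks for them).

```elixir
defmodule DemandBuffer do
  @moduledoc "Hypothetical in-memory buffer that hands out events in demand-sized chunks."

  # The buffer is an Erlang queue plus its length, so size checks are O(1).
  def new, do: {:queue.new(), 0}

  # Store one incoming event (for example, a decoded HTTP request).
  def push({queue, size}, event), do: {:queue.in(event, queue), size + 1}

  # Take up to `demand` events in arrival order.
  # Returns {events, remaining_buffer}.
  def take({queue, size}, demand) when is_integer(demand) and demand >= 0 do
    count = min(demand, size)
    {taken, rest} = :queue.split(count, queue)
    {:queue.to_list(taken), {rest, size - count}}
  end
end

buffer = Enum.reduce(1..5, DemandBuffer.new(), &DemandBuffer.push(&2, &1))

{events, buffer} = DemandBuffer.take(buffer, 3)
IO.inspect(events)
# prints [1, 2, 3]

{leftover, _buffer} = DemandBuffer.take(buffer, 10)
IO.inspect(leftover)
# prints [4, 5]
```

Even in this toy version you still have to decide when to flush the leftover events if no more demand arrives, which is where batch delays and draining on shutdown come in.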

Speaking of the throttler: as I understand it, you are referring to the way the server checks critical_queue_size on receiving new messages and responds with {:error, :overloaded} in Pachka.send_message/2?
I wouldn’t really call that a throttler by itself. The user is expected to react to the error in some way, of course, be it retrying immediately (probably not a good idea), throttling, or signaling this condition to the client (sending a 429 or 503 HTTP error, for example). But Pachka does nothing by itself to throttle the flow.
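As a rough sketch of "signaling this condition to the client", the caller could map the send result to an HTTP status. OverloadResponse and status_for/2 are hypothetical names for this illustration, and the send function is passed in so the snippet runs without Pachka; in a real app it would be something like &Pachka.send_message(MyPachka, &1). The choice of 202 and 503 is only an assumption about what your API wants to return.

```elixir
defmodule OverloadResponse do
  # Hypothetical helper: turns the result of sending a message into an HTTP status.
  # `send_fun` stands in for a call such as `&Pachka.send_message(MyPachka, &1)`.
  def status_for(message, send_fun) when is_function(send_fun, 1) do
    case send_fun.(message) do
      # Message was accepted into the buffer.
      :ok -> 202
      # The queue is over its limit; tell the client to back off.
      {:error, :overloaded} -> 503
    end
  end
end

# Stub senders used here instead of a running Pachka server.
accepting = fn _message -> :ok end
overloaded = fn _message -> {:error, :overloaded} end

IO.inspect(OverloadResponse.status_for(%{event: "user_login"}, accepting))
# prints 202
IO.inspect(OverloadResponse.status_for(%{event: "user_login"}, overloaded))
# prints 503
```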

If you’re worried that the responsiveness of the Pachka process will degrade if it is flooded with messages, then yes, it certainly can, but that’s true for any individual BEAM process.
There are two things in Pachka that try to prevent this:

  1. The actual batch export is done in another process.
  2. The overload check is really lightweight and is the first thing the code does when it receives a new message.

There are also a couple of things a programmer can do to prevent this:

  1. Partition the load (using PartitionSupervisor for example)
  2. Impose limits upstream to exclude the possibility of a message flood. See the ThousandIsland num_acceptors and num_connections options if you’re using Bandit, for example.
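For the partitioning option, here is a small sketch of how PartitionSupervisor routes by key. It uses a plain Agent as a stand-in worker so it runs on its own (Elixir 1.14+); DemoPartitions is a made-up name, and whether a Pachka server can be started as the partitioned child and addressed through a :via tuple in exactly this way is an assumption I have not checked against the library.

```elixir
# Start four partitions; each one is an independent Agent holding a list.
{:ok, _sup} =
  PartitionSupervisor.start_link(
    child_spec: {Agent, fn -> [] end},
    name: DemoPartitions,
    partitions: 4
  )

# Route by key: the same key always lands on the same partition,
# so a single process never has to absorb the whole load.
via = fn key -> {:via, PartitionSupervisor, {DemoPartitions, key}} end

Agent.update(via.("client-a"), &[:login | &1])
Agent.update(via.("client-a"), &[:page_view | &1])

IO.inspect(Agent.get(via.("client-a"), & &1))
# prints [:page_view, :login]
```

The key can be anything that spreads the load reasonably well, for example a client id or self().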

Hope my answers help!
