RabbitMQ Streams - RabbitMQ Streams Protocol Client

Overview

An Elixir Client for the RabbitMQ Streams Protocol.

Introduced in RabbitMQ 3.9, Streams are an alternative to Queues, differing mainly by implementing “non-destructive consumer semantics”: consuming a message does not remove it from the stream.

While this feature is available when using the existing Queues, it shines when used with its dedicated protocol, which allows messages to be consumed extremely fast.
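
To make “non-destructive consumer semantics” concrete, here is a minimal in-memory sketch. It does not use this library or talk to RabbitMQ; the StreamSketch module and its functions are hypothetical names made up for illustration. It models a stream as an append-only list and a consumer as nothing more than an offset into it, so reading never removes anything.

```elixir
defmodule StreamSketch do
  # Hypothetical in-memory model, not part of rabbitmq_stream.
  # A stream is an append-only list; a consumer is just an offset into it.

  # Appends a message and returns the updated log.
  def append(log, message), do: log ++ [message]

  # Returns the messages from `offset` onward, plus the next offset to
  # read from. The log itself is never modified.
  def read(log, offset) do
    messages = Enum.drop(log, offset)
    {messages, offset + length(messages)}
  end
end

log =
  []
  |> StreamSketch.append("a")
  |> StreamSketch.append("b")

# Two independent consumers starting at offset 0 see the same messages.
{first_read, next_offset} = StreamSketch.read(log, 0)
{second_read, _} = StreamSketch.read(log, 0)

# A later append is picked up from the saved offset.
log = StreamSketch.append(log, "c")
{new_messages, _} = StreamSketch.read(log, next_offset)
```

In this model, starting at offset 0 loosely corresponds to a consumer that reads the stream from the beginning, and starting at the end of the log to one that only sees new messages.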

This library implements a client for said protocol, managing the connection to the server and providing an API for producing, consuming and running available commands.

Installation

The package can be installed by adding rabbitmq_stream to your list of dependencies in mix.exs:

def deps do
  [
    {:rabbitmq_stream, "~> 0.4.0"},
    # ...
  ]
end

Consuming

First, define a connection:

defmodule MyApp.MyConnection do
  use RabbitMQStream.Connection
end

You can then declare a consumer module by using RabbitMQStream.Consumer:

defmodule MyApp.MyConsumer do
  use RabbitMQStream.Consumer,
    connection: MyApp.MyConnection,
    stream_name: "my_stream",
    initial_offset: :first

  @impl true
  def handle_message(_message) do
    :ok
  end
end

Producing

You can define a Producer with:

defmodule MyApp.MyProducer do
  use RabbitMQStream.Producer,
    stream_name: "stream-01",
    connection: MyApp.MyConnection
end

Then you can publish messages to the stream:

MyApp.MyProducer.publish("Hello World")

SuperStreams

A super stream is a logical stream made of individual, regular streams.

You can declare SuperStreams with:

:ok = MyApp.MyConnection.create_super_stream("my_super_stream", "route-A": ["stream-01", "stream-02"], "route-B": ["stream-03"])

And you can consume from it with:

defmodule MyApp.MySuperConsumer do
  use RabbitMQStream.SuperConsumer,
    initial_offset: :next,
    super_stream: "my_super_stream"

  @impl true
  def handle_message(_message) do
    # ...
    :ok
  end
end
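
To picture how a super stream groups regular streams, here is a small sketch in plain Elixir. It does not call the library: SuperStreamSketch and its functions are hypothetical, and it assumes each key in the keyword list passed to create_super_stream maps to the streams bound to that route, which is how the call reads but is not confirmed here.

```elixir
defmodule SuperStreamSketch do
  # Hypothetical helpers for illustration only; not part of rabbitmq_stream.

  # All regular streams that together make up the super stream.
  def partitions(routes) do
    routes
    |> Keyword.values()
    |> List.flatten()
    |> Enum.uniq()
  end

  # Streams bound to a given route key, or [] when the key is unknown.
  def streams_for(routes, key), do: Keyword.get(routes, key, [])
end

# Same shape as the second argument in the create_super_stream call above.
routes = ["route-A": ["stream-01", "stream-02"], "route-B": ["stream-03"]]

all_streams = SuperStreamSketch.partitions(routes)
route_a = SuperStreamSketch.streams_for(routes, :"route-A")
unknown = SuperStreamSketch.streams_for(routes, :"route-C")
```

Under that assumption, a consumer of the super stream would logically read from every stream in all_streams, while a message carrying a given route would land in one of the streams listed for that key.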

Project Status

When I started this project, I anticipated having a use case for it in the project I currently help maintain at my workplace. But after getting it to work, the scope of problems RabbitMQ Streams helps me solve turned out to be smaller than I expected, and the current state of this library covers most of my current use cases.

Since then, I haven’t had much motivation to keep working on it and add the most recent features, other than for the fun of working on an interesting project. That is why the project has not been very active recently. It has also been hard to make design decisions for features I don’t personally have a use case for.

I decided to give this update as a call for feedback. If you are already using the library, please share any issues you may be encountering. And if you would like to use it but have been blocked by missing features, feel free to tell me more about them. I like to solve problems, and this feedback would help motivate me to work more on this lib for the fun of it.

Feel free to @ or DM me on BlueSky @gaiva.bsky.social
