Overview
An Elixir Client for the RabbitMQ Streams Protocol.
Introduced in RabbiMQ 3.9, Streams is an alternative to Queues differing mainly by implementing “non-destructive consumer semantics”.
While this feature is avaiable when using the existing Queues, it shines when used with its dedicated protocol, allowing messages to be consumed extremelly fast.
This library implements a client for said protocol, managing the connection to the server and providing an API for producing, consuming and running available commands.
Installation
The package can be installed by adding rabbitmq_stream to your list of dependencies in mix.exs:
def deps do
[
{:rabbitmq_stream, "~> 0.4.0"},
# ...
]
end
Consuming
First you define a connection
defmodule MyApp.MyConnection do
use RabbitMQStream.Connection
end
You then can declare a consumer module by using the RabbitMQStream.Consumer:
defmodule MyApp.MyConsumer do
use RabbitMQStream.Consumer,
connection: MyApp.MyConnection,
stream_name: "my_stream",
initial_offset: :first
@impl true
def handle_message(_message) do
:ok
end
end
Producing
You can define a Producer with:
defmodule MyApp.MyProducer do
use RabbitMQStream.Producer,
stream_name: "stream-01",
connection: MyApp.MyConnection
end
Then you can publish messages to the stream:
MyApp.MyProducer.publish("Hello World")
SuperStreams
A super stream is a logical stream made of individual, regular streams.
You can declare SuperStreams with:
:ok = MyApp.MyConnection.create_super_stream("my_super_stream", "route-A": ["stream-01", "stream-02"], "route-B": ["stream-03"])
And you can consume from it with:
defmodule MyApp.MySuperConsumer do
use RabbitMQStream.SuperConsumer,
initial_offset: :next,
super_stream: "my_super_stream"
@impl true
def handle_message(_message) do
# ...
:ok
end
end






















