RabbitMQ Streams - RabbitMQ Streams Protocol Client

Overview

An Elixir Client for the RabbitMQ Streams Protocol.

Introduced in RabbiMQ 3.9, Streams is an alternative to Queues differing mainly by implementing “non-destructive consumer semantics”.

While this feature is avaiable when using the existing Queues, it shines when used with its dedicated protocol, allowing messages to be consumed extremelly fast.

This library implements a client for said protocol, managing the connection to the server and providing an API for producing, consuming and running available commands.

Installation

The package can be installed by adding rabbitmq_stream to your list of dependencies in mix.exs:

def deps do
  [
    {:rabbitmq_stream, "~> 0.4.0"},
    # ...
  ]
end

Consuming

First you define a connection

defmodule MyApp.MyConnection do
  use RabbitMQStream.Connection
end

You then can declare a consumer module by using the RabbitMQStream.Consumer:

defmodule MyApp.MyConsumer do
  use RabbitMQStream.Consumer,
    connection: MyApp.MyConnection,
    stream_name: "my_stream",
    initial_offset: :first

  @impl true
  def handle_message(_message) do
    :ok
  end
end

Producing

You can define a Producer with:

defmodule MyApp.MyProducer do
  use RabbitMQStream.Producer,
    stream_name: "stream-01",
    connection: MyApp.MyConnection
end

Then you can publish messages to the stream:

MyApp.MyProducer.publish("Hello World")

SuperStreams

A super stream is a logical stream made of individual, regular streams.

You can declare SuperStreams with:

:ok = MyApp.MyConnection.create_super_stream("my_super_stream", "route-A": ["stream-01", "stream-02"], "route-B": ["stream-03"])

And you can consume from it with:

defmodule MyApp.MySuperConsumer do
  use RabbitMQStream.SuperConsumer,
    initial_offset: :next,
    super_stream: "my_super_stream"

  @impl true
  def handle_message(_message) do
    # ...
    :ok
  end
end
9 Likes