Lake - Erlang RabbitMQ Streams Client

Hi!

Some of you may have read the RabbitMQ announcement of Streams.
To quote the announcement:

A RabbitMQ stream models an append-only log with non-destructive consuming
semantics. This means that – contrary to traditional queues in RabbitMQ –
consuming from a stream does not remove messages.

As far as I can tell, there are currently Java and Go clients for connecting to RabbitMQ streams, but no Erlang or Elixir client. To take Streams for a spin, we wrote a simple Erlang library called lake. The library lacks features such as cluster handling, but it is enough to give streams a try from Erlang and Elixir. See the repository for a usage example.

I hope somebody here finds lake useful!

Hi there!

I just released v0.2.0. With this release, all messages from the current protocol specification are supported. That also means that SuperStreams can now be used! The API of lake is very rough around the edges, but playing around with RabbitMQ Streams works.

If you plan to play with SuperStreams, note that SuperStreams need to be created outside of lake, because the binary protocol does not have a command to create them. Either use rabbitmqctl, or see lake_SUITE for how to create a SuperStream using AMQP.

Thanks for writing this library, I look forward to playing with it from Elixir!

I’ve started rabbitmq with:

docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672 -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost -rabbit loopback_users "none"' rabbitmq:3.9-management

and enabled the Streams management plugin with:

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream_management

I can see in the RabbitMQ logs and admin UI that Streams are enabled, but the lake connect fails, and the RabbitMQ console reports an unknown command. Here’s my code:

def connect do
    :lake.connect("localhost", 5552, "guest", "guest", "/")
end

And the error I get:

** (CaseClauseError) no case clause matching: {{:close, 1, 13, "unknown frame"}, ""}
    (lake 0.2.1) /.../lake/src/lake_raw_connection.erl:98: :lake_raw_connection.exchange_command_versions/1

Any thoughts about what I might be doing wrong?

This is because of the RabbitMQ version. I tested lake v0.2 against RabbitMQ 3.11 to make use of SuperStreams. RabbitMQ 3.9 lacks the EXCHANGE_COMMAND_VERSIONS command, so lake's initial version exchange is rejected with "unknown frame". To work around this, pass the option {:exchange_command_versions, false} when connecting. This is one of the places where the API of lake is still quite rough around the edges. The following should work:

def connect do
    :lake.connect("localhost", 5552, "guest", "guest", "/", exchange_command_versions: false)
end

With that, you can take a look into lake’s lake_SUITE for some examples on using RabbitMQ Streams.
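
If the same code has to talk to both older and newer brokers, the choice of options could be wrapped in a small helper. This is only a minimal sketch: the module name StreamConnection and the legacy_broker? flag are hypothetical and not part of lake, and the only lake calls used are the two connect forms shown in this thread.

```elixir
defmodule StreamConnection do
  # Hypothetical helper module for illustration; not part of lake.

  # Returns the extra options for :lake.connect/6.
  # `legacy_broker?` is an assumed flag that the caller sets to true for
  # brokers without EXCHANGE_COMMAND_VERSIONS (for example RabbitMQ 3.9).
  def connect_opts(legacy_broker?) when is_boolean(legacy_broker?) do
    if legacy_broker?, do: [exchange_command_versions: false], else: []
  end

  # Connects with the same host, port, and credentials as in the posts above.
  # Without extra options, the five-argument form of connect is used.
  def connect(legacy_broker? \\ false) do
    case connect_opts(legacy_broker?) do
      [] -> :lake.connect("localhost", 5552, "guest", "guest", "/")
      opts -> :lake.connect("localhost", 5552, "guest", "guest", "/", opts)
    end
  end
end
```

Calling StreamConnection.connect(true) would then skip the version exchange for a 3.9 broker, while StreamConnection.connect() keeps the default behaviour.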

Note that the RabbitMQ 3.9 release series is EOL. When I run your docker command, I see the following (non-fatal) error message:

2023-03-12 17:27:13.077571+00:00 [error] <0.230.0> This release series has reached end of life and is no longer supported. Please visit https://rabbitmq.com/versions.html to learn more and upgrade

Regarding RabbitMQ 3.9 being EOL: maybe I should define a minimum RabbitMQ version supported by lake at some point. But a nicer API should have higher priority.

Awesome, @evnu, thanks! I totally should’ve checked RabbitMQ versions.

I’m now connecting and starting to play with the API. I’m not an Erlang expert, just a few years of Elixir, so I’m not super well-equipped to help you work on the API. I can read Erlang well enough but haven’t had much call to write it. Playing with lake might motivate me to ramp up my Erlang skills, though.

Many thanks for the fast response and for getting me pointed in the right direction!

You don’t need to be an Erlang expert to help out. I am mostly struggling with understanding what the API should “feel like”. So, if you have comments on what feels wrong and what is harder than it should be, that information would be great! :)