Connecting to Azure Service Bus with rabbitmq-amqp1.0-client

Has anyone had any success connecting to Azure Service Bus with rabbitmq-amqp1.0-client? I’m trying with:

conf = %{
  address: "[namespace].servicebus.windows.net",
  container_id: "test_container",
  port: 5672,
  sasl: {:plain, "[keyname]", "[keyvalue"}
}
:amqp10_client.open_connection conf

but keep getting:
{:error, {:shutdown, {:failed_to_start_child, :reader, :badarg}}}

I’ve also tried with an additional tls_opts: {5671} but get the same result.

Mick

:wave:

According to

-type connection_config() ::
    #{container_id => binary(), % mandatory
      address => inet:socket_address() | inet:hostname(), % mandatory
      port => inet:port_number(), % mandatory
      % the dns name of the target host
      % required by some vendors such as Azure ServiceBus
      hostname => binary(),
      tls_opts => {secure_port, [ssl:ssl_option()]}, % optional
      notify => pid(), % Pid to receive protocol notifications. Set to self() if not provided
      max_frame_size => non_neg_integer(), % incoming max frame size
      idle_time_out => non_neg_integer(), % heartbeat
      sasl => none | anon | {plain, User :: binary(), Password :: binary(),
      % set this to a negative value to allow a sender to "overshoot" the flow
      % control by this margin
      transfer_limit_margin => 0 | neg_integer()}
  }.

address should be inet:socket_address() or inet:hostname().

In your example you use a binary.

Maybe try

conf = %{
  address: '[namespace].servicebus.windows.net', # note the single quotes
  container_id: "test_container",
  port: 5672,
  sasl: {:plain, "[keyname]", "[keyvalue"}
}
:amqp10_client.open_connection conf

That wouldn’t work according to the type spec above. You’d need something like tls_opts: {5671, []} instead.

Thanks - that definitely seemed to improve things. Now I’m getting an {:ok, pid} response, but that’s rapidly followed by:

[warn] Socket closed while in state 'expecting_frame_body'
[warn] terminating reader with ':normal'
[info] Conn received DOWN from Reader :normal :hdr_sent
[warn] terminating connection with ':normal'

Any ideas on that one? I seem to get it regardless of whether I use :tls_opts (your corrected version or mine) or not, so doesn’t seem to be related to that.

Also I read that typespec a little closer and finally picked up on the hostname comment, so I’m currently sending:

conf = %{
  address: '[namespace].servicebus.windows.net',
  container_id: "test_container",
  hostname: "[namespace].servicebus.windows.net",
  port: 5672,
  sasl: {:plain, "[keyname", "[keyvalue]"},
  tls_opts: {5671, []}
}

I’ll keep poking around

I wound up abandoning this approach for the time being as I couldn’t figure out what was causing the connection to close. I’m using the HTTP interface instead, but it has… quirks. The main one being that when you are receiving messages from a partitioned topic, you need to configure a timeout of at least 60s, and it may well take that long to receive a message that is there in the subscription/queue at the start of the request. I’ve reached out to MS for clarification - the REST API is definitely the poor cousin to AMQP.

Hey @mick
is there any update with your effort with rabbitmq-amqp1.0-client? I’m trying to connect with Azure bus as well and hit upon on the same error.

[warn] Socket closed while in state 'expecting_frame_body'

@shankardevy in the end I just went with the HTTP interface. I ironed out some of the quirks with MS support staff so it seems to work reasonably well. The main takeaways are to have long-ish timeouts on the HTTP connections (I use 150s) and in the query string (120s) and use a connection pool so the same tcp connection is re-used. I can’t really report on stability as my app is still in development, is essentially a toy, and therefore not well tested or used.

@mick if you are still after this, you could try this code. It works for me:

conf = %{
  :container_id => <<"test-container">>,
  :address => 'abc-shankardevy.servicebus.windows.net',
  :port => 5671,
  :hostname => <<"abc-shankardevy.servicebus.windows.net">>,
  :tls_opts => {:secure_port,[]},
  :sasl =>
    {:plain,<<"QUEUENAME">>,
           <<"keykey">>},
  :transfer_limit_margin => 100
}
{:ok, conn} = :amqp10_client.open_connection(conf)
{:ok, session} = :amqp10_client.begin_session(conn)
{:ok, sender } = :amqp10_client.attach_sender_link(session, "test-sender", "command")
out_msg = :amqp10_msg.new("my-tag", "my-body", false)
ok = :amqp10_client.send_msg(sender, out_msg)
4 Likes

I apologize for bumping such an old thread, but has anyone had success getting the rabbitmq-amqp1.0-client to build with mix lately? I keep getting make errors when it tries to build the lager dependency.

~/workspace/elixir/azservicebus/deps/amqp10_client @b09e35bf ?1                                                                                                                 root@MININT-D5HUE99 23:36:06 ❯ make                                                                                                                                                                                                       make[1]: Entering directory '/mnt/c/Users/wisingle/workspace/elixir/azservicebus/deps/amqp10_client/deps/rabbit_common'
make[2]: Entering directory '/mnt/c/Users/wisingle/workspace/elixir/azservicebus/deps/amqp10_client/deps/rabbitmq_codegen'
make[2]: Leaving directory '/mnt/c/Users/wisingle/workspace/elixir/azservicebus/deps/amqp10_client/deps/rabbitmq_codegen'
Error: No Makefile to build dependency /mnt/c/Users/wisingle/workspace/elixir/azservicebus/deps/amqp10_client/deps/lager.
erlang.mk:4433: recipe for target 'deps' failed
make[1]: *** [deps] Error 2
make[1]: Leaving directory '/mnt/c/Users/wisingle/workspace/elixir/azservicebus/deps/amqp10_client/deps/rabbit_common'
erlang.mk:4433: recipe for target 'deps' failed
make: *** [deps] Error 2

This comment helped me to build it rabbitmq-amqp1.0-common: publish to hex.pm · Issue #2577 · rabbitmq/rabbitmq-server · GitHub

Copy/pasting for reference:

  defp deps do
    [
      ...
      {:rabbitmq_server,
       github: "rabbitmq/rabbitmq-server",
       app: false,
       runtime: false,
       compile: false,
       override: true},
      {:amqp10_common, path: "deps/rabbitmq_server/deps/amqp10_common", compile: "make"},
      {:amqp10_client, path: "deps/rabbitmq_server/deps/amqp10_client", compile: "make"},

There has been some improvements, rabbitmq team has pushed amqp10_client to hex. it’s easier to include it to your project.

if you are given a connection string like:

Endpoint=sb://[namespace].servicebus.windows.net/;SharedAccessKeyName=MyKeyName;SharedAccessKey=MyAccessKey;EntityPath=MyEntityPath

Then you can use it in your project like

    address = '[namespace].servicebus.windows.net'
    hostname = to_string(address)
    user = "MyKeyName"
    password = "MyAccessKey"
    port = 5671
    queue_name = "MyEntityPath"
    subscription_name = "MySubscriptionName" # This is not provided in the connection string but is an important value. With an invalid setting, you'll get "The messaging entity '....' could not be found.


    opn_conf = %{
      address: address,
      hostname: hostname,
      port: port,
      container_id: subscription_name,
      sasl: {:plain, user, password},
      tls_opts: {:secure_port, []},
      transfer_limit_margin: 100
    }

    {:ok, connection} = :amqp10_client.open_connection(opn_conf)
    {:ok, session} = :amqp10_client.begin_session(connection)

    {:ok, receiver} =
      :amqp10_client.attach_receiver_link(
        session,
        subscription_name,
        queue_name
      )

    :ok = :amqp10_client.flow_link_credit(receiver, 5, :never)

With this code snippet, the messages are sent to the caller’s process mailbox. For more advance usage checkout the source code
if you run it in iex, you can get the messages by running flush:

iex(1)> MyApp.run
iex(2)> flush
{:amqp10_event, {:connection, #PID<0.230.0>, :opened}}
{:amqp10_event, {:session, #PID<0.241.0>, :begun}}
{:amqp10_event, {:link, {:link_ref, :receiver, #PID<0.241.0>, 0}, :attached}}
2 Likes

A shameless plug! We have built a Broadway Producer for AMQP1.0 which simplifies lots of low level details, if you are already a Broadway user, highly recommend to try it out.

1 Like

Sorry to revive this topic again but I think it’s better to keep the info in one place.

@slashmili your library is working great for me when using rabbitmq (which I use locally for testing) but I’m unable to connect to azure event hubs. I think the main problem might be that I don’t know how to specify the tls opts correctly.

So I went and tried to make a connection using only the amqp10_client library.

Without specifying tls_opts I’ll get an “connection socket was closed, connection state: 'expecting_frame_header`” warning and then the connection fails. I assume this is due to the fact that this is simply not allowed in azure.

When I specify the tls_opts as @shankardevy mentioned here I’ll get the error {:shutdown, {:failed_to_start_child, :reader, {:options, :incompatible, [verify: :verify_peer, cacerts: :undefined]}}} (and something about lib/code.ex:1525: Code.require_file/2).

I have made sure that the connection and credentials are working from my machine with two nodejs libraries (the official SDK and the rhea AMQP client) so I’m confident that the credentials are working as expected.

Has someone currently a working setup for this or any idea on how to solve the aforementioned error so that I’m able to debug this further?

the SSL setting is part of what ssl module in erlang expect. It’s a bit cryptic but if you have it working once, you can reuse it :grimacing:

this is the settings for OTP 26: Erlang/OTP 26 Highlights - Erlang/OTP

If you are using older version this might come handy

    ssl: [
      verify: :verify_peer,
      cacertfile: ~c"/etc/ssl/certs/ca-certificates.crt",
      customize_hostname_check: [
        match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
      ]
    ]

the crt files comes from ca-certificates package in Debian.

1 Like

Very nice! Thank you so much. I don’t know how this article slipped past me… When I deactivate the cert check with tls_opts: {:secure_port, [{:verify, :verify_none}]} it’s working - for testing this is currently enough.

Have you, by chance, ever used the managed identity authentication system in azure for the service bus? The pre-shared key variant is now working fine, but it doesn’t work with the oauth token one can get at the central identity management system.

I’ll whip up some documentation PRs for the off_broadway_amqp10 repo once I have understood and solved my issues if you’re interested :slight_smile:

1 Like

Glad that you made it working.

Sorry I don’t have any experience with OAuth credentials and amqp10_client.

If had to guess you need to find out how the OAuth token needs to be passed as:

sasl: {:plain, "<ServiceBusName>", "<access token>"},

Better to check the docs as this is just a guess.