Connecting to Azure Service Bus with rabbitmq-amqp1.0-client

Has anyone had any success connecting to Azure Service Bus with rabbitmq-amqp1.0-client? I’m trying with:

conf = %{
  address: "[namespace].servicebus.windows.net",
  container_id: "test_container",
  port: 5672,
  sasl: {:plain, "[keyname]", "[keyvalue"}
}
:amqp10_client.open_connection conf

but keep getting:
{:error, {:shutdown, {:failed_to_start_child, :reader, :badarg}}}

I’ve also tried with an additional tls_opts: {5671} but get the same result.

Mick

:wave:

According to

-type connection_config() ::
    #{container_id => binary(), % mandatory
      address => inet:socket_address() | inet:hostname(), % mandatory
      port => inet:port_number(), % mandatory
      % the dns name of the target host
      % required by some vendors such as Azure ServiceBus
      hostname => binary(),
      tls_opts => {secure_port, [ssl:ssl_option()]}, % optional
      notify => pid(), % Pid to receive protocol notifications. Set to self() if not provided
      max_frame_size => non_neg_integer(), % incoming max frame size
      idle_time_out => non_neg_integer(), % heartbeat
      sasl => none | anon | {plain, User :: binary(), Password :: binary(),
      % set this to a negative value to allow a sender to "overshoot" the flow
      % control by this margin
      transfer_limit_margin => 0 | neg_integer()}
  }.

address should be inet:socket_address() or inet:hostname().

In your example you use a binary.

Maybe try

conf = %{
  address: '[namespace].servicebus.windows.net', # note the single quotes
  container_id: "test_container",
  port: 5672,
  sasl: {:plain, "[keyname]", "[keyvalue"}
}
:amqp10_client.open_connection conf

That wouldn’t work according to the type spec above. You’d need something like tls_opts: {5671, []} instead.

Thanks - that definitely seemed to improve things. Now I’m getting an {:ok, pid} response, but that’s rapidly followed by:

[warn] Socket closed while in state 'expecting_frame_body'
[warn] terminating reader with ':normal'
[info] Conn received DOWN from Reader :normal :hdr_sent
[warn] terminating connection with ':normal'

Any ideas on that one? I seem to get it regardless of whether I use :tls_opts (your corrected version or mine) or not, so doesn’t seem to be related to that.

Also I read that typespec a little closer and finally picked up on the hostname comment, so I’m currently sending:

conf = %{
  address: '[namespace].servicebus.windows.net',
  container_id: "test_container",
  hostname: "[namespace].servicebus.windows.net",
  port: 5672,
  sasl: {:plain, "[keyname", "[keyvalue]"},
  tls_opts: {5671, []}
}

I’ll keep poking around

I wound up abandoning this approach for the time being as I couldn’t figure out what was causing the connection to close. I’m using the HTTP interface instead, but it has… quirks. The main one being that when you are receiving messages from a partitioned topic, you need to configure a timeout of at least 60s, and it may well take that long to receive a message that is there in the subscription/queue at the start of the request. I’ve reached out to MS for clarification - the REST API is definitely the poor cousin to AMQP.

Hey @mick
is there any update with your effort with rabbitmq-amqp1.0-client? I’m trying to connect with Azure bus as well and hit upon on the same error.

[warn] Socket closed while in state 'expecting_frame_body'

@shankardevy in the end I just went with the HTTP interface. I ironed out some of the quirks with MS support staff so it seems to work reasonably well. The main takeaways are to have long-ish timeouts on the HTTP connections (I use 150s) and in the query string (120s) and use a connection pool so the same tcp connection is re-used. I can’t really report on stability as my app is still in development, is essentially a toy, and therefore not well tested or used.

@mick if you are still after this, you could try this code. It works for me:

conf = %{
  :container_id => <<"test-container">>,
  :address => 'abc-shankardevy.servicebus.windows.net',
  :port => 5671,
  :hostname => <<"abc-shankardevy.servicebus.windows.net">>,
  :tls_opts => {:secure_port,[]},
  :sasl =>
    {:plain,<<"QUEUENAME">>,
           <<"keykey">>},
  :transfer_limit_margin => 100
}
{:ok, conn} = :amqp10_client.open_connection(conf)
{:ok, session} = :amqp10_client.begin_session(conn)
{:ok, sender } = :amqp10_client.attach_sender_link(session, "test-sender", "command")
out_msg = :amqp10_msg.new("my-tag", "my-body", false)
ok = :amqp10_client.send_msg(sender, out_msg)
4 Likes

I apologize for bumping such an old thread, but has anyone had success getting the rabbitmq-amqp1.0-client to build with mix lately? I keep getting make errors when it tries to build the lager dependency.

~/workspace/elixir/azservicebus/deps/amqp10_client @b09e35bf ?1                                                                                                                 root@MININT-D5HUE99 23:36:06 ❯ make                                                                                                                                                                                                       make[1]: Entering directory '/mnt/c/Users/wisingle/workspace/elixir/azservicebus/deps/amqp10_client/deps/rabbit_common'
make[2]: Entering directory '/mnt/c/Users/wisingle/workspace/elixir/azservicebus/deps/amqp10_client/deps/rabbitmq_codegen'
make[2]: Leaving directory '/mnt/c/Users/wisingle/workspace/elixir/azservicebus/deps/amqp10_client/deps/rabbitmq_codegen'
Error: No Makefile to build dependency /mnt/c/Users/wisingle/workspace/elixir/azservicebus/deps/amqp10_client/deps/lager.
erlang.mk:4433: recipe for target 'deps' failed
make[1]: *** [deps] Error 2
make[1]: Leaving directory '/mnt/c/Users/wisingle/workspace/elixir/azservicebus/deps/amqp10_client/deps/rabbit_common'
erlang.mk:4433: recipe for target 'deps' failed
make: *** [deps] Error 2

This comment helped me to build it rabbitmq-amqp1.0-common: publish to hex.pm · Issue #2577 · rabbitmq/rabbitmq-server · GitHub

Copy/pasting for reference:

  defp deps do
    [
      ...
      {:rabbitmq_server,
       github: "rabbitmq/rabbitmq-server",
       app: false,
       runtime: false,
       compile: false,
       override: true},
      {:amqp10_common, path: "deps/rabbitmq_server/deps/amqp10_common", compile: "make"},
      {:amqp10_client, path: "deps/rabbitmq_server/deps/amqp10_client", compile: "make"},

There has been some improvements, rabbitmq team has pushed amqp10_client to hex. it’s easier to include it to your project.

if you are given a connection string like:

Endpoint=sb://[namespace].servicebus.windows.net/;SharedAccessKeyName=MyKeyName;SharedAccessKey=MyAccessKey;EntityPath=MyEntityPath

Then you can use it in your project like

    address = '[namespace].servicebus.windows.net'
    hostname = to_string(address)
    user = "MyKeyName"
    password = "MyAccessKey"
    port = 5671
    queue_name = "MyEntityPath"
    subscription_name = "MySubscriptionName" # This is not provided in the connection string but is an important value. With an invalid setting, you'll get "The messaging entity '....' could not be found.


    opn_conf = %{
      address: address,
      hostname: hostname,
      port: port,
      container_id: subscription_name,
      sasl: {:plain, user, password},
      tls_opts: {:secure_port, []},
      transfer_limit_margin: 100
    }

    {:ok, connection} = :amqp10_client.open_connection(opn_conf)
    {:ok, session} = :amqp10_client.begin_session(connection)

    {:ok, receiver} =
      :amqp10_client.attach_receiver_link(
        session,
        subscription_name,
        queue_name
      )

    :ok = :amqp10_client.flow_link_credit(receiver, 5, :never)

With this code snippet, the messages are sent to the caller’s process mailbox. For more advance usage checkout the source code
if you run it in iex, you can get the messages by running flush:

iex(1)> MyApp.run
iex(2)> flush
{:amqp10_event, {:connection, #PID<0.230.0>, :opened}}
{:amqp10_event, {:session, #PID<0.241.0>, :begun}}
{:amqp10_event, {:link, {:link_ref, :receiver, #PID<0.241.0>, 0}, :attached}}
2 Likes

A shameless plug! We have built a Broadway Producer for AMQP1.0 which simplifies lots of low level details, if you are already a Broadway user, highly recommend to try it out.

1 Like