Help regarding catching exceptions and catch-all handle_info in GenServer?

I currently have a simple Genserver which connects to RabbitMQ. I’ve followed the documentation about creating a stable connection:

This essentially amounts to a handle_info function which matches the :DOWN event and then retries the connection:

  def handle_info({:DOWN, _, :process, _pid, exception}, _) do
    IO.puts "[*] connection DOWN, retrying"

    {:ok, chan} = rabbitmq_connect
    {:noreply, chan}
  end

This works fine. However, I now want to capture these events. As an aside, I’d be interested in whether others would treat this as an exception or an event? I’m using Sentry to capture the exception like so:

  def handle_info({:DOWN, _, :process, _pid, exception}, _) do
    IO.puts "[*] connection DOWN, retrying"

    Sentry.capture_exception(
      exception,
      [stacktrace: System.stacktrace(), extra: %{extra: "results"}])

    {:ok, chan} = rabbitmq_connect
    {:noreply, chan}
  end

However, the issue here is Sentry.capture_exception returns something such as (from a Task):

{#Reference<0.4047852952.893648897.15807>,
{:ok, “9593df97c5ee40d182621ffc79b511af”}}

…and my Genserver crashes/complains because I’m not handling it. It then restarts and the connection is back up. Great, but I’d prefer it didn’t crash and skipped over Sentry.capture_exception to run rabbitmq_connect.

I then added a catch-all handle_info/2 but that doesn’t work because it is now catching {:basic_consume_ok, etc} which the RabbitMQ library is producing when it connects.

Error:

[error] GenServer #PID<0.509.0> terminating
** (FunctionClauseError) no function clause matching in App.Channels.Results.handle_info/2
App.Channels.Results.handle_info({#Reference<0.3115891151.2776891395.93003>, {:ok, “cba31070d98a4e75aa4dec6b1b627b56”}}, %AMQP.Channel{conn: %AMQP.Connection{pid: #PID<0.531.0>}, pid: #PID<0.542.0>})
(stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
(stdlib) gen_server.erl:686: :gen_server.handle_msg/6
(stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: {#Reference<0.3115891151.2776891395.93003>, {:ok, “cba31070d98a4e75aa4dec6b1b627b56”}}
State: %AMQP.Channel{conn: %AMQP.Connection{pid: #PID<0.531.0>}, pid: #PID<0.542.0>}

My question is - how do I handle this properly? Can I not have my GenServer ignore the return from Sentry? Will I have to create a handle_info specifically for that? Little confused.

Issue seems to be that ^^

Would really appreciate any insight on where I’m going wrong here. Cannot seem to figure it out at all.

The following (with handle_info/2 specifically handling the Sentry call):

  def handle_info({:DOWN, _, :process, _pid, exception}, _) do
    IO.puts "[*] connection DOWN, retrying"

    Sentry.capture_exception(
      exception,
      [stacktrace: System.stacktrace(), extra: %{extra: "results"}])

    {:ok, chan} = rabbitmq_connect
    {:noreply, chan}
  end

  def handle_info({_, {:ok, _}}, chan) do
    {:noreply, chan}
  end

…results in a retry loop, which also creates a new connection every time (which means rabbitmq_connect is being called).

[] connection DOWN, retrying
[
] connection DOWN, retrying
[*] connection DOWN, retrying

Anybody care to explain how this is happening?

All works as desired if I wrap the Sentry call in spawn.

@BitGonzo is right. If you’re starting tasks you need to handle messages that they failed. Unless you have started the Task with start_child.

Not sure how far along you are, but stuff like this is handled in a framework I maintain called Conduit, which allows you to connect to rabbit.

Here’s an example of what it looks like to setup queues, exchanges, consumers, and pipelines: conduit_amqp_example/lib/conduit_amqp_example/broker.ex at master · conduitframework/conduit_amqp_example · GitHub

PM me if you’d like to know more. If you don’t want to use it, you might find the implementation of the AMQP adapter useful. Specifically, how it starts tasks for processing a message:

1 Like

Hey. I did actually run into Conduit when I revisited my AMQP code. Only thing which has me sceptical is that it isn’t as mature of a project.

This does make my life a whole lot easier though:

defmodule MyApp.Broker do
  incoming MyApp do
    subscribe :my_subscriber, MySubscriber, from: "my.queue"
    subscribe :my_other_subscriber, MyOtherSubscriber,
      from: "my.other.queue",
      prefetch_size: 20
  end
end

I’d be interested to know what gives you that impression.

Your versions. amqp is on 1.x, conduit_amqp looks like it is on 0.3?

Do you have examples of production applications it is being used in?

conduit_amqp is on 0.6.1, but the README does says 0.3.0. Just fixed that. conduit_amqp uses amqp under the hood. It basically adds connection, channel, and subscriber process management on top of that. Only reason it’s not 1.0, is because conduit isn’t 1.0. The reason conduit isn’t 1.0 is because I want to build two other adapters first. One for kafka, which I’ve started, and another for mqtt, which a friend is working on.

The company I work at has been using it for 2 years. We have 6 applications that use it and process ~1M messages daily and end up being idle most of the time. I know of 6 other companies that are using it, but I’m not sure if they’d want me to mention them.

I believe @MrDoops is using it. Perhaps he can share his experience.

3 Likes

Thanks for the explanation.

Good to know you’re using amqp behind the scenes and not reinventing that wheel.

“The reason conduit isn’t 1.0 is because I want to build two other adapters first”

If conduit didn’t have this gap in adapters and was just made for rabbitmq, would you consider it 1.0 stable? Is that particular adapter stable and working in production environments?

I’ll have a go at integrating and testing conduit now with the equivalent code and let you know how it goes.

I’m using Conduit with RabbitMQ as an API client worker queue and a data sync between two CRUD systems. I didn’t have to spend much time setting or configuring Conduit; it’s nice to have something like this be an low-mental-energy detail.

3 Likes

Yes. The only outstanding thing I’d like to address, is that currently RabbitMQ must be available when the application starts. That’s necessary because of how queues and exchanges are created currently. This would be a change that’s completely invisible to the user and I’m actually planning to work on this tomorrow. Note that, if RabbitMQ goes down while your application is running and comes back up, it will re-establish a connection successfully.

Hey, I’ve been playing around with conduit last couple of hours and have a few questions/suggestions…

#1 I think adding the message publishing example to conduit_amqp and conduit READMEs would be very helpful. I couldn’t find it any of the docs, but would have expected it under the publisher examples. Instead it is here: https://github.com/conduitframework/conduit_amqp_example

import Conduit.Message
alias Conduit.Message
alias ConduitSqsExampleQueue.Broker
message = put_body(%Message{}, %{"my" =&gt; "message"})
Broker.publish(:message, message)

#2 I tried to use the generator mix conduit.gen.broker --adapter amqp. Added the config, added it to my application supervision tree, attempted to start the application and received:

** (Mix) Could not start application pingerly: MyApp.Application.start(:normal, []) returned an error: shutdown: failed to start child: :broker_2
    ** (EXIT) shutdown: failed to start child: MyApp.Broker2.Adapter
        ** (EXIT) shutdown: failed to start child: MyApp.Broker2.Adapter.PubSub
            ** (EXIT) already started: #PID<0.829.0>

Note: this is if I attempt to run it with original broker. What is the suggested way to run multiple brokers? I’m still fairly new to Elixir (6 months) and still wrapping my head around the processes, so unsure why two separate brokers are using the same PID.

I have managed to setup an equivalent of what I had using amqp and a GenServer (and poolboy which I reverted due to timeouts) and I do much prefer conduit. I force-dropped some connections and it handled that well, and didn’t have a problem running through the number of messages I’m dealing with at the moment. Any suggestions on how to make the initial start-up resilient to lack of connection (deferred retry)?

Would just like to get those unnecessary publisher connections sorted now.

If you can throw insight on any of the above I’d be very grateful.

2 Likes

I’ve about finished up, found the docs regarding connection settings, and everything is working well.

I did realise you’ve used poolboy for conduit_amqp, so my timeouts must have been unrelated.

The code is a lot cleaner and easier to understand so thanks!

One question - where can I capture connection errors using a custom handler (such as Sentry)? I’ve been looking through the source code but not entirely sure.

1 Like

Hey, sorry I didn’t get to this sooner.

You can create a plug to capture errors. Here’s one I wrote for appsignal: conduit_appsignal/lib/plug/capture_error.ex at master · conduitframework/conduit_appsignal · GitHub

Agreed. The more high level docs need to be expanded. I’ll be writing guides for creating a simple app with both amqp and sqs. Those will most likely live with the main conduit docs. The tricky part is that the options vary between amqp and sqs, which is why they both individually list their options.

I assume based on your second message, this was resolved. My best guess at what’s happening here is that you’re starting the broker twice, but I’d need more info to tell. The broker itself will start up multiple consumers for a queue (configureable), so it shouldn’t be necessary to start multiple brokers. If you did want two brokers, perhaps because you’re using SQS and AMQP or you have two Rabbit instances, you’d need to create two separate broker modules with separate config.

Awesome

I’m working on that today. Here’s the ticket tracking it: Application goes down on startup when it can not connect to rabbitmq · Issue #7 · conduitframework/conduit_amqp · GitHub

Did you resolve this? If not could you provide more info? I’m not sure what this is referring to.

1 Like

Little more about this. Exceptions are treated as nack’s for messages. This plug reraises to allow earlier plugs to decide how to handle that. So, for example, you might want to use AckException to ack the message.

1 Like

Appreciate the response!

Yeh I think so. It could have been because I was using subscribe :name without a unique :name. As I say, still learning :stuck_out_tongue:

Perfect! Thanks.

This was mainly regarding the unnecessary publisher channels which were created with a subscribe-only broker (4/5 by default). But I did find the options to tune down the number of publisher channels so all is well.

I’ll keep my eye on that ticket, cheers! Overall very happy with how conduit has simplified my code, and can now focus on business logic rather than GenServer handling.

2 Likes

Good to know, cheers!

Weird, v0.12.5 of conduit should have given you a compile error if you defined two routes with the same name. Maybe you’re not on the newest?

Gotcha.

That’s absolutely the goal of the project, so I’m happy it’s working out for you!

1 Like

I’m on 0.12.5, so it is likely some other mis-configuration on my part! I’ll let you know if I run into it again and cannot figure it out.

1 Like

Cool! FYI, I just published 0.12.6 of conduit.

2 Likes

That ticket is closed now. See the announcement: Conduit - A framework for building reliable, event-based systems - #6 by blatyo

1 Like