Asynchronous process cleanup

I’m building an app that consumes messages from RabbitMQ. Using the amqp library, I’ve got a gen_server set up that subscribes to Rabbit, and handles incoming messages. What I want, is for each incoming message to be processed asynchronously. That’s easy - I can grab the message, stick it in a Task and fire it off. What’s making it a bit more complicated is that I also want to ack or nack the message after the task that’s processing it finishes. To do that, I need to supervise the task (either through an actual Task.Supervisor or just Process.monitor), and then wait for an EXIT signal.

Here’s where things get a bit confusing for me. I can’t seem to find a way to asynchronously wait for a process exit. I either have to use Task.yield to catch the exit signal, or run a receive message loop (which is blocking). The way out seems to be to run an intermediate task/supervisor to wrap the cleanup logic, but it seems ugly.

So, as I understand it, I need:

:my_app → parent Task.Supervisor → cleanup task → some form of supervision → work Task

Is that right? If not, what is?

(Cross posting from stack overflow)

1 Like

gen_server already runs a receive loop for you. Any non-OTP messages such as EXITs or Task replies will arrive in the handle_info callback.

2 Likes

You might also want to review the way you’re doing it (it doesn’t mean you’re approaching it in the wrong manner, but without code and seeing what actually is being done it’s difficult to reason about…).

If you have a central Genserver which processes all queue messages, you might want to transform this into a Monitor that dynamically creates Genservers for each message (or a central genserver that then creates individual genservers). Then each genserver can use its state to hold the original message, cast to itself (or even call since they’re independent processes they won’t block each other) and basically handle the processing of the message from start to finish (and once finished exit normally).

Since it would be independent to each message in the queue you could initialise each genserver with all the needed relevant info (original ref, or id of the message in rabbitmq so you can requeue it in case of need, the message itself, etc), so this would be available in the state of the genserver throughout the whole process, in handle_call, cast, info and terminate (and you could reference itself at any point with self()). If you still need to create tasks (because each queued message might need a different kind of processing) you could pass them the ref of the genserver and once finished they could cast/call/send to it, keeping it all asynchronous.

1 Like

My concern there is that in order to be able to correlate an EXIT signal to a particular message, I have to keep track of all in-flight messages, instead of having all of that information live in the closure.

1 Like

I think that’s the path I’m headed down - a central genserver that spins up supervised tasks that then create monitored processes for the actual work and cleanup:

(from inside an async_nolink task execution

pid = spawn(fn -> apply(route.controller, route.action, [tag, redelivered, payload, routing_key]) end)

# Set up a monitor for this pid
ref = Process.monitor(pid)

# Wait for a down message for given ref/pid
receive do
  {:DOWN, ^ref, :process, ^pid, :normal} ->
    IO.puts "Normal exit from #{inspect pid}"
    Consumer.ack(consumer, tag)
  {:DOWN, ^ref, :process, ^pid, msg} ->
    IO.puts "Received :DOWN from #{inspect pid}"
    Consumer.nack(consumer, tag, !redelivered)
end
1 Like

This is what Tasks do, they set up a monitor, and optionally a link, and then automatically send a reply back. The only reason to use a GenServer instead of a Task in this instance is if you need the GenServer behavior to do the message processing.

This is not what you should use to decide to use a Task or GenServer. Both can be used even if you need different kinds of processing or not.

There is no way around this with your approach. Another approach would be to have the Task in charge of ACKing, it can call the original GenServer to send an ACK. The downside is that you can’t reliably send NACKs if you don’t monitor the Task from the original GenServer.

2 Likes

What about two layers as I mentioned above? GenServer spawns monitored tasks for each inbound message, includes reference to itself in the task. Task spawns monitored process, traps exit signal, tells GenServer how to respond (ACK/NACK). It’s three layers of processes then - the GenServer, the task itself, and the actual process.

1 Like

Oh, I get it now… In the event that the task responsible for ACKing dies (not the process within it), I still need a way to NACK the message, which brings with it the message tracking issue.

1 Like

What can be done if RabbitMQ’s channel gets restarted while the worker-GenServers, which are in charge of ACKing, are still doing the work? If they are allowed to finish their work, their final ACK is now attempted on a different/restarted channel which causes a PRECONDITION_FAILED - unknown delivery tag error:

Another scenario in which the broker will complain about an “unknown delivery tag” is when an acknowledgement, whether positive or negative, is attempted on a channel different from that on which the delivery was received on. Deliveries must be acknowledged on the same channel.

I’m using the amqp library and monitoring when a channel goes :DOWN. Is best one can do to terminate all the mentioned worker-GenServers (presuming idempotency)?

Most problematic is the negative feedback loop, as until all the outstanding deliveries (from the previous-channel’s lifetime) are ACKed, the channel will keep getting closed & reopened. Per docs:

Acknowledging on a different channel will result in an “unknown delivery tag” protocol exception and close the channel.

Here’s the processes setup:

Rabbit.Receiver (a GenServer) whose :basic_deliver calls
Task.Supervisor.start_child/2 which calls
DynamicSupervisor which starts
GenServer children/workers.

(Above Task.Supervisor is used to make sure that Rabbit.Receiver wouldn’t become a bottleneck, as the RabbitMQ message metadata’s headers are being inspected. Let me know if this is perhaps overly defensive.)