How to handle graceful shutdown of a supervised connection?

I’m working on the gnat library and one of the use cases I want to support is to act as an RPC server for other applications to call. In this scenario I need to open a connection to the nats brokers and then subscribe to a few topic keys.

I have a ConnectionSupervisor which deals with trying to establish a connection and handling connection failures, but the consumers will need to re-register their subscriptions whenever the connection gets re-established.

I also want to handle a graceful shutdown so that when the supervision tree is shutting down I can first un-subscribe my consumers from future messages, then wait for the current messages to finish processing and then finally shut down my connection to the broker overall.

I’m guessing that this use-case has come up before as it is a pretty common scenario with subscription-based systems, but it doesn’t quite seem to fit into any of the tools I’m familiar with.

Ideally I’d like to let client code add something to their supervision tree that just specifies where to connect and what function to call for each of their subscriptions and let the library code handle graceful shutdown and establishing those subscriptions on connect and re-connect.

import Supervisor.Spec
rpc_settings = %{
  connection: %{
    connection_settings: [%{host: "127.0.0.1", port: 4222}, %{host: "127.0.0.1", port: 4223}],
    name: :gnat_rpc, # the name to register for the connection whenever it is established
  },
  consumers: %{
    "rpc.AccountService.search" => {AccountService, :search},
    "rpc.AccountService.create" => {AccountService, :create},
  }
}
children = [
  supervisor(Gnat.ConsumerSupervisor, [rpc_settings], name: :rpc_supervisor)
]
supervise(children, :one_for_one)

Does anyone know of good examples of doing this type of thing? Do you usually put all of this into the same part of a supervision tree? Do you trap exits and handle unsubscribing and waiting in your terminate/2 function or do you make this part of the overall API and handle it in handle_call/3 functions?

1 Like

In the same vein I was wondering if I could use a Task.Supervisor and kick off all of the processing for received messages in that supervisor, but it terminates its children immediately. Would it be reasonable to make a custom supervisor that acts the same as Task.Supervisor, but it tries to wait for all of its children to complete before shutting down?

For now I’ve started a first cut at this by separating the concerns for keeping a connection available from keeping the consumers available. https://github.com/mmmries/gnat/pull/59

The overall design here is to supervise the consumers separately from the connection. The consumers poll for the existence of a healthy connection and setup their subscriptions and monitor the connection. If the connection crashes for any reason they will go back to polling for a healthy connection.

As long as the consumers are supervised later in the supervision tree than the connection they should be able to gracefully shut themselves down before having the connection shut down.

I would still love to hear about how other people have designed similar systems.

So are you wanting to build something similar to how Phoenix.Channels works? You could use it as an example.

I’d say internal polling is a code smell in Erlang/Elixir. Rather than have the supervisor implement reconnect-with-backoff, which means it has to keep state about children, I would consider having the connection process be implemented as a state machine (gen_statem or gen_server + connection state), and handle reconnection on its own. Then the connection can notify consumers when it goes up/down gracefully, or just re-establish the subs transparently. See It’s about the guarantees.

I think it would also make sense here for the connection process to monitor the consumer when it subscribes, then do the equivalent of unsub if it gets a down message. This ensures the state is correct no matter if the consumer cleans up after itself or not.

For shutdown, perhaps offer a graceful shutdown function that asks the consumers to stop and waits for their reply, rather than rely on the supervision tree’s shutdown? Otherwise as you noticed you need to trap exits in workers, which is not ideal.

(By the way the ConsumerSupervisor you linked is not a supervisor, so I would suggest naming it differently. It sounds closer to a manager.)

1 Like

@dom thanks for the suggestions. Renaming ConsumerSupervisor and ConnectionSupervisor definitely makes sense to avoid confusion with the OTP supervisor name.

I re-read “It’s about the Guarantees” and that makes a lot of sense about trying to guarantee that our local proxy to the remote service is available during startup, but let it return error messages when it isn’t connected yet.

You mentioned that the server should monitor the workers rather than the workers monitoring the server, but what happens if the whole server crashes for some reason other than a connection failure? If we add more logic to the server process it seems like we are increasing the probability of a bug somewhere in that server. If the whole thing crashes and gets restarted by its supervisor it won’t know anything about the previous subscriptions and the consumers will still be waiting for messages. Are there examples you know of where a connection proxy puts this long-lived data some place outside its local state like an ETS table?

I liked the idea of separating the concerns of managing transient state (ie the last 8 bytes we received that didn’t make up an entire protocol message) from the long-lived state of the consumers, but it looks like other nats clients follow your suggestion so maybe I should follow the road that’s been paved. :thinking:

You mentioned that the server should monitor the workers rather than the workers monitoring the server

Not “rather”, I meant “in addition to”.

I would suggest looking at Phoenix.PubSub.Local as an example. The subscription state is held in ETS, for performance rather than reliability reasons I believe, but the server still monitors the subscriber:

It gives the subscriber (i.e. the “worker”) the choice to link or not which is nice:

(note the Process.flag(:trap_exit, true) so the server does not crash if a worker crashes)

If we add more logic to the server process it seems like we are increasing the probability of a bug somewhere in that server. If the whole thing crashes and gets restarted by its supervisor it won’t know anything about the previous subscriptions and the consumers will still be waiting for messages.

Definitely consumers should link or monitor the server to avoid the last problem.

Are there examples you know of where a connection proxy puts this long-lived data some place outside its local state like an ETS table?

I can’t think of a specific example right now. Keeping the subscription state in a separate process / table is an option, but it’s not necessarily safer though. If the server process crashed (rather than just disconnect), presumably it’s for a reason - perhaps a subscription to an invalid topic name, or a topic that provides some data causing a crash in the parser? In cases like this if you preserve the subscription state the server may just crash in a loop until it brings the whole thing down. If you drop the subscription state and let the workers crash instead (or get a monitor down message), then the client app has more flexibility in how it handles the problem.

This is also a good read on the topic: JLOUIS Ramblings: On Erlang, State and Crashes

It’s really a tradeoff though, I think both approaches are potentially valid :slight_smile:

1 Like