How to shut down MyApp.Repo after a Broadway pipeline has drained

I have a Broadway pipeline with a custom producer that uses Postgres to store tasks in a normal table.

When I restart my application, I lose access to MyApp.Repo in different ways. Sometimes individual processors cannot write to the database, yet their messages still get acknowledged fine. Other times, acknowledgements get lost. Either way, it always comes with errors like:

[error]
[info] Postgrex.Protocol (#PID<0.3666.0>) disconnected: ** (DBConnection.ConnectionError) client #PID<0.4048.0> exited

It makes sense why this is happening: MyApp.Repo is getting shut down before MyApp.Pipeline has finished. However, MyApp.Repo comes before MyApp.Pipeline in my supervision tree, so I’m not sure what the issue is. I assume it’s because draining the messages takes time, but that doesn’t help me much in finding a solution.
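For reference, a minimal sketch of the relevant part of my application.ex (other children omitted; the supervisor options are the usual generated defaults):

defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      MyApp.Repo,     # started first, so it should be stopped last
      # ...
      MyApp.Pipeline  # started last, so it should be stopped first
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end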

I imagine there is something incredibly simple I can do with OTP to resolve this, but I’m unsure what. My current attempt is to add the following to my application.ex:

@impl true
def prep_stop(state) do
  # stop the pipeline (and let it drain) before the supervision tree is torn down
  :ok = Broadway.stop(MyApp.Pipeline, :shutdown)
  state
end

but based on how little information I can find on the prep_stop callback, I’m assuming this is not the best OTP way to go about things.

If what you’re saying is your tree looks like:

[
  ...
  YourApp.Repo,
  ...
  Broadway
]

Then that should work properly. Children are shut down in the reverse order of boot, and the shutdown process is synchronous. That is to say, the Broadway pipeline should be fully shut down before the Repo is stopped.
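You can see both properties with a standalone sketch (toy modules, nothing from your app): two slow-terminating children under one supervisor are stopped one at a time, in reverse order.

defmodule SlowChild do
  use GenServer

  def start_link(name), do: GenServer.start_link(__MODULE__, name, name: name)

  @impl true
  def init(name) do
    # trap exits so terminate/2 runs when the supervisor shuts us down
    Process.flag(:trap_exit, true)
    {:ok, name}
  end

  @impl true
  def terminate(_reason, name) do
    IO.puts("stopping #{name}")
    # a slow terminate blocks the supervisor until it finishes
    # (up to the child's shutdown timeout)
    Process.sleep(500)
    :ok
  end
end

children = [
  %{id: :repo, start: {SlowChild, :start_link, [:repo]}},
  %{id: :pipeline, start: {SlowChild, :start_link, [:pipeline]}}
]

{:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)
Supervisor.stop(sup)
# prints "stopping pipeline" first, then "stopping repo"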

Is there any extra task spawning or async work you’re doing in the Broadway pipeline that might leave processes floating around?


Correct, that is what my tree looks like (although it’s not the Broadway module itself in my supervision tree but MyApp.Pipeline, which starts with use Broadway).
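For context, the pipeline is shaped roughly like this (the producer module name and the concurrency numbers here are placeholders):

defmodule MyApp.Pipeline do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      # custom producer that reads tasks from a Postgres table via MyApp.Repo
      producer: [module: {MyApp.TaskProducer, []}],
      processors: [default: [concurrency: 10]]
    )
  end

  @impl true
  def handle_message(_processor, message, _context) do
    # mostly API requests; some messages kick off work via FLAME
    message
  end
end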

I’m not doing anything too crazy in the pipeline (at least not task-wise); most of the work is API requests. Some messages use FLAME, but this error happens on all messages. When a restart occurs during a FLAME call, the FLAME machines err as expected, the messages fail and get acked, and then the ack fails because Postgres is disconnected.

The issue occurs regardless of the task contents, though.

EDIT: It should be noted that these 4 failures in particular happened because the applications defined above MyApp.Pipeline all terminated before Broadway could drain.

UPDATE: Many iterations later, I believe I was able to resolve it by extending the kill_timeout configuration setting in my fly.toml. So the top of my fly.toml now looks like:

app = "my-app"
kill_signal = "SIGTERM"
kill_timeout = 300

300 seconds is the maximum value on Fly; the default is 5 seconds.
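For anyone who finds this later: kill_signal is the signal Fly sends on shutdown, and kill_timeout is how long it waits before force-killing the VM. An Elixir release shuts down gracefully on SIGTERM (stopping the supervision tree in reverse order, as described above), so this change gives the pipeline up to 300 seconds to drain instead of 5.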

Will report back if I notice it regress!
