How to Ensure Broadway Starts and Stops in Sync with Ecto.Repo Connections

Hi everyone,

I’m working on a project that processes billing data using Broadway to consume messages from Kafka and stores them in a database via Ecto. I need help ensuring that Broadway’s lifecycle is tightly coupled to the connection state of the database. Here’s the problem:

  1. Ecto.Repo is a supervisor: It can be running even when no database connections are active.
  2. Broadway depends on Ecto: Starting Broadway after Ecto.Repo doesn’t guarantee that it will be able to write to the database, as connections to the database might not yet be established.
  3. Handling dropped connections: If database connections are dropped for any reason, I want Broadway to stop immediately and only restart after Ecto has successfully reconnected to the database.

The main motivation behind this behaviour is the nature of the data. Since we’re processing billing data, we need to avoid situations where Kafka messages are consumed but fail to be stored in the database. Additionally, Kafka in this setup doesn’t retry failed messages, so ensuring Broadway only runs when the database is available would be very nice.

I was thinking I could have a supervisor with a :rest_for_one strategy, with Project.Repo as its first child, followed by a “sentinel” process that would not finish starting until it has checked that Ecto is completely online, and that would die if it notices that Ecto connections are dying (by monitoring connections). Broadway would come after the sentinel. This feels like I’m depending on Ecto internals a bit too much though, and maybe I’m just overcomplicating the issue. Any advice would be greatly appreciated.
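
To make the idea concrete, here is a rough sketch of such a sentinel. It swaps connection monitoring for a periodic health probe, which avoids touching Ecto internals. The Sentinel module, its :check and :interval options, and Project.Pipeline are all made up for illustration; in a real app the check might be something like running "SELECT 1" through Ecto.Adapters.SQL.query/3, but that is an assumption, not something tested here.

```elixir
defmodule Sentinel do
  use GenServer

  # Hypothetical options:
  #   :check    - zero-arity function returning true when the database is usable
  #   :interval - milliseconds between probes (default 1_000)
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(opts) do
    check = Keyword.fetch!(opts, :check)
    interval = Keyword.get(opts, :interval, 1_000)

    # Block here until the first probe passes. The supervisor starts children
    # in order, so anything listed after the sentinel waits as well.
    wait_until_healthy(check, interval)

    Process.send_after(self(), :probe, interval)
    {:ok, %{check: check, interval: interval}}
  end

  @impl true
  def handle_info(:probe, state) do
    if state.check.() do
      Process.send_after(self(), :probe, state.interval)
      {:noreply, state}
    else
      # Crashing makes a :rest_for_one supervisor stop and restart every
      # child that was started after the sentinel.
      {:stop, :database_unavailable, state}
    end
  end

  defp wait_until_healthy(check, interval) do
    unless check.() do
      Process.sleep(interval)
      wait_until_healthy(check, interval)
    end
  end
end

# Possible wiring (Project.Pipeline and db_up?/0 are hypothetical names):
#
#   children = [
#     Project.Repo,
#     {Sentinel, check: &Project.Health.db_up?/0},
#     Project.Pipeline
#   ]
#
#   Supervisor.start_link(children, strategy: :rest_for_one)
```

Because init/1 blocks until the probe passes, a restarted sentinel holds back the Broadway restart until the database answers again. Each sentinel crash still counts against the supervisor's restart intensity, so that limit would need to be tuned for a database that flaps often.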

I probably wouldn’t tie it to connections, because a disconnect is likely not the only way it can fail. For example, maybe the database is overloaded and takes too long to reply. Maybe you shipped a bug that runs the wrong query. Etc.

I’d consider using a circuit breaker or a similar tool with the specific purpose of dealing with waiting and retrying operations against the “database system”.
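
As a minimal illustration of the circuit breaker pattern, here is a sketch built on a plain GenServer. The Breaker module, run/2, and the :max_failures and :reset_after options are invented for this example and are not the API of any existing library; a production system would more likely reach for an established package.

```elixir
defmodule Breaker do
  use GenServer

  # Hypothetical options:
  #   :max_failures - consecutive failures before the circuit opens
  #   :reset_after  - milliseconds to wait before allowing a trial call
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  # Runs fun (which must return {:ok, value} or {:error, reason}) unless
  # the circuit is open, and reports the outcome back to the breaker.
  def run(breaker, fun) do
    if GenServer.call(breaker, :allow?) do
      result = fun.()
      GenServer.cast(breaker, {:report, match?({:ok, _}, result)})
      result
    else
      {:error, :circuit_open}
    end
  end

  @impl true
  def init(opts) do
    {:ok,
     %{
       max_failures: Keyword.get(opts, :max_failures, 5),
       reset_after: Keyword.get(opts, :reset_after, 30_000),
       failures: 0,
       opened_at: nil
     }}
  end

  # Closed circuit: always allow.
  @impl true
  def handle_call(:allow?, _from, %{opened_at: nil} = state), do: {:reply, true, state}

  # Open circuit: allow a trial call only once the reset window has elapsed.
  def handle_call(:allow?, _from, state) do
    elapsed = System.monotonic_time(:millisecond) - state.opened_at
    {:reply, elapsed >= state.reset_after, state}
  end

  # A success closes the circuit and clears the failure count.
  @impl true
  def handle_cast({:report, true}, state) do
    {:noreply, %{state | failures: 0, opened_at: nil}}
  end

  # A failure increments the count and (re)opens the circuit at the limit.
  def handle_cast({:report, false}, state) do
    failures = state.failures + 1

    opened_at =
      if failures >= state.max_failures,
        do: System.monotonic_time(:millisecond),
        else: state.opened_at

    {:noreply, %{state | failures: failures, opened_at: opened_at}}
  end
end
```

Unlike a connection monitor, this reacts to any failing operation (timeouts, bad queries, dropped connections), because it only looks at the result of the wrapped call.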

We have now decided to go with a circuit breaker that will stop Broadway if there are too many failures, and write the failed messages to a different topic in Kafka (we don’t want to lose them, so persistence is a must, and we expect Kafka to be available since we are getting messages from it). A separate pipeline will consume the failure topic and place any messages that fail again back on the same failure topic (we don’t care much about ordering). If a failed message succeeds, we will resurrect the main pipeline if it is currently dead. When we give up retrying a message, we will log the whole message and deal with it manually.
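
The decision the retry pipeline makes for each message could look roughly like the sketch below. Everything in it is an assumption made for illustration: the RetryPolicy module, the message shape (a map with :payload and :attempts, where the attempt count would have to travel with the message, for example in a header), the limit of 3 attempts, and the store function standing in for the real database write.

```elixir
defmodule RetryPolicy do
  # Hypothetical limit on how often a message is tried from the failure topic.
  @max_attempts 3

  # Tries to store the payload and returns what the caller should do next:
  #   :stored             - write succeeded; resurrect the main pipeline if it is dead
  #   {:requeue, message} - publish the message back to the failure topic
  #   {:give_up, message} - log the whole message for manual handling
  def handle(%{payload: payload, attempts: attempts} = message, store) do
    case store.(payload) do
      {:ok, _} ->
        :stored

      {:error, _reason} when attempts + 1 >= @max_attempts ->
        {:give_up, message}

      {:error, _reason} ->
        {:requeue, %{message | attempts: attempts + 1}}
    end
  end
end
```

Keeping this as a pure function leaves the side effects (publishing to Kafka, logging, restarting the main pipeline) to the caller, which makes the policy easy to test on its own.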

Thank you for answering. I really appreciate the quick reply.