Supervised process initialisation that depends on information from an external system?


I have a real-time data pipeline that consumes events from a Kafka queue and indexes them into a database.

For this I use GenStage with the PartitionDispatcher. As part of the producer startup, I fetch the number of partitions using the Kafka API, and use these to set the partitions options for the producer. Example:

  def init(_) do
    opts = [strategy: :rest_for_one, name: __MODULE__]
    supervise(child_spec(), opts)
  end

  defp child_spec do
    [
      worker(KafkaEventCollector, [[partitions: Kafka.get_partitions()]])
    ]
  end

The problem is that if the Kafka cluster is unavailable at startup, the call in the child spec fails and the supervisor dies with it.

What is the best way to handle a situation like this in which the supervised process (the producer) depends on information from an external service, as part of its initialization?

You either need to crash or do the lookup lazily. To do it lazily, start a process that waits for Kafka and then starts the rest of the tree. Here is how your tree starts:

* one for one supervisor
  * temporary worker (can be a genserver that pings kafka)

Then the temporary worker starts and waits until kafka is available. When kafka becomes available, it calls Supervisor.start_child with the GenStage child specification. You will end up with a tree like this:

* one for one supervisor
  * temporary worker (can be a genserver that pings kafka)
  * GenStage

You can start anything there, not only a GenStage.
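A minimal sketch of that starting tree, using the same `Supervisor.Spec` helpers as the question. The module names `Pipeline.Supervisor` and `KafkaWaiter` are placeholders, not from the original post:

```elixir
defmodule Pipeline.Supervisor do
  use Supervisor

  def start_link do
    Supervisor.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    children = [
      # :temporary means this worker is never restarted: once it has
      # done its job and exited, the supervisor leaves it alone.
      worker(KafkaWaiter, [], restart: :temporary)
    ]

    supervise(children, strategy: :one_for_one)
  end
end
```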

The strategy is one_for_one because once you call start_child and it returns {:ok, _}, the GenStage process is permanently added to the supervisor (until the supervisor itself shuts down or restarts). If you ran the temporary worker again, it would try to call start_child with a child specification that already exists.
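The temporary worker itself can be a GenServer that polls Kafka and, once it succeeds, injects the GenStage child into the running supervisor. A sketch under assumptions: the `{:ok, _}`/`{:error, _}` return shape of `Kafka.get_partitions/0`, the `Pipeline.Supervisor` name, and the retry interval are all hypothetical:

```elixir
defmodule KafkaWaiter do
  use GenServer
  import Supervisor.Spec, only: [worker: 2]

  def start_link do
    GenServer.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    # Kick off the first attempt without blocking the supervisor.
    send(self(), :try_kafka)
    {:ok, nil}
  end

  def handle_info(:try_kafka, state) do
    case Kafka.get_partitions() do
      {:ok, partitions} ->
        # Kafka is reachable: add the GenStage producer to the tree.
        spec = worker(KafkaEventCollector, [[partitions: partitions]])
        {:ok, _} = Supervisor.start_child(Pipeline.Supervisor, spec)
        # Exit normally; as a :temporary child we will not be restarted.
        {:stop, :normal, state}

      {:error, _reason} ->
        # Kafka is down: retry later instead of crashing the tree.
        Process.send_after(self(), :try_kafka, 5_000)
        {:noreply, state}
    end
  end
end
```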


Works perfectly, thanks!


There is an error in my previous explanation. The supervisor should be one_for_one because after start_child runs, the kafka child is permanently added to the supervisor. If you ran the temporary worker again, it would attempt to start a child that already exists. I have fixed the original comment so as not to confuse others in the future.