GenStage: subscribing multiple stages on start-up

How does one link a multi-stage GenStage process on startup when the producer_consumer stages are dynamically started?

The four-stage pipeline has the following workers:

  1. producer (1)
  2. producer_consumer (1 to N)
  3. producer_consumer (1 to N)
  4. consumer (1)

On application start, I want the supervisor to start all workers. I also want the consumers to subscribe to their producers in their init functions for fault tolerance. However, the application has trouble setting up these subscriptions on start-up: I receive ** (EXIT) no process errors because the producer_consumer stages can’t subscribe to their producers. Below is my sample implementation for the dynamic start.

Can this be done?

Supervisor children specification (N = 5)

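children = [worker(StageOne, []), worker(StageFour, [])]
# Five instances of each producer_consumer stage, each with a unique child id
stage_two =
  for id <- 1..5 do
    worker(StageTwo, [id], id: {StageTwo, id})
  end
stage_three =
  for id <- 1..5 do
    # The second tuple element is the number of StageTwo producers to subscribe to
    worker(StageThree, [{id, 5}], id: {StageThree, id})
  end
children ++ stage_two ++ stage_three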

start_link and init for StageThree (a producer_consumer that consumes from StageTwo)

# StageThree
def start_link({id, count}) do
  # Register each instance under a unique name, e.g. :"Elixir.StageThree1"
  name = :"#{__MODULE__}#{id}"
  GenStage.start_link(__MODULE__, count, name: name)
end

def init(count) do
  stage_two = 
    for id <- 1..count do
      :"Elixir.StageTwo#{id}"
    end
  {:producer_consumer, 0, subscribe_to: stage_two}
end

Assuming you are starting in order (first producers, then producer_consumers, then consumers), the example above should work. Can you sanity check your ordering?
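
A supervisor starts its children in the order they appear in the list, so the ordering described above could be sketched as follows. This is only an illustration: PipelineChildren and specs/1 are hypothetical names, the start_link arities are copied from the question, and plain child spec maps are used in place of the worker/3 helper.

```elixir
defmodule PipelineChildren do
  # Hypothetical helper: builds child specs in start order, so every stage
  # is started after the stage it subscribes to.
  def specs(n) do
    producer = [%{id: StageOne, start: {StageOne, :start_link, []}}]

    stage_two =
      for id <- 1..n do
        %{id: {StageTwo, id}, start: {StageTwo, :start_link, [id]}}
      end

    stage_three =
      for id <- 1..n do
        # StageThree receives {id, count} so it can build the StageTwo names
        %{id: {StageThree, id}, start: {StageThree, :start_link, [{id, n}]}}
      end

    consumer = [%{id: StageFour, start: {StageFour, :start_link, []}}]

    # Producer first, then both producer_consumer layers, consumer last
    producer ++ stage_two ++ stage_three ++ consumer
  end
end
```

The list would then be passed to the supervisor, for example Supervisor.start_link(PipelineChildren.specs(5), strategy: :one_for_one).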

That did it! They need to be ordered correctly in the list passed to Supervisor.start_link. If a producer_consumer crashes, it should automatically resubscribe to its producers when it comes back online, but downstream consumers will need to manually subscribe to the restarted producer_consumer, correct?

If anyone is interested, I put together a sample app which demonstrates this dynamic linking of multiple stages. Below is the link to it on GitHub (instead of posting all the code here).

@wfgilman I notice that you say:

but downstream consumers will need to manually subscribe to the restarted producer_consumer

in the case a producer crashes. However, what is the correct way to detect that one of the producers has crashed?

I assume that once the crash is detected, it is just a matter of calling sync_subscribe/2 to resubscribe.
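
One option that may avoid detecting the crash by hand: subscriptions default to cancel: :permanent, which means a consumer exits when its producer exits, and the supervisor then restarts it so that init subscribes again. If the children are listed in pipeline order, the :rest_for_one strategy additionally restarts every stage started after the crashed one, in order. The sketch below only demonstrates that supervisor behaviour; RestForOneDemo is a hypothetical name, and plain Agents stand in for the real stages.

```elixir
defmodule RestForOneDemo do
  # Stand-ins (assumption): Agents named :upstream and :downstream play the
  # roles of a producer and a stage that subscribes to it.
  def run do
    children = [
      %{id: :upstream, start: {Agent, :start_link, [fn -> :ok end, [name: :upstream]]}},
      %{id: :downstream, start: {Agent, :start_link, [fn -> :ok end, [name: :downstream]]}}
    ]

    {:ok, sup} = Supervisor.start_link(children, strategy: :rest_for_one)

    old_downstream = Process.whereis(:downstream)
    ref = Process.monitor(old_downstream)

    # Simulate a producer crash
    Process.exit(Process.whereis(:upstream), :kill)

    # Under :rest_for_one the supervisor also stops the children started
    # after :upstream, then restarts all of them in order
    receive do
      {:DOWN, ^ref, :process, ^old_downstream, _reason} -> :ok
    after
      1_000 -> raise "downstream was not stopped"
    end

    new_downstream = wait_for_restart(:downstream, old_downstream)
    Supervisor.stop(sup)

    # True when :downstream was restarted as a new process
    is_pid(new_downstream) and new_downstream != old_downstream
  end

  # Polls until the name is registered to a pid other than the old one
  defp wait_for_restart(name, old_pid) do
    case Process.whereis(name) do
      pid when is_pid(pid) and pid != old_pid ->
        pid

      _ ->
        Process.sleep(10)
        wait_for_restart(name, old_pid)
    end
  end
end
```

With real stages, the restarted downstream stage would run init again and resubscribe through subscribe_to, so no explicit sync_subscribe call should be needed in that setup.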

That’s a good question. I’m not sure what the best practice is.
