How does one link a multi-stage GenStage process on startup when the producer_consumer stages are dynamically started?
In a four-stage pipeline there are the following workers:
- producer (1)
- producer_consumer (1 to N)
- producer_consumer (1 to N)
- consumer (1)
On application start, I want the supervisor to start all workers. I also want the consumers to subscribe to their producers in their init functions for fault tolerance. However, the application seems to have trouble setting up these subscriptions on start-up: I receive ** (EXIT) no process errors, and the producer_consumer stages can’t subscribe to their producers. Below is my sample implementation for the dynamic start.
Can this be done?
Supervisor children specification (N = 5)
children = [worker(StageOne, []), worker(StageFour, [])]
stage_two =
  for id <- 1..5 do
    worker(StageTwo, [id], id: id)
  end
# count is the number of StageTwo workers (N), used in StageThree.init/1
stage_three =
  for id <- 1..5 do
    worker(StageThree, [{id, count}], id: id)
  end
children ++ stage_two ++ stage_three
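For comparison, here is a minimal sketch of the same child list built in dependency order, so that each stage appears after the stages it subscribes to (a Supervisor starts its children in list order). It assumes a hypothetical PipelineChildren helper and map-based child specs, neither of which is in my original code, and it gives every child a distinct id, since the snippet above reuses ids 1..5 for both stage_two and stage_three. I have not confirmed that this alone removes the no process errors.

```elixir
defmodule PipelineChildren do
  # Hypothetical helper (not from the original post): builds child specs in
  # dependency order, so every stage is listed after the stages it subscribes to.
  def specs(n) do
    stage_two =
      for id <- 1..n do
        # Tuple ids keep StageTwo and StageThree children from colliding.
        %{id: {StageTwo, id}, start: {StageTwo, :start_link, [id]}}
      end

    stage_three =
      for id <- 1..n do
        # Each StageThree receives {id, n} so init/1 can subscribe to all n StageTwo workers.
        %{id: {StageThree, id}, start: {StageThree, :start_link, [{id, n}]}}
      end

    # Producer first, final consumer last.
    [%{id: StageOne, start: {StageOne, :start_link, []}}] ++
      stage_two ++
      stage_three ++
      [%{id: StageFour, start: {StageFour, :start_link, []}}]
  end
end
```

The resulting list could be passed to Supervisor.start_link/2 in place of children ++ stage_two ++ stage_three.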
Consumer start_link and init for StageThree
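## StageThree
def start_link({id, count}) do
  # Register each worker under a unique name, e.g. :"Elixir.StageThree1"
  name = :"#{__MODULE__}#{id}"
  GenStage.start_link(__MODULE__, count, name: name)
end
def init(count) do
  # Subscribe to every StageTwo worker by its registered name
  stage_two =
    for id <- 1..count do
      :"Elixir.StageTwo#{id}"
    end
  {:producer_consumer, 0, subscribe_to: stage_two}
end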
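Since the subscription in init only works if the registered names match exactly, the naming scheme can be checked in isolation. This is a minimal sketch, assuming a hypothetical StageNames helper that is not part of my original code; it only builds atoms and does not start any GenStage process.

```elixir
defmodule StageNames do
  # Hypothetical helper, not part of the original post.
  # Interpolating a module alias yields its full name ("Elixir.StageTwo"),
  # so this builds the same atom as :"Elixir.StageTwo#{id}".
  def name(module, id), do: :"#{module}#{id}"

  # Names of stages 1..count, in the shape used for subscribe_to in init/1.
  def names(module, count) do
    for id <- 1..count, do: name(module, id)
  end
end

IO.inspect(StageNames.names(StageTwo, 3))
```

Both start_link (via __MODULE__) and init (via the literal "Elixir." prefix) end up producing atoms of this form, so a mismatch in names seems unlikely to be the cause.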