With the new functions through_specs and into_specs you can quickly build a Flow and connect it to GenStage producer_consumers and consumers.
Something like:
def start_link(_params) do
flow
|> Flow.through_specs(producer_consumer_spec)
|> Flow.into_specs(consumer_spec)
end
I added this worker to my supervisor because I want to restart the consumers and producer_consumers if they crash.
But I have noticed this worker also starts a process called Flow.Coordinator with its own supervisor which “supervise” (I guess) the consumers and producer_consumers.
As you can see, after MySupervisor I have process 315 which is a Flow.Coordinator, and process 316 which is a “Elixir.Supervisor.Default” and then the consumers/producer_consumers.
The issue is, if I kill one of those consumers at the right, 336, it doesnt get restarted, also if I kill a producer_consumer everthing is killed, included the Flow.Coordinator, but never gets restart.
However, if I kill the Flow.Coordinator or MySupervisor everything get restarted correctly because it’s being supervised.
How can I then supervise these consumers? can it be done with any option when using Flow.into_specs or though_specs?
Should I instead start up the producer_consumers and consumers workers, add them in MySupervisor and then use Flow.into_stages and through_stages to subscribe to already running processes?