I am quite new to Elixir and trying to understand GenStage flow.
I created a producer that emits a finite list of numbers
defmodule Producer do
use GenStage
def start_link(_) do
GenStage.start_link(Producer, Enum.to_list(1..10), name: Producer)
end
def init(list) do
{:producer, list}
end
def handle_demand(demand, state) when demand > 0 do
case Enum.split(state, demand) do
{[],[]} ->
GenStage.async_info(self(), :stop)
{:noreply, [], []}
{pulled, remaining} ->
{:noreply, pulled, remaining}
end
end
def handle_info(:stop, state) do
{:stop, :normal, state}
end
end
and ConsumerSupervisor that starts new children for each number
defmodule ConSupervisor do
use ConsumerSupervisor
@opts [
strategy: :one_for_one,
subscribe_to: [{Producer, max_demand: 2, min_demand: 0}],
max_restarts: 20
]
def start_link(arg) do
ConsumerSupervisor.start_link(__MODULE__, arg)
end
def init(_arg) do
children = [%{id: Consumer, start: {Consumer, :start_link, []}, restart: :transient}]
ConsumerSupervisor.init(children, @opts)
end
end
Children are just tasks with 30% rate of success
defmodule Consumer do
def start_link(event) do
Task.start_link(fn ->
Process.sleep(1000)
if :rand.uniform(3) == 3 do
IO.puts("task failed with #{event}")
raise("ooops")
end
num = Kernel.inspect(event)
IO.puts("Inserted #{num}")
end)
end
end
I want to terminate Producer the way that it can be assured that all Tasks are completed. For now when it terminates it emits cancel for ConsumerSupervisor that discard all Tasks retrying to finish successfully.
[info] GenStage consumer #PID<0.433.0> is stopping after receiving cancel from producer #PID<0.431.0> with reason: :normal
I feel like it should be tricky and require some consecutive cast/call messaging, but maybe there is more straightforward way to do this?