Supervisor not restarting Flow worker after exception raised

I have this test Flow that just reads a file, turns its content into a Flow, and sends it through a producer_consumer and a consumer (both GenStages):

defmodule TestDep.Worker do
  use Flow

  alias TestDep.{Consumer, ProducerConsumer}

  def start_link() do
    flow =
      File.stream!("file.txt")
      |> Flow.from_enumerable()

    {:ok, consumer} = Consumer.start_link()
    {:ok, producer_consumer} = ProducerConsumer.start_link()
    {:ok, _ref} = GenStage.sync_subscribe(consumer, to: producer_consumer)

    Flow.into_stages(flow, [producer_consumer])
  end

end

defmodule TestDep.Consumer do
  use GenStage

  def start_link() do
    IO.puts "starting link consumer"
    GenStage.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    IO.puts "starting worker consumer"
    {:consumer, :the_state_does_not_matter}
  end

  def handle_events(events, _from, state) do
    10/0
    IO.puts "inspecting consumer events"
    Enum.each(events, fn(event) ->
      IO.inspect event
    end)
    # We are a consumer, so we would never emit items.
    {:noreply, [], state}
  end

end

defmodule TestDep.ProducerConsumer do
  use GenStage

  def start_link() do
    IO.puts "starting link producer consumer"
    GenStage.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    IO.puts "starting worker producer consumer"
    {:producer_consumer, :the_state_does_not_matter}
  end

  def handle_events(events, _from, state) do
    #10/0
    IO.puts "inspecting producer consumer events"
    Enum.each(events, fn(event) ->
      IO.inspect event
    end)
    {:noreply, events, state}
  end

end

I then added it to a supervisor with the restart: :permanent option (I also tried the default):

defmodule TestDep.Application do
  @moduledoc false

  use Application
  use Supervisor

  def start(_type, _args) do
    children = [worker(TestDep.Worker, [], restart: :permanent)]
    opts = [strategy: :one_for_one, name: TestDep.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

This is how I launch the application from the mix.exs file:

  def application do
    [
      extra_applications: [:logger],
      mod: {TestDep.Application, []}
    ]
  end

What I have noticed is that every unhandled exception in a handle_events callback crashes the flow, and it is not restarted afterwards. For example, in case you did not notice, my code has this in the producer-consumer:

  def handle_events(events, _from, state) do
    #10/0
    IO.puts "inspecting producer consumer events"
    Enum.each(events, fn(event) ->
      IO.inspect event
    end)
    {:noreply, events, state}
  end

a commented-out 10/0. If I uncomment it, the stage crashes with an ArithmeticError:

16:29:40.126 [info]  GenStage consumer #PID<0.196.0> is stopping after receiving cancel from producer #PID<0.197.0> with reason: {:badarith, [{TestDep.ProducerConsumer, :handle_events, 3, [file: 'lib/worker.ex', line: 59]}, {GenStage, :consumer_dispatch, 6, [file: 'lib/gen_stage.ex', line: 2315]}, {GenStage, :take_pc_events, 3, [file: 'lib/gen_stage.ex', line: 2488]}, {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 616]}, {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 686]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 247]}]}

 
16:29:40.126 [error] GenServer #PID<0.196.0> terminating
** (ArithmeticError) bad argument in arithmetic expression
    (testdep) lib/worker.ex:59: TestDep.ProducerConsumer.handle_events/3 
    (gen_stage) lib/gen_stage.ex:2315: GenStage.consumer_dispatch/6
    (gen_stage) lib/gen_stage.ex:2488: GenStage.take_pc_events/3
    (stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:686: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: {:DOWN, #Reference<0.2120331187.593494020.180438>, :process, #PID<0.197.0>, {:badarith, [{TestDep.ProducerConsumer, :handle_events, 3, [file: 'lib/worker.ex', line: 59]}, {GenStage, :consumer_dispatch, 6, [file: 'lib/gen_stage.ex', line: 2315]}, {GenStage, :take_pc_events, 3, [file: 'lib/gen_stage.ex', line: 2488]}, {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 616]}, {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 686]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 247]}]}}
State: :the_state_does_not_matter

How can I handle the restart? Is it not done automatically? Should I hook into terminate, or launch the flow differently?

The issue here is that you're starting a bunch of processes within a single start_link, which isn't a proper way to supervise things. The supervisor only tracks the pid that start_link returns, so each entity needs to be its own worker of the supervisor instead of everything being started in a single start_link.
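
To illustrate why the extra processes are never restarted, here is a minimal sketch that uses plain Agents instead of GenStage. The module `Sketch.Bundle` and the names `:sketch_helper` and `:sketch_main` are made up for the illustration. The assumption is that it mirrors the shape of TestDep.Worker.start_link: a helper is started inside start_link, but only the main pid is returned to the supervisor.

```elixir
defmodule Sketch.Bundle do
  # Hypothetical worker that, like TestDep.Worker, starts a helper process
  # inside its own start_link/0 and returns only the main pid.
  def child_spec(_opts) do
    %{id: __MODULE__, start: {__MODULE__, :start_link, []}}
  end

  def start_link do
    # This runs inside the supervisor process, so the helper ends up linked
    # to the supervisor without being one of its children.
    {:ok, _helper} = Agent.start_link(fn -> :helper end, name: :sketch_helper)
    Agent.start_link(fn -> :main end, name: :sketch_main)
  end
end

{:ok, sup} = Supervisor.start_link([Sketch.Bundle], strategy: :one_for_one)

# Crash the helper, which the supervisor does not track as a child.
Process.exit(Process.whereis(:sketch_helper), :kill)
Process.sleep(100)

IO.inspect(Process.whereis(:sketch_helper), label: "helper after crash")
IO.inspect(Process.whereis(:sketch_main), label: "main after crash")
IO.inspect(Supervisor.count_children(sup), label: "children")
```

The helper lookup should come back empty while the main process is still registered: the supervisor receives the exit signal from a pid it does not know as a child and has nothing to restart.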


If I place them as separate workers, how can I subscribe the consumer to the producer_consumer? I need the pid or process name to link the consumer to the producer_consumer.

In the init/1 callback of every GenStage you can return processes you want it to subscribe to. There is an example in GenStage moduledoc showing how to start multiple stages under a supervisor.
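
A rough sketch of that pattern, modelled on the moduledoc but not copied from it. The module names `Sketch.Producer` and `Sketch.Consumer` are hypothetical, the counter producer stands in for a real source, and the modern child-spec style is used instead of worker/3. The `Mix.install` line is only there so the snippet can run as a standalone script; in a Mix project, gen_stage would be listed in deps instead.

```elixir
# Standalone-script setup only; in a Mix project, list :gen_stage in deps.
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule Sketch.Producer do
  use GenStage

  # Register under the module name so other stages can refer to this
  # stage without knowing its pid.
  def start_link(_opts), do: GenStage.start_link(__MODULE__, 0, name: __MODULE__)

  def init(counter), do: {:producer, counter}

  # Emit as many consecutive integers as the consumers ask for.
  def handle_demand(demand, counter) when demand > 0 do
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end

defmodule Sketch.Consumer do
  use GenStage

  def start_link(_opts), do: GenStage.start_link(__MODULE__, :ok, name: __MODULE__)

  # Subscribing from init/1 means the subscription is set up again
  # every time the supervisor restarts this stage.
  def init(:ok) do
    {:consumer, :ok, subscribe_to: [{Sketch.Producer, max_demand: 10}]}
  end

  def handle_events(events, _from, state) do
    IO.inspect(events, label: "consumed")
    # Throttle the endless counter so the output stays readable.
    Process.sleep(500)
    {:noreply, [], state}
  end
end

# The producer is listed first so it is registered before the consumer
# tries to subscribe to it by name.
children = [Sketch.Producer, Sketch.Consumer]
{:ok, _sup} = Supervisor.start_link(children, strategy: :rest_for_one)
```

With `:rest_for_one`, a crash in the producer also restarts the consumer that was started after it, which then subscribes again by name. Each stage is now a direct child of the supervisor, so a crash in either one is seen and handled.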

OK, for GenStage it is clear: I could return something like {:consumer, :the_state_does_not_matter, subscribe_to: [ProducerConsumer]} from the init function to subscribe to the producer-consumer.

But what about the Flow I am using as the producer on top of the producer-consumer and consumer, i.e. Flow.into_stages(flow, [producer_consumer])? From the docs:

Starts and runs the flow as a separate process which will be a producer to the given consumers.

That worker should also be defined separately, something like:

children = [
  # worker(...) for the Flow acting as the producer goes here
  worker(ProducerConsumer, []),
  worker(Consumer, [])
]

Should I also define the subscribe_to in the init/1 in the producer-consumer to subscribe it to the flow stages process?

I am now trying to add the workers to the Supervisor, to avoid starting all processes in a single start_link:

defmodule TestDep.Worker do
  use GenStage
  def start_link() do
    IO.puts "starting producer"
    File.stream!("file.txt")
    |> GenStage.from_enumerable()
  end
end
defmodule TestDep.Consumer do
  use GenStage
  def start_link do
    IO.puts "starting link consumer"
    GenStage.start_link(__MODULE__, :ok)
  end
  def init(:ok) do
    IO.puts "starting worker consumer"
    {:consumer, :the_state_does_not_matter, subscribe_to: [TestDep.ProducerConsumer]}
  end
  def handle_events(events, _from, state) do
    Enum.each(events, fn(event) ->
      IO.inspect event
    end)
    # We are a consumer, so we would never emit items.
    {:noreply, [], state}
  end
end
defmodule TestDep.ProducerConsumer do
  use GenStage
  def start_link do
    IO.puts "starting link producer consumer"
    GenStage.start_link(__MODULE__, :ok)
  end
  def init(:ok) do
    IO.puts "starting worker producer consumer"
    {:producer_consumer, :the_state_does_not_matter, subscribe_to: [TestDep.Worker]}
  end
  def handle_events(events, _from, state) do
    Enum.each(events, fn(event) ->
      IO.inspect event
    end)
    {:noreply, events, state}
  end
end
defmodule TestDep.Application do
  @moduledoc false
  use Application
  use Supervisor
  def start(_type, _args) do
    children = [
      worker(TestDep.Worker, []),
      worker(TestDep.ProducerConsumer, []),
      worker(TestDep.Consumer, []),
    ]
    opts = [strategy: :one_for_one, name: TestDep.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

For the producer I am using GenStage.from_enumerable(), since I don't have the producer_consumer reference needed for Flow.into_stages(flow, [producer_consumer]), so I am not sure about that part.

The issue I am having is that, when starting the application, it fails to start the ProducerConsumer worker:

returned an error: shutdown: failed to start child: TestDep.ProducerConsumer

Apparently it couldn't connect to its producer.

I am not sure whether the issue is the GenStage.from_enumerable() call in Worker.start_link() or whether the ProducerConsumer is just badly configured.

Some help would be appreciated!
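
One possible reading of the error, offered as an assumption rather than a confirmed diagnosis: subscribe_to: [TestDep.Worker] looks the producer up by registered name, but none of the start_link functions above pass a name, so there is nothing registered to find. The sketch below registers both stages. The module names `Sketch.StreamProducer` and `Sketch.Printer` are hypothetical, an in-memory list stands in for File.stream!("file.txt"), and it assumes GenStage.from_enumerable forwards start options such as :name. Since a stream producer exits normally once the enumerable is exhausted, the sketch also assumes restart: :transient on the producer and cancel: :transient on the subscription, so that finishing the stream is not treated as a crash.

```elixir
# Standalone-script setup only; in a Mix project, list :gen_stage in deps.
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule Sketch.StreamProducer do
  # Hypothetical wrapper around a stream-backed producer.
  def child_spec(_opts) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, []},
      # :transient so that a normal exit (stream exhausted) does not
      # cause the supervisor to start streaming from the beginning again.
      restart: :transient
    }
  end

  def start_link do
    # Stand-in for File.stream!("file.txt"); registered by name so that
    # subscribe_to can find it.
    ["line 1\n", "line 2\n", "line 3\n"]
    |> GenStage.from_enumerable(name: __MODULE__)
  end
end

defmodule Sketch.Printer do
  use GenStage

  def start_link(_opts), do: GenStage.start_link(__MODULE__, :ok, name: __MODULE__)

  def init(:ok) do
    # cancel: :transient keeps this stage alive when the producer
    # exits normally at the end of the stream.
    {:consumer, :ok, subscribe_to: [{Sketch.StreamProducer, cancel: :transient}]}
  end

  def handle_events(events, _from, state) do
    Enum.each(events, &IO.inspect/1)
    {:noreply, [], state}
  end
end

children = [Sketch.StreamProducer, Sketch.Printer]
{:ok, _sup} = Supervisor.start_link(children, strategy: :rest_for_one)

# Give the stages a moment to move the events through.
Process.sleep(200)
```

Whether the same naming approach carries over to Flow.into_stages is not something this sketch covers.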