I have this test Flow: it just reads a file, turns its content into a Flow, and sends it through a producer_consumer and a consumer (both are GenStages):
defmodule TestDep.Worker do
  @moduledoc """
  Wires the test pipeline together: a Flow built from the lines of
  `file.txt` feeding `TestDep.ProducerConsumer`, which in turn feeds
  `TestDep.Consumer`.
  """

  use Flow

  alias TestDep.{Consumer, ProducerConsumer}

  @doc """
  Starts both GenStage processes, subscribes the consumer to the
  producer-consumer, and starts the flow with the producer-consumer as its
  downstream stage.

  Returns the result of `Flow.into_stages/2`.
  """
  def start_link do
    # Describe the source first: a Flow over the stream of "file.txt".
    lines = Flow.from_enumerable(File.stream!("file.txt"))

    # NOTE(review): both stages are started with start_link from the process
    # calling this function, so they are linked to that caller rather than
    # being children of a supervisor or of the flow itself. Confirm whether a
    # crash in one of them is expected to restart the whole pipeline.
    {:ok, sink} = Consumer.start_link()
    {:ok, relay} = ProducerConsumer.start_link()

    # The consumer pulls from the producer-consumer.
    {:ok, _subscription} = GenStage.sync_subscribe(sink, to: relay)

    # The flow pushes its events into the producer-consumer.
    Flow.into_stages(lines, [relay])
  end
end
defmodule TestDep.Consumer do
  @moduledoc """
  Final stage of the test pipeline: a GenStage `:consumer` that prints the
  events it receives and emits nothing.
  """

  use GenStage

  @doc """
  Starts the consumer stage via `GenStage.start_link/2`, announcing it on stdout.
  """
  def start_link do
    IO.puts("starting link consumer")
    GenStage.start_link(__MODULE__, :ok)
  end

  @impl true
  def init(:ok) do
    IO.puts("starting worker consumer")
    {:consumer, :the_state_does_not_matter}
  end

  @impl true
  def handle_events(events, _from, state) do
    # Division by zero raises ArithmeticError on every batch, so nothing below
    # this line runs while it is present (presumably a deliberate fault used
    # to observe crash/restart behaviour).
    10 / 0

    IO.puts("inspecting consumer events")
    Enum.each(events, &IO.inspect/1)

    # A consumer has no downstream, hence the empty event list.
    {:noreply, [], state}
  end
end
defmodule TestDep.ProducerConsumer do
  @moduledoc """
  Middle stage of the test pipeline: a GenStage `:producer_consumer` that
  prints each incoming event and forwards the batch unchanged.
  """

  use GenStage

  @doc """
  Starts the producer-consumer stage via `GenStage.start_link/2`, announcing
  it on stdout.
  """
  def start_link do
    IO.puts("starting link producer consumer")
    GenStage.start_link(__MODULE__, :ok)
  end

  @impl true
  def init(:ok) do
    IO.puts("starting worker producer consumer")
    {:producer_consumer, :the_state_does_not_matter}
  end

  @impl true
  def handle_events(events, _from, state) do
    # Uncomment the next line to force an ArithmeticError in this stage.
    # 10/0
    IO.puts("inspecting producer consumer events")
    Enum.each(events, &IO.inspect/1)

    # Pass the batch downstream untouched.
    {:noreply, events, state}
  end
end
I then added it to a supervisor with the `restart: :permanent` option (I also tried the default):
defmodule TestDep.Application do
  @moduledoc false

  # Only the Application behaviour is used here. The previous `use Supervisor`
  # declared the Supervisor behaviour without an `init/1` callback and existed
  # solely to pull in the deprecated `Supervisor.Spec.worker/3` helper.
  use Application

  # Application callback: starts the top-level supervisor, registered as
  # TestDep.Supervisor, with TestDep.Worker as its only child.
  #
  # Returns the result of `Supervisor.start_link/2`.
  @impl true
  def start(_type, _args) do
    children = [
      # Explicit child specification, equivalent to
      # `worker(TestDep.Worker, [], restart: :permanent)`.
      %{
        id: TestDep.Worker,
        start: {TestDep.Worker, :start_link, []},
        restart: :permanent,
        type: :worker
      }
    ]

    opts = [strategy: :one_for_one, name: TestDep.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
This is how I launch the application from the mix.exs file:
# OTP application settings: TestDep.Application is the callback module
# (started with an empty argument list) and :logger is listed as an extra
# application.
def application do
  [extra_applications: [:logger], mod: {TestDep.Application, []}]
end
What I have noticed is that any unhandled exception raised in a handle_events callback crashes the flow, and it is not restarted afterwards. For example, in case you didn't notice, my code has this in the producer_consumer:
# Prints every event of the batch and forwards the batch unchanged.
def handle_events(events, _from, state) do
  # Uncomment the next line to force an ArithmeticError in this stage.
  # 10/0
  IO.puts("inspecting producer consumer events")
  Enum.each(events, &IO.inspect/1)

  {:noreply, events, state}
end
There is a commented-out `10/0` in it; if I uncomment it, the stage crashes with an ArithmeticError:
16:29:40.126 [info] GenStage consumer #PID<0.196.0> is stopping after receiving cancel from producer #PID<0.197.0> with reason: {:badarith, [{TestDep.ProducerConsumer, :handle_events, 3, [file: 'lib/worker.ex', line: 59]}, {GenStage, :consumer_dispatch, 6, [file: 'lib/gen_stage.ex', line: 2315]}, {GenStage, :take_pc_events, 3, [file: 'lib/gen_stage.ex', line: 2488]}, {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 616]}, {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 686]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 247]}]}
16:29:40.126 [error] GenServer #PID<0.196.0> terminating
** (ArithmeticError) bad argument in arithmetic expression
(testdep) lib/worker.ex:59: TestDep.ProducerConsumer.handle_events/3
(gen_stage) lib/gen_stage.ex:2315: GenStage.consumer_dispatch/6
(gen_stage) lib/gen_stage.ex:2488: GenStage.take_pc_events/3
(stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
(stdlib) gen_server.erl:686: :gen_server.handle_msg/6
(stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: {:DOWN, #Reference<0.2120331187.593494020.180438>, :process, #PID<0.197.0>, {:badarith, [{TestDep.ProducerConsumer, :handle_events, 3, [file: 'lib/worker.ex', line: 59]}, {GenStage, :consumer_dispatch, 6, [file: 'lib/gen_stage.ex', line: 2315]}, {GenStage, :take_pc_events, 3, [file: 'lib/gen_stage.ex', line: 2488]}, {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 616]}, {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 686]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 247]}]}}
State: :the_state_does_not_matter
How can I handle the restart? Is it not done automatically? Should I implement the `terminate` callback, or launch the flow differently?