Supervising a flow built with into_specs and through_specs

With the new functions through_specs and into_specs you can quickly build a Flow and connect it to GenStage producer_consumers and consumers.

Something like:

def start_link(_params) do
  flow
  |> Flow.through_specs(producer_consumer_spec)
  |> Flow.into_specs(consumer_spec)
end

I added this worker to my supervisor because I want to restart the consumers and producer_consumers if they crash.

But I have noticed this worker also starts a process called Flow.Coordinator with its own supervisor, which “supervises” (I guess) the consumers and producer_consumers.

[Screenshot: supervision tree in :observer]

As you can see, after MySupervisor I have process 315, which is a Flow.Coordinator, then process 316, which is an “Elixir.Supervisor.Default”, and then the consumers/producer_consumers.

The issue is that if I kill one of the consumers on the right (336), it doesn't get restarted. And if I kill a producer_consumer, everything is killed, including the Flow.Coordinator, and nothing ever gets restarted.

However, if I kill the Flow.Coordinator or MySupervisor, everything gets restarted correctly because it's being supervised.

How can I supervise these consumers, then? Can it be done with an option when using Flow.into_specs or through_specs?

Should I instead start the producer_consumer and consumer workers myself, add them to MySupervisor, and then use Flow.into_stages and through_stages to subscribe to the already running processes?

Can you please provide a small app that builds this tree so I can take a look at it?

Hello,

I managed to reproduce this issue with this code:

My application:

defmodule TestDep.Application do
  @moduledoc false

  use Application

  def start(_type, _args) do
    children = [
      %{
        id: TestDep.Flow,
        start: {TestDep.Flow, :start_link, [[]]}
      }
    ]
    opts = [strategy: :one_for_one, name: TestDep.Supervisor]
    Supervisor.start_link(children, opts)
  end

end

My flow:

defmodule TestDep.Flow do

  use Flow

  def start_link(_param) do
    consumer_spec = {TestDep.Consumer, []}
    specs = [{consumer_spec, []}]

    stream = File.stream!("file.txt")

    stream
    |> Flow.from_enumerable()
    |> Flow.into_specs(specs)
  end
end

My consumer:

defmodule TestDep.Consumer do
  use GenStage

  def start_link(_param) do
    IO.puts "starting link consumer"
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    IO.puts "starting worker consumer"
    {:consumer, :the_state_does_not_matter}
  end

  def handle_events(events, _from, state) do
    Process.sleep(3000)
    IO.puts "inspecting consumer events"
    Enum.each(events, fn(event) ->
      IO.inspect event
    end)
    # We are a consumer, so we would never emit items.
    {:noreply, [], state}
  end

end

Then my observer looks like this:

Here 181 is the Flow.Coordinator, 182 is the Elixir.Supervisor.Default (which I didn't launch myself), 183 is a producer, and the other one is my consumer.

If I now kill my consumer, Elixir.TestDep.Consumer, it doesn't get restarted:

However, if I kill the Flow.Coordinator (181), it recreates the whole flow, including the consumer, as expected.

I think I am missing something obvious here. The consumer should be restarted, right?

Killing the consumer should restart the flow. What is the exit reason of the consumer?

Hello Jose,

I tried adding a terminate/2 callback, but when I killed the consumer from the observer it didn't output any log:

def terminate(reason, state) do
  IO.puts "consumer terminated because of #{inspect reason}"
  # Print the state as well; a bare `inspect state` would discard its result.
  IO.inspect state
  :ok
end
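A possible explanation for the missing log, sketched with a plain GenServer rather than the real consumer: a process stopped with the untrappable `:kill` exit reason never runs terminate/2, whereas an exception raised inside a callback does go through it. This assumes the observer's kill action ends up sending `:kill`, which I have not verified. `TerminateDemo` and its `notify_pid` argument are hypothetical names used only for this illustration.

```elixir
defmodule TerminateDemo do
  use GenServer

  # `notify_pid` (hypothetical) is the process that gets told when terminate/2 runs.
  # GenServer.start/2 is used instead of start_link/2 so the caller survives the crash.
  def start(notify_pid), do: GenServer.start(__MODULE__, notify_pid)

  @impl true
  def init(notify_pid), do: {:ok, notify_pid}

  # Raising inside a callback makes the GenServer stop abnormally.
  @impl true
  def handle_cast(:crash, _state), do: raise("boom")

  @impl true
  def terminate(reason, notify_pid) do
    send(notify_pid, {:terminated, reason})
    :ok
  end
end

# Case 1: brutal kill. The process dies immediately and terminate/2 is skipped.
{:ok, killed_pid} = TerminateDemo.start(self())
ref = Process.monitor(killed_pid)
Process.exit(killed_pid, :kill)

down_reason =
  receive do
    {:DOWN, ^ref, :process, ^killed_pid, reason} -> reason
  after
    1_000 -> :timeout
  end

terminate_called_on_kill =
  receive do
    {:terminated, _reason} -> true
  after
    100 -> false
  end

# Case 2: exception in a callback. terminate/2 runs before the process exits
# (an error report is also logged, like the one from the consumer).
{:ok, crashed_pid} = TerminateDemo.start(self())
GenServer.cast(crashed_pid, :crash)

terminate_called_on_crash =
  receive do
    {:terminated, _reason} -> true
  after
    1_000 -> false
  end

IO.inspect({down_reason, terminate_called_on_kill, terminate_called_on_crash})
```

So a silent terminate/2 after a kill from the observer would not by itself say anything about whether the consumer is supervised; crashing it with an exception is the more informative test.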

Anyway, I can crash it by provoking an exception, for example, and then it outputs a log like this:

15:08:38.249 [error] GenServer TestDep.Consumer terminating
** (ArithmeticError) bad argument in arithmetic expression
(testdep) lib/worker.ex:64: TestDep.Consumer.handle_events/3
(gen_stage) lib/gen_stage.ex:2315: GenStage.consumer_dispatch/6
(stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
(stdlib) gen_server.erl:686: :gen_server.handle_msg/6
(stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: {:"$gen_consumer", {#PID<0.198.0>, #Reference<0.3584978031.2936012803.46743>}, [state]

And in the observer I can see that the consumer died and didn't restart.

As another example, this is what terminate/2 prints in my production app when the consumer hits an undef error:

consumer terminated because of {:undef, [{Poison, :encode!, [], []}, {Consumer, :handle_events, 3, [file: 'consumer.ex', line: 36]}, {GenStage, :consumer_dispatch, 6, [file: 'lib/gen_stage.ex', line: 2315]}, {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 616]}, {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 686]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 247]}]}

Can you push the small repo to GitHub so I can clone it and try it out? Thanks.

Can you check this repo? https://github.com/lapinkoira/supervised_flow

Just run iex -S mix, call :observer.start, and kill the consumer from there. It won't be recreated as it should be.

Fixed in Flow master. Can you please give it a try? If it also works for you, I will do a new release. Thanks for the mechanism to reproduce the bug!

Works like a charm! Awesome! It's restarting the consumer correctly.

v0.14.2 released!

Thanks, awesome response time!