Understanding GenStage/Flow Error Propagation

I am building a data processing pipeline that may fail sporadically. I would like to know when producers/consumers fail, so I can keep track of running pipelines. When my producer crashes, other processes crash with the same error message, which is confusing.

Consider the following setup:

defmodule CrashingProducer do
  use GenStage
  def init([]), do: {:producer, 0}
  def handle_demand(_, _), do: raise(RuntimeError, "producer crashed")
end

# With this line commented out: a single error, as expected.
# With it enabled: the same error is reported twice. Why?
# Process.flag(:trap_exit, true)

{:ok, prod} =
  GenStage.start_link(CrashingProducer, [])
  |> IO.inspect(label: "prod pid")

spawn_link(fn ->
  Flow.from_stages([prod], max_demand: 1, stages: 1)
  |> Flow.run(link: false)
end)
|> IO.inspect(label: "flow pid")

:timer.sleep(1000)

When running it as is, the producer crashes with a single error, as expected.
When I enable the exit trap, the GenStages started by Flow also crash, repeating the same error message, so it looks as if the error originated in another process. What is the design decision behind this? It does not make sense to me.
Would it not be easier if the consumers exited with :shutdown when the producer fails? Can I do this manually?

Can I ignore these errors somehow? I already know that my producer failed, so I can shut everything else down manually. It is inconvenient to trap and handle these exits as well.

This is especially annoying when there are multiple Flow Stages and the terminal is spammed with the same error message over and over.
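On the "can I do this manually?" question: GenStage subscriptions take a `:cancel` option. As I read the GenStage docs, it defaults to `:permanent`, under which a consumer exits with the same reason as its producer; that would explain why the producer's error shows up again from the Flow stages, though this is my reading rather than something confirmed here. The sketch below works at the plain GenStage level, not through Flow (I have not checked whether `Flow.from_stages/2` exposes this option). It assumes the `gen_stage` package installed via `Mix.install/1` (Elixir 1.12+); `QuietConsumer` and the `:producer_gone` message are hypothetical names. With `cancel: :temporary`, the consumer should stay alive and learn about the producer's exit through `handle_cancel/3`.

```elixir
# Assumption: standalone script with network access (Mix.install/1 needs Elixir 1.12+).
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule CrashingProducer do
  use GenStage

  def init([]), do: {:producer, 0}

  # Fails on the first demand, like the producer in the question.
  def handle_demand(_demand, _state), do: raise(RuntimeError, "producer crashed")
end

defmodule QuietConsumer do
  # Hypothetical consumer that reports the producer's exit to a parent process.
  use GenStage

  def init(parent), do: {:consumer, parent}

  def handle_events(_events, _from, parent), do: {:noreply, [], parent}

  # Invoked with {:down, reason} when the producer process exits.
  def handle_cancel({kind, reason}, _from, parent) do
    send(parent, {:producer_gone, kind, reason})
    {:noreply, [], parent}
  end
end

# Unlinked starts, so the producer crash does not take this script down.
{:ok, producer} = GenStage.start(CrashingProducer, [])
{:ok, consumer} = GenStage.start(QuietConsumer, self())

# cancel: :temporary asks GenStage to keep the consumer running when the producer exits.
{:ok, _ref} = GenStage.sync_subscribe(consumer, to: producer, cancel: :temporary)

gone =
  receive do
    {:producer_gone, kind, reason} -> {kind, reason}
  after
    1_000 -> :timeout
  end

# Short pause before checking; under the default cancel: :permanent the consumer would be expected to exit.
Process.sleep(100)
consumer_alive = Process.alive?(consumer)
IO.inspect({gone, consumer_alive}, label: "cancellation / consumer alive?")
```

The producer's own crash is still logged once. `cancel: :transient` would only help for `:normal`/`:shutdown` exits, so a crash like this one would presumably still be repeated by the consumer.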

I am now using Task.Supervisor.async_stream and a pipeline definition based on keyword lists. Something like this:

stages = [
  add_one: fn i -> i + 1 end,
  sub_one: fn i -> i - 1 end
]
input_stream = 1..10 |> Enum.to_list()

{:ok, sup} = Task.Supervisor.start_link(name: :tasksup)

reduce_fn = &Enum.reduce(stages, &1, fn {_name, stage}, item -> stage.(item) end)
Task.Supervisor.async_stream(sup, input_stream, reduce_fn)
|> Stream.take_while(fn
  {:ok, _} -> true
  _ -> false
end)
|> Stream.map(fn {:ok, item} -> item end)
|> Enum.to_list()
|> IO.inspect()
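To see how this pipeline behaves when a stage fails, here is a sketch with a hypothetical `check` stage that raises for one item. It swaps in `Task.Supervisor.async_stream_nolink/4`: with the linked `async_stream` used above, a raising task brings the caller down with it (unless the caller traps exits), while the `nolink` variant reports the failure as an `{:exit, reason}` element. Results are ordered by default, so `Stream.take_while/2` stops at the first failure.

```elixir
# Hypothetical stages: :check raises for one item to simulate a failure.
stages = [
  add_one: fn i -> i + 1 end,
  check: fn i -> if i == 4, do: raise("bad item"), else: i end,
  sub_one: fn i -> i - 1 end
]

{:ok, sup} = Task.Supervisor.start_link()

# Run every stage over a single item, in order.
reduce_fn = &Enum.reduce(stages, &1, fn {_name, stage}, item -> stage.(item) end)

# nolink: a raising task yields {:exit, reason} instead of crashing this process.
results =
  Task.Supervisor.async_stream_nolink(sup, 1..5, reduce_fn)
  |> Stream.take_while(&match?({:ok, _}, &1))
  |> Enum.map(fn {:ok, item} -> item end)

# Input 3 fails in :check, so only the items before it are kept: [1, 2]
IO.inspect(results)
```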

But this too will stop on the first return value that’s not {:ok, stuff}, is that your intention?

Sometimes yes, sometimes no 🙂

When failed items can be discarded, I would replace

|> Stream.take_while(fn
  {:ok, _} -> true
  _ -> false
end)
|> Stream.map(fn {:ok, item} -> item end)

with a filter dropping everything but {:ok, item}s.
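A sketch of that filter variant, reusing a hypothetical failing `check` stage and `async_stream_nolink` (both assumptions, not the original code). `Stream.filter/2` keeps only the `{:ok, item}` results, so the failed item is dropped and the remaining items still come through.

```elixir
# Hypothetical stages: :check raises for one item to simulate a failure.
stages = [
  add_one: fn i -> i + 1 end,
  check: fn i -> if i == 4, do: raise("bad item"), else: i end,
  sub_one: fn i -> i - 1 end
]

{:ok, sup} = Task.Supervisor.start_link()
reduce_fn = &Enum.reduce(stages, &1, fn {_name, stage}, item -> stage.(item) end)

results =
  Task.Supervisor.async_stream_nolink(sup, 1..5, reduce_fn)
  # Drop {:exit, reason} entries instead of stopping at them.
  |> Stream.filter(&match?({:ok, _}, &1))
  |> Enum.map(fn {:ok, item} -> item end)

# Only input 3 fails, so the result is [1, 2, 4, 5]
IO.inspect(results)
```

A `for {:ok, item} <- stream, do: item` comprehension does the same in one step, since generator elements that do not match the pattern are skipped.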

When I need to react to failures, I can return the :ok/:exit tuples to the caller as well.
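One way to hand failures back to the caller, again assuming a hypothetical failing `check` stage and `async_stream_nolink`: since results come back in input order by default, zipping them with the inputs shows which input failed, and `Enum.split_with/2` separates successes from failures.

```elixir
# Hypothetical stages: :check raises for one item to simulate a failure.
stages = [
  add_one: fn i -> i + 1 end,
  check: fn i -> if i == 4, do: raise("bad item"), else: i end,
  sub_one: fn i -> i - 1 end
]

{:ok, sup} = Task.Supervisor.start_link()
reduce_fn = &Enum.reduce(stages, &1, fn {_name, stage}, item -> stage.(item) end)
inputs = 1..5

{succeeded, failed} =
  Task.Supervisor.async_stream_nolink(sup, inputs, reduce_fn)
  # Pair each result with its input; ordered: true (the default) keeps them aligned.
  |> Enum.zip(inputs)
  |> Enum.split_with(fn {result, _input} -> match?({:ok, _}, result) end)

ok_items = Enum.map(succeeded, fn {{:ok, item}, _input} -> item end)
failed_inputs = Enum.map(failed, fn {{:exit, _reason}, input} -> input end)

IO.inspect(ok_items, label: "ok")
IO.inspect(failed_inputs, label: "failed inputs")
```

The `{:exit, reason}` tuples in `failed` also carry the exception and stacktrace if the caller wants to log or retry them.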

My bad, I misread the function name. Apologies for the noise.
