Error in Flow not stopping process timely

Hey, perhaps somebody can help me. The following is a super simplified version of some code that deals with external APIs.
My problem is that, as you can see, the function inside the stream raises an error, but sometimes the inspects at the bottom are reached before the process is killed.

defmodule Test do
  def run do
    result =
      (fn page ->
        IO.inspect(page, label: "page")
        raise "ups"
      end)
      |> create_stream()
      |> Flow.from_enumerable()
      # The call name on this line was lost in the post; Flow.map/2 is assumed.
      |> Flow.map(fn _ -> %{} end)
      |> Enum.to_list()

    IO.inspect("It should not reach here, but sometimes does")
    IO.inspect(result, label: "result")
  end

  def create_stream(api_func) do
    Stream.resource(fn -> 1 end, &api_func.(&1), fn _ -> :ok end)
  end
end

Test.run()

When executing, sometimes I get:

$ mix run test.exs
page: 1
"It should not reach here, but sometimes does"
result: []
[error] GenServer #PID<0.502.0> terminating
** (RuntimeError) ups
    test.exs:6: anonymous fn/1 in
    (elixir) lib/stream.ex:1285: Stream.do_resource/5
    (gen_stage) lib/gen_stage/streamer.ex:18: GenStage.Streamer.handle_demand/2
    (gen_stage) lib/gen_stage.ex:2170: GenStage.noreply_callback/3
    (gen_stage) lib/gen_stage.ex:2209: GenStage."-producer_demand/2-lists^foldl/2-0-"/3
    (stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:686: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: {:"$gen_cast", {:"$demand", :forward}}
State: #Function<0.55142349/1 in GenStage.Streamer.init/1>

If any of the work done by Flow raised, I would expect the whole thing to fail in the Enum.to_list call, not afterwards.
Is there anything I can do to stop the processing when running Enum.to_list?

The error is raised in a process spawned by Flow, not in the process that calls it. So that Flow process gets halted rather than your calling code, at least initially. If that raise is caught by a rescue somewhere else, the error will not cause your code to fail.
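To illustrate the point about separate processes, here is a minimal sketch that uses only the standard library, not Flow. `CrashDemo` and `run_isolated/1` are hypothetical names made up for this example. It assumes nothing about Flow's internals and only shows that a raise in another process reaches the caller as a message, not as an exception.

```elixir
defmodule CrashDemo do
  # Runs `fun` in a separate, monitored process and reports how it ended.
  # The caller is never crashed by the raise; it only receives a :DOWN message.
  def run_isolated(fun) do
    {pid, ref} = spawn_monitor(fun)

    receive do
      {:DOWN, ^ref, :process, ^pid, :normal} -> :ok
      {:DOWN, ^ref, :process, ^pid, reason} -> {:error, reason}
    end
  end
end

result = CrashDemo.run_isolated(fn -> raise "ups" end)

# This line is reached even though the spawned process raised.
IO.inspect(result, label: "result")
```

The crash report for the spawned process is emitted by the logger, so it can show up on the console before or after the `IO.inspect` output.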

(… and the reason you see the error after the IO output has to do with when the logger gets around to printing the output that has been queued (it is also in a different process).)

So it is, in a fashion, failing in the Flow pipeline and the Enum.to_list call is generating an empty list …
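One possible workaround, sketched under assumptions, is to turn the exception into data inside the stream and re-raise it in the calling process once the list has been collected. `SafeStream`, `create_stream/1` and `raise_on_error!/1` are hypothetical names, and the sketch below runs on a plain stream. In a real Flow pipeline every stage would also have to pass `{:error, _}` elements through untouched, which is not shown here.

```elixir
defmodule SafeStream do
  # Like the create_stream/1 in the question, but an exception raised by
  # api_func becomes a tagged {:error, exception} element and ends the stream.
  def create_stream(api_func) do
    Stream.resource(
      fn -> 1 end,
      fn
        # Marker set after a failure: stop on the next call.
        :failed ->
          {:halt, :failed}

        page ->
          try do
            api_func.(page)
          rescue
            error -> {[{:error, error}], :failed}
          end
      end,
      fn _ -> :ok end
    )
  end

  # Re-raises the first tagged error in the calling process, if there is one.
  def raise_on_error!(results) do
    case Enum.find(results, &match?({:error, _}, &1)) do
      {:error, error} -> raise error
      nil -> results
    end
  end
end

results =
  (fn _page -> raise "ups" end)
  |> SafeStream.create_stream()
  |> Enum.to_list()

# The failure is now visible to the caller as data.
IO.inspect(results, label: "results")
```

Calling `SafeStream.raise_on_error!(results)` right after `Enum.to_list` would then fail in the calling process, so the lines after it are not reached.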

Here is what I believe causes the difference in output between runs: when I run this in an iex shell, the first time the exception bubbles up to the shell and causes it to die, which then also prevents the rest of the run function from being called. Interestingly, this also happens if I first launch :observer.start() … so … nothing is catching the raised exception the first time, and after that it is being caught at the top level somewhere with a rescue-type statement, though I am not sure exactly where, which prevents the run() function from being interrupted.

In any case … if nothing in your code has a rescue block, then when run as an application (i.e. not in iex) this should indeed end up crashing whatever process it is running in.

Is this seemingly non-deterministic behavior only happening when you run it in e.g. iex -S mix or also when run directly with mix run or similar?

hey @aseigo, thanks for your response

The error happens both when running it through iex -S mix and through mix run ...
I do expect the Flow pipeline to fail, and therefore the calling process to fail as well. My problem is that the calling process can continue after the Flow process dies, and that Enum.to_list returns an empty list.
It is my understanding that it shouldn’t return a list at all, as the Flow process failed and therefore there is no way we can return a list.

Under the impression that I’m hitting a bug, I’ve opened a bug report in Flow itself.

I see it is a race condition between the supervisors. Fun, fun. At least it looks like progress is being made in that bug report, though! Best of luck, and thanks for reporting it upstream … better software to come :slight_smile: