How to deal with exceptions inside Flow?

Hello,

I understand that the work runs in another process, so the matter is quite complicated, but I can’t stop myself from asking: is it possible to somehow “rescue” an error raised inside Flow.reduce?

It depends on whether you want to rescue all errors or only specific exceptions, so you need to elaborate on why you want to catch errors and which kind. But you can safely use try/rescue and try/catch inside the functions you pass to Flow.
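For instance, here is a minimal sketch of rescuing inside the callback, so a bad element becomes a tagged value instead of crashing its stage. The `parse` function and the sample input are made up for illustration, and the `Mix.install` line assumes a recent Flow release from Hex.

```elixir
# Assumption: Flow is fetched from Hex; pin whichever version you actually use.
Mix.install([{:flow, "~> 1.0"}])

# Hypothetical per-element work: turn a string into an integer.
# String.to_integer/1 raises ArgumentError on bad input, so we rescue it
# inside the callback and return a tagged tuple instead.
parse = fn raw ->
  try do
    {:ok, String.to_integer(raw)}
  rescue
    ArgumentError -> {:error, raw}
  end
end

results =
  ["1", "2", "oops", "4"]
  |> Flow.from_enumerable()
  |> Flow.map(parse)
  |> Enum.to_list()

# Flow does not guarantee ordering, so split by tag rather than by position.
{valid, invalid} = Enum.split_with(results, &match?({:ok, _}, &1))
```

Because the exception never leaves the callback, the stage keeps running and the caller decides what to do with the `invalid` entries.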

That’s a nice tip! And I didn’t post the example, so let me fix this:

try do
  1..10
  |> Flow.from_enumerable()
  |> Flow.each(fn _ -> raise CSV.RowLengthError end)
  |> Enum.to_list()
rescue
  CSV.RowLengthError -> IO.puts("Got ya!")
end

In our case, we want to rescue outside of the flow, and it would be really cool to have all of the flow’s processes terminated even if an error occurs in only one of them.

You can’t rescue exceptions from processes the way you are doing it above.

It’s not clear from the Flow docs what happens on errors (or I may be missing something), but my guess is that the stages are linked to the process that started the flow. That means that if one stage crashes, the calling process and all other stages will crash too because of the links.

If you want to check the crash reason, you need to trap exits and check the {:EXIT, pid, reason} message, but keep in mind that if you trap exits then the other stages won’t be automatically killed by the links.
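To make the trap-exits idea concrete, here is a rough sketch using a plain linked process rather than Flow itself. The module name `TrapExitSketch` and the raised message are made up for illustration; how Flow links its stages is still a guess on my part.

```elixir
defmodule TrapExitSketch do
  # Spawns a linked process that crashes, and reads the resulting
  # {:EXIT, pid, reason} message instead of crashing along with it.
  def run do
    # Trapping exits turns exit signals from linked processes into messages.
    previous = Process.flag(:trap_exit, true)

    pid = spawn_link(fn -> raise ArgumentError, "malformed row" end)

    result =
      receive do
        # For a raised exception the reason is {exception, stacktrace}.
        {:EXIT, ^pid, reason} -> {:crashed, reason}
      after
        1_000 -> :timeout
      end

    # Restore the caller's original setting so the links behave as before.
    Process.flag(:trap_exit, previous)
    result
  end
end
```

Note that while exits are trapped the caller no longer dies with the linked process, which is exactly why the remaining stages would have to be shut down by hand.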

@josevalim who worked on Flow can explain better how errors in Flow should be handled.

I need more information about why you are raising an error and why you want to rescue it. An error in a flow will cause the whole flow to abort.

@josevalim, we have two modules: one for CSV parsing, which returns a stream of entries, and another for bulk insertion into the database.
The bulk insertion module takes a stream and validates each entry using Flow:

entries
|> Flow.from_enumerable()
|> Flow.partition()
|> Flow.reduce(fn -> %{valid: [], invalid: [], emails: []} end, &push_entry(&1, &2, columns))
|> Flow.departition(fn -> %{valid: [], invalid: [], emails: []} end, &merge_entries/2, &done_parsing/1)
|> Enum.to_list()
|> Enum.at(0)

If the CSV is malformed, the library raises an error, and because parsing is lazy we can’t handle it earlier.
Our goal here is to inform the user gracefully rather than going down with a 500 error.

Can you please open up an issue? It is not straightforward right now, so I believe we should improve that.

I have pushed a commit to master that makes sure the Flow will exit with the exit reason instead of relying on a linked process. So you should be able to catch those exits like this:

flow = ... build your flow ...
try do
  flow |> Enum.to_list |> Enum.at(0)
catch
  :exit, exit -> ...
end
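
If the goal is to report the failure to the user instead of crashing the request, one way to package the pattern above is a small wrapper that turns an exit into a tagged tuple. This is only a sketch: `SafeRun` is a made-up name, and the shape of the exit reason coming out of a real flow is not something I am asserting here.

```elixir
defmodule SafeRun do
  # Runs a zero-arity function and converts an exit in the calling
  # process into {:error, reason} instead of letting it propagate.
  def run(fun) when is_function(fun, 0) do
    {:ok, fun.()}
  catch
    :exit, reason -> {:error, reason}
  end
end
```

With the flow built as above, that would be used as `SafeRun.run(fn -> flow |> Enum.to_list() |> Enum.at(0) end)`, and the caller can then pattern match on `{:ok, result}` or `{:error, reason}` to decide what to show the user.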

done https://github.com/elixir-lang/gen_stage/issues/132