Handling errors in Flow, knowing which stage raised the exception

Hi everyone,

I have a Flow pipeline with several stages and when an exception is raised I would like to know which stage was concerned?

For instance, in my use case, I have 5 stages, 4 only reading/searching/filtering data, and the last one for writing data. Like this:

retrieve()
    |> Flow.from_enumerable()
    |> Flow.partition(hash: fn {u, s} -> {{u, s}, rem(u.id, 10)} end)
    |> Flow.uniq_by(fn {user, _} -> user.id end)
    |> Flow.partition()
    |> Flow.map(&retrieve_data/1)
    |> Flow.partition(window: Flow.Window.count(100))
    |> Flow.reduce(fn -> "" end, &create_bulk_data/2)
    |> Flow.emit(:state)
    |> Flow.partition()
    |> Flow.map(&save/1)
    |> Flow.run

If any of my stages fail, how can I know which one failed? So that, if a “reader” stage fails I can just relaunch the Flow, but if the “writer” stage fails I try a rollback before retrying.

It would be nice to be able to give a name to each stage, and to get it in the catch clause.

Thank you for your help!

3 Likes