Handling errors in Flow using :ok, :error tuples

I’m new to Flow and am trying to figure out the best way to deal with errors that may occur during processing.

For example, let’s say I’m processing rows of a CSV file and ultimately writing them to a new file, in a flow like the one below.

file
|> File.stream!()
|> CSV.decode!()
|> Flow.from_enumerable()
|> Flow.map(&process_row/1)
|> Flow.map(&more_process_row/1)
|> Flow.each(&write_to_file/1)
|> Flow.run()

If process_row/1 runs into an error, I don’t want the stage to die. I want that row to be tagged as an error and eventually logged to a file. Since the error is taking place in a Flow.map call I need to return something for the failed row, so I was thinking I would return {:ok, ...} and {:error, ...} tuples in each function. But then it seems I would need to add an additional Flow.map and Flow.reject after each function that potentially returns an error, to allow for logging the errors and removing them from the flow to avoid tripping up subsequent function calls, like the following.

file
|> File.stream!()
|> CSV.decode!()
|> Flow.from_enumerable()
|> Flow.map(&process_row/1)
|> Flow.map(&log_error_tuples/1)
|> Flow.reject(&error_tuples/1)
|> Flow.map(&more_process_row/1)
|> Flow.map(&log_error_tuples/1)
|> Flow.reject(&error_tuples/1)
|> Flow.each(&write_to_file/1)
|> Flow.run()
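
For reference, here is a minimal sketch of what the two helpers in that pipeline might look like. The module name RowErrors and the log message are placeholders I made up, and it assumes process_row/1 and more_process_row/1 return {:ok, row} or {:error, reason}.

```elixir
defmodule RowErrors do
  require Logger

  # Meant for a Flow.map/2 step: logs error tuples and returns
  # every item unchanged so the flow keeps its shape.
  def log_error_tuples({:error, reason} = item) do
    Logger.error("row failed: #{inspect(reason)}")
    item
  end

  def log_error_tuples(item), do: item

  # Meant as a Flow.reject/2 predicate: true only for error tuples.
  def error_tuples({:error, _reason}), do: true
  def error_tuples(_item), do: false
end
```

Note that after the reject step the surviving items are still wrapped as {:ok, row}, so more_process_row/1 would have to match on that tuple (or an extra map step would have to unwrap it).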

Alternatively, I could have each function in the flow (other than the initial call to process_row/1) assume it will be passed an {:ok, ...} or {:error, ...} tuple and just pass the errors through unmodified, and then do the logging and filtering once at the end, before writing the results out. This feels awkward though.
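
To make that second idea concrete, here is a rough sketch of a wrapper that would keep the pass-through logic out of the individual functions. RowPipeline and step/1 are hypothetical names, not part of Flow, and the demo uses Enum.map/2 so it runs without any dependencies; the same wrapped functions could presumably be handed to Flow.map/2 instead.

```elixir
defmodule RowPipeline do
  # Wraps a one-argument function so it only runs on {:ok, value}.
  # Error tuples are passed through untouched.
  def step(fun) when is_function(fun, 1) do
    fn
      {:ok, value} -> safely(fun, value)
      {:error, _reason} = error -> error
    end
  end

  # Runs the function and normalizes the result to a tagged tuple.
  # A raised exception becomes an error tuple instead of crashing the caller.
  defp safely(fun, value) do
    case fun.(value) do
      {:ok, _} = ok -> ok
      {:error, _} = error -> error
      other -> {:ok, other}
    end
  rescue
    exception -> {:error, {exception, value}}
  end
end

# Demo with plain Enum; "x" cannot be parsed, so that row turns into an error.
results =
  [{:ok, "1"}, {:ok, "x"}, {:ok, "3"}]
  |> Enum.map(RowPipeline.step(&String.to_integer/1))
  |> Enum.map(RowPipeline.step(&(&1 * 2)))

# Split once at the end: good rows get written, bad rows get logged.
{oks, errors} = Enum.split_with(results, &match?({:ok, _}, &1))
```

With this shape the row functions themselves stay unaware of the tuples, and the logging and rejecting only has to happen once, right before the write step.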

Does anyone have better ideas for how to handle this?

Thanks.
Justin

Here’s some example code that handles errors by matching {:error, error} tuples, handling them, and rejecting them from the flow.

https://github.com/balduncle/flow_errors/blob/master/lib/flow_errors.ex