Flow - error handling

Let’s say I have the following:

[A]
 |
 |
 |
[B]
 |\
 | \
 |  \
[C] [E]
 |   |
 |   |
 |   |
[D] [F]

A is a list of strings
B fetches data from an API using the given string
C performs some kind of work on the data
D writes to a db
E logs errors to a webservice
F posts a message to slack if there are a lot of errors in a small time window

Is it possible to handle errors like this with Flow? As in, B can output 2 types of events: data and errors. Data is consumed by C and errors are consumed by E. In GenStage I know I can use a GenStage.PartitionDispatcher with a hash function that assigns each event to one partition or the other depending on whether it is regular data or an error. But I’m wondering if this is possible within the Flow API?
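For reference, the GenStage routing described above might look roughly like the sketch below. The module name `BRouting`, the partition names `:data` and `:errors`, and the assumption that errors arrive as `{:error, reason}` tuples are all made up for illustration. The `:hash` function returns `{event, partition}`, which is the shape `GenStage.PartitionDispatcher` expects.

```elixir
defmodule BRouting do
  # Hypothetical partition names: one for regular data, one for errors.
  def partitions, do: [:data, :errors]

  # Assumes errors are shaped as {:error, reason}; anything else is data.
  # The dispatcher's :hash function must return {event, partition}.
  def hash({:error, _reason} = event), do: {event, :errors}
  def hash(event), do: {event, :data}

  # Options a producer (B) could return from init/1, e.g.
  #   {:producer, state, dispatcher: BRouting.dispatcher_opts()}
  # Consumers would then subscribe with `partition: :data` or
  # `partition: :errors`.
  def dispatcher_opts do
    {GenStage.PartitionDispatcher, partitions: partitions(), hash: &hash/1}
  end
end
```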

The best I can come up with is that B would send messages to a producer stage on error, which would be the start of a separate Flow (that consists of E and F). Is there a cleaner way?


I’ve been thinking about this a bit and come up with something I like, so I’m gonna answer my own question here in case other people run into this.

So, there are 2 things to know. First, this is demand-driven, and I’m trying to utilise backpressure to avoid a build-up of data when the next step in the flow is busy. By its very nature, if C is busy and E has run out of stuff to process, E can’t demand more data. Because there’s no way of knowing whether B will produce data for C or for E, B pulling in more work to satisfy E could just add to the build-up of data in front of C, even though C is applying backpressure to avoid exactly that. Put more simply: you can’t ask an API (B) for errors (E) :) So C and E may as well be the same step, so to speak. There’s no branch, at least as far as Flow is concerned.

Then how does the right data get to C and E? This is the second thing. I make B output “tagged tuples” like {:ok, data} and {:error, data}. This way C and E can pattern match to receive only the data relevant to them. As long as each step includes a default “pass through” clause, the data meant for later steps flows on down. So if you arrange the steps A -> B -> C -> E, with C passing through (or “ignoring”) any {:error, data}, then the errors make it to E. Put D -> F on the end, ensure they also pass through, and you’re done.

It looks something like this (removing partitioning and simplifying to all maps):

Flow.from_enumerable(A.data)
|> Flow.map(&B.do_stuff/1)
|> Flow.map(&C.do_stuff/1)
|> Flow.map(&E.do_stuff/1)
|> Flow.map(&D.do_stuff/1)
|> Flow.map(&F.do_stuff/1)
|> Flow.run()

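# Example passthrough work module
defmodule C do
  # Handle the events meant for C and keep the result tagged,
  # so that later :ok steps (such as D) still match on it.
  def do_stuff({:ok, data}) do
    # actually do stuff with data here
    {:ok, data}
  end

  # Pass through anything that isn't for C (i.e. errors)
  def do_stuff(other_data_passthrough), do: other_data_passthrough
end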
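The pipeline above relies on B already emitting tagged tuples. A minimal sketch of what such a B could look like follows. The module name `BSketch` and the `fake_fetch/1` helper are hypothetical stand-ins for the real API client, and attaching the input key to each error is just one possible choice. The `rescue` is there because an exception raised inside a `Flow.map` callback would crash the stage instead of reaching E as an event.

```elixir
defmodule BSketch do
  # fake_fetch/1 is a hypothetical stand-in for the real API call.
  defp fake_fetch(""), do: raise(ArgumentError, "empty key")
  defp fake_fetch("missing"), do: {:error, :not_found}
  defp fake_fetch(key), do: {:ok, %{key: key}}

  # Always returns a tagged tuple: {:ok, body} or {:error, {key, reason}}.
  def do_stuff(key) do
    case fake_fetch(key) do
      {:ok, body} -> {:ok, body}
      {:error, reason} -> {:error, {key, reason}}
    end
  rescue
    # Turn exceptions into error events so they travel down the flow.
    exception -> {:error, {key, Exception.message(exception)}}
  end
end
```

Keeping the original input in the error tuple gives E enough context to log something useful without B having to know anything about E.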

There are also different ways of arranging the overlapping steps. For example, if it’s really important that errors propagate to your services quickly, you’d arrange it as A -> B -> E -> F -> C -> D so that C and D can’t hold up E and F. And vice versa.

I’m starting to wonder if there’s a way of abstracting this structure up into Flow. Like extending map and its friends to take an atom, so that Flow handles the pattern matching and passthrough for you. Hmm.


Actually, yes, abstracting this higher is possible and really straightforward:

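defmodule TaggedFlow do
  # Applies transform only to events tagged with `tag`; everything else
  # passes through untouched. Note that transform receives the untagged
  # data and its return value replaces the whole event, so it should
  # return a tagged tuple if later tagged steps need to match on it.
  def map(flow, tag, transform) do
    Flow.map(flow, fn tagged_data ->
      case tagged_data do
        {^tag, data} -> transform.(data)
        _ -> tagged_data
      end
    end)
  end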

  # And each/3, filter/3, flat_map/3, reduce etc.
end
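As an illustration of one of the other functions, here is a hedged sketch of how an `each/3` could be written. It is built only on `Flow.map/2` (the same call used by `map/3` above), and it assumes Flow is a dependency of the project. The module name `TaggedFlowSketch` and the helper `tap_tagged/3` are hypothetical. The helper is kept as a plain function so it can be tried without starting a flow.

```elixir
defmodule TaggedFlowSketch do
  # Runs fun for its side effect on events tagged with `tag`,
  # then passes the original event on unchanged.
  def each(flow, tag, fun) do
    Flow.map(flow, fn event -> tap_tagged(event, tag, fun) end)
  end

  # `tag` appears twice in the head, so this clause only matches
  # when the tuple's tag equals the requested tag.
  def tap_tagged({tag, data} = event, tag, fun) do
    fun.(data)
    event
  end

  # Anything else (other tags, untagged data) passes straight through.
  def tap_tagged(event, _tag, _fun), do: event
end
```

Because the event is returned untouched, several `each/3` steps for the same tag can be chained, as D, E and F would need.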

Then we can go:

Flow.from_enumerable(A.data)
|> Flow.map(&B.do_stuff/1)
|> TaggedFlow.map(:ok, &C.do_stuff/1)
|> TaggedFlow.each(:ok, &D.do_stuff/1)
|> TaggedFlow.each(:error, &E.do_stuff/1)
|> TaggedFlow.each(:error, &F.do_stuff/1)
|> Flow.run()

With this, the actual work modules need no modifications for passthrough or tag pattern matching.
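One detail worth spelling out: with the `map/3` shown earlier, whatever the transform returns becomes the new event. So C has to hand back a tagged tuple for D to pick it up, and it can also turn a failure into an `{:error, ...}` that E and F will see. The sketch below walks a few events through a single step. `Enum.map` stands in for the Flow stage so it runs without any dependency, and the module `TaggedStepDemo` and its `c/1` function are hypothetical.

```elixir
defmodule TaggedStepDemo do
  # Same case expression as TaggedFlow.map/3, applied to one event.
  def step(event, tag, transform) do
    case event do
      {^tag, data} -> transform.(data)
      _ -> event
    end
  end

  # Hypothetical C: keeps the :ok tag on success and
  # switches to :error when the work fails.
  def c(n) when n > 0, do: {:ok, n * 10}
  def c(n), do: {:error, {:not_positive, n}}

  # Enum.map stands in for the Flow stage here.
  def run(events) do
    Enum.map(events, fn event -> step(event, :ok, &c/1) end)
  end
end
```

Calling `TaggedStepDemo.run([{:ok, 1}, {:error, :timeout}, {:ok, -2}])` returns `[{:ok, 10}, {:error, :timeout}, {:error, {:not_positive, -2}}]`: the existing error passes through, and the failed `:ok` event becomes an error for the later steps.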


FWIW, I came to a similar conclusion separately in “Handling errors in Flow using :ok, :error tuples”, though yours is cleaner and allows the error handling to be pushed to the end of the flow. If you’ve come up with any other approaches or insights on Flow error handling, I’d love to hear them. Thanks.