mfclarke

mfclarke

Flow - error handling

Let’s say I have the following:

[A]
 |
 |
 |
[B]
 |\
 | \
 |  \
[C] [E]
 |   |
 |   |
 |   |
[D] [F]

A is a list of strings
B fetches data from an API using the given string
C performs some kind of work on the data
D writes to a db
E logs errors to a webservice
F posts a message to slack if there are a lot of errors in a small time window

Is it possible to handle errors like with Flow? As in, B can output 2 types of events: data and errors. Data is consumed by C and errors are consumed by E. In GenStage I know I can use a GenStage.PartitionDispatcher with a hash function that assigns events to one partition or the other depending on if they are regular data or errors. But I’m wondering if this is possible within the Flow API?

The best I can come up with is that B would send messages to a producer stage on error, which would be the start of a separate Flow (that consists of E and F). Is there a cleaner way?

Marked As Solved

mfclarke

mfclarke

Actually, yes, abstracting this higher is possible and really straightforward:

defmodule TaggedFlow do
  def map(flow, tag, transform) do
    Flow.map(flow, fn tagged_data ->
      case tagged_data do
        {^tag, data} -> transform.(data)
        _ -> tagged_data
      end
    end)
  end

  # And each/3, filter/3, flat_map/3, reduce etc.
end

Then we can go:

Flow.from_enumerable(A.data)
|> Flow.map(&B.do_stuff/1)
|> TaggedFlow.map(:ok, &C.do_stuff/1)
|> TaggedFlow.each(:ok, &D.do_stuff/1)
|> TaggedFlow.each(:error, &E.do_stuff/1)
|> TaggedFlow.each(:error, &F.do_stuff/1)
|> Flow.run

Then there’s no modifications for passthrough or tag pattern matching needed on the actual work modules.

Also Liked

mfclarke

mfclarke

I’ve been thinking about this a bit and come up with something I like, so I’m gonna answer my own question here in case other people run into this.

So, there’s 2 things to know. This is demand driven and I’m trying to utilise backpressure to avoid a build up of data when the next step in the flow is busy. By it’s very nature, if C is busy and E has run out of stuff to process, E can’t demand more data. Because there’s no way of knowing if B will produce data for C or E, B asking for demand could just add a build up of data on C even though C is trying to apply backpressure to avoid that in the first place. Put simpler: you can’t ask an API (B) for errors (E) :slight_smile: So, C and E may as well be the same step so to speak. There’s no branch, at least ask far as Flow is concerned.

Then how does the right data get to C and E? This is the second thing. I make B output “tagged tuples” like {:ok, data} and {:error, data}. This way C and E can pattern match to receive the data relevant for them. As long as they include a “pass through” default pattern match, the data for later steps flows down. So if you arrange the steps ABCE with C passing through (or “ignoring”) {:error, data}s, then the data makes it to E. Put DF on the end and ensure they also passthrough and you’re done.

It looks something like this (removing partitioning and simplifying to all maps):

Flow.from_enumerable(A.data)
|> Flow.map(&B.do_stuff/1)
|> Flow.map(&C.do_stuff/1)
|> Flow.map(&E.do_stuff/1)
|> Flow.map(&D.do_stuff/1)
|> Flow.map(&F.do_stuff/1)
|> Flow.run

# Example passthrough work module
defmodule C do
  def do_stuff({:ok, data}) do
    # actually do stuff
  end
  # passthrough stuff that isn't for C (ie, errors)
  def do_stuff(other_data_passthrough), do: other_data_passthrough
end

There’s also different ways of arranging the overlapping steps. Like, if it’s really important that errors propagate to your services quickly, you’d arrange it ABEFCD so that C and D can’t hold up E and F. And vice versa.

I’m starting to wonder if there’s a way of abstracting this structure up into Flow. Like extending map and it’s friends to take an atom, and Flow handles the pattern matching and passthrough for you. Hmm.

Where Next?

Popular in Questions Top

sen
Hi All, I set a environment variables in dev.exs , like below code. when i start server, how can i set the ${enable} value? thanks. d...
New
Tee
can someone please explain to me how Enum.reduce works with maps
New
chrisalley
ExUnit now has describe blocks which is a welcome addition coming from RSpec. In the docs, it states that nested hierarchies of describe ...
New
tduccuong
Hi, is there any work on GUI with Elixir, that is similar to Electron/Javascript? My idea is to bundle Phoenix and BEAM into a single se...
New
nobody
How to bind a phoenix app to a specific ip address? could not find anything about that, nowhere, unfortunately, but for me this is quite...
New
jaysoifer
Is there a way to rollback a specific migration and only that one (“skipping” all the other ones)? Would mix ecto.rollback -v 200809061...
New
aalberti333
As the title describes, I’m trying to run Enum.map() over a list of key/value pairs, where the value is a map. My data looks like this: ...
New
nobody
Hi! In PHP: $_SERVER[‘SERVER_ADDR’] - in Elixir? Searched the docs for ip address and the web, no good results. Thanks!
New
joaquinalcerro
Hi there, I am working with Ecto-Postgresql and I need to call all of the records from a specific table but the table has 40,000 records...
New
openscript
Hello! Sorry for this astonishing simple question, but I’m really stuck. I try to set up the intellij-elixir plugin, but I don’t know ho...
New

Other popular topics Top

lastday4you
I wanted to check elixir version in phoenix because i found that my elixir is 1.5 but when i use Enum.chunk_by it said the function is un...
New
AstonJ
Posting this to see if we can make things easier for people to get into Neovim. If you use Neovim and have a favourite distro please let ...
New
gshaw
What is the idiomatic way of matching for not nil in Elixir? E.g., First way: defp halt_if_not_signed_in(conn, signed_in_account) when...
New
jerry
Good day to you all. I have been struggling to get a query involving like and ilike to work. Can anyone assist me on this, please? pro...
New
stefanchrobot
What’s the safe way to decode a JSON string into a struct? I want to avoid calling String.to_atom. Jason.decode can give me a map with st...
New
klo
Got a question about when to concat vs. prepending items to list then reversing to achieve appending. So i know lists boil down to [1 | ...
New
openscript
Hello! Sorry for this astonishing simple question, but I’m really stuck. I try to set up the intellij-elixir plugin, but I don’t know ho...
New
AstonJ
Seen any cool LiveView demos, sample apps or examples? Please post them here! :003:
New
jononomo
For some reason my phoenix channels are working for me in my local dev environment, but as soon as I deploy via Docker, I get a 403 error...
New
vonH
In asking this question I am more interested about the expressiveness of the language itself and less concerned about the availability of...
New

We're in Beta

About us Mission Statement