mfclarke

Flow - error handling

Let’s say I have the following:

[A]
 |
 |
 |
[B]
 |\
 | \
 |  \
[C] [E]
 |   |
 |   |
 |   |
[D] [F]

A is a list of strings
B fetches data from an API using the given string
C performs some kind of work on the data
D writes to a db
E logs errors to a webservice
F posts a message to slack if there are a lot of errors in a small time window
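F's trigger ("a lot of errors in a small time window") could be a simple sliding-window count. The sketch below is hypothetical: the module name, the millisecond timestamps and the `threshold`/`window_ms` parameters are assumptions, and the actual Slack call is left out.

```elixir
defmodule ErrorWindow do
  # Hypothetical sketch of F's trigger condition: keep the timestamps of
  # recent errors and report when `threshold` of them fall within `window_ms`.

  # Records an error at `now_ms`, drops timestamps that are older than the
  # window and returns {alert?, kept_timestamps}.
  def record(timestamps, now_ms, window_ms, threshold) do
    kept = [now_ms | Enum.filter(timestamps, &(now_ms - &1 < window_ms))]
    {length(kept) >= threshold, kept}
  end
end
```

F would carry `kept_timestamps` as its state and post to Slack only when `alert?` is true.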

Is it possible to handle errors like this with Flow? That is, B can output two types of events: data and errors. Data is consumed by C and errors are consumed by E. In GenStage I know I can use a GenStage.PartitionDispatcher with a hash function that assigns each event to one partition or the other depending on whether it is regular data or an error. Is this possible within the Flow API?

The best I can come up with is that B would send messages to a producer stage on error, which would be the start of a separate Flow (that consists of E and F). Is there a cleaner way?
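For reference, the GenStage route from the question hinges on a hash function shaped the way GenStage.PartitionDispatcher expects: it takes an event and returns `{event, partition}`, or `:none` to discard it. This is a minimal sketch assuming B emits `{:ok, data}` or `{:error, reason}`; the module name and the partition names `:data` and `:errors` are hypothetical.

```elixir
defmodule RouteByTag do
  # Hash function for GenStage.PartitionDispatcher: receives an event and
  # returns {event, partition}, or :none to drop the event.
  # Assumes B emits {:ok, data} or {:error, reason}; the partition names
  # :data and :errors are hypothetical.
  def hash({:ok, _data} = event), do: {event, :data}
  def hash({:error, _reason} = event), do: {event, :errors}

  # Anything else is discarded rather than routed.
  def hash(_unexpected), do: :none
end
```

In a GenStage version of B (a producer_consumer), `init/1` would return something like `{:producer_consumer, state, dispatcher: {GenStage.PartitionDispatcher, partitions: [:data, :errors], hash: &RouteByTag.hash/1}}`, and C and E would subscribe with `partition: :data` and `partition: :errors` respectively.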

Marked As Solved

mfclarke

Actually, yes, abstracting this higher is possible and really straightforward:

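```elixir
defmodule TaggedFlow do
  # Applies `transform` only to events tagged with `tag`, re-tagging the
  # result so later tagged steps can still match it. Other events pass through.
  def map(flow, tag, transform) do
    Flow.map(flow, fn
      {^tag, data} -> {tag, transform.(data)}
      other -> other
    end)
  end

  # Runs `fun` for its side effect on events tagged with `tag`;
  # Flow.each/2 passes every event downstream unchanged.
  def each(flow, tag, fun) do
    Flow.each(flow, fn
      {^tag, data} -> fun.(data)
      _other -> :ok
    end)
  end

  # filter/3, flat_map/3, reduce etc. follow the same pattern.
end
```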

Then we can go:

```elixir
Flow.from_enumerable(A.data())
|> Flow.map(&B.do_stuff/1)
|> TaggedFlow.map(:ok, &C.do_stuff/1)
|> TaggedFlow.each(:ok, &D.do_stuff/1)
|> TaggedFlow.each(:error, &E.do_stuff/1)
|> TaggedFlow.each(:error, &F.do_stuff/1)
|> Flow.run()
```

That way the work modules themselves need no passthrough clauses or tag pattern matching.
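To try the tagged-helper idea without pulling in Flow, the same pattern can be sketched on top of Elixir's `Stream`. This is only a stdlib stand-in for the TaggedFlow module above, not Flow itself: it has no stages, partitioning or backpressure, and the `TaggedStream` name is hypothetical.

```elixir
defmodule TaggedStream do
  # Stdlib stand-in for the TaggedFlow idea, built on Stream instead of
  # Flow so it runs without dependencies. Events are {tag, data} tuples.

  # Applies `transform` to the payload of events carrying `tag` and re-tags
  # the result; every other event passes through untouched.
  def map(stream, tag, transform) do
    Stream.map(stream, fn
      {^tag, data} -> {tag, transform.(data)}
      other -> other
    end)
  end

  # Runs `fun` for its side effect on matching events; Stream.each/2 keeps
  # every event flowing downstream.
  def each(stream, tag, fun) do
    Stream.each(stream, fn
      {^tag, data} -> fun.(data)
      _other -> :ok
    end)
  end
end

events = [{:ok, 1}, {:error, :timeout}, {:ok, 2}]

result =
  events
  |> TaggedStream.map(:ok, &(&1 * 10))
  |> TaggedStream.each(:error, &IO.inspect(&1, label: "error"))
  |> Enum.to_list()
```

`result` is `[{:ok, 10}, {:error, :timeout}, {:ok, 20}]`, and `error: :timeout` is printed along the way. Re-tagging in `map/3` is what lets a later `each(:ok, ...)` still see the transformed data.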

Also Liked

mfclarke

I’ve been thinking about this a bit and come up with something I like, so I’m gonna answer my own question here in case other people run into this.

So, there are two things to know. First, this is demand driven, and I’m trying to use backpressure to avoid a build-up of data when the next step in the flow is busy. By its very nature, if C is busy and E has run out of things to process, E can’t demand more data. Because there’s no way of knowing whether B will produce data for C or for E, B asking for more on E’s behalf could just pile up data at C, even though C is applying backpressure precisely to avoid that. Put more simply: you can’t ask an API (B) for errors (E) :slight_smile: So C and E may as well be the same step, so to speak. There’s no branch, at least as far as Flow is concerned.

Then how does the right data get to C and E? This is the second thing. I make B output “tagged tuples” like {:ok, data} and {:error, data}. This way C and E can pattern match to receive the data relevant to them. As long as they include a default “pass through” clause, the data for later steps flows on down. So if you arrange the steps ABCE with C passing through (or “ignoring”) {:error, data} tuples, the data makes it to E. Put D and F on the end, make sure they pass through too, and you’re done.

It looks something like this (removing partitioning and simplifying to all maps):

```elixir
Flow.from_enumerable(A.data())
|> Flow.map(&B.do_stuff/1)
|> Flow.map(&C.do_stuff/1)
|> Flow.map(&E.do_stuff/1)
|> Flow.map(&D.do_stuff/1)
|> Flow.map(&F.do_stuff/1)
|> Flow.run()

# Example passthrough work module
defmodule C do
  def do_stuff({:ok, data}) do
    # actually do stuff with data, then keep the :ok tag on the result
    # so D can still match on it further down
    {:ok, data}
  end

  # pass through anything that isn't for C (i.e. errors)
  def do_stuff(other_data_passthrough), do: other_data_passthrough
end
```
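The passthrough arrangement can be checked without Flow by swapping `Flow.map` for `Enum.map`. In this sketch B, C and E are hypothetical stand-ins: B pretends odd inputs fetch successfully and even inputs fail, C multiplies the payload, and E "logs" by printing. Only the pattern-matching and passthrough shape mirrors the post.

```elixir
# Stdlib-only sketch of the explicit-passthrough approach, using Enum.map
# in place of Flow.map. B, C and E here are hypothetical stand-ins.
defmodule B do
  # Pretend API call: odd numbers succeed, even numbers fail.
  def do_stuff(n) when rem(n, 2) == 1, do: {:ok, n}
  def do_stuff(n), do: {:error, {:fetch_failed, n}}
end

defmodule C do
  # Works on data and keeps the :ok tag; errors pass straight through.
  def do_stuff({:ok, data}), do: {:ok, data * 100}
  def do_stuff(other), do: other
end

defmodule E do
  # Handles errors and returns the event so later steps (like F) see it too.
  def do_stuff({:error, reason} = event) do
    IO.puts("logging #{inspect(reason)}")
    event
  end

  def do_stuff(other), do: other
end

result =
  [1, 2, 3]
  |> Enum.map(&B.do_stuff/1)
  |> Enum.map(&C.do_stuff/1)
  |> Enum.map(&E.do_stuff/1)
```

`result` ends up as `[{:ok, 100}, {:error, {:fetch_failed, 2}}, {:ok, 300}]`: C never touched the error, and E saw it anyway because C passed it through.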

There are also different ways of arranging the overlapping steps. For example, if it’s really important that errors reach your services quickly, you’d arrange it ABEFCD so that C and D can’t hold up E and F, and vice versa.

I’m starting to wonder if there’s a way of abstracting this structure up into Flow, like extending map and its friends to take an atom and having Flow handle the pattern matching and passthrough for you. Hmm.
