How to add Error Handling to Flow process

Hi. I’m just learning Elixir, and tackling a script to do async processing. What I’m trying to build is a command line processor for tagging images. Here are a simplified version of the stages:

1 Read list of files from a directory
2 Load Image analysis for each file (slow). if it fails, skip the rest of the process
3 Separate image data into ColorInfo or SizeInfo by pattern matching on the returned lines
4 convert SizeInfo into a number (total area)
5 covert ColorInfo into a list of tags (colors)
6 combine size and color tags into a single map along with the file name
7 update tag store for each file (slow)
8 move file into completed directory if successful
9 output summary # of files sucessfully tagged / # errored when complete

Operation 1 is simple
Operations 2 is a point at which I need to spawn multiple processes. Errors jump to operation 8, otherwise we continue with 3
Operation 3-6 process the result of 2
Operation 7 works on the processed result, and should run async because slow
Operation 8 behaves differently on errors/success, this is simple pattern matching and filling an aggregator log
Operation 9 outputs the log created in 8

Going off Building Product Recommendations and Tuning Elixir Genstage Flow Pipepline I’m planning the following flow:

# start with retrieving the file names
get_file_names
# then convert to Flow
|> Flow.from_enumerable()
#  distribute one item at a time
|> Flow.partition(max_demand: 1)
# call the data generator to get a flow of generated binary strings, each in it's own process
|> Flow.map(&generate_data/1)
# send a progress update 
|> Flow.each(&notify/1)
# split into individual lines
|> Flow.map(&split_lines/1) 
# build tags from each line, pattern matching line structure
|> Flow.map(&build_tags/1) 
# partition the data by file key so we can merge the generated tags by file
|> Flow.partition(key: {:key, :file})
# group tags together by the filename 
|> Flow.group_by_key(:file) 
# switch to working with the groups (state)
|> Flow.emit(:state)
# send a progress update 
|> Flow.each(&notify/1)
# work one at a time again
|> Flow.partition(max_demand: 1)
# save tagging data
|> Flow.map(&save_tags/1)
# send a progress update 
|> Flow.each(&notify/1)
# move completed files
|> Flow.map(&move_file/1)
# run until complete
|> Flow.run

I have two specific questions

1.) Am I correctly partitioning by making sure the slow stuff happens one at a time, or is that a question that can only be answered by tuning after it’s built?

2.) How do I catch errors? Is there something like OK that works with flow, or would I need to match on each step.

I appreciate any responses or feedback very much. :slight_smile:

3 Likes

Update on this, here’s the code I ended up with and a few notes

def process(search_export, %SaveLocation{} = save_to) do

search_export
|> build_accumulators
|> Flow.from_enumerable()
|> Flow.partition(max_demand: 1, stages: 2)
# call the data generator to get a flow of generated binary strings, each in it's own process
|> Flow.map(&(ColorSummarizer.load(&1, save_to.system)))
# send a progress update
|> Flow.each(&notify/1)
# split into individual color summary maps
|> Flow.map(&ColorParser.parse_summary/1)
# report prgress
|> Flow.each(&notify/1)
# work one at a time again
|> Flow.partition(max_demand: 1, stages: 2)
# save the data
|> Flow.map(&(save_colors(&1, save_to)))
# send a progress update
|> Flow.each(&notify/1)
# run until complete
|> Flow.run

end

  • I ended up using a struct as an accumulator for the transformed version at each stage. This made it far easier to debug
  • The partition by key / group by key was difficult to get working consistently, and I ended up abandoning that route as the middle steps are all super-fast text updates
  • Testing with multiple items was key - things that worked fine with a single item list ended up dying unexpectedly when I had multpile items. This was related to the partition/group code, as that generated empty items
  • Errors are passed as {:error, image: %Image{}, message: bar}, while success are wrapped with {:ok, %Image{}}. Individual steps unwrap or skip based on the tuple.

To Dos:

  • It still dies if I run up against a pattern match failure, so I should wrap it in a try/catch
  • At some point I’ll benchmark a flow version vs a stream/enum version, but I can see it using all 4 cores consistently.