Hi. I’m just learning Elixir and tackling a script that does async processing. What I’m trying to build is a command-line processor for tagging images. Here is a simplified version of the stages:
1 Read the list of files from a directory
2 Load image analysis for each file (slow); if it fails, skip the rest of the process
3 Separate image data into ColorInfo or SizeInfo by pattern matching on the returned lines
4 Convert SizeInfo into a number (total area)
5 Convert ColorInfo into a list of tags (colors)
6 Combine the size and color tags into a single map along with the file name
7 Update the tag store for each file (slow)
8 Move the file into a completed directory if successful
9 Output a summary: # of files successfully tagged / # errored when complete
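To make steps 3–6 concrete, here’s a rough sketch of the pattern matching I have in mind (the line formats and function names are made up; the real analysis output will differ):

```elixir
# Hypothetical line formats such as "ColorInfo: red,blue" and "SizeInfo: 640x480".
# Each result is a {file, tag} tuple so the tags can be grouped by file later.
def build_tags({file, "ColorInfo: " <> colors}) do
  {file, {:colors, String.split(colors, ",", trim: true)}}
end

def build_tags({file, "SizeInfo: " <> dims}) do
  [w, h] = dims |> String.split("x") |> Enum.map(&String.to_integer/1)
  {file, {:area, w * h}}
end
```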
Operation 1 is simple.
Operation 2 is a point at which I need to spawn multiple processes. Errors jump to operation 8; otherwise we continue with 3.
Operations 3-6 process the result of 2.
Operation 7 works on the processed result and should run async because it’s slow.
Operation 8 behaves differently on error/success; this is simple pattern matching and filling an aggregator log.
Operation 9 outputs the log created in 8.
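To make operations 2 and 8 concrete, this is roughly the error convention I have in mind: the slow analysis returns a tagged tuple, and the final step matches on it (module and function names here are placeholders, and `TagLog` is a named Agent started elsewhere):

```elixir
# Sketch only: ImageAnalysis.run/1 stands in for the real slow analysis call.
def generate_data(file) do
  case ImageAnalysis.run(file) do
    {:ok, data} -> {:ok, {file, data}}
    {:error, reason} -> {:error, {file, reason}}
  end
end

# Operation 8: move on success, count both outcomes in an aggregator.
def move_file({:ok, {file, _tags}}) do
  File.rename!(file, Path.join("completed", Path.basename(file)))
  Agent.update(TagLog, &Map.update(&1, :tagged, 1, fn n -> n + 1 end))
end

def move_file({:error, {_file, _reason}}) do
  Agent.update(TagLog, &Map.update(&1, :errored, 1, fn n -> n + 1 end))
end
```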
Going off “Building Product Recommendations” and “Tuning Elixir GenStage Flow Pipeline”, I’m planning the following flow:
# start with retrieving the file names
get_file_names
# then convert to Flow
|> Flow.from_enumerable()
# distribute one item at a time
|> Flow.partition(max_demand: 1)
# call the data generator to get a flow of generated binary strings, each in its own process
|> Flow.map(&generate_data/1)
# send a progress update
|> Flow.each(&notify/1)
# split into individual lines
|> Flow.map(&split_lines/1)
# build tags from each line, pattern matching line structure
|> Flow.map(&build_tags/1)
# partition the data by file key so we can merge the generated tags by file
|> Flow.partition(key: {:key, :file})
# group tags together by the filename
|> Flow.group_by_key()
# switch to working with the groups (state)
|> Flow.emit(:state)
# send a progress update
|> Flow.each(&notify/1)
# work one at a time again
|> Flow.partition(max_demand: 1)
# save tagging data
|> Flow.map(&save_tags/1)
# send a progress update
|> Flow.each(&notify/1)
# move completed files
|> Flow.map(&move_file/1)
# run until complete
|> Flow.run
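To check my understanding of the grouping steps, I expect something like this in iex (with fake data standing in for the analysis output; the exact shape of the result is part of my question):

```elixir
[{"a.png", {:colors, ["red"]}}, {"a.png", {:area, 100}}, {"b.png", {:area, 4}}]
|> Flow.from_enumerable()
|> Flow.partition(key: {:elem, 0})
|> Flow.group_by_key()
|> Flow.emit(:state)
|> Enum.to_list()
# expecting one map per partition stage, with all of a file's
# tags collected under its name, e.g. %{"a.png" => [...]}
```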
I have two specific questions
1.) Am I partitioning correctly by making sure the slow stuff happens one at a time, or is that a question that can only be answered by tuning after it’s built?
2.) How do I catch errors? Is there something like the OK library that works with Flow, or would I need to match on each step?
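For context on question 2, by “match on each step” I mean something like this passthrough pattern at every stage, which feels repetitive:

```elixir
# Each stage would need a clause that passes errors through untouched.
def split_lines({:error, _} = err), do: err

def split_lines({:ok, {file, data}}) do
  {:ok, {file, String.split(data, "\n", trim: true)}}
end
```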
I appreciate any responses or feedback very much.