Flow Executing Twice

Flow question. I’m downloading a file from S3 where each line is a JSON object. I stream the file into a flow where I decode each JSON object into a struct, migrate (insert) it into my database, and produce a migration event struct. I then reduce all of the migration event structs into a list, which I use to summarize what occurred.

download(@bucket, file_name)
|> File.stream!()
|> Flow.from_enumerable()
|> Flow.map(&decode/1)
|> Flow.map(&migrate/1)
|> Flow.reduce(
  fn -> [] end,
  fn migration_record, migration_records ->
    [migration_record | migration_records]
  end
)
|> process_summary(file_name)

The problem is that I get double the lines processed. The logs in each function show that if I have 10 lines in my file, I get 20 log statements each for decode and migrate, and process_summary sees that 20 records were “processed”.

Any ideas what I am doing wrong?


From “Concurrent Data Processing in Elixir” by Svilen Gospodinov:

“With Flow.reduce/3, we have many processes, where each one continuously receives batches of items to work on. This is so Flow can balance the workload and handle back-pressure.
However, items that should be counted together can end up being consumed by more than one process, so we get duplicates when all results are combined together in the end.”

“The real solution to our problem is partitioning. Flow provides a function called partition/2, which creates another layer of stages that act like a router. You can specify a :key for each item going through the partition. When a key is computed, Flow will guarantee that same-key items will be sent to the same reducer process.”

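So to solve this issue, you need to call Flow.partition/2 before Flow.reduce/3 in your pipeline, e.g. when each event is a map and you want to route by one of its keys:
|> Flow.partition(key: {:key, key})
or, when each event is a tuple and you want to route by the element at a given zero-based position:
|> Flow.partition(key: {:elem, position})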
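Here is a minimal sketch of how that could look in a pipeline shaped like the one in the question. The `decode` and `migrate` functions below are hypothetical stand-ins (the real ones are not shown in the thread), the input is an in-memory list rather than a file downloaded from S3, and the `{:elem, 1}` key assumes each migration record is a tuple with its id in the second position. Note also that a flow is lazy, so the sketch materializes it exactly once with Enum.to_list/1; whether process_summary enumerates the flow more than once is not something the thread confirms.

```elixir
# Run as a standalone script; Mix.install fetches Flow (version is an assumption).
Mix.install([{:flow, "~> 1.2"}])

# Hypothetical stand-ins for decode/1 and migrate/1 from the question.
decode = fn line -> line |> String.trim() |> String.to_integer() end
migrate = fn id -> {:migrated, id} end

# Ten fake "lines", standing in for the lines of the downloaded file.
lines = Enum.map(1..10, fn n -> "#{n}\n" end)

migration_records =
  lines
  |> Flow.from_enumerable()
  |> Flow.map(decode)
  |> Flow.map(migrate)
  # Route each {:migrated, id} tuple by its id (zero-based position 1),
  # so records with the same id always reach the same reducer stage.
  |> Flow.partition(key: {:elem, 1})
  |> Flow.reduce(fn -> [] end, fn record, records -> [record | records] end)
  # A flow runs every time it is enumerated, so enumerate it once
  # and pass the resulting plain list on to the summary step.
  |> Enum.to_list()

IO.inspect(length(migration_records), label: "records migrated")
```

The resulting `migration_records` is an ordinary list, so it can be counted, logged, and summarized as often as needed without running the decode and migrate steps again.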
