Iex.new
Data processing in Parallel (Hackathon project)
Hi,
In our team at work we hold hackathons, and for next year I am thinking about introducing Elixir. Since we deal with data processing in our daily work, I am planning to propose a small app that imports data in parallel from a huge CSV file (say, a list of persons) into a database, and possibly transforms the data along the way.
I would also like to show the results in the front end using Phoenix.
During my research I came across Flow and Broadway.
As I don’t have much experience with Elixir and have never used Flow or Broadway, what would be a good fit for my experiment?
I found this article on dev.to which seems close to what I want to do:
Thank you!
Marked As Solved
dimitarvp
Now that you posted the entire code, here’s exactly what’s going on.
You are spawning 18 parallel processes, and each of them tries to insert 500 records one by one, which means 9,000 DB transactions. Unless you configure your pool with 9,000 connections, an incomplete ingestion is to be expected.
The point of balancing your Repo’s pool size against the stream’s max_concurrency parameter is to avoid exactly what your code is doing: never exceed the Repo’s pool size.
But your code does exactly that, in fact exceeding the Repo’s capacity by 500x.
Change your code to do this:
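```elixir
# Define the semicolon-separated parser at the top level. Calling
# NimbleCSV.define/2 on an alias of NimbleCSV.RFC4180 would redefine the
# library's own RFC4180 module instead of creating a new parser.
NimbleCSV.define(IngestCSV.Parser, separator: ";")

defmodule IngestCSV do
  alias IngestCSV.Parser
  require Logger

  def load(path) do
    path
    |> Path.expand()
    |> File.stream!(read_ahead: 524_288)
    |> Parser.parse_stream()
    # Removed `Stream.chunk_every`: each task now receives a single row.
    |> Task.async_stream(
      fn [name, bar_code, price, currency] ->
        # Removed `Enum.map`: exactly one insert per task.
        FileWatchExample.Product.create_product(%{
          name: name,
          bar_code: bar_code,
          price: price,
          currency: currency
        })
      end,
      max_concurrency: 18,
      on_timeout: :kill_task,
      ordered: false
    )
    |> Stream.run()
  end
end
```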
Your DB pool size is 20, and now you will never have more than 18 records being inserted in parallel, so you should no longer see dropped ingestions.
If that works, you can then bump the Repo’s pool size up to e.g. 100 and set max_concurrency to 95 to 98; that should work fine and speed up your workload.
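To check the claim that Task.async_stream/3 never runs more than max_concurrency tasks at once, here is a small stdlib-only sketch. It assumes a hypothetical `Process.sleep/1` stand-in instead of a real DB insert and uses OTP’s `:counters` module to record how many tasks are running; `ConcurrencyProbe` is an illustrative name, not part of the original code.

```elixir
defmodule ConcurrencyProbe do
  # Runs `count` fake "inserts" through Task.async_stream/3 and returns the
  # highest number of simultaneously running tasks that was observed.
  def peak(count, max_concurrency) do
    # Index 1 = tasks currently running, index 2 = highest value seen so far.
    ref = :counters.new(2, [:atomics])

    1..count
    |> Task.async_stream(
      fn _row ->
        :counters.add(ref, 1, 1)
        bump_peak(ref, :counters.get(ref, 1))
        # Hypothetical stand-in for a DB insert: 5 ms of "work".
        Process.sleep(5)
        :counters.sub(ref, 1, 1)
      end,
      max_concurrency: max_concurrency,
      ordered: false
    )
    |> Stream.run()

    :counters.get(ref, 2)
  end

  # Not an atomic compare-and-set, so the peak may be slightly under-counted,
  # but it can never exceed the real number of running tasks.
  defp bump_peak(ref, running) do
    if running > :counters.get(ref, 2), do: :counters.put(ref, 2, running), else: :ok
  end
end
```

With this sketch, `ConcurrencyProbe.peak(200, 18)` should report at most 18, which is why keeping max_concurrency below the pool size keeps the pool from being exhausted.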
I believe you are being tripped up by Task.async_stream’s semantics. The function you pass to it runs in up to 18 separate, independent processes in parallel. BUT the Enum.map inside it is still serial, i.e. not parallelized. So you have 18 independent parallel tasks, each running 500 DB transactions (inserts) one after another. As you have found, that mostly works but not always, because the parallel code is structured wrongly.
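To make the serial-vs-parallel point concrete, here is a minimal stdlib-only sketch. It assumes a hypothetical `fake_insert/2` that only sleeps (no real database) and compares one task running `Enum.map` over its rows with one task per row through `Task.async_stream/3`; `SerialVsParallel` is an illustrative module name.

```elixir
defmodule SerialVsParallel do
  # Hypothetical stand-in for one DB insert: it just sleeps for `ms` milliseconds.
  defp fake_insert(_row, ms), do: Process.sleep(ms)

  # One task that walks its rows with Enum.map: the inserts run one after another.
  def serial(rows, ms) do
    time(fn ->
      Task.async(fn -> Enum.map(rows, &fake_insert(&1, ms)) end)
      |> Task.await(:infinity)
    end)
  end

  # One task per row: up to `max` inserts run at the same time.
  def per_row(rows, ms, max) do
    time(fn ->
      rows
      |> Task.async_stream(&fake_insert(&1, ms), max_concurrency: max, ordered: false)
      |> Stream.run()
    end)
  end

  # Wall-clock milliseconds taken by `fun`.
  defp time(fun) do
    {micros, _result} = :timer.tc(fun)
    div(micros, 1000)
  end
end
```

With 10 rows and a 20 ms fake insert, `serial/2` takes at least 200 ms, while `per_row/3` with a max of 10 finishes in roughly the time of a single insert.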
Also Liked
dimitarvp
If you don’t need to keep track of the state of current and future tasks, you can get away with just NimbleCSV and Task.async_stream/3, very easily:
```elixir
defmodule IngestCSV do
  alias NimbleCSV.RFC4180, as: YourCSV

  def load(path) do
    path
    |> Path.expand()
    |> File.stream!(read_ahead: 524_288)
    |> YourCSV.parse_stream()
    |> Task.async_stream(
      fn [name, age, address, email] ->
        # Do something with a single CSV record (placeholder: build a map).
        # Possibly best to also hand off the results to another service?
        # If you just want to process all records and get a new list back,
        # use Stream.map and Enum.to_list at the end instead.
        %{name: name, age: age, address: address, email: email}
      end,
      max_concurrency: 100,
      on_timeout: :kill_task,
      ordered: false
    )
    |> Stream.run()
  end
end
```
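If you do want the processed records back as a list while still working in parallel, note that `Task.async_stream/3` emits `{:ok, result}` tuples, plus `{:exit, :timeout}` for tasks killed via `on_timeout: :kill_task`. A minimal sketch, assuming comma-separated lines and using a naive `String.split/2` in place of NimbleCSV purely to keep it dependency-free (it does not handle quoted fields); `CollectRecords` is a hypothetical name:

```elixir
defmodule CollectRecords do
  # Parses each line in parallel and collects the successful results.
  def run(lines) do
    lines
    |> Task.async_stream(&parse/1,
      max_concurrency: System.schedulers_online(),
      on_timeout: :kill_task,
      ordered: false
    )
    |> Enum.flat_map(fn
      {:ok, record} -> [record]
      # A task killed on timeout yields {:exit, :timeout}; skip it here.
      {:exit, _reason} -> []
    end)
  end

  # Naive stand-in for a real CSV parser: assumes exactly four unquoted fields.
  defp parse(line) do
    [name, age, address, email] = String.split(line, ",")
    %{name: name, age: String.to_integer(age), address: address, email: email}
  end
end
```

Because of `ordered: false`, the resulting list is not guaranteed to follow the input order.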
Flow and Broadway are awesome, but for a 2GB file the above will serve you just fine. I’ve processed files of up to 17GB or so, if memory serves.
Again, though: if each record (or batch of records) takes longer to process, you’ll need more persistent workers, and Oban is much better suited for that.
D4no0
How big is the CSV? What is your policy on failures and server restarts, i.e. do you need persistence in processing?
If you don’t have any special requirements, I would simply recommend Oban. You can stream the file contents into small to medium-sized jobs, then process them concurrently with Oban without having to worry about anything else; you get features like rate limiting and persistence out of the box.
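Splitting a streamed file into job-sized pieces can be sketched with the standard library alone. This assumes a hypothetical `JobBatches` module and a made-up payload shape (`"batch"` and `"rows"` keys); it only builds the job arguments lazily and does not touch Oban. String keys are used because Oban stores job args as JSON.

```elixir
defmodule JobBatches do
  # Lazily splits a (possibly huge) stream of rows into payloads of at most
  # `size` rows each. Nothing is enqueued here; this only builds the args.
  def build(rows, size) do
    rows
    |> Stream.chunk_every(size)
    |> Stream.with_index()
    |> Stream.map(fn {chunk, index} -> %{"batch" => index, "rows" => chunk} end)
  end
end
```

In a real app, each payload would presumably be turned into a job for an Oban worker (for example via the worker’s `new/1` and `Oban.insert/1`); that wiring is an assumption and is not shown.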
dimitarvp
You are sorted. Here is the PR: Make it work by dimitarvp · Pull Request #1 · pascal-chenevas/file_watch_example · GitHub
I made it use batch inserting again and it works just fine. It only takes 108 seconds on my Intel Mac to insert exactly 10 million records.
Try my branch on your machine and let us know. There is no reason for it not to work, unless your MySQL instance is broken or you haven’t shown all your code and something else in it is causing the problem.
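The general shape of batched parallel ingestion can be sketched as follows. This is not the code from the PR, which isn’t reproduced here; it assumes a hypothetical `fake_insert_all/1` standing in for a bulk insert such as Ecto’s `Repo.insert_all/3`. Each task handles one whole chunk, so in a real app it would issue one statement per chunk instead of one insert per row, while max_concurrency stays below the pool size.

```elixir
defmodule BatchIngest do
  # Each task receives one chunk of rows and performs a single (fake) bulk
  # insert for it, instead of one insert per row.
  def run(rows, batch_size, max_concurrency) do
    rows
    |> Stream.chunk_every(batch_size)
    |> Task.async_stream(&fake_insert_all/1,
      max_concurrency: max_concurrency,
      ordered: false
    )
    # Each successful task returns {:ok, inserted_count}; sum them up.
    |> Enum.reduce(0, fn {:ok, count}, acc -> acc + count end)
  end

  # Hypothetical stand-in for a bulk insert: reports how many rows it "inserted".
  defp fake_insert_all(chunk), do: length(chunk)
end
```

For example, `BatchIngest.run(1..10_000, 1_000, 18)` runs 10 chunk-sized tasks and returns the total row count of 10,000.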