Iex.new

Data processing in Parallel (Hackathon project)

Hi,

In our team at work we hold hackathons, and for next year I am thinking about introducing Elixir. Since we deal with data processing in our daily work, I am planning to propose a small app that imports data in parallel from a huge CSV file (for example, a list of persons) into a database, possibly transforming the data along the way.
I think I would also like to show the results in the front end using Phoenix.

During my research I came across Flow and Broadway.
As I don't have much experience with Elixir and have never used Flow or Broadway, what would be a good fit for my experiment?

I found this article on dev.to, which seems close to what I want to do:

Thank you!

Marked As Solved

dimitarvp

Now that you've posted the entire code, here's exactly what's going on.

You are spawning 18 parallel processes, and each of them is trying to insert 500 records one by one, which means 9000 DB transactions. Unless you configure your pool with 9000 connections, an incomplete ingestion is to be expected.

The point of balancing your Repo's pool size against the stream's max_concurrency parameter is to avoid exactly what your code is doing: never go above the Repo's pool size.

But your code does exactly that; in fact, it exceeds the Repo's capacity by 500x.

Change your code to do this:

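# Define a ";"-separated parser under its own name. Aliasing
# NimbleCSV.RFC4180 and then calling NimbleCSV.define/2 on the alias
# would redefine NimbleCSV's built-in RFC4180 module.
NimbleCSV.define(IngestCSV.Parser, separator: ";", escape: "\"")

defmodule IngestCSV do
  alias IngestCSV.Parser

  def load(path) do
    path
    |> Path.expand()
    |> File.stream!(read_ahead: 524_288)
    |> Parser.parse_stream()
    # No `Stream.chunk_every`: each task inserts exactly one record
    |> Task.async_stream(
      fn [name, bar_code, price, currency] ->
        # No `Enum.map`: the single insert runs directly in the task
        FileWatchExample.Product.create_product(%{
          name: name,
          bar_code: bar_code,
          price: price,
          currency: currency
        })
      end,
      # Stay below the Repo's pool size (20)
      max_concurrency: 18,
      on_timeout: :kill_task,
      ordered: false
    )
    |> Stream.run()
  end
end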

Your DB pool size is 20, and now you will never have more than 18 records being inserted in parallel, so ingestions should no longer be dropped.

If that works, you can then increase the Repo's pool size to, e.g., 100 and set max_concurrency to 95 to 98, which should work fine and speed up your workload.
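For the pool-size bump described above, the relevant setting is Ecto's `pool_size` option on the Repo. A minimal sketch, assuming it lives in `config/runtime.exs` and that the app and Repo are `:file_watch_example` and `FileWatchExample.Repo` (names guessed from the example project, so adjust them to your code):

```elixir
import Config

# Assumed app and Repo names; other connection settings omitted
config :file_watch_example, FileWatchExample.Repo,
  pool_size: 100
```

With that in place, `max_concurrency: 95` in the `Task.async_stream` call keeps a few connections free for the rest of the application.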

I believe you are being tripped up by Task.async_stream's semantics. The function passed to it runs in 18 separate, independent, parallel processes. BUT an Enum.map inside that function is still serial, i.e. not parallelized. So you have 18 parallel, independent tasks, each trying to run 500 DB transactions (inserts) one after another. As you have found, that mostly works but not always, because the parallel code is structured wrongly.
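To see the `max_concurrency` bound in action without a database, here is a small stdlib-only sketch. `ConcurrencyProbe` is a hypothetical helper (not part of any library), and `Process.sleep/1` stands in for a single insert. It counts how many calls to the function are in flight at once and records the highest value seen, which can never exceed `max_concurrency`.

```elixir
defmodule ConcurrencyProbe do
  # Hypothetical helper: runs `fun` over `items` with Task.async_stream/3
  # and returns the highest number of calls to `fun` that were running
  # at the same moment.
  def peak(items, max_concurrency, fun) do
    # Slot 1: calls currently running. Slot 2: highest value seen so far.
    ref = :atomics.new(2, [])

    items
    |> Task.async_stream(
      fn item ->
        running = :atomics.add_get(ref, 1, 1)
        record_max(ref, running)
        result = fun.(item)
        :atomics.sub(ref, 1, 1)
        result
      end,
      max_concurrency: max_concurrency,
      ordered: false
    )
    |> Stream.run()

    :atomics.get(ref, 2)
  end

  # Lock-free "store the maximum" using compare-and-swap on slot 2
  defp record_max(ref, value) do
    seen = :atomics.get(ref, 2)

    cond do
      value <= seen -> :ok
      :atomics.compare_exchange(ref, 2, seen, value) == :ok -> :ok
      true -> record_max(ref, value)
    end
  end
end

# Process.sleep/1 stands in for one database insert
ConcurrencyProbe.peak(1..40, 18, fn _row -> Process.sleep(20) end)
```

Whatever the function does internally (one insert or a serial `Enum.map` over a chunk), the number of concurrently running calls stays capped by `max_concurrency`.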

Also Liked

dimitarvp

dimitarvp

If you don't need to persist the state of current and future tasks, you can easily get away with just NimbleCSV and Task.async_stream/3:

defmodule IngestCSV do
  alias NimbleCSV.RFC4180, as: YourCSV

  def load(path) do
    path
    |> Path.expand()
    |> File.stream!(read_ahead: 524_288)
    |> YourCSV.parse_stream()
    |> Task.async_stream(
      fn [_name, _age, _address, _email] ->
        # Do something with a single CSV record here.
        # It's possibly best to also hand off the results to another service.
        # If you just want to process all records and get a new list back,
        # use Stream.map and Enum.to_list at the end instead.
        :ok
      end,
      max_concurrency: 100,
      on_timeout: :kill_task,
      ordered: false
    )
    |> Stream.run()
  end
end

Flow and Broadway are awesome, but for a 2GB file the above will serve you just fine. I've processed files of up to 17GB or so, if memory serves.

Again, though, if each record (or batch of records) takes longer to process, you'll need more persistent workers, and Oban will be much better suited to that.
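The comment in the example above mentions getting a new list back. If you want the transformed records while keeping the parallelism, one option is to collect the results of `Task.async_stream/3` instead of discarding them with `Stream.run/1`. A minimal sketch; `CollectResults` is a hypothetical module, and the rows are assumed to be already parsed:

```elixir
defmodule CollectResults do
  # Hypothetical sketch: each finished task yields {:ok, value}; a task
  # killed via on_timeout: :kill_task yields {:exit, :timeout}.
  def transform_all(rows, fun, max_concurrency \\ System.schedulers_online()) do
    rows
    |> Task.async_stream(fun,
      max_concurrency: max_concurrency,
      on_timeout: :kill_task,
      ordered: false
    )
    |> Enum.flat_map(fn
      {:ok, value} -> [value]
      # Drop timed-out records here; log or retry them in real code
      {:exit, _reason} -> []
    end)
  end
end

rows = [["Ann", "34"], ["Bob", "41"]]

CollectResults.transform_all(rows, fn [name, age] ->
  %{name: name, age: String.to_integer(age)}
end)
```

Because of `ordered: false`, the returned list is not guaranteed to follow the input order; pass `ordered: true` if order matters.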

D4no0

D4no0

How big is the CSV? What is your policy on failures and server restarts, i.e. do you need persistence in processing?

If you don't have any special requirements, I would just recommend using Oban. You can simply stream the file contents into small-to-medium jobs and then process them concurrently with Oban without having to care about anything else; you get features like rate limiting and persistence out of the box.
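The "stream the file into small-to-medium jobs" step can be sketched with the standard library alone. This assumes the CSV has already been parsed into rows; `JobArgs` is hypothetical, and the Oban part is left as a comment because the worker module (`MyApp.ImportWorker` below) would be your own. String keys are used because Oban stores job args as JSON.

```elixir
defmodule JobArgs do
  # Hypothetical sketch: split a stream of parsed rows into batches,
  # each of which becomes the args map of one job.
  def batches(rows, batch_size) do
    rows
    |> Stream.chunk_every(batch_size)
    |> Stream.with_index()
    |> Stream.map(fn {chunk, index} -> %{"batch" => index, "rows" => chunk} end)
  end
end

# With Oban installed, each args map would become a job through a worker
# you define (hypothetical name), e.g.:
#   args |> MyApp.ImportWorker.new() |> Oban.insert()
1..1_050
|> Stream.map(fn i -> ["person#{i}", "#{rem(i, 90)}"] end)
|> JobArgs.batches(500)
|> Enum.map(fn args -> length(args["rows"]) end)
# => [500, 500, 50]
```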

dimitarvp

dimitarvp

You are sorted. Here is the PR: "Make it work" by dimitarvp, Pull Request #1 on pascal-chenevas/file_watch_example (GitHub).

I made it use batch inserting again, and it works just fine. It only takes 108 seconds on my Intel Mac to insert exactly 10 million records.
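Here is one way batch inserting can be combined with bounded concurrency: chunk the rows, and let each task insert one chunk. This is a sketch, not necessarily what the PR does. `BatchIngest` is hypothetical, and the insert function is injected so the sketch runs without a database; in an Ecto app it might be `fn rows -> {count, _} = Repo.insert_all(Product, rows); count end`, with `Repo` and `Product` standing in for your own modules.

```elixir
defmodule BatchIngest do
  # Hypothetical sketch: one insert per chunk, at most `max_concurrency`
  # chunks in flight. Returns the total number of rows inserted.
  def run(rows, insert_batch, opts \\ []) do
    batch_size = Keyword.get(opts, :batch_size, 1_000)
    # Keep this below the Repo's pool size
    max_concurrency = Keyword.get(opts, :max_concurrency, 18)

    rows
    |> Stream.chunk_every(batch_size)
    |> Task.async_stream(insert_batch,
      max_concurrency: max_concurrency,
      ordered: false,
      timeout: :infinity
    )
    |> Enum.reduce(0, fn {:ok, count}, acc -> acc + count end)
  end
end

rows = Stream.map(1..10_000, fn i -> %{name: "product #{i}", price: i} end)

# A fake "insert" that just reports how many rows it received
BatchIngest.run(rows, fn batch -> length(batch) end, batch_size: 1_000)
# => 10000
```

With 18 concurrent tasks and 1,000 rows per batch, the pool sees at most 18 connections in use while each round trip carries many rows instead of one.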

Try my branch on your machine and let us know. There's no reason it shouldn't work unless your MySQL instance is broken, or you aren't showing all of your code and something else in it is causing the problem.
