Data processing in Parallel (Hackathon project)

Hi,

In our team at work we run hackathons, and for next year I am thinking about introducing Elixir. Since we deal with data processing in our daily work, I am planning to propose a small app that imports data in parallel from a huge CSV file (say, a list of persons) into a database, and maybe transforms the data along the way.
I think I would also like to show the results in the front end using Phoenix.

During my research I came across Flow and Broadway.
As I don't have much experience with Elixir and have never used Flow or Broadway, which one would be a good fit for my experimentation?

I found this article on dev.to which seems to be close to what I want to do:

Thank you!

1 Like

How big is the CSV? And what is the policy on failures and server restarts, i.e. do you need persistence in the processing?

If you don't have any special requirements, I would simply recommend using Oban. You can stream the file contents into small-to-medium jobs, then process them concurrently with Oban without having to care about anything else; you get features like rate limiting and persistence out of the box.
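
Roughly something like this, as a sketch only – the worker module, queue name and chunk size below are all made up for illustration:

defmodule Ingest.RowWorker do
  use Oban.Worker, queue: :ingest, max_attempts: 3

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"rows" => rows}}) do
    # transform / insert the batch of rows here
    Enum.each(rows, &IO.inspect/1)
    :ok
  end
end

defmodule Ingest.Enqueuer do
  def enqueue(path) do
    path
    |> File.stream!()
    |> Stream.drop(1)                    # skip the header line
    |> Stream.map(&String.trim/1)
    |> Stream.chunk_every(1_000)         # one job per 1 000 lines
    |> Stream.map(&Ingest.RowWorker.new(%{rows: &1}))
    |> Stream.each(&Oban.insert/1)
    |> Stream.run()
  end
end

Each job survives restarts because it lives in the database, and the queue's concurrency setting controls how many chunks are processed at once.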

5 Likes

I think something like 2 GB.
I do not have any policies at the moment, as I am just starting to think about the “workshop” and the things that could be nice to show. As we do not have much time (2 days), I would like something small that still gives a good overview of what is possible.

Flow would be a good fit for this
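
For reference, a minimal Flow pipeline for this kind of job could look roughly like this – a sketch only, where parse_line/1 and insert_record/1 are hypothetical placeholders for the real parsing and persistence steps:

defmodule IngestWithFlow do
  def load(path) do
    path
    |> File.stream!(read_ahead: 100_000)
    |> Stream.drop(1)                        # skip the CSV header
    |> Flow.from_enumerable(max_demand: 500)
    |> Flow.map(&parse_line/1)
    |> Flow.map(&insert_record/1)
    |> Flow.run()
  end

  # naive split; a real parser (e.g. NimbleCSV) also handles quoted fields
  defp parse_line(line), do: line |> String.trim() |> String.split(",")

  # placeholder for the actual database write
  defp insert_record(_fields), do: :ok
end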

If you don't need to keep the state of current and future tasks, you can get away with NimbleCSV and Task.async_stream/3 very easily:

defmodule IngestCSV do
  alias NimbleCSV.RFC4180, as: YourCSV

  def load(path) do
    path
    |> Path.expand()
    |> File.stream!(read_ahead: 524_288)
    |> YourCSV.parse_stream()
    |> Task.async_stream(fn [name, age, address, email] ->
      # do something with a single CSV record.
      # possibly best to also hand off the results to another service?
      # if you want to just process all records and receive a new list
      # then you should just use Stream.map and Enum.to_list at the end.
      {name, age, address, email} # placeholder so the example compiles
    end, max_concurrency: 100, on_timeout: :kill_task, ordered: false)
    |> Stream.run()
  end
end

Flow and Broadway are awesome, but for a 2 GB file the above will serve you just fine. I've processed files of up to 17 GB or so this way, if memory serves.

Again though, if each record – or batch of records – takes longer to process, then you'll need more persistent workers, and Oban will be much better suited.

6 Likes

@dimitarvp, thank you!

Yesterday I was able to implement what I wanted with the solution you proposed :slight_smile:
In my process I use the file_system library to detect whether a new file has appeared in a specific directory, and if so, the import proceeds. Each line of the CSV file is correctly imported into a MySQL database. For that, I used Ecto.
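
(For anyone curious, a minimal version of such a watcher built on the file_system library could look roughly like this – a simplified sketch with an assumed directory, not my exact code:)

defmodule FileWatcher do
  use GenServer

  def start_link(_opts), do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)

  @impl true
  def init(:ok) do
    {:ok, watcher_pid} = FileSystem.start_link(dirs: ["/data/incoming"])
    FileSystem.subscribe(watcher_pid)
    {:ok, %{watcher_pid: watcher_pid}}
  end

  @impl true
  def handle_info({:file_event, watcher_pid, {path, events}}, %{watcher_pid: watcher_pid} = state) do
    # event names vary per platform; :created is what inotify reports on Linux
    if :created in events and Path.extname(path) == ".csv" do
      IngestCSV.load(path)
    end

    {:noreply, state}
  end

  def handle_info({:file_event, watcher_pid, :stop}, %{watcher_pid: watcher_pid} = state) do
    {:noreply, state}
  end
end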

When setting up Ecto, it was suggested to add my Repo to the supervision tree in application.ex like this:

children = [
  FileWatcher,
  Store # is my Repo
]

config/config.exs:

import Config

config :file_watch_example, :ecto_repos, [Store]
config :file_watch_example, Store,
  database: "store",
  username: "user",
  password: "passwd",
  hostname: "db",
  pool_size: 100

In the Ecto documentation, the namespace is not the same:

 children = [
    MyApp.Repo,
  ]

In my case, I tried to define MyApp.Store, but that did not work for me.

So I have some questions … :slight_smile: :

  1. Should I have used MyApp.Store instead of just Store?
  2. In my config/config.exs file the two lines below seem almost the same; is it possible to refactor them into one?

config :file_watch_example, :ecto_repos, [Store]
config :file_watch_example, Store,

If I understand correctly, the first one registers Store as an available Ecto repo and the second one holds the database configuration. Correct?

This is the code I ended up with, based on what you wrote:

defmodule IngestCSV do
  # defines a CSV parser module named YourCSV with ";" as separator
  NimbleCSV.define(YourCSV, separator: ";")

  def load(path) do
    path
    |> Path.expand()
    |> File.stream!(read_ahead: 524_288)
    |> YourCSV.parse_stream()
    |> Task.async_stream(fn [name, bar_code, price, currency] ->
      # build and insert one product per CSV record
      FileWatchExample.Product.create_product(%{name: name, bar_code: bar_code, price: price, currency: currency})
    end, max_concurrency: 100, on_timeout: :kill_task, ordered: false)
    |> Stream.run()
  end
end

Glad you made it work!

I can offer you something that will accelerate the code further: put Stream.chunk_every(500) after YourCSV.parse_stream(), and then Task.async_stream will receive batches of 500 records instead of single records.

And then you can do batch inserts – provided you don't need Ecto validations, that is. If you need validation, then inserting one by one is still better.

Also, Task.async_stream's max_concurrency option will depend on your repo pool size in this case. For example, if your repo has a pool of only 20 connections, then you should set max_concurrency to 20 (or even 18-19). Keep that in mind as well; it's important, and you may see a lot of timeout failures if max_concurrency is too high.
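
Putting both suggestions together, a sketch of a batched load/1 could look like this (field names follow your code; max_concurrency: 20 assumes a pool of roughly that size, and insert_all runs no changesets or casts, so values and timestamps must already match your schema's column types – adjust accordingly):

def load(path) do
  path
  |> Path.expand()
  |> File.stream!(read_ahead: 524_288)
  |> YourCSV.parse_stream()
  |> Stream.chunk_every(500)
  |> Task.async_stream(fn rows ->
    now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)

    entries =
      Enum.map(rows, fn [name, bar_code, price, currency] ->
        # convert price etc. here if the schema does not store them as strings
        %{name: name, bar_code: bar_code, price: price, currency: currency,
          inserted_at: now, updated_at: now}
      end)

    Store.insert_all(FileWatchExample.Product, entries)
  end, max_concurrency: 20, on_timeout: :kill_task, ordered: false)
  |> Stream.run()
end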

1 Like

Thanks for the hint!
I have tried with 100 000 000 rows, but only 62 733 738 were imported.

I got some errors like:

01:14:22.102 [info] MyXQL.Connection (#PID<0.302.0>) disconnected: ** (DBConnection.ConnectionError) client #PID<0.726857.0> exited

01:14:22.096 [error] MyXQL.Connection (#PID<0.227.0>) failed to connect: ** (MyXQL.Error) (1040) Too many connections

I think I will have to tune the pool size (100) and max_concurrency (100) better.

I will play a bit with it.

What do you think about the questions I asked in my previous answer regarding the namespace and configuration?

If the MySQL instance itself is telling you “too many connections”, maybe you should look into its own config – perhaps it is not configured to accept that many? You can allow Elixir to open 100 separate connections to the same MySQL server, but if MySQL is configured to allow e.g. a maximum of 80 (just an example), then you'd get an error like that.

So it’s time to check MySQL’s config itself.
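
For example, you can check what the server actually allows straight from IEx through the Store repo (the value shown below is just MySQL's default of 151; yours will reflect your my.cnf):

iex> Store.query!("SHOW VARIABLES LIKE 'max_connections'").rows
[["max_connections", "151"]]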

Yes, always scope your app's modules. Never leave modules at the top level unless you're 100% convinced you'll never get a collision – and saying “I am sure” would be famous last words, because who knows who will try to integrate with your app / library in the future…

Yes, correct, hence it's not possible to merge them or remove one of them. The first setting comes in handy if you have several repositories.
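
Concretely, the scoped version could look like this – FileWatchExample.Store is just an assumed name following that convention:

# lib/file_watch_example/store.ex
defmodule FileWatchExample.Store do
  use Ecto.Repo,
    otp_app: :file_watch_example,
    adapter: Ecto.Adapters.MyXQL
end

# config/config.exs
import Config

# registers the repo so the mix ecto.* tasks can find it
config :file_watch_example, ecto_repos: [FileWatchExample.Store]

# the repo's own database configuration
config :file_watch_example, FileWatchExample.Store,
  database: "store",
  username: "user",
  password: "passwd",
  hostname: "db",
  pool_size: 10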

2 Likes

Thank you @dimitarvp !

I'm curious whether you fixed the connection errors. Please let us know.

So far I haven't managed to :frowning:

Resources I found while trying to solve the issue:

My configuration:

my.cnf (MySQL configuration):

max_connections=100000

config/config.exs:

pool_size: 500,
migration_timestamps: [type: :utc_datetime_usec],
migration_lock: nil,
queue_target: 500

In the file that reads the CSV I set max_concurrency to 500:

def load(path) do
  path
  |> Path.expand()
  |> File.stream!(read_ahead: 524_288)
  |> YourCSV.parse_stream()
  |> Stream.chunk_every(500)
  |> Task.async_stream(fn rows ->
    # insert each record of the 500-row chunk
    Enum.map(rows, fn [name, bar_code, price, currency] ->
      FileWatchExample.Product.create_product(%{name: name, bar_code: bar_code, price: price, currency: currency})
    end)
  end, max_concurrency: 500, on_timeout: :kill_task, ordered: false)
  |> Stream.run()
end

products.ex :

... some code

 def create_product(attrs) do
   %FileWatchExample.Product{}
   |> FileWatchExample.Product.changeset(attrs)
   |> Store.insert(timeout: 1000 * 60)
 end

First of all, don't use a huge pool of connections; start with 10. Having a bigger pool doesn't necessarily mean the queries will run faster.

2 Likes

These numbers are crazy. First of all, give MySQL a more reasonable limit: 100k is too much. Make it something like 1000 or 2000.

Furthermore, even 100 parallel tasks was already a lot, because you can likely saturate your CPU or even the network (depending on the machine running the workload). Even if you don't saturate them, you might overwhelm the runtime scheduler – very doubtful, I've rarely seen it, but it depends on the machine – so just reduce the number of tasks.

Finally, make the db_connection configuration options a bit more liberal e.g. increase queue_target to something like 10_000 (10 seconds) or 2_000.

But do all of these at the same time, not in isolation.

And let us know. I've done plenty of such data-ingestion work in Elixir and I am puzzled that you are having so much trouble. The worst case I had was a laggy remote Postgres, which was easily fixed by increasing the queue_target and queue_interval values.
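
For reference, those knobs all live in the repo config; something along these lines, with purely illustrative values:

config :file_watch_example, Store,
  pool_size: 10,
  # target time (ms) a checkout may wait before the pool considers itself overloaded
  queue_target: 10_000,
  # window (ms) over which queue_target is evaluated
  queue_interval: 30_000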

I have updated my configuration based on what you advised, but I still get some DBConnection.ConnectionError. I was able to create about 5 899 575 records out of a total of 100 000 000.

config.exs:

  • pool_size: 10,
  • queue_target: 10_000

mysql config:

  • max_connections=1000

ingest_csv.ex :

  • max_concurrency: 10

Hm, that is strange and should not be the case. What are the errors this time around?

pool_size is related to your CPU… how many cores do you have? It should be something around 2 × the number of cores.

There is a formula, but I don’t remember it exactly

I have an AMD Ryzen 7 PRO 5850U 8-core CPU in a Lenovo laptop.
I run the app locally in WSL.

I still have errors like:

MyXQL.Connection (#PID<0.302.0>) disconnected: ** (DBConnection.ConnectionError)

Only that? No other details?