Data processing in Parallel (Hackathon project)

yes, I will try to find something in the Mysql log or somewhere else. I will come back :slight_smile:

Ah, I don’t mean that. Surely there must be more errors in the Elixir’s console / log?

But in any case I at my wits end here because I’ve done work similar to yours, at least 7 times in the last 7.5 years and was always able to fix even the occasional missed inserts.

You can try pool_size = 16

Just now noticed that pool_size and max_concurrency are the same. As already mentioned above, I advise you to always have max_concurrency be little less than pool_size.

I’d bump pool_size to 20 and then set max_concurrency to 18.

I wish you all a happy and healthy new year!

I have configured the pool size as advised to 20 and the max_concurrency to 18.
This seems to have no effect as I still have the same issue as before.

config/config.exs

import Config


config :file_watch_example, :ecto_repos, [Store]
config :file_watch_example, Store,
  database: "store",
  username: "",
  password: "",
  hostname: "db",
  pool_size: 20,
  migration_timestamps: [type: :utc_datetime_usec],
  migration_lock: nil,
  queue_target: 10_000

config :logger, :console,
  level: :debug,
  format: "[$level] $message $metadata\n",
  metadata: [:error_code, :file]

config :logger,
  backends: [{LoggerFileBackend, :debug}]

config :logger, :debug,
  path: "/var/log/elixir_app/debug.log",
  level: :debug

lib/ingest_csv.ex

defmodule IngestCSV do
  alias NimbleCSV.RFC4180, as: YourCSV

  require Logger

  NimbleCSV.define(YourCSV, separator: ";")

  def load(path) do
    path
    |> Path.expand()
    |> File.stream!(read_ahead: 524_288)
    |> YourCSV.parse_stream()
    |> Stream.chunk_every(500)
    |> Task.async_stream(fn rows ->
      Enum.map(rows, fn [name, bar_code, price, currency] ->
         FileWatchExample.Product.create_product(%{name: name, bar_code: bar_code, price: price, currency: currency})
      end)
    end, max_concurrency: 18, on_timeout: :kill_task, ordered: false)
    |> Stream.run()
  end
end

grep -i error /var/log/elixir_app/debug.log | tail -3

07:43:22.538 [info] MyXQL.Connection (#PID<0.230.0>) disconnected: ** (DBConnection.ConnectionError) client #PID<0.5886.0> exited
07:43:22.541 [info] MyXQL.Connection (#PID<0.226.0>) disconnected: ** (DBConnection.ConnectionError) client #PID<0.5889.0> exited
07:43:22.542 [info] MyXQL.Connection (#PID<0.240.0>) disconnected: ** (DBConnection.ConnectionError) client #PID<0.5890.0> exited

The MyXQL.Connection Error is the only one I am facing currently:

grep -i error /var/log/elixir_app/debug.log | grep -v MyXQL.Connection | wc -l
0

MySQL “show variables:” :

concurrent_insert          | AUTO
connect_timeout            | 10
max_connect_errors         | 100
max_connections            | 1000
max_insert_delayed_threads | 20
max_user_connections       | 0

The Elixir-App and MySQL run in Docker and no other containers are running when doing my tests.

Do not insert 500 records in a loop. Maybe just remove the Stream.chunk_every and Enum.map calls and and just insert records one by one. I think the way you’re doing it now increases contention for DB connections.

Working with batches of records in your scenario only makes sense if you’re going to use Repo.insert_all (without validation). If you need to validate and thus insert records one by one then there’s no point in working with batches.

2 Likes

thank you, I will try that.
For which use cases make it sense to do not do validation?

In your current code.

It tries to do two things that conflict: (1) work with the records in batches – via the Stream.chunk_every(500) call, and (2) insert records one by one.

You don’t do that.

You either:

  1. Use Stream.chunk_every(500) and Repo.insert_all(rows) (which skips validation).

OR

  1. Do NOT use Stream.chunk_every(500) and do NOT use Enum.map(rows, ... Repo.insert(one_record)). Just do Repo.insert(row) directly.

I’ll follow up with a bigger comment explaining why you can’t ingest all records. Looking at your complete code it’s now perfectly clear. Incoming.

1 Like

Now that you posted the entire code, here’s exactly what’s going on.

You are spawning 18 parallel processes and each of them is trying to insert 500 records one by one which means 9000 DB transactions. Unless you configure your pool with 9000 connections then an incomplete ingestion is to be expected.

The idea of balancing out your Repo’s pool size and the streaming processing’s max_concurrency parameter is to avoid exactly what your code is doing: never go above the Repo’s pool size.

But your code is doing it, in fact exceeding Repo’s capacity by 500x.

Change your code to do this:

defmodule IngestCSV do
  alias NimbleCSV.RFC4180, as: YourCSV

  require Logger

  NimbleCSV.define(YourCSV, separator: ";")

  def load(path) do
    path
    |> Path.expand()
    |> File.stream!(read_ahead: 524_288)
    |> YourCSV.parse_stream()
    # Remove `Stream.chunk_every`
    |> Task.async_stream(fn [name, bar_code, price, currency] ->
       # Remove `Enum.map`
       FileWatchExample.Product.create_product(%{name: name, bar_code: bar_code, price: price, currency: currency})
    end, max_concurrency: 18, on_timeout: :kill_task, ordered: false)
    |> Stream.run()
  end
end

Now your DB pool’s size is 20 and you will never have more than 18 records being inserted in parallel, thus you should never have dropped ingestions.

If that really works then you can proceed to bump up the Repo’s pool size to e.g. 100 and put max_concurrency at 95 - 98 and that should work fine and accelerate your workload.

I believe you are being tripped up by Task.async_stream’s semantics. The function that is passed to it is going to be working in 18 separate parallel independent processes. BUT, having Enum.map inside of it is still serial i.e. not parallelized. So you do have 18 parallel independent tasks each trying to open 500 DB transactions (inserts) one after another. As you have found that that mostly works but not always – because it’s a very wrongly written parallel code.

4 Likes

I wonder if postgres can handle this abuse better.

Likely yes but with that code the odds are against any DB.

1 Like

thank you very much! , that’s solve my issue. I get no errors anymore logged.
Now I have to find out why the process still creating records although the end of the file was reached.

The file containing products:

cat file.csv | wc -l
10000000

Total products created in the database:

mysql> select count(*) from products;
+-----------+
| count(*)  |
+-----------+
| 109958881 |
+-----------+
1 row in set (15.69 sec)

Can’t know but obviously empty the DB before starting a run.

2 Likes

You can also split the CSV file on smaller chunks and experiment with them.

Can’t know but obviously empty the DB before starting a run.

That is what I do before each run. I also clear the log before to check that the amount of INSERT is equal to the amount of data records created into the database :slight_smile:

As said above, start chopping that CSV file on smaller chunks and experiment.

I can’t help you further unless you give me full access to the file, inform me of your exact MySQL version, what Erlang / Elixir versions you use, and give me access to the code.

You’ll have to track this down yourself because even if I was willing to help further and drill deeper, I am simply out of bandwidth to do so. That might change in just a few days or might not change for months – my work load varies.

I am pretty sure the problem is easy to solve though.

Oh btw, one final thing you can try just so we’re 100% sure:

Don’t do parallelism. Just stream the CSV file records and insert them one by one and see if you get 10M records.

BTW the amount above is mind-blowingly different
 you expect 10M records but get almost 110M? Something is super weird, man, you must have overlooked something in the code.

I say make a public GitHub project with all your code and include a link to where the big CSV file can be downloaded (outside GitHub) so people can try and help you.

I have uploaded the file separately as advised. The file is available for download until 12.01.2024: products.csv

I have now created a repo with the code I use.

Thank you so much for your help!

What MySQL engine are you using? Is it MariaDB or good old MySQL version? And what version?

Dude, you have no database migration to create the table. Put one in! How can anyone help you if you haven’t took care to create the DB schema?

EDIT: I mean, it’s pretty simple, you should have included it:

defmodule Store.Migrations.AddProducts do
  use Ecto.Migration

  def change do
    create table(:products) do
      add(:name, :string)
      add(:bar_code, :string)
      add(:price, :string)
      add(:currency, :string)
      timestamps(type: :utc_datetime_usec)
    end
  end
end