Ecto Multi in a Loop

I load a CSV into two separate tables in my database and it works great.

Now that I have moved the database to the cloud, it’s time to use batch inserts/updates. All of the examples I can find for Ecto.Multi are trivial, though, and I’m not sure how to build a process that ultimately does one Repo.transaction for each of the two tables, using the data I have collected inside a loop that steps through each row of the CSV.

I reckon I have to, somehow, incorporate the use of Multi.append but don’t really know where to start.

Does each iteration of the loop create its own Multi and, before the next iteration, Multi.append it into a ‘master’ multi? Or can I just pass the ‘master’ multi, which is created outside of the loop, to the sub-processes that run ‘inner multis’ inside the loop?
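Something like the first option is what I have in mind; here is a rough, untested sketch (rows stands in for the decoded CSV rows, and build_row_multi/1 is a made-up helper that builds an Ecto.Multi for a single row):

master =
  rows
  |> Enum.reduce(Ecto.Multi.new(), fn row, acc ->
    inner = build_row_multi(row)
    # Fold each per-row multi into the ‘master’ multi; operation names
    # have to stay unique across the whole thing.
    Ecto.Multi.append(acc, inner)
  end)

Repo.transaction(master)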

I found this example, which is close to what I’m after, but it runs the transaction on every iteration, which isn’t saving me any round trips to the database server.

Here is some pseudocode to show you what I’m doing:

  def parse_csv(conn, file) do
    stage_date = get_date_from_filename(file)

    File.stream!("uploads/#{file}", [:trim_bom])
    |> CSV.decode(headers: true)
    |> Stream.each(fn row -> _process_csv_row(conn, row, stage_date) end)
    |> Stream.run()

    conn
  end

  defp _process_csv_row(conn, row, stage_date) do
    tenant = tenant(conn)

    # Set ticket_params from row data

    # Build ticket changeset and do a Repo.insert_or_update
    _process_ticket(ticket_params, tenant)

    # Get the id of the ticket just inserted or updated

    # Set activity_params from row data and ticket id

    # Build activity changeset and do a Repo.insert
    _process_activity(activity_params, tenant)
  end

Hey @bodhilogic,

Ecto.Multi is for executing multiple statements in a single transaction; it doesn’t batch inserts and updates. For that, you’d want insert_all or update_all, defined on your repo. If all of your rows are inserted/updated in a single insert_all/update_all, you won’t need a transaction, because it’ll be a single atomic statement. However, if your CSV is large, you will likely want to insert in chunks. You can split your stream into chunks with Stream.chunk_every/2 and then do one batch DB operation per chunk.

It is likely best not to wrap the processing of all chunks in a transaction either: inserting/updating a large number of rows in a table holds write locks on all of those rows, blocking other writers to them for as long as the transaction is open. That can affect other parts of your system negatively.
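A rough sketch of the chunked approach, staying close to your pseudocode (Tickets.Ticket, the “ticket_number” header, the stage_date column, and the chunk size are all assumptions; note that insert_all does not set timestamps for you):

def parse_csv(file) do
  stage_date = get_date_from_filename(file)
  now = DateTime.utc_now() |> DateTime.truncate(:second)

  File.stream!("uploads/#{file}", [:trim_bom])
  |> CSV.decode!(headers: true)            # decode! yields rows without the {:ok, row} wrapping
  |> Stream.map(fn row ->
    # insert_all takes plain maps rather than changesets
    %{
      number: row["ticket_number"],
      stage_date: stage_date,
      inserted_at: now,
      updated_at: now
    }
  end)
  |> Stream.chunk_every(500)               # batch size is a guess; tune it
  |> Enum.each(fn chunk ->
    Repo.insert_all(Tickets.Ticket, chunk)
  end)
end

Keeping the chunks reasonably small also keeps each statement under the database’s bind-parameter limit (PostgreSQL allows at most 65535 parameters per statement).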

Fantastic answer! Thank you so much 🙂

To answer your question “how do I use Ecto.Multi in a loop”, you would use Enum.reduce/3 to build up the Multi as you process elements in an enumerable.

def parse(file) do
  multi = Ecto.Multi.new()

  multi =
    File.stream!("uploads/#{file}", [:trim_bom])
    |> CSV.decode(headers: true)
    |> Enum.reduce(multi, &process_row/2)

  Repo.transaction(multi)
end

defp process_row(row, multi) do
  parsed_row = ...

  # Give each operation a unique name (e.g. keyed by something from the row),
  # otherwise the Multi will raise on a duplicate name.
  Ecto.Multi.insert(multi, ...)
end

Isn’t update_all only used for updating multiple rows with the same value?

It can use SQL functions to set different values. If you’re looking to pass in the data to update, though, insert_all plus the upsert options on_conflict and conflict_target are probably what you want.
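For instance, a minimal upsert sketch (the tickets table and its number/status columns are assumptions, and the other columns would need defaults or to be nullable, since insert_all does not apply schema defaults or timestamps):

rows = [
  %{number: "T-1", status: "open"},
  %{number: "T-2", status: "closed"}
]

# On a conflict on :number, overwrite :status with the incoming value;
# otherwise insert the row as new.
Repo.insert_all("tickets", rows,
  on_conflict: {:replace, [:status]},
  conflict_target: [:number]
)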

I will try with the insert_all function, thx.

For the benefit of others, the reason I am asking is that I have a list of changesets that I want to bulk update, not insert.
Ecto.Multi is not suited, because it is for executing multiple statements in a single transaction.
update_all/3 is not suited, because it is for updating multiple rows with the same value.
So I am a bit lost as to how to do a bulk update across all kinds of different ids.

For example, how to bulk update this:

[
  %MyStruct{id: 23, distance: 45},
  %MyStruct{id: 46, odometer: 37181}
  # ...etc
]

I cannot update them one-by-one, as that is far too much DB I/O.

What is your problem with that exactly? That’s less DB contention and allows you to do more DB operations in parallel.

When you say “X is too much DB I/O”, have you measured it?
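For completeness, one way to batch those per-row updates, building on the insert_all plus on_conflict suggestion above (a sketch: the my_structs table name is assumed, the updates are plain maps rather than changesets, and the remaining columns need defaults or to be nullable, because the insert half of the upsert still has to be valid):

updates = [
  %{id: 23, distance: 45},
  %{id: 46, odometer: 37181}
]

# Group rows by which fields they change, so a row that only touches
# :distance does not overwrite :odometer with a filler value.
updates
|> Enum.group_by(fn row -> row |> Map.keys() |> List.delete(:id) end)
|> Enum.each(fn {fields, rows} ->
  Repo.insert_all("my_structs", rows,
    on_conflict: {:replace, fields},
    conflict_target: [:id]
  )
end)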