Ecto Multi in a Loop

ecto
ecto_multi

#1

I load a CSV into two separate tables in my database and it works great.

Now that I have moved the database to the cloud, it’s time to use batch inserts/updates. All of the Ecto.Multi examples I can find are trivial, though, and I’m not sure how to build a process that ultimately runs one Repo.transaction for each of the two tables, using the data collected inside a loop that steps through each row of the CSV.

I reckon I have to, somehow, incorporate the use of Multi.append but don’t really know where to start.

Does each loop create its own Multi and before the next iteration, do a Multi.append into a ‘master’ multi? Or can I just pass the ‘master’ multi, which is created outside of the loop, to the sub-processes that run ‘inner multis’ inside the loop?

I found this example, which is close to what I’m after, but it runs the transaction on every iteration, so it isn’t saving me any trips to the database server.

Here is some pseudocode to show you what I’m doing:

  def parse_csv(conn, file) do
    stage_date = get_date_from_filename(file)

    File.stream!("uploads/#{file}", [:trim_bom])
    |> CSV.decode(headers: true)
    |> Stream.each(fn row -> _process_csv_row(conn, row, stage_date) end)
    |> Stream.run()

    conn
  end

  defp _process_csv_row(conn, row, stage_date) do
    tenant = tenant(conn)

    # Set ticket_params from row data

    # Build ticket changeset and do a Repo.insert_or_update
    _process_ticket(ticket_params, tenant)

    # Get the id of the ticket just inserted or updated

    # Set activity_params from row data and ticket id

    # Build activity changeset and do a Repo.insert
    _process_activity(activity_params, tenant)
  end


#2

Hey @bodhilogic,

Ecto.Multi is for executing multiple statements in a single transaction. It doesn’t batch inserts and updates. For that, you’d want to use insert_all or update_all, which are defined on your repo. If all of your rows are inserted/updated in a single insert_all/update_all, you won’t need a transaction, because a single statement is already atomic.

However, if your CSV is large, you will likely want to insert in chunks. You can split your stream into chunks with Stream.chunk_every/2 and then do one batch db operation per chunk. It is likely best not to wrap all of the chunks in one transaction either: inserting/updating a large number of rows holds write locks on all of those rows for as long as the transaction is open. That blocks other writers to those rows (and, depending on the database and isolation level, readers as well), which can negatively affect other parts of your system.
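As a rough sketch of the chunked approach (not a definitive implementation): the module and function names below are made up for illustration, and `insert_fun` is a stand-in for a call such as `&MyApp.Repo.insert_all(MyApp.Ticket, &1)`, where `MyApp.Repo` and `MyApp.Ticket` are assumed names. Injecting the function keeps the sketch runnable without a database.

```elixir
defmodule ChunkedImport do
  # Hypothetical helper. `rows` is any enumerable of maps (for example the
  # decoded CSV stream). `insert_fun` receives one chunk (a list of maps) and
  # must return `{count, _}`, which is the shape Repo.insert_all returns.
  def import_rows(rows, insert_fun, chunk_size \\ 500) do
    rows
    |> Stream.chunk_every(chunk_size)
    |> Enum.reduce(0, fn chunk, total ->
      # One database round trip per chunk, no surrounding transaction
      {count, _returned} = insert_fun.(chunk)
      total + count
    end)
  end
end
```

With a real repo the call might look like `ChunkedImport.import_rows(rows, &MyApp.Repo.insert_all(MyApp.Ticket, &1))`. Keep in mind that insert_all takes plain maps or keyword lists rather than changesets, so changeset validations don’t run and timestamps are not autogenerated; you have to set those fields yourself.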


#3

Fantastic answer! Thank you so much :slight_smile:


#4

To answer your question “how do I use Ecto.Multi in a loop”, you would use Enum.reduce/3 to build up the Multi as you process elements in an enumerable.

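def parse(file) do
  multi = Ecto.Multi.new()

  multi =
    File.stream!("uploads/#{file}", [:trim_bom])
    |> CSV.decode(headers: true)
    |> Enum.reduce(multi, &process_row/2)

  # A single transaction for every operation accumulated in the multi
  Repo.transaction(multi)
end

# Enum.reduce/3 passes the element first and the accumulator (the multi) second
defp process_row(row, multi) do
  parsed_row = ...

  # Each operation name must be unique within the multi
  Ecto.Multi.insert(multi, ...)
end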
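
One detail that matters when building a Multi in a loop is that every operation needs a unique name, otherwise Ecto.Multi raises. A minimal sketch of one way to handle that, assuming the `:ecto` package can be fetched with `Mix.install` and using a hypothetical `Ticket` schema with a single `title` field (neither comes from the thread): pair each row with its index and use a tuple as the operation name.

```elixir
# Mix.install is used here only to make the sketch a standalone script
Mix.install([{:ecto, "~> 3.0"}])

defmodule Ticket do
  # Hypothetical schema, for illustration only
  use Ecto.Schema

  schema "tickets" do
    field :title, :string
  end
end

defmodule MultiLoopSketch do
  alias Ecto.Multi

  # Builds one Multi containing an insert per row; nothing touches the database here
  def build(rows) do
    rows
    |> Enum.with_index()
    |> Enum.reduce(Multi.new(), fn {row, index}, multi ->
      changeset = Ecto.Changeset.cast(%Ticket{}, row, [:title])
      # {:ticket, index} keeps every operation name unique
      Multi.insert(multi, {:ticket, index}, changeset)
    end)
  end
end

multi = MultiLoopSketch.build([%{"title" => "a"}, %{"title" => "b"}])

# Inspect the queued operations without running them
names = multi |> Ecto.Multi.to_list() |> Enum.map(fn {name, _operation} -> name end)
```

Passing the resulting multi to `Repo.transaction/1` would then run all of the inserts in one transaction, with the results keyed by those tuple names.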