I use Repo.insert_all to bulk insert ~100m rows in chunks. After each chunk, some outdated data based on the insert’s RETURNING is deleted with Repo.delete_all.
The query that selects the rows to delete uses join: values(...) and has around 200-600 parameters, depending on the situation. These queries are homogeneous when you factor out the VALUES list.
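The delete step looks roughly like this (a simplified sketch with placeholder names – MyApp.Cleanup, the "records" table and the :key column are made up, and values/2 needs Ecto 3.12+):

defmodule MyApp.Cleanup do
  import Ecto.Query

  # Each distinct length of `returned` yields a different number of placeholders,
  # so every batch size ends up as its own prepared statement in the Repo cache.
  def delete_outdated(returned) do
    rows = Enum.map(returned, &%{key: &1.key})
    types = %{key: :string}

    from(r in "records",
      join: v in values(rows, types),
      on: r.key == v.key
    )
    |> MyApp.Repo.delete_all()
  end
end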
After inserting all rows, the ETS Repo cache table has 1841 objects and uses a whopping ~360 MB of memory. It looks like each distinct parameter count is PREPAREd and cached individually.
It seems unnecessary to cache these highly specific DELETE queries. Can I bypass the cache for these queries, or what other options do I have?
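For reference, this is roughly how I measured the cache size (a sketch that leans on Ecto internals: lookup_meta/1 and the :cache key of the adapter meta are not a public contract and may differ between versions):

# Rough measurement sketch; relies on Ecto internals as noted above.
meta = Ecto.Adapter.lookup_meta(MyApp.Repo)
cache = meta.cache

objects = :ets.info(cache, :size)
# :ets.info/2 reports memory in machine words, so convert to bytes.
bytes = :ets.info(cache, :memory) * :erlang.system_info(:wordsize)

IO.puts("#{objects} cached queries, ~#{div(bytes, 1024 * 1024)} MB")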
This isn’t exactly what you’re asking, but if you’re in the millions-of-rows category I would strongly suggest using COPY instead of bulk inserts. COPY is more performant at the Postgres level, and it also sidesteps the query cache issue you’re having.
Here is a helper module we have for this operation:
defmodule MyApp.PostgresBulkLoader do
  require Logger

  # Streams CSV lines into `table` via COPY ... FROM STDIN inside one transaction.
  def load(repo, table, stream, columns) do
    statement = """
    COPY #{table} (#{Enum.join(columns, ", ")})
    FROM STDIN
    WITH (FORMAT csv, HEADER false)
    """

    {:ok, :ok} =
      repo.transaction(
        fn ->
          Logger.debug(statement)

          # Ecto.Adapters.SQL.stream/2 is collectable, so we can pipe the CSV
          # lines straight into the COPY in chunks of 2000.
          stream
          |> Stream.chunk_every(2000, 2000, [])
          |> Stream.into(Ecto.Adapters.SQL.stream(repo, statement))
          |> Stream.run()
        end,
        # Generous timeout so very large loads aren't cut off mid-COPY.
        timeout: 3_600_000
      )

    :ok
  end
end
It’s mildly tedious because you basically have to build CSV for the data you’re ingesting, but it’s well worth it if you’re in the 100m-rows world.
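For example, a usage sketch (hypothetical "users" table and columns; NimbleCSV is just one way to get properly escaped CSV lines):

alias NimbleCSV.RFC4180, as: CSV

# Hypothetical rows for a made-up "users" table.
rows = [["1", "alice"], ["2", "bob"]]

rows
# dump_to_stream/1 emits one CSV line (as iodata) per row, with proper quoting.
|> CSV.dump_to_stream()
|> then(&MyApp.PostgresBulkLoader.load(MyApp.Repo, "users", &1, ["id", "name"]))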
Thank you @josevalim, @fuelen, @benwilson512. The Ecto fix is highly appreciated. I will try json_to_recordset before the new Ecto version is out. Looks like I can get away with much larger batches when I can squeeze the entire data set into one parameter.
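Roughly what I have in mind for the json_to_recordset variant (an untested sketch; the "records" table and key column are placeholders, and I’m assuming Jason for encoding):

import Ecto.Query

keys = ["a", "b", "c"]

# The whole key list travels as a single JSON parameter, so the SQL text (and
# therefore the cached prepared statement) is identical for every batch size.
json = Jason.encode!(Enum.map(keys, &%{key: &1}))

query =
  from(r in "records",
    join: v in fragment("SELECT * FROM json_to_recordset(?::json) AS x(key text)", ^json),
    on: r.key == v.key
  )

MyApp.Repo.delete_all(query)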
I will also try the COPY statement and figure out a new way to retrieve the data I previously obtained via RETURNING.
I love this community – it’s great that everyone is so involved.