Loading records from text files into a database is slow

I want to load the ratings from the Netflix Prize data into a database (Postgres on the same machine), but the process is excruciatingly slow.

The data is split over ~17000 files. Each corresponds to a movie. The files look like this:

[training_set]$ head mv_0006666.txt
6666:
2423875,5,2004-03-25
712664,4,2001-08-03
1990901,4,2000-04-15
1987434,3,2005-07-13
2615678,4,2004-01-16
1172232,3,2002-06-01
697038,3,2003-09-12
846887,4,2003-06-30
339448,1,2003-11-29

This is the code I load them with:

def load_nf_ratings() do
  Path.wildcard(@training_set)
  |> Enum.each(fn path ->
    # The Netflix movie id is the file name without the prefix and zero padding.
    [id_nf] = Regex.run(~r{mv_0*(\d+)\.txt}, path, capture: :all_but_first)
    IO.puts(id_nf)

    File.stream!(path, [encoding: :latin1])
    # Skip the "<movie id>:" header line.
    |> Enum.drop(1)
    |> Enum.each(fn line ->
      [uid_nf_str, rating_str, date_str] =
        line |> String.trim_trailing() |> String.split(",", parts: 3)

      uid_nf = String.to_integer(uid_nf_str)
      rating = String.to_integer(rating_str)
      # from_iso8601!/1 returns the struct itself rather than an {:ok, _} tuple.
      date = NaiveDateTime.from_iso8601!(date_str <> " 00:00:00")

      # Every line costs one user insert, two lookups and two or three more inserts.
      User.changeset(%User{}, %{uid_nf: uid_nf}) |> Repo.insert!(on_conflict: :nothing)
      uid = Repo.get_by!(User, uid_nf: uid_nf).id
      mid = Repo.get_by!(Movie, id_nf: id_nf).id

      Interaction.changeset(%Interaction{}, %{user_id: uid, movie_id: mid, action: "presented", inserted_at: date})
      |> Repo.insert!(on_conflict: :nothing)

      Interaction.changeset(%Interaction{}, %{user_id: uid, movie_id: mid, action: "watched", inserted_at: date})
      |> Repo.insert!(on_conflict: :nothing)

      if rating > 3 do
        Interaction.changeset(%Interaction{}, %{user_id: uid, movie_id: mid, action: "liked", inserted_at: date})
        |> Repo.insert!(on_conflict: :nothing)
      end
    end)
  end)
end

I match every Netflix user/movie ID with the ID it has in the database (so each one has a separate Netflix ID and a local ID).

I run the code with:

iex(1)> Logger.configure(level: :warn)
iex(2)> Mvsg.Scripts.load_nf_ratings

The thing is, movie 176 for example only has ~200 ratings:

[training_set]$ wc -l mv_0000176.txt 
211 mv_0000176.txt

But it takes ~3 seconds to load:

:timer.tc(&Mvsg.Scripts.load_nf_ratings/0)
{2978493, :ok}

At this rate it is going to take weeks to load only 2GB of data (other files have many more records).
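
For reference, the extrapolation behind that estimate can be sketched as below. This is only a back-of-the-envelope calculation: the total of 100,480,507 ratings is the figure from the public description of the data set, not something measured in this thread, and `LoadEstimate` is a made-up module name.

```elixir
defmodule LoadEstimate do
  @moduledoc "Hypothetical helper: extrapolates one measured load to the full data set."

  # elapsed_us: microseconds reported by :timer.tc for the sample file
  # rows_loaded: rating lines in the sample file
  # total_ratings: ASSUMED size of the whole data set
  def days(elapsed_us, rows_loaded, total_ratings) do
    us_per_row = elapsed_us / rows_loaded
    us_per_row * total_ratings / 1_000_000 / 86_400
  end
end

# 211 lines minus the "176:" header line leaves 210 ratings in 2_978_493 µs.
LoadEstimate.days(2_978_493, 210, 100_480_507)
|> Float.round(1)
|> IO.inspect(label: "estimated days")
```

That works out to roughly 14 ms per rating line, each of which triggers several round trips to the database.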

Is this normal performance for a task like this? How can I speed this up?

Doing one insertion per row is going to be extremely slow. At a minimum you want insert_all, but you’re probably best off with a Repo.stream that does a SQL COPY.

Ah OK, so I can load all the ratings for a movie into memory and then insert_all them into the database? That still means I am hitting the database to get the local IDs for every entry, though. What is a Repo.stream with a SQL COPY? Looking at the docs, Repo.stream seems to be just for queries that return a lot of rows.

Well, not all into memory. Usually you insert in batches of several thousand rows; that tends to be the most efficient.
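
A minimal sketch of that batching idea, under some assumptions: `RatingBatches`, the column names, and the `nf_ratings` table in the usage comment are all hypothetical, and the insert itself is passed in as a function so the batching logic can be read (and run) without a database.

```elixir
defmodule RatingBatches do
  # Parses one "user_id,rating,date" line into a row map.
  # The key names are ASSUMED column names, not taken from the original schema.
  def parse_line(line, movie_id_nf) do
    [uid, rating, date] = line |> String.trim_trailing() |> String.split(",", parts: 3)

    %{
      movie_id_nf: movie_id_nf,
      uid_nf: String.to_integer(uid),
      rating: String.to_integer(rating),
      rated_on: Date.from_iso8601!(date)
    }
  end

  # Lazily groups the parsed rows into lists of at most batch_size entries
  # and calls insert_fun once per list, so only one batch is held in memory.
  def insert_in_batches(lines, movie_id_nf, insert_fun, batch_size \\ 5_000) do
    lines
    |> Stream.map(&parse_line(&1, movie_id_nf))
    |> Stream.chunk_every(batch_size)
    |> Enum.each(insert_fun)
  end
end

# Hypothetical usage with Ecto (table name is an assumption):
#   lines = path |> File.stream!() |> Stream.drop(1)
#   RatingBatches.insert_in_batches(lines, 6666, &Repo.insert_all("nf_ratings", &1))
```

With four columns per row, a batch of 5,000 stays well below the Postgres limit of 65,535 bind parameters per statement.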

I would suggest simply getting all of the tables into SQL with only the data that is in the txt files, and then running a single giant UPDATE JOIN that effectively does all the lookups / updates in a single SQL query.
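
As a rough illustration of that approach, the sketch below assumes the raw file contents have already been loaded into a staging table. Every table and column name here (`nf_ratings_staging`, `users`, `movies`, `interactions` and their columns) is a guess based on the code in the question, and since the goal is to create rows rather than change existing ones, an INSERT ... SELECT with joins stands in for the UPDATE JOIN.

```elixir
defmodule StagingMerge do
  # ASSUMED schema: nf_ratings_staging(movie_id_nf, uid_nf, rating, rated_on),
  # users(id, uid_nf), movies(id, id_nf),
  # interactions(user_id, movie_id, action, inserted_at).

  # Creates every user seen in the ratings in one statement.
  @insert_users """
  INSERT INTO users (uid_nf)
  SELECT DISTINCT uid_nf FROM nf_ratings_staging
  ON CONFLICT DO NOTHING
  """

  # Resolves local ids through joins and emits "presented" and "watched"
  # for every rating, plus "liked" when the rating is above 3.
  @insert_interactions """
  INSERT INTO interactions (user_id, movie_id, action, inserted_at)
  SELECT u.id, m.id, a.action, s.rated_on::timestamp
  FROM nf_ratings_staging s
  JOIN users u ON u.uid_nf = s.uid_nf
  JOIN movies m ON m.id_nf = s.movie_id_nf
  CROSS JOIN (VALUES ('presented'), ('watched'), ('liked')) AS a(action)
  WHERE a.action <> 'liked' OR s.rating > 3
  ON CONFLICT DO NOTHING
  """

  def statements, do: [@insert_users, @insert_interactions]

  # Runs both statements in one transaction; repo is an Ecto repo module.
  def run(repo) do
    repo.transaction(
      fn -> Enum.each(statements(), &Ecto.Adapters.SQL.query!(repo, &1, [])) end,
      timeout: :infinity
    )
  end
end
```

The per-row lookups from the question disappear because the database resolves all the local IDs in the joins.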

Yeah, I was remembering a bit wrong. There’s a stream function on Ecto.Adapters.SQL that does what you want. Here’s a generic function I have for streaming CSV rows into a table:

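# Needs `require Logger` in the enclosing module for Logger.debug/1.
def load(repo, table, stream, columns) do
  statement = """
  COPY #{table} (#{Enum.join(columns, ", ")})
  FROM STDIN
  WITH (FORMAT csv, HEADER false)
  """

  # The COPY stream has to run inside a transaction; the long timeout
  # (one hour) keeps a large load from being cut off.
  {:ok, :ok} =
    repo.transaction(
      fn ->
        Logger.debug(statement)

        stream
        # Send the rows to Postgres in chunks of 2000.
        |> Stream.chunk_every(2000, 2000, [])
        |> Stream.into(Ecto.Adapters.SQL.stream(repo, statement))
        |> Stream.run()
      end,
      timeout: 3_600_000
    )

  :ok
end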
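
To feed the rating files into a function like this, each line needs the movie ID from the file's header line prepended so that it becomes a complete CSV row. A possible sketch follows; `RatingCsv`, the glob variable, and the staging table and its column names in the usage comment are assumptions.

```elixir
defmodule RatingCsv do
  # Turns the lines of one mv_*.txt file into newline-terminated CSV rows of
  # the form "movie_id,user_id,rating,date". The "6666:" header line sets the
  # movie id for the lines that follow and is not emitted itself.
  def rows(lines) do
    Stream.transform(lines, nil, fn line, movie_id ->
      case Regex.run(~r/^(\d+):\s*$/, line, capture: :all_but_first) do
        [id] -> {[], id}
        nil -> {[movie_id <> "," <> String.trim_trailing(line) <> "\n"], movie_id}
      end
    end)
  end
end

# Hypothetical usage (glob, table and column names are assumptions):
#   stream =
#     training_set_glob
#     |> Path.wildcard()
#     |> Stream.flat_map(fn path -> path |> File.stream!() |> RatingCsv.rows() end)
#
#   load(Repo, "nf_ratings_staging", stream, ~w(movie_id_nf uid_nf rating rated_on))
```

Because everything is a lazy stream, the files are read and sent to Postgres chunk by chunk instead of being loaded into memory first.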

The core thought here, though, is that when you have this much data to process, you have to think about doing things in bulk. It isn’t feasible to treat this as a one-at-a-time operation, regardless of language.

Thanks! I have learned something new. I will do this.