How to build a large tensor from a postgres table

Hi!

I’ve got a >10M rows dataset in a postgres table. I’m trying to turn it into an Nx.Tensor in a reasonably performant way so I can pass it to EXGBoost.train/2.

My approach was to stream a query and build up a tensor using Nx.concatenate. I ran the following yesterday for a few hours:

Repo.transaction(
  fn ->
    stream = Ecto.Adapters.SQL.stream(Repo, "select * from training_data", [], max_rows: 50_000)

    [%{rows: [first_row | _] = rows}] = Enum.take(stream, 1)
    n_cols = length(first_row)

    stream
    |> Stream.drop(1)
    |> Stream.with_index(1)
    |> Enum.reduce(rows |> Nx.tensor() |> Nx.reshape({n_cols, :auto}), fn
      {%{rows: []}, _}, tensor ->
        tensor

      {%{rows: batch}, batch_index}, tensor ->
        if rem(batch_index, 10) == 0, do: IO.puts("Batch: #{batch_index}")
        Nx.concatenate([tensor, batch |> Nx.tensor() |> Nx.reshape({n_cols, :auto})], axis: 1)
    end)
  end,
  timeout: :infinity
)

(It eventually finished, but I seemed to be hitting some $O(n^2)$ behavior on the concatenate call.)

Unfortunately, I’m bad at using VMs and long story short I have to run this again today. So I figured I’d take the opportunity to post what I’m doing here and see if anyone has any thoughts/suggestions.

I think the better way is to Enum.map and then Nx.concatenate the full list only once.
By doing N concatenate calls, you’re effectively calling for memory allocation at each timestep for both the individual tensor and the new accumulator.

I assume that what you’re feeling as O(n^2) is actually the growing accumulator taking longer and longer to allocate new contiguous memory sections.

...
|> Enum.map(fn %{rows: batch} -> batch |> Nx.tensor() |> Nx.reshape({n_cols, :auto}) end)
|> Nx.concatenate(axis: 1)

another possible approach would be something like:

...
|> Enum.map(fn %{rows: batch} -> batch end)
|> Nx.tensor()
|> Nx.transpose()
1 Like

Thanks!

Makes sense.

Perhaps. My script finished a little while ago. (Took maybe 2 hours? I should’ve timed it.) I’m curious now so I’ll find some time to do some rough benchmarks and report back.

Also, @benwilson512 suggested I just yolo it and do query |> Repo.all |> Nx.tensor. Without any real justification, I just assumed that would be bad? But it’s worth a shot.

I’ll try all three approaches and see what’s best.

Repo.all is basically the same as the second suggestion, basically. The stream there doesn’t change much because you’ll need everything in memory in the end for the tensor allocation.

Unless you’re allocating in the GPU, then mapping and creating tensors individually first might have some impact.

Hey wanted to follow up:

For my use case, I wasn’t able to get either the Enum.map or Repo.all versions to work. I kept getting OOM’d. Here’s the script that ended up succeeding (took 15-20 mins across a few runs):

chunk_size = 1_700_000

{time, result} =
  :timer.tc(fn ->
    0..10
    |> Stream.map(&IO.inspect/1)
    |> Stream.map(fn i ->
      Repo.query!(
        "select * from training_data_2 where id > $1 and id <= $2",
        # parameterize like this and your debug output includes the fenceposts
        [i * chunk_size, (i + 1) * chunk_size],
        timeout: :infinity
      )
    end)
    |> Stream.filter(fn x -> Enum.any?(x.rows) end)
    |> Stream.map(fn x -> Nx.tensor(x.rows) end)
    |> Enum.reduce(fn new_tensor, prev_tensor ->
      IO.puts("concatenating...")
      result = Nx.concatenate([prev_tensor, new_tensor])
      IO.puts("garbage collecting...")
      Process.list() |> Enum.each(&:erlang.garbage_collect/1)
      result
    end)
  end)

It’s basically what I started with, but with a much larger chunk/batch size and an explicit GC call (thanks @benwilson512 for the suggestion!).

Obviously there are tons of considerations and YMMV with a given approach depending on the size of your dataset, your hardware, etc. But this is what ended up working best.

Oh and FWIW, we did a tiny benchmark where we 1. kept the number of batches static (10) and 2. kept multiplying the chunk size by 10 – i.e. we kept processing more and more rows. We saw roughly linear behavior WRT run time. So it seems like even though each concatenate call asks for more memory, the overall effect was negligible if you kept the number of batches low. Again, tons of factors here so take that conclusion with a grain of salt.

1 Like