What is the best approach for fetching a large number of records from PostgreSQL with Ecto?

Hi there,

I am working with Ecto and PostgreSQL, and I need to fetch all of the records from a specific table, but the table has approximately 40,000 records (15 MB).

Each record in the table has an email message id, which I have to use to call the Google Gmail API to get the message.

So I am thinking about memory efficiency. What is the most efficient way to fetch all the records with minimal memory impact, i.e. without loading the whole 15 MB at once just to make API calls?

I found Ecto Stream but it works inside a transaction and I don’t use transactions. And… is Ecto Stream the best way to go?

Thanks for any comments.

Regards.

9 Likes

Well, you could just select that specific email message id instead of the entire row; a simple list of ids would be quite tiny. And even if you fetched everything, 15 MB is still not a lot. :slight_smile:

3 Likes

I haven’t ever done it with Ecto, but a PostgreSQL cursor could be a good option. The less complicated option is just to query chunks of ids, up to the max id, so you avoid fetching them all in one query, dealing with ORDER BY, etc.

3 Likes

@OvermindDL1 has a good suggestion, and 40,000 rows is a small result set even if you are running on a very constrained system. A cursor does not seem to have much utility beyond use inside Postgres functions.

2 Likes

This seems like a good case for GenStage.

You could create a producer that queries your table using a LIMIT and passes those records on to the consumer which makes the call to the Google Gmail API.

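The only change this would require is that you add a limit to your table query:

import Ecto.Query, only: [from: 2]

# Fetch at most `limit` rows from MyTable.
# Note: a producer calling this repeatedly also needs an offset or a
# `where` on the last id seen, otherwise every call returns the same rows.
def get(limit) do
  Repo.all(
    from t in MyTable,
      limit: ^limit,
      select: t
  )
end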

Check out this video for a simple application of the concept.

4 Likes

IMO stream is the way to go. You need to run inside a transaction but that should not be a problem. You should be able to do neat things such as:

Repo.stream(...)
|> Task.async_stream(&deliver/1, max_concurrency: 10)
|> Stream.run()

and that will stream the emails from the database and invoke the Google email API for up to 10 of them concurrently. It requires Elixir v1.4.
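
To make the concurrency part concrete, here is a minimal sketch of the Task.async_stream stage on its own, runnable without a database. `DeliverSketch` and its `deliver/1` are hypothetical stand-ins for the Gmail API call, and the input stream stands in for what Repo.stream would emit inside a transaction.

```elixir
defmodule DeliverSketch do
  # Hypothetical stand-in for the Gmail API request (assumption, not a real client).
  def deliver(message_id) do
    Process.sleep(10)
    {:fetched, message_id}
  end

  # Runs deliver/1 for every id, with at most 10 calls in flight at once.
  # With Ecto, `message_ids` would be the Repo.stream(...) result and this
  # call would sit inside Repo.transaction(fn -> ... end).
  def run(message_ids) do
    message_ids
    |> Task.async_stream(&deliver/1, max_concurrency: 10, timeout: 30_000)
    |> Enum.map(fn {:ok, result} -> result end)
  end
end

# Lazy source of 25 fake message ids.
ids = Stream.map(1..25, fn n -> "msg-#{n}" end)
results = DeliverSketch.run(ids)
```

Each element comes back wrapped as `{:ok, result}`, in input order by default. The source is consumed lazily as tasks finish, so the sketch collects results only for illustration; in a real job you would likely end the pipeline with Stream.run() as above.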

27 Likes

@OvermindDL1, @brightball, @andre1sk, @wfgilman, @josevalim

Thank you all for your comments.

I will give it a try with Ecto.Stream and post the final code for everyone to see.

Best regards.

1 Like

After hitting transaction timeouts with Repo.stream, we switched to pagination with Scrivener.
The interface emulates Stream.chunk but uses a query as the source:

@doc """
Stream chunks of results from the given queryable.

Unlike Repo.stream, this function does not keep a long running transaction open.
Hence, consistency is not guaranteed in the presence of rows being deleted or sort criteria changing.

## Example

  Ecto.Query.from(u in Users, order_by: [asc: :created_at])
  |> Repo.chunk(100)
  |> Stream.map(&process_batch_of_users/1)
  |> Stream.run()
"""
@spec chunk(Ecto.Queryable.t, integer) :: Enumerable.t
def chunk(queryable, chunk_size) do
  chunk_stream = Stream.unfold(1, fn page_number ->
    page = Repo.paginate(queryable, page: page_number, page_size: chunk_size)
    {page.entries, page_number + 1}
  end)
  Stream.take_while(chunk_stream, fn [] -> false; _ -> true end)
end

8 Likes

You can increase the timeout for the transaction where streaming happens:

Repo.transaction(fn -> ... end, timeout: some_huge_number)

You need to remember that LIMIT + OFFSET pagination is prone to race conditions: if the table changes between page queries, you may miss some rows or see rows that you shouldn’t. A database-level cursor gives a consistent view of the data.
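
A small in-memory sketch of that race, with a plain list standing in for the table (the `OffsetRace` module and the row values are made up for illustration). A row is deleted between the first and second page query, and a row that was never processed gets skipped.

```elixir
defmodule OffsetRace do
  # Emulates `ORDER BY id LIMIT page_size OFFSET (page_number - 1) * page_size`.
  def page(rows, page_number, page_size) do
    rows
    |> Enum.sort()
    |> Enum.drop((page_number - 1) * page_size)
    |> Enum.take(page_size)
  end
end

rows = [1, 2, 3, 4, 5, 6]

# First page is read while the table still holds all six rows.
first_page = OffsetRace.page(rows, 1, 3)

# Row 2 is deleted before the second page is requested.
rows_after_delete = List.delete(rows, 2)
second_page = OffsetRace.page(rows_after_delete, 2, 3)

# Rows that were never returned by either page.
missed = rows_after_delete -- (first_page ++ second_page)
```

Here the second page starts one row too late because the offset is counted against the shrunken table, so row 4 is never returned. An insert ahead of the current offset has the opposite effect and repeats a row.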

11 Likes

@michalmuskala yes we definitely reached for Repo.stream as the first option, but in our case the query is processing ~1M records, making HTTP requests for a subset that match some criteria.
The total time could run for hours, so we opted for the pagination approach. The query orders by record insertion time set by db trigger, and no records are deleted, so in our particular use case the race conditions aren’t a problem.

6 Likes

Awesome. How is this rendered in a Phoenix view?

1 Like

Shouldn’t the primary key order by insertion time automatically?

1 Like

We used this approach in a background job. For a Phoenix view, I’d probably just use Scrivener with query params for page number and size.

2 Likes

Sequential ids would work, but we used UUIDs for the primary key on that table, IIRC.

2 Likes

LIMIT + OFFSET is also linear. When getting the last 100 rows out of 1 million, the database has to go through the first 999,900. Using an ID or a stream with a timeout of :infinity would be preferred, I would say.

9 Likes

thanks. that was useful

1 Like

Detailed explanation there: http://use-the-index-luke.com/no-offset

Short version: when iterating over the records, keep track of the min and max of the range you are fetching and make sure they are used in your WHERE clause. This trims the result set BEFORE sorting. Using LIMIT and OFFSET means the entire possible result has to be loaded and sorted to calculate which chunk of it is coming back.
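
A minimal sketch of that "no offset" iteration, assuming a unique, ordered column named `id`. `KeysetSketch` and `fetch_page` are hypothetical names; `fetch_page` is injected so the block runs against an in-memory list, and the comment shows roughly what it might look like with Ecto.

```elixir
defmodule KeysetSketch do
  # Lazily emits chunks of rows. `fetch_page` is a 2-arity function that
  # receives the last id seen (nil on the first call) and a limit, and
  # returns rows with a greater id, ordered by id ascending.
  def chunks(fetch_page, chunk_size) do
    Stream.unfold(nil, fn last_id ->
      case fetch_page.(last_id, chunk_size) do
        [] -> nil
        rows -> {rows, List.last(rows).id}
      end
    end)
  end
end

# In-memory stand-in for the table (assumption for the sketch).
table = for id <- 1..7, do: %{id: id, message_id: "msg-#{id}"}

# With Ecto this might look like (Message schema is hypothetical):
#   fn
#     nil, limit -> Repo.all(from m in Message, order_by: m.id, limit: ^limit)
#     last_id, limit ->
#       Repo.all(from m in Message, where: m.id > ^last_id, order_by: m.id, limit: ^limit)
#   end
fetch_page = fn last_id, limit ->
  table
  |> Enum.filter(fn row -> is_nil(last_id) or row.id > last_id end)
  |> Enum.take(limit)
end

chunk_ids =
  fetch_page
  |> KeysetSketch.chunks(3)
  |> Enum.map(fn rows -> Enum.map(rows, & &1.id) end)
```

Each page query filters on the last id seen, so the database can jump straight to the right place in the index instead of skipping over earlier rows. No transaction stays open between pages, which is the trade-off against Repo.stream discussed above.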

10 Likes

Thanks @josevalim, TIL!

There are some trade-offs with long running transactions also:

  • In general, keeping transactions open for longer means fewer available connections in the pool, limiting the number of concurrent requests that can be served.
  • Postgres won’t vacuum any records that have been updated/deleted if there is any open transaction older than those records.
  • Row locks will be held until the transaction completes.

If none of these are problematic then I agree Repo.stream with an infinite transaction timeout is a very clean solution.

How about selecting the data into a temporary table with sequential primary key?
You could then use where conditions to read the data in chunks, without being prone to race conditions or inefficient querying.

3 Likes

Your chunk function is very nice. A possible small improvement: return an error if queryable.order_bys is [], since (as you said) pagination will be unpredictable without consistent sort criteria.

Also, if you just want to know the absolute fastest way to get the data out in the smallest footprint, you’ll want to use Postgres COPY. It’s the fastest way to get data into or out of PG (by a wide margin).

Probably not what you really need in this particular situation, but if you really wanted to you could use it to write the data to a file (like a cache) and then stream over the lines of the file.
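
A rough sketch of the "stream over the lines of the file" half, assuming the ids were already exported one per line, for example with something like `COPY (SELECT message_id FROM messages) TO STDOUT` redirected to a file (the table and column names are hypothetical). The file contents below are fake data written by the sketch itself.

```elixir
# Stand-in for the exported file: one message id per line (fake data).
path = Path.join(System.tmp_dir!(), "message_ids_sketch.txt")
File.write!(path, "msg-1\nmsg-2\nmsg-3\n")

# File.stream!/1 reads line by line, so only one line is held at a time
# until the final Enum.to_list/1, which is here just to show the result.
ids =
  path
  |> File.stream!()
  |> Stream.map(&String.trim/1)
  |> Enum.to_list()

File.rm!(path)
```

In a real job the `Stream.map/2` step would feed the API calls instead of being collected into a list.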

7 Likes