What is the best approach for fetching a large number of records from PostgreSQL with Ecto?

@OvermindDL1 has a good suggestion, and 40,000 rows is a small result set even if you are running on a very constrained system. Cursors don't have much utility beyond use inside Postgres functions, it seems.

2 Likes

This seems like a good case for GenStage.

You could create a producer that queries your table using a LIMIT and passes those records on to the consumer which makes the call to the Google Gmail API.

The only change this would require is adding a limit to your table query:

import Ecto.Query, only: [from: 2]

# Returns up to `limit` rows from MyTable.
def get(limit) do
  Repo.all(
    from t in MyTable,
      limit: ^limit,
      select: t
  )
end

Check out this video for a simple application of the concept.
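For illustration, a minimal sketch of such a producer, assuming a hypothetical MyApp.Records.get/2 that extends the query above with an offset (all module and function names here are illustrative); a consumer subscribed to it would make the Gmail API calls:

defmodule MyApp.RecordProducer do
  use GenStage

  def start_link(_opts) do
    GenStage.start_link(__MODULE__, 0, name: __MODULE__)
  end

  @impl true
  def init(offset), do: {:producer, offset}

  @impl true
  def handle_demand(demand, offset) when demand > 0 do
    # Fetch the next `demand` rows; once the table is exhausted this
    # simply emits an empty list of events.
    records = MyApp.Records.get(demand, offset)
    {:noreply, records, offset + length(records)}
  end
end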

4 Likes

IMO stream is the way to go. You need to run inside a transaction but that should not be a problem. You should be able to do neat things such as:

Repo.stream(...)
|> Task.async_stream(&deliver/1, max_concurrency: 10)
|> Stream.run

and that will stream the records and call the Google email API with up to 10 concurrent tasks. It requires Elixir v1.4 (for Task.async_stream).
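Since Repo.stream/2 has to be enumerated inside a transaction, a slightly fuller sketch (query and deliver/1 are placeholders for your own query and mailer) might look like:

# Sketch only; the :infinity timeouts assume the run may take a long while.
Repo.transaction(fn ->
  query
  |> Repo.stream()
  |> Task.async_stream(&deliver/1, max_concurrency: 10, timeout: :infinity)
  |> Stream.run()
end, timeout: :infinity)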

26 Likes

@OvermindDL1, @brightball, @andre1sk, @wfgilman, @josevalim

Thank you all for your comments.

I will give it a try with Repo.stream and post the final code for everyone to see.

Best regards.

1 Like

After hitting transaction timeouts with Repo.stream, we switched to pagination with Scrivener.
The interface emulates Stream.chunk but uses a query as the source:

@doc """
Streams chunks of results from the given queryable.

Unlike `Repo.stream/2`, this function does not keep a long-running transaction open.
Hence, consistency is not guaranteed in the presence of rows being deleted or sort criteria changing.

## Example

    Ecto.Query.from(u in Users, order_by: [asc: :created_at])
    |> Repo.chunk(100)
    |> Stream.map(&process_batch_of_users/1)
    |> Stream.run()

"""
@spec chunk(Ecto.Queryable.t, integer) :: Enumerable.t
def chunk(queryable, chunk_size) do
  chunk_stream =
    Stream.unfold(1, fn page_number ->
      page = Repo.paginate(queryable, page: page_number, page_size: chunk_size)
      {page.entries, page_number + 1}
    end)

  Stream.take_while(chunk_stream, fn [] -> false; _ -> true end)
end
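For reference, Repo.paginate/2 above comes from Scrivener.Ecto; a minimal repo setup (assumed, not shown in the original post) looks roughly like:

defmodule MyApp.Repo do
  use Ecto.Repo, otp_app: :my_app
  # Scrivener.Ecto adds paginate/2 to the repo; the default page size is illustrative.
  use Scrivener, page_size: 100
end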
8 Likes

You can increase the timeout for the transaction where streaming happens:

Repo.transaction(fn -> ... end, timeout: some_huge_number)

You need to remember that LIMIT + OFFSET pagination is prone to race conditions - it may happen that you’ll miss some rows or see rows that you shouldn’t. Database-level cursor gives a consistent view of data.

10 Likes

@michalmuskala yes we definitely reached for Repo.stream as the first option, but in our case the query is processing ~1M records, making HTTP requests for a subset that match some criteria.
The total time could run for hours, so we opted for the pagination approach. The query orders by record insertion time set by db trigger, and no records are deleted, so in our particular use case the race conditions aren’t a problem.

6 Likes

Awesome. How is this rendered in the Phoenix view?

1 Like

Shouldn’t the primary key order by insertion time automatically?

1 Like

We used this approach in a background job. For a Phoenix view, I'd probably just use Scrivener with query params for the page number and size, something like the sketch below.
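A hypothetical controller action, assuming Scrivener is set up on the Repo and Ecto.Query is imported (MyTable and the template name are placeholders):

def index(conn, params) do
  page =
    MyTable
    |> order_by(asc: :inserted_at)
    |> Repo.paginate(params) # Scrivener reads "page"/"page_size" from the params map

  render(conn, "index.html", entries: page.entries, page: page)
end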

2 Likes

Sequential ids would work, but we used uuids for the primary key on that table IIRC.

2 Likes

LIMIT + OFFSET is also linear. When getting the last 100 of 1 million rows, you will go through the first 999,900. Using an ID-based approach or a stream with a timeout of :infinity would be preferable, I would say.

9 Likes

Thanks, that was useful.

1 Like

Detailed explanation here: http://use-the-index-luke.com/no-offset

Short version: when iterating over the records, keep track of the boundary of the chunk you just read (the max/min key values) and use it in your WHERE clause. This trims the result set BEFORE sorting. Using LIMIT and OFFSET means the entire possible result set has to be loaded and sorted just to calculate which chunk of it comes back.
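A hedged sketch of that keyset style with Ecto (the :id column and starting value are illustrative; any indexed, strictly increasing column works):

import Ecto.Query

# Walk the table in chunks, keyed on the last id seen instead of an OFFSET.
def keyset_stream(chunk_size \\ 100) do
  Stream.unfold(0, fn last_id ->
    rows =
      from(t in MyTable,
        where: t.id > ^last_id,
        order_by: [asc: t.id],
        limit: ^chunk_size
      )
      |> Repo.all()

    case rows do
      [] -> nil
      _ -> {rows, List.last(rows).id}
    end
  end)
end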

9 Likes

Thanks @josevalim, TIL!

There are also some trade-offs with long-running transactions:

  • In general, keeping transactions open for longer means fewer available connections in the pool, limiting the number of concurrent requests that can be served.
  • Postgres won't vacuum any records that have been updated/deleted while there is an open transaction older than those records.
  • Row locks will be held until the transaction completes.

If none of these are problematic then I agree Repo.stream with an infinite transaction timeout is a very clean solution.

How about selecting the data into a temporary table with a sequential primary key?
You could then use where conditions to read the data in chunks, without being prone to race conditions or inefficient querying.
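A rough sketch of that idea with raw SQL (table and column names are made up; an UNLOGGED scratch table is used here instead of a true TEMP table, because Postgres temp tables are only visible to the session that created them, which doesn't play well with a connection pool):

# Build a scratch table with a sequential key derived from insertion order.
Ecto.Adapters.SQL.query!(Repo, """
CREATE UNLOGGED TABLE users_export AS
  SELECT row_number() OVER (ORDER BY inserted_at) AS seq, u.*
  FROM users u
""")

# Read it back in chunks keyed on that sequential column; drop the table when done.
def export_chunk(last_seq, chunk_size) do
  Ecto.Adapters.SQL.query!(
    Repo,
    "SELECT * FROM users_export WHERE seq > $1 ORDER BY seq LIMIT $2",
    [last_seq, chunk_size]
  ).rows
end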

3 Likes

Your chunk function is very nice. A possible small improvement: return an error if queryable.order_bys is [], since (as you said) pagination will be unpredictable without consistent sort criteria.
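That check could look roughly like this (sketched against the chunk/2 above, raising rather than returning an error tuple; the message is made up):

def chunk(queryable, chunk_size) do
  # Refuse to paginate without a stable sort, as suggested above.
  if Ecto.Queryable.to_query(queryable).order_bys == [] do
    raise ArgumentError, "chunk/2 requires an order_by for stable pagination"
  end

  # ... unfold/take_while as before ...
end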

Also, if you just want to know the absolute fastest way to get the data out in the smallest footprint, you’ll want to use Postgres COPY. It’s the fastest way to get data into or out of PG (by a wide margin).

Probably not what you really need in this particular situation, but if you really wanted to you could use it to write the data to a file (like a cache) and then stream over the lines of the file.
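For the record, a hedged sketch of driving COPY from Ecto along those lines (the file path and process_csv_line/1 are placeholders, and the exact shape of the streamed results can vary by adapter version):

# Stream "COPY ... TO STDOUT" into a file inside a transaction.
sql = "COPY (SELECT * FROM users) TO STDOUT WITH (FORMAT csv)"

Repo.transaction(fn ->
  Ecto.Adapters.SQL.stream(Repo, sql)
  |> Stream.flat_map(fn %{rows: rows} -> rows || [] end)
  |> Enum.into(File.stream!("/tmp/users.csv"))
end, timeout: :infinity)

# Later, stream over the cached file without touching the database.
File.stream!("/tmp/users.csv")
|> Stream.map(&process_csv_line/1)
|> Stream.run()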

7 Likes

We used this approach for a “cron job” style problem. One very difficult stage in our implementation makes an HTTP request using the information retrieved from the DB. If for some reason the HTTP call times out and raises an exception, the whole thing is brought down. The stage processes are restarted, but the “job” is not recovered midway; it is basically stopped, and it isn't even redone from the start. It's cool when it works, though.

1 Like

Scrivener has since added a new option, allow_overflow_page_number, which defaults to false.
The chunk function above relies on empty results to stop streaming data, but with that option left at its default an empty result is impossible.
To keep using the chunk function, you need to override the option, like this:

page =
  Repo.paginate(queryable,
    page: page_number,
    page_size: chunk_size,
    options: [allow_overflow_page_number: true]
  )
3 Likes

There is another possibility that no one has mentioned: you can turn the database table itself into a work queue.

I am currently in the process of doing exactly that on a project where we initially had the solution @josevalim presented here, i.e. a transaction + Repo.stream + async workers, but we're ditching it because it holds a database connection when it doesn't need to, and also because it causes race conditions if you have 2+ nodes in the cluster. You then have to make sure the process that iterates over database records is started on only one of the nodes, which requires something like global registration and just complicates things a lot.

So, in your case you could, I think, simply use Honeydew with its Ecto queue: https://github.com/koudelka/honeydew. I think @koudelka could confirm since he's on the forum, but that is going to work in your case, and I think pretty well. What it does is add a couple of fields to the table and then perform a smart update to lock the next available row; it can also handle retries, failed jobs, etc. fairly easily.

We actually used Honeydew on the same project before and replaced it with Exq for the most part, but we are re-introducing Honeydew-like behavior by doing what it does, just slightly differently. We need a bit more flexibility in how and which records we pick: Honeydew really assumes you want to process all records one by one after they have been inserted, while we need to process a subset of records at an undefined moment in the future, when some condition changes (account subscription status, to be precise). So we will not use Honeydew, but will basically do the same thing it does in order to lock rows and work through the queue one by one.

For reference, these are the building blocks you need to build the queue:
https://dba.stackexchange.com/a/69497 - check out the section on “FOR UPDATE SKIP LOCKED”, as it is crucial for building a queue system on top of PostgreSQL without race conditions and deadlocks; it is also how Honeydew does it.
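In Ecto terms, claiming the next available row that way looks roughly like this (a sketch against a hypothetical "jobs" table with a state column; do_work/1 is a placeholder):

import Ecto.Query

Repo.transaction(fn ->
  job =
    from(j in "jobs",
      where: j.state == "pending",
      limit: 1,
      lock: "FOR UPDATE SKIP LOCKED",
      select: %{id: j.id, payload: j.payload}
    )
    |> Repo.one()

  if job do
    # Do the work while the row lock is held, then mark the row as done.
    do_work(job)

    from(j in "jobs", where: j.id == ^job.id)
    |> Repo.update_all(set: [state: "done"])
  end
end)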

And if you have a process that walks over rows from the database like that, fetching and locking them one after another, there is no reason why you can't also parallelize it across the cluster. You just start a supervisor that starts, say, 5 workers on each node in the cluster, and you can have 5 * N workers (for N nodes) without actually blocking 5 * N database connections.

I will probably put together a blog post once this is running on our end, but that won’t be today :slight_smile:

6 Likes