What is the best approach for fetching a large number of records from PostgreSQL with Ecto?

I used this approach for a “cron job” problem. One stage in our implementation was particularly troublesome: it makes an HTTP request using information retrieved from the DB. If the HTTP call times out and raises an exception, the whole thing is brought down. The stage processes are restarted, but the “job” does not resume midway; it simply stops, and it is not even redone from the start. It’s cool when it works, though.


Scrivener has since added a new option, allow_overflow_page_number, which defaults to false.
This chunk function relies on an empty result to stop streaming data, but with the option at its default an empty result is impossible: a page number past the last page gives you the last page again.
To use this chunk function, you need to override the option, like this:

        page =
          paginate(queryable,
            page: page_number,
            page_size: chunk_size,
            options: [allow_overflow_page_number: true]
          )
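
The chunk function itself is not quoted in this post, so here is only a minimal sketch of the idea it relies on: keep asking for the next page and stop at the first empty one. `PagedStream` and `fetch_page` are hypothetical names; `fetch_page` stands in for something like the `paginate/2` call above followed by reading the page's entries.

```elixir
defmodule PagedStream do
  # Lazily emits entries page by page and halts on the first empty page.
  # `fetch_page` is a hypothetical 1-arity function: page number -> list of entries.
  def stream(fetch_page) do
    Stream.resource(
      # Start at page 1.
      fn -> 1 end,
      fn page_number ->
        case fetch_page.(page_number) do
          # An empty page is the only stop signal, which is why the
          # pagination must be allowed to overflow past the last page.
          [] -> {:halt, page_number}
          entries -> {entries, page_number + 1}
        end
      end,
      # Nothing to clean up.
      fn _page_number -> :ok end
    )
  end
end
```

If `fetch_page` never returns an empty list, this stream never terminates, which is exactly the failure mode described above.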

There is another possibility no one has mentioned: you can turn the database table itself into a work queue.

I am currently doing exactly that on a project where we initially used the solution @josevalim presented here, i.e. transaction + Repo.stream + async workers. We are ditching it for two reasons: it holds a database connection while it is not necessary to, and it causes a race condition if you have 2+ nodes in the cluster. You then have to make sure the process that iterates over the database records is started on only one of the nodes, which requires something like global registration and complicates things a lot.

So, in your case, I think you could simply use Honeydew with its Ecto queue: https://github.com/koudelka/honeydew. @koudelka could confirm since he’s on the forum, but I think it will work pretty well for you. It adds a couple of fields to the table and then does a smart update to lock the next available row; it can also handle retries, failed jobs, etc. fairly easily.

We actually used Honeydew on the same project before and replaced it with Exq for the most part, but we are now re-introducing Honeydew-like behavior by doing what it does, just slightly differently. We need a bit more flexibility in how and which records we pick. Honeydew really assumes you want to process all records one by one after they have been inserted, and we need something different: processing a subset of records at some undefined moment in the future, when certain conditions change (account subscription status, to be precise). So we will not use Honeydew, but we will basically do the same thing it does to lock rows and go through the queue one by one.

For reference, these are the building blocks you need to build the queue:
https://dba.stackexchange.com/a/69497 - check out the section on “FOR UPDATE SKIP LOCKED”, as this is crucial to building a queue system on top of PostgreSQL without race conditions and deadlocks. Also look at how Honeydew does it.
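
As a rough illustration of the locking step, here is a sketch using Ecto's `lock` query option. Everything about the schema is an assumption: the `jobs` table, its `state` and `payload` columns, and the `"pending"`/`"running"` values are hypothetical, and this is not how Honeydew implements it. It needs the `ecto_sql` dependency and a configured repo, passed in as `repo`.

```elixir
defmodule JobQueue do
  import Ecto.Query

  # Claims one pending row, or returns {:ok, nil} when none is available.
  # Table and column names are hypothetical.
  def claim_next(repo) do
    repo.transaction(fn ->
      job =
        from(j in "jobs",
          where: j.state == "pending",
          order_by: [asc: j.id],
          limit: 1,
          # Rows locked by other workers are skipped instead of waited on.
          lock: "FOR UPDATE SKIP LOCKED",
          select: %{id: j.id, payload: j.payload}
        )
        |> repo.one()

      if job do
        # Mark the row as taken before the transaction (and its lock) ends.
        from(j in "jobs", where: j.id == ^job.id)
        |> repo.update_all(set: [state: "running"])
      end

      job
    end)
  end
end
```

Because the row is marked inside a short transaction, the connection goes back to the pool before the actual work starts; the trade-off is that a crashed worker leaves a row in `"running"` that something else has to detect and reset.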

And if you have a process that goes over a list of database rows like that, fetching and locking one after another, there is no reason you can’t also run it in parallel across the cluster. You just start a supervisor that starts, say, 5 workers on each node, and with N nodes you get 5 × N workers without actually blocking 5 × N database connections.
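
A minimal sketch of such a per-node supervisor, in plain Elixir. The module name, `claim_fun` (returns the next job or `nil`) and `handle_fun` (processes one job) are hypothetical placeholders; in practice `claim_fun` would be whatever function locks the next row.

```elixir
defmodule QueueWorkers do
  # Starts `count` looping workers under one supervisor.
  # Run this once on each node to get count * N workers in the cluster.
  def start_link(claim_fun, handle_fun, count \\ 5) do
    children =
      for i <- 1..count do
        Supervisor.child_spec(
          {Task, fn -> loop(claim_fun, handle_fun) end},
          id: {:queue_worker, i},
          # Restart a worker whenever it crashes, e.g. on a failed HTTP call.
          restart: :permanent
        )
      end

    Supervisor.start_link(children, strategy: :one_for_one)
  end

  defp loop(claim_fun, handle_fun) do
    case claim_fun.() do
      # Nothing to do right now: back off briefly before polling again.
      nil -> Process.sleep(100)
      job -> handle_fun.(job)
    end

    loop(claim_fun, handle_fun)
  end
end
```

Note that the supervisor's default restart intensity (3 restarts in 5 seconds) still applies, so a job that crashes every time would need its own retry or failure bookkeeping.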

I will probably put together a blog post once this is running on our end, but that won’t be today :slight_smile:


Yep! This is the sort of scenario that Honeydew’s Ecto Queue was written for, the case where there’s a strictly 1-to-1 relationship between a database row and a job (or a set of different kinds of jobs). As you mentioned, Honeydew will handle failure + (delayed) retry + success bits, as well as manage workers in the cluster.

In @joaquinalcerro’s case, each Honeydew worker could hold a persistent connection to the Gmail API. Alternatively, the workers can check out connections from a pool, if the additional complexity is worth it.

Regarding your use case, have you seen Honeydew’s Execution Criteria? You can provide an SQL fragment to tell the queue to only run jobs when they meet a certain condition. For example, when subscription_status = 'paid'.

Honeydew doesn’t require that jobs be run as soon as their associated row is inserted. A job will be executed if it hasn’t been run successfully before and if the row meets the criteria.

Definitely reach out if I can help! :slight_smile:


As a workaround for this problem I recommend streaming rows using cursor-based pagination.

Then:

  • you don’t need to wrap the streaming operation in a database transaction (that’s how Repo.stream does it, and it can cause your database to explode if your dataset is large enough)
  • it’ll work with millions of rows (offset-based pagination, e.g. through Scrivener, makes queries take longer and longer with each further page)

For simplicity we published a small library called ecto_cursor_based_stream that gives you a Repo.cursor_based_stream/2 function that mimics the Repo.stream/2 interface. Feel free to use it :slight_smile:
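
To show the general idea in isolation, here is a hand-rolled sketch of cursor-based (keyset) streaming. It is not the library's implementation. `KeysetStream` and `fetch_after` are hypothetical names: `fetch_after` takes the last seen id and a limit and returns the next rows ordered by id, which in Ecto terms would be a query with `where: r.id > ^last_id`, `order_by: r.id` and `limit: ^limit`. Rows are assumed to be maps with a positive integer `:id`.

```elixir
defmodule KeysetStream do
  # Lazily emits rows in id order, fetching `batch_size` rows per query.
  # Each batch is an independent query, so no surrounding transaction is needed.
  def stream(fetch_after, batch_size) do
    Stream.resource(
      # Assumes ids are positive integers, so 0 sorts before every row.
      fn -> 0 end,
      fn last_id ->
        case fetch_after.(last_id, batch_size) do
          [] -> {:halt, last_id}
          # The id of the last row becomes the cursor for the next query.
          rows -> {rows, List.last(rows).id}
        end
      end,
      fn _last_id -> :ok end
    )
  end
end
```

Since every query filters on the indexed id rather than skipping an offset, later batches should cost about the same as early ones.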

You are not using database cursors but rather several queries over an indexed, increasing field, correct? I was just a bit confused by the name.


Yes.

P.S. I didn’t know database cursors were a thing. Curious if anybody’s using them in production. (How easy would it be to encapsulate that in a library? Would it be better?)

  • It would be less compatible as not every database supports DB cursors.
  • When there is a need to optimize further than userland cursor pagination, a db-specific lib will probably not suffice.

Hence DB cursors are less well known.
