I am working with Ecto-Postgresql and I need to call all of the records from a specific table but the table has 40,000 records approximately (15MB).
Each record in the table has a email message id with which I have to call Google Gmail API to get the message.
So I am thinking about memory efficient. What is the most efficient way to call all the records with the minimum impact in memory or not loading the hole 15MB at once just to make API calls?
I found Ecto Stream but it works inside a transaction and I don’t use transactions. And… is Ecto Stream the best way to go?
Well you could just request back that specific email message id instead of the entire row, just a simple list back would be quite tiny, as well as even if you got it all 15megs would still not be a lot.
I haven’t ever done it with Ecto, but a PostgreSQL cursor could be a good option. The less complicated is just to query chunks of ids up to the max amount to avoid having to get them all in one query, deal with order by, etc.
@OvermindDL1 has a good suggestion and 40,000 rows is a small result set even if you are running on a very constrained system. Cursor has not much utility beyond using it inside Postgres functions it seams.
You could create a producer that queries your table using a LIMIT and passes those records on to the consumer which makes the call to the Google Gmail API.
The only change this would require is your add limit to your table query:
def get(limit) do
Repo.all(
from t in MyTable,
limit: ^limit,
select: t
)
end
Check out this video for a simple application of the concept.
After hitting transaction timeouts with Repo.stream we switched to a pagination with Scriviner.
The interface is emulating Stream.chunk but uses a query as the source:
@doc """
Stream chunks of results from the given queryable.
Unlike Repo.stream, this function does not keep a long running transaction open.
Hence, consistency is not guarenteed in the presence of rows being deleted or sort criteria changing.
## Example
Ecto.Query.from(u in Users, order_by: [asc: :created_at])
|> Repo.chunk(100)
|> Stream.map(&process_batch_of_users)
|> Stream.run()
"""
@spec chunk(Ecto.Queryable.t, integer) :: Stream.t
def chunk(queryable, chunk_size) do
chunk_stream = Stream.unfold(1, fn page_number ->
page = Repo.paginate(queryable, page: page_number, page_size: chunk_size)
{page.entries, page_number + 1}
end)
Stream.take_while(chunk_stream, fn [] -> false; _ -> true end)
end
You need to remember that LIMIT + OFFSET pagination is prone to race conditions - it may happen that you’ll miss some rows or see rows that you shouldn’t. Database-level cursor gives a consistent view of data.
@michalmuskala yes we definitely reached for Repo.stream as the first option, but in our case the query is processing ~1M records, making HTTP requests for a subset that match some criteria.
The total time could run for hours, so we opted for the pagination approach. The query orders by record insertion time set by db trigger, and no records are deleted, so in our particular use case the race conditions aren’t a problem.
LIMIT + OFFSET is also linear. When getting the last 100 in the 1 million, you will go through the first 99900. Using an ID or a stream with a timeout of :infinity would be preferred I would say.
Short version, when iterating over the records get the max range and the min range to make sure it’s used in your where clause. This trims the result set BEFORE sorting. Using LIMIT and OFFSET means the entire possible result has to be loaded and sorted to calculate which chunk of them is coming back.
There are some trade-offs with long running transactions also:
In general, keeping transactions open for longer means fewer available connections in the pool, limiting the number of concurrent requests that can be served.
Postgres wont vacuum any records that have been updated/deleted if there is any open transaction older than those records.
Row locks will be held until transaction completes
If none of these are problematic then I agree Repo.stream with an infinite transaction timeout is a very clean solution.
How about selecting the data into a temporary table with sequential primary key?
You could then use where conditions to read the data in chunks, without being prone to race conditions or inefficient querying.
Your chunk function is very nice. A possible small improvement: return an error if queryable.order_bys is [], since (as you said) pagination will be unpredictable without a consistent sort criteria.
Also, if you just want to know the absolute fastest way to get the data out in the smallest footprint, you’ll want to use Postgres COPY. It’s the fastest way to get data into or out of PG (by a wide margin).
Probably not what you really need in this particular situation, but if you really wanted to you could use it to write the data to a file (like a cache) and then stream over the lines of the file.