How to use Ecto.Repo.stream without preloading data

In my system, I have two tables, A and B where A has a foreign key to B.

I also have a processing step that will link a row in table A with a row in table B, or, in case no row in B matches the criteria, a new row in B is added.

To do that, basically I created a query that will do a left lateral join from A to B with my criteria, and then, if that join is nil, I will insert a new row in B.

Since I have millions of rows, I want to do that process in bulk, to do that I have the following code:

query
|> Repo.stream()
|> Stream.chunk_every(2)
|> Stream.map(&process/1)
|> Stream.each(&save/1)
|> Stream.run()

As you can see, I chunk the results by 2 rows (in prod that would be 1000 or higher, I’m using 2 here just to ilustrate the issue). The problem is that this will fail in the following scenario:

Let’s say I have 2 chunks to process (3 or 4 rows), in my first chunk, one of the rows (A1) can’t find a row in B using some criteria (in other words, the join returned nil). In this case, when I reach the &save/1 step, I will create that new B row (B1) and insert into the B table.

Now, let’s say that the second chunk also have a row with the same criteria as A1, in this case, the join should return B1, but it will actually return nil.

The only reason that I see for that to happen is because the Repo.stream call is getting more rows (more than 2) before the stream requests the next chunk, meaning that it will actually query the second chunk data before it finished processing the first chunk, so B1 doesn’t exists yet.

Repo.stream has a max_rows options, but even if I set it to 2 or 1, the problem persists.

Any idea on how I can solve that?

Repo.stream is using a cursor, and at least on Postgres cursors don’t reflect changes to the underlying data once the cursor is opened:

In particular, the INSENSITIVE / ASENSITIVE option:

Cursor sensitivity determines whether changes to the data underlying the cursor, done in the same transaction, after the cursor has been declared, are visible in the cursor. INSENSITIVE means they are not visible, ASENSITIVE means the behavior is implementation-dependent. A third behavior, SENSITIVE , meaning that such changes are visible in the cursor, is not available in PostgreSQL. In PostgreSQL, all cursors are insensitive; so these key words have no effect and are only accepted for compatibility with the SQL standard.


Beware doing millions of updates in a single transaction; it can cause all sorts of headaches.

2 Likes

Ah, of course, that makes total sense! Thanks a lot :grin: