Ecto multiple streams in 1 transaction

Background

PS: the following situation describes a hypothetical scenario, where I own a company that sells things to customers.

I have an Ecto query that is so big, that my machine cannot handle it. With billions of results returned, there is probably not enough RAM in the world that can handle it.

The solution here (or so my research indicates) is to use streams. Streams were made for potentially infinite sets of results, which would fit my use case.

https://hexdocs.pm/ecto/Ecto.Repo.html#c:stream/2

Problem

So let's imagine that I want to delete all users that bought a given item. Maybe that item was not really legal in their country, and now I, the poor guy in IT, have to fix things so the world doesn’t come crashing down.

Naive way:

item_id = "123asdasd123"

purchase_ids =
      Purchases
      |> where([p], p.item_id == ^item_id)
      |> select([p], p.id)
      |> Repo.all()

Users
    |> where([u], u.purchase_id in ^purchase_ids)
    |> Repo.delete_all()

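This is the naive way. I call it naive because of 2 issues: the list of purchase ids is too big to hold in memory, and the resulting list of ids is too big for a single Repo.delete_all.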

What can I say, our product is highly addictive and very well priced!
Our customers simply can't get enough of it. Don’t know why. Nope. No reason comes to mind. None at all.

With these problems in mind, I cannot help my customers and grow my empire, I mean, little home owned business.

I did find this possible solution:

Stream way:

item_id = "123asdasd123"

purchase_ids =
      Purchases
      |> where([p], p.item_id == ^item_id)
      |> select([p], p.id)

stream = Repo.stream(purchase_ids)

Repo.transaction(fn ->
  ids = Enum.to_list(stream)

  Users
    |> where([u], u.purchase_id in ^ids)
    |> Repo.delete_all()
end)

Questions

However, I am not convinced this will work:

  • I am using Enum.to_list and saving everything into a variable, placing everything into memory again. So I am not gaining any advantage by using Repo.stream.
  • I still have too many ids for my Repo.delete_all to work without blowing up

I guess the one advantage here is that this is now a transaction, so either everything goes or nothing goes.

So, the following questions arise:

  • How do I properly make use of streams in this scenario?
  • Can I delete items by streaming parameters (ids) or do I have to manually batch them?
  • Can I stream ids to Repo.delete_all?

I know it might not be the solution you’re looking for, but instead of two separate queries you could use subqueries to make them a single, but nested query. No need to send the results of the first query back to elixir in the first place.
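For illustration, a minimal sketch of what the subquery version could look like. The two schema modules below are assumptions made up so the snippet is self-contained (the real Users and Purchases schemas are not shown in this thread), and the Repo call is left commented out because it needs a running database.

```elixir
# Assumption: Ecto 3.x is available; Mix.install fetches it for a standalone script.
Mix.install([{:ecto, "~> 3.10"}])

# Hypothetical schemas, only here so the queries can be built.
defmodule Purchases do
  use Ecto.Schema

  schema "purchases" do
    field :item_id, :string
  end
end

defmodule Users do
  use Ecto.Schema

  schema "users" do
    field :purchase_id, :id
  end
end

import Ecto.Query

item_id = "123asdasd123"

# Same inner query as in the question, but never executed on its own.
purchase_ids =
  Purchases
  |> where([p], p.item_id == ^item_id)
  |> select([p], p.id)

# The ids stay inside the database: the inner query becomes a nested SELECT.
delete_query = from u in Users, where: u.purchase_id in subquery(purchase_ids)

# Repo.delete_all(delete_query)
```

With this shape no list of ids is ever sent back to Elixir, so there is nothing to stream or batch on the application side.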


Given that the final objective is to perform a deletion, and that PostgreSQL has a limitation for deletes that basically forces me to perform 2 queries (the first where I get the ids, and the second where I delete the users), I don’t see how I could implement your solution.

Could you elaborate more? I am genuinely curious.

The SQL for this would be:

DELETE FROM users u
USING purchases p
WHERE u.purchase_id = p.id and p.item_id = $1

In Ecto you can do this as simply as:

query = from u in Users,
  join: p in assoc(u, :purchase),
  where: p.item_id == ^item_id

Repo.delete_all(query)

What @benwilson512 suggested is even better than a subquery, but for more information on subqueries see:

https://hexdocs.pm/ecto/3.8.4/Ecto.Query.html#subquery/1


But won't this still blow up if the results don't fit into memory?
Or does Repo.delete_all not load results into memory at all?

What results are you expecting from delete_all?

The results do not go to Ecto at all; it runs a DELETE on the Postgres side. Postgres is tuned to deal with essentially arbitrary amounts of data through a mix of temporary working space in both the system's RAM and on disk. A properly tuned Postgres will not run out of memory no matter how many records you are deleting.


It is my understanding that if possible delete_all will return {non_neg_integer(), nil | [term()]}, where term is the deleted item, if the DB beneath supports it, which in my case does.

Am I missing something there?

@benwilson512 Comparing this solution to another solution using streams, which one would you pick?
For the sake of this post, I will copy a solution from SO:

Repo.transaction(fn ->
  max_rows = 500

  purchase_ids
  |> Repo.stream(max_rows: max_rows)
  |> Stream.chunk_every(max_rows)
  |> Stream.each(fn ids ->
     Users
     |> where([u], u.purchase_id in ^ids)
     |> Repo.delete_all()
  end)
  |> Stream.run()
end, timeout: :infinity)

The original post can be found:

The streaming solution sounds pretty awesome to me, and given both do the same, which one would have more resilience to failure?
(I know that transactions hold a connection, but I am not near the connection limit)

What you’re missing is that it is only returned if you ask it to be returned. By default, it returns nothing at all except a count of the entities deleted.
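To make the difference concrete, here is a small sketch of the two query shapes. The Users schema and the ids list are assumptions for illustration only, and the Repo calls are commented out since they need a database.

```elixir
# Assumption: Ecto 3.x is available; Mix.install fetches it for a standalone script.
Mix.install([{:ecto, "~> 3.10"}])

# Hypothetical schema, only here so the queries can be built.
defmodule Users do
  use Ecto.Schema

  schema "users" do
    field :purchase_id, :id
  end
end

import Ecto.Query

ids = [1, 2, 3]

# No select: delete_all reports only how many rows were deleted.
count_only = from u in Users, where: u.purchase_id in ^ids

# With a select: the selected values of the deleted rows are returned as well.
with_rows = from u in Users, where: u.purchase_id in ^ids, select: u.id

# Repo.delete_all(count_only)  # second tuple element is nil
# Repo.delete_all(with_rows)   # second tuple element is a list of deleted ids
```

So memory on the Elixir side only becomes a concern if you add a select to the delete query yourself.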


In regards to the query:

query = from u in Users,
  join: p in assoc(u, :purchase),
  where: p.item_id == ^item_id

Repo.delete_all(query)

I get the error:

** (Ecto.QueryError) iex:41: could not find association `purchase` on schema Users in query:
...

Is there some alteration I need to do to the schema of Users or Purchase?
The SQL code did work though, so I am having a hard time understanding why the Ecto one doesn’t. I am assuming this is because I have an issue in some schema?

Also, I cannot use:

query = from u in Users,
  join: p in assoc(u, Purchase),
  where: p.item_id == ^item_id

even though Purchase is aliased, as this will give me a different error (basically I understand this only works with atoms).

Yeah I’m making some assumptions about your User schema cause it isn’t posted here. I’m assuming that if you have a purchase_id then you’d have a belongs_to :purchase, Purchase. If that isn’t true then you’ll need to change the join.
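As a sketch of that assumption: the schemas below are guesses (table names and field types are not taken from the actual project), but they show the kind of belongs_to declaration that lets assoc(u, :purchase) resolve.

```elixir
# Assumption: Ecto 3.x is available; Mix.install fetches it for a standalone script.
Mix.install([{:ecto, "~> 3.10"}])

# Hypothetical schemas; the real ones are not posted in this thread.
defmodule Purchase do
  use Ecto.Schema

  schema "purchases" do
    field :item_id, :string
  end
end

defmodule Users do
  use Ecto.Schema

  schema "users" do
    # Declares the :purchase association and the purchase_id foreign key field.
    belongs_to :purchase, Purchase
  end
end

import Ecto.Query

item_id = "123asdasd123"

# With the association declared, the join from the earlier post compiles.
query =
  from u in Users,
    join: p in assoc(u, :purchase),
    where: p.item_id == ^item_id

# Repo.delete_all(query)
```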


The second element of the tuple will only be a list of values if you explicitly ask for returned values.


Because of the circumstances in which I am working, changing the schema is not ideal. Therefore I ended up going with the solution I mentioned from Stack Overflow.

However, I will still pick @benwilson512 's solution as the final one in this post, since it is a great solution that would likely be more readable if not for the limitations of my use case.

I would also like to thank everyone who participated in this thread and corrected some of my incorrect beliefs about how the system works. This is how I grow, and I thank everyone for taking the time to point them out.

Notably, you can just write the join out by hand if you have the column and can’t change the schema:

query = from u in Users,
  join: p in Purchase, on: p.id == u.purchase_id,
  where: p.item_id == ^item_id

(EDIT: Sorry, just now saw you actually know about this. Still, leaving it in).


While I completely support @benwilson512’s recommendation and I feel the same – that PG should be able to handle a humongous DELETE command – I’ll still give you an alternative if you truly feel you want an iterable solution in Elixir. Taking the latter part of your “Stream way” snippet and modifying it:

Repo.transaction(fn -> 
  stream
  |> Stream.chunk_every(1000)
  |> Enum.each(fn batch ->
    Users
    |> where([u], u.purchase_id in ^batch)
    |> Repo.delete_all()
  end)
end, timeout: :infinity) # or something like two hours?

You are right to be worried that Enum.to_list might crash the VM. If you suspect anything more than 5_000 records, usually it’s not at all recommended to try and get them into memory because e.g. big binaries might stick around and render garbage collecting ineffective while still having a BEAM VM OS process taking a huge amount of RAM.

Thus, use Stream as intended and chew through work at your own pace. You could also do an SQL count query beforehand and have a progress % while you’re deleting the records in batches.
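A rough sketch of the progress idea, runnable without a database. Everything here is a stand-in: delete_batch replaces the Repo.delete_all call, the lazy range replaces Repo.stream, and total would come from a count query in practice.

```elixir
# Hypothetical stand-in for the real delete; in the thread this would be
# Repo.delete_all on `where([u], u.purchase_id in ^batch)`.
delete_batch = fn batch -> length(batch) end

# Stand-in for Repo.stream(purchase_ids); any lazy enumerable works here.
ids = Stream.map(1..2_500, & &1)

# In practice this number would come from a count query run beforehand.
total = 2_500

deleted =
  ids
  |> Stream.chunk_every(1_000)
  |> Enum.reduce(0, fn batch, done ->
    # Only one batch of ids is held in memory at a time.
    done = done + delete_batch.(batch)
    IO.puts("#{done}/#{total} (#{Float.round(done / total * 100, 1)}%)")
    done
  end)
```

Using Enum.reduce instead of Enum.each is just a convenience to carry the running count between batches.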
