I’m working to update a bunch of records and am seeing some unexpected behavior after attempting to leverage Streamp.resource/3
as found in Programming Ecto 3.0. For some reason, regardless of the number of rows I have in the DB only half of the rows returned in the query will be updated. Once the batch that exceeds 50% of the original query is updated the next batch returns 0 rows and the task ends.
defmodule MyApp.StreamQuery do
import Ecto.Query
alias MyApp.Repo
def run(query, batch_size, opts \\ []) do
Stream.resource(
fn -> 0 end,
fn
:stop ->
{:halt, :stop}
offset ->
rows =
query
|> limit(^batch_size)
|> offset(^offset)
|> Repo.all(opts)
if Enum.count(rows) < batch_size, do: {rows, :stop}, else: {rows, offset + batch_size}
end,
fn _ -> :ok end
)
end
end
So this test will fail with 100 records that haven’t been updated.
test "updates all records when total rows exceeds 2x given StreamQuery limit" do
for _ <- 1..300, do: insert(:favorite, viewed_by_customer: false)
query = where(Deal, [d], not d.viewed_by_customer)
query
|> MyApp.StreamQuery.run(100, timeout: :infinity)
|> Stream.map(&mark_deal_as_viewed(&1))
|> Stream.run()
assert Repo.aggregate(query, :count, :id) == 0
end
end
def mark_deal_as_viewed(%Deal{} = deal) do
deal
|> Deal.viewed_by_customer_changeset(%{viewed_by_customer: true})
|> Repo.update!()
end