Unusual behavior using Stream.resource/3 while attempting to update records in batches

I’m working to update a bunch of records and am seeing some unexpected behavior after attempting to leverage Streamp.resource/3 as found in Programming Ecto 3.0. For some reason, regardless of the number of rows I have in the DB only half of the rows returned in the query will be updated. Once the batch that exceeds 50% of the original query is updated the next batch returns 0 rows and the task ends.

defmodule MyApp.StreamQuery do
  import Ecto.Query
  alias MyApp.Repo
  def run(query, batch_size, opts \\ []) do
    Stream.resource(
      fn -> 0 end,
      fn
        :stop ->
          {:halt, :stop}
        offset ->
          rows =
            query
            |> limit(^batch_size)
            |> offset(^offset)
            |> Repo.all(opts)
          if Enum.count(rows) < batch_size, do: {rows, :stop}, else: {rows, offset + batch_size}
      end,
      fn _ -> :ok end
    )
  end
end

So this test will fail with 100 records that haven’t been updated.

test "updates all records when total rows exceeds 2x given StreamQuery limit" do
      for _ <- 1..300, do: insert(:favorite, viewed_by_customer: false)
      query = where(Deal, [d], not d.viewed_by_customer)
      query
      |> MyApp.StreamQuery.run(100, timeout: :infinity)
      |> Stream.map(&mark_deal_as_viewed(&1))
      |> Stream.run()
      assert Repo.aggregate(query, :count, :id) == 0
    end
  end
  def mark_deal_as_viewed(%Deal{} = deal) do
    deal
    |> Deal.viewed_by_customer_changeset(%{viewed_by_customer: true})
    |> Repo.update!()
  end

Have you inspected what gets emitted at runtime, probably with a much smaller limit than 100 items?

Have you inspected what gets emitted at runtime, probably with a much smaller limit than 100 items?

Yes, I’ve been playing around with it all morning. The limit and offset are always calculated/returned correctly but for some reason the return from the call to Repo.all(limimted_query, opts) is inconsistent and unexpected, returning an empty list once we pass the 50% threshold of the original query. We are using UUIDs as the primary key on this table but I am unaware of any complications that would result.

If we modify the module to only stop if a call to Repo.aggreate(query, :count, :id) is less than the batch size we are stuck in an infinite loop. This means the call to Repo.all(limited_query) and Repo.aggregate(query,:count, :id) are retuning different results when I would expect the aggregate of the original query(without limit/offset) to be reduced after each batch.

Function updates:

    Stream.resource(
      fn -> 0 end,
      fn
        :stop ->
          {:halt, :stop}

        offset ->
          limited_query =
            query
            |> limit(^batch_size)
            |> offset(^offset)

          rows = Repo.all(limited_query, opts)
          IO.inspect(Enum.count(rows), label: :row_count)
          agg = Repo.aggregate(query, :count, :id)
          IO.inspect(agg, label: :agg_count)
          if agg < batch_size, do: {rows, :stop}, else: {rows, offset + batch_size}
      end,
      fn _ -> :ok end
    )
  end

Test updates:

    test "updates all records when total rows exceeds 2x given StreamQuery limit" do
      insert_list(10, :favorite)
      query = where(Deal, [d], not d.viewed_by_customer)

      query
      |> StreamQuery.run(2, timeout: :infinity)
      |> Stream.map(&Customers.mark_deal_as_viewed(&1))
      |> Stream.run()

      assert Repo.aggregate(query, :count, :id) == 0
    end

Inspect results:

row_count: 0
agg_count: 4

So the question then becomes “What would cause the limit and offset in the queries and calls to Repo.all and Repo.aggregate to return different results?”.

Are you using Scrivener, by the way? The library that adds pagination to your Repo?

You aren’t specifying an order. limit + offset will not behave in a very predictable manner if you don’t consistently order the records. Try order_by(asc: :id).

1 Like

Are you using Scrivener, by the way? The library that adds pagination to your Repo?

We do use it in our app but I don’t see the benefit here.

You aren’t specifying an order. limit + offset will not behave in a very predictable manner if you don’t consistently order the records. Try order_by(asc: :id) .

My apologies, it looks like the order_by was omitted by mistake, I have been playing around and making minor modifications to see if different results were produced. Unfortunately, this still results in records not being updated:

  def run(query, batch_size, opts \\ []) do
    Stream.resource(
      fn -> 0 end,
      fn
        :stop ->
          {:halt, :stop}

        offset ->
          limited_query =
            query
            |> limit(^batch_size)
            |> offset(^offset)
            |> order_by(asc: :id)

          rows = Repo.all(limited_query, opts)
          IO.inspect(Enum.count(rows), label: :row_count)
          agg = Repo.aggregate(query, :count, :id)
          IO.inspect(agg, label: :agg_count)
          if agg < batch_size, do: {rows, :stop}, else: {rows, offset + batch_size}
      end,
      fn _ -> :ok end
    )
  end

FWIW this is the orig solution that produces the same results:

  def run(query, batch_size, opts \\ []) do
    batches_stream =
      Stream.unfold(0, fn
        :done ->
          nil

        offset ->
          results =
            Repo.all(from(_ in query, order_by: :id, offset: ^offset, limit: ^batch_size), opts)

          if length(results) < batch_size do
            {results, :done}
          else
            {results, offset + batch_size}
          end
      end)

    Stream.concat(batches_stream)
  end

As perhaps a tangent, If you have enough records where you don’t want to load them all in memory, I’d strongly consider writing this as a dedicated Repo.update_all call, instead of performing N updates.

As perhaps a tangent, If you have enough records where you don’t want to load them all in memory, I’d strongly consider writing this as a dedicated Repo.update_all call, instead of performing N updates.

Definitely will keep in mind for cases where Repo.update_all/3 is an option, thanks! We have a few cases where we need to do some mathematical calculations on around 30k records and then persist the updates. While I still haven’t been able to identify the issue with the queries above we are evaluating a couple different options and I’m curious which route you’d go and why.

  1. Use Repo.stream and Task.async_stream to make the updates:
stream =
      query
      |> Repo.stream()
      |> Task.async_stream(&update_record(&1),
        max_concurrency: System.schedulers_online() * 2,
        ordered: false,
        timeout: :infinity
      )

    Repo.transaction(fn -> Stream.run(stream) end, timeout: :infinity)
  1. Gather all of the records in memory and then process them with Flow:
   query
    |> Repo.all(timeout: :infinity)
    |> Flow.from_enumerable(min_demand: 2, max_demand: 10)
    |> Flow.map(&update_record/1)
    |> Flow.run()
  1. ???
1 Like

The problem here might lie in the fact that you’re changing the value of the field which is also used to filter the records (viewed_by_customer). So e.g. let’s say you fetch the first 100 unviewed records and mark them as viewed. If I get it right, next time you’ll look for the second batch of 100 unviewed records. But the thing is that you really want to look for the first 100 unviewed records. So in this particular case, it seems that offset is redundant.

2 Likes

The problem here might lie in the fact that you’re changing the value of the field which is also used to filter the records ( viewed_by_customer ). So e.g. let’s say you fetch the first 100 unviewed records and mark them as viewed. If I get it right, next time you’ll look for the second batch of 100 unviewed records. But the thing is that you really want to look for the first 100 unviewed records. So in this particular case, it seems that offset is redundant.

I thought I was overlooking something simple. Thanks so much for taking the time, I really appreciate it!

1 Like