What is the best way to fetch large amounts of data from a database? Stream.resource, or Repo.stream with Repo.transaction?

I am planning to read a large amount of data from a database to build a report. What is the best way to do this? Is it via Stream.resource, where I could page through with a limit and offset, or Repo.stream inside a Repo.transaction?

Repo.stream has never failed me. Use that.
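
For completeness, a minimal sketch of that approach (the `MyApp.Repo`, `Report`, and `build_row/1` names are made up for illustration):

    # Repo.stream must be consumed inside a transaction; raise the timeout for long exports.
    MyApp.Repo.transaction(
      fn ->
        Report
        |> MyApp.Repo.stream(max_rows: 500)
        |> Stream.map(&build_row/1)
        |> Enum.each(&IO.write/1)
      end,
      timeout: :infinity
    )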

You could also fetch with a key to control pagination, i.e. page on values of the primary key.
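
As a rough sketch (the `Report` schema, `MyApp.Repo`, and the page size here are placeholders):

    import Ecto.Query

    # Keyset pagination: each page fetches rows with ids greater than the largest
    # id of the previous page, instead of paying for an ever-growing OFFSET.
    last_id = 0

    next_page =
      Report
      |> where([r], r.id > ^last_id)
      |> order_by(asc: :id)
      |> limit(500)
      |> MyApp.Repo.all()

    # Feed the id of `List.last(next_page)` back in as `last_id` for the next page.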

Thank you

Repo.stream is great, but sometimes you need to process hundreds of thousands to millions of records over many minutes, and the problems of doing that in a single transaction are too great. For that we have this function in our Repo module:

  @doc """
  An alternative to Repo.stream that does not require a transaction.

  On the upside, this allows you to iterate through large collections whose processing takes longer
  than is convenient / safe for a single transaction. The target use case is background jobs that
  periodically comb through large tables and do various updates, where interdependent transactions
  aren't relevant.

  The caveat of course is that you aren't operating on a single snapshot of the collection. The way this works
  is that the first N records are fetched, sorted by `asc: :id`. Then the next N records are fetched where
  the id is greater than the biggest id fetched previously.
  """
  def dirty_stream(queryable, opts \\ []) do
    import Ecto.Query

    # TODO: Raise if `queryable` has an `order_by` as we need to override that
    # TODO: Raise if `queryable` has a limit since we also need to override that.

    i = opts[:start_id] || -1
    batch_size = opts[:batch_size] || 100
    mapper = opts[:mapper] || fn record -> record.id end

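    # Each unfold step fetches the next batch of records with ids greater than the
    # previous batch's largest id; `mapper` extracts that cursor value from the last
    # record, an empty batch terminates the stream, and the Stream.flat_map below
    # yields the records one at a time.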
    Stream.unfold(i, fn i ->
      queryable
      |> where([q], q.id > ^i)
      |> limit(^batch_size)
      |> order_by(asc: :id)
      |> __MODULE__.all()
      |> case do
        [] ->
          nil

        records ->
          new_max_id = records |> List.last() |> mapper.()
          {records, new_max_id}
      end
    end)
    |> Stream.flat_map(& &1)
  end

This is particularly handy when you need to bulk process records with preloads, since you get economies of scale from the batched queries but can still deal with one record at a time.
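
For example, usage might look roughly like this (the `Report` schema, its `:author` association, and `process_report/1` are placeholders; `MyApp.Repo` stands in for the Repo that defines `dirty_stream/2`):

    import Ecto.Query

    # Preloads attached to the queryable are resolved once per batch query, so you
    # keep the economy of scale while still handling one record at a time.
    Report
    |> preload(:author)
    |> MyApp.Repo.dirty_stream(batch_size: 500)
    |> Stream.each(&process_report/1)
    |> Stream.run()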

Thank you for this one, Ben.