Consume a Stream with side effects without leaking memory?

Hey all. I’ve been working on a system lately that needs to ingest tens of thousands to millions of rows from a data lake in order to do some post-processing of that data. We’ve decided to try using the Stream API to handle the ingestion from the data lake.

The real code I’m working with is a bit complex, but I’ve reproduced a simplified version of what it really looks like:

defmodule IngestionWorker do

  @doc """
  1. Query raw data from an external system
  2. Save it to the database
  3. Persist the state of our work process (so it knows where to start again)
  """
  def perform_ingest(cursor) do
    {total, last_row} =
      query_new_data_to_ingest(cursor)
      |> Stream.map(&process_row/1)
      |> count_elements(returning_last: true)

    # persist the total number of rows and last row seen to implement a simple cursor
    persist_ingestion_state(total, last_row.id)
  end

  # pretend that this is actually ingesting data from an external system
  # here we ignore the cursor, but imagine this cursor is used in a WHERE clause
  # to only select rows that we haven't seen yet
  defp query_new_data_to_ingest(_cursor) do
    1..1_000_000
    |> Stream.map(fn id -> %{id: id, interesting_data_we_need: %{"row" => id}} end)
  end

  defp process_row(record) do
    # the line below causes a memory leak
    RawRecord.changeset(record) |> insert!()
  end

  defp insert!(changeset) do
    # in the real code this is MyCoolApp.Repo.insert!(changeset)
    changeset
  end

  # Lazily counts the stream: each element is paired with a running count, and
  # Stream.take(-1) keeps only the final {count, element} pair.
  defp count_elements(stream, opts) do
    result =
      stream
      |> Stream.transform(0, fn value, acc -> {[{acc + 1, value}], acc + 1} end)
      |> Stream.take(-1)
      |> Enum.to_list()

    {count, last} =
      case result do
        [] -> {0, nil}
        [{count, last}] -> {count, last}
      end

    Tracer.set_attribute(:stream_count, count)

    if Keyword.get(opts, :returning_last, false),
      do: {count, last},
      else: count
  end

  defp persist_ingestion_state(_total, _last_seen_id) do
    # implementing a cursor based system is left as an exercise for the reader
    nil
  end
end

defmodule RawRecord do
  @moduledoc """
  Persists the state of raw data received from external systems.

  This data may be incomplete, wrong, or be otherwise low in quality.

  We persist this data in its raw form to decouple ingestion from enrichment in clean domain tables.

  Additionally, this serves as an audit log of how data has changed over time (this is important).
  """

  use Ecto.Schema
  import Ecto.Changeset

  schema "raw_records" do
    field :interesting_data_we_need, :map

    timestamps()
  end

  def changeset(record \\ %__MODULE__{}, attrs) do
    record
    |> cast(Map.new(attrs), ~w[interesting_data_we_need]a)
    |> validate_required(~w[interesting_data_we_need]a)
  end
end
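
To reproduce, we run the worker from an IEx session with a nil cursor and watch the VM's memory grow (the observer call below is just one way to watch it; any memory view works):

  # start the Erlang observer GUI to watch heap/process memory (illustrative)
  :observer.start()

  # kick off the simplified repro; the cursor is ignored in this example
  IngestionWorker.perform_ingest(nil)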

In our private function process_row/1 we want to persist each row using an Ecto schema. When we run this code, the heap quickly grows. However, if we change the implementation as below, we don't get a memory leak:

  defp process_row(record) do
    RawRecord.changeset(record) |> insert!()

    # avoid a memory leak by discarding the result
    nil
  end

This works when we don't need to read the last value from the stream, but if we do want the last value, we would need to get it some other way (e.g. issue another Ecto query).
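
For completeness, that fallback query would look something like this (a rough sketch: fetch_last_ingested_id is a made-up helper, and it assumes MyCoolApp.Repo and that the highest id belongs to the last row we processed):

  import Ecto.Query

  # read the last row's id back out of the database instead of carrying it
  # through the stream (assumes ids increase monotonically with insertion order)
  defp fetch_last_ingested_id do
    RawRecord
    |> order_by(desc: :id)
    |> limit(1)
    |> select([r], r.id)
    |> MyCoolApp.Repo.one()
  end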

I have two questions.

  1. Why does consuming the stream increase the heap size if we are only going to take the last value from it? I’d love to understand the stdlib Stream implementation better :sweat_smile:

  2. Does anyone have any clever ideas for how we can consume the stream without causing a memory leak while still returning the last value?


To be fair, no, sorry.

Seeing as you are inserting the rows into the DB anyway, why can’t you just fetch the record with the most recent inserted_at? :thinking: It would achieve the same result without having to update a stream accumulator millions of times.
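
Something along these lines (untested sketch, reusing MyCoolApp.Repo from your snippet):

  import Ecto.Query

  # grab the most recently inserted raw record; the extra order on id breaks
  # ties when inserted_at only has second precision
  last_row =
    RawRecord
    |> order_by(desc: :inserted_at, desc: :id)
    |> limit(1)
    |> MyCoolApp.Repo.one()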