Hey all. I’ve been working on a system lately that needs to ingest tens of thousands to millions of rows from a data lake in order to do some post-processing of that data. We’ve decided to try using the Stream API for handling the ingestion of data from the data lake.

The real code I’m working with is a bit complex, but I’ve reproduced a simplified version of what it really looks like:
defmodule IngestionWorker do
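  # note: `Tracer` below is our tracing helper (e.g. an OpenTelemetry tracer)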
@doc """
1. Query raw data from an external system
2. Save it to the database
3. Persist the state of our work process (so it knows where to start again)
"""
def perform_ingest(cursor) do
{total, last_row} =
query_new_data_to_ingest(cursor)
|> Stream.map(&process_row/1)
|> count_elements(returning_last: true)
# persist the total number of rows and last row seen to implement a simple cursor
persist_ingestion_state(total, last_row.id)
end
# pretend that this is actually ingesting data from an external system
# here we ignore the cursor, but imagine this cursor is used in a WHERE clause
# to only select rows that we haven't seen yet
defp query_new_data_to_ingest(_cursor) do
1..1_000_000
|> Stream.map(fn id -> %{id: id, interesting_data_we_need: "row_#{id}"} end)
end
defp process_row(record) do
# the line below causes a memory leak
RawRecord.changeset(record) |> insert!()
end
defp insert!(changeset) do
changeset
# MyCoolApp.Repo.insert!(changeset)
end
defp count_elements(stream, opts) do
result =
stream
|> Stream.transform(0, fn value, acc -> {[{acc + 1, value}], acc + 1} end)
|> Stream.take(-1)
|> Enum.to_list()
{count, last} =
case result do
[] -> {0, nil}
[{count, last}] -> {count, last}
end
Tracer.set_attribute(:stream_count, count)
if Keyword.get(opts, :returning_last, false),
do: {count, last},
else: count
end
defp persist_ingestion_state(_total, _last_seen_id) do
# implementing a cursor based system is left as an exercise for the reader
nil
end
end
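To make count_elements/2 concrete: on a small input, the Stream.transform/Stream.take(-1) pipeline produces a single {count, last_value} pair (illustrative only, run in iex):

1..5
|> Stream.transform(0, fn value, acc -> {[{acc + 1, value}], acc + 1} end)
|> Stream.take(-1)
|> Enum.to_list()
#=> [{5, 5}]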
defmodule RawRecord do
  @moduledoc """
  Persists the state of raw data received from external systems.
  This data may be incomplete, wrong, or otherwise low in quality.
  We persist this data in its raw form to decouple ingestion from enrichment in clean domain tables.
  Additionally, this serves as an audit log of how data has changed over time (this is important).
  """
  use Ecto.Schema
  import Ecto.Changeset

  schema "raw_records" do
    field :interesting_data_we_need, :map
    timestamps()
  end

  def changeset(record \\ %__MODULE__{}, attrs) do
    record
    |> cast(Map.new(attrs), ~w[interesting_data_we_need]a)
    |> validate_required(~w[interesting_data_we_need]a)
  end
end
In our private function process_row/1, we want to persist each row using an Ecto schema. When we run this code, it quickly results in a growing heap. However, if we change the implementation as below, we don’t get a memory leak:
defp process_row(record) do
  RawRecord.changeset(record) |> insert!()
  # avoid a memory leak by discarding the result
  nil
end
This works when we don’t need to read the last value from the stream, but if we do want the last value, we would need to get it some other way (e.g. issue another Ecto query).
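For context, that fallback would be roughly the query below (a sketch, assuming MyCoolApp.Repo and the raw_records schema above):

import Ecto.Query

# sketch of the fallback: re-read the last ingested row from the database
# instead of taking it from the stream we just consumed
last_seen_id =
  RawRecord
  |> order_by(desc: :id)
  |> limit(1)
  |> select([r], r.id)
  |> MyCoolApp.Repo.one()

It works, but it is an extra round trip for a value the stream has already produced.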
I have two questions:

- Why does consuming the stream result in a growing heap if we are only going to take the last value from the stream? I’d love to understand the stdlib Stream implementation better.
- Does anyone have any clever ideas for how we can consume the stream without causing a memory leak while still returning the last value?
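For concreteness, one shape of answer we have been toying with (a sketch only, not validated against the real workload) is rewriting the body of perform_ingest/1 to fold over the stream, so only the running count and the most recent row live in the accumulator:

{total, last_row} =
  query_new_data_to_ingest(cursor)
  |> Stream.map(&process_row/1)
  |> Enum.reduce({0, nil}, fn row, {count, _last} -> {count + 1, row} end)

We don’t know yet whether this actually sidesteps the problem or merely hides it, hence the questions above.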