A Phoenix app I am working on is experiencing a binary-related memory leak during Quantum-triggered processing. This is the gist of it (a rough sketch of the setup follows the list):
- a long-running process is started with the app; it does the work using a pool of workers
- Quantum triggers the processing via GenServer.cast(:start_processing)
- workers fetch data from an external service via a JSON API
- then they parse the data and store the result in the DB
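For context, here is a rough sketch of that setup; MyApp.Processor, MyApp.Scheduler and the cron schedule are placeholders, not the real code:
defmodule MyApp.Processor do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(opts), do: {:ok, opts}

  @impl true
  def handle_cast(:start_processing, state) do
    # hand each pending item off to a poolboy worker that fetches, parses and stores it
    {:noreply, state}
  end
end

# Quantum job in config.exs, firing the cast on a schedule:
config :my_app, MyApp.Scheduler,
  jobs: [
    {"*/30 * * * *", fn -> GenServer.cast(MyApp.Processor, :start_processing) end}
  ]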
The result is pretty much in line with the findings in Memory leak with quantum and processing binary files. I tried the strings: :copy idea, but it did not work. I also tried hibernating the related processes (including the poolboy workers), as suggested in Memory leaking with long running processes - #9 by dom, but binaries are still leaking.
Things tried without result (a sketch of both follows the list):
- pass strings: :copy to Jason via plug(Tesla.Middleware.JSON, engine_opts: [strings: :copy]) (we use Tesla for the HTTP client)
- return :hibernate in the related processes (and workers), although this was added only to the final handle_* callbacks, i.e. only those callbacks where we actually know the work is completed (not all of them)
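Roughly, those two changes look like this (the module, callback and helper names here are illustrative, not the real code):
# Tesla client: ask Jason to copy decoded strings out of the large response binary
defmodule MyApp.ExternalClient do
  use Tesla

  plug Tesla.Middleware.BaseUrl, "https://api.example.com"
  plug Tesla.Middleware.JSON, engine_opts: [strings: :copy]
end

# worker: hibernate in the callback that finishes the work for a batch
def handle_cast({:process, item}, state) do
  item |> fetch() |> parse() |> store()
  {:noreply, state, :hibernate}
end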
Then I did some investigation to see which processes were actually hanging on to binaries after all this, and found them all to be telemetry-related. Here are the findings:
processes_with_large_binaries =
  Process.list()
  |> Enum.map(fn pid -> {pid, Process.info(pid, :binary)} end)
  |> Enum.reject(fn {_pid, info} -> info in [nil, {:binary, []}] end)
  |> Enum.map(fn {pid, {:binary, binaries}} ->
    size = binaries |> Enum.map(fn {_, size, _} -> size end) |> Enum.sum()
    %{pid: pid, binaries_size: size, memory: Process.info(pid, :memory) |> elem(1), process_info: Process.info(pid)}
  end)

binary_largest = processes_with_large_binaries |> Enum.sort_by(& &1.binaries_size, :desc) |> Enum.take(10)
All of them seem to be telemetry-related:
Enum.map(binary_largest, & &1.process_info[:dictionary])
[
[
"$initial_call": {:h2_stream, :init, 1},
"$ancestors": [#PID<0.5412.0>, #PID<0.4567.0>, #PID<0.4550.0>,
:otel_batch_processor, :opentelemetry_sup, #PID<0.4474.0>]
],
[
"$initial_call": {:h2_stream, :init, 1},
"$ancestors": [#PID<0.5412.0>, #PID<0.4567.0>, #PID<0.4550.0>,
:otel_batch_processor, :opentelemetry_sup, #PID<0.4474.0>]
],
[
"$initial_call": {:h2_stream, :init, 1},
"$ancestors": [#PID<0.5412.0>, #PID<0.4567.0>, #PID<0.4550.0>,
:otel_batch_processor, :opentelemetry_sup, #PID<0.4474.0>]
],
[
"$initial_call": {:h2_stream, :init, 1},
"$ancestors": [#PID<0.5412.0>, #PID<0.4567.0>, #PID<0.4550.0>,
:otel_batch_processor, :opentelemetry_sup, #PID<0.4474.0>]
],
[
"$initial_call": {:h2_stream, :init, 1},
"$ancestors": [#PID<0.5412.0>, #PID<0.4567.0>, #PID<0.4550.0>,
:otel_batch_processor, :opentelemetry_sup, #PID<0.4474.0>]
],
[
"$initial_call": {:h2_stream, :init, 1},
"$ancestors": [#PID<0.5412.0>, #PID<0.4567.0>, #PID<0.4550.0>,
:otel_batch_processor, :opentelemetry_sup, #PID<0.4474.0>]
],
[
"$initial_call": {:h2_stream, :init, 1},
"$ancestors": [#PID<0.5412.0>, #PID<0.4567.0>, #PID<0.4550.0>,
:otel_batch_processor, :opentelemetry_sup, #PID<0.4474.0>]
],
[
"$initial_call": {:h2_stream, :init, 1},
"$ancestors": [#PID<0.5412.0>, #PID<0.4567.0>, #PID<0.4550.0>,
:otel_batch_processor, :opentelemetry_sup, #PID<0.4474.0>]
],
[
"$initial_call": {:h2_stream, :init, 1},
"$ancestors": [#PID<0.5412.0>, #PID<0.4567.0>, #PID<0.4550.0>,
:otel_batch_processor, :opentelemetry_sup, #PID<0.4474.0>]
],
[
"$initial_call": {:h2_stream, :init, 1},
"$ancestors": [#PID<0.5412.0>, #PID<0.4567.0>, #PID<0.4550.0>,
:otel_batch_processor, :opentelemetry_sup, #PID<0.4474.0>]
]
]
A sample:
pid_with_details = Enum.random(binary_largest)
%{
binaries_size: 977410683,
memory: 196896,
pid: #PID<0.7970.48>,
process_info: [
current_function: {:gen_statem, :loop_receive, 3},
initial_call: {:proc_lib, :init_p, 5},
status: :waiting,
message_queue_len: 0,
links: [#PID<0.5412.0>],
dictionary: [
"$initial_call": {:h2_stream, :init, 1},
"$ancestors": [#PID<0.5412.0>, #PID<0.4567.0>, #PID<0.4550.0>,
:otel_batch_processor, :opentelemetry_sup, #PID<0.4474.0>]
],
trap_exit: true,
error_handler: :error_handler,
priority: :normal,
group_leader: #PID<0.4473.0>,
total_heap_size: 24503,
heap_size: 6772,
stack_size: 11,
reductions: 93231,
garbage_collection: [
max_heap_size: %{error_logger: true, kill: true, size: 0},
min_bin_vheap_size: 46422,
min_heap_size: 233,
fullsweep_after: 65535,
minor_gcs: 10
],
suspending: []
]
}
Before garbage collection:
Process.info(pid_with_details.pid, :binary)
{:binary,
[
{139802410680360, 6147237, 160},
{139802410680360, 6147237, 160},
{139802410680360, 6147237, 160},
{139802410680360, 6147237, 160},
{139802410680360, 6147237, 160},
{139802410680360, 6147237, ...},
{139802410680360, ...},
{...},
...
]}
After garbage collection:
:erlang.garbage_collect(pid_with_details.pid)
true
Process.info(pid_with_details.pid, :binary)
{:binary, []}
So even though the processes responsible for the actual work don't hold on to those binaries (at least I could not detect it), the telemetry processes do, for some reason.
I am thinking of trying:
- calling :erlang.garbage_collect() in the related processes and workers; but I am not sure it makes sense to garbage-collect them, since the binaries are held by the telemetry processes
- doing a periodic scan for processes with large binaries, like the one above, and garbage collecting them, maybe even triggering that from the actual worker processes once they know they are done (a sketch of such a sweeper follows the list)
- making use of a DynamicSupervisor; but I am not sure how to build such a tree and still use a pool of workers, and it likely would not help given the binaries are held by the telemetry processes
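For the second idea, the periodic sweep could look something like this (just a sketch; MyApp.BinarySweeper, the interval and the 50 MB threshold are made up):
defmodule MyApp.BinarySweeper do
  use GenServer

  @interval :timer.minutes(5)
  # only GC processes referencing more than ~50 MB of refc binaries
  @threshold 50_000_000

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(opts) do
    Process.send_after(self(), :sweep, @interval)
    {:ok, opts}
  end

  @impl true
  def handle_info(:sweep, state) do
    for pid <- Process.list(),
        {:binary, bins} <- [Process.info(pid, :binary)],
        Enum.sum(Enum.map(bins, fn {_, size, _} -> size end)) > @threshold do
      :erlang.garbage_collect(pid)
    end

    Process.send_after(self(), :sweep, @interval)
    {:noreply, state}
  end
end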
But given that strings: :copy was used for Jason and those worker processes don't seem to hold on to the binaries, I don't think any improvement to the worker-related code will help. And I would like to avoid forced garbage collection; it feels like a sledgehammer solution to the problem.
Thoughts/ideas welcome!
Versions of libs:
- telemetry 1.1.0
- telemetry_metrics 0.6.0
- telemetry_poller 0.5.1
- opentelemetry 1.0.5
- opentelemetry_api 1.0.3
- opentelemetry_exporter 1.0.4
- opentelemetry_telemetry 1.0.0
- phoenix 1.5.8
- jason 1.3.0