Yeah, it’s the “processes” memory usage that increases.
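For reference, a minimal sketch of how that figure can be checked from IEx without Observer. The `MemoryProbe` module and its function names are hypothetical helpers, not part of the pipeline; the calls themselves (`:erlang.memory/1`, `Process.list/0`, `Process.info/2`) are standard.

```elixir
# Hypothetical helper module, only for inspecting a running node.
defmodule MemoryProbe do
  # Total bytes currently allocated for Erlang processes
  # (the "processes" figure).
  def processes_bytes, do: :erlang.memory(:processes)

  # The `count` processes using the most memory, largest first.
  def top(count \\ 5) do
    Process.list()
    |> Enum.map(fn pid ->
      {pid, Process.info(pid, [:memory, :current_function, :message_queue_len])}
    end)
    # Process.info/2 returns nil for a process that exited in the meantime.
    |> Enum.reject(fn {_pid, info} -> is_nil(info) end)
    |> Enum.sort_by(fn {_pid, info} -> info[:memory] end, :desc)
    |> Enum.take(count)
  end
end
```

Calling `MemoryProbe.top()` while the pipeline is running should show whether the memory sits in a few large processes or in many small ones.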
The GenStage pipeline is pretty straightforward:
- Producer loads batches of records from DB
- Consumer supervisor consumes these records, and starts an EventProcessor process for each one
- The event processor processes the event (parsing text, etc.), then sends a notification event to the process managing the entire pipeline. Upon receiving an ack message from the managing process, the event processor shuts down.
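The notify/ack handshake in the last step can be sketched with plain processes. This is only an illustration under assumptions: `AckSketch`, `manager_loop/0` and `process/4` are hypothetical stand-ins, and the real managing process presumably does more than acknowledge. The message shapes match the ones used in the module below.

```elixir
defmodule AckSketch do
  # Stand-in for the managing process: acknowledges every processed event.
  def manager_loop do
    receive do
      {:event_processed, processor, %{event: _event, result: _result}} ->
        send(processor, :ack_event_processed)
        manager_loop()
    end
  end

  # Stand-in for an event processor: handle the event, report it,
  # then wait for the ack or give up after `ack_timeout` milliseconds.
  def process(manager, event, handler, ack_timeout \\ 6_000) do
    result = handler.(event)
    send(manager, {:event_processed, self(), %{event: event, result: result}})

    receive do
      :ack_event_processed -> {:ok, result}
    after
      ack_timeout -> {:error, :ack_not_received}
    end
  end
end

manager = spawn(&AckSketch.manager_loop/0)
AckSketch.process(manager, "some text", &String.upcase/1)
# => {:ok, "SOME TEXT"}
```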
It appears the event processor is where I have some issues, process-wise, as Observer is reporting 746MB in “memory” which is just nuts. The event handling module is quite straightforward:
defmodule IngestionEngine.Pipeline.EventProcessor do
  use GenServer
  require Logger

  @ack_timeout 6_000

  def start_link(args, event), do: start_link(Keyword.put(args, :event, event))

  def start_link(args) do
    GenServer.start_link(__MODULE__, args)
  end

  @impl GenServer
  def init(args) do
    state =
      args
      |> Keyword.take([:report_to, :handler, :event])
      |> Enum.into(%{})
      |> Map.put(:ack_await_ref, nil)

    {:ok, state, {:continue, :handle_event}}
  end

  @impl GenServer
  def handle_continue(:handle_event, state), do: handle_event(state)

  defp handle_event(%{handler: handler, event: event} = state) do
    result = handler.(event)

    case Map.get(state, :report_to) do
      nil ->
        stop(state)

      report_to ->
        send(report_to, {:event_processed, self(), %{event: event, result: result}})
        ack_await_ref = Process.send_after(self(), :ack_not_received, @ack_timeout)
        {:noreply, Map.put(state, :ack_await_ref, ack_await_ref)}
    end
  end
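
  @impl GenServer
  def handle_info(:retry, state) do
    # handle_event/1 already returns a GenServer reply tuple, so return it
    # as-is instead of wrapping it in another {:noreply, ...}.
    state
    |> cancel_timer()
    |> handle_event()
  end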

  def handle_info(:ack_event_processed, state) do
    stop(state)
  end

  def handle_info(:ack_not_received = reason, state) do
    stop(state, reason)
  end

  def handle_info(msg, state) do
    Logger.warn("Unhandled info message: #{inspect(msg)}")
    {:noreply, state}
  end

  defp stop(state, reason \\ :normal) do
    {:stop, reason, cancel_timer(state)}
  end

  defp cancel_timer(%{ack_await_ref: nil} = state), do: state

  defp cancel_timer(%{ack_await_ref: ref} = state) when is_reference(ref) do
    Process.cancel_timer(ref)
    Map.put(state, :ack_await_ref, nil)
  end
end
So I believe the issue is likely in the handler function: Observer tells me the problem processes are spending a lot of time in erlang:re_run_trap (which I assume is a regex-related function, as I haven’t found any info on it) as well as Elixir.String:do_replace, so I’m probably doing something dumb in there… I’ll probably start by adding a send_after in the event processor to crash and dump the current state if the process runs for too long, to see if I can reproduce the issue with a given set of data.
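A rough sketch of that watchdog idea, with one caveat: the handler runs inside the processor's own callback, so a message scheduled with send_after to self() would only be handled once the handler has returned. The sketch therefore uses a separate watchdog process instead. `Watchdog` and `watch/3` are hypothetical names, and logging the event as context is an assumption about what would be useful to dump.

```elixir
defmodule Watchdog do
  require Logger

  # Hypothetical helper: monitors `pid`; if it is still alive after
  # `timeout` milliseconds, logs what it is doing together with the
  # given context (for example the event), then kills it.
  def watch(pid, context, timeout) do
    spawn(fn ->
      ref = Process.monitor(pid)

      receive do
        # The watched process finished in time: nothing to report.
        {:DOWN, ^ref, :process, ^pid, _reason} ->
          :ok
      after
        timeout ->
          info = Process.info(pid, [:current_stacktrace, :memory, :message_queue_len])

          Logger.error(
            "Processor still running: #{inspect(info)} " <>
              "context: #{inspect(context, limit: 50)}"
          )

          Process.exit(pid, :kill)
      end
    end)
  end
end
```

It could be started from init/1 with something like `Watchdog.watch(self(), event, 30_000)`, where the 30 second limit is an arbitrary placeholder. Since the process is killed from outside, the GenServer state is not dumped automatically, which is why the event is passed in as context.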