Memory leak

Please help me find a memory leak.
I have a Phoenix app with almost no GenServers or similar stuff (in one place I use event_bus + exq for async parsing and upload; this feature is used by only one user). On the production server I noticed that memory consumption grows slowly (from 100 MB on app start to almost 5 GB). At random moments memory drops to 200–300 MB and then starts growing again.
I don’t know how to profile memory consumption or where I should start looking.

Is it really the memory consumption of that process that grows? Or is it the binary heap that grows or something else?

:observer should be able to help you figure those things out.

If it’s really the process that grows this huge, then its state is probably keeping stuff around. That data might still be important or not, but only you can know, as you haven’t shown us any code.

If, however, the binary heap is growing, it’s probably because you are parsing some string input and keeping sub-binary references around, which keeps the whole binary alive until the sub-reference gets GC’d. Please make use of :binary.copy/1 or provide a corresponding option to the parsing tool you use.

2 Likes

I just don’t know exactly which part of the code you need.

For file parsing I use the xlsxir library. This is the code of the module where I use it:

alias MyApp.Helpers.HelperFunctions

  # Extracts the rows of every worksheet in `file` without an upload source;
  # simply delegates to extract_data!/2 with `nil` as the source.
  def extract_data!(file), do: extract_data!(file, nil)

  # Reads every worksheet of the uploaded spreadsheet and returns its rows as a
  # de-duplicated list of maps keyed by the header row (the first row of each
  # sheet).
  #
  # `file` must expose `path`. `upload_source` selects optional post-processing:
  # for `:crystal` every row additionally gets the worksheet name under the
  # "Город/район" key; any other value leaves the rows as they are.
  #
  # Cell values are normalized afterwards: blank strings become nil, and line
  # breaks and the "_x000D_" marker are stripped from the remaining strings.
  #
  # Raises ArgumentError (via get_data_from_file_with_check/1) when the workbook
  # cannot be extracted.
  def extract_data!(file, upload_source) do
    sheets = get_data_from_file_with_check(file)

    rows =
      try do
        Enum.flat_map(sheets, fn {:ok, table_id} ->
          read_sheet_rows(table_id, upload_source)
        end)
      after
        # Close every extracted table exactly once, even when a sheet fails to
        # parse. Previously a failure skipped Xlsxir.close/1 for the failing
        # sheet and all sheets after it.
        Enum.each(sheets, &close_sheet/1)
      end

    rows
    |> Enum.reject(&is_nil/1)
    |> Enum.uniq()
    |> Enum.map(&normalize_row/1)
    # Normalization can make previously distinct rows equal, so de-duplicate again.
    |> Enum.uniq()
  end

  # Turns one extracted worksheet into a list of row maps (nil for empty rows).
  defp read_sheet_rows(table_id, upload_source) do
    {column_names, data_rows} = table_id |> Xlsxir.get_list() |> List.pop_at(0)
    rows = Enum.map(data_rows, &reject_row_if_empty(&1, column_names))

    case upload_source do
      :crystal ->
        sheet_name = Xlsxir.get_multi_info(table_id, :name)
        update_data_by_table_name(rows, "Город/район", sheet_name)

      _ ->
        rows
    end
  end

  # Closes the table behind a successful extraction result; ignores anything else.
  defp close_sheet({:ok, table_id}), do: Xlsxir.close(table_id)
  defp close_sheet(_other), do: :ok

  # Applies normalize_cell/1 to every value of a row map.
  defp normalize_row(row) do
    Map.new(row, fn {key, value} -> {key, normalize_cell(value)} end)
  end

  # Blank strings become nil; other strings lose line breaks and "_x000D_";
  # non-string values (including nil) pass through untouched.
  defp normalize_cell(value) when is_binary(value) do
    if String.trim(value) == "" do
      nil
    else
      value |> String.replace(~r/([\r\n])/u, "") |> String.replace("_x000D_", "")
    end
  end

  defp normalize_cell(value), do: value

  # Extracts all worksheets of the file at `file.path` with Xlsxir.multi_extract/1.
  # Returns the extraction results unchanged, or raises ArgumentError carrying
  # the message when the extractor answers `{:error, message}`.
  defp get_data_from_file_with_check(file) do
    case Xlsxir.multi_extract(file.path) do
      {:error, message} ->
        raise ArgumentError, message: message

      extracted ->
        extracted
    end
  end

  # Stamps every row map with `list_key => list_name` and removes the `nil` and
  # "" keys from it. nil entries (rows that were rejected as empty) are kept
  # as nil so the list keeps its length.
  defp update_data_by_table_name(xlsx_data, list_key, list_name) do
    Enum.map(xlsx_data, fn
      nil ->
        nil

      row ->
        row
        |> Map.put(list_key, list_name)
        |> Map.drop([nil, ""])
    end)
  end

  # Converts a worksheet row into a map keyed by `column_names`, or returns nil
  # for an empty row. A row counts as empty when it contains no truthy cell
  # (i.e. only nil/false values, or no cells at all).
  defp reject_row_if_empty(row, column_names) do
    if Enum.any?(row) do
      # presumably pads/aligns the row to the header length — confirm in HelperFunctions
      aligned_row = HelperFunctions.make_lists_of_equal_size_by_default_value(column_names, row)
      Map.new(Enum.zip(column_names, aligned_row))
    else
      nil
    end
  end

Only the extract_data! function of this module is called from outside.
After I get the data from the file, it goes through a long algorithm which compares all rows from the xlsx file with all records in a DB table by several conditions and marks some rows as “duplicate” according to those conditions. After that, all rows are inserted into the DB table and an event is published through the event bus:

.....
notify_event(updated_upload, :lead_upload_photos, nil)
....

# Wraps `event_data` in an EventBus event for `topic`, using a freshly generated
# UUID (UUID.uuid4/0) as the event id, and publishes it via EventBus.notify/1.
def notify_event(event_data, topic, transaction_id) do
  EventBus.notify(%EventBus.Model.Event{
    id: UUID.uuid4(),
    topic: topic,
    data: event_data,
    transaction_id: transaction_id
  })
end

Here updated_upload is one full record from the DB table. The event_bus consumer then enqueues an exq job:

# Consumer callback for the :lead_upload_photos topic: fetches the event behind
# the shadow, enqueues a PhotosWorker job on the "lead_upload_photos" Exq queue
# with the cast event data as its only argument, then marks the event as
# completed for this module.
# Note: the result of Exq.enqueue/4 is not inspected, so the event is marked as
# completed whatever enqueue returned.
def process({:lead_upload_photos, id} = event_shadow) do
  payload =
    event_shadow
    |> EventBus.fetch_event()
    |> cast_event_data()

  Exq.enqueue(Exq, "lead_upload_photos", MyApp.LeadUpload.PhotosWorker, [payload])

  # tell the event watcher that this consumer is done with the event
  EventBus.mark_as_completed({__MODULE__, :lead_upload_photos, id})
end

The exq job downloads photos from URLs and saves them to the DB:


  # Walks over `updated_upload.upload_leads` and, for every lead whose estate has
  # no photos yet (nil or an empty list), downloads the photos from the lead's
  # links and stores them on the estate. Estates that already have photos are
  # left alone. Returns :ok.
  def get_photos_for_all_estates(updated_upload) do
    Enum.each(updated_upload.upload_leads, fn upload_lead ->
      estate = upload_lead.estate

      if estate.photos in [nil, []] do
        downloaded = get_photos_from_urls(upload_lead.photos_links)

        Estates.update_estate_with_section(estate, %{"photos" => downloaded}, %{}, true)
      end
    end)
  end

Also, I have a restriction: I allow only one exq job to run at a time. Other jobs wait in the queue.

I think the weak place here is the event creation. I should pass only the id of the DB record instead of the full record.

Also, I think I have one more weak place in the app.

# Picks the reserved percent that corresponds to `deals_count`: walking the list
# in order, each available entry (truthy "available" key) consumes one deal, and
# the first available entry reached with no deals left is returned.
#
# Returns `{item, index}` where `index` is the item's position in
# `reserved_percents`. Raises RuntimeError when the list runs out first.
def get_reserved_persent_by_deals_count(reserved_percents, deals_count) do
  get_percents(reserved_percents, deals_count)
end

# Starts the search at position `index` (0 by default). The list is traversed
# once by head/tail instead of calling Enum.at/2 and length/1 on every step,
# which made the original search quadratic in the list length.
defp get_percents(index \\ 0, reserved_percents, deals_count) do
  reserved_percents
  |> Enum.drop(index)
  |> walk_percents(index, deals_count)
end

# Ran past the end of the list without finding a matching available entry.
defp walk_percents([], _index, _deals_count) do
  raise "There is not reserved_percents to get!"
end

defp walk_percents([item | rest], index, deals_count) do
  cond do
    deals_count == 0 && item["available"] ->
      {item, index}

    # an available entry uses up one deal
    deals_count > 0 && item["available"] ->
      walk_percents(rest, index + 1, deals_count - 1)

    # unavailable entry: skip it without consuming a deal
    true ->
      walk_percents(rest, index + 1, deals_count)
  end
end

reserved_percents is a list of maps.
Maybe this recursion does not work well and I should rewrite it with Enum.reduce_while.

How did you sort this out?

I rewrote it using ‘reduce_while’.

And it was only a small part of the memory leak problem.


  # Finds the reserved percent that corresponds to `deals_count`. Entries are
  # visited in order; every available entry (truthy "available" key) uses up one
  # deal, and the first available entry reached with zero deals left wins.
  #
  # Returns `{:ok, {percent, index}}` on success, or `{:error, remaining_deals}`
  # when the list is exhausted first. The employee id is accepted but unused.
  def get_reserved_persent_by_deals_count(reserved_percents, deals_count, _employee_id) do
    reserved_percents
    |> Enum.with_index()
    |> Enum.reduce_while({:error, deals_count}, fn {reserved_percent, index},
                                                   {:error, remaining} ->
      cond do
        remaining == 0 && reserved_percent["available"] ->
          {:halt, {:ok, {reserved_percent, index}}}

        remaining > 0 && reserved_percent["available"] ->
          {:cont, {:error, remaining - 1}}

        true ->
          {:cont, {:error, remaining}}
      end
    end)
  end
2 Likes