Beam.smp consumes all cpu and crashes

Hi,

I’m having problems with one of my servers. I have a simple app that reads files from FTP and stores them in a database.
After a maintenance restart, the application crashes right after opening the FTP connection. CPU usage rises until the process is killed by the OOM killer.

> Slogan: eheap_alloc: Cannot allocate 255489152 bytes of memory (of type "heap").
> System version: Erlang/OTP 23 [erts-11.0] [source] [64-bit] [smp:2:2] [ds:2:2:10] [async-threads:1] [hipe]
> Compiled: Tue Aug 4 13:49:01 2020
> Taints: crypto
> Atoms: 29346
> Calling Thread: scheduler:0
> =scheduler:1
> Scheduler Sleep Info Flags: SLEEPING | TSE_SLEEPING | WAITING
> Scheduler Sleep Info Aux Work: THR_PRGR_LATER_OP
> Current Port:
> Run Queue Max Length: 0
> Run Queue High Length: 0
> Run Queue Normal Length: 0
> Run Queue Low Length: 0
> Run Queue Port Length: 0
> Run Queue Flags: OUT_OF_WORK | HALFTIME_OUT_OF_WORK
> Current Process:
> =scheduler:2
> Scheduler Sleep Info Flags: SLEEPING | POLL_SLEEPING | WAITING
> Scheduler Sleep Info Aux Work:
> Current Port:
> Run Queue Max Length: 0
> Run Queue High Length: 0
> Run Queue Normal Length: 0
> Run Queue Low Length: 0
> Run Queue Port Length: 0
> Run Queue Flags: OUT_OF_WORK | INACTIVE
> Current Process:
> =dirty_cpu_scheduler:3
> Scheduler Sleep Info Flags: SLEEPING | TSE_SLEEPING | WAITING
> Scheduler Sleep Info Aux Work:
> Current Process:
> =dirty_cpu_scheduler:4
> Scheduler Sleep Info Flags:
> Scheduler Sleep Info Aux Work:
> Current Process: <0.1121.0>
> Current Process State: Garbing
> Current Process Internal State: ACT_PRIO_NORMAL | USR_PRIO_NORMAL | PRQ_PRIO_NORMAL | ACTIVE | GC | DIRTY_ACTIVE_SYS | DIRTY_RUNNING_SYS
> Current Process Program counter: 0x00007f5941057438 ('Elixir.Ecto.Adapters.Postgres.TypeModule':'Elixir.Ecto.Adapters.Postgres.Timestamp'/6 + 240)
> Current Process Limited Stack Trace:
> 0x00007f594144f290:SReturn addr 0x436E35F8 ('Elixir.Postgrex.Protocol':rows_recv/4 + 136)
> 0x00007f594144f2a8:SReturn addr 0x436D8740 ('Elixir.Postgrex.Protocol':execute_recv/5 + 232)
> 0x00007f594144f2d8:SReturn addr 0x43688A80 ('Elixir.DBConnection':handle/4 + 400)
> 0x00007f594144f320:SReturn addr 0x43687928 ('Elixir.DBConnection':describe_run/5 + 360)
> 0x00007f594144f360:SReturn addr 0x4368DA80 ('Elixir.DBConnection':'-run_meter/5-fun-0-'/4 + 136)
> 0x00007f594144f388:SReturn addr 0x4368B888 ('Elixir.DBConnection':run_begin/3 + 112)
> 0x00007f594144f3b8:SReturn addr 0x4368A628 ('Elixir.DBConnection':prepare_execute/4 + 144)
> 0x00007f594144f3d8:SReturn addr 0x4367E5B0 ('Elixir.Ecto.Adapters.Postgres.Connection':prepare_execute/5 + 192)
> 0x00007f594144f3e0:SReturn addr 0x80320E18 ('Elixir.Ecto.Adapters.SQL':sql_call/6 + 840)
> 0x00007f594144f420:SReturn addr 0x8031ED38 ('Elixir.Ecto.Adapters.SQL':execute_and_cache/7 + 408)
> 0x00007f594144f460:SReturn addr 0x410E3FC8 ('Elixir.Ecto.Repo.Queryable':execute/5 + 864)
> 0x00007f594144f4c8:SReturn addr 0x410E37E0 ('Elixir.Ecto.Repo.Queryable':all/4 + 240)
> 0x00007f594144f4d0:SReturn addr 0x803009E0 ('Elixir.Neptun.Fetchers.FTPFetcher':filter_listing_for_downloaded_data/1 + 336)
> 0x00007f594144f4e8:SReturn addr 0x803002F0 ('Elixir.Neptun.Fetchers.FTPFetcher':do_work/0 + 248)
> 0x00007f594144f508:SReturn addr 0x803012E0 ('Elixir.Neptun.Fetchers.FTPFetcher':handle_info/2 + 504)
> 0x00007f594144f520:SReturn addr 0x84612198 (gen_server:try_dispatch/4 + 152)
> 0x00007f594144f568:SReturn addr 0x84613038 (gen_server:handle_msg/6 + 1696)
> 0x00007f594144f5a0:SReturn addr 0x8461B840 (proc_lib:init_p_do_apply/3 + 72)
> 0x00007f594144f5c0:SReturn addr 0x6A011F08 (<terminate process normally>)
> =dirty_cpu_run_queue
> Run Queue Max Length: 0
> Run Queue High Length: 0
> Run Queue Normal Length: 0
> Run Queue Low Length: 0
> Run Queue Port Length: 0
> Run Queue Flags: OUT_OF_WORK | HALFTIME_OUT_OF_WORK | NONEMPTY | EXEC
> =dirty_io_scheduler:5
> Scheduler Sleep Info Flags: SLEEPING | TSE_SLEEPING | WAITING
> Scheduler Sleep Info Aux Work:
> Current Process:
> =dirty_io_scheduler:6

Elixir version: 1.7.4-otp-21, Erlang/OTP: 21.3, PostgreSQL: 9.6

I tried compiling with newer versions, with no result.

I appreciate your feedback. Thank you.

It’s hard to guess without a way to reproduce the problem. However, I think that “something” may be happening at this part:

With that in mind, I would check that:

  1. Neptun.Fetchers.FTPFetcher (I guess it’s a GenServer) is not a bottleneck. I recommend the blog post The Dangers of GenServers in Elixir.

  2. Repo.all may require a huge amount of resources, especially if you are storing big files. You could try Repo.stream/2, for example, as it allows you to process one row at a time.
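For illustration, a minimal sketch of the Repo.stream/2 approach (Neptun.Repo is named in the thread; the Neptun.Stations.Data schema module and its :filename field are assumptions). Note that Repo.stream/2 must be called inside a transaction:

```elixir
import Ecto.Query

# Stream filenames row by row instead of loading every record with Repo.all.
# Repo.stream/2 only works inside a transaction.
Neptun.Repo.transaction(fn ->
  Neptun.Stations.Data              # assumed schema module
  |> select([d], d.filename)
  |> Neptun.Repo.stream(max_rows: 500)
  |> Enum.each(&IO.puts/1)
end)
```

Selecting only the :filename column also avoids pulling the (potentially large) file contents into memory just to build a listing.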

If that does not work, it would be easier for everyone if we had a way to reproduce your problem. The smallest possible app (without your project’s sensitive data) would be a huge help.

Thank you for your reply. Files are 1 kB–1 MB in size, but mostly 1 kB. I tried with just one file on the FTP server, but it failed just the same.

defmodule Neptun.Fetchers.FTPFetcher do
  use GenServer
  alias Neptun.Repo



  def start_link(name \\ FTPFetcher) do
    # Instead of passing an atom to the `name` option, we send
    # a tuple. Here we extract this tuple to a private method
    # called `via_tuple` that can be reused for every function
    GenServer.start_link(__MODULE__, %{name: name}, name: via_tuple(name))
  end

  def init(state) do
    allow_work()
    {:ok, state}
  end

  defp via_tuple(device) do
    # And the tuple always follow the same format:
    # {:via, module_name, term}
    {:via, :gproc, {:n, :l, {:device, device}}}
  end

  def handle_info(:work, %{name: name}=state) do
    IO.puts "Getting data for #{name}..."

    {:ok, log} = Neptun.Logs.ftp_fetching_started
    :ok = do_work()
    {:ok, _log} = Neptun.Logs.job_finished(log)
    allow_work()

    {:noreply, state, :hibernate}
  end

  defp filter_listing_for_valid_stations(listing) do
    IO.puts "Filtering listing of length #{length(listing)} for unknown stations"
    station_uids = Neptun.Stations.list_stations |> Enum.map(fn station -> station.uid end)
    IO.puts "Existing stations: #{length(station_uids)}"

    filtered_listing = listing
    |> Enum.filter(fn l ->
      case Neptun.Utils.parse_ftp_filename(l) do
        [station_uid, _datetime] ->
          Enum.member?(station_uids, station_uid)
        nil ->
          IO.puts "Skipping line #{l}"
          false
      end
    end)
    IO.puts "Filtered listing length #{length(filtered_listing)}"
    filtered_listing
  end

  def filter_listing_for_downloaded_data(listing) do
    IO.puts "Filtering listing of length #{length(listing)} for downloaded data"
    data_filenames = Neptun.Stations.list_data |> Enum.map(fn data -> data.filename end)
    IO.puts "Existing data items: #{length(data_filenames)}"

    filtered_listing = MapSet.difference(MapSet.new(listing), MapSet.new(data_filenames)) |> MapSet.to_list

    IO.puts "Filtered listing length #{length(filtered_listing)}"
    filtered_listing
  end

  defp do_work() do
    {:ok, listing} = Neptun.FTP.ls()
    listing
    |> filter_listing_for_valid_stations()
    |> filter_listing_for_downloaded_data()
    |> Enum.each(fn l ->
      try do
        Repo.transaction(fn ->
          case Neptun.Stations.get_data_by_filename(l) do
            nil ->
              store_data(l)
            _ ->
              nil
              #IO.puts "#{l} already exists, skipping"
          end
        end)
      rescue
        e ->
          store_data_error(l, e)
      end
    end)
    Neptun.Processors.DataProcessor.process_data()
    :ok
  end

  defp store_data_error(filename, e) do
    IO.puts "Error while saving #{filename}"
    [station_uid, _datetime] = Neptun.Utils.parse_ftp_filename(filename)
    station = Neptun.Stations.get_station_by_uid(station_uid)

    params = %{
      "station_id" => station.id,
      "content" => "",
      "processed" => :false,
      "filename" => filename,
      "status" => Neptun.Stations.DataStatus.error,
      "message" => Exception.message(e)
    }

    {:ok, _data} = Neptun.Stations.create_data(params)
  end

  defp store_data(l) do
    IO.puts "Data #{l} not found, getting from server"
    {:ok, file} = Neptun.FTP.get_file(l)
    IO.puts "Saving #{l}"
    status = if file == "" do
      IO.puts "File #{l} is empty"
      Neptun.Stations.DataStatus.empty
    else
      Neptun.Stations.DataStatus.pending
    end

    [station_uid, datetime] = Neptun.Utils.parse_ftp_filename(l)

    case Neptun.Stations.get_station_by_uid(station_uid) do
      nil ->
        IO.puts "Station with UID #{station_uid} does not exist"
        nil
      station ->
        IO.puts "Station with UID #{station_uid} found"
        dt =
          Neptun.Utils.parse_csv_datetime(datetime)
          |> Neptun.Utils.set_timezone(station.timezone)

        params = %{
          "station_id" => station.id,
          "content" => to_string(file),
          "processed" => :false,
          "filename" => l,
          "status" => status,
        }

        {:ok, _data} = Neptun.Stations.create_data(params)

        params = %{
          "last_seen" => dt
        }

        IO.puts "PARAMS for station"
        IO.inspect params

        {:ok, _station} = Neptun.Stations.update_station(station, params)
    end
  end

  defp allow_work() do
    timeout = Application.get_env(:neptun, :ftp_fetcher_timeout)
    Process.send_after(self(), :work, timeout)
  end
end

The part of the code that handles the FTP connection:

defmodule Neptun.FTP do
  @host 'xx.xx.xx.xx'
  @username 'xx'
  @password 'xxx'
  @dir '/xx'
  @sep "\r\n"

  require Logger

  def ls() do
    case authenticated_client() do
      {:ok, pid} ->
        {:ok, listing} = :ftp.ls(pid, @dir)
        listing =
          listing
          |> List.to_string
          |> parse_listing

        :ok = close_client(pid)
        {:ok, listing}
      {:error, msg} ->
        {:error, msg}
    end
  end

  def get_file(filename) do
    case authenticated_client() do
      {:ok, pid} ->
        case get_file(pid, filename) do
          {:ok, file} ->
            :ok = close_client(pid)
            {:ok, file}
          {:error, msg} ->
            {:error, msg}
        end
      {:error, msg} ->
        {:error, msg}
    end
  end
  def get_file(pid, filename) do
    result = :ftp.recv_bin(pid, @dir ++ '/' ++ String.to_charlist(filename))
    result
  end

  def get_files(filenames) do
    case authenticated_client() do
      {:ok, pid} ->
        files = filenames
        |> Enum.map(fn filename ->
          get_file(pid, filename)
        end)
        :ok = close_client(pid)
        files
      {:error, msg} ->
        msg
    end
  end

  def authenticated_client do
    with {:ok, pid} <- :ftp.open(@host),
         :ok <- :ftp.user(pid, @username, @password) do
      IO.puts "Opened FTP connection"
      IO.inspect pid
      {:ok, pid}
    else
      _ -> {:error, "Unable to authenticate"}
    end
  end

  def close_client(pid) do
    IO.puts "Closed FTP connection"
    IO.inspect pid
    :ftp.close(pid)
  end

  defp parse_listing(raw_listing) do
    raw_listing
    |> String.split(@sep)
    |> Enum.map(fn line ->
      case String.split(line) do
        [_perm, _num, _user, _group, _size, _month, _day, _time, filename] ->
          filename
        _ ->
          ""
      end
    end)
    |> Enum.filter(fn line ->
      len = line |> String.trim() |> String.length
      len > 0
    end)
  end
end

After opening the FTP connection, the app should read filenames in the format 0000000401_20191110120118, open each file, and write the information from the file to the DB. A file contains data in this format:

  • 0000000401;0001;20191110;120000;11.20
  • 0000000401;0002;20191110;120000;7.82
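For reference, a hypothetical parser for one such data line (the field meanings are my guess from the filename format: station UID, sequence number, date, time, value — this is not code from the thread):

```elixir
defmodule LineParser do
  # Split one "uid;seq;date;time;value" line into a map.
  def parse(line) do
    [uid, seq, date, time, value] =
      line |> String.trim() |> String.split(";")

    %{station_uid: uid, seq: seq, date: date, time: time,
      value: String.to_float(value)}
  end
end

LineParser.parse("0000000401;0001;20191110;120000;11.20").value
# => 11.2
```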

To me, this part looks too complicated. Firstly, you could just build an Ecto.Query based on the return value of Neptun.FTP.ls. Secondly, Ecto has an option for ignoring conflicts; I would recommend reading about the on_conflict option of Ecto.Repo.insert_all/3.
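A sketch of that suggestion (Neptun.Stations.Data is my guess at the schema module behind the data table, and on_conflict: :nothing additionally assumes a unique index on :filename):

```elixir
defmodule ListingFilter do
  import Ecto.Query

  # Let the database do the filtering instead of diffing full listings
  # (and their contents) in memory.
  def new_filenames(filenames) do
    known =
      Neptun.Stations.Data
      |> where([d], d.filename in ^filenames)
      |> select([d], d.filename)
      |> Neptun.Repo.all()

    filenames -- known
  end
end

# When inserting, a unique index on :filename lets the DB skip duplicates:
# Neptun.Repo.insert_all(Neptun.Stations.Data, rows, on_conflict: :nothing)
```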

It looks like there is potential for leaking FTP connections when errors occur in Neptun.FTP, and you’re starting an infinite loop with Neptun.Fetchers.FTPFetcher. Is there a way you can check the number of open connections over time?
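One way to close that leak (an illustration of the point above, not code from the thread) is to wrap the work in try/after so the client is closed even when the fetch returns an error or raises:

```elixir
def get_file(filename) do
  case authenticated_client() do
    {:ok, pid} ->
      # `after` runs on success, on error tuples, and on raised
      # exceptions alike, so the connection is always closed.
      try do
        get_file(pid, filename)
      after
        close_client(pid)
      end

    {:error, msg} ->
      {:error, msg}
  end
end
```

The same pattern applies to ls/0, where a failing :ftp.ls/2 match currently leaves the connection open as well.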

EDIT: An easy way would be to check whether the “Opened FTP connection” and “Closed FTP connection” log entries match up.
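A quick sketch of that check (“neptun.log” is a placeholder path; the two messages are the IO.puts strings from Neptun.FTP):

```elixir
lines = File.read!("neptun.log") |> String.split("\n")

opened = Enum.count(lines, &String.contains?(&1, "Opened FTP connection"))
closed = Enum.count(lines, &String.contains?(&1, "Closed FTP connection"))

# If opened is consistently greater than closed, connections are leaking.
IO.puts("opened=#{opened} closed=#{closed}")
```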
