Is it ok to use terminate as a callback to remove the file on which the gen_server works?

Moving the discussion into its own topic, @sasajuric (continued from https://elixirforum.com/t/using-registry-as-a-counter-or-better-alternatives/14893/22). For instance, in the case below, is it ok to use terminate as a callback to remove the file on which the gen_server works?

defmodule CsvParser.Server do
  use GenServer, restart: :temporary
  alias NimbleCSV.RFC4180, as: CSV

  use MSLRecord_Constants

  def start_link({path, filename, _admin_id} = file_tuple) do
    case File.exists?(path) do
      true ->
        case GenServer.start_link(__MODULE__, file_tuple, name: CsvParser.ref(filename)) do
          {:ok, pid} ->
            send(pid, :start)
            {:ok, pid}
          _ ->
            {:error, "Error Starting CsvParser for #{inspect file_tuple}"}
        end
      _ ->
        {:error, "The file - #{inspect filename} - with path - #{inspect path} - doesn't appear to exist"}
    end
  end

  def init({path, filename, admin_id}) do
    {:ok, {path, filename, {0, 0, 0}, [], nil, admin_id, []}}
  end

  def terminate(reason, state) do
    IO.puts("Terminating Parser with final state: #{inspect state}")
    IO.puts("Reason: #{inspect reason}")
    File.rm_rf(elem(state, 0))
  end

  def handle_info(:start, {path, filename, {0, 0, 0}, [], nil, admin_id, errors} = state) do

    stream = File.stream!(path)

    headers =
      stream
      |> CSV.parse_stream(headers: false)
      |> Enum.take(1)
      |> List.first
      |> Enum.map(&(String.trim(&1)))
    
    send(self(), :process_file)
    {:noreply, {path, filename, {0, 0, 0}, headers, stream, admin_id, errors}}
  end

  def handle_info(:start, state) do
    send(self(), :process_file)
    {:noreply, state}
  end

  def handle_info(:increment, {path, filename, {n, p, f}, headers, stream, admin_id, errors}) do
    {:noreply, {path, filename, {n+1, p+1, f}, headers, stream, admin_id, errors}}
  end

  def handle_info({:error_parsing, new_errors}, {path, filename, {n, p, f}, headers, stream, admin_id, errors}) do
    clean_errors = Enum.reduce(new_errors, %{}, fn({k, {error, _}}, acc) -> Map.put(acc, k, error) end)
    {:noreply, {path, filename, {n+1, p, f+1}, headers, stream, admin_id, [ [clean_errors, n+1] | errors ]}}
  end

  def handle_info(:stop, {path, filename, {n, p, f}, headers, stream, admin_id, errors} = state) do
    AdminApiWeb.AdminChannel.broadcast_to_admin(admin_id, "finished_csv_processing",
      %{
        filename: filename,
        errors: errors,
        total: n,
        processed: p,
        failed: f
      }
    )
    {:stop, :normal, state}
  end

  def handle_info(:process_file, {path, filename, {0, 0, 0}, headers, stream, admin_id, errors}) do
    AdminApiWeb.AdminChannel.broadcast_to_admin(admin_id, "started_csv_processing",
      %{
        filename: filename,
        headers: headers
      }
    )
    stream
    |> CSV.parse_stream
    |> Stream.map(fn list ->
      List.zip([headers, list])
      |> Map.new
      |> Map.take(@columns_home_property)
      |> Enum.reduce(%{}, fn({k, v}, acc)->
        Map.put(acc, @columns_map_home_property[k], MSLRecord.convert_field(@columns_map_home_property[k], v))
      end)
      |> MSLRecord.save
      |> Db_Msl.Repo.insert(on_conflict: :replace_all, conflict_target: [:id])
    end)
    |> Enum.each(fn
      ({:error, %Ecto.Changeset{errors: errors}}) ->
        send(self(), {:error_parsing, errors})
      (_) ->
        send(self(), :increment)
    end)

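    # File.stream!/1 returns a lazy stream that opens and closes the file on its own
    # each time it is enumerated, so there is no open device to close here.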
    send(self(), :stop)
    {:noreply, {path, filename, {0, 0, 0}, headers, stream, admin_id, errors}}
  end
end

Per what I’ve said in another thread, the problem I have here is that it’s hard to reason about the crash modes of this process, and hence it’s hard to be sure that terminate is always invoked. You might be able to reach such a conclusion today, but whenever you change the code you need to keep the terminate caveats in mind.

Just by glancing at this code I see that this process is not trapping exits. Hence, if the server is ordered by its supervisor to terminate (e.g. if System.stop/1 is invoked, or if some sibling of the parent supervisor is restarted), terminate is not invoked.
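To make this concrete, here is a minimal sketch, separate from the parser above. `TerminateDemo` and its `:trap_exit` / `:report_to` options are names made up for illustration. It starts a server under a throwaway supervisor, stops that supervisor (which shuts the child down), and checks whether terminate/2 reported back.

```elixir
defmodule TerminateDemo do
  use GenServer

  # opts (hypothetical, for this demo only):
  #   :trap_exit - whether the server traps exits
  #   :report_to - pid that gets a message if terminate/2 runs
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    if opts[:trap_exit], do: Process.flag(:trap_exit, true)
    {:ok, opts[:report_to]}
  end

  @impl true
  def terminate(reason, report_to) do
    send(report_to, {:terminate_called, self(), reason})
  end

  # Runs the server under a temporary supervisor, stops the supervisor,
  # and returns true only if terminate/2 sent its report.
  def terminate_called?(trap_exit?) do
    child = %{
      id: :demo,
      start: {__MODULE__, :start_link, [[trap_exit: trap_exit?, report_to: self()]]}
    }

    {:ok, sup} = Supervisor.start_link([child], strategy: :one_for_one)
    [{:demo, pid, _type, _modules}] = Supervisor.which_children(sup)

    # Stopping the supervisor sends the child a :shutdown exit signal.
    :ok = Supervisor.stop(sup)

    receive do
      {:terminate_called, ^pid, _reason} -> true
    after
      100 -> false
    end
  end
end
```

Under these assumptions, `TerminateDemo.terminate_called?(false)` should come back as `false` (the server dies without running terminate), while `TerminateDemo.terminate_called?(true)` should come back as `true`.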

If terminate is not invoked, there’s potential to leak files. Perhaps that’s not a big problem, but I personally don’t like it. In some unlikely, but still possible scenario of frequent crashes and restarts, you might end up exhausting the entire disk space and cause a total denial of service.

Therefore, I would personally use a separate process which is responsible for cleaning up files when the owner process terminates.

Keep in mind that this is by itself not enough, as the BEAM OS process might get brutally killed. So you also need some startup logic which removes all the existing files. The supervision subtree could look like this:

        parser subtree (rest_for_one)
          /        \
    cleaner   parser supervisor (dynamic)
                          / \
                  parser1 ... parserN

Here, the cleaner process deletes all the existing files during its init. That same process would also monitor the live parsers and clean up their files after they terminate (regardless of how they terminate).
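A rough sketch of that subtree could look like the code below. The module names (`CsvParser.Cleaner`, `CsvParser.Subtree`, `CsvParser.ParserSupervisor`), the `register/1` function, and the idea that all uploaded copies live in one dedicated directory `dir` are assumptions made for illustration, not part of the original code.

```elixir
defmodule CsvParser.Cleaner do
  use GenServer

  # `dir` is assumed to be a directory used only for the uploaded CSV copies.
  def start_link(dir), do: GenServer.start_link(__MODULE__, dir, name: __MODULE__)

  # Called by a parser process to hand over the cleanup of `path`.
  def register(path), do: GenServer.call(__MODULE__, {:register, self(), path})

  @impl true
  def init(dir) do
    # With rest_for_one, no parser is alive while the cleaner (re)starts,
    # so everything found in the directory is a leftover.
    File.rm_rf!(dir)
    File.mkdir_p!(dir)
    {:ok, %{}}
  end

  @impl true
  def handle_call({:register, pid, path}, _from, files) do
    # Remember which file belongs to which monitor reference.
    ref = Process.monitor(pid)
    {:reply, :ok, Map.put(files, ref, path)}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, files) do
    # The owner is gone, for whatever reason: remove its file.
    {path, files} = Map.pop(files, ref)
    if path, do: File.rm(path)
    {:noreply, files}
  end

  def handle_info(_other, files), do: {:noreply, files}
end

defmodule CsvParser.Subtree do
  use Supervisor

  def start_link(dir), do: Supervisor.start_link(__MODULE__, dir, name: __MODULE__)

  @impl true
  def init(dir) do
    children = [
      {CsvParser.Cleaner, dir},
      {DynamicSupervisor, name: CsvParser.ParserSupervisor, strategy: :one_for_one}
    ]

    # If the cleaner crashes, the parser supervisor (and all parsers) restart too.
    Supervisor.init(children, strategy: :rest_for_one)
  end
end
```

In this sketch a parser would call `CsvParser.Cleaner.register(path)` from its own init, so the file is tracked before any work starts. The cleaner keeps the path in its own state, which is why it does not need access to the parser's state when the `:DOWN` message arrives.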

This is somewhat more complex to write, but ease of writing is a one-off convenience, so I don’t mind sacrificing it to reduce the chance of something going terribly wrong.

This approach reduces the chances of leaving dangling files. The only case I can come up with is if the BEAM process is brutally terminated. In that case you’ll have a couple of leftover files, but as soon as you boot the system, they will be removed.

I believe that, compared to terminate, this approach offers much stronger guarantees, and that it is more resilient with respect to possible future changes in the server code. Hence, this is how I would implement it.


Thank you again for taking the time to further explain. I agree with you and indeed there will be a regular cleanup at app start for any file, since they aren’t supposed to exist at start.

Neither do I, which is why I’m trying to understand this better.

So if I’m using a process to monitor the parser GenServers, I would need some way of keeping track of the association between their names/pids and the files they’re working on, since I will only get the {:exit/:down…} message, without any way of retrieving the state from them. Perhaps trapping exits on the parser itself is a better option in this case?

(in this particular parser you can upload multiple files simultaneously, they’re copied from temp with randomised filenames once they’re uploaded, so the http response can be sent)

Because of the bunch of caveats mentioned here and in the previous thread, I personally wouldn’t choose terminate in this case. Using another process for cleanup requires more one-off effort during the initial writing process, but such code should be easier to reason about and more resilient with respect to future changes, so that’s the approach I’d choose.

You already gave me a lot of insights, and I totally understand if it gets boring to explain it further, so I think I’ll do some tests and see what I come up with.

I think I’m misunderstanding something basic. I thought that if I trapped exits, it wouldn’t be the terminate callback that gets called but the handle_info clause for the exit signal, and that in this case it would be more or less reliable…

Thanks again @sasajuric

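Trapping exits also plays a role in whether terminate is invoked. A supervisor takes a child down by sending it a shutdown exit signal. If the child is not trapping exits, it dies immediately and terminate/2 is not invoked. If it is trapping exits, the GenServer behaviour handles the exit signal from its parent by invoking terminate/2 rather than handle_info.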
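As a small illustration of that distinction, here is a sketch with made-up names (`ExitDemo`, `link_worker/1`). It assumes the server traps exits and reports which callback ran: an exit from some other linked process should arrive in handle_info, while a shutdown signal from the parent should end up in terminate/2.

```elixir
defmodule ExitDemo do
  use GenServer

  # `report_to` is the pid that receives the demo's notifications.
  def start_link(report_to), do: GenServer.start_link(__MODULE__, report_to)

  # Makes the server spawn a linked process that crashes right away.
  def link_worker(server), do: GenServer.call(server, :link_worker)

  @impl true
  def init(report_to) do
    Process.flag(:trap_exit, true)
    {:ok, report_to}
  end

  @impl true
  def handle_call(:link_worker, _from, report_to) do
    pid = spawn_link(fn -> exit(:boom) end)
    {:reply, pid, report_to}
  end

  # Exit signals from linked processes other than the parent land here.
  @impl true
  def handle_info({:EXIT, pid, reason}, report_to) do
    send(report_to, {:handle_info, pid, reason})
    {:noreply, report_to}
  end

  # An exit signal from the parent is turned into a terminate/2 call.
  @impl true
  def terminate(reason, report_to) do
    send(report_to, {:terminate, reason})
  end
end
```

To try it from a shell, the caller should trap exits first (`Process.flag(:trap_exit, true)`), because it is linked to the server and plays the role of the parent. After `{:ok, server} = ExitDemo.start_link(self())`, calling `ExitDemo.link_worker(server)` should produce a `{:handle_info, pid, :boom}` message, and `Process.exit(server, :shutdown)` should produce `{:terminate, :shutdown}`.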
