Just to move the discussion to another topic, @sasajuric (https://elixirforum.com/t/using-registry-as-a-counter-or-better-alternatives/14893/22): for instance, in this case, is it okay to use `terminate/2`
as a callback to remove the file that the GenServer works on?
defmodule CsvParser.Server do
  @moduledoc """
  Temporary GenServer that parses a CSV file row by row, upserts each row
  through `MSLRecord` / `Db_Msl.Repo`, reports progress over the admin
  channel, and deletes the source file when it terminates.

  The server state is a 7-tuple:
  `{path, filename, {total, processed, failed}, headers, stream, admin_id, errors}`.
  """

  use GenServer, restart: :temporary

  alias NimbleCSV.RFC4180, as: CSV

  use MSLRecord_Constants

  @doc """
  Starts the parser for `{path, filename, admin_id}`.

  Returns `{:error, reason}` when the file does not exist or the server
  fails to start; otherwise returns `{:ok, pid}` after kicking off
  asynchronous processing via a `:start` message.
  """
  def start_link({path, filename, _admin_id} = file_tuple) do
    if File.exists?(path) do
      case GenServer.start_link(__MODULE__, file_tuple, name: CsvParser.ref(filename)) do
        {:ok, pid} ->
          # Drive all work through messages so start_link returns quickly.
          send(pid, :start)
          {:ok, pid}

        _ ->
          {:error, "Error Starting CsvParser for #{inspect file_tuple}"}
      end
    else
      {:error, "The file - #{inspect filename} - with path - #{inspect path} - doesn't appear to exist"}
    end
  end

  @impl true
  def init({path, filename, admin_id}) do
    # Counters start at {total, processed, failed} = {0, 0, 0}.
    # Headers and the file stream are filled in by the :start message.
    {:ok, {path, filename, {0, 0, 0}, [], nil, admin_id, []}}
  end

  @impl true
  def terminate(reason, state) do
    IO.puts("Terminating Parser with final state: #{inspect state}")
    IO.puts("Reason: #{inspect reason}")
    # Best-effort cleanup of the source file (element 0 of the state tuple).
    # NOTE(review): terminate/2 is not guaranteed to run if the process is
    # killed brutally; it does run for the {:stop, :normal, state} below.
    File.rm_rf(elem(state, 0))
  end

  @impl true
  def handle_info(:start, {path, filename, {0, 0, 0}, [], nil, admin_id, errors}) do
    stream = File.stream!(path)

    # Read only the header row and trim each column name.
    # BUG FIX: NimbleCSV's option is `skip_headers: false` (there is no
    # `headers:` option); with the old `headers: false` the header row was
    # skipped and the first data row was mistakenly used as the headers.
    headers =
      stream
      |> CSV.parse_stream(skip_headers: false)
      |> Enum.take(1)
      |> List.first()
      |> Enum.map(&String.trim/1)

    send(self(), :process_file)
    {:noreply, {path, filename, {0, 0, 0}, headers, stream, admin_id, errors}}
  end

  def handle_info(:start, state) do
    # Headers/stream already present — go straight to processing.
    send(self(), :process_file)
    {:noreply, state}
  end

  def handle_info(:increment, {path, filename, {n, p, f}, headers, stream, admin_id, errors}) do
    # One row inserted successfully: bump total and processed counts.
    {:noreply, {path, filename, {n + 1, p + 1, f}, headers, stream, admin_id, errors}}
  end

  def handle_info({:error_parsing, new_errors}, {path, filename, {n, p, f}, headers, stream, admin_id, errors}) do
    # Keep only the error message per changeset field, dropping metadata,
    # and record the failing row's (1-based) number next to its errors.
    clean_errors = Map.new(new_errors, fn {k, {error, _}} -> {k, error} end)
    {:noreply, {path, filename, {n + 1, p, f + 1}, headers, stream, admin_id, [[clean_errors, n + 1] | errors]}}
  end

  def handle_info(:stop, {_path, filename, {n, p, f}, _headers, _stream, admin_id, errors} = state) do
    AdminApiWeb.AdminChannel.broadcast_to_admin(admin_id, "finished_csv_processing",
      %{
        filename: filename,
        errors: errors,
        total: n,
        processed: p,
        failed: f
      }
    )

    # Stopping with :normal still invokes terminate/2, which removes the file.
    {:stop, :normal, state}
  end

  def handle_info(:process_file, {path, filename, {0, 0, 0}, headers, stream, admin_id, errors}) do
    AdminApiWeb.AdminChannel.broadcast_to_admin(admin_id, "started_csv_processing",
      %{
        filename: filename,
        headers: headers
      }
    )

    # parse_stream/1 skips the header row by default, so only data rows
    # are zipped with the headers here.
    stream
    |> CSV.parse_stream()
    |> Stream.map(fn row ->
      headers
      |> Enum.zip(row)
      |> Map.new()
      |> Map.take(@columns_home_property)
      |> Map.new(fn {k, v} ->
        col = @columns_map_home_property[k]
        {col, MSLRecord.convert_field(col, v)}
      end)
      |> MSLRecord.save()
      |> Db_Msl.Repo.insert(on_conflict: :replace_all, conflict_target: [:id])
    end)
    |> Enum.each(fn
      {:error, %Ecto.Changeset{errors: row_errors}} -> send(self(), {:error_parsing, row_errors})
      _ -> send(self(), :increment)
    end)

    # BUG FIX: removed File.close(stream). File.stream!/1 returns a
    # %File.Stream{} which opens and closes the file on each enumeration;
    # File.close/1 expects an IO device and fails when given the struct.
    send(self(), :stop)
    {:noreply, {path, filename, {0, 0, 0}, headers, stream, admin_id, errors}}
  end
end