How to monitor when the last process completes?

I am trying to batch-insert records from a CSV file. My only problem now is that I can't maintain a global counter to store the state of the insertion (the number of records inserted).

Please look at logs below:

15:38:34.449 [info] CSV progress: 940 / 1000
15:38:34.449 [info] CSV progress: 960 / 1000
15:38:34.449 [info] INSERT DRIVERS NOW : ...:
15:38:34.449 [info] CSV progress: 960 / 1000
15:38:34.452 [info] CSV progress: 980 / 1000
15:38:34.452 [info] CSV progress: 980 / 1000

As you can see, the last two log entries have the same timestamp, which has resulted in a wrong progress counter.

Here is how I set the Agent's state to store progress:

Agent.cast(agent, fn %{"stream" => stream, "status" => %{total_entries: total_entries, processed_entries: processed_entries}} = stream_info ->
  put_in(stream_info, ["status", :processed_entries], processed_entries + just_processed_entries)
end)

It works fine; however, sometimes (as in the logs above) two processes attempt to write at the same time, so I can't tell when the progress is really finished.

I also tried a synchronous update through the agent's mailbox:

Agent.update(agent, fn stream_info ->
  put_in(stream_info.status.processed_entries, stream_info.status.processed_entries + just_processed_entries)
end)

but I got the same issue…

Here is how I use streams:

stream =
  File.stream!(path)
  |> CSVParser.parse_stream()
  |> Stream.chunk(chunk_n, step, [])
  |> Task.async_stream(Trackware.CsvsController, :chunk_handler_fn, [process_name, db_map, entity_name, entity_id, table_name])

So, if it's not possible to maintain global state for multiple processes in such a case, how would I at least know that all the processes are done?

I'm probably missing something here - but looking at the example, Task.async_stream produces an Enumerable - i.e. each element will be ready when its respective task has completed.

In the case of the example, all tasks are complete when Enum.to_list produces the list - as Enum.to_list will block any time it encounters an element whose task hasn't completed yet.

You don’t seem to be doing anything with the Enumerable that Task.async_stream produces.

Update:

Is it possible that in your logic the last of those entries had just_processed_entries == 0?

Using an agent from many processes at the same time is perfectly fine, it’s made for that. Try calling Agent.get_and_update and using its return value for the log.
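A self-contained sketch of what that could look like, using the state shape from the question (the initial values here are made up for illustration):

```elixir
{:ok, agent} =
  Agent.start_link(fn ->
    %{"stream" => nil, "status" => %{total_entries: 1000, processed_entries: 0}}
  end)

just_processed_entries = 20

# get_and_update stores the new state AND returns a value in one atomic step,
# so the returned count is guaranteed to include this process's update
new_processed =
  Agent.get_and_update(agent, fn %{"status" => %{processed_entries: processed}} = stream_info ->
    new_state = put_in(stream_info, ["status", :processed_entries], processed + just_processed_entries)
    {new_state["status"].processed_entries, new_state}
  end)

IO.puts "CSV progress: #{new_processed} / 1000"
```

Because the agent runs the function itself, no other process can sneak an update in between the read and the write.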

@peerreynders no, value of just_processed_entries is always 20.

How would I use the Enumerable in my example above? Do you mean that if I added a pipe to consume the Enumerable that Task.async_stream produces, then I can be assured that the logic I put in that function will run after all the Task.async_stream tasks have completed?

@dom How would I use Agent.get_and_update to update my state, which looks like:

%{"stream" => stream, "status" => %{total_entries: total_entries, processed_entries: processed_entries}}

as I am not sure how to get the updated state using Agent.get_and_update.

Thanks for sharing knowledge

It’s covered in the doc here: https://hexdocs.pm/elixir/Agent.html#get_and_update/3

And there’s an example in the Getting Started guide: http://elixir-lang.org/getting-started/mix-otp/agent.html#other-agent-actions (the discussion about server vs client just below is very relevant too)


See Task.async_stream/5

When streamed, each task will emit {:ok, val} upon successful completion or {:exit, val} if the caller is trapping exits. Results are emitted in the same order as the original enumerable.

So in

tuples =
  File.stream!(path)
  |> CSVParser.parse_stream()
  |> Stream.chunk(chunk_n, step, [])
  |> Task.async_stream(Trackware.CsvsController, :chunk_handler_fn, [process_name, db_map, entity_name, entity_id, table_name])
  |> Enum.to_list()

next_step()

Enum.to_list would block execution until all the tasks have emitted their tuples - the list of which is bound to the tuples name.

So by the time you get to next_step() all the tasks have completed.
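Taking that one step further: if each chunk_handler_fn call returned the number of entries it handled (an assumption - the actual return value isn't shown in the thread), the {:ok, value} tuples could be reduced into a total with no shared counter at all. A runnable sketch, with a plain range standing in for the parsed CSV and Stream.chunk_every as the modern spelling of Stream.chunk:

```elixir
total_processed =
  1..1000
  |> Stream.chunk_every(20)
  |> Task.async_stream(fn chunk -> length(chunk) end)      # each task returns its own count
  |> Enum.reduce(0, fn {:ok, count}, acc -> acc + count end)

IO.puts "Done: #{total_processed} entries"  # runs only after every task has finished
```

The Enum.reduce both waits for all tasks (same blocking behaviour as Enum.to_list) and computes the final count in one pass.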

This simply cannot happen - while it is entirely possible for two timestamps to be identical, two processes cannot affect the same agent at the same instant in time. The only reasonable explanation is that there is an error in your code that you are not showing us. If two processes issue

Agent.cast(
  agent,
  fn
    %{"stream" => stream,
      "status" =>
        %{total_entries: total_entries,
          processed_entries: processed_entries
        }
    } = stream_info
  ->
    put_in(
      stream_info,
      ["status", :processed_entries],
      processed_entries + just_processed_entries
    )
  end
)

and just_processed_entries == 20 then there will be two messages (one from each process) in the actor’s mailbox with

  fn
    %{"stream" => stream,
      "status" =>
        %{total_entries: total_entries,
          processed_entries: processed_entries
        }
    } = stream_info
  ->
    put_in(
      stream_info,
      ["status", :processed_entries],
      processed_entries + 20
    )
  end

and each function contained in each message will be executed by the agent in turn, sequentially (never simultaneously). So if this is the actual point of failure that caused the log that you quoted then that output is only possible if the second message contained this instead:

  fn
    %{"stream" => stream,
      "status" =>
        %{total_entries: total_entries,
          processed_entries: processed_entries
        }
    } = stream_info
  ->
    put_in(
      stream_info,
      ["status", :processed_entries],
      processed_entries + 0 # zero, not 20
    )
  end

If you are absolutely certain that just_processed_entries == 20 then your unexpected log entry was caused by something else.

PS: all Agent.update/3 and Agent.get_and_update/3 will do is block the requesting process until the agent has run the function. Your resulting log may simply indicate that you are logging in the “wrong place” (and/or possibly the wrong time).
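A quick, self-contained way to convince yourself that concurrent agent updates are serialized and never lost:

```elixir
{:ok, agent} = Agent.start_link(fn -> 0 end)

# 50 concurrent tasks each add 20 to the shared counter
1..50
|> Task.async_stream(fn _ -> Agent.update(agent, &(&1 + 20)) end)
|> Stream.run()

final = Agent.get(agent, & &1)
IO.puts final  # the agent applies the 50 updates one at a time
```

However the task scheduling interleaves, the agent applies each update function sequentially, so the final value is always 50 * 20 = 1000.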


That’s exactly what I needed to do, thanks.

I need to read your explanations carefully as I am new to Elixir; however, here is where I put the logging - hopefully this explains why I got two logs at the same time:

agent = {:global, String.to_atom(process_name)}

# get the agent state
stream_info = Agent.get(agent, fn stream -> stream end)
%{"stream" => stream, "status" => %{total_entries: total_entries, processed_entries: processed_entries}} = stream_info

just_processed_entries = inserted_count + length(conflictedItems)

# update the state of the agent:
Agent.cast(agent, fn %{"stream" => stream, "status" => %{total_entries: total_entries, processed_entries: processed_entries}} = stream_info ->
  put_in(stream_info, ["status", :processed_entries], processed_entries + just_processed_entries)
end)

status = %{
  inserted_count: inserted_count,
  inserted_records: inserted_records,
  conflictedItemsCount: length(conflictedItems),
  conflictedItems: conflictedItems,
  processed_entries: processed_entries + inserted_count + length(conflictedItems),
  total_entries: total_entries
}

MyApp.Endpoint.broadcast process_name, "new_progress", status
# log the progress:
Logger.info "CSV progress: #{processed_entries + just_processed_entries} / #{total_entries}"

Your processed_entries value is still from the old Agent.get - so you don’t have the updated value.

The way your code is written you would have to use Agent.get_and_update and simply accept that your logic is blocked until the agent processes the message.


But as I am running the processes in parallel, won't the blocking mentioned above make the overall time needed to complete the task much longer?

It shouldn't with 50 tasks, but by being shared, the agent is a bottleneck. An improved design would turn the Agent into a GenServer that accepts a "status" message that includes just_processed_entries, so that the GenServer can calculate processed_entries and write to the log itself (and do the broadcast).

As for the GenServer idea: the GenServer will be used to increment the state, but it can't calculate - it just sets the new state I give it. How would that be different from using an Agent? Isn't an Agent an abstraction over GenServer? Sorry for so many questions :blush:

Why do you think a GenServer can’t modify its own state?
Unlike an Agent, a GenServer is in complete control of its state; the Agent is at the mercy of the functions that are sent to it.

defmodule Sample do
  use GenServer

  ## Actual processing
  def handle_status(total, count) do
    new_state = total + count
    IO.puts "Total: #{new_state}"
    new_state # i.e. return new modified state
  end


  ## Callbacks
  def init([count]), do:
    {:ok, count}
    # create state based on passed arguments

  def terminate(_reason, _state), do:
    :ok

  def handle_cast({:status, count}, state), do:
    {:noreply, handle_status(state, count)}
    #state is simply the current total

  ## public interface
  def start(), do:
    {:ok, _} = GenServer.start_link(__MODULE__, [0], name: MySample)
    # [0] is passed to init/1

  def stop(), do:
    GenServer.stop(MySample)
    # causes terminate/2 to run

  # client interface
  def submit_status(count), do:
    GenServer.cast(MySample, {:status, count})
    # message is processed by handle_cast/2

end

Sample.start
# GenServer.cast does not wait for the message to be processed (i.e. is asynchronous)
Sample.submit_status 3
Sample.submit_status 2
Sample.submit_status 1
# GenServer.stop waits for a reply (i.e. is synchronous)
Sample.stop

Yeah, I know that a GenServer can modify its own state, but how do I use it in my case? Shall I simply replace the Agent and use it in each process the same way I did with the Agent?

Yep, except instead of passing functions, build proper functions to marshal data to/from the GenServer as normal. :slight_smile:

A GenServer is always better than an Agent in my opinion. It may not be best for the task at hand, but other things are always better than an Agent in my opinion. ^.^


The GenServer will let you get rid of the Agent.get because you no longer need to fetch processed_entries and total_entries (possibly stored as a {total, processed} tuple) - that can now be encapsulated in the GenServer's state.

In the above sample rather than submitting a count submit a status:

status =
  %{
    inserted_count: inserted_count,
    inserted_records: inserted_records,
    conflictedItemsCount: conflictedItemsCount,
    conflictedItems: conflictedItems,
    just_processed_entries: just_processed_entries
  }

Sample.submit_status(status)

Note that all this information is copied to the GenServer so drop anything non-essential.

In handle_status update the processed count with just_processed_entries. Delete just_processed_entries from the status map and add processed_entries and total_entries. Broadcast that status and write to the log (replacing the IO.puts).
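Putting those pieces together, the adapted handle_status might look roughly like this - the state and status shapes, and the module name CsvProgress, are assumptions carried over from the earlier posts. The broadcast and Logger calls are left as a comment so the pure logic stands alone:

```elixir
defmodule CsvProgress do
  # Fold an incoming status map into the server state; returns the enriched
  # status (ready to broadcast/log) and the new state.
  def handle_status(status, %{total_entries: total, processed_entries: processed} = state) do
    new_processed = processed + status.just_processed_entries

    new_status =
      status
      |> Map.delete(:just_processed_entries)
      |> Map.put(:processed_entries, new_processed)
      |> Map.put(:total_entries, total)

    # here the real GenServer would broadcast new_status on the channel and
    # Logger.info "CSV progress: #{new_processed} / #{total}"
    {new_status, %{state | processed_entries: new_processed}}
  end
end
```

The GenServer's handle_cast({:status, status}, state) callback would call this, broadcast new_status, and return {:noreply, new_state}.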

It's really wonderful to get into this new way of thinking. This is my first task in Elixir that I consider very useful for my company - parsing a CSV file and inserting all entries into the DB using Ecto. I learned a lot, and I am still learning… :slight_smile:

Thank you!
