How to store count of processed tasks into state?


I am parsing a CSV file, and using stream chunks as:

    |> MyParser.parse_stream
    |> Stream.chunk(1, 1, [])
    |> Task.async_stream(MyApp.CsvsController, :chunk_handler_fn, [process_name, db_map, entity_name, entity_id, table_name])

I need to store a counter in the agent state, So, processed_entries will be increased in each async task process, each task will update the state as:

# each task will increment the processed_entries
processed_entries= processed_entries + 1
Agent.update(agent, fn stream_info -> %{stream: stream, status: %{total_entries: total_entries, processed_entries: processed_entries}} end)

The initial state is set like this:

{:ok, pid} = Agent.start_link( 
      fn -> %{stream: stream, status: %{total_entries: total_entries, processed_entries: 0}}  end, name: {:global, String.to_atom(process_name)})

But what I notice is that the processed_entries will not match the count of total_entries but always less
For example: if total_entries = 1000, processed_entries will reach ~ 980 (not always the same), although all entries we’re actually processed.

Is it because concurrent processes are trying to call Agent.update at the same time? thus counter will not be accurate? How can I update the state of agent to store count , when multiple concurrent tasks are updating the state for the same agent?

I hope I made it clear, if not, I will explain more.

Thank you.


You should use Agent.get_and_update/2 to increment processed_entries atomically.

Thanks, I tried that by:

new_state= %{stream: stream, status: %{total_entries: total_entries, processed_entries: processed_entries}}

fn new_state -> {got_state, new_state} end

As I have read about it here:

but I got error:

undefined function got_state/0

I see now you’re not interested in the current count at this point and just want to increment it atomically, so Agent.update/2 is the one to use (not get_and_update)

But you need to mutate the state inside the function you pass as second argument. This function takes the current state (count) and returns the updated state (incr count). So it would be something like:

Agent.update(agent, fn state ->
  put_in(state.status.processed_entries, state.status.processed_entries + 1)

This is what guarantees syncrhonization. Because the function will be executed by the Agent process, the calls will be serialized on the process mailbox.

1 Like

Thanks, that has solved the issue, as for: [quote=“pma, post:4, topic:4902”]
calls will be serialized on the process mailbox

Thanks for this hint, I have to read about process mailbox

This is how I do it:

In setup code:

Agent.start_link(fn -> 0 end , name: :ssh_count)

In ssh loop

Agent.cast(:ssh_count, fn x -> x + 1 end)

At processing end:

  ssh_count = Agent.get(:ssh_count, fn x -> x end)

There are potential drawbacks to this method ( the update must be very simple and quick relative to the action being taken or there’s a potential for the Agent process mailbox to get overloaded. )

1 Like

Thanks, but how would you store the initial value of : ssh_count ? like this:

x= 1
Agent.cast(, agent, :ssh_count, fn x -> x end)

Also, which cast implementation to use? the closest one is cast/2

:ssh_count is the registered name of the agent, not it’s value. You set the initial value of an agent in the
start_link function.

I’m confused about the cast implementation question. I am using Agent.cast/2.

1 Like