Hello,
I am parsing a CSV file and processing the stream in chunks like this:

stream =
  File.stream!(path)
  |> MyParser.parse_stream()
  |> Stream.chunk(1, 1, [])
  |> Task.async_stream(MyApp.CsvsController, :chunk_handler_fn, [process_name, db_map, entity_name, entity_id, table_name])
I need to store a counter in the Agent's state, so processed_entries is incremented in each async task process. Each task updates the state like this:

# each task increments processed_entries
processed_entries = processed_entries + 1

Agent.update(agent, fn _stream_info ->
  %{stream: stream, status: %{total_entries: total_entries, processed_entries: processed_entries}}
end)
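To make the pattern concrete outside my app, here is a minimal self-contained sketch of what each task effectively does (the state is simplified and the numbers are made up for illustration): the count is read out of the agent first, incremented locally, and only then written back.

```elixir
# Simplified, hypothetical version of the per-task update
{:ok, agent} =
  Agent.start_link(fn -> %{status: %{total_entries: 1000, processed_entries: 0}} end)

# 1. read the current count out of the agent
processed_entries = Agent.get(agent, &get_in(&1, [:status, :processed_entries]))

# 2. increment the local copy
processed_entries = processed_entries + 1

# 3. write the value back (possibly stale by now if other tasks ran in between)
Agent.update(agent, fn state ->
  put_in(state, [:status, :processed_entries], processed_entries)
end)

Agent.get(agent, &get_in(&1, [:status, :processed_entries]))
# => 1 when run sequentially like this
```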
The initial state is set like this:

{:ok, pid} =
  Agent.start_link(
    fn -> %{stream: stream, status: %{total_entries: total_entries, processed_entries: 0}} end,
    name: {:global, String.to_atom(process_name)}
  )
But what I notice is that processed_entries never matches total_entries; it is always less. For example, if total_entries = 1000, processed_entries only reaches ~980 (and the exact number varies between runs), even though all entries were actually processed.
Is it because concurrent processes are calling Agent.update at the same time, so the counter ends up inaccurate? How should I update the Agent's state to keep an accurate count when multiple concurrent tasks are updating the same agent?
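If it helps, the suspected race can be reduced to a standalone sketch with none of my CSV code involved (the bare-integer counter and the concurrency level here are arbitrary): each task reads the counter, then writes the captured value back, mirroring how processed_entries is computed outside Agent.update/2. Two tasks can read the same value and both write back value + 1, losing one increment.

```elixir
{:ok, agent} = Agent.start_link(fn -> 0 end)

1..1000
|> Task.async_stream(
  fn _ ->
    current = Agent.get(agent, & &1)            # read outside the update fn
    Agent.update(agent, fn _ -> current + 1 end) # write back a possibly stale value
  end,
  max_concurrency: 8
)
|> Stream.run()

Agent.get(agent, & &1)
# often less than 1000 because of lost updates
```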
I hope I made it clear, if not, I will explain more.
Thank you.