I am trying to batch insert records from a CSV file. My only problem now is that I can't keep a global counter to track the state of the insertion (the number of records inserted).
Please look at the logs below:
15:38:34.449 [info] CSV progress: 940 / 1000
15:38:34.449 [info] CSV progress: 960 / 1000
15:38:34.449 [info] INSERT DRIVERS NOW : ...:
15:38:34.449 [info] CSV progress: 960 / 1000
15:38:34.452 [info] CSV progress: 980 / 1000
15:38:34.452 [info] CSV progress: 980 / 1000
As you can see, the last two log lines have the same timestamp and report the same count, which means the progress counter is wrong.
Here is how I set the Agent's state to store the progress:
Agent.cast(agent, fn %{"stream" => stream, "status" => %{total_entries: total_entries, processed_entries: processed_entries}} = stream_info ->
  put_in(stream_info, ["status", :processed_entries], processed_entries + just_processed_entries)
end)
It works fine; however, sometimes (as in the logs above) two processes attempt to write at the same time, so I can't know when the progress is really finished.
I also tried using the mailbox to store the state, like this:
Agent.update(agent, fn stream_info ->
  put_in(stream_info.status.processed_entries, stream_info.status.processed_entries + just_processed_entries)
end)
but I got the same issue…
Here is how I use streams:
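For comparison, here is a minimal sketch of a counter where the increment and the read happen in a single Agent call. It assumes, without confirmation since the logging code is not shown above, that the duplicate values come from reading the counter in a separate call after the update. The state shape, the `bump` helper and the worker count are hypothetical stand-ins, not the real `stream_info`.

```elixir
# Hypothetical stand-in for the real Agent state; only the counter is modelled.
{:ok, agent} = Agent.start_link(fn -> %{total_entries: 1000, processed_entries: 0} end)

# Increment and read in one Agent call, so no other update can land in between.
bump = fn just_processed_entries ->
  Agent.get_and_update(agent, fn status ->
    new_status = Map.update!(status, :processed_entries, &(&1 + just_processed_entries))
    # First element is returned to the caller, second becomes the new state.
    {new_status.processed_entries, new_status}
  end)
end

# 50 concurrent workers, each reporting a chunk of 20 rows.
reported =
  1..50
  |> Task.async_stream(fn _ -> bump.(20) end, max_concurrency: 8)
  |> Enum.map(fn {:ok, count} -> count end)

final = Agent.get(agent, & &1.processed_entries)
```

Each worker gets back the value produced by its own update, so logging that returned value should never show the same count twice, as long as the assumption above holds.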
stream =
  File.stream!(path)
  |> CSVParser.parse_stream
  |> Stream.chunk(chunck_n, step, [])
  |> Task.async_stream(Trackware.CsvsController, :chunk_handler_fn, [process_name, db_map, entity_name, entity_id, table_name])
So, if it's not possible to maintain global state across multiple processes in such a case, how would I at least know that all processes are done?
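To make the "all processes are done" part concrete, here is a rough sketch that relies on `Task.async_stream` being lazy: consuming it with an `Enum` function returns only after every task has finished. The `handle_chunk` function and the `rows` list are hypothetical placeholders for `chunk_handler_fn` and the parsed CSV rows, and the sketch assumes the handler can return the number of rows it inserted.

```elixir
# Hypothetical stand-in for chunk_handler_fn: pretend to insert a chunk
# and return how many rows it handled.
handle_chunk = fn chunk -> length(chunk) end

# Placeholder for the parsed CSV rows.
rows = Enum.to_list(1..1000)

inserted =
  rows
  |> Stream.chunk_every(20)
  |> Task.async_stream(handle_chunk, max_concurrency: 4, timeout: 30_000)
  # Each successful task yields {:ok, result}; sum the per-chunk counts.
  |> Enum.reduce(0, fn {:ok, count}, acc -> acc + count end)

# Enum.reduce/3 returns only once the stream is exhausted,
# so reaching this line means every chunk task has completed.
IO.puts("all chunks done, #{inserted} rows inserted")
```

With this shape the total is accumulated in the calling process, so a shared counter is only needed for reporting intermediate progress, not for detecting completion.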