Using Flow with grouped records in a CSV file, then parsing each group.
Let's say the first 10 records in the CSV belong to GROUP G1 and the next 20 belong to GROUP G2.
Inside Flow I want to partition by GROUP ID, and once a group is complete (e.g. all 10 records for G1 have been collected), start the parser for that group.
Current code:
```elixir
defmodule DataProcessor.Orders.CSV.Services.Parser do
  alias DataProcessor.Orders.CSV
  alias NimbleCSV.RFC4180, as: CSVParser

  @processor_sup CSV.orders_processor_task_sup()

  def parse(config) do
    chunk_size = 100

    config
    |> config_parser_and_return_file_path()
    |> File.stream!(read_ahead: chunk_size)
    |> CSVParser.parse_stream()
    |> Stream.drop(config["params"]["rows"] |> skip_rows())
    |> Stream.map(&parse_data_columns(&1))
    |> Enum.to_list()
  end

  defp skip_rows(""), do: 0

  defp skip_rows(rows) do
    rows = rows |> String.to_integer()
    if rows > 1, do: rows, else: 0
  end

  defp config_parser_and_return_file_path(config) do
    params = config["params"]
    csv_sep = params["sep"] |> config_seperator()
    csv_delimiter = params["delimiter"] |> config_escape_char()
    IO.inspect("seperator #{csv_sep}, escape #{csv_delimiter}")

    CSVParser
    |> NimbleCSV.define(separator: csv_sep, escape: csv_delimiter)

    config["file_path"]
  end

  defp config_seperator(""), do: "\t"
  defp config_seperator("\\t"), do: "\t"
  defp config_seperator(csv_sep), do: csv_sep

  defp config_escape_char(""), do: "|"
  defp config_escape_char(csv_delimiter), do: csv_delimiter

  defp data_row?(row) do
    case row do
      [] -> false
      _ -> true
    end
  end

  defp parse_data_columns(row) do
    {:ok, _} =
      Task.Supervisor.start_child(
        @processor_sup,
        fn -> CSV.Services.Processor.process(row) end
      )
  end
end
```
Will it do it within your unspecified timeframe? I don't know, since that depends not only on your code but also on the system you run it on. You need to benchmark.
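For a first rough measurement you don't even need a library; Erlang's built-in `:timer.tc/1` times a function call in microseconds (the `work` function below is a stand-in for your `parse/1` call, not anything from your code). For real benchmarking with warmup and statistics, a dedicated library such as Benchee is a better fit:

```elixir
# Minimal timing with :timer.tc/1, which returns {microseconds, result}.
# `work` is a placeholder workload; substitute your Parser.parse(config).
work = fn -> Enum.sum(1..1_000_000) end

{micros, result} = :timer.tc(work)
IO.puts("took #{micros} µs, result #{result}")
```

Run it a few times and on hardware comparable to production, otherwise the numbers tell you little.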
Also, where does the mapping come from? Why are you using string keys? Atoms are more performant in most cases, but you should only use them if your keys are statically known.
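If your keys are statically known, one safe pattern (this is my own sketch, not from your code) is to convert them to atoms once at the boundary with `String.to_existing_atom/1`, which refuses to create new atoms and therefore cannot grow the atom table on untrusted input:

```elixir
# Sketch: converting statically known string keys to atoms at the edge.
params = %{"sep" => ",", "rows" => "3"}

# Declaring the expected keys as literals ensures the atoms already exist.
_known = [:sep, :rows]

parsed =
  for {key, value} <- params, into: %{} do
    {String.to_existing_atom(key), value}
  end

# parsed is now %{sep: ",", rows: "3"}, usable with the faster atom-key access.
```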
Also, you are using some functions which you didn't show. In such parallelised environments I have often found that most of the optimisation opportunities are in the "single-threaded" workers.
Last but not least, with your current code it may happen (as far as I understand how Flow works) that items belonging to the same group end up on different workers and therefore cannot be grouped. You need to use Flow.partition/2 with the right options to ensure that items that belong together are routed to the same partition.
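A rough sketch of what I mean (my own assumptions: the group id is the first CSV column, and `parse_group/2` is a placeholder for your processor call). Partitioning by the group id keeps all rows of one group on the same stage, `Flow.reduce/3` accumulates them there, and `Flow.on_trigger/2` emits the finished groups when the flow is done. Note that with the default trigger a group only counts as "complete" at the end of the stream; if you need to detect completeness earlier, you'd have to know the expected group sizes or use windows/triggers:

```elixir
defmodule GroupedParser do
  def run(path) do
    path
    |> File.stream!()
    |> NimbleCSV.RFC4180.parse_stream()
    |> Flow.from_enumerable()
    # Hash rows by their group id (assumed to be the first column), so all
    # rows of one group land in the same partition.
    |> Flow.partition(key: fn [group_id | _] -> group_id end)
    # Accumulate rows per group id within each partition.
    |> Flow.reduce(fn -> %{} end, fn [group_id | _] = row, acc ->
      Map.update(acc, group_id, [row], &[row | &1])
    end)
    # When the (default) trigger fires at the end, emit {group_id, rows} pairs.
    |> Flow.on_trigger(fn acc -> {Map.to_list(acc), acc} end)
    |> Enum.each(fn {group_id, rows} ->
      parse_group(group_id, Enum.reverse(rows))
    end)
  end

  # Placeholder for CSV.Services.Processor.process/1 or whatever your
  # per-group parser actually is.
  defp parse_group(group_id, rows) do
    IO.puts("group #{group_id}: #{length(rows)} rows")
  end
end
```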