Flow for parsing CSV with grouped records

Using Flow with grouped records in a CSV file and then parsing each group.

Let's say the first 10 records in a CSV are in group G1 and the next 20 are in group G2.
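For example, such a file might look like this (hypothetical columns, group id in the first column):

G1,order-001,42.00
G1,order-002,13.50
G2,order-011,99.90
G2,order-012,7.25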

Inside Flow I want to partition by group ID and, once a group is complete (e.g. all 10 of its records collected), start the parser for that group.

Current code:

defmodule DataProcessor.Orders.CSV.Services.Parser do

  alias DataProcessor.Orders.CSV
  alias NimbleCSV.RFC4180, as: CSVParser

  @processor_sup CSV.orders_processor_task_sup()

  def parse(config) do
    chunk_size = 100

    config
    |> config_parser_and_return_file_path()
    |> File.stream!(read_ahead: chunk_size)
    |> CSVParser.parse_stream()
    |> Stream.drop(skip_rows(config["params"]["rows"]))
    |> Stream.map(&parse_data_columns/1)
    |> Enum.to_list()
  end

  defp skip_rows(""), do: 0
  defp skip_rows(rows) do
    rows = String.to_integer(rows)
    if rows > 1, do: rows, else: 0
  end

  # Redefines the parser module with the configured separator/escape
  # and returns the path of the file to parse.
  defp config_parser_and_return_file_path(config) do
    params = config["params"]
    csv_sep = params["sep"] |> config_separator()
    csv_delimiter = params["delimiter"] |> config_escape_char()

    IO.puts("separator #{csv_sep}, escape #{csv_delimiter}")

    NimbleCSV.define(CSVParser, separator: csv_sep, escape: csv_delimiter)

    config["file_path"]
  end

  defp config_separator(""), do: "\t"
  defp config_separator("\\t"), do: "\t"
  defp config_separator(csv_sep), do: csv_sep

  defp config_escape_char(""), do: "|"
  defp config_escape_char(csv_delimiter), do: csv_delimiter

  defp data_row?([]), do: false
  defp data_row?(_row), do: true

  # Hands each row to the processor in a supervised task.
  defp parse_data_columns(row) do
    {:ok, _} = Task.Supervisor.start_child(
      @processor_sup,
      fn -> CSV.Services.Processor.process(row) end)
  end
end
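For reference, parse/1 gets called with a config shaped roughly like this (field names taken from the code above, values hypothetical):

config = %{
  "file_path" => "/tmp/orders.csv",
  "params" => %{"sep" => ",", "delimiter" => "\"", "rows" => "1"}
}

DataProcessor.Orders.CSV.Services.Parser.parse(config)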

Tried this with Flow:

config
|> config_parser_and_return_file_path()
|> File.stream!(read_ahead: chunk_size)
|> CSVParser.parse_stream()
|> Stream.drop(skip_rows(config["params"]["rows"]))
|> Flow.from_enumerable()
|> Flow.group_by(&Enum.at(&1, mapping["group_id"]["position"]))
|> Flow.map(&parse_data_columns/1)
|> Flow.run()

Will it be able to handle 1 million records?

Yes.

Will it do it in your unspecified timeframe? I don't know, since that depends not only on your code but also on the system you run it on. You need to benchmark.
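For a first number, something as simple as :timer.tc/1 is enough (assuming the parser module is aliased as Parser):

{usec, _result} = :timer.tc(fn -> Parser.parse(config) end)
IO.puts("parsed in #{usec / 1_000_000}s")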

Also, where does mapping come from? And why are you using string keys? Atoms are more performant in most cases, but you should only use them if your keys are statically known.
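For example, you can convert the statically known keys once at the boundary and use atom keys internally; a sketch in iex:

params = %{"sep" => ",", "delimiter" => "\"", "rows" => "1"}

# whitelist the known keys while converting; never call
# String.to_atom/1 on arbitrary external input
%{"sep" => sep, "delimiter" => delim, "rows" => rows} = params
params = %{sep: sep, delimiter: delim, rows: rows}

params.sep
#=> ","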

Also, you are using some functions which you didn't show. In such parallelised environments I have often found that most of the optimisation can be done in the "single-threaded" workers.
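For example, Flow already runs its stages in parallel, so spawning an additional supervised Task per row (as parse_data_columns/1 does above) mostly adds scheduling overhead; a sketch of doing the work inline in the stage instead, assuming CSV.Services.Processor.process/1 is safe to call directly:

defp parse_data_columns(row) do
  # runs inside the Flow stage, which is already its own process
  CSV.Services.Processor.process(row)
end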

Last but not least, with your current code it may happen (as far as I understand how Flow works) that items with the same group end up on different workers and therefore can't be grouped. You need to use Flow.partition/2 with the correct options to ensure that items that belong together share the same partition.
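A sketch of what that could look like, assuming rows is the parsed row stream and group_id the column index; with the default global window, the :done trigger fires once per partition at the end of the input, at which point every collected group is complete:

rows
|> Flow.from_enumerable()
|> Flow.partition(key: &Enum.at(&1, group_id))  # same group id -> same stage
|> Flow.group_by(&Enum.at(&1, group_id))
|> Flow.on_trigger(fn groups ->
  # `groups` maps group_id => rows for this partition; every group is
  # complete here, so hand each one to the per-group parser
  Enum.each(groups, fn {_id, group_rows} -> process_group(group_rows) end)
  {[], groups}
end)
|> Flow.run()

(process_group/1 stands in for whatever per-group processing you do. Note this buffers all rows in memory until the input is exhausted.)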

Current code is this:

defmodule DataProcessor.Orders.CSV.Services.Parser do

  alias DataProcessor.Orders.CSV
  alias NimbleCSV.RFC4180, as: CSVParser

  @processor_sup CSV.orders_processor_task_sup()

  def parse(config) do
    chunk_size = 100
    # `mapping` is assumed to be in scope here; like `config`,
    # it comes from the Rails app (see below)
    group_id = mapping["group_id"]["position"]

    config
    |> config_parser_and_return_file_path()
    |> File.stream!(read_ahead: chunk_size)
    |> CSVParser.parse_stream()
    |> Stream.drop(skip_rows(config["params"]["rows"]))
    |> Flow.from_enumerable()
    |> Flow.partition(key: &Enum.at(&1, group_id))
    |> Flow.group_by(&Enum.at(&1, group_id))
    # note: once the trigger fires, group_by's accumulator is emitted as
    # {group_id, rows} tuples, so this receives tuples rather than single rows
    |> Flow.map(&parse_data_columns/1)
    |> Flow.run()
  end

  defp skip_rows(""), do: 0
  defp skip_rows(rows) do
    rows = String.to_integer(rows)
    if rows > 1, do: rows, else: 0
  end

  # Redefines the parser module with the configured separator/escape
  # and returns the path of the file to parse.
  defp config_parser_and_return_file_path(config) do
    params = config["params"]
    csv_sep = params["sep"] |> config_separator()
    csv_delimiter = params["delimiter"] |> config_escape_char()

    IO.puts("separator #{csv_sep}, escape #{csv_delimiter}")

    NimbleCSV.define(CSVParser, separator: csv_sep, escape: csv_delimiter)

    config["file_path"]
  end

  defp config_separator(""), do: "\t"
  defp config_separator("\\t"), do: "\t"
  defp config_separator(csv_sep), do: csv_sep

  defp config_escape_char(""), do: "|"
  defp config_escape_char(csv_delimiter), do: csv_delimiter

  defp data_row?([]), do: false
  defp data_row?(_row), do: true

  # Hands each row to the processor in a supervised task.
  defp parse_data_columns(row) do
    {:ok, _} = Task.Supervisor.start_child(
      @processor_sup,
      fn -> CSV.Services.Processor.process(row) end)
  end
end

The config and mapping come from a Rails app.

Added the partition with the same group_id as the key:

|> Flow.partition(key: &Enum.at(&1, group_id))