How to calculate the total group count and parse the group in the same Flow.run?

I want to calculate the total group count and parse the groups in the same Flow.run.
Currently it looks inefficient because the flow is executed twice.

def parse(config) do
  mapping = config["mapping"]

  flow = config
  |> config_parser_and_return_csv_data
  |> String.split("\n")
  |> CSVParser.parse_stream
  |> Stream.drop(config["params"]["rows"] |> skip_rows)
  |> Stream.filter(&data_row?(&1))
  |> Flow.from_enumerable()
  |> Flow.filter(&isValidRow?(&1, mapping))
  |> Flow.partition(key: &increment_id(&1, mapping))
  |> Flow.group_by(&Enum.at(&1, increment_id_position(mapping)))

  # First execution: Enum.count/1 runs the whole flow just to count the groups.
  config = Map.put(config, "order_count", Enum.count(flow))

  flow
  |> Flow.map(&parse_data_columns(&1, config))
  |> Flow.run
end

Hi,

Flow.reduce/3 would allow you to both accumulate the total and build the list of parsed data.
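To illustrate the idea, here is a minimal sketch of a reducer that counts rows and collects parsed rows in one pass. It is exercised with Enum.reduce/3 so it runs without the Flow dependency; the same two-argument function could be handed to Flow.reduce/3 together with a zero-arity function returning the initial accumulator. `parse_row` is a hypothetical stand-in for `parse_data_columns`, and the sample rows are made up.

```elixir
# Hypothetical stand-in for parse_data_columns/2: trims every column.
parse_row = fn row -> Enum.map(row, &String.trim/1) end

# Reducer: receives one row and the accumulator {count, parsed_rows}.
reducer = fn row, {count, parsed} ->
  {count + 1, [parse_row.(row) | parsed]}
end

# Made-up sample rows.
rows = [[" 100 ", "a "], ["101", " b"]]

{count, parsed} = Enum.reduce(rows, {0, []}, reducer)

# Rows were prepended, so reverse to restore the input order.
parsed = Enum.reverse(parsed)

IO.inspect({count, parsed})
```

Note that if the parsing itself needs the final total, it still has to wait until the reduce has seen every row.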

As a side note, String.split/2 returns a list. You may want to use String.splitter/2, which returns a stream.
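A small sketch of the difference, using a made-up CSV string. Since the whole CSV is already in memory as one binary, the gain is mostly that no intermediate list of lines is built.

```elixir
# Made-up sample data.
csv = "id,name\n100,a\n101,b"

# String.split/2 is eager: it builds the full list of lines immediately.
lines = String.split(csv, "\n")

# String.splitter/2 is lazy: lines are produced only as they are consumed.
first_two =
  csv
  |> String.splitter("\n")
  |> Enum.take(2)

IO.inspect(lines)
IO.inspect(first_two)
```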


Thanks @pma


Still no luck: reduce cannot be used after group_by/reduce. @NobbZ, any suggestions on the question above?


Not really. Also, I'm not quite sure why you summon me explicitly. It's been a while since I worked with Flow, and I currently have no project at hand that I could use to play around.


You should remove the group_by and replace it with your own reduce that does both the counting and the grouping.
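A rough sketch of what such a reduce could look like. The module `GroupAndCount`, its functions, and the sample rows are hypothetical names for illustration. It is run here with Enum.reduce/3 so it works without the Flow dependency; in the original pipeline the same functions would presumably replace Flow.group_by via `Flow.reduce(flow, &GroupAndCount.new_acc/0, &GroupAndCount.add_row(&1, &2, pos))`.

```elixir
defmodule GroupAndCount do
  # Initial accumulator: no groups seen yet.
  def new_acc, do: %{groups: %{}, group_count: 0}

  # Adds one row to its group and bumps the count when the key is new.
  def add_row(row, %{groups: groups, group_count: count}, key_pos) do
    key = Enum.at(row, key_pos)
    new_count = if Map.has_key?(groups, key), do: count, else: count + 1
    # Prepending keeps insertion cheap; each group holds rows in reverse arrival order.
    groups = Map.update(groups, key, [row], &[row | &1])
    %{groups: groups, group_count: new_count}
  end
end

# Made-up rows; column 0 plays the role of the increment id.
rows = [["100", "a"], ["101", "b"], ["100", "c"]]

result = Enum.reduce(rows, GroupAndCount.new_acc(), &GroupAndCount.add_row(&1, &2, 0))

IO.inspect(result.group_count)
IO.inspect(result.groups)
```

One caveat, as far as I understand Flow: each partition keeps its own accumulator, so the count above is per partition. Because the flow is partitioned by the same key, a group lives in exactly one partition, and the overall total is the sum of the per-partition counts.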


Thanks, guys, for the help. Sorry, NobbZ, for summoning you :grin:
