Flow API map/reduce

Hello,

I have a simple task that I would like to accomplish with the Flow API, but I am struggling and cannot get an acceptable result.

My input is ~4000 files with ~1 billion records in total. Each record is an “apple”, an “orange”, or “other stuff”.

What I need from this input is ALL apples in the file ‘apples.txt’ and all oranges in the file ‘oranges.txt’. I have at my disposal, let's say, an m5.24xl (96 cores) node.

What I am doing is:

def parse_r(record) do
  case String.split(record, ",") do
    [_, fruit, _] ->
      r?(fruit)

    _ ->
      ""
  end
end

# return the record for an apple or an orange, "" for the rest
def r?("apple:" <> _ = r), do: r
def r?("orange:" <> _ = r), do: r
def r?(_), do: ""

# for the filter, to ignore the rest
def is_not_blank?(""), do: false
def is_not_blank?(_string), do: true

# custom hash: apples go to partition 0, everything else (oranges) to partition 1
def get_partition("apple:" <> _ = l), do: {l, 0}
def get_partition(l), do: {l, 1}

result =
  # ~4000 files containing ~1 billion records
  source_data("/home/ec2-user/data/konaElixir/#{hour}/fruits")
  # ask to process a batch of 10 files
  |> Flow.from_enumerables(max_demand: 10)
  # split each file by new line
  |> Flow.flat_map(&String.split(&1, new_line))
  # parse into 'apple', 'orange' or ""
  |> Flow.map(&parse_r/1)
  # keep only 'apple' or 'orange'
  |> Flow.filter(&is_not_blank?/1)
  # partition them: apples to apples, oranges to oranges; I don't know whether I have to use 2 stages???
  |> Flow.partition(hash: &get_partition/1, stages: 2)
  # ignore duplicates of the same fruit (let's say a fruit has an ID)
  |> Flow.uniq()
  # prepare for output by adding a new line
  |> Flow.map(&add_new_line(&1, "\n"))
  # accumulate apples and oranges
  |> Flow.reduce(fn -> [] end, fn e, acc -> [e | acc] end)
  |> Flow.emit(:state)
  |> Flow.partition(stages: 2)
  |> Flow.map(&write_to_file(&1, output_path))
  |> Enum.to_list()

This implementation works, but it is very inefficient: my CPU load is only ~5%. I tried removing stages: 2, but the result is almost the same.

Could the Elixir community please review this, suggest a solution for the task, and point out my mistakes? Thank you.

If you partition in two stages at the end, then you will only be able to use 2 cores. I believe the problem could actually be addressed without Flow. For example, you could spawn one process per file (using either Task.async or Task.async_stream), have each process count the apples and oranges on each file and write to apples.txt and oranges.txt immediately.

Also, keep in mind you don’t want to partition multiple times. Every partition means more processes which means more message passing which means more data copying.
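
The one-process-per-file suggestion above could be sketched roughly as follows, using only the standard library. The module name FruitSplit, the classify/1 helper, the comma-separated record layout (borrowed from parse_r/1 in the question), and the output directory argument are assumptions made for illustration. Note that this sketch does not remove duplicates, which the original pipeline did with Flow.uniq().

```elixir
defmodule FruitSplit do
  # Assumed record layout: "<field>,<fruit>:<id>,<field>", as in parse_r/1.
  def classify(line) do
    case String.split(line, ",") do
      [_, "apple:" <> _ = fruit, _] -> {:apple, fruit}
      [_, "orange:" <> _ = fruit, _] -> {:orange, fruit}
      _ -> :skip
    end
  end

  def run(paths, out_dir) do
    # Both devices are owned by the calling process; the tasks write through them.
    apples = File.open!(Path.join(out_dir, "apples.txt"), [:write])
    oranges = File.open!(Path.join(out_dir, "oranges.txt"), [:write])

    paths
    |> Task.async_stream(&process_file(&1, apples, oranges),
      max_concurrency: System.schedulers_online(),
      ordered: false,
      timeout: :infinity
    )
    |> Stream.run()

    File.close(apples)
    File.close(oranges)
  end

  # Collects one file's fruits as iodata, then writes each kind in a single call
  # so that lines coming from different tasks are not interleaved.
  defp process_file(path, apples, oranges) do
    {a, o} =
      path
      |> File.stream!()
      |> Enum.reduce({[], []}, fn line, {a, o} ->
        case classify(String.trim_trailing(line)) do
          {:apple, fruit} -> {[fruit, ?\n | a], o}
          {:orange, fruit} -> {a, [fruit, ?\n | o]}
          :skip -> {a, o}
        end
      end)

    IO.binwrite(apples, a)
    IO.binwrite(oranges, o)
  end
end
```

A call such as FruitSplit.run(list_of_paths, "/some/output/dir") would then run one task per file, with at most one task per scheduler at a time. All writes still go through the two file processes, so whether this beats the Flow version would have to be measured.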


Hello Valim, and thank you so much for taking a look at my issue. In short, I was able to achieve the expected results (at this point of the project), so let me share them, respond to the suggestions, and ask new questions.

Thanks for explaining the stage/core relation. As I understand it now, partitioning by a key that has only 2 possible values means the data will flow to only 2 stages, hence using only 2 cores, right?
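
To see why a two-valued key caps parallelism, the routing decision can be imitated without Flow. The sketch below assumes that default partitioning hashes the whole event into one of `stages` buckets (imitated here with :erlang.phash2/2, which should be read as an assumption about Flow's internals), while a custom hash shaped like get_partition/1 only ever returns 0 or 1. The PartitionDemo module and the sample events are made up.

```elixir
defmodule PartitionDemo do
  # Same idea as get_partition/1 in the question: only two possible targets.
  def by_kind("apple:" <> _), do: 0
  def by_kind(_), do: 1

  # Stand-in for hashing the whole event across `stages` partitions.
  def by_event(event, stages), do: :erlang.phash2(event, stages)
end

stages = 8

# 2000 invented events: "apple:1", "orange:1", "apple:2", ...
events = for id <- 1..1_000, kind <- ["apple", "orange"], do: "#{kind}:#{id}"

# How many events each partition index would receive under each scheme.
kind_load = Enum.frequencies_by(events, &PartitionDemo.by_kind/1)
event_load = Enum.frequencies_by(events, &PartitionDemo.by_event(&1, stages))

IO.inspect(map_size(kind_load), label: "partitions used with the 2-value key")
IO.inspect(map_size(event_load), label: "partitions used when hashing each event")
```

With the two-valued key every event lands in partition 0 or 1 no matter how many stages are requested, so the remaining stages stay idle.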

I spent some time trying to understand Flow, and my best friend was this simple function:

  def log(data, msg) do
    str_pid = inspect(self())
    IO.puts("In pid #{str_pid} from: #{msg} data: #{inspect(data)}")
  end

which I injected after (or in) every Flow step. It revealed the flow topology to me and how data runs through it. It also let me get rid of useless function calls, like the Flow.flat_map I had originally.
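
One detail when injecting such a helper into a pipeline: IO.puts/1 returns :ok, so log/2 as written has to be wrapped (or changed to return its input) before it can sit inside a map step. Here is a minimal sketch of a pass-through variant, demonstrated with Task.async_stream from the standard library rather than Flow. The Trace module name and the sample data are hypothetical.

```elixir
defmodule Trace do
  # Prints which process handled the data, then returns the data unchanged
  # so the call can be dropped into the middle of a pipeline.
  def log(data, msg) do
    IO.puts("In pid #{inspect(self())} from: #{msg} data: #{inspect(data)}")
    data
  end
end

traced =
  ["a", "b", "c"]
  |> Task.async_stream(&Trace.log(&1, "worker"), max_concurrency: 3)
  |> Enum.map(fn {:ok, value} -> value end)
```

Each printed line should show a different pid, which is the same kind of signal that made the Flow topology visible.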

Then I came up with the idea that, to use the hardware, I need more reducers than just 2, and I ended up with this very simple flow:

result =
  source_data("/home/ec2-user/data/konaElixir/#{hour}/raw_konas")
  |> Flow.from_enumerables(max_demand: 4, stages: 100)
  |> Flow.map(fn d -> parse_r(d) end)
  |> Flow.filter(&is_not_blank?/1)
  |> Flow.partition(stages: 100)
  |> Flow.uniq()
  |> Flow.reduce(fn -> [[], []] end, fn
    "arn:" <> _ = e, [a, b] -> [[e | a], b]
    e, [a, b] -> [a, [e | b]]
  end)
  |> Enum.to_list()

where the key point is the ‘double’ accumulator fn -> [[], []] end: one list for ‘apples’ and one for ‘oranges’.
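
The effect of the two-list accumulator can be checked in isolation with Enum.reduce/3, which accepts the same kind of reducer function. The sample events below are invented; in the Flow version each of the 100 partitions would hold its own pair of lists.

```elixir
events = ["arn:1", "i-1", "arn:2", "i-2"]

# Same reducer clauses as in the flow: the first list collects "arn:" records,
# the second list collects everything else.
[arns, instances] =
  Enum.reduce(events, [[], []], fn
    "arn:" <> _ = e, [a, b] -> [[e | a], b]
    e, [a, b] -> [a, [e | b]]
  end)

IO.inspect(arns)      # ["arn:2", "arn:1"] (prepending reverses the input order)
IO.inspect(instances) # ["i-2", "i-1"]
```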

This gave me both very good overall execution latency and hardware utilization. I was able to process 970M input records from ~4000 files in 55 minutes and produce 2 output files which in total contain 465M apples and oranges.

The post-flow stage pushes the data to 2 files. It is serial, but the latency is acceptable:

    fd_1 = File.open!(dest_arns, [:append])
    fd_2 = File.open!(dest_instances, [:append])

    Stream.each(result, &stream_to_files(&1, fd_1, fd_2))
    |> Stream.run()

    File.close(fd_1)
    File.close(fd_2)

I do see a possibility to write to multiple files (one per stage) after the reducers to achieve slightly better performance, but for now it is OK.
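
A rough sketch of that idea, assuming the reducers' results are available as a list of [arns, instances] pairs, one pair per partition. That shape, the ShardWriter module, and the file naming scheme are all assumptions made for illustration, not what the flow above actually emits.

```elixir
defmodule ShardWriter do
  # `partitions` is assumed to be a list of [arns, instances] pairs.
  # Each pair gets its own two files, so no file is shared between writers.
  def write_all(partitions, out_dir) do
    partitions
    |> Enum.with_index()
    |> Task.async_stream(
      fn {[arns, instances], i} ->
        File.write!(Path.join(out_dir, "arns-#{i}.txt"), Enum.map(arns, &[&1, ?\n]))
        File.write!(Path.join(out_dir, "instances-#{i}.txt"), Enum.map(instances, &[&1, ?\n]))
        i
      end,
      timeout: :infinity
    )
    |> Enum.map(fn {:ok, i} -> i end)
  end
end
```

This trades two output files for two files per partition, so a final concatenation step would still be needed if exactly two files are required.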

I only have a question about the Flow.from_enumerables(max_demand: 4, stages: 100) config.

My app is IO bound, and I played with the number of stages, from the number of cores up to 2x and 3x the number of cores. I see that anything greater than the number of cores brings higher app latency. Why?

I also played with max_demand and found that the best config is in the range 4-6. I don’t understand why such a low value (just 4-6 records per mini batch) is optimal for app latency.

Thank you for your time.