Question about Flow, ordering and partitions

Some of us are following the Advent of Code challenge in the Slack channel. The puzzle for Day 5 was computationally expensive, so that solving it would take 50-70 seconds depending on the starting conditions input and the machine.

The basic instruction was as follows:

Part 1:

Given a starting string s = "abc",
calculate the md5 hash of "abc0", "abc1" and so on,
whenever the hex representation of the hash starts with "00000",
store the 6th hex digit of the hash,
repeat until you have collected 8 digits.

(SPOILER! Part 2 is only visible if you complete part 1)

Part 2:

Given a starting string s = "abc",
calculate the md5 hash of "abc0", "abc1" and so on,
whenever the hex representation of the hash starts with "00000",
and the 6th hex digit of the hash, i , providing 0 <= i < 8,
store the 7th hex digit of the hash, to be used at location i
repeat until you have collected digits for locations 0..7,
using only the first occurrence for each location i.

I, together with a lot of other people, have implemented solutions based on Streams:

Stream 1, 2, 3…, calculate hash, filter on the various conditions, reduce to collect the digits, print solution.

Now obviously this is an easily parallelizable problem - you can compute hashes independently. Here’s my non-parallel solution (slightly reformatted for brevity). I am omitting the get_passwd_indexed implementation, it returns {true, char, i, n} or {false, n}, where n is the current “iteration”.

  def crack_indexed(input) do
    import Day5.Crypto
    Stream.iterate(0, &(&1+1))
    |> Stream.map(fn(n) -> get_passwd_indexed(hash(input, n), n) end)
    |> Stream.filter(fn(res) -> elem(res, 0) == true end)
    |> Stream.map(fn({true, c, i, _n}) -> {i, c} end)
    |> extract_indexed()
  end

  def extract_indexed(enumerable) do
    enumerable
    |> Enum.reduce_while(%{}, fn({i, c}, acc) ->
      acc = Map.put_new(acc, i, c)
      if length(Map.keys(acc)) == 8, do: {:halt, acc}, else: {:cont, acc}
    end)
    |> Map.to_list  |> Enum.sort  |> Enum.map(fn({_i, c}) -> c end) |> Enum.join
  end

Now, I can change this to using Flow by just replacing map & filter with their Flow versions, and adding Flow.from_enumerable(). It does work, and the runtime is reduced to almost 50%, which makes sense since my machine has two physical cores. I also see all 4 schedulers pegged at 100%.

My question is, does Flow re-order the results when they are passed to Enum.reduce_while or is it just an accident that I get the correct results?

If it is an accident, and given that in my version of the code I still have access to the proper order of things, n, how can I instruct Flow to reorder results based on that n?

3 Likes