Question about Flow.partition with multiple input streams

Hello,

I’ve been figuring out a workflow with Flow to process a large set of log files and generate some stats from them. Everything has been working great when I start between 1 and 4 file streams: data is partitioned as I expect. However, when I use 5 or more, the partition function no longer seems to do anything and I end up with duplicated state.

To demonstrate the behavior I’m seeing:

    streams = for file <- File.ls!(@log_dir) do
      File.stream!("#{@log_dir}/#{file}", read_ahead: 100_000)
    end

    streams
    |> Enum.take(5) # less than 5 and it works
    |> Flow.from_enumerables()
    |> Flow.map(&LogEntry.from_log/1)
    |> Flow.partition(key: fn event ->
      IO.inspect(event)
      IO.inspect("Calculating hash: #{event.id}")
      event.id
    end)
    |> Flow.group_by(& &1.id)
    |> Flow.map(fn {key, log_entries} -> {key, LogStats.from_log_entries(log_entries)} end)
    |> Enum.to_list()
    |> Enum.sort_by(fn {_key, stats} -> stats.total_duration end, &>=/2)
    |> Enum.map(fn {key, log_stats} ->
      "#{key}: Avg: #{log_stats.average_duration} Total Calls: #{log_stats.total_calls} Total Duration: #{log_stats.total_duration}"
    end)

The IO.inspect calls in the partition key function never print when there are 5 or more input streams, so it seems the key function is ignored in that case. I have also tried passing {:key, :id}, and implementing my own hash function and passing that, always with the same result.
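Since the behavior changes right around the number of cores, one way to probe whether it tracks the stage count rather than the number of input streams might be to pin the stage count explicitly. A sketch (the `:stages` option is documented on `Flow.partition/2`; the count of 4 here is an arbitrary guess, not something from my actual pipeline):

```elixir
# Sketch: pin the number of partition stages explicitly to see whether
# the key function is invoked regardless of how many streams feed in.
# (:stages is a documented Flow.partition/2 option; 4 is arbitrary.)
streams
|> Flow.from_enumerables()
|> Flow.map(&LogEntry.from_log/1)
|> Flow.partition(stages: 4, key: fn event ->
  IO.inspect("Calculating hash: #{event.id}")
  event.id
end)
|> Flow.group_by(& &1.id)
```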

The section of the Flow docs (https://hexdocs.pm/flow/Flow.html) titled “Avoid Single Sources” makes it sound like having more sources than cores should be handled automatically, but it seems I’m missing something here.

Any ideas?

Thanks :slight_smile:


I’ve spent some more time trying to come up with a simpler case that reproduces this issue. I haven’t quite pulled it off, but I found more interesting behavior that I believe is related. This example is based on the word-counting example from the Flow docs.

First, with 4 input streams, everything works as expected:

    words = ["a", "b", "c", "d"]
    streams = for _ <- 1..4 do
      Stream.map(1..100_000, fn _i -> Enum.random(words) end)
    end

    streams
    |> Flow.from_enumerables()
    |> Flow.flat_map(&String.split(&1, " "))
    |> Flow.partition(key: fn event ->
         IO.inspect("hash on #{event}")
         event
       end)
    |> Flow.reduce(fn -> %{} end, fn word, acc ->
      Map.update(acc, word, 1, & &1 + 1)
    end)
    |> Enum.to_list()

I get a whole bunch of output from the IO.inspect in the partition key function and the correct answer at the end with no duplicate state. The final output in this case is: [{"a", 100257}, {"b", 100267}, {"c", 99413}, {"d", 100063}]

If I increase it to 5 input streams, however, nothing is ever printed from the IO.inspect in the partition key function. What is odd, and different from my original post, is that in this case I still get the expected output: [{"a", 125673}, {"b", 124617}, {"c", 124682}, {"d", 125028}]. Since the key function is apparently never invoked, I would expect the same output I would get without any partition call at all: [{"a", 24064}, {"b", 24253}, {"c", 24292}, {"d", 23891}, {"a", 25413}, {"b", 25483}, {"c", 25464}, {"d", 25140}, {"a", 24591}, {"b", 24894}, {"c", 24695}, {"d", 24820}, {"a", 25899}, {"b", 25692}, {"c", 25701}, {"d", 25708}] - which is what I see in my original post.

What appears to be happening is that once I hit 5 or more input streams, the partition function stops using the key function I provide but still partitions on the event. In the example in this post, partitioning on the events themselves still works, since each event (a single word) is an adequate hash key. In my first post above it does not, since the event itself is not an adequate hash key.
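That would be consistent with Flow falling back to its default hashing. A rough sketch of what default-style routing looks like, assuming the default hashes the whole event with :erlang.phash2/2 (which is how the Flow docs describe the default hash):

```elixir
# Sketch of default-style partition routing: hash the whole event.
# Identical events always land on the same stage, so plain words
# ("a", "b", ...) still partition correctly without a key function,
# but structs whose identity lives in one field (like event.id) may not.
stages = 4
route = fn event -> :erlang.phash2(event, stages) end

route.("a") == route.("a")  # same word, same stage: true
```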


Thank you for the simple to reproduce example! I have pushed a fix to master, can you please give it a try? If it works for you, I can ship a new version.


Looking good now! Thanks so much - that was fast!
