Hello,
I’ve been putting together a workflow with Flow to process a large set of log files and generate some stats from them. Everything works great when I start between one and four file streams: data is partitioned as I expect. However, when I use 5 or more, the partition function no longer seems to do anything and I end up with duplicated state (the same ids show up in more than one partition).
To demonstrate the behavior I’m seeing:
# One File.stream! per log file in the directory
streams =
  for file <- File.ls!(@log_dir) do
    File.stream!("#{@log_dir}/#{file}", read_ahead: 100_000)
  end

streams
|> Enum.take(5) # with fewer than 5, it works
|> Flow.from_enumerables()
|> Flow.map(&LogEntry.from_log/1)
|> Flow.partition(
  key: fn event ->
    IO.inspect(event)
    IO.inspect("Calculating hash: #{event.id}")
    event.id
  end
)
|> Flow.group_by(& &1.id)
|> Flow.map(fn {key, log_entries} -> {key, LogStats.from_log_entries(log_entries)} end)
|> Enum.to_list()
|> Enum.sort_by(fn {_key, stats} -> stats.total_duration end, &>=/2)
|> Enum.map(fn {key, log_stats} ->
  "#{key}: Avg: #{log_stats.average_duration} Total Calls: #{log_stats.total_calls} Total Duration: #{log_stats.total_duration}"
end)
The IO.inspect calls in the partition key function never print when there are 5 or more input streams, so it seems the key function is ignored in that case. I have also tried passing {:key, :id} instead, and implementing my own hash function and passing it as the :hash option, always with the same result.
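For reference, these are roughly the variants I tried (a sketch; the custom hash signature is my reading of the Flow.partition/2 docs, where the function receives the event and returns an {event, partition} tuple, so treat that part as an assumption):

# Variant 1: the {:key, atom} shortcut instead of an anonymous key function
|> Flow.partition(key: {:key, :id})

# Variant 2: a custom hash function; as I read the docs it receives the
# event and must return {event, partition} with partition in 0..stages-1
# (the hard-coded 4 is just for illustration)
|> Flow.partition(
  hash: fn event ->
    IO.inspect("Hashing: #{event.id}")
    {event, :erlang.phash2(event.id, 4)}
  end
)

Both behave the same as the original once the fifth stream is added.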
The section of the Flow docs (https://hexdocs.pm/flow/Flow.html) titled “Avoid Single Sources” makes it sound like having more sources than cores should be handled automatically, but it seems I’m missing something here.
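One detail that might matter: the docs say the default number of stages is System.schedulers_online/0, so on a 4-core machine five enumerables would be the first case with more sources than partitions. I don’t know whether that is actually the trigger, but pinning the stage count above the number of streams is the next thing I plan to try, roughly:

# Pin the partition count explicitly (stages: 8 is an arbitrary value above
# the number of input streams, just to see whether the threshold moves)
streams
|> Enum.take(5)
|> Flow.from_enumerables()
|> Flow.map(&LogEntry.from_log/1)
|> Flow.partition(stages: 8, key: {:key, :id})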
Any ideas?
Thanks