I’m working through the book Concurrent Data Processing in Elixir, and the example in chapter four reads a smallish CSV file and filters out some rows. The naive implementation uses `File.read!` and `NimbleCSV.parse_string` and does the filtering with the `Enum` module functions. A slightly better version uses `File.stream!` and `NimbleCSV.parse_stream`, does the filtering with the `Stream` module functions, and finally collects the results into a list with `Enum.to_list`.
The first version runs in about 4 seconds on my machine. The second takes over two minutes! The book reports a fivefold improvement, from about 3 seconds to 600 ms. What could explain the discrepancy?
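For anyone who wants to reproduce the numbers, here is a minimal timing sketch. The `Bench` module is a hypothetical helper (not from the book) that wraps Erlang's `:timer.tc/1`: it runs a zero-arity function once and reports the elapsed time in milliseconds.

```elixir
defmodule Bench do
  # Hypothetical helper, not part of the book's code.
  # Runs `fun` once, prints the elapsed time in milliseconds,
  # and returns {milliseconds, result}.
  def time(label, fun) when is_function(fun, 0) do
    # :timer.tc/1 returns {elapsed_microseconds, return_value}
    {micros, result} = :timer.tc(fun)
    ms = div(micros, 1000)
    IO.puts("#{label}: #{ms} ms")
    {ms, result}
  end
end
```

Usage would look like `Bench.time("stream", &Airports.open_airports/0)`. A single run may include one-off costs, so repeating each measurement a few times gives a fairer comparison between the versions.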
Original:
```elixir
defmodule Airports do
  alias NimbleCSV.RFC4180, as: CSV

  def airports_csv() do
    Application.app_dir(:airports, "/priv/airports.csv")
  end

  def open_airports() do
    airports_csv()
    |> File.read!()
    |> CSV.parse_string()
    |> Enum.map(fn row ->
      %{
        id: Enum.at(row, 0),
        type: Enum.at(row, 2),
        name: Enum.at(row, 3),
        country: Enum.at(row, 8)
      }
    end)
    |> Enum.reject(&(&1.type == "closed"))
  end
end
```
Stream version:
```elixir
def open_airports() do
  airports_csv()
  |> File.stream!()
  |> CSV.parse_stream()
  |> Stream.map(fn row ->
    %{
      id: :binary.copy(Enum.at(row, 0)),
      type: :binary.copy(Enum.at(row, 2)),
      name: :binary.copy(Enum.at(row, 3)),
      country: :binary.copy(Enum.at(row, 8))
    }
  end)
  |> Stream.reject(&(&1.type == "closed"))
  |> Enum.to_list()
end
```
Even weirder, the book then introduces the Flow library to replace the `Stream` functions and reports a twofold slowdown, but I get a 60-fold speedup over the `Stream` version!
```elixir
def open_airports() do
  airports_csv()
  |> File.stream!()
  |> CSV.parse_stream()
  |> Flow.from_enumerable()
  |> Flow.map(fn row ->
    %{
      id: :binary.copy(Enum.at(row, 0)),
      type: :binary.copy(Enum.at(row, 2)),
      name: :binary.copy(Enum.at(row, 3)),
      country: :binary.copy(Enum.at(row, 8))
    }
  end)
  |> Flow.reject(&(&1.type == "closed"))
  |> Enum.to_list()
end
```
Finally, the last step removes the presumed bottleneck by moving the CSV parsing into the `Flow.map` call, so that Flow can take advantage of concurrency. This is supposed to give a significant speedup, but in my case the running time is essentially unchanged.
```elixir
def open_airports() do
  airports_csv()
  |> File.stream!()
  |> Flow.from_enumerable()
  |> Flow.map(fn row ->
    [row] = CSV.parse_string(row, skip_headers: false)

    %{
      id: Enum.at(row, 0),
      type: Enum.at(row, 2),
      name: Enum.at(row, 3),
      country: Enum.at(row, 8)
    }
  end)
  |> Flow.reject(&(&1.type == "closed"))
  |> Enum.to_list()
end
```
I really am at a loss to explain why I’m having such different results from the book.