stevensonmt
Stream MUCH slower than Enum when processing a modest CSV file
I’m working through the book Concurrent Data Processing in Elixir, and the example in chapter four involves reading a smallish CSV file and filtering out some rows. The naive implementation uses `File.read!` and `NimbleCSV`'s `parse_string` and does the filtering with `Enum` functions.
A slightly better version uses `File.stream!` and `parse_stream`, does the filtering with `Stream` functions, and finally collects the results into a list with `Enum.to_list`.
The first version runs in about 4 seconds on my machine. The second takes over two minutes! The book reports a 5-fold improvement, from ~3 seconds to 600 ms. What could explain the discrepancy?
Original:
```elixir
defmodule Airports do
  alias NimbleCSV.RFC4180, as: CSV

  # Path to the CSV file shipped in the app's priv directory.
  def airports_csv() do
    Application.app_dir(:airports, "/priv/airports.csv")
  end

  # Eager version: read the whole file into memory, parse it, then filter.
  def open_airports() do
    airports_csv()
    |> File.read!()
    |> CSV.parse_string()
    |> Enum.map(fn row ->
      %{
        id: Enum.at(row, 0),
        type: Enum.at(row, 2),
        name: Enum.at(row, 3),
        country: Enum.at(row, 8)
      }
    end)
    |> Enum.reject(&(&1.type == "closed"))
  end
end
```
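Each `Enum.at/2` call walks the row list from its head, so the map step traverses every row four times. A minimal alternative sketch, assuming rows arrive as plain lists of strings (as NimbleCSV returns them) with the country in the ninth column, destructures the row once with pattern matching. `AirportRow` and the `_colN` names are hypothetical; unlike `Enum.at/2`, which returns `nil` for missing columns, this clause raises on rows with fewer than nine fields.

```elixir
defmodule AirportRow do
  # Hypothetical helper: builds the airport map from one parsed CSV row.
  # The list is destructured in a single pass instead of four Enum.at/2 calls.
  # Columns 0, 2, 3 and 8 hold id, type, name and country; the others are ignored.
  def to_map([id, _col1, type, name, _col4, _col5, _col6, _col7, country | _rest]) do
    %{id: id, type: type, name: name, country: country}
  end
end

# Usage with a made-up row:
# AirportRow.to_map(["1", "X1", "heliport", "Some Pad", "0", "0", "0", "NA", "US", "extra"])
# returns %{id: "1", type: "heliport", name: "Some Pad", country: "US"}
```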
Stream version:
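```elixir
# Stream version: replaces open_airports/0 in the Airports module above.
def open_airports() do
  airports_csv()
  |> File.stream!()
  |> CSV.parse_stream()
  |> Stream.map(fn row ->
    # :binary.copy/1 detaches each field from any larger binary it was sliced from.
    %{
      id: :binary.copy(Enum.at(row, 0)),
      type: :binary.copy(Enum.at(row, 2)),
      name: :binary.copy(Enum.at(row, 3)),
      country: :binary.copy(Enum.at(row, 8))
    }
  end)
  |> Stream.reject(&(&1.type == "closed"))
  |> Enum.to_list()
end
```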
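The `:binary.copy/1` calls above are presumably there because field binaries produced by a parser can be sub-binaries that keep a reference to the larger chunk they were sliced from; the Erlang docs describe `:binary.copy/1` as always creating a new binary, which lets the larger one be garbage collected. The hypothetical `SubBinaryDemo` module below makes the reference visible with `:binary.referenced_byte_size/1`. Note that the copy itself is extra work per field.

```elixir
defmodule SubBinaryDemo do
  # Slices a small field out of a large binary and reports how many bytes
  # each version keeps alive: {field_size, referenced_by_slice, referenced_by_copy}.
  def sizes(chunk_size \\ 10_000) do
    # A large binary standing in for a chunk of CSV text.
    chunk = String.duplicate("a", chunk_size)
    # binary_part/3 returns a 3-byte slice that may still point into `chunk`.
    field = binary_part(chunk, 0, 3)
    # :binary.copy/1 allocates a fresh 3-byte binary with no link to `chunk`.
    copied = :binary.copy(field)

    {byte_size(field), :binary.referenced_byte_size(field),
     :binary.referenced_byte_size(copied)}
  end
end
```

On a typical BEAM the middle number is the size of the whole chunk, while the copy references only its own 3 bytes.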
Even weirder: the book then replaces the `Stream` functions with the Flow library and reports a two-fold slowdown, but I get a 60-fold speedup!
```elixir
# Flow version (requires the :flow dependency).
def open_airports() do
  airports_csv()
  |> File.stream!()
  |> CSV.parse_stream()
  |> Flow.from_enumerable()
  |> Flow.map(fn row ->
    %{
      id: :binary.copy(Enum.at(row, 0)),
      type: :binary.copy(Enum.at(row, 2)),
      name: :binary.copy(Enum.at(row, 3)),
      country: :binary.copy(Enum.at(row, 8))
    }
  end)
  |> Flow.reject(&(&1.type == "closed"))
  |> Enum.to_list()
end
```
Finally, moving the CSV parsing into the `Flow.map` call, to fix the presumed bottleneck and let Flow take advantage of concurrency, is supposed to give a significant speedup, but in my case the running time is essentially the same.
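```elixir
# Flow version with CSV parsing moved into Flow.map/2.
def open_airports() do
  airports_csv()
  |> File.stream!()
  |> Flow.from_enumerable()
  |> Flow.map(fn row ->
    # Each element from File.stream!/1 is a single line, parsed on its own.
    # With skip_headers: false the header line is parsed as data too,
    # and it is not removed by the reject step below.
    [row] = CSV.parse_string(row, skip_headers: false)

    %{
      id: Enum.at(row, 0),
      type: Enum.at(row, 2),
      name: Enum.at(row, 3),
      country: Enum.at(row, 8)
    }
  end)
  |> Flow.reject(&(&1.type == "closed"))
  |> Enum.to_list()
end
```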
I really am at a loss to explain why I’m having such different results from the book.
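One way to make comparisons with the book easier is to report the environment alongside the timings, since core counts and runtime versions differ between machines. A hedged sketch, with a hypothetical `EnvReport` module, collecting a few standard values; Flow's documentation lists `System.schedulers_online/0` as its default number of stages, so that value directly affects the Flow versions.

```elixir
defmodule EnvReport do
  # Collects machine/runtime facts that can explain different timings
  # for the same code on different machines.
  def collect do
    %{
      elixir: System.version(),
      otp: System.otp_release(),
      schedulers_online: System.schedulers_online(),
      # May be :unknown on some platforms.
      logical_processors: :erlang.system_info(:logical_processors_available)
    }
  end
end
```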
LostKobrakai
Is this the same code as well as the same data?
Generally, moving from Enum to Stream trades speed for lower memory use. Things are not meant to become faster (if anything they become slower); in exchange you don’t need to load everything into memory at once, and you can process data that doesn’t fit into memory comfortably, or at all.
If your data fits into memory, Enum functions are expected to be faster.
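To make the trade-off concrete, here is a hedged sketch (the `Pipelines` module and its data are made up) of the same map-then-reject pipeline written eagerly and lazily. The eager version builds a full intermediate list after `Enum.map/2`; the lazy version avoids that list but routes every element through the stream machinery, which is the per-element overhead described above. Both return the same list.

```elixir
defmodule Pipelines do
  # Eager: Enum.map/2 materialises a complete intermediate list,
  # then Enum.reject/2 walks it again to build the result.
  def eager(rows) do
    rows
    |> Enum.map(fn {id, type} -> %{id: id, type: type} end)
    |> Enum.reject(&(&1.type == "closed"))
  end

  # Lazy: Stream.map/2 and Stream.reject/2 only build a recipe;
  # Enum.to_list/1 then pulls each element through both steps one at a time.
  def lazy(rows) do
    rows
    |> Stream.map(fn {id, type} -> %{id: id, type: type} end)
    |> Stream.reject(&(&1.type == "closed"))
    |> Enum.to_list()
  end
end

# Usage: time both with :timer.tc/1 (results in microseconds).
# rows = for i <- 1..100_000, do: {i, if(rem(i, 10) == 0, do: "closed", else: "open")}
# {t_eager, r1} = :timer.tc(fn -> Pipelines.eager(rows) end)
# {t_lazy, r2} = :timer.tc(fn -> Pipelines.lazy(rows) end)
```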
Similarly, moving to a multi-process architecture adds overhead for communication between processes (it doesn’t reduce any), in exchange for being able to use multiple CPU cores at the same time.
If your processing doesn’t even max out a single core, it’ll be faster in a single process than in multiple ones.
The last point in particular is a bit simplified, since there’s also I/O involved, but I hope you get the idea.
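The process-overhead point can be sketched with the standard library’s `Task.async_stream/3` standing in for Flow (a swap made only for illustration, since Flow is a separate dependency). For trivial per-element work, sending each element to another process and back usually costs more than the work itself. The `Overhead` module is hypothetical.

```elixir
defmodule Overhead do
  # Trivial per-element work: the kind of task where process overhead dominates.
  defp work(x), do: x * 2

  # Single process: no messaging at all.
  def sequential(list), do: Enum.map(list, &work/1)

  # One task per element, up to System.schedulers_online() at a time (the default).
  # Each result comes back as a message wrapped in {:ok, value};
  # ordered: true (also the default) keeps the input order.
  def concurrent(list) do
    list
    |> Task.async_stream(&work/1, ordered: true)
    |> Enum.map(fn {:ok, value} -> value end)
  end

  # Returns {sequential_us, concurrent_us} for a list of size n.
  def compare(n \\ 100_000) do
    list = Enum.to_list(1..n)
    {seq_us, seq} = :timer.tc(fn -> sequential(list) end)
    {con_us, con} = :timer.tc(fn -> concurrent(list) end)
    # Both strategies must produce identical results.
    true = seq == con
    {seq_us, con_us}
  end
end
```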
LostKobrakai
While this is a nice hint for the next person, testing performance in iex is not a great idea anyway. Many behaviours differ between iex and a production environment, so it cannot reasonably serve as a proxy for how performance will look elsewhere.
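Following that advice, a minimal sketch of timing code from a compiled module rather than pasting it into iex. `Bench` is a hypothetical helper that runs a function several times with `:timer.tc/1` and returns the median in milliseconds, so one cold run does not dominate. It could be called from a script run with `mix run`, for example `Bench.median_ms(&Airports.open_airports/0)`, assuming the `Airports` module from the question.

```elixir
defmodule Bench do
  # Runs `fun` `runs` times and returns the median wall time in milliseconds
  # (the upper of the two middle values when `runs` is even).
  # One extra warm-up call is made first and its time is discarded.
  def median_ms(fun, runs \\ 5) when is_function(fun, 0) and runs > 0 do
    _ = fun.()

    times =
      for _ <- 1..runs do
        {us, _result} = :timer.tc(fun)
        us / 1000
      end

    times |> Enum.sort() |> Enum.at(div(runs, 2))
  end
end
```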
stevensonmt
It’s real-world data, so the file is probably not exactly the same as the one the authors used, but it comes from the same source and should be similar in size and structure. Yes, the code I’m using is the same as in the book.
Thanks for explaining Enum vs Stream in general. From what I already knew and from your explanation, the performance results I got probably make some sense, but I still don’t understand why my results diverge so drastically from the authors’.
Just to reinforce @LostKobrakai’s points above, the chapter concludes with the same basic advice about when using something like Flow for concurrency might be a bad idea:
> While processes are lightweight, they are still an overhead when dealing with small tasks, which can be processed faster synchronously.