stevensonmt

Stream MUCH slower than Enum when processing a modest CSV file

I’m working through the book Concurrent Data Processing in Elixir and the example in chapter four involves reading a smallish CSV file and filtering out some rows. The naive implementation uses File.read! and NimbleCSV.parse_string and does the filtering with the Enum module functions.

A slightly better example uses File.stream! and NimbleCSV.parse_stream and does the filtering with the Stream module functions before finally collecting to a list with Enum.to_list.

The first version runs in about 4 seconds on my machine. The second takes over two minutes! In the book they reported a 5-fold improvement from ~3 seconds to 600ms. What could explain the discrepancy?

Original:

defmodule Airports do
  alias NimbleCSV.RFC4180, as: CSV

  def airports_csv() do
    Application.app_dir(:airports, "/priv/airports.csv")
  end

  def open_airports() do
    airports_csv()
    |> File.read!()
    |> CSV.parse_string()
    |> Enum.map(fn row ->
      %{
        id: Enum.at(row, 0),
        type: Enum.at(row, 2),
        name: Enum.at(row, 3),
        country: Enum.at(row, 8)
      }
    end)
    |> Enum.reject(&(&1.type == "closed"))
  end
end

Stream version:

  def open_airports() do
    airports_csv()
    |> File.stream!()
    |> CSV.parse_stream()
    |> Stream.map(fn row ->
      %{
        id: :binary.copy(Enum.at(row, 0)),
        type: :binary.copy(Enum.at(row, 2)),
        name: :binary.copy(Enum.at(row, 3)),
        country: :binary.copy(Enum.at(row, 8))
      }
    end)
    |> Stream.reject(&(&1.type == "closed"))
    |> Enum.to_list()
  end

Even weirder, the book then introduces the Flow library to replace the Stream functions and reports a twofold slowdown, but I get a 60-fold speedup!

  def open_airports() do
    airports_csv()
    |> File.stream!()
    |> CSV.parse_stream()
    |> Flow.from_enumerable()
    |> Flow.map(fn row ->
      %{
        id: :binary.copy(Enum.at(row, 0)),
        type: :binary.copy(Enum.at(row, 2)),
        name: :binary.copy(Enum.at(row, 3)),
        country: :binary.copy(Enum.at(row, 8))
      }
    end)
    |> Flow.reject(&(&1.type == "closed"))
    |> Enum.to_list()
  end

Finally, the book moves the CSV parsing into the Flow.map call. This is supposed to fix the presumed bottleneck, let Flow take advantage of concurrency, and lead to a significant speedup. In my case the running time is essentially the same.

  def open_airports() do
    airports_csv()
    |> File.stream!()
    |> Flow.from_enumerable()
    |> Flow.map(fn row ->
      [row] = CSV.parse_string(row, skip_headers: false)

      %{
        id: Enum.at(row, 0),
        type: Enum.at(row, 2),
        name: Enum.at(row, 3),
        country: Enum.at(row, 8)
      }
    end)
    |> Flow.reject(&(&1.type == "closed"))
    |> Enum.to_list()
  end

I really am at a loss to explain why I’m getting such different results from the book.

LostKobrakai

Is this the same code as well as the same data?

Generally, moving from Enum to Stream trades speed for lower memory use. Streams are not meant to make things faster (they usually make them slower). The benefit is that you don’t need to load everything into memory at once, so you can process data that doesn’t fit into memory comfortably, or at all.

If your data fits into memory, Enum functions are expected to be faster.
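
To make the trade-off concrete, here is a minimal sketch that runs the same map-then-reject pipeline eagerly and lazily. It is not the book's code: the range is a hypothetical stand-in for the CSV rows, and the timings it prints will vary by machine, so it only illustrates that both pipelines produce the same list while doing the work differently.

```elixir
# Hypothetical stand-in data: a range instead of rows from airports.csv.
data = 1..200_000

# Eager: each Enum call walks the whole collection and builds a full
# intermediate list before the next step starts.
eager = fn ->
  data
  |> Enum.map(&(&1 * 2))
  |> Enum.reject(&(rem(&1, 3) == 0))
end

# Lazy: the Stream calls only describe the work. Elements are pushed through
# both steps one at a time once Enum.to_list/1 forces the stream.
lazy = fn ->
  data
  |> Stream.map(&(&1 * 2))
  |> Stream.reject(&(rem(&1, 3) == 0))
  |> Enum.to_list()
end

# :timer.tc/1 returns {elapsed_microseconds, result}.
{eager_us, eager_result} = :timer.tc(eager)
{lazy_us, lazy_result} = :timer.tc(lazy)

IO.puts("Enum:   #{eager_us} microseconds")
IO.puts("Stream: #{lazy_us} microseconds")
IO.puts("same result? #{eager_result == lazy_result}")
```

The lazy version avoids the intermediate list but pays a small per-element cost for composing the steps, which is why it is not expected to win when everything fits in memory.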

Similarly, moving to a multi-process architecture adds overhead for communication between processes (the total amount of work is not reduced) in exchange for being able to use multiple CPU cores at the same time.

If your data doesn’t even max out a single core, processing it in a single process will be faster than in multiple ones.

The last point in particular is a bit simplified, as there are also things like IO involved, but I hope you get the idea.
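
The process overhead can be seen without Flow. The sketch below swaps in Task.async_stream/3 from the standard library (not Flow, and not the book's example) and a deliberately trivial, hypothetical workload, so that almost all of the concurrent version's time goes into spawning tasks and passing messages.

```elixir
# Hypothetical tiny workload: each item needs almost no CPU time.
work = fn n -> n * n end
items = Enum.to_list(1..20_000)

# Single process: no message passing at all.
sequential = fn -> Enum.map(items, work) end

# One task per item, spread across the available schedulers. Results come
# back as {:ok, value} tuples, in input order by default.
concurrent = fn ->
  items
  |> Task.async_stream(work, max_concurrency: System.schedulers_online())
  |> Enum.map(fn {:ok, result} -> result end)
end

{seq_us, seq_result} = :timer.tc(sequential)
{con_us, con_result} = :timer.tc(concurrent)

IO.puts("sequential: #{seq_us} microseconds")
IO.puts("concurrent: #{con_us} microseconds")
IO.puts("same result? #{seq_result == con_result}")
```

With work this small the sequential version would normally be expected to win. The concurrent version only starts to pay off when each item costs enough CPU time to outweigh the cost of handing it to another process.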

LostKobrakai

While this is a nice hint for the next person, testing performance in iex is not a great idea anyway. So many behaviours differ between iex and a production environment that it cannot reasonably serve as a proxy for how performance will look elsewhere.
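
One known difference is that expressions typed at the iex prompt are evaluated rather than compiled, while code inside a module is compiled. A minimal sketch of timing from compiled code follows, assuming you save it as a script and run it with `elixir` or `mix run`. The module name `TimingSketch`, the helper `median_us/2`, and the workload are all hypothetical; a dedicated benchmarking library would give more reliable numbers.

```elixir
defmodule TimingSketch do
  # Hypothetical helper: runs `fun` `runs` times and returns the middle
  # wall-clock time in microseconds (the upper median for even run counts).
  def median_us(fun, runs \\ 5) when is_function(fun, 0) and runs > 0 do
    1..runs
    |> Enum.map(fn _ ->
      {us, _result} = :timer.tc(fun)
      us
    end)
    |> Enum.sort()
    |> Enum.at(div(runs, 2))
  end
end

# Stand-in workload; in the thread's project this could instead be
# `&Airports.open_airports/0`.
workload = fn -> Enum.sum(1..100_000) end

IO.puts("median: #{TimingSketch.median_us(workload)} microseconds")
```

Repeating the run and taking a middle value reduces the effect of one-off costs such as the first file read or a garbage collection pause.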

stevensonmt

It’s real world data, so the file is probably not exactly the same as the one used by the authors, but it is from the same source and should be similar in size and structure. The code I’m using is the same as from the book, yes.

Thanks for explaining Enum vs Stream broadly. As I understand it both inherently and from your explanation, the performance results I got probably make some sense but I still don’t understand why my results diverge so drastically from the authors’ results.


Just to reinforce @LostKobrakai’s points above, the chapter concludes with the same basic advice about when using something like Flow for concurrency might be a bad idea:

While processes are lightweight, they are still an overhead when dealing with
small tasks, which can be processed faster synchronously.
