Stream MUCH slower than enum processing modest CSV file

I’m working through the book Concurrent Data Processing in Elixir and the example in chapter four involves reading a smallish CSV file and filtering out some rows. The naive implementation uses File.read! and NimbleCSV.parse_string and does the filtering with the Enum module functions.

A slightly better example uses File.stream! and NimbleCSV.parse_stream and does the filtering with the Stream module functions before finally collecting to a list with Enum.to_list.

The first version runs in about 4 seconds on my machine. The second takes over two minutes! In the book they reported a 5-fold improvement from ~3 seconds to 600ms. What could explain the discrepancy?

Original:

defmodule Airports do
  alias NimbleCSV.RFC4180, as: CSV

  def airports_csv() do
    Application.app_dir(:airports, "/priv/airports.csv")
  end

  def open_airports() do
    airports_csv()
    |> File.read!()
    |> CSV.parse_string()
    |> Enum.map(fn row ->
      %{
        id: Enum.at(row, 0),
        type: Enum.at(row, 2),
        name: Enum.at(row, 3),
        country: Enum.at(row, 8)
      }
    end)
    |> Enum.reject(&(&1.type == "closed"))
  end
end

Stream version:

  def open_airports() do
    airports_csv()
    |> File.stream!()
    |> CSV.parse_stream()
    |> Stream.map(fn row ->
      %{
        id: :binary.copy(Enum.at(row, 0)),
        type: :binary.copy(Enum.at(row, 2)),
        name: :binary.copy(Enum.at(row, 3)),
        country: :binary.copy(Enum.at(row, 8))
      }
    end)
    |> Stream.reject(&(&1.type == "closed"))
    |> Enum.to_list()
  end

Even weirder, the book then introduces the Flow library to replace the Stream functions and reports a two fold slow down, but I get a 60 fold speed up!

  def open_airports() do
    airports_csv()
    |> File.stream!()
    |> CSV.parse_stream()
    |> Flow.from_enumerable()
    |> Flow.map(fn row ->
      %{
        id: :binary.copy(Enum.at(row, 0)),
        type: :binary.copy(Enum.at(row, 2)),
        name: :binary.copy(Enum.at(row, 3)),
        country: :binary.copy(Enum.at(row, 8))
      }
    end)
    |> Flow.reject(&(&1.type == "closed"))
    |> Enum.to_list()
  end

Finally, fixing the presumed bottleneck to allow flow to take advantage of concurrency by moving the CSV parsing into the Flow.map call is supposed to lead to significant speed up but in my case is essentially the same running time.

  def open_airports() do
    airports_csv()
    |> File.stream!()
    |> Flow.from_enumerable()
    |> Flow.map(fn row ->
      [row] = CSV.parse_string(row, skip_headers: false)

      %{
        id: Enum.at(row, 0),
        type: Enum.at(row, 2),
        name: Enum.at(row, 3),
        country: Enum.at(row, 8)
      }
    end)
    |> Flow.reject(&(&1.type == "closed"))
    |> Enum.to_list()
  end

I really am at a loss to explain why I’m having such different results from the book.

Is this the same code as well as the same data?

Generally moving from enum to streams trades speed for less memory need. Things are not meant to become faster (but rather become slower) in favor of not needing to load everything into memory at once and being able to process data, which doesn’t fit into memory comfortably or even at all.

If your data fits into memory Enum functions are expected to be faster.

Similarly moving to a multi process architecture adds overhead in process communication (there’s no reduction) in favor of being able to use multiple cpu cores at the same time.

If your data doesn’t even max out a single core for processing it’ll be faster in a single process than with multiple ones.

Especially the last one is a bit simplified as there’s also things like io involved, but I hope you get the idea.

5 Likes

It’s real world data, so the file is probably not exactly the same as the one used by the authors, but it is from the same source and should be similar in size and structure. The code I’m using is the same as from the book, yes.

Thanks for explaining Enum vs Stream broadly. As I understand it both inherently and from your explanation, the performance results I got probably make some sense but I still don’t understand why my results diverge so drastically from the authors’ results.


Just to reinforce @LostKobrakai’s points above, the chapter concludes with the same basic advice about when using something like Flow for concurrency might be a bad idea:

While processes are lightweight, they are still an overhead when dealing with
small tasks, which can be processed faster synchronously.

The main thing that strikes me as different between your Enum and Stream implementations is that you’re using :binary.copy in the Stream version. Is there a reason for that?

Not sure because I didn’t look at the data being processed but if I had to guess I would guess that it is because binary copy can save memory allowing the larger binary to be garbage collected.

erlang copy/2 docs.

Per the book it has to do with how NimbleCSV processes the stream. See: reference
To my post, though, that difference between the Stream and Enum implementations would not explain the discrepancy between what I got and what the book authors reported since we both used the same code.