Using streams and concurrency to demonstrate Elixir's potential to colleagues

I’m trying to create a simple example to show a colleague the potential of Elixir.
My contrived example was to find the most frequently occurring alphanumeric character
in a very large data file. In Ruby, it’s simply:

It took 23 minutes.

My attempt in Elixir was to use File.stream! with parameters to read the file in 1 MB chunks and to spread the work across 6 cores.

To my horror, it took 60 minutes of CPU time and 10 minutes of real time.
What am I doing wrong?

1 Like

This might give you some hints…

6 Likes

I’m familiar with Flow - it’s an elegant library.
But it’s overkill for what I’m doing.
It also doesn’t help me understand what is wrong with my existing code.

Are there tools available to help a programmer understand
where a concurrent piece of code is spending its time?

Something similar is explored in https://www.youtube.com/watch?v=Y83p_VsvRFA.

Elixir is not that good at computation-intensive operations. Immutability does not help here either. That being said, you can do a few things that might improve it.

The Elixir code is not equivalent to the Ruby code:

  1. You are using Enum, which is not lazy. It essentially loads the whole file at once and passes a whole new object between Enum operations.
  2. You should merge the many Enum operations into one to avoid creating intermediate objects at every step; you can replace all of it with a single Enum.reduce. The max can be found in one pass without building intermediate lists along the way (see the sketch after this list).
  3. You can replace the sort & fetch with max_by or min_by; the Ruby version does not sort.
  4. You can use binary UTF-8 pattern matching (<<char::utf8, rest::binary>>) to avoid codepoints and the regexp.
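
A minimal sketch of points 2 and 4 combined (the module and function names here are illustrative, not from the original code): one pass over a chunk with binary pattern matching, no regex, no grapheme list, and a single max_by at the end.

defmodule SinglePassCount do
  # Count only ASCII alphanumerics in one pass over a binary chunk.
  def count_alpha(<<char, rest::binary>>, acc)
      when char in ?a..?z or char in ?A..?Z or char in ?0..?9 do
    count_alpha(rest, Map.update(acc, <<char>>, 1, &(&1 + 1)))
  end

  # Skip every other byte (multi-byte codepoints are skipped byte by byte).
  def count_alpha(<<_skip, rest::binary>>, acc), do: count_alpha(rest, acc)
  def count_alpha(<<>>, acc), do: acc

  # The most frequent character falls out of a single max_by, no sort needed.
  def most_frequent(counts), do: Enum.max_by(counts, fn {_char, count} -> count end)
end

Per chunk you would call count_alpha(chunk, %{}) inside Task.async_stream, merge the resulting maps with Map.merge/3, and finish with most_frequent/1, much like the versions further down the thread.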

You can see the memory used by all objects in the VM using :observer.start().

I’m trying to create a simple example to show a colleague the potential of Elixir.

I think you should focus on the unique features Elixir brings to the table, such as fault tolerance and concurrency, rather than number crunching. Depending on the problem you are trying to solve, these things might be far more important than raw computation speed.

4 Likes

While I agree with @akash-akya that making such a piece of code faster is not necessarily the most interesting feature of Elixir, in that case there is room for improvement.
I think a good tool to figure out where a piece of code is spending its time is Benchee.
My hypothesis is that the code is slow because of the list creations in count_alpha. On the reading side of the code, the [encoding: :utf8] option passed to File.stream! might be problematic.

To test this hypothesis I wrote a quick and dirty version (probably not the fastest and certainly not the most idiomatic or elegant):

defmodule AlphamaxOptim do
  @valid_chars Enum.map(?a..?z, fn x -> <<x::utf8>> end) ++
                 Enum.map(?A..?Z, fn x -> <<x::utf8>> end) ++
                 Enum.map(?0..?9, fn x -> <<x::utf8>> end)

  @init_char_map Enum.map(@valid_chars, fn x -> {x, 0} end) |> Enum.into(%{})

  def count_alpha(string) do
    recursive_count(@init_char_map, string)
  end

  def recursive_count(acc, ""), do: acc

  # Generate one clause per valid character at compile time, so each one is
  # matched as a literal binary prefix and its count bumped in the map.
  for ch <- @valid_chars do
    def recursive_count(acc, unquote(ch) <> next) do
      recursive_count(%{acc | unquote(ch) => acc[unquote(ch)] + 1}, next)
    end
  end

  # Anything else (including multi-byte codepoints) is skipped.
  def recursive_count(acc, str) do
    case String.next_codepoint(str) do
      nil -> recursive_count(acc, "")
      {_, next} -> recursive_count(acc, next)
    end
  end

  def process(file) do
    File.stream!(file, [], 1_000_000)
    |> Task.async_stream(fn x -> count_alpha(IO.iodata_to_binary(x)) end,
      max_concurrency: 6,
      ordered: false
    )
    |> Stream.map(fn {:ok, x} -> x end)
    |> Enum.reduce(fn a, b -> Map.merge(a, b, fn _k, v1, v2 -> v1 + v2 end) end)
    |> Enum.max_by(fn {_k, v} -> v end)
    |> IO.inspect()
  end
end

I believe it is correct as it spits out the same result as the original. Ideally, it should be tested with something like PropEr, using the original as an anchor.
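
A property along these lines could use the original as the oracle. This is a sketch with StreamData rather than PropEr, assuming the original code exposes the count_alpha_num/1 function shown later in the thread (the Original module name is a placeholder):

defmodule AlphamaxOptimPropertyTest do
  use ExUnit.Case
  use ExUnitProperties

  property "optimized counting agrees with the original on alphanumeric input" do
    check all s <- StreamData.string(:alphanumeric) do
      # Zero entries are dropped so the result is comparable with the original,
      # which only contains characters that actually occur.
      optimized =
        s
        |> AlphamaxOptim.count_alpha()
        |> Enum.reject(fn {_char, count} -> count == 0 end)
        |> Map.new()

      assert optimized == Original.count_alpha_num(s)
    end
  end
end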

Then I benchmarked it against the original with a 100M file:
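
A Benchee setup along these lines produces the output below (a sketch; the file path and the Alphamax module name for the original are assumptions on my part):

Benchee.run(%{
  "original100" => fn -> Alphamax.process("testdata_100M.txt") end,
  "optimized100" => fn -> AlphamaxOptim.process("testdata_100M.txt") end
})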

Benchmarking optimized100...
{"0", 8819253}
{"0", 8819253}
{"0", 8819253}
Benchmarking original100...
{"0", 8819253}
{"0", 8819253}

Name                   ips        average  deviation         median         99th %
optimized100          0.22         4.48 s     ±0.37%         4.48 s         4.49 s
original100         0.0183        54.79 s     ±0.00%        54.79 s        54.79 s

Comparison:
optimized100          0.22
original100         0.0183 - 12.23x slower +50.31 s

And with a 500M file (just to be sure):

Benchmarking optimized500...
{"0", 31722406}
{"0", 31722406}
Benchmarking original500...
{"0", 31722406}
{"0", 31722406}

Name                   ips        average  deviation         median         99th %
optimized500        0.0404       0.41 min     ±0.00%       0.41 min       0.41 min
original500        0.00359       4.64 min     ±0.00%       4.64 min       4.64 min

Comparison:
optimized500        0.0404
original500        0.00359 - 11.25x slower +4.23 min

Then I wondered what would happen if we had mutability, only to remember that we do: :ets tables or :counters. I’ve never used :counters, so I tried it:

defmodule AlphamaxOptimMutable do
  @valid_chars Enum.map(?a..?z, fn x -> <<x::utf8>> end) ++
                 Enum.map(?A..?Z, fn x -> <<x::utf8>> end) ++
                 Enum.map(?0..?9, fn x -> <<x::utf8>> end)


  @indexed_chars Enum.with_index(@valid_chars, 1) |> Enum.into(%{})


  def count_alpha(ref, string) do
    recursive_count(ref, string)
  end

  def recursive_count(ref, ""), do: ref

  for ch <- @valid_chars do
    def recursive_count(ref, unquote(ch) <> next) do
      :counters.add(ref, @indexed_chars[unquote(ch)], 1)
      recursive_count(ref, next)
    end
  end

  def recursive_count(ref, str) do
    case String.next_codepoint(str) do
      nil -> recursive_count(ref, "")
      {_, next} -> recursive_count(ref, next)
    end
  end

  def process(file) do
    ref = :counters.new(length(@valid_chars), [:write_concurrency])

    File.stream!(file, [], 1_000_000)
    |> Task.async_stream(fn x -> count_alpha(ref, IO.iodata_to_binary(x)) end,
      max_concurrency: 6,
      ordered: false
    )
    # Run the stream just for its side effects; the counts live in the counters ref.
    |> Stream.run()

    @indexed_chars
    |> Enum.map(fn {ch, ix} -> {ch, :counters.get(ref, ix)} end)
    |> Enum.max_by(fn {_k, v} -> v end)
    |> IO.inspect()
  end
end

Benchmarked it again:

Name                           ips        average  deviation         median         99th %
optimized_mutable100          0.27         3.65 s     ±0.55%         3.65 s         3.66 s
original100                 0.0184        54.32 s     ±0.00%        54.32 s        54.32 s

Comparison:
optimized_mutable100          0.27
original100                 0.0184 - 14.88x slower +50.67 s

optimized_mutable500        0.0525       0.32 min     ±0.00%       0.32 min       0.32 min
original500                0.00360       4.63 min     ±0.00%       4.63 min       4.63 min

Comparison:
optimized_mutable500        0.0525
original500                0.00360 - 14.56x slower +4.31 min


There are probably much better ways to improve the original code (performance wise), but I think the methodology would be more or less similar.

6 Likes

My attempt at it.
Showing a benchmark comparison with the previous best, AlphamaxOptimMutable by krstfk. It is mostly similar to krstfk’s solution, with slight differences in the details.

defmodule AlphaMaxFreq do
  # One counter slot per code point from 1 to ?z; the character code itself is
  # used as the counter index.
  @chars Enum.to_list(1..?z)

  defp collect(<<>>, acc), do: acc

  defp collect(<<char::size(8), rest::binary>>, ref)
       when (char >= ?0 and char <= ?9) or (char >= ?a and char <= ?z) or
              (char >= ?A and char <= ?Z) do
    :counters.add(ref, char, 1)
    collect(rest, ref)
  end

  defp collect(<<_::utf8, rest::binary>>, acc), do: collect(rest, acc)
  defp collect(<<_::size(8), rest::binary>>, acc), do: collect(rest, acc)

  def max(file) do
    sch = :erlang.system_info(:schedulers)
    ref = :counters.new(length(@chars), [:write_concurrency])

    File.stream!(file, [], 10 * 64 * 1024)
    |> Task.async_stream(&collect(&1, ref), max_concurrency: sch, ordered: false)
    |> Stream.run()

    char = Enum.max_by(@chars, &:counters.get(ref, &1))
    IO.inspect([<<char::size(8)>>, :counters.get(ref, char)])
  end
end

Benchmark

Operating System: macOS
CPU Information: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
Number of Available Cores: 12
Available memory: 16 GB
Elixir 1.9.4
Erlang 22.1.1

Benchmark suite executing with the following configuration:
warmup: 2 s
time: 5 s
memory time: 0 ns
parallel: 1
inputs: none specified
Estimated total run time: 14 s

Benchmarking AlphaMaxFreq...
["P", 1054798]
["P", 1054798]
["P", 1054798]
["P", 1054798]
["P", 1054798]
["P", 1054798]
["P", 1054798]
["P", 1054798]
["P", 1054798]
["P", 1054798]
["P", 1054798]
["P", 1054798]
Benchmarking AlphamaxOptimMutable...
{"P", 1054798}
{"P", 1054798}
{"P", 1054798}

Name                           ips        average  deviation         median         99th %
AlphaMaxFreq                  1.45         0.69 s    ±11.58%         0.65 s         0.83 s
AlphamaxOptimMutable          0.33         3.04 s     ±6.69%         3.04 s         3.18 s

Comparison:
AlphaMaxFreq                  1.45
AlphamaxOptimMutable          0.33 - 4.42x slower +2.35 s

Extended statistics:

Name                         minimum        maximum    sample size                     mode
AlphaMaxFreq                  0.62 s         0.83 s              8                     None
AlphamaxOptimMutable          2.90 s         3.18 s              2                     None

6 Likes

  1. That was my first thought.
    But it turns out that Enum.reduce causes the file to be loaded incrementally.
    I confirmed this by putting some IO.inspect in the pipeline.

  2. Agreed. What I ended up with was:

  def count_alpha_num(chunk) do
    String.replace(chunk, ~r/\W/, "")
    |> String.graphemes()
    |> Enum.reduce(Map.new(), fn c, acc -> Map.update(acc, c, 1, &(&1 + 1)) end)
  end
  3. Yeah, but that’s a minor cost in the grand scheme of things.

  4. I’d forgotten about Elixir’s binary pattern matching. Good suggestion!

krstfk and akash-akya - thanks for the feedback!

My original concern was that the file wasn’t being loaded lazily, but some IO.inspect calls convinced me it was.
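
Something along these lines (a sketch, not my exact pipeline) is enough to watch the chunks arrive one at a time instead of the whole file at once:

"testdata.txt"
|> File.stream!([], 1_000_000)
|> Stream.map(fn chunk -> IO.inspect(byte_size(chunk), label: "chunk bytes") end)
|> Enum.reduce(0, fn size, total -> total + size end)

The chunk sizes print as the reduce consumes the stream, which confirms the file is being read lazily.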

akash-akya:
Very nice use of :counters.
That ended up being 10x faster than my improved solution.
I’ll file that away for future reference.

What Mac do you have that has 16 cores? Jealous!

Are you able to upload the file with which you are exercising the potential solutions? I am willing to take a shot at this because I have lately done something very similar in both Elixir and Rust.

I used this to generate some data:

defmodule GenFile do
  @chars 32..?~

  def start(name, size) do
    count = trunc(size / 65536)

    1..count
    |> Task.async_stream(
      fn _ ->
        Enum.map(1..65536, fn _ -> <<Enum.random(@chars)::size(8)>> end)
        |> IO.iodata_to_binary()
      end,
      max_concurrency: 12,
      ordered: false
    )
    |> Stream.map(fn {:ok, chunk} -> chunk end)
    |> Enum.into(File.stream!(name))
  end
end

GenFile.start("testdata.txt", 100_000_000)

What Mac do you have that has 16 cores? Jealous!

haha, MacBook Pro 15in 2018 :slight_smile:

You can find the dataset I used at:

http://www.catalogueoflife.org/content/annual-checklist-archive

I downloaded the 2019 dataset and used the taxa.txt file (1.5G) as input.

My original naive example took the same time as my Ruby solution - 20 minutes on a recent MacBook Pro.
Following krstfk and akash-akya’s suggestions got it down to six minutes.

But akash-akya’s counter solution crushed it in 14 seconds.
Best of luck beating that…

Sure, I generate my own data quite a lot as well – especially to test how well an algorithm scales, I use 1MB, 10MB, 100MB, and 1GB sets, sometimes 10GB as well (and if I have doubts, stuff like 20-50MB or 200-500MB). I just want to have baseline test data for this case.

Thanks!

I don’t have to; I am just curious. :slight_smile: I use the :counters module (and sometimes the :ets.update_counter functions) in such workflows all the time anyway – copying and modifying an otherwise immutable map under parallel contention is extremely inefficient.
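
For reference, the :ets.update_counter flavour of the same idea looks roughly like this (a sketch; the table name and keys are mine, not from the thread):

# A public ETS table with write concurrency; each character's count lives in
# element 2 of its {char, count} row.
table = :ets.new(:char_counts, [:set, :public, {:write_concurrency, true}])

# Bump the counter for a character, inserting {char, 0} first if the row
# does not exist yet.
:ets.update_counter(table, ?a, {2, 1}, {?a, 0})

# Read everything back and pick the most frequent character.
table
|> :ets.tab2list()
|> Enum.max_by(fn {_char, count} -> count end)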

I’ll give it a go once I shake off all the take-home assignments I have to work on as a part of my job hunt.

Am I right to think that you are only ever counting the alphanumerics (A-Za-z0-9)?

If so, why? Wasn’t the original task to count all characters, Unicode included?

Additionally, why use :schedulers and not :schedulers_online (that’s what Task.async_stream uses if you don’t specify concurrency)?
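
For context, these are the two values in question:

# Used in AlphaMaxFreq above: the total number of schedulers the VM was started with.
:erlang.system_info(:schedulers)

# The number of schedulers currently online; Task.async_stream uses this as the
# default :max_concurrency when none is given.
System.schedulers_online()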

Well, I did. :wink: Here’s the code:

defmodule AlphaMaxDimi do
  @chars Enum.to_list(1..?z)

  defp collect(<<>>, acc), do: acc

  defp collect(<<char::size(8), rest::binary>>, ref)
       when (char >= ?0 and char <= ?9) or (char >= ?a and char <= ?z) or
              (char >= ?A and char <= ?Z) do
    :counters.add(ref, char, 1)
    collect(rest, ref)
  end

  defp collect(<<_::utf8, rest::binary>>, acc), do: collect(rest, acc)
  defp collect(<<_::size(8), rest::binary>>, acc), do: collect(rest, acc)

  defp collect_chunk(blob, ref) do
    subref = :counters.new(length(@chars), [])
    collect(blob, subref)
    Enum.each(@chars, fn c ->
      :counters.add(ref, c, :counters.get(subref, c))
    end)
  end

  def max(path) do
    ref = :counters.new(length(@chars), [:write_concurrency])

    path
    |> File.stream!([], 512 * 1024)
    |> Task.async_stream(&collect_chunk(&1, ref), ordered: false)
    |> Stream.run()

    char = Enum.max_by(@chars, &:counters.get(ref, &1))
    IO.inspect([<<char::size(8)>>, :counters.get(ref, char)])
  end
end

The code is almost the same as @akash-akya’s, but I was not happy that each and every character gets to write to an atomic counter. So I batched those. Each 512KB buffer read from the file gets its own private counters, which are updated serially (single thread, i.e. a single Erlang process), and then coalesced onto the global counters. This improved speeds by roughly 20% to 25%, depending on file size.

Here are the comparisons between mine and @akash-akya’s code on three different files.

18MB file (2019-annual/vernacular.txt), benchmark time 5s:

Name                   ips        average  deviation         median         99th %
Dimi (18MB)           7.67      130.37 ms     ±4.49%      127.94 ms      145.92 ms
Akash (18MB)          6.01      166.37 ms     ±3.07%      165.58 ms      182.67 ms

Comparison:
Dimi (18MB)           7.67
Akash (18MB)          6.01 - 1.28x slower +36.01 ms

300MB file (2019-annual/reference.txt), benchmark time 20s:

Name                    ips        average  deviation         median         99th %
Dimi (300MB)           0.49         2.06 s     ±0.55%         2.06 s         2.08 s
Akash (300MB)          0.40         2.50 s     ±1.61%         2.52 s         2.53 s

Comparison:
Dimi (300MB)           0.49
Akash (300MB)          0.40 - 1.21x slower +0.44 s

1.6GB file (2019-annual/taxa.txt), benchmark time 60s:

Name                    ips        average  deviation         median         99th %
Dimi (1.6GB)         0.0925        10.81 s     ±0.30%        10.82 s        10.84 s
Akash (1.6GB)        0.0755        13.25 s     ±0.46%        13.25 s        13.31 s

Comparison:
Dimi (1.6GB)         0.0925
Akash (1.6GB)        0.0755 - 1.22x slower +2.43 s

3 Likes