Elixir language performance tuning for 1 quadrillion records per month

Hello Elixir community,

I would like to share with you some results of my first 2 months of Elixir learning and ask important question about Elixir language performance as such.

I’ve made some POC application and was able to run it on production-like host (AWS c5.9xl - 96 cores, 70Gb of RAM) and I got about 3-3.5 time worse tps comparing to our current prod metrics. To troubleshoot I just decided make some very rudimentary tests of the main logic which is key in our business - the logic is very simple:

  • iterate over files (compressed JSONs or plain CSVs) and
  • line by line make some map transformation and
  • perform aggregation by applying a sum function for the decimal value on string key

Simple enough, however the only feature of our business is the volume - it is 1 quadrillion (10^15) records per month what I need to account which is ~380 * 10^6 tps. We have such applications which are handling this volume in prod ATM.

For Elixir capability evaluation I took 2 files a plain csv what I need to aggregate (reduce by sum function) - the small (~1000 records) and big (11.7 million records)

Please see snippet of this file

HeaderLength=8
DatasetName=xxxBillingHourly
CreationTimestamp=1561942800000
StartRangeMarker=2019-07-01-01-00-00
EndRangeMarker=2019-07-01-02-00-00
FieldSpec=field-1,field-2,field-3,field-4,field-5,field-6,field-7,field-8,field-9,startTime,endTime,value
KeySpec=field-1,field-2,field-3,field-4,field-5,field-6,field-7,field-8,field-9,startTime,endTime
DataSources=someS3bucket

007931482,abcStore,abcStore,GetPar,GetPar,,us-west-1,,,2018-07-01 01:00:00,2018-07-01 02:00:00,10
016299379,abcEC2,abcEC2,Hours,Gateway,,,arn:aws:someARN,,2018-07-01 01:00:00,2018-07-01 02:00:00,10
018870929,abcLambda,abcLambda,Second,Invoke,,,arn:aws:some-function,,2018-07-01 01:00:00,2018-07-01 02:00:00,59.35000000000002086
.
.

and used Elixir and Java programs to compare with each other (Java is current language we use).

Bellow a listing of two programs which are doing the same things.

Java

    @Test
    public void konaAggregate() throws IOException {        
        
        long start = System.currentTimeMillis();
        System.out.println("Start");
        
        String fName = "/home/temp/1/real_kona";
        Path totalFilePath = Paths.get(fName + "_java_aggregated");
        
        Stream<Entry<String, BigDecimal>> stream = Files
        .lines(Paths.get(fName))
        .skip(9)
        .map(l -> {
            String[] parts = l.split(",");
            return Tuple.of(String.join(",", parts[0], parts[1], parts[2], parts[3], parts[4]), new BigDecimal(parts[11]));
        })
        .collect(
                Collectors.groupingBy(
                        Tuple2::_1,
                        TreeMap::new,
                        Collectors.mapping(Tuple2::_2, Collectors.reducing(BigDecimal.ZERO, BigDecimal::add))
                        )
                )
        .entrySet()
        .stream();
        
        try (BufferedWriter writer = Files.newBufferedWriter(totalFilePath)) {
            stream.forEach(line -> {
                String record = line.getKey() + "," + line.getValue().toPlainString();
                try {
                    writer.write(record);
                    writer.newLine();
                } catch (IOException e) {
                    // ignore
                }
            });
        }
        long end = System.currentTimeMillis();        
        System.out.println("Total time: " + (end - start));
    }

Elixir

def aggregate_kona() do

  IO.puts("Start")
  kona =  "/home/temp/1/real_kona"

  output_path = kona <> "_elixir_aggregated"

  start = :os.system_time(:millisecond)

  kona
  |> File.stream!
  |> Stream.map(&String.split(&1, "\n"))
  |> Stream.drop(9)
  |> Stream.map(&(&1 |> hd))
  |> Stream.map(&parse_granular(&1))
  |> Enum.reduce(
    %{},
    fn %{record_key: key, record_value: value}, acc ->
      Map.update(acc, key, value, &Decimal.add(&1, value))
    end
  )
  |> Stream.map(&((elem(&1, 0) <> "," <> Decimal.to_string(elem(&1, 1))) <> "\n"))
  |> Stream.into(File.stream!(output_path, [:write, :utf8]))
  |> Stream.run()

  stop = :os.system_time(:millisecond)
  IO.puts("Total: #{stop - start}")
  end
  
  def parse_granular(record) do
    [p,
      pr,
      cpc,
      ut,
      op,
      _,
      _,
      _,
      _,
      _,
      _,
      value] =
      record |> String.split(",")

    %{record_key: p <> "," <> pr <> "," <> cpc <> "," <> ut <> "," <> op, record_value: Decimal.new(value)}

  end

Let me give you results:

For small file which is 1K records there is no issues and both are (Elixir is faster) latency is 50 milliseconds

But for the big file (11.7 million records and ~2Gb size)

Java gives 16 seconds latency and is able to aggregate file
Elixir was running for ~20 minutes with no produced result and I just terminated an iex session

To get somewhere, I made flow in Elixir even simpler, I commented a reduce part of the flow and made a run. It gave me latency ~115 seconds. Java version for such work coped for ~13 seconds.

So the question to Elixir community, could you review please my task, implementation whether I have any obvious mistakes in Elixir part and also why Elixir’s performance is such that I simply can not accept it?

Thank you.

1 Like

So… a couple of things. First, File.stream! is already splitting the input into lines by default, but you then Stream.map(&String.split(&1, "\n")) again on each line, which does nothing.

If you need to do a large number of updates on a Map, consider switching to ETS. It will perform much better with large amounts of data, since it has constant time access and isn’t garbage collected.

There are some other things I’d try that might help too, like replacing this &((elem(&1, 0) <> "," <> Decimal.to_string(elem(&1, 1))) <> "\n") with fn {key, value} -> key <> "," <> Decimal.to_string(elem(&1, 1)) <> "\n" end.

You’re also building a lot of strings, which in eg the JVM is automatically optimized. The BEAM doesn’t optimize that automatically, but there’s this concept of “iolists” that let you manually optimize it.

I’d suggest taking a look at a similar thread posted before that has a lot of tips Erlang/Elixir string performance - can this be improved?

I also wrote an article about this which isn’t completely up to date but collects most of the improvements from the thread https://blog.jola.dev/elixir-string-processing-optimization and it has examples of eg using ETS instead of Map and using iolists.

17 Likes

Thank you. Let me revisit impl with suggestions you gave and I’ll be back with (hopefully) new results.

Sorry could you help to rewrite a reduce function using ETS from the example I have? I desperately spent 3 hours doing that with no result and getting

** (ArgumentError) argument error

This is what I am trying

table = :ets.new(:records, [])

  kona
  |> File.stream!
  |> Stream.drop(9)
  |> Stream.map(&parse_granular(&1))
  |> Enum.each(fn record -> :ets.update_counter(table, record.record_key, {2, record.record_value}, {record.record_key, record.record_value}) end)

Erlang doc is not clear at all.

Thank you.

 update_counter(...)

The function fails with reason badarg in the following situations:

The table type is not set or ordered_setThe element to update is also the key.

Here’s an example:

defmodule A do

  require Record
  Record.defrecord(:user, record_key: 0, record_value: "", visits: 0)
  #default record =>  {:user, 0, "", 0}

  def go do
    :ets.new(:mytable, [:named_table, {:keypos, 2}]) #tuple indexes start at 1 in erlang
                                                     #keypos is the position in the tuple that serves 
                                                     #as the index in the table, which in this case 
                                                     #is record_key
    #defaults for :ets.new():                                                             
    #[set, protected, {keypos,1}, {heir,none}, 
    #{write_concurrency,false}, {read_concurrency,false}].

    records = [
      user(record_key: 1, record_value: "hello"),
      user(record_key: 2, record_value: "goodbye", visits: 1),
      user(record_key: 3, record_value: "g'day", visits: 2)
    ]

    Enum.each(records, fn record -> 
                          :ets.insert(:mytable, record)
                       end)
    IO.inspect :ets.tab2list(:mytable)

    Enum.each(records, fn user(record_key: key) -> 
                          :ets.update_counter(:mytable, key, {4, 1})  #{tuple pos, increment}
                       end)
    IO.inspect :ets.tab2list(:mytable)
    :ok
  end

end

In iex:

~/elixir_programs$ iex a.ex
Erlang/OTP 20 [erts-9.3] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:10] [hipe] [kernel-poll:false]
Interactive Elixir (1.8.2) - press Ctrl+C to exit (type h() ENTER for help)

iex(1)> A.go
[{:user, 1, "hello", 0}, {:user, 3, "g'day", 2}, {:user, 2, "goodbye", 1}]
[{:user, 1, "hello", 1}, {:user, 3, "g'day", 3}, {:user, 2, "goodbye", 2}]
:ok

iex(2)>
2 Likes

You’re using update_counter, which only supports updating integers, but your value is some struct. It is in the documentation, but I also think erlang documentation is hard to read.

You’ll need a combination of lookup and insert. Something like

def increment_item(table, key, value) do
  initial = case :ets.lookup(table, key) do
    [{_, value}] -> value
    [] -> Decimal.new(0)
  end

  :ets.insert(table, {key, initial + value})
end

Note that I have no idea what Decimal is, so I don’t know how to create them or how to add them together, so I just used +, you’ll have to replace that. I haven’t tested the code.

2 Likes

Thank Jola, I spent some more time an came up with similar solution myself and I have huge (but not the best) result

Here my implementation

defmodule Kona do

  def aggregate_kona() do

  IO.puts("Start")
  kona =  "/home/temp/1/real_kona"

  output_path = kona <> "_elixir_aggregated"

  start = :os.system_time(:millisecond)
  table = :ets.new(:records, [:set, :protected])

  kona
  |> File.stream!
  |> Stream.drop(9)
  |> Stream.map(&parse_granular(&1))
  |> Enum.each(fn record -> update(table, record) end)

  :ets.match_object(table, {:"$0", :"$1"})
  |> Stream.map(fn {key, value} -> key <> "," <> Decimal.to_string(value) <> "\n" end)
  |> Stream.into(File.stream!(output_path, [:write, :utf8]))
  |> Stream.run()

  stop = :os.system_time(:millisecond)
  IO.puts("Total: #{stop - start}")

  end

  def update(ets_table, record) do
    entry = :ets.lookup(ets_table, record.record_key)
    :ets.insert(ets_table, {record.record_key, Decimal.add(record.record_value, empty?(entry))})
  end

  def empty?([]) do
    Decimal.new(0)
  end

  def empty?(list) do
    list |> hd |> elem(1)
  end

  def parse_granular(record) do
    [p,
      pr,
      cpc,
      ut,
      op,
      _,
      _,
      _,
      _,
      _,
      _,
      value] =
      record |> String.split(",")

    %{record_key: p <> "," <> pr <> "," <> cpc <> "," <> ut <> "," <> op, record_value: Decimal.new((value |> String.replace("\n", "")))}

  end
  

end

with that I can now DO aggregate the big (2Gb) file with latency ~135 sec, but it is still ~8x slower than my java version. Could you please take one more look to see if I can do any better?

Thank you for you time and help!

with your version of reduction function got 15 sec boost more and final latency now is ~120 sec. But my question for more review still valid. Thank you

I’m not that sure, but…
I’m not surprised that this version is slower, because elixir/erlang is slower than java in general. To outperform java you need to use currency, which is not used in this case. Stream just helps with memory usage AFAIK.

You probably should look into https://hexdocs.pm/flow/Flow.html, which will parallelize your computations.

Disclaimer: I may be talking bs…

That’s not entirely true, stricto senso: streams allow composing operations so the collection is iterated over only once. In other words, if you want to apply multiple transformations to a collection of items, if you use Map the collection will be traversed once per transformation, but if you use Stream all the transformations will be “collected” into a single operation and only then will the collection be traversed (and only once).

Yes, and this is why it helps with memory consumption, we have to pay this though. Streams have an additional computational overhead and often slower than their eager counterparts.

But as we have an arbitrary large input file, it sounds as if the price is worth to be paid.

Anyway, I doubt as well, that the BEAM version will be able to be as performant as the Java version.

The BEAM version involves a lot of copying data back and forth between the process and the ETS, which in the Java Version is probably mutated in place.

Still, I’d like to give some more suggestions:

  1. Check the other kinds of ETS tables. Some of them are slower than others, and as far as I remember set is somewhat costly on insert/update, as it always has to scan the table for the key, but maybe I’m missremembering things from the talk (it was briefly mentioned in the talk about how phoenix got to 2M active connections)
  2. The parse function could probably be improved by not splitting and reconstructing a big portion of the string but instead found the position of the xth comma and the last comma, and use String.slice/2/3 to get the string-id and value.
  3. don’t create a map in the parser that you then will restructure into a tuple all over the place. Make and keep it as a tuple.
  4. instead of doing a match all query to the ETS, you could check if :ets.tab2list is more performant.
3 Likes

If you want maximum speed you have to stick with tuples, iolist and, integers. I wouldn’t use Decimal, it is based on structs which is a map, and maps are expansive, not as expensive as they used to be. I would recommend you to use :erts_debug.size/1 to determine how much memory your would use.
:erts_debug.size(%{}) == 4 :erts_debug.size({}) == 1

/edit

Another thing is, use pattern matching, if your data is consistent and you always expect the csv to be the same then you don’t have to split the lines to get your values, you can pick them right out of the string which would give you almost pointer arithmetic.

def getmyvalue(<<"StartRangeMarker=", year::binary-size(4), ?-, month::binary-sizer(2), ?-, day::binary-size(2), ?-, hours::binary-size(2), ?-, min::binary-size(2), ?-, sec::binary-size(2)>>) do
{:ok, year, month, day, hours, min, sec}
end
6 Likes

@frumos I think it’s worth talking about expectations here, do you expect the code to run as fast as Java? As @DiodonHystrix pointed out it’s not going to beat Java in raw performance. That’s not why we use Elixir. You could rewrite the Elixir code to use concurrency and it will be a lot faster, but that would be true for Java too (although I wouldn’t use Flow for it, Task.async_stream has less overhead in a scenario like this).

I think that was bag vs duplicate_bag, but I could be wrong.

Yeah, I think that’s from my blog post. I initially experienced match_object to be faster, but I think that was just bad measurements, I should update the post at some point to use :ets.tab2list instead.

1 Like

Do you know in advance how much maximum unique values will this combo have?

If you do, then Erlang’s :counters module can help you (and it’s faster than ETS). However, you’ll have to translate a combo of values to an index by yourself – and that might kill the performance advantage.

Could a Rust NIF help in this case? Maybe for the parsing part? (I’m still trying to understand how much is possible to use Rust in these situations)

1 Like

a few performance tricks to use when dealing with Strings in Elixir (or Erlang)

  • don’t use string concatenation (<>) use io lists
  • don’t use records maps, they are slow, use lists or tuples
  • take advantage of the standard library, don’t overthink the solution: in this case Integer.parse is more than enough to convert a string to an integer
  • use Erlang’s binary module. For example compiled patterns are usually much faster than just string patterns.

On my Laptop (Lenovo P70, 8 cores, 64GB of RAM) your solutions takes approximately 6 seconds to process a little bit more than 1 million rows, while the following takes around 2.1 seconds

  def aggregate_kona2() do
    IO.puts("Start")
    kona = "./data"

    output_path = kona <> "_elixir_aggregated"
    
    pattern = :binary.compile_pattern(",")

    {time, _} =
      :timer.tc(fn ->
        kona
        |> File.stream!()
        |> Stream.drop(9)
        |> Stream.map(&parse_granular2(&1, pattern))
        |> Enum.reduce(
          %{},
          fn [key, value], acc ->
            Map.update(acc, key, value, &Kernel.+(&1, value))
          end
        )
        |> Stream.map(fn {k, v} -> [k, ",", to_string(v), "\n"] end)
        |> Stream.into(File.stream!(output_path, [:write, :utf8]))
        |> Stream.run()
      end)

    IO.puts("Total: #{time / 1_000_000}s")
  end

  def parse_granular2(record, pattern) do
    [p, pr, cpc, ut, op, _, _, _, _, _, _, value] = record |> String.split(pattern)

    {value, _} = Integer.parse(value)

    [
      [p, ",", pr, ",", cpc, ",", ut, ",", op],
      value
    ]
  end

EDIT

Here’s a version using Flow.

It’s not optimized, I’m sure tweaking the params a bit will yeld much better results.

For around 11 millions rows it take ~17secs (as usual for the BEAM, results scale linearly with the size of the input, for 1 million rows it take 1.55 secs, 1.55 * 11 = 17.05)

  def aggregate_kona_flow() do
    IO.puts("Start")
    kona = "./data12"

    output_path = kona <> "_elixir_aggregated_3"
    {:ok, output_file} = File.open(output_path, [:write, :utf8])

    pattern = :binary.compile_pattern(",")

    part_opts = [stages: System.schedulers_online()]
    read_ahead = 32768
    max_demand = 256

    {time, _} =
      :timer.tc(fn ->
        kona
        |> File.stream!(read_ahead: read_ahead)
        |> Stream.drop(9)
        |> Flow.from_enumerable(max_demand: max_demand)
        |> Flow.map(&parse_granular2(&1, pattern))
        |> Flow.partition(part_opts)
        |> Flow.reduce(fn -> %{} end, fn [key, value], acc ->
          Map.update(acc, key, value, &(&1 + value))
        end)
        |> Flow.each(fn {k, v} ->
          IO.write(output_file, [k, ",", to_string(v), "\n"])
        end)
        |> Flow.run()
      end)

    File.close(output_file)

    IO.puts("Total: #{time / 1_000_000}s")
  end

9 Likes

Where did you get the source data from? I’d like try your code on my MacBook.

I created some fake data from the snippet in the first post

This is just what I was thinking, if performance is a concern on minimal hardware, rust would be the way to go, especially as it would outperform the very optimized JVM (and multi-threading is a breeze in it!).

2 Likes

Tell me if I’m wrong: with Rust NIFs (and NIFs in general) while the Rust code is executed the BEAM scheduler is completely locked right? So the Rust code should return as fast as possible?