5x latency increase when processing JSON files in parallel vs sequentially

Hello Elixir community!
First of all: I have met my requirements and successfully delivered results to my manager using Elixir for the first time. However, I am not satisfied with the performance, or more precisely, I don't understand the performance characteristics of my program, and I need your assistance.

Short introductions:

I need to collect particular records belonging to one customer over a period of 1 year. To do that:

  1. I list objects from S3 and select for download only those which belong to the customer
  2. After downloading, I list the 200K local *.gz batches and feed them to Task.async_stream
  3. For each object, in a separate process, I open it, parse the JSON, and search for particular records matching a given filter
  4. The result is flushed to its own file

So overall the requirement is pretty trivial, but the total number of records I have to process is ~10 billion.

The total runtime of the application is ~5 hours.

I have a C5.18XL EC2 instance (72 cores, 138 GB RAM) to run this program. My concurrency factor equals the number of cores.

Phase 1 is very fast and takes ~10 minutes; after that I list the local files and hand them off to processing, which takes ~5 hours. I would not complain about the latency except that:

  • when I give the processing function a list with a single file, the latency for that file is ~1.5 sec
  • when I give it a list with many files (I tried 10, 72, 144) that includes that same file, the latency is 5x higher: ~10 sec for that file

So running the function over the files in sequence gives a latency of ~1.5 sec per file (all files are about the same size), but running it over the files in parallel gives ~10 sec per file.

The slowest part when running in parallel is the JSON parsing/processing. I tried many variations to exclude the other parts and concluded it is the JSON processing. I also tried 2 different parsers: Jaxon, which streams, and Poison, which loads the whole file into memory. Poison behaves worse than Jaxon, but both show a notable latency increase.

Let me share the core processing snippets for review:

With Jaxon streaming:

  def process_batch(file_name, src_path, dest_path) do
    src_file = Path.join(src_path, file_name)
    dest_file = Path.join(dest_path, file_name <> ".processed")

    {process_time, _result} =
      :timer.tc(fn ->
        src_file
        |> File.stream!([], @one_mb)
        |> StreamGzip.gunzip()
        |> Stream.flat_map(&String.split(&1, @header_sep))
        |> Stream.drop(1)
        |> Jaxon.Stream.query([:root, "records", :all])
        |> Stream.filter(
          &(Map.values(&1)
            |> Enum.member?(@my_important_customer))
        )
        |> Stream.map(&(Poison.encode!(&1) <> @new_line))
        |> Stream.into(File.stream!(dest_file, [:write]))
        |> Stream.run()

        if File.stat!(dest_file).size == 0 do
          File.rm!(dest_file)
        else
          Logger.info("Found affected usage in batch: #{file_name}")
        end
      end)

    Logger.info("Processed batch: #{file_name} time: #{process_time}")
  end

With Poison, all in memory:

  def process_batch_in_memory(file_name, src_path, dest_path) do
    src_file = Path.join(src_path, file_name)
    dest_file = Path.join(dest_path, file_name <> ".processed")

    {time, _result} =
      :timer.tc(fn ->
        src_file
        |> File.stream!([], 1024 * 1024)
        |> StreamGzip.gunzip()
        |> Stream.flat_map(&String.split(&1, @header_sep))
        |> Stream.drop(1)
        |> Enum.into("")
        |> Poison.decode!()
        |> Map.fetch!("records")
        |> Enum.filter(
          &(Map.values(&1)
            |> Enum.member?(@my_important_customer))
        )
        |> Enum.map(&(Poison.encode!(&1) <> @new_line))
        |> Enum.into(File.stream!(dest_file, [:write]))
      end)

    if File.stat!(dest_file).size == 0 do
      File.rm!(dest_file)
    else
      Logger.info("Found affected usage in batch: #{file_name}")
    end

    Logger.info("Processed batch: #{file_name} time: #{time}")
  end

and here is how I run it in parallel:

  File.ls!(src_dir)
  |> Task.async_stream(
    &Tools.Amendments.Task.process_batch(&1, src_dir, dest_dir),
    # note: the option name is :max_concurrency
    max_concurrency: @max_concurrency_process,
    timeout: @task_timeout
  )
  |> Enum.to_list()

Please note that this question is not about "why is my app/code slow" but rather about why the same code shows different latency when running sequentially vs in parallel. I simply don't understand what hidden factor causes this.

Thank you for your time and review of my question.


You didn't say much about the filesystem/file size. Are you sure that concurrent file access isn't slowing you down?

Did I understand you correctly: with your above code, one 200 KB JSON file takes 1.5 secs? Or is it 200 KB compressed? If so, how big is it uncompressed?

Additionally, have you tried Jason? It supersedes Poison and is faster.
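In case it helps, the swap from Poison is mostly mechanical for these call sites; a minimal sketch (the package version, and using Mix.install for a quick check, are assumptions on my part):

```elixir
# Quick check of the drop-in swap (Mix.install needs Elixir 1.12+;
# the version requirement below is illustrative).
Mix.install([{:jason, "~> 1.4"}])

# Jason exposes the same decode!/encode! shape used in the snippets above.
records =
  ~s({"records": [{"customer": "acme"}]})
  |> Jason.decode!()
  |> Map.fetch!("records")

line = Jason.encode!(hd(records)) <> "\n"
```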


I ran into a similar problem and it turned out to be the Erlang file server.

The calls to File.stat go through a single process, causing a bottleneck in parallel code. See https://hexdocs.pm/elixir/File.html#module-processes-and-raw-files

It looks like the File.stat! function doesn’t have a :raw flag, but the Erlang :file.read_file_info does: http://erlang.org/doc/man/file.html#read_file_info-1
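If the goal is just the empty-output check, one workaround (a sketch, not verified against this setup) is to get the size through a raw file handle, which avoids the file-server round trip:

```elixir
# Hypothetical helper: file size via a raw handle. :raw mode skips the
# intermediary file-server process; seeking to :eof yields the byte size.
defmodule RawSize do
  def file_size(path) do
    {:ok, fd} = :file.open(path, [:raw, :read, :binary])
    {:ok, size} = :file.position(fd, :eof)
    :ok = :file.close(fd)
    size
  end
end
```

Then `if RawSize.file_size(dest_file) == 0, do: File.rm!(dest_file)` keeps the rest of the function unchanged.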


Thank you for taking the time to review my issue. I have an update and will also answer your questions.

The update:
It seems it is NOT a JSON parsing issue at all; it is a general processing issue. I removed the JSON parsing logic entirely and now use just simple text processing. Here is the new version of the function:

  {time, _result} =
    :timer.tc(fn ->
      src_file
      |> File.stream!([], 1024 * 1024)
      |> StreamGzip.gunzip()
      |> Stream.flat_map(&String.split(&1, "\n\n"))
      |> Stream.drop(1)
      |> Stream.flat_map(&String.split(&1, "\n"))
      |> Stream.filter(&(&1 =~ "myImportantCustomer"))
      |> Enum.map(&(&1 <> @new_line))
      |> Enum.into(File.stream!(dest_file, [:write]))
    end)

When I run this program in one process, the latency is 90 milliseconds per file; when I run it on all cores, with 72 processes, the latency jumps to 300 milliseconds per file, so roughly a 3x performance loss.
In my next attempt I'll try removing the gzip step from the stream to see the behavior; I can also stream from S3 directly into memory as another option to check.
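For the in-memory variant, a possible sketch (assuming the batches stay around ~700 KB compressed, so whole-file reads are cheap; the customer string is a placeholder):

```elixir
defmodule InMemoryBatch do
  # Read a whole gzip batch at once and decompress in memory with the
  # built-in :zlib, taking the chunked StreamGzip pipeline out of the picture.
  # Returns the lines that mention the customer.
  def matching_lines(src_file, customer \\ "myImportantCustomer") do
    src_file
    |> File.read!()
    |> :zlib.gunzip()
    |> String.split("\n\n")
    # drop the header section, as in the streaming version
    |> Enum.drop(1)
    |> Enum.flat_map(&String.split(&1, "\n"))
    |> Enum.filter(&String.contains?(&1, customer))
  end
end
```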

Answering questions:

  • the batch is a gz-ed file, ~700 KB compressed, ~15 MB unzipped. Sizes are uniform, with ~55K records per batch and 200K batches in the local file system to process.
  • I removed the File.stat call from the function; the effect is the same
  • host details:

user-dev-6004.iad6$ uname -a
Linux user-dev-6004.iad6.corp.some-company.com 4.9.184-0.1.ac.235.83.329.metal1.x86_64 #1 SMP Tue Jul 30 17:17:50 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux

  • Erlang details

Erlang/OTP 22 [erts-10.4] [source] [64-bit] [smp:72:72] [ds:72:72:10] [async-threads:1] [hipe]
Interactive Elixir (1.9.0) - press Ctrl+C to exit (type h() ENTER for help)

Given that, I would reformulate my question:

  • "Is it possible in Erlang for processes to influence and slow each other down even when they have no shared resources?"
  • or "Is this the effect of multiple processes accessing the same disk device and competing for IO?"
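One way to separate the two hypotheses (a sketch; the synthetic data below stands in for one decompressed batch): keep the data in memory and run the identical pure processing concurrently. If per-task latency still grows with concurrency, the disk is not the culprit.

```elixir
# CPU-only benchmark: no disk IO inside the timed section.
# Synthetic stand-in for one decompressed ~15 MB batch.
data = String.duplicate("other record\nmyImportantCustomer record\n", 10_000)

time_one = fn ->
  {us, _result} =
    :timer.tc(fn ->
      data
      |> String.split("\n")
      |> Enum.filter(&String.contains?(&1, "myImportantCustomer"))
    end)

  us
end

# Run the same work 72 times at full concurrency and collect per-task times.
times =
  1..72
  |> Task.async_stream(fn _ -> time_one.() end, max_concurrency: 72, timeout: :infinity)
  |> Enum.map(fn {:ok, us} -> us end)

IO.inspect({Enum.min(times), Enum.max(times)}, label: "per-task microseconds (min, max)")
```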

Could the administrators please rename the topic header?

Thank you for your time and looking into my issue.


Try increasing async-threads to, say, 32, 64 or even 96, e.g. +A 32 when you start up the BEAM.

Also, given sufficient memory, I would go for reading the entire file into memory…


I'm no expert, but given the file size I would also try it without File.stream!; I would simply change it to File.read.

Also, if "myImportantCustomer" is a constant in the text, I would pre-search the file before parsing it entirely.
Why parse the JSON if no useful data is there? I don't know the use case, but usually I just get creative until I solve the problem with fewer resources.
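The pre-search idea can be sketched like this (assuming the decompressed batch fits in memory; the customer string is a placeholder):

```elixir
defmodule PreSearch do
  # Cheap substring scan before any JSON work; :binary.match/2 returns
  # :nomatch when the customer id does not occur anywhere in the batch.
  def worth_parsing?(decompressed, customer \\ "myImportantCustomer") do
    :binary.match(decompressed, customer) != :nomatch
  end
end
```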

Sorry, this is not the issue at all; I am not seeking extra optimization of a working program. Even if I go this way and pre-search, the remaining batches will still be affected by the same issue, which I need to locate and understand. Thank you.

The host has plenty of RAM: 170 GB. I'll later try a version that streams the batch from S3 directly to the processing function, bypassing disk. Will report my results after. Thank you.

Did you increase async-threads? I believe you are currently file-IO limited…

How do you start the server? Try with iex --erl "+A 96" -S mix phx.server and see if that doesn't "unlimit" file IO for you…

So all this disk IO is happening on an EBS volume? Have you ruled out IOPS throttling? You don't get a lot of IO without going to provisioned IOPS.

It would be interesting to just try an instance that has a local SSD. Also, with all the writing happening in a single directory, you might be creating metadata contention. There are mount options to lessen this, and creating more of a hashed directory structure might be something to try.
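The hashed-directory idea might look like this (a sketch; the two-character bucket width is arbitrary):

```elixir
defmodule HashedDest do
  # Derive a two-hex-char bucket from the batch file name so ~200K output
  # files spread across 256 subdirectories instead of one flat directory.
  def path_for(dest_root, file_name) do
    bucket =
      :crypto.hash(:md5, file_name)
      |> Base.encode16(case: :lower)
      |> binary_part(0, 2)

    dir = Path.join(dest_root, bucket)
    File.mkdir_p!(dir)
    Path.join(dir, file_name <> ".processed")
  end
end
```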


Sorry, I haven't tried it yet; will do later. Thank you.

Thanks for analyzing; this assumption is quite plausible. Yes, the C5.18XL is EBS-backed. I need to pick another instance type with an SSD drive to test; will do it later. Thank you.

Sorry to say, this did not help; the same issue:

iex --erl "+A 96" -S mix

then

all = File.ls!("/home/user/temp/diskspace/tools/batches")
take = all |> Enum.take(72)
take |> Tools.Amendments.proces_batches_in_process

and the last entry from the log:

00:32:16.633 [info] Processed batch: 2018-12-05-11-14-47-xyz-file-50bf681f9878696749d1ca483f5f9e5b.gz time: 257694

Then running the same file in a single process:

iex(9)> ["2018-12-05-11-14-47-xyz-file-50bf681f9878696749d1ca483f5f9e5b.gz"] |> Tools.Amendments.proces_batches_in_process
:ok
00:34:14.859 [info] Started to process batches
iex(10)>
00:34:14.859 [info] Overal process completed
00:34:14.953 [info] Processed batch: 2018-12-05-11-14-47-xyz-file-50bf681f9878696749d1ca483f5f9e5b.gz time: 93695

You can see the latency increase: ~258 milliseconds vs ~94 (the logged times are in microseconds).

My next step is to rule out the EBS volume. Thank you.