Hello Elixir community!
First of all, I have accomplished my requirements and successfully delivered results to my manager using Elixir for the first time. However, I am not satisfied with the performance, or more exactly, I don't understand the performance of my program and need your assistance.
A short introduction:
I need to collect particular records belonging to one customer over a period of 1 year. For that:
- I list objects from S3 and download only those that belong to the customer
- After the download I list ~200K local *.gz batches and feed them to Task.async_stream
- For each object, in a separate process, I open it, parse the JSON, and search for particular records with a given filter
- The result is flushed to its own file
So overall the requirement is pretty trivial, but the total number of records I have to process is ~10 billion.
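To make the filter step concrete, here is a minimal, self-contained sketch of the record matching (the customer id and records below are made up; the real ones come from decoding each *.gz batch):

```elixir
# Hypothetical customer id and decoded records, stand-ins for the
# real values produced by the JSON parser.
my_important_customer = "cust-123"

records = [
  %{"customer" => "cust-123", "usage" => 10},
  %{"customer" => "cust-999", "usage" => 7}
]

# Keep a record when any of its values equals the customer id,
# the same check the processing function applies per record.
matches =
  Enum.filter(records, fn record ->
    my_important_customer in Map.values(record)
  end)
```

This is the per-record work that gets repeated ~10 billion times across all batches.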
The total run time of the application is ~5 hours.
I have a c5.18xlarge EC2 instance (72 cores, 138 GB RAM) to run this program. My concurrency factor equals the number of cores.
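For reference, the concurrency factor comes straight from the BEAM's scheduler count (a minimal sketch; the printed value depends on the machine, and on the instance above it should be 72):

```elixir
# The BEAM runs one scheduler per core by default; this is also the
# default max_concurrency for Task.async_stream when none is given.
schedulers = System.schedulers_online()

IO.puts("schedulers online: #{schedulers}")
```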
Phase 1 is very fast and takes ~10 minutes; after that I list the local files and hand them to processing, which takes ~5 hours. I would not complain about the latency if it weren't for this:
- when I give a list with a single file to the processing function, the latency for that file is ~1.5 s
- when I give a list with many files (I tried 10, 72, 144) that includes that same file, the latency is ~5x higher: ~10 s for that file
So running the function over the files sequentially gives a per-file latency of ~1.5 s (all files are about the same size), but running it over the files in parallel gives a per-file latency of ~10 s.
The slowest part when I run in parallel is the JSON parsing/processing. I tried many variations to rule out the other parts and concluded it is the JSON processing. I also tried two different parsers: Jaxon, which streams, and Poison, which loads the whole file into memory. Poison behaves worse than Jaxon, but both show a notable latency increase.
Let me share the snippets of the core processing for review.
With Jaxon streaming:
```elixir
def process_batch(file_name, src_path, dest_path) do
  src_file = Path.join(src_path, file_name)
  dest_file = Path.join(dest_path, file_name <> ".processed")

  {process_time, _result} =
    :timer.tc(fn ->
      src_file
      |> File.stream!([], @one_mb)
      |> StreamGzip.gunzip()
      |> Stream.map(&String.split(&1, @header_sep))
      |> Stream.flat_map(& &1)
      |> Stream.drop(1)
      |> Jaxon.Stream.query([:root, "records", :all])
      |> Stream.filter(
        &(Map.values(&1)
          |> Enum.member?(@my_important_customer))
      )
      |> Stream.map(&(Poison.encode!(&1) <> @new_line))
      |> Stream.into(File.stream!(dest_file, [:write]))
      |> Stream.run()

      if File.stat!(dest_file).size == 0 do
        File.rm!(dest_file)
      else
        Logger.info("Found affected usage in batch: #{file_name}")
      end
    end)

  Logger.info("Processed batch: #{file_name} time: #{process_time}")
end
```
With Poison, all in memory:
```elixir
def process_batch_in_memory(file_name, src_path, dest_path) do
  src_file = Path.join(src_path, file_name)
  dest_file = Path.join(dest_path, file_name <> ".processed")

  {time, _result} =
    :timer.tc(fn ->
      src_file
      |> File.stream!([], 1024 * 1024)
      |> StreamGzip.gunzip()
      |> Stream.map(&String.split(&1, @header_sep))
      |> Stream.flat_map(& &1)
      |> Stream.drop(1)
      |> Enum.into("")
      |> Poison.decode!()
      |> Map.fetch!("records")
      |> Enum.filter(
        &(Map.values(&1)
          |> Enum.member?(@my_important_customer))
      )
      |> Enum.map(&(Poison.encode!(&1) <> @new_line))
      |> Enum.into(File.stream!(dest_file, [:write]))
    end)

  if File.stat!(dest_file).size == 0 do
    File.rm!(dest_file)
  else
    Logger.info("Found affected usage in batch: #{file_name}")
  end

  Logger.info("Processed batch: #{file_name} time: #{time}")
end
```
And this is how I run it in parallel:
```elixir
File.ls!(src_dir)
|> Task.async_stream(
  &Tools.Amendments.Task.process_batch(&1, src_dir, dest_dir),
  # Note: the documented option name is `max_concurrency`;
  # `maximum_concurrency` is not a recognized option.
  max_concurrency: @max_concurrency_process,
  timeout: @task_timeout
)
|> Enum.to_list()
```
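One way to narrow the question down is to sweep the concurrency level with a dummy workload that only imitates the binary churn of JSON decoding/encoding (a sketch; the workload and concurrency values are arbitrary):

```elixir
# Dummy workload: build up a binary piece by piece, roughly imitating
# the allocation pattern of parsing and re-encoding JSON records.
work = fn _ ->
  Enum.reduce(1..2_000, <<>>, fn i, acc -> acc <> Integer.to_string(i) end)
end

# Run the same number of tasks as the concurrency level, so all tasks
# run at once and total wall time approximates per-task latency.
for conc <- [1, 8, 32, 72] do
  {us, _} =
    :timer.tc(fn ->
      1..conc
      |> Task.async_stream(work, max_concurrency: conc, timeout: :infinity)
      |> Stream.run()
    end)

  IO.puts("concurrency #{conc}: ~#{us} us per task")
end
```

If per-task latency grows with concurrency here too, the contention is in shared resources (memory bandwidth, binary allocation) rather than in the parsers.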
Please note that this question is not about "why is my app/code slow" but rather about why the same code shows different latency when run sequentially vs. in parallel. I simply don't understand what hidden factor causes this.
Thank you for your time and for reviewing my question.