Streaming parsing of a complex JSON document with Jaxon

Hello!

Given

My input is a gzipped batch file containing records about customers' usage of some product, in a JSON structure (a legacy format that cannot be modified), like:

header1=value1
header2=value2
header3=value3
header4=value4
header5=value5

{"namespace":"aggregation","validators":["someValidator"],"symbol-table":{"timestamp":"0","customer":"1","product":"2","value":"3"},"records":[{"0":"1546339235","1":"Bob","2":"apple","3":"5"},{"0":"1546339739","1":"Bob","2":"orange","3":"2"},{"0":"1546339839","1":"Alice","2":"apple","3":"5"}, {"0":"1546339839","1":"Alice","2":"orange","3":"1"}, {"0":"1546339839","1":"Alice","2":"apple","3":"2"}],"record-count":"5"}

My streaming function, which aggregates similar records (same customer and same product), looks like:

"/home/user/temp/rawBatch/batch.gz" 
|> File.stream!
|> StreamGzip.gunzip
|> Stream.flat_map(&String.split(&1, "\n"))
# skip the five header lines and the blank line before the JSON document
|> Stream.drop(6)
# I also need the symbol-table here to compose the aggregation key (used in the reduce function) dynamically
|> Jaxon.Stream.query([:root, "records", :all])
# hardcoded key indexes
|> Stream.map(&(%{record_key: &1["1"] <> "," <> &1["2"], record_value: Decimal.new(&1["3"])}))
|> Enum.reduce(%{}, fn %{record_key: key, record_value: value}, acc -> Map.update(acc, key, value, &Decimal.add(&1, value)) end)
# write_to_file is a one-argument function that receives each {key, total} pair
|> Enum.each(write_to_file)

It works almost perfectly, except for this line:

Stream.map(&(%{record_key: &1["1"] <> "," <> &1["2"], record_value: Decimal.new(&1["3"])}))

where I compose the aggregation key with hard-coded indexes.

However, the indexes are defined in the JSON’s “symbol-table” element and can vary from batch to batch.
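
To illustrate what I am after, here is a minimal sketch of the key composition itself, assuming the symbol-table has already been decoded into a plain map. The module and function names (SymbolTableSketch, record_mapper) are hypothetical and not part of Jaxon, and the value is left as a raw string so the snippet has no dependencies (in the real pipeline it would be wrapped in Decimal.new). It does not solve the question of how to obtain the symbol-table from the stream.

```elixir
defmodule SymbolTableSketch do
  # Hypothetical helper, not part of Jaxon: returns a function that turns one
  # decoded record into the %{record_key: ..., record_value: ...} shape used
  # by the reduce step.
  def record_mapper(symbol_table, key_fields \\ ["customer", "product"], value_field \\ "value") do
    # Resolve field names to this batch's indexes once, before streaming records.
    key_indexes = Enum.map(key_fields, &Map.fetch!(symbol_table, &1))
    value_index = Map.fetch!(symbol_table, value_field)

    fn record ->
      %{
        record_key: Enum.map_join(key_indexes, ",", &Map.fetch!(record, &1)),
        record_value: Map.fetch!(record, value_index)
      }
    end
  end
end

symbol_table = %{"timestamp" => "0", "customer" => "1", "product" => "2", "value" => "3"}
mapper = SymbolTableSketch.record_mapper(symbol_table)

mapper.(%{"0" => "1546339235", "1" => "Bob", "2" => "apple", "3" => "5"})
# => %{record_key: "Bob,apple", record_value: "5"}
```

With something like this, the hard-coded Stream.map step would become Stream.map(mapper), provided the symbol-table is known before the first record arrives.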

Problem

I need to compose the aggregation key dynamically, using indexes obtained from the symbol-table element within the stream, preferably in a single file-processing ‘loop’.

There may be such an option in the Jaxon library, but its documentation does not give me any clear hint towards a solution.

Could you please suggest viable options to solve my task?

Thank you.

Are you expecting huge files to be fed to your code?

@boudra Can Jaxon execute two separate queries on the same stream without rewinding it from the start?

Thank you for your review.

Are you expecting huge files to be fed to your code?

The compressed batch file size is up to 30 MB; the average is 5 MB. The number of records in one batch can be up to 100K (that is a big and rare one); the average is 2-10K.

This is only my very first step. After that I need to scale this app to ~20K batches tps.

I looked through Jaxon’s docs and part of the code, but I cannot find anything that allows you to run two separate queries on one stream. Maybe the library’s author, whom I mentioned above, can help.