Most efficient way to do transformations, filters and aggregations on large CSV file as data frame

Hi!

I am currently porting our old python data pipeline to elixir, and after implementing it more of less the same, I can’t make it run in a “streamable, concurrent” way, so basically when loading one of our biggest csv files (25mb), the pipeline takes over 15+ minutes to run a file, which is unacceptable considering we receive about 11 files per user to be loaded and want to show them the results asap.

Giving a little bit more of context, we receive data from GPS trackers once they are connected locally and csv files are extracted and then uploaded through our web app, each tracker gets data each 0,1 second and total usage is normally about 30-200 minutes, but there are variations and also can have some GPS blackouts that we do not consider. Then we process that “raw” data, today in form of a Pandas dataframe, to generate enriched data with the help of many python libraries such as numpy, scipy etc. All of the main formulas, filters and aggregators have been ported already, so we have to do a lot of operations in arrays such as diffs, rolling means, filters that drop all indexes from the “dataset” when a gps blackout or impossible value is found, and transformations to put values in the format we need before storing in the DB.

In order to mimic a pandas dataset, I’m using Keyword lists, so imagine a csv file transformed into a Keyword list, and then I was experimenting with Flow to generate do the operations. But haven’t had much luck with results yet.

The problem that I see is that maybe Flow’s way of map reducing element by element may not be suitable to work with datasets, because very often I need huge amounts of data loaded in memory in order to make an aggregation or remove rows by index of the whole dataset, so I’d like to know if there’s a more suitable tool for this kind of operation… I’m going to leave here the main stream of my current attempt so that you can have an idea of what I’m trying to do. I hope it’s enough to understand, but I can show more if not.

def process(stream, field_longs_lats, team_settings) do
    stream
    |> Flow.from_enumerable(stages: 1)
    |> Flow.filter(fn [time | _tl] -> time != "-" end)
    |> Flow.uniq_by(fn [time | _tl] -> time end)
    |> Flow.reduce(&V4.bare_structure_reordered/0, &put_in_keyword_list/2)
    |> Flow.map(&Transform.apply_tranformations/1)
    |> Stream.into([{:field_longs_lats, field_longs_lats}])
    |> Flow.from_enumerable(stages: 1)
    |> Flow.reduce(
      fn -> [] end,
      &TemporaryFields.create_temporary_fields/2
    )
    |> Stream.into([])
    |> Flow.from_enumerable(stages: 1)
    |> Flow.reduce(fn -> [] end, &Filters.filter_outliers/2)
    |> Stream.into([{:team_settings, team_settings}])
    |> Flow.from_enumerable(stages: 1)
    |> Flow.reduce(
      fn -> [] end,
      &FinalFields.create_final_fields/2
    )
    |> Stream.into([])
  end

PS: As you can see, I cannot use more than 1 stage, because inside those reducers there are aggregators that depend on the whole row to give correct output (for example, the millisecond count is a cumulative sum of all differences in time between each row, or the diff function that must consider always the last element to provide proper diff), so I can’t separate in stages otherwise those aggregations will be wrong because will be aggregated only per stage.

2 Likes

This post was written in the middle of the night after only a quick glance at your code, and after a long day filled with talks at the ElixirConf.EU, but hopefully I am able to provide some basic tips:

  1. You state that for large files the current process takes 15+ minutes per file, where a user might send you about 11 files per user. I am fairly certain that regardless of if you use Flow, by running your process on all files concurrently, you will be able to handle all your users in roughlymax(15) minutes rather than in 11 * 15 * number_of_users minutes.
  2. Of course, it would be even better if you were able to work on each of these files in a concurrent way, if that allows you to speed up the handling of them even further. However, because of the amount of calls to Flow.reduce it seems to me that your current application depends heavily on the whole file to make decisions on each single row. If you are able to alter this so that you only need to keep track of some of the rows before (and possibly after) the current one, then this would mean that you are not collecting and copying large amounts of data at a time as much. For instance, your outlier-mechanism could check events in a given window, rather than in the complete lifetime of the sensor measurements.