Hi!
I am currently porting our old Python data pipeline to Elixir, and after implementing it more or less the same way, I can't make it run in a "streamable, concurrent" way. When loading one of our biggest CSV files (25 MB), the pipeline takes over 15 minutes to run, which is unacceptable considering we receive about 11 files per user and want to show them the results as soon as possible.
To give a little more context: we receive data from GPS trackers once they are connected locally; CSV files are extracted and then uploaded through our web app. Each tracker records data every 0.1 seconds, and total usage is normally about 30-200 minutes, though there are variations, and there can be GPS blackouts that we discard. We then process that "raw" data, today in the form of a Pandas dataframe, to generate enriched data with the help of Python libraries such as NumPy, SciPy, etc. All of the main formulas, filters, and aggregators have been ported already, so we have to do a lot of array operations: diffs, rolling means, filters that drop all affected indexes from the "dataset" when a GPS blackout or impossible value is found, and transformations to put values in the format we need before storing them in the DB.
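To make concrete what I mean by diffs and rolling means, here is a minimal sketch over a plain list of numbers (the sample values are made up):

```elixir
# Hypothetical column of sensor samples, one value per 0.1 s tick.
samples = [0, 3, 6, 9, 12]

# Element-wise diff: pair each value with its successor and subtract.
diffs =
  samples
  |> Enum.zip(tl(samples))
  |> Enum.map(fn {prev, next} -> next - prev end)

# Rolling mean with window size 3, via sliding windows (step 1).
rolling =
  samples
  |> Enum.chunk_every(3, 1, :discard)
  |> Enum.map(fn window -> Enum.sum(window) / length(window) end)
```

Both operations need neighbouring rows at once, which is why pure element-by-element processing does not map onto them directly.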
In order to mimic a Pandas dataset, I'm using keyword lists: imagine a CSV file transformed into a keyword list. I was then experimenting with Flow to perform the operations, but I haven't had much luck with the results yet.
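To illustrate the representation (the column names here are made up), each CSV row gets folded into a column-oriented keyword list, one key per column:

```elixir
# Hypothetical raw rows as parsed from the CSV: [time, lat, long].
rows = [
  ["0.0", "40.1", "-3.7"],
  ["0.1", "40.2", "-3.8"]
]

# Fold rows into a column-oriented keyword list (prepend, then reverse
# each column once at the end to keep the fold O(n)).
dataset =
  rows
  |> Enum.reduce([time: [], lat: [], long: []], fn [t, la, lo], acc ->
    acc
    |> Keyword.update!(:time, &[t | &1])
    |> Keyword.update!(:lat, &[la | &1])
    |> Keyword.update!(:long, &[lo | &1])
  end)
  |> Enum.map(fn {key, col} -> {key, Enum.reverse(col)} end)

# dataset == [time: ["0.0", "0.1"], lat: ["40.1", "40.2"], long: ["-3.7", "-3.8"]]
```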
The problem I see is that Flow's element-by-element map/reduce style may not be suitable for working with datasets, because I very often need large amounts of data loaded in memory in order to compute an aggregation or remove rows by index across the whole dataset. So I'd like to know if there's a more suitable tool for this kind of operation. I'm going to leave the main stream of my current attempt here so you can get an idea of what I'm trying to do. I hope it's enough to understand, but I can show more if not.
def process(stream, field_longs_lats, team_settings) do
  stream
  |> Flow.from_enumerable(stages: 1)
  |> Flow.filter(fn [time | _tl] -> time != "-" end)
  |> Flow.uniq_by(fn [time | _tl] -> time end)
  |> Flow.reduce(&V4.bare_structure_reordered/0, &put_in_keyword_list/2)
  |> Flow.map(&Transform.apply_tranformations/1)
  |> Stream.into([{:field_longs_lats, field_longs_lats}])
  |> Flow.from_enumerable(stages: 1)
  |> Flow.reduce(fn -> [] end, &TemporaryFields.create_temporary_fields/2)
  |> Stream.into([])
  |> Flow.from_enumerable(stages: 1)
  |> Flow.reduce(fn -> [] end, &Filters.filter_outliers/2)
  |> Stream.into([{:team_settings, team_settings}])
  |> Flow.from_enumerable(stages: 1)
  |> Flow.reduce(fn -> [] end, &FinalFields.create_final_fields/2)
  |> Stream.into([])
end
PS: As you can see, I cannot use more than one stage, because inside those reducers there are aggregators that depend on every preceding row to give correct output (for example, the millisecond count is a cumulative sum of the time differences between consecutive rows, and the diff function must always consider the previous element to produce a proper diff). So I can't split the work across stages; otherwise those aggregations will be wrong, because each would be computed per stage.
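A small sketch of why this is order-dependent (the delta values are hypothetical):

```elixir
# Hypothetical per-row time deltas in milliseconds.
deltas = [100, 100, 100, 250, 100]

# Cumulative millisecond count: each output value depends on every value
# before it, so the computation is inherently sequential.
cumulative = Enum.scan(deltas, &(&1 + &2))
# If the deltas were split across stages and each stage scanned its own
# chunk, the sum would restart at every chunk boundary and the totals
# would be wrong.
```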