Enum.map(rrules, fn element ->
  element.rule
  |> Cocktail.occurrences() # This returns a stream of time-series data, ~8,000 records
  |> Enum.reduce(%{}, fn time, acc -> Map.put(acc, time, element.value) end)
end)
Obviously, this implementation is very inefficient, but I’m not sure how I can leverage concurrency in this case to speed up the aggregate operation.
I would really appreciate it if somebody could point me in the right direction. Thanks in advance.
I’d consider using Task.async_stream/2 or Flow for this first part. I believe it will do a better job of managing throughput.
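A minimal sketch of the Task.async_stream idea, assuming the rrules shape from the snippet above. Here occurrences/1 is a stand-in that streams fake integer timestamps, since the real Cocktail.occurrences/1 isn't available in isolation; the option values are illustrative, not tuned:

defmodule Sketch do
  # Stand-in for Cocktail.occurrences/1 (the real call streams ~8,000 times per rule).
  def occurrences(_rule), do: Stream.map(0..9, & &1)

  def run(rrules) do
    rrules
    |> Task.async_stream(
      fn element ->
        element.rule
        |> occurrences()
        |> Map.new(fn time -> {time, element.value} end)
      end,
      # Illustrative options: one task per scheduler, order not preserved.
      max_concurrency: System.schedulers_online(),
      ordered: false
    )
    |> Enum.map(fn {:ok, map} -> map end)
  end
end

Sketch.run([%{rule: :daily, value: "a"}, %{rule: :weekly, value: "b"}])

Each rule is expanded in its own task, so the per-rule work runs across cores instead of sequentially; Map.new/2 also avoids the intermediate Enum.reduce accumulator.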
I assume this isn’t your real code because here you are using the map returned from handle_item/1 as a map key.
You could consider using an :ets table for the accumulator since that allows concurrent write access. However, concurrency here isn’t a magic bullet since you are already running this code within a concurrent environment (the original 10_000 items).
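A sketch of the :ets suggestion: a public table with write_concurrency lets each task insert its {time, value} pairs directly, skipping the per-rule intermediate maps. The table name and the occurrences/1 stub are illustrative, not from the original code:

defmodule EtsSketch do
  # Stand-in for the real rrule expansion.
  def occurrences(_rule), do: 0..9

  def run(rrules) do
    # :public so spawned tasks may write; write_concurrency for parallel inserts.
    table = :ets.new(:occurrences, [:set, :public, write_concurrency: true])

    rrules
    |> Task.async_stream(fn element ->
      for time <- occurrences(element.rule) do
        :ets.insert(table, {time, element.value})
      end

      :ok
    end)
    |> Stream.run()

    table
  end
end

Note that with a :set table, later inserts for the same time key overwrite earlier ones; if you need every value per time, a :duplicate_bag table would keep them all.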
This looks like you are exploding an rrule into individual event times (is that what Cocktail.occurrences/1 does?). I think the key question here is whether you need to materialise all the times up front, or whether there is an opportunity to lazily evaluate the individual times when you need them. The calendar_recurrence library might help with this.
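A sketch of the lazy alternative: keep everything as a Stream so no times are materialised until something actually consumes them. occurrences/1 is again a stand-in for a streaming rrule expansion:

defmodule LazySketch do
  # Stand-in: an infinite lazy stream of occurrence times.
  def occurrences(_rule), do: Stream.iterate(0, &(&1 + 1))

  # Pair each occurrence with its rule's value, lazily, across all rules.
  def lazy_pairs(rrules) do
    Stream.flat_map(rrules, fn element ->
      element.rule
      |> occurrences()
      |> Stream.map(fn time -> {time, element.value} end)
    end)
  end
end

# Nothing is computed until you take from the stream:
LazySketch.lazy_pairs([%{rule: :daily, value: "a"}]) |> Enum.take(3)
# => [{0, "a"}, {1, "a"}, {2, "a"}]

Because every stage is a Stream, only the three requested pairs are ever computed, which is the point of avoiding up-front materialisation of 8,000 times per item.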
But still, it took too long to run. I only tested with 100 items and it timed out. I’m not sure if this problem can be addressed, or if it’s just a bottleneck we can’t get around.
Also, consider using ExCal. It’s a library from the same cool guys who released Cocktail. It should run much faster as it uses the C libical library under the hood.
10k items? This should barely blip for 0.5 secs IMO. If you can put up an example GitHub project that can produce the items so we can check a few algorithms, we should be able to help further.
I suspect the time is taken materialising the time events (the code says about 8,000 of them for each of the 10,000 items). Avoiding materialising these time events would, I think, be the better path.