How to leverage concurrency to speed up an aggregate operation

Hi guys, I’m having this problem.
Let’s say I have a list of 10_000 items, where each item is a map with an rrules field:

  [
     %{rrules: 
        [
           %{value: 1, rule: rrule}, 
           %{value: 2, rule: rrule2}
        ]
     },
     ........
  ]

I’m spawning 10_000 tasks, each handling one item:

Enum.map(list, fn item -> Task.async(fn -> handle_item(item.rrules) end) end)
|> Enum.reduce(%{}, fn task, acc -> 
  value = Task.await(task)
  Map.put(acc, value, 1)
end)

handle_item/1 looks like this:

Enum.map(rrules, fn element -> 
  element.rule
  |> CockTail.occurrences() # This returns a stream of time-series data ~ 8000 records
  |> Enum.reduce(%{}, fn time, acc -> Map.put(acc, time, element.value) end)
end)

Obviously, this implementation is very inefficient, but I’m not sure how I can leverage concurrency in this case to speed up the aggregate operation.
I would really appreciate it if somebody could point me in the right direction. Thanks in advance!

I’d consider using Task.async_stream/2 or Flow for this first part. I believe they will do a better job of managing throughput.

I assume this isn’t your real code because here you are using the map returned from handle_item/1 as a map key.

You could consider using an :ets table for the accumulator, since that allows concurrent write access. However, concurrency here isn’t a magic bullet, since you are already running this code within a concurrent environment (one task per each of the original 10_000 items).
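
A rough, untested sketch of the :ets idea, assuming handle_item/1 is changed to return a list of {time, value} tuples:

table = :ets.new(:aggregate, [:set, :public, write_concurrency: true])

list
|> Task.async_stream(fn item ->
  # each task writes its own results; :write_concurrency lets them insert in parallel
  for {time, value} <- handle_item(item.rrules), do: :ets.insert(table, {time, value})
end)
|> Stream.run()

result = :ets.tab2list(table)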

This looks like you are exploding an rrule into individual event times (is that what CockTail.occurrences/1 does?). I think the key question here is whether you need to materialise all the times up front, or whether there is an opportunity to lazily evaluate the individual times only when you need them. The calendar_recurrence library might help with this.
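
For example, something along these lines (untested; window_end is a hypothetical cut-off you would supply), so that only the occurrences you actually need are ever realised:

element.rule
|> CockTail.occurrences()
|> Stream.take_while(fn time -> DateTime.compare(time, window_end) == :lt end)
|> Enum.reduce(%{}, fn time, acc -> Map.put(acc, time, element.value) end)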

4 Likes

+1 for leveraging Flow. It’s literally designed to solve tasks like this one.

1 Like

Thanks so much @kip. I’m using a similar library, https://github.com/peek-travel/cocktail, to work with the recurrence rules, and it does generate the time-series data lazily.

I changed my code as you suggested:

    items()
    |> Task.async_stream(fn item -> handle_item(item) end)
    |> Enum.reduce(%{}, fn {:ok, result}, acc -> Map.put(acc, result, 1) end)

but it still takes too long to run. I only tested with 100 items and it times out. I’m not sure if this problem can be addressed, or if it’s just a bottleneck that we can’t get around.

I think switching to Flow would be your best bet, since it is designed for exactly this kind of use case. Something like this (not tested):

items()
|> Flow.from_enumerable(max_demand: :erlang.system_info(:schedulers_online))
|> Flow.flat_map(&handle_item/1)
|> Flow.partition()
# handle_item/1 returns a list of %{time => value} maps, so merge them per partition
|> Flow.reduce(fn -> %{} end, fn times_map, acc -> Map.merge(acc, times_map) end)
|> Enum.to_list()

3 Likes

Always benchmark: classic solution vs async_stream vs Flow. I would be very curious about the differences, if you could share them at the end.
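
For example, with Benchee (a sketch; it assumes you wrap the three variants in hypothetical classic/1, with_async_stream/1, and with_flow/1 functions):

items = items()

Benchee.run(%{
  # each entry is a zero-arity function Benchee will time repeatedly
  "classic" => fn -> classic(items) end,
  "async_stream" => fn -> with_async_stream(items) end,
  "flow" => fn -> with_flow(items) end
})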

Check the options for async_stream: https://hexdocs.pm/elixir/Task.html#async_stream/5-options. Setting ordered: false should speed things up a bit.
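
For example (untested), raising the per-task timeout should also fix the timeouts you saw with 100 items:

items()
|> Task.async_stream(&handle_item/1,
  ordered: false,      # don't preserve input order; results arrive as they finish
  timeout: :infinity   # default is 5_000 ms per task, which explains the timeouts
)
|> Enum.reduce(%{}, fn {:ok, result}, acc -> Map.put(acc, result, 1) end)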

Also, consider using ExCal. It’s a library from the same cool folks who released Cocktail. It should run much faster, as it uses the C libical library under the hood.

Have you considered looking at Parallel Streams (https://github.com/beatrichartz/parallel_stream)?
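
An untested sketch, following that library’s README:

items()
|> ParallelStream.map(&handle_item/1)  # maps over the items in parallel worker processes
|> Enum.into([])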

10k items? That should barely take half a second, IMO. If you can put up an example GitHub project that produces the items, so we can check a few algorithms, we should be able to help further.

2 Likes

I suspect the time is taken materialising the time events (the code says about 8,000 of them for each of the 10,000 items). Avoiding materialising these time events would, I think, be the better path.

2 Likes

Exactly, I’m trying to see if I can avoid generating all of that time-series data up front. Thank you so much @kip

Thank you @sfusato, it is indeed much faster!