I’m reading data in batches from Ecto using `Stream.resource/3`. I then map over each row and call a function on it. I can run some of this concurrently, but the concurrency has to be limited because the function does DB writes, etc. Let’s say 3 concurrent calls for this example.
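For context, batched reading with `Stream.resource/3` can be sketched roughly as below. This is a minimal sketch under stated assumptions: the in-memory `fetch_batch/3` is a hypothetical stand-in for an Ecto query using keyset pagination (something like `where: r.id > ^last_id, order_by: r.id, limit: ^size`), not the actual query from this post.

```elixir
defmodule BatchStream do
  # Hypothetical stand-in for an Ecto keyset query: returns up to `size`
  # ids greater than `last_id` from an in-memory "table" of 1..total.
  def fetch_batch(last_id, size, total) do
    Range.new(last_id + 1, min(last_id + size, total), 1) |> Enum.to_list()
  end

  def stream(batch_size, total) do
    Stream.resource(
      # start_fun: begin before the first id
      fn -> 0 end,
      # next_fun: emit the next batch and remember its last id; halt on an empty batch
      fn last_id ->
        case fetch_batch(last_id, batch_size, total) do
          [] -> {:halt, last_id}
          batch -> {batch, List.last(batch)}
        end
      end,
      # after_fun: nothing to clean up for an in-memory source
      fn _last_id -> :ok end
    )
  end
end

BatchStream.stream(3, 7) |> Enum.to_list()
# => [1, 2, 3, 4, 5, 6, 7]
```

Rows are emitted one at a time even though they are fetched in batches, which is what lets the result be piped straight into `Task.async_stream/3` or `Flow.from_enumerable/2`.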
Two options came to mind: `Task.async_stream/3` with the `max_concurrency: 3` option (plus `timeout` and `on_timeout` adjustments), or `Flow.from_enumerable/2` with `stages: 3`. I don’t need an aggregated list of results at the end. Most errors are logged within the function itself (i.e. `do_the_thing/1`), but it would be nice to catch and log a timeout from `async_stream` if I go that route.
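To check that `max_concurrency: 3` really caps the number of in-flight calls, a small probe like the one below can be used. It is a sketch: the `Process.sleep/1` call stands in for `do_the_thing/1`, and an `Agent` tracks how many calls are running at once and the highest count seen.

```elixir
defmodule ConcurrencyProbe do
  # Runs Task.async_stream/3 over `inputs` and returns the highest number
  # of calls that were in flight at the same time.
  def peak(inputs, max_concurrency) do
    # Agent state is {currently_running, highest_seen}
    {:ok, agent} = Agent.start_link(fn -> {0, 0} end)

    inputs
    |> Task.async_stream(
      fn _input ->
        Agent.update(agent, fn {cur, top} -> {cur + 1, max(top, cur + 1)} end)
        # stand-in for do_the_thing/1
        Process.sleep(20)
        Agent.update(agent, fn {cur, top} -> {cur - 1, top} end)
      end,
      max_concurrency: max_concurrency,
      ordered: false
    )
    |> Stream.run()

    {_current, max_seen} = Agent.get(agent, & &1)
    Agent.stop(agent)
    max_seen
  end
end

ConcurrencyProbe.peak(1..20, 3)
```

The returned value should never exceed 3, because `Task.async_stream/3` only starts a new task once a running one has finished.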
I know I could probably write a supervised GenServer myself and have a lot more control, but I was hoping to use a simple pipe approach.
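If the main reason for a hand-written supervised GenServer is crash isolation, `Task.Supervisor.async_stream_nolink/4` may get most of the way while keeping a pipe. It accepts the same `max_concurrency`, `ordered`, `timeout` and `on_timeout` options as `Task.async_stream/3`, but the tasks run under a `Task.Supervisor` and are not linked to the caller, so a crash in one call comes back as `{:exit, reason}` instead of taking down the consumer. This sketch starts the supervisor inline and uses a hypothetical `do_the_thing/1` stub that raises on `:boom`; in an application you would more likely add something like `{Task.Supervisor, name: MyApp.TaskSupervisor}` (a hypothetical name) to your supervision tree.

```elixir
defmodule SupervisedPipeline do
  require Logger

  # Hypothetical stub for do_the_thing/1: crashes on :boom, succeeds otherwise.
  def do_the_thing(:boom), do: raise("boom")
  def do_the_thing(x), do: {:done, x}

  def run(sup, inputs) do
    Task.Supervisor.async_stream_nolink(sup, inputs, &do_the_thing/1,
      max_concurrency: 3,
      ordered: false,
      timeout: :timer.minutes(1),
      on_timeout: :kill_task
    )
    |> Enum.map(fn
      {:ok, result} ->
        {:ok, result}

      # Crashes and timeouts both arrive here instead of killing the caller.
      {:exit, reason} ->
        Logger.error("task failed: #{inspect(reason)}")
        {:error, reason}
    end)
  end
end

{:ok, sup} = Task.Supervisor.start_link()
SupervisedPipeline.run(sup, [1, :boom, 2])
```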
So, the pseudocode would look something like this:
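```elixir
# Flow option: stages: 3 limits the number of concurrent mapper stages,
# max_demand: 1 makes each stage ask for one item at a time
stream_resource()
|> Flow.from_enumerable(max_demand: 1, stages: 3)
|> Flow.map(&do_the_thing/1)
|> Flow.run()

# Task option: at most 3 tasks in flight, results emitted in completion order
stream_resource()
|> Task.async_stream(&do_the_thing/1,
  max_concurrency: 3,
  timeout: :timer.minutes(1),
  ordered: false,
  on_timeout: :kill_task
)
|> Stream.run()
```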
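To make the Task option concrete, here is a runnable sketch with a plain list in place of `stream_resource()` and a hypothetical sleeping stub in place of `do_the_thing/1`. It shows that `Stream.each/2` followed by `Stream.run/1` consumes each result as it arrives without building a list, and that a killed task comes back as `{:exit, :timeout}` with no trace of the input that caused it.

```elixir
defmodule Pipeline do
  require Logger

  # Hypothetical stub for do_the_thing/1: sleeps `ms` milliseconds, then returns.
  def do_the_thing(ms) do
    Process.sleep(ms)
    {:done, ms}
  end

  # The Task option from the pseudocode, as a lazy stream of results.
  defp stream(inputs, timeout) do
    Task.async_stream(inputs, &do_the_thing/1,
      max_concurrency: 3,
      timeout: timeout,
      ordered: false,
      on_timeout: :kill_task
    )
  end

  # Collects the raw results, useful for inspecting what the stream emits.
  def results(inputs, timeout), do: stream(inputs, timeout) |> Enum.to_list()

  # Logs failures as they arrive and discards everything else.
  def run(inputs, timeout) do
    stream(inputs, timeout)
    |> Stream.each(fn
      {:ok, _result} -> :ok
      # Only the reason is available here; the timed-out input is not.
      {:exit, reason} -> Logger.warning("task exited: #{inspect(reason)}")
    end)
    |> Stream.run()
  end
end

Pipeline.run([10, 500], 100)
```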
A couple of questions:
- Is there a benefit to using one over the other (Task vs Flow) when I need limited concurrency and don’t need an aggregated list at the end?
- Is there a simple way to catch the timeout so I can log the args when using `Task.async_stream/3`, rather than using `on_timeout: :kill_task` and just getting `{:exit, :timeout}`? (I know I can usually catch those in a GenServer’s `handle_call/3`, but I’m not sure how to do it here :shrug:)
- Is there a better, alternative approach that I’m totally missing here?
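One way to keep the arguments around on timeout, sketched here under the assumption that `do_the_thing/1` can be wrapped, is to enforce the timeout inside each stream task: start the real work in a child `Task`, wait with `Task.yield/2`, and kill it with `Task.shutdown/2` if it overruns, logging the argument before returning. `with_timeout/3` is a hypothetical helper and `do_the_thing/1` a hypothetical sleeping stub. (On Elixir 1.14 and later, `Task.async_stream/3` also accepts a `zip_input_on_exit: true` option that adds the original input to `:exit` tuples; check the docs for your version.)

```elixir
defmodule TimeoutLogging do
  require Logger

  # Hypothetical stub for do_the_thing/1: sleeps `ms` milliseconds, then returns.
  def do_the_thing(ms) do
    Process.sleep(ms)
    {:done, ms}
  end

  # Hypothetical helper: runs fun.(arg) in a child task and waits up to
  # `timeout` ms. On timeout the child is killed and `arg` is logged.
  def with_timeout(fun, arg, timeout) do
    task = Task.async(fn -> fun.(arg) end)

    case Task.yield(task, timeout) || Task.shutdown(task, :brutal_kill) do
      {:ok, result} ->
        {:ok, result}

      nil ->
        Logger.warning("timed out after #{timeout}ms on #{inspect(arg)}")
        {:error, {arg, :timeout}}

      # Normally unreachable: Task.async links the child, so a crash in it
      # also takes down this process unless it traps exits.
      {:exit, reason} ->
        Logger.error("failed on #{inspect(arg)}: #{inspect(reason)}")
        {:error, {arg, reason}}
    end
  end

  def run(inputs, timeout) do
    inputs
    |> Task.async_stream(fn arg -> with_timeout(&do_the_thing/1, arg, timeout) end,
      max_concurrency: 3,
      ordered: false,
      # Give the outer task more time than the inner one so it can log first.
      timeout: timeout + 1_000
    )
    |> Enum.map(fn {:ok, result} -> result end)
  end
end

TimeoutLogging.run([10, 500], 100)
```

Since the results aren’t needed, the final `Enum.map/2` can be swapped for `Stream.run/1`; the outer `timeout` then only acts as a safety net.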
Thanks for any suggestions, tips or tricks.