Options for restricted concurrent processing from Stream.resource/3?

I’m reading data in batches from Ecto using Stream.resource/3. I then map over each row and call out to a function. That work can run asynchronously, but concurrency has to be limited since the function does DB writes, etc. Let’s say 3 concurrent tasks for the example.
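
For context, stream_resource/0 is roughly this shape (a minimal sketch only; MyApp.Repo, MyApp.Record and the keyset paging on :id are made-up names, not my actual code):

# Rough shape of stream_resource/0 (illustrative names only), inside a
# module that has `import Ecto.Query`:
def stream_resource(batch_size \\ 500) do
  Stream.resource(
    fn -> 0 end,                          # start before the first id
    fn last_id ->
      batch =
        MyApp.Record
        |> where([r], r.id > ^last_id)
        |> order_by(asc: :id)
        |> limit(^batch_size)
        |> MyApp.Repo.all()

      case batch do
        [] -> {:halt, last_id}            # no more rows, stop the stream
        rows -> {rows, List.last(rows).id}
      end
    end,
    fn _last_id -> :ok end                # nothing to clean up
  )
end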

Two options came to mind: Task.async_stream/3 with the max_concurrency: 3 option (plus timeout and on_timeout adjustments), or perhaps Flow.from_enumerable/2 with stages: 3. I don’t need an aggregated list of results at the end. Most errors are logged within the function itself (i.e. do_the_thing/1), but it would be nice to catch and log a timeout from async_stream if I go that route.

I know I could probably write a supervised GenServer myself and have a lot more control, but I was hoping for a simple pipe-based approach.

So, pseudocode would look something like:

# Flow option:
stream_resource()
|> Flow.from_enumerable(max_demand: 1, stages: 3)
|> Flow.map(&do_the_thing/1)
|> Flow.run()

# Task option:
stream_resource()
|> Task.async_stream(&do_the_thing/1, max_concurrency: 3, timeout: :timer.minutes(1), ordered: false, on_timeout: :kill_task)
|> Stream.run()

Couple of questions:

  1. Is there a benefit to using one over the other (Task vs Flow) when I need limited concurrency and don’t need an aggregated list at the end?
  2. Is there a simple way of catching the timeout so I can log the args when using Task.async_stream/3, rather than using on_timeout: :kill_task and just getting {:exit, :timeout}? (I know I can usually catch those in a GenServer handle_call/3, but I’m not sure how to do it here :shrug:. One workaround is sketched just after this list.)
  3. Is there a better, alternative approach that I’m totally missing here?
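
For question 2, the only workaround I can think of (just a sketch, assuming the same stream_resource/0 and do_the_thing/1 as above) is to set the outer Task.async_stream timeout to :infinity and enforce the per-item timeout inside the task with Task.async/1 + Task.yield/2, so the item is still in scope when the timeout hits and can be logged:

# Timeout handled inside each task so the args can be logged.
# Needs `require Logger` in the calling module.
stream_resource()
|> Task.async_stream(
  fn item ->
    inner = Task.async(fn -> do_the_thing(item) end)

    case Task.yield(inner, :timer.minutes(1)) || Task.shutdown(inner, :brutal_kill) do
      {:ok, result} ->
        result

      _timed_out_or_crashed ->
        Logger.error("do_the_thing/1 timed out for #{inspect(item)}")
    end
  end,
  max_concurrency: 3,
  timeout: :infinity,
  ordered: false
)
|> Stream.run()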

Thanks for any suggestions, tips or tricks.


As for 2, you can add a handler for the exit message in the process that spawns the task. Tasks are automatically linked to the calling process.
