Nesting Task.Supervisor.async_stream bad idea?

So I'm looking to do something like the code below, where I get a list of posts from the DB and then run an API call on each post's comments, and I want it all to happen as concurrently as possible. Is nesting Task.Supervisor.async_stream like this bad practice?

defmodule MyApp.ObanWorker do
  use Oban.Worker

  require Logger

  @impl Oban.Worker
  def perform(_job) do
    posts = MyApp.Repo.all(Posts)

    MyTaskSupervisor
    |> Task.Supervisor.async_stream(posts, &MyApp.ApiClient.do_it/1)
    # each outer element is {:ok, errors} where errors is the list returned by do_it/1
    |> Enum.flat_map(fn {:ok, errors} -> errors end)
    |> Enum.each(fn {_, message} -> Logger.info(message) end)

    :ok
  end
end

defmodule MyApp.ApiClient do
  def do_it(%{comments: comments} = _post) do
    # returns the failed calls for this post's comments
    MyTaskSupervisor
    |> Task.Supervisor.async_stream(comments, &make_call/1)
    |> Enum.map(fn {:ok, result} -> result end)
    |> Enum.filter(&(elem(&1, 0) == :error))
  end

  def make_call(comment) do
    result = request(comment)

    case result do
      {:ok, %{status: 200}} -> {:ok, "good job."}
      _ -> {:error, "not good for #{inspect(comment)}."}
    end
  end

  # placeholder for the real API request
  defp request(_comment), do: {:error, :not_implemented}
end

One thing to watch out for with tasks is copying large data structures: everything you pass to a task is copied into that task's process. In this case, comments presumably has a lot of entries (hence the need for concurrency), so copying it around is expensive.
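Something along these lines keeps the per-task copy small (just a sketch; the helper module, and the assumption that each call only needs a comment's id and body, are made up):

defmodule MyApp.SlimStream do
  # Hypothetical helper: pass each task only the fields it actually uses,
  # instead of full structs with preloaded associations, so less data is
  # copied into every task process.
  def stream_comments(comments) do
    slim = Enum.map(comments, &Map.take(&1, [:id, :body]))

    Task.Supervisor.async_stream(MyTaskSupervisor, slim, &MyApp.ApiClient.make_call/1)
  end
end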

I wouldn’t nest them. It makes tuning via max_concurrency more complicated. I would instead have the original query return the comments that belong to the desired posts, and run a single async_stream over that list.
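Roughly like this (a sketch: it assumes a MyApp.Comment schema with a post_id foreign key, and the max_concurrency value is illustrative):

defmodule MyApp.ObanWorker do
  use Oban.Worker

  import Ecto.Query
  require Logger

  @impl Oban.Worker
  def perform(_job) do
    # One query returns the comments belonging to the posts of interest,
    # so a single, flat async_stream is enough.
    comments =
      MyApp.Repo.all(
        from c in MyApp.Comment,
          join: p in Posts,
          on: c.post_id == p.id,
          select: c
      )

    MyTaskSupervisor
    |> Task.Supervisor.async_stream(comments, &MyApp.ApiClient.make_call/1, max_concurrency: 20)
    |> Enum.each(fn
      {:ok, {:error, message}} -> Logger.info(message)
      _ -> :ok
    end)

    :ok
  end
end

With only one level there is exactly one max_concurrency knob to tune.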

Don’t nest them: you’re likely looking at increased copying of values, and at parallelism guarantees that no longer hold (e.g. you originally wrote the code to run no more than 20 tasks in parallel, but with each of those 20 tasks starting its own inner stream the total can balloon well beyond that).

An alternative is to separate the work and queue it: for example, have a GenServer pull the posts from a queue one at a time, and let each one use Task.async_stream over its comments (one level only, no nesting).
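A rough sketch of that idea (the module is made up, and it assumes each post comes with its comments preloaded):

defmodule MyApp.PostQueue do
  # Hypothetical GenServer that works through the posts one at a time, so at
  # any moment there is only one flat async_stream running.
  use GenServer

  require Logger

  def start_link(posts), do: GenServer.start_link(__MODULE__, posts, name: __MODULE__)

  @impl true
  def init(posts) do
    send(self(), :next)
    {:ok, :queue.from_list(posts)}
  end

  @impl true
  def handle_info(:next, queue) do
    case :queue.out(queue) do
      {{:value, post}, rest} ->
        # one level of concurrency: the comments of a single post
        post.comments
        |> Task.async_stream(&MyApp.ApiClient.make_call/1, max_concurrency: 20)
        |> Enum.each(fn
          {:ok, {:error, message}} -> Logger.info(message)
          _ -> :ok
        end)

        send(self(), :next)
        {:noreply, rest}

      {:empty, _} ->
        {:stop, :normal, queue}
    end
  end
end

Started with MyApp.PostQueue.start_link(posts), it drains the posts sequentially while still fanning out each post's comments concurrently, so the single max_concurrency option stays meaningful.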
