Async Task not returning results

I have around 10-15 Ecto queries that I want to run asynchronously in my API code.
I am using Task.async and Task.yield_many.

Here is the code that creates the async tasks:

# param_1 and param_2 are passed in so that the function compiles on its own.
def get_tasks(param_1, param_2) do
  task_1 =
    Task.async(SomeModule, :some_function, [
      param_1,
      param_2
    ])

  task_2 =
    Task.async(SomeModule, :some_function, [
      param_1,
      param_2
    ])

  task_3 =
    Task.async(SomeModule, :some_function, [
      param_1,
      param_2
    ])

  [task_1, task_2, task_3]
end

I collect the task results in my main function like this:

[
  {_, task_1},
  {_, task_2},
  {_, task_3}
] =
  [
    task_1,
    task_2,
    task_3
  ]
  |> MyCode.TaskHelper.yeild_multiple_tasks()

And my task helper code is given below -

defmodule MyCode.TaskHelper do

  def get_results_or_shutdown(tasks_with_results) do
    Enum.map(tasks_with_results, fn {task, res} ->
      res || Task.shutdown(task, :brutal_kill)
    end)
  end

  @doc """
  Returns the results of multiple tasks run in parallel.

  ## Parameters
    - task_list: list, a list of all tasks
  """
  def yeild_multiple_tasks(task_list) do
    task_list
    |> Task.yield_many()
    |> get_results_or_shutdown()
  end
end

Each task is an Ecto query.
The issue is that the tasks behave randomly. Sometimes they return results, sometimes they don’t, and not once have all of the tasks returned results. (I have written 3 tasks here for illustration, but I actually have around 10-15 async tasks.)
I ran the code synchronously, and it returned the correct results (obviously). I tried raising the Repo pool_size in the config to 50, but to no avail.

Can someone please help me out with this? I am quite stuck here.

Are all tasks calling the same function?

One batch of them does; another batch calls a different function.

The problem might be out of order results but let’s not check for that just yet.

Have you tried using Task.async_stream for each batch of tasks that call the same function?

If you only ever call 2 different functions then that would be 2 such calls.
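As a rough sketch of that suggestion, assuming each batch differs only in its first argument: BatchSketch and its some_function/2 are hypothetical stand-ins for SomeModule and the real Ecto query. Task.async_stream/5 prepends each element of the enumerable to the given argument list and, by default, yields results in input order.

```elixir
defmodule BatchSketch do
  # Hypothetical stand-in for SomeModule.some_function/2 (e.g. an Ecto query).
  def some_function(param_1, param_2), do: {param_1, param_2}

  # Runs one batch: each element of `first_params` becomes the first argument,
  # and `param_2` is the shared second argument.
  def run_batch(first_params, param_2) do
    first_params
    |> Task.async_stream(__MODULE__, :some_function, [param_2])
    |> Enum.map(fn {:ok, result} -> result end)
  end
end

BatchSketch.run_batch([:a, :b, :c], :shared)
```

With two different functions, that would be one run_batch-style call per function.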

How long does that take?

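Task.yield_many/2 has a default timeout of 5 seconds, so maybe that isn’t enough time for Ecto to finish all the queries. Any task that has not replied by then comes back with a nil result, and your helper then shuts it down.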
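A minimal sketch of passing an explicit timeout, assuming 15 seconds is acceptable for the API call. YieldSketch and the two demo tasks are made up for illustration; the shutdown step is the same one the helper in the question already uses.

```elixir
defmodule YieldSketch do
  # Waits up to `timeout` ms for all tasks, then shuts down the stragglers.
  # Each entry is {:ok, value}, {:exit, reason}, or nil, in input order.
  def yield_all(tasks, timeout \\ 15_000) do
    tasks
    |> Task.yield_many(timeout)
    |> Enum.map(fn {task, res} -> res || Task.shutdown(task, :brutal_kill) end)
  end
end

fast = Task.async(fn -> :done end)

slow =
  Task.async(fn ->
    Process.sleep(2_000)
    :late
  end)

# A deliberately short timeout to show what a timed-out task looks like.
results = YieldSketch.yield_all([fast, slow], 200)
```

Here the entry for the slow task is nil rather than a two-element tuple, so a pattern such as {_, value} would not match it.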

"The issue is the tasks are behaving randomly."

I wouldn’t necessarily expect Ecto + RDBMS to behave in a deterministic fashion (order of work finished). So the apparent randomness wouldn’t be too surprising if the timeout is too short.

The other advantage of Task.async_stream/5 is that the level of concurrency can be controlled with :max_concurrency (it defaults to System.schedulers_online/0, normally the number of online cores).
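Something along these lines shows the options together. BoundedSketch and slow_query/2 are hypothetical, and the numbers are assumptions rather than recommendations. Keeping :max_concurrency at or below the Repo pool size should stop tasks from queuing for a database connection.

```elixir
defmodule BoundedSketch do
  # Hypothetical stand-in for one Ecto query; sleeps to simulate latency.
  def slow_query(id, delay_ms) do
    Process.sleep(delay_ms)
    {:row, id}
  end

  def run(ids) do
    ids
    |> Task.async_stream(__MODULE__, :slow_query, [50],
      max_concurrency: 4,
      timeout: 15_000,
      on_timeout: :kill_task
    )
    |> Enum.map(fn
      {:ok, row} -> row
      # With on_timeout: :kill_task a slow task shows up here
      # instead of exiting the caller.
      {:exit, :timeout} -> :timed_out
    end)
  end
end

BoundedSketch.run(1..10)
```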

There is also Task.Supervisor.async_stream_nolink/6.
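A hedged sketch of the nolink variant: NolinkSketch and flaky_query/1 are invented for the example, and the supervisor is started inline only to keep it self-contained (in an application it would normally live in the supervision tree). Because the tasks are not linked to the caller, a crashing query surfaces as an {:exit, reason} entry instead of taking the caller down.

```elixir
defmodule NolinkSketch do
  # Hypothetical query that fails for one particular id.
  def flaky_query(2), do: raise("simulated query failure")
  def flaky_query(id), do: {:row, id}

  def run(ids) do
    # Assumption: started here for the sketch only.
    {:ok, sup} = Task.Supervisor.start_link()

    sup
    |> Task.Supervisor.async_stream_nolink(ids, __MODULE__, :flaky_query, [], timeout: 15_000)
    |> Enum.map(fn
      {:ok, row} -> row
      {:exit, _reason} -> :failed
    end)
  end
end

NolinkSketch.run([1, 2, 3])
```

The failed task is still reported through the logger, so the error is not silently lost.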

