ParallelTask: easily run functions in parallel and capture the results

Hi!

ParallelTask on GitHub

I’m quite new to the Elixir community, and so far I’m hooked on the language and ecosystem. Today I read a nice article by Ryan Sydnor on how they sped up their application by parallelising database queries. He used tasks to run the expensive queries concurrently, which improved performance drastically. The full article can be read here.
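For anyone who hasn't seen the pattern, here is a minimal sketch of what running queries concurrently with Task can look like. The `slow_query` function and its sleep times are made-up stand-ins for real database calls, not code from the article.

```elixir
# Hypothetical stand-in for an expensive database query:
# it sleeps to simulate latency, then returns a tagged result.
slow_query = fn name, ms ->
  Process.sleep(ms)
  {name, :done}
end

{elapsed_us, results} =
  :timer.tc(fn ->
    # Start both "queries" at once; each runs in its own process.
    tasks = [
      Task.async(fn -> slow_query.(:users, 200) end),
      Task.async(fn -> slow_query.(:orders, 200) end)
    ]

    # Await the tasks in order; the results come back in the same order.
    Enum.map(tasks, &Task.await/1)
  end)

IO.inspect(results)
IO.puts("took about #{div(elapsed_us, 1000)} ms")
```

Because both tasks sleep at the same time, the total wait should be close to the slowest query rather than the sum of the two.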

Elixir really shines in cases like these and I thought it would be great to have a library for simple parallelization. That’s why I created ParallelTask, a lightweight wrapper around Task making it easy to parallelize API requests, database queries, and the like.

This is my first Elixir project and any feedback is very welcome. Thanks!

5 Likes

I like it, it reads nicely!

1 Like

I like the idea of building up the tasks first and then executing them together in parallel. Since it is conceptually very similar to Ecto.Multi, though, I think it would make sense to mirror that API.

So instead of:

|> ParallelTask.add(first_task: fn -> "Result from first task" end)

We could write:

|> ParallelTask.add(:first_task, fn -> "Result from first task" end)

That would also allow us to name our tasks more flexibly (Ecto.Multi now accepts strings as names).
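To make the suggestion concrete, here is a rough sketch of an `add/3` in that style. `KeyedTasksSketch` is a hypothetical module written for this post, and storing the functions in a map is my assumption; it is not ParallelTask's actual source.

```elixir
defmodule KeyedTasksSketch do
  # Hypothetical module for illustration only.
  # Assumption: the pending functions are kept in a map keyed by task name.
  defstruct task_functions: %{}

  def new, do: %__MODULE__{}

  # Ecto.Multi style: the key is a separate argument, so it can be any term
  # (an atom, a string, a tuple), not just a keyword-list key.
  def add(%__MODULE__{task_functions: fns} = tasks, key, fun) when is_function(fun, 0) do
    %{tasks | task_functions: Map.put(fns, key, fun)}
  end
end

tasks =
  KeyedTasksSketch.new()
  |> KeyedTasksSketch.add(:first_task, fn -> "Result from first task" end)
  |> KeyedTasksSketch.add("second task", fn -> "Result from second task" end)

IO.inspect(Map.keys(tasks.task_functions))
```

Nothing runs at this point; the struct only collects the functions so they can all be started together later.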

2 Likes

Nice package! :thumbsup:

It’s surprising that the Task module doesn’t make it easier to work with keyed tasks like this.
The closest I could get without writing a helper function is:

tasks = %{a: Task.async(fn -> "foo" end), b: Task.async(fn -> "bar" end)}
results = for {k, v} <- tasks, into: %{}, do: {k, Task.await(v)}

But that has slightly different semantics to your package (starts tasks eagerly and awaits each separately).
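For comparison, here is a sketch of a small helper with the other semantics: nothing starts until the helper is called, and all tasks share one timeout. `run_keyed` is a hypothetical name of mine, and it relies on `Task.await_many/2`, which needs Elixir 1.11 or later.

```elixir
# Hypothetical helper: takes a map of key => zero-arity function.
# The functions are only turned into tasks when the helper runs.
run_keyed = fn funs, timeout ->
  {keys, tasks} =
    funs
    |> Enum.map(fn {k, f} -> {k, Task.async(f)} end)
    |> Enum.unzip()

  # Task.await_many/2 applies the timeout to the whole batch
  # and returns the results in the same order as the tasks.
  results = Task.await_many(tasks, timeout)

  keys
  |> Enum.zip(results)
  |> Map.new()
end

results = run_keyed.(%{a: fn -> "foo" end, b: fn -> "bar" end}, 5000)
IO.inspect(results)
# → %{a: "foo", b: "bar"}
```

Note that `Task.await_many/2` exits the caller if the batch times out, so this is stricter than a `Task.yield_many/2` based approach.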

The implementation of add can benefit from some pattern matching and map update syntax:

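def add(%__MODULE__{task_functions: task_functions} = object, new_functions \\ []) do
  # Enum.into/2 takes the source first and the target collection second,
  # so the new functions are merged into the existing ones.
  %{object | task_functions: Enum.into(new_functions, task_functions)}
end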
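In case the map update syntax is new to anyone, here is a tiny standalone sketch on a plain map. The keys and the integer values are placeholders I made up; in the real struct the values would be functions.

```elixir
# Placeholder data: integers stand in for the task functions.
state = %{task_functions: %{a: 1}, timeout: 5000}

# %{map | key: value} returns a new map with an existing key replaced.
updated = %{state | task_functions: Map.put(state.task_functions, :b, 2)}

IO.inspect(updated)
# The original is untouched because data is immutable.
IO.inspect(state)
```

Unlike `Map.put/3`, the update syntax raises a `KeyError` when the key does not already exist, which catches typos in field names early.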

I try to avoid Enum.at wherever possible. In the definition of perform, for example, I’d use unzip and zip to match the keys and results together:

def perform(%__MODULE__{task_functions: task_functions}, timeout \\ 5000) do
  {keys, tasks} =
    task_functions
    |> Enum.map(fn {k, f} -> {k, Task.async(f)} end)
    |> Enum.unzip()

  task_results =
    tasks
    |> Task.yield_many(timeout)
    |> Enum.map(&get_task_result/1)

  keys
  |> Enum.zip(task_results)
  |> Map.new()
end

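I left `get_task_result/1` out above. As a rough idea of what such a helper could look like, here is a sketch that handles the three shapes `Task.yield_many/2` can return for each task. The module name and the `{:error, :timeout}` return value are my assumptions, not necessarily what the package does.

```elixir
defmodule YieldManySketch do
  # Hypothetical helper; ParallelTask's real get_task_result/1 may differ.

  # The task replied in time.
  def get_task_result({_task, {:ok, value}}), do: {:ok, value}

  # The task process exited before replying.
  def get_task_result({_task, {:exit, reason}}), do: {:exit, reason}

  # No reply before the timeout: shut the task down so it does not linger.
  # A reply may still arrive just before shutdown, so handle that too.
  def get_task_result({task, nil}) do
    case Task.shutdown(task, :brutal_kill) do
      {:ok, value} -> {:ok, value}
      {:exit, reason} -> {:exit, reason}
      nil -> {:error, :timeout}
    end
  end
end

tasks = [
  Task.async(fn -> :fast end),
  Task.async(fn ->
    Process.sleep(1_000)
    :slow
  end)
]

results =
  tasks
  |> Task.yield_many(100)
  |> Enum.map(&YieldManySketch.get_task_result/1)

IO.inspect(results)
```

With a 100 ms timeout the first task should come back as `{:ok, :fast}`, while the second one is still sleeping and gets shut down.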
1 Like

That’s a great idea!

I just added an alternative syntax for ParallelTask.add, modelled on Ecto.Multi, and it should support string keys.

Thanks!

1 Like

Wow, can’t thank you enough for these tips.

The map update syntax is brilliant and I had no clue zip/unzip even existed. Your code really shows the beauty of Elixir.

I’ve incorporated it into my code, thanks!