I think this is a perfect use case for `Task`.
Since your problem is basically processing a collection of items concurrently, you should look into using `Task.async_stream`. Read carefully about the `max_concurrency` option: for CPU-bound data processing it is usually most beneficial to run N concurrent processes, where N is the number of hardware threads in your CPU. To achieve maximum throughput, though, you'd need to test and measure. If most of the time is spent waiting for data to be fetched from a URL, having more processes than that may speed things up.
So, assuming process X has to spawn all the tasks and wait for them all to finish, it may look as follows:
```elixir
results =
  urls
  |> Task.async_stream(MyWorkerModule, :process_url, [])
  |> Enum.to_list()
```
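To tune throughput, you can pass `max_concurrency` (and a per-task `timeout`) as options. A sketch, where `MyWorkerModule.process_url/1` is your worker function and the multiplier of 4 is just an illustrative starting point for IO-bound work:

```elixir
results =
  urls
  |> Task.async_stream(MyWorkerModule, :process_url, [],
    # More processes than schedulers, since the tasks mostly wait on the network.
    max_concurrency: System.schedulers_online() * 4,
    # Each task gets up to 15 seconds before it's considered failed.
    timeout: 15_000
  )
  |> Enum.to_list()
```

Note that each element of `results` is an `{:ok, result}` tuple (or `{:exit, reason}` if you set `on_timeout: :kill_task`), so you'll likely want to unwrap them.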
If, on the other hand, your process X has to be able to handle messages from other processes while all Yn tasks are working, then X should be a GenServer with at least the following basic functionality:
```elixir
defmodule MyTaskManager do
  use GenServer

  ## Client API

  def start_link(_) do
    GenServer.start_link(__MODULE__, nil)
  end

  def process_urls(server, urls) do
    GenServer.cast(server, {:process_urls, urls})
  end

  def example_function(server, args) do
    GenServer.call(server, {:do_this_please, args})
  end

  ## GenServer callbacks

  def init(_) do
    # We'll keep track of all live tasks using the `tasks` map.
    {:ok, %{tasks: %{}}}
  end

  def handle_cast({:process_urls, urls}, state) do
    new_tasks = spawn_tasks(urls)
    {:noreply, %{state | tasks: Map.merge(state.tasks, new_tasks)}}
  end

  def handle_call({:do_this_please, args}, _from, state) do
    # Just a normal handle_call.
    # do something with args...
    {:reply, :ok, state}
  end

  # When a task is spawned via `Task.async` or `Task.Supervisor.async_nolink`, it will eventually send its result as
  # a message in the form `{task_ref, result}` to the calling process if it finishes normally. If it crashes, though,
  # it won't send a result back (but the calling process will still get a :DOWN message, see below).
  def handle_info({task_ref, result}, %{tasks: tasks} = state) when is_reference(task_ref) do
    {url, updated_tasks} = Map.pop(tasks, task_ref)
    # do something with the url and the result returned for it...
    {:noreply, %{state | tasks: updated_tasks}}
  end

  # Tasks are also monitored by the calling process, so for each task process we'll get a :DOWN message when it
  # terminates, regardless of the termination reason.
  def handle_info({:DOWN, task_ref, :process, _task_pid, _exit_reason}, state) do
    updated_tasks = Map.delete(state.tasks, task_ref)
    {:noreply, %{state | tasks: updated_tasks}}
  end

  # Here we create a new task for each URL. In principle, it's possible to limit the number
  # of concurrent tasks just like `Task.async_stream` does, but here I'm simplifying a bit.
  defp spawn_tasks(urls) do
    Map.new(urls, fn url ->
      %Task{ref: task_ref} = Task.async(MyWorkerModule, :process_url, [url])
      {task_ref, url}
    end)
  end
end
```
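For completeness, driving the manager above looks like this (the URLs are just placeholders):

```elixir
# Start the manager and hand it a batch of URLs; the cast returns
# immediately, and results arrive later as handle_info messages.
{:ok, pid} = MyTaskManager.start_link(nil)
:ok = MyTaskManager.process_urls(pid, ["https://example.com/a", "https://example.com/b"])
```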
I'd also recommend reading up on `Task.Supervisor`. It's useful for keeping all task processes rooted under a dedicated supervisor, especially when there's a possibility that a task may crash. A task spawned using `Task.async` will take its calling process down if it crashes, whereas with `Task.Supervisor.async_nolink` the calling process is not linked to the task and will simply receive the `:DOWN` message, as I showed in the example above.
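A sketch of that approach, assuming you name the supervisor `MyApp.TaskSupervisor` (the name is up to you):

```elixir
# In your application's supervision tree:
children = [
  {Task.Supervisor, name: MyApp.TaskSupervisor}
]

# Then, in MyTaskManager, spawn_tasks/1 becomes:
defp spawn_tasks(urls) do
  Map.new(urls, fn url ->
    # async_nolink monitors the task but does not link to it,
    # so a crashing task won't bring the GenServer down.
    %Task{ref: task_ref} =
      Task.Supervisor.async_nolink(MyApp.TaskSupervisor, MyWorkerModule, :process_url, [url])

    {task_ref, url}
  end)
end
```

The rest of the GenServer stays exactly the same, since `async_nolink` delivers the `{task_ref, result}` and `:DOWN` messages in the same shape as `Task.async`.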