I am trying to follow this blog Beyond Task.Async. So far I have got the following code working:
defmodule Aggregator do
@moduledoc """
Testing async tasks
"""
def new, do: 0
def aggregate(result), do: result
def aggregate(aggregator, result) do
Process.sleep(500)
aggregator + result
end
end
defmodule Calc do
@moduledoc """
Testing async tasks
"""
@spec run(Integer.t) :: Integer.t
def run(id) when id > 0 do
IO.puts "\n\n sleeping: #{id}"
Process.sleep(id)
IO.puts "\n\n waking from slumber: #{id}"
id
end
end
defmodule Async do
@moduledoc """
Testing async tasks
"""
def run(diff \\ 0, timeout \\ 900) do
1..10
|> Enum.map(fn _ -> Enum.random(1..1000) - diff end)
|> Enum.map(&Task.async(fn ->
try do
{:ok, Calc.run(&1)}
rescue _ ->
:error
end
end))
|> collect_result timeout
end
defp collect_result(tasks, timeout) do
ref = make_ref()
timer = Process.send_after(self(), {:timeout, ref}, timeout)
try do
collect_result(tasks, Aggregator.new, ref)
after
:erlang.cancel_timer(timer)
receive do
{:timeout, ^ref} ->
:ok
after 0 ->
:ok
end
end
end
defp collect_result([], aggregator, _), do: {:ok, Aggregator.aggregate(aggregator)}
defp collect_result(tasks, aggregator, ref) do
receive do
{:timeout, ^ref} ->
{:timeout, Aggregator.aggregate(aggregator)}
msg ->
case Task.find(tasks, msg) do
{{:ok, result}, task} ->
collect_result(
List.delete(tasks, task),
Aggregator.aggregate(aggregator, result),
ref
)
# if task errors, stop monitoring task and ignore its result
{:error, task} ->
collect_result(List.delete(tasks, task), aggregator, ref)
nil ->
collect_result(tasks, aggregator, ref)
end
end
end
end
So when I run: iex(295)> Async.run 700
, I get {:ok, 489}
However, I read that Task.find
was deprecated in favour of explicit message matching. So I refactored collect_result/3
to:
defp collect_result(tasks, aggregator, ref) do
receive do
{:timeout, ^ref} ->
{:timeout, Aggregator.aggregate(aggregator)}
{task, {:ok, result}} ->
collect_result(
List.delete(tasks, task),
Aggregator.aggregate(aggregator, result),
ref
)
# if task errors, stop monitoring task and ignore its result
{task, :error} ->
collect_result(List.delete(tasks, task), aggregator, ref)
end
end
Now Async.run
times out no matter the value of diff
. For examples, iex(306)> Async.run 700
returned {:timeout, 260}
, which is ridiculous since Calc.run(260)
was the only successful task.
I can’t seem to figure out how to refactor the code using explicit message matching and still get same behaviour as first iteration of the code. Any help from the community will be greatly appreciated.