Should I call Task.await if Task.yield returns {:ok, value}?

All is in the title, but to clarify:

It seems that if Task.yield returns {:ok, value} it means that the task result message was received, so there is no risk of receiving an unhandled message later, thus no need to call Task.await (which would give the same value anyway).

But I want to be sure.

Thank you

Please clarify your use case? What are you doing exactly and how is Task.yield better in your scenario, compared to using Task.async + Task.await?

I need to send the task result in a 2500ms timeframe. If the result is not ready yet, I have to send a dummy message, and I have another way to send the result back.

So basically

params = some_data()
ref = make_ref()
parent = self()

spawn(fn ->
  t = Task.async(fn -> mod.run(params) end)

  case Task.yield(t, @max_wait_ms) do
    {:ok, result} ->
      send(parent, {:result, ref, result})

    nil ->
      send(parent, {:result, ref, :still_running})
      result = Task.await(t)
      send_delayed_response(params, result)

    {:exit, reason} ->
      exit(reason)
  end
end)

receive do
  {:result, ^ref, result} -> {:ok, result}
end

So I did a simple test in the shell and I have my answer anyway : after calling Task.yield, no more messages from the task are coming (the monitor is also cleaned).

iex(1)> t = Task.async(fn -> :test end)
%Task{
  owner: #PID<0.104.0>,
  pid: #PID<0.106.0>,
  ref: #Reference<0.1716671456.3384016898.195927>
}
iex(2)> Task.yield t
{:ok, :test}
iex(3)> flush
:ok

I’ve started build a library around oban a few month ago, which does what you’re doing here, but with the persistent queue of oban. I just didn’t finish it properly at the time.

2 Likes

… provided the task has already terminated

iex(1)> f =
...(1)>   fn ->
...(1)>     Process.sleep(500)
...(1)>     :test
...(1)>   end
#Function<21.126501267/0 in :erl_eval.expr/5>
iex(2)> t = Task.async(f)
%Task{
  owner: #PID<0.104.0>,
  pid: #PID<0.111.0>,
  ref: #Reference<0.3116447749.1782841345.230907>
}
iex(3)> Task.yield(t,300)
nil
iex(4)> flush
:ok
iex(5)> Process.sleep(300)
:ok
iex(6)> flush
{#Reference<0.3116447749.1782841345.230907>, :test}
{:DOWN, #Reference<0.3116447749.1782841345.230907>, :process, #PID<0.111.0>,
 :normal}
:ok
iex(7)>
iex(1)> f =
...(1)>   fn ->
...(1)>     Process.sleep(500)
...(1)>     :test
...(1)>   end
#Function<21.126501267/0 in :erl_eval.expr/5>
iex(2)> t = Task.async(f)
%Task{
  owner: #PID<0.104.0>,
  pid: #PID<0.111.0>,
  ref: #Reference<0.4284780453.977797126.180625>
}
iex(3)> case Task.yield(t, 300) || Task.shutdown(t) do
...(3)>   {:ok, result} = result ->
...(3)>     result
...(3)> 
...(3)>   nil ->
...(3)>     "Timed out"
...(3)> end
"Timed out"
iex(4)> flush
:ok
iex(5)> Process.sleep(300)
:ok
iex(6)> flush
:ok
iex(7)> 

And the documentation points this out.

I looked into it but it lack docs indeed.

Here I have 3 processes :

  • The main code block I posted (it is a Phoenix controller). It has to return in the 2500ms timeframe.
  • The spawn process (will be a supervised task that you do not have to await). This one yields from the task and return the data, but if the data is not ready, it will await it forever (I forgot to set infinity timeout on await). That is why I don not use spawn_link.
  • The task itself, a simple worker

The trick is that if the yield is succesful, we do not want to call send_delayed_response as we will just return the data.

So your on_task_gone could be a good fit but I would have to use only Task.await then ? and catch the timeout exit on my own ? I did not plan to use oban or other libraries because my problem is solved with this single block. But if you want to separate that from Omen and create a tiny lib that just redirects the task result to another destination after a special yield I’d use it.

Absolutely, that is why I said if Task.yield returns {:ok, value}?. Of course if yield returns nil you have to await or yield more.

But the documentation does not tell that if yield is successful you can forget about the task entirely, and on the other hand stresses very much about the need to await at all costs.

The point is that in general a timed out yield should be accompanied by a Task.shutdown/2.

It seems a peculiar decision to use the timeout and then not terminate the task - by calling await you are giving the task an additional 5000 ms. If the task still doesn’t respond the process will exit.

Yeah, I created it specifically to add the functionality you have with Task to jobs in Oban, because I don’t like the task being silently dropped if e.g. the machine goes down after your 2500ms timeframe, but before the Task itself actually finished.

I don’t know if it is peculiar, it is just what those who call my code expect : a response within 2500ms or a response elsewhere, later. In either case the task has to be completed. shutdown does not fit in this scheme.

My code above lacks the timeout for await, which will be :infinity actually.

Thank you

Your approach still lacks resilience as the parent may get the still_running message but there never is a follow up because the Task goes zombie or crashes after being late.

I’d look into using Task.Supervisor instead.

Example:

# file: my_app/lib/my_app/application.ex
#
# created with "mix new my_app --sup"
# 
defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    name = MyApp.TaskSupervisor

    children = [
      {Task.Supervisor, name: name}
    ]

    opts = [strategy: :rest_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
# file: my_app/lib/demo.ex
#
defmodule Demo do
  @name MyApp.TaskSupervisor
  @max_wait_ms 500
  @timely_timeout div(@max_wait_ms, 2)
  @late_timeout @max_wait_ms + @timely_timeout
  @too_late_timeout 3 * @max_wait_ms

  def launch(arg) do
    task = Task.Supervisor.async_nolink(@name, __MODULE__, :some_work, [arg])

    task
    |> Task.yield(@max_wait_ms)
    |> handle_task_yield(task)
  end

  defp handle_task_yield({:ok, value}, _) do
    IO.puts("result: #{inspect(value)}")
    {:ok, value}
  end

  defp handle_task_yield(nil, task) do
    IO.puts("Timed out")
    await_late_result(task)
  end

  defp handle_task_yield({:exit, reason}, _) do
    IO.puts("Task exit: #{inspect(reason)}")
    {:error, reason}
  end

  defp await_late_result(%Task{ref: mon, pid: pid}) do
    # in GenServer these messages would go through 
    # handle_info
    receive do
      {^mon, value} ->
        Process.demonitor(mon, [:flush])
        IO.puts("Better late than never: #{inspect(value)}")
        {:ok, value}

      {:DOWN, ^mon, _, ^pid, reason} when reason != :normal ->
        Process.demonitor(mon, [:flush])
        IO.puts("Task LATE exit: #{inspect(reason)}")
        {:error, reason}
    after
      @max_wait_ms ->
        IO.puts("I'm not waiting forever!")
        Process.demonitor(mon, [:flush])
        Process.exit(pid, :kill)
        {:error, :far_too_late}
    end
  end

  def some_work(:timely = type) do
    Process.sleep(@timely_timeout)
    type
  end

  def some_work(:late = type) do
    Process.sleep(@late_timeout)
    type
  end

  def some_work(:too_late = type) do
    Process.sleep(@too_late_timeout)
    type
  end

  def some_work(:crash = type) do
    exit(type)
  end

  def some_work(:late_crash = type) do
    Process.sleep(@late_timeout)
    exit(type)
  end

  def some_work(:too_late_crash = type) do
    Process.sleep(@too_late_timeout)
    exit(type)
  end
end
$ iex -S mix
Erlang/OTP 22 [erts-10.5] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [hipe] [dtrace]

Compiling 1 file (.ex)
Interactive Elixir (1.9.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> Demo.launch(:timely)
result: :timely
{:ok, :timely}
iex(2)> Demo.launch(:late)
Timed out
Better late than never: :late
{:ok, :late}
iex(3)> Demo.launch(:too_late)
Timed out
I'm not waiting forever!
{:error, :far_too_late}
iex(4)> Demo.launch(:crash)

11:17:17.617 [error] Task #PID<0.153.0> started from #PID<0.145.0> terminating
** (stop) :crash
    (my_app) lib/demo.ex:71: Demo.some_work/1
    (elixir) lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2
    (elixir) lib/task/supervised.ex:35: Task.Supervised.reply/5
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Function: &Demo.some_work/1
    Args: [:crash]
Task exit: :crash
{:error, :crash}
iex(5)> Demo.launch(:late_crash)
Timed out
Task LATE exit: :late_crash

11:17:18.370 [error] Task #PID<0.155.0> started from #PID<0.145.0> terminating
** (stop) :late_crash
    (my_app) lib/demo.ex:76: Demo.some_work/1
    (elixir) lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2
    (elixir) lib/task/supervised.ex:35: Task.Supervised.reply/5
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Function: &Demo.some_work/1
    Args: [:late_crash]
{:error, :late_crash}
iex(6)> Demo.launch(:too_late_crash)
Timed out
I'm not waiting forever!
{:error, :far_too_late}
iex(7)> flush
:ok
iex(8)> 
1 Like

You are right that await infinity is not enough but I think I could just call yield again (after sending back “processing”) and this second time use shutdown (actually the form yield(…) || shutdown(…)).

That would allow to keep the %Task{} data structure opaque.

Of course I will not simply use spawn as I said before :wink:

I think you demo lacks a layer of spawning though. launch must return in the 2500ms time frame, it can not return after “better late than ever”. At this point the application should use the other mean of sending results, and launch must have returned with a dummy value.

Thait is why I send messages manually.

Thank you very much for the code :slight_smile:

handle_task_yield will run either after the result is delivered or after the first @max_wait_ms timeout expires - so it is there where you would be sending the “it’s going to be late” message.

Also consider that depending on what a GenServer is doing using Task.yield or Task.await directly may not be advisable. So you either have to spawn a separate vanilla process (with it’s own mailbox) or simply handle the Task messages via handle_info - in which case %Task{} can’t be treated as an opaque type.

Sorry but I do not get it.

Imagine a task that will never finish for the example.

handle_task_yield is called after Task.yield, which already waited @max_wait_ms

If the arg to handle_task_yield is nil, it will call await_late_result and again wait @max_wait_ms.

So launch has not returned after the first timeout. The result is not delivered.

Thanks for your time :slight_smile:

Edit: or your IO.puts("Time out") is actually my send(parent, {:result, :still_running}) in which case we agree and that is what I will do, just using yield twice.

using Task.yield or Task.await directly may not be advisable.

This is why I used send : the parent expects one and only one message from the middle task layer (my spawn'ed process).

I will do a proper implementation soon and ask for your review if you do not mind.

1 Like

From the GenServer’s perspective the intermediate timeout doesn’t really serve any purpose - unless it needs to let someone else know - if it handles the tasks directly.

# file: my_app/lib/my_app/application.ex
#
# created with "mix new my_app --sup"
# 
defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    name = MyApp.TaskSupervisor

    children = [
      {Task.Supervisor, name: name},
      {Demo, name: name}
    ]

    opts = [strategy: :rest_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
# file: my_app/lib/demo.ex
#
defmodule Demo do
  use GenServer

  def start_link(args) do
    GenServer.start_link(__MODULE__, args, name: __MODULE__)
  end

  def init(args) do
    with {:ok, name} <- Keyword.fetch(args, :name) do
      {:ok, {name, nil}}
    else
      _ ->
        {:stop, :badarg}
    end
  end

  def handle_call({:launch, arg}, _from, {name, nil}) do
    handles = launch_task(name, arg)
    {:reply, :ok, {name, handles}}
  end

  def handle_info({:kill_task, mon}, {name, handles} = state) do
    # timeout expired
    case handles do
      {%Task{ref: ^mon} = task, _} ->
        Process.demonitor(mon, [:flush])
        kill_task(task)
        IO.puts("Timed out: #{inspect(task)}")
        {:noreply, {name, nil}}

      _ ->
        IO.puts("REDUNDANT timeout")
        {:noreply, {name, state}}
    end
  end

  def handle_info({:DOWN, mon, _, down_pid, reason}, {name, handles} = state) do
    # :DOWN message arrives when task exits
    case handles do
      {%Task{ref: ^mon} = task, timeout_ref} ->
        Process.cancel_timer(timeout_ref)
        Process.demonitor(mon, [:flush])
        IO.puts("DOWN: #{inspect(down_pid)} #{inspect(reason)}")
        new_handles = if(reason == :normal, do: {task, nil}, else: nil)
        {:noreply, {name, new_handles}}

      _ ->
        IO.puts("REDUNDANT DOWN: #{inspect(down_pid)} #{inspect(reason)}")
        {:noreply, {name, state}}
    end
  end

  def handle_info({mon, value}, {name, handles} = state) when is_reference(mon) do
    # normal task result arrives here - demonitor/flush :normal :DOWN
    case handles do
      {%Task{ref: ^mon}, timeout_ref} ->
        if timeout_ref do
          Process.cancel_timer(timeout_ref)
          Process.demonitor(mon, [:flush])
        end

        IO.puts("result: #{inspect(value)}")
        {:noreply, {name, nil}}

      _ ->
        IO.puts("REDUNDANT result: #{inspect(value)} with #{inspect(mon)}")
        {:noreply, state}
    end
  end

  def handle_info(msg, state) do
    IO.inspect(msg)
    {:noreply, state}
  end

  # ---

  @max_wait_ms 500
  @timely_timeout div(@max_wait_ms, 2)
  @too_late_timeout 3 * @max_wait_ms

  defp launch_task(name, arg) do
    %Task{ref: mon} = task = Task.Supervisor.async_nolink(name, __MODULE__, :some_work, [arg])
    ref = Process.send_after(self(), {:kill_task, mon}, @max_wait_ms)
    {task, ref}
  end

  defp kill_task(%Task{pid: pid}),
    do: Process.exit(pid, :kill)

  def some_work(:timely = type) do
    Process.sleep(@timely_timeout)
    type
  end

  def some_work(:too_late = type) do
    Process.sleep(@too_late_timeout)
    type
  end

  def some_work(:crash = type) do
    Process.sleep(@timely_timeout)
    exit(type)
  end

  def some_work(:too_late_crash = type) do
    Process.sleep(@too_late_timeout)
    exit(type)
  end

  # --- Demo API ---

  def launch(arg),
    do: GenServer.call(__MODULE__, {:launch, arg})
end
$ iex -S mix
Erlang/OTP 22 [erts-10.5] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [hipe] [dtrace]

Compiling 1 file (.ex)
Interactive Elixir (1.9.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> Demo.launch(:too_late)
:ok
iex(2)> Timed out: %Task{owner: #PID<0.145.0>, pid: #PID<0.148.0>, ref: #Reference<0.177062570.1582825475.5428>}

nil
iex(3)> Demo.launch(:crash)   
:ok
iex(4)> 
14:13:26.156 [error] Task #PID<0.151.0> started from Demo terminating
** (stop) :crash
    (my_app) lib/demo.ex:105: Demo.some_work/1
    (elixir) lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2
    (elixir) lib/task/supervised.ex:35: Task.Supervised.reply/5
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Function: &Demo.some_work/1
    Args: [:crash]
DOWN: #PID<0.151.0> :crash

nil
iex(5)> Demo.launch(:too_late_crash)
:ok
iex(6)> Timed out: %Task{owner: #PID<0.145.0>, pid: #PID<0.154.0>, ref: #Reference<0.177062570.1582825480.4596>}

nil
iex(7)> 
BREAK: (a)bort (c)ontinue (p)roc info (i)nfo (l)oaded
       (v)ersion (k)ill (D)b-tables (d)istribution
a
Peers-MBP:my_app wheatley$ mix format
Peers-MBP:my_app wheatley$ iex -S mix
Erlang/OTP 22 [erts-10.5] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [hipe] [dtrace]

Compiling 1 file (.ex)
Interactive Elixir (1.9.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> Demo.launch(:timely)
:ok
iex(2)> result: :timely

nil
iex(3)> Demo.launch(:too_late) 
:ok
iex(4)> Timed out: %Task{owner: #PID<0.145.0>, pid: #PID<0.151.0>, ref: #Reference<0.981638884.3997958147.252725>}

nil
iex(5)> Demo.launch(:crash)
:ok
iex(6)> 
14:15:12.347 [error] Task #PID<0.154.0> started from Demo terminating
** (stop) :crash
    (my_app) lib/demo.ex:105: Demo.some_work/1
    (elixir) lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2
    (elixir) lib/task/supervised.ex:35: Task.Supervised.reply/5
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Function: &Demo.some_work/1
    Args: [:crash]
DOWN: #PID<0.154.0> :crash

nil
iex(7)> Demo.launch(:too_late_crash)
:ok
iex(8)> Timed out: %Task{owner: #PID<0.145.0>, pid: #PID<0.157.0>, ref: #Reference<0.981638884.3997958147.252801>}

nil
iex(9)> 
1 Like

Here Demo.launch does not return :timely but :ok. Either I cannot grasp what you are trying to show or you did not understand the problem correctly.

The launch function must return the task result if it completes in the timeframe. I need that:

@spec launch(function, function) :: {:ok, result} | :still_running
  def launch(code_to_run, post_late_result)
      when is_function(code_to_run, 0) and is_function(post_late_result, 1) do
    # You need to run code_to_run and return its result within 2500ms

    # If the code is not finished after 2500ms, you must return
    # :still_running but the code must continue to run, and you must
    # call post_late_result/1 with the result.

    # Bonus points : If code_to_run crashes, it must be restarted and ran again.
  end

A function can only return once (and on top of anything it is blocking). And that is doable.

But then there is this:

but the code must continue to run, and you must call post_late_result/1 with the result.

That is not the behaviour of a function - we are now talking about a message protocol between processes. You are essentially mixing two different types of abstractions that shouldn’t be mixed …

That is not the behaviour of a function

It is the bahaviour of any function that start a task though.

Anyway I have the answer to my original question and I know what problems I must think of with my implementation. I think we are done. Thank you :slight_smile:

How I wouldn’t do it.

The problem is that the same result is delivered via two separate channels

  • as a function return value
  • as a message if it resolves late.

From a maintenance perspective that’s a mess - a single channel of delivery should be enough.

# file: my_app/lib/demo.ex
#
defmodule Demo do
  use GenServer

  def start_link(args) do
    GenServer.start_link(__MODULE__, args, name: __MODULE__)
  end

  def init(args) do
    with {:ok, name} <- Keyword.fetch(args, :name) do
      {:ok, {name, nil, nil, false}}
    else
      _ ->
        {:stop, :badarg}
    end
  end

  def handle_call({:launch, arg}, from, {name, nil, nil, _}) do
    handles = launch_task(name, arg)
    {:noreply, {name, handles, from, false}}
  end

  @max_wait_ms 500

  def handle_info({:DOWN, mon, _, down_pid, reason}, {name, handles, reply_to, pending} = state) do
    # :DOWN message arrives when task exits
    case handles do
      {%Task{ref: ^mon} = task, timeout_ref} ->
        Process.cancel_timer(timeout_ref)
        Process.demonitor(mon, [:flush])
        IO.puts("DOWN: #{inspect(down_pid)} #{inspect(reason)}")

        if(reason == :normal) do
          {:noreply, {name, {task, nil}, reply_to, pending}}
        else
          reply(reply_to, {:result, {:error, reason}}, pending)
          {:noreply, {name, nil, nil, nil}}
        end

      _ ->
        IO.puts("REDUNDANT DOWN: #{inspect(down_pid)} #{inspect(reason)}")
        {:noreply, state}
    end
  end

  def handle_info({:pending_task, mon}, {name, handles, reply_to, pending} = state) do
    # first timeout expired
    case {handles, pending} do
      {{%Task{ref: ^mon} = task, _}, false} ->
        ref = Process.send_after(self(), {:kill_task, mon}, @max_wait_ms)
        reply(reply_to, {:result, :pending}, pending)
        IO.puts("First time out: #{inspect(task)}")
        {:noreply, {name, {task, ref}, reply_to, true}}

      _ ->
        IO.puts("REDUNDANT first timeout")
        {:noreply, state}
    end
  end

  def handle_info({:kill_task, mon}, {name, handles, reply_to, pending} = state) do
    # last timeout expired
    case handles do
      {%Task{ref: ^mon} = task, _} ->
        Process.demonitor(mon, [:flush])
        kill_task(task)
        reply(reply_to, {:result, {:error, :timeout}}, pending)
        IO.puts("Timed out: #{inspect(task)}")
        {:noreply, {name, nil, nil, nil}}

      _ ->
        IO.puts("REDUNDANT timeout")
        {:noreply, state}
    end
  end

  def handle_info({mon, value}, {name, handles, reply_to, pending} = state)
      when is_reference(mon) do
    # normal task result arrives here - demonitor/flush :normal :DOWN
    case handles do
      {%Task{ref: ^mon}, timeout_ref} ->
        if timeout_ref do
          Process.cancel_timer(timeout_ref)
          Process.demonitor(mon, [:flush])
        end

        reply(reply_to, {:result, {:ok, value}}, pending)

        IO.puts("result: #{inspect(value)}")
        {:noreply, {name, nil, nil, nil}}

      _ ->
        IO.puts("REDUNDANT result: #{inspect(value)} with #{inspect(mon)}")
        {:noreply, state}
    end
  end

  def handle_info(msg, state) do
    IO.inspect(msg)
    {:noreply, state}
  end

  defp reply(reply_to, reply, false),
    do: GenServer.reply(reply_to, reply)

  defp reply({pid, _}, reply, _),
    do: send(pid, reply)

  # ---

  @timely_timeout div(@max_wait_ms, 2)
  @late_timeout @max_wait_ms + @timely_timeout
  @too_late_timeout 3 * @max_wait_ms

  defp launch_task(name, arg) do
    %Task{ref: mon} = task = Task.Supervisor.async_nolink(name, __MODULE__, :some_work, [arg])
    ref = Process.send_after(self(), {:pending_task, mon}, @max_wait_ms)
    {task, ref}
  end

  defp kill_task(%Task{pid: pid}),
    do: Process.exit(pid, :kill)

  def some_work(:timely = type) do
    Process.sleep(@timely_timeout)
    type
  end

  def some_work(:late = type) do
    Process.sleep(@late_timeout)
    type
  end

  def some_work(:too_late = type) do
    Process.sleep(@too_late_timeout)
    type
  end

  def some_work(:crash = type) do
    Process.sleep(@timely_timeout)
    exit(type)
  end

  def some_work(:late_crash = type) do
    Process.sleep(@late_timeout)
    exit(type)
  end

  def some_work(:too_late_crash = type) do
    Process.sleep(@too_late_timeout)
    exit(type)
  end

  # --- Demo API ---

  def launch(arg) do
    {:result, result} = GenServer.call(__MODULE__, {:launch, arg})
    result
  end
end
$ iex -S mix
Erlang/OTP 22 [erts-10.5] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [hipe] [dtrace]

Compiling 1 file (.ex)
Interactive Elixir (1.9.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> Demo.launch(:timely)
result: :timely
{:ok, :timely}
iex(2)> Demo.launch(:late)  
:pending
iex(3)> First time out: %Task{owner: #PID<0.145.0>, pid: #PID<0.150.0>, ref: #Reference<0.3242099791.3754688513.92878>}
result: :late
flush
{:result, {:ok, :late}}
:ok
iex(4)> Demo.launch(:too_late)
First time out: %Task{owner: #PID<0.145.0>, pid: #PID<0.153.0>, ref: #Reference<0.3242099791.3754688516.94279>}
:pending
iex(5)> Timed out: %Task{owner: #PID<0.145.0>, pid: #PID<0.153.0>, ref: #Reference<0.3242099791.3754688516.94279>}
flush
{:result, {:error, :timeout}}
:ok
iex(6)> Demo.launch(:crash)   

16:51:58.678 [error] Task #PID<0.156.0> started from Demo terminating
** (stop) :crash
    (my_app) lib/demo.ex:142: Demo.some_work/1
    (elixir) lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2
    (elixir) lib/task/supervised.ex:35: Task.Supervised.reply/5
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Function: &Demo.some_work/1
    Args: [:crash]
DOWN: #PID<0.156.0> :crash
{:error, :crash}
iex(7)> Demo.launch(:late_crash)
First time out: %Task{owner: #PID<0.145.0>, pid: #PID<0.158.0>, ref: #Reference<0.3242099791.3754688516.94341>}
:pending
iex(8)> 
16:52:10.607 [error] Task #PID<0.158.0> started from Demo terminating
** (stop) :late_crash
    (my_app) lib/demo.ex:147: Demo.some_work/1
    (elixir) lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2
    (elixir) lib/task/supervised.ex:35: Task.Supervised.reply/5
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Function: &Demo.some_work/1
    Args: [:late_crash]
DOWN: #PID<0.158.0> :late_crash
flush
{:result, {:error, :late_crash}}
:ok
iex(9)> Demo.launch(:too_late_crash)
First time out: %Task{owner: #PID<0.145.0>, pid: #PID<0.161.0>, ref: #Reference<0.3242099791.3754688516.94370>}
:pending
iex(10)> Timed out: %Task{owner: #PID<0.145.0>, pid: #PID<0.161.0>, ref: #Reference<0.3242099791.3754688516.94370>}
flush
{:result, {:error, :timeout}}
:ok
iex(11)> 

Better, cleaner.

Deliver result(s) only one way - as a message.

The initial function call returns :ok to indicate that the request was accepted.

# file: my_app/lib/demo.ex
#
defmodule Demo do
  use GenServer

  def start_link(args) do
    GenServer.start_link(__MODULE__, args, name: __MODULE__)
  end

  def init(args) do
    with {:ok, name} <- Keyword.fetch(args, :name) do
      {:ok, {name, nil, nil}}
    else
      _ ->
        {:stop, :badarg}
    end
  end

  def handle_call({:launch, arg}, from, {name, nil, nil}) do
    handles = launch_task(name, arg)
    {:reply, :ok, {name, handles, from}}
  end

  @max_wait_ms 500

  def handle_info({:DOWN, mon, _, down_pid, reason}, {name, handles, reply_to} = state) do
    # :DOWN message arrives when task exits
    case handles do
      {%Task{ref: ^mon} = task, timeout_ref} ->
        Process.cancel_timer(timeout_ref)
        Process.demonitor(mon, [:flush])
        IO.puts("DOWN: #{inspect(down_pid)} #{inspect(reason)}")

        if(reason == :normal) do
          {:noreply, {name, {task, nil}, reply_to}}
        else
          reply(reply_to, {:result, {:error, reason}})
          {:noreply, {name, nil, nil}}
        end

      _ ->
        IO.puts("REDUNDANT DOWN: #{inspect(down_pid)} #{inspect(reason)}")
        {:noreply, state}
    end
  end

  def handle_info({:pending_task, mon}, {name, handles, reply_to} = state) do
    # first timeout expired
    case handles do
      {%Task{ref: ^mon} = task, _} ->
        ref = Process.send_after(self(), {:kill_task, mon}, @max_wait_ms)
        reply(reply_to, {:result, :pending})
        IO.puts("First time out: #{inspect(task)}")
        {:noreply, {name, {task, ref}, reply_to}}

      _ ->
        IO.puts("REDUNDANT first timeout")
        {:noreply, state}
    end
  end

  def handle_info({:kill_task, mon}, {name, handles, reply_to} = state) do
    # last timeout expired
    case handles do
      {%Task{ref: ^mon} = task, _} ->
        Process.demonitor(mon, [:flush])
        kill_task(task)
        reply(reply_to, {:result, {:error, :timeout}})
        IO.puts("Timed out: #{inspect(task)}")
        {:noreply, {name, nil, nil}}

      _ ->
        IO.puts("REDUNDANT timeout")
        {:noreply, state}
    end
  end

  def handle_info({mon, value}, {name, handles, reply_to} = state)
      when is_reference(mon) do
    # normal task result arrives here - demonitor/flush :normal :DOWN
    case handles do
      {%Task{ref: ^mon}, timeout_ref} ->
        if timeout_ref do
          Process.cancel_timer(timeout_ref)
          Process.demonitor(mon, [:flush])
        end

        reply(reply_to, {:result, {:ok, value}})

        IO.puts("result: #{inspect(value)}")
        {:noreply, {name, nil, nil}}

      _ ->
        IO.puts("REDUNDANT result: #{inspect(value)} with #{inspect(mon)}")
        {:noreply, state}
    end
  end

  def handle_info(msg, state) do
    IO.inspect(msg)
    {:noreply, state}
  end

  defp reply({pid, _}, reply),
    do: send(pid, reply)

  # ---

  @timely_timeout div(@max_wait_ms, 2)
  @late_timeout @max_wait_ms + @timely_timeout
  @too_late_timeout 3 * @max_wait_ms

  defp launch_task(name, arg) do
    %Task{ref: mon} = task = Task.Supervisor.async_nolink(name, __MODULE__, :some_work, [arg])
    ref = Process.send_after(self(), {:pending_task, mon}, @max_wait_ms)
    {task, ref}
  end

  defp kill_task(%Task{pid: pid}),
    do: Process.exit(pid, :kill)

  def some_work(:timely = type) do
    Process.sleep(@timely_timeout)
    type
  end

  def some_work(:late = type) do
    Process.sleep(@late_timeout)
    type
  end

  def some_work(:too_late = type) do
    Process.sleep(@too_late_timeout)
    type
  end

  def some_work(:crash = type) do
    Process.sleep(@timely_timeout)
    exit(type)
  end

  def some_work(:late_crash = type) do
    Process.sleep(@late_timeout)
    exit(type)
  end

  def some_work(:too_late_crash = type) do
    Process.sleep(@too_late_timeout)
    exit(type)
  end

  # --- Demo API ---

  def launch(arg),
    do: GenServer.call(__MODULE__, {:launch, arg})
end
$ iex -S mix
Erlang/OTP 22 [erts-10.5] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [hipe] [dtrace]

Compiling 1 file (.ex)
Interactive Elixir (1.9.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> Demo.launch(:timely)
:ok
iex(2)> result: :timely
flush
{:result, {:ok, :timely}}
:ok
iex(3)> Demo.launch(:late)  
:ok
iex(4)> First time out: %Task{owner: #PID<0.145.0>, pid: #PID<0.151.0>, ref: #Reference<0.20248824.1341128709.106775>}
result: :late
flush
{:result, :pending}
{:result, {:ok, :late}}
:ok
iex(5)> Demo.launch(:too_late)
:ok
iex(6)> First time out: %Task{owner: #PID<0.145.0>, pid: #PID<0.154.0>, ref: #Reference<0.20248824.1341128709.106808>}
Timed out: %Task{owner: #PID<0.145.0>, pid: #PID<0.154.0>, ref: #Reference<0.20248824.1341128709.106808>}
flush
{:result, :pending}
{:result, {:error, :timeout}}
:ok
iex(7)> Demo.launch(:crash)   
:ok
iex(8)> 
17:08:54.993 [error] Task #PID<0.157.0> started from Demo terminating
** (stop) :crash
    (my_app) lib/demo.ex:139: Demo.some_work/1
    (elixir) lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2
    (elixir) lib/task/supervised.ex:35: Task.Supervised.reply/5
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Function: &Demo.some_work/1
    Args: [:crash]
DOWN: #PID<0.157.0> :crash
flush
{:result, {:error, :crash}}
:ok
iex(9)> Demo.launch(:late_crash)
:ok
iex(10)> First time out: %Task{owner: #PID<0.145.0>, pid: #PID<0.160.0>, ref: #Reference<0.20248824.1341128711.105922>}
DOWN: #PID<0.160.0> :late_crash

17:09:09.046 [error] Task #PID<0.160.0> started from Demo terminating
** (stop) :late_crash
    (my_app) lib/demo.ex:144: Demo.some_work/1
    (elixir) lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2
    (elixir) lib/task/supervised.ex:35: Task.Supervised.reply/5
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Function: &Demo.some_work/1
    Args: [:late_crash]
flush
{:result, :pending}
{:result, {:error, :late_crash}}
:ok
iex(11)> Demo.launch(:too_late_crash)
:ok
iex(12)> First time out: %Task{owner: #PID<0.145.0>, pid: #PID<0.163.0>, ref: #Reference<0.20248824.1341128709.106905>}
Timed out: %Task{owner: #PID<0.145.0>, pid: #PID<0.163.0>, ref: #Reference<0.20248824.1341128709.106905>}
flush
{:result, :pending}
{:result, {:error, :timeout}}
:ok
iex(13)>