Timeout Task.Supervisor.async Call

I have a small ETS cache on each node that I need when a user brings up their log stream. It works very well except during deployments. I’m working on timing the libcluster updates better, but also: how exactly do I time out this fn after 5 seconds? Putting the timeout on yield_many doesn’t actually do anything.

def get_logs_across_cluster(source_id) when is_atom(source_id) do
  nodes = Cluster.Utils.node_list_all()

  results =
    for n <- nodes do
      Task.Supervisor.async(
        {Logflare.TaskSupervisor, n},
        __MODULE__,
        :get_logs,
        [source_id]
      )
    end
    |> Task.yield_many()
    |> Enum.map(fn {task, res} ->
      res || Task.shutdown(task, :brutal_kill)
    end)

  for {:ok, events} <- results do
    events
  end
  |> List.flatten()
  |> Enum.sort_by(& &1.body.timestamp, &<=/2)
  |> Enum.take(-100)
end
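For reference, Task.yield_many/2 takes its timeout as the second argument (the no-argument form above uses the default of 5 000 ms), and the timeout caps the total wait across all tasks that were successfully started. A minimal self-contained sketch:

```elixir
# Two tasks: one finishes quickly, one sleeps past the timeout.
tasks =
  for ms <- [10, 10_000] do
    Task.async(fn ->
      Process.sleep(ms)
      ms
    end)
  end

# yield_many waits up to 500 ms total; tasks that haven't replied
# come back as {task, nil}, which we then shut down.
replies =
  tasks
  |> Task.yield_many(500)
  |> Enum.map(fn {task, res} ->
    res || Task.shutdown(task, :brutal_kill)
  end)

# replies == [{:ok, 10}, nil]
```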

Can you elaborate on what “doesn’t actually do anything” means? Does the function return? Does it error?

Thanks, yeah this is what I get…

[error] ** Node :"orange@127.0.0.1" not responding **
** Removing (timedout) connection **

[error] #PID<0.1301.0> running LogflareWeb.Endpoint (connection #PID<0.1213.0>, stream id 26) terminated
Server: localhost:4000 (http)
Request: GET /sources/16897
** (exit) exited in: GenServer.call({Logflare.TaskSupervisor, :"orange@127.0.0.1"}, {:start_task, [{:"pink@127.0.0.1", #PID<0.1301.0>, #PID<0.1301.0>}, [#PID<0.1301.0>], :monitor, {Logflare.Source.Data, :get_logs, [:"f74abf47-76e2-4af6-997a-a79802c2874b"]}], :temporary, nil}, :infinity)
    ** (EXIT) no connection to orange@127.0.0.1

Ah, it’s not even getting to yield_many, because it can’t find the node to start the task on.

Ah, async_nolink doesn’t blow up. Now to figure out how to shorten the timeout.

And I guess I should do:

...
|> Enum.map(fn {%Task{pid: pid}, res} ->
  res || Task.Supervisor.terminate_child(Logflare.TaskSupervisor, pid)
end)

As long as it doesn’t blow up, I think I’m okay for now.

This actually doesn’t seem to be the case. Maybe I just need to use my own supervisor GenServer and handle the exit.

Well, wrapping it all in its own Task does the trick too:

def get_logs_across_cluster(source_id) when is_atom(source_id) do
  nodes = Cluster.Utils.node_list_all()

  task =
    Task.async(fn ->
      for n <- nodes do
        Task.Supervisor.async_nolink(
          {Logflare.TaskSupervisor, n},
          __MODULE__,
          :get_logs,
          [source_id]
        )
      end
      |> Task.yield_many()
      |> Enum.map(fn {%Task{pid: pid}, res} ->
        res || Task.Supervisor.terminate_child(Logflare.TaskSupervisor, pid)
      end)
    end)

  case Task.yield(task, 5_000) || Task.shutdown(task) do
    {:ok, results} ->
      for {:ok, events} <- results do
        events
      end
      |> List.flatten()
      |> Enum.sort_by(& &1.body.timestamp, &<=/2)
      |> Enum.take(-100)

    _else ->
      get_logs(source_id)
  end
end
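The outer yield-or-shutdown in that case expression is the part doing the 5-second cap. A standalone sketch of the same pattern, with the timeout shortened for illustration:

```elixir
# Task.yield/2 waits up to the timeout for {:ok, reply}, returning nil
# if the task is still running; Task.shutdown/1 then stops the task,
# handing back a late reply if one raced in, otherwise nil.
task = Task.async(fn -> Process.sleep(10_000) end)

result =
  case Task.yield(task, 100) || Task.shutdown(task) do
    {:ok, value} -> value
    _else -> :fallback
  end

# result == :fallback
```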

Double-tasking it seems… odd. What issue did you run into with async_nolink?

Not sure exactly; I was just getting a similar error. One time it looked like it didn’t error, but on continued testing it did.

This makes sense to me, though, because Task.Supervisor is trying to start a task on a networked node, so it’s stalling on the network connection there, not on the actual task it’s trying to spawn.
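Given that diagnosis, one way to sidestep the stall (a sketch, not tested against Logflare) is to fan out only to nodes the VM is currently connected to, rather than everything Cluster.Utils.node_list_all/0 knows about:

```elixir
# Node.list/0 returns only the nodes this VM is connected to right now,
# so the remote Task.Supervisor call never waits on a dead connection.
# [node() | ...] includes the local node in the fan-out as well.
nodes = [node() | Node.list()]
```

Disconnected nodes then simply drop out of the query instead of crashing it, at the cost of possibly missing a node that libcluster hasn’t reconnected yet.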