I have a small ETS cache on each node that I need to query when a user brings up their log stream. It works very well except during deployments. I’m working on timing the libcluster updates better, but also: how exactly do I time out this function after 5 seconds? Putting the timeout on yield_many doesn’t actually do anything.
def get_logs_across_cluster(source_id) when is_atom(source_id) do
  nodes = Cluster.Utils.node_list_all()

  results =
    for n <- nodes do
      Task.Supervisor.async(
        {Logflare.TaskSupervisor, n},
        __MODULE__,
        :get_logs,
        [source_id]
      )
    end
    |> Task.yield_many()
    |> Enum.map(fn {task, res} ->
      res || Task.shutdown(task, :brutal_kill)
    end)

  for {:ok, events} <- results do
    events
  end
  |> List.flatten()
  |> Enum.sort_by(& &1.body.timestamp, &<=/2)
  |> Enum.take(-100)
end
Can you elaborate on what “doesn’t actually do anything” means? Does the function return? Does it error?
Thanks, yeah this is what I get…
[error] ** Node :"orange@127.0.0.1" not responding **
** Removing (timedout) connection **
[error] #PID<0.1301.0> running LogflareWeb.Endpoint (connection #PID<0.1213.0>, stream id 26) terminated
Server: localhost:4000 (http)
Request: GET /sources/16897
** (exit) exited in: GenServer.call({Logflare.TaskSupervisor, :"orange@127.0.0.1"}, {:start_task, [{:"pink@127.0.0.1", #PID<0.1301.0>, #PID<0.1301.0>}, [#PID<0.1301.0>], :monitor, {Logflare.Source.Data, :get_logs, [:"f74abf47-76e2-4af6-997a-a79802c2874b"]}], :temporary, nil}, :infinity)
** (EXIT) no connection to orange@127.0.0.1
Ah, it’s not even getting to yield_many because it can’t find the node to start the task on.
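For what it’s worth, the exit in the trace is coming from the GenServer.call that Task.Supervisor.async makes to start the task on the remote node (note the :infinity timeout), so it happens before there is any task to yield on. A rough sketch of surviving a dead node by catching that exit around each start call (not necessarily the right fix, just isolating the failure; `nodes`, `source_id`, and the supervisor name are from the code above):

```elixir
# Sketch: catch the exit from the remote start call when a node is down.
tasks =
  for n <- nodes do
    try do
      Task.Supervisor.async(
        {Logflare.TaskSupervisor, n},
        __MODULE__,
        :get_logs,
        [source_id]
      )
    catch
      # the GenServer.call exits when there's no connection to the node
      :exit, _reason -> nil
    end
  end
  |> Enum.reject(&is_nil/1)
```

This only guards the start call; the tasks are still linked, so a remote task dying later would still take the caller down.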
Ah, async_nolink doesn’t blow up. Now to figure out how to shorten the timeout.
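yield_many takes an explicit timeout as its second argument (5_000 ms is the default), which bounds how long it waits on already-started tasks. A sketch, assuming `tasks` is the list from the comprehension, though per the error above this can’t help with a stalled start call:

```elixir
# Bound the wait on running tasks; anything unfinished is killed.
tasks
|> Task.yield_many(5_000)
|> Enum.map(fn {task, res} ->
  res || Task.shutdown(task, :brutal_kill)
end)
```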
And I guess I should do:
...
|> Enum.map(fn {%Task{pid: pid}, res} ->
  res || Task.Supervisor.terminate_child(Logflare.TaskSupervisor, pid)
end)
As long as it doesn’t blow up I think I’m okay for now.
This actually doesn’t seem to be the case. Maybe I just need to use my own supervisor GenServer and handle the exit myself.
Well, wrapping it all in its own Task does the trick too:
def get_logs_across_cluster(source_id) when is_atom(source_id) do
  nodes = Cluster.Utils.node_list_all()

  task =
    Task.async(fn ->
      for n <- nodes do
        Task.Supervisor.async_nolink(
          {Logflare.TaskSupervisor, n},
          __MODULE__,
          :get_logs,
          [source_id]
        )
      end
      |> Task.yield_many()
      |> Enum.map(fn {%Task{pid: pid}, res} ->
        res || Task.Supervisor.terminate_child(Logflare.TaskSupervisor, pid)
      end)
    end)

  case Task.yield(task, 5_000) || Task.shutdown(task) do
    {:ok, results} ->
      for {:ok, events} <- results do
        events
      end
      |> List.flatten()
      |> Enum.sort_by(& &1.body.timestamp, &<=/2)
      |> Enum.take(-100)

    _else ->
      get_logs(source_id)
  end
end
Double tasking it seems… odd. What issue did you run into with async_nolink?
Not sure exactly; I was just getting a similar error. One time it looked like it didn’t error, but on continued testing it did.
This makes sense to me though, because Task.Supervisor is trying to start a task on a networked node, so it’s stalling on the network connection there, not on the actual task it’s trying to spawn.
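That would also explain why the outer Task helps: the start calls with their :infinity timeouts now run inside a process the caller can bound with yield/shutdown. The pattern in isolation (runnable locally, nothing cluster-specific):

```elixir
# Move a call that can block indefinitely into its own task,
# then bound the whole thing with yield/shutdown.
task = Task.async(fn -> :timer.sleep(:infinity) end)

result =
  case Task.yield(task, 100) || Task.shutdown(task) do
    {:ok, value} -> value
    nil -> :timed_out
  end

# result is :timed_out — the blocked work was killed after 100 ms
```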