Long running Task w/ subprocess?

Ok, this is probably a bloody beginner’s question:

I have long-running Tasks that are started via Task.Supervisor.start_child. To compute their result, they need to start a couple of individual GenServers, which should shut down as well if the task fails or shuts down. It seems I cannot pass :children to Task.Supervisor.start_child. What is the best practice/OTP way to start subprocesses of tasks?

Or is it simply to use GenServer.start_link within my run function of the Task?

Thank you!

Do you need to supervise/restart the GenServers as well in case of a crash, or do you just want to crash everything if one of these GenServers crashes?

Thank you for your help @alvises.

I want to crash everything (in particular incl. the subprocesses) in case the task crashes.

defmodule MyTask do
  use Task

  def start_link(arg) do
    Task.start_link(__MODULE__, :run, [arg])
  end

  def run(args) do
    ...
    # Both GenServers are started from inside the task process.
    {:ok, pid1} = YourGenServer1.start_link([])
    {:ok, pid2} = YourGenServer2.start_link([])
    # some side effects
    ...
  end
end

The YourGenServerX.start_link does not seem to do the trick. What’s best practice: can I start these GenServers as part of starting the task itself, or do I have to trap exits in the GenServers?

Background: one of the GenServers itself starts another subprocess (with spawn_link, probably a bad idea), which then starts a Flow, which in turn starts a GenStage via Flow.from_specs… The last part is not getting restarted anymore…

If you start the GenServer using start_link (assuming that YourGenServer.start_link actually calls GenServer.start_link under the hood), they will be linked to the process that creates them (the Task). Therefore, if YourGenServer crashes, it will also cause the Task to crash. Whether the supervisor of the Task will restart it or not depends on how you set the restart option for your Task:

  • By default, it will be temporary, so the Task won’t be restarted on failure

  • You can change it with use Task, restart: :transient to make the Supervisor restart the Task if it crashes (it will still let the Task terminate upon successful completion)

So, if I understand your case correctly, you could just use start_link like in your last example, but set restart: :transient in use Task to cause the Task to be restarted if the “children” crash (causing the Task to crash too).
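To see what the restart option on use Task actually changes, here is a minimal sketch. The module names RestartDemo.Default and RestartDemo.Transient are made up for illustration. The option only ends up in the child spec the module generates, so it matters when the module is listed as a child of a regular Supervisor; a task started with Task.Supervisor.start_child(sup, mod, fun, args) does not go through child_spec/1.

```elixir
defmodule RestartDemo.Default do
  # Hypothetical module: plain `use Task` keeps the default restart value.
  use Task

  def start_link(arg), do: Task.start_link(__MODULE__, :run, [arg])
  def run(_arg), do: :ok
end

defmodule RestartDemo.Transient do
  # Hypothetical module: ask for a restart only after an abnormal exit.
  use Task, restart: :transient

  def start_link(arg), do: Task.start_link(__MODULE__, :run, [arg])
  def run(_arg), do: :ok
end

# child_spec/1 is generated by `use Task`; a Supervisor calls it for
# entries such as {RestartDemo.Transient, :some_arg} in its children list.
default_spec = RestartDemo.Default.child_spec(:some_arg)
transient_spec = RestartDemo.Transient.child_spec(:some_arg)

IO.inspect(default_spec.restart, label: "plain use Task")
IO.inspect(transient_spec.restart, label: "use Task, restart: :transient")
```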


If these are all linked processes, then when one of them crashes it starts a chain reaction that crashes all the linked processes.



defmodule MyTask do
  use Task

  def run(args) do
    IO.inspect(args, label: "args")
    {:ok, _agent1} = Agent.start_link(fn -> :agent_1 end, name: Agent1)
    {:ok, _agent2} = Agent.start_link(fn -> :agent_2 end, name: Agent2)
    Process.sleep(20_000)
  end
end



ExUnit.start()

defmodule SupervisedTaskTest do
  use ExUnit.Case, async: true

  test "when task crashes, it brings everything down" do
    #starting the task supervisor
    {:ok, supervisor} = Task.Supervisor.start_link([])

    #starting a supervised task
    {:ok, task_pid} = Task.Supervisor.start_child(supervisor, MyTask, :run, ["hello"])

    Process.sleep(100) # wait until everything is up

    agent1_pid = Process.whereis(Agent1)
    agent2_pid = Process.whereis(Agent2)

    assert Process.alive?(agent1_pid)
    assert Process.alive?(agent2_pid)

    Process.exit(task_pid, :kill)

    Process.sleep(100) # wait until everything is down
    refute Process.alive?(task_pid)

    refute Process.alive?(agent1_pid)
    refute Process.alive?(agent2_pid)

    IO.inspect(Process.alive?(agent1_pid), label: "is agent1 alive?")
  end

  test "when agent crashes, it brings everything down" do
    #starting the task supervisor
    {:ok, supervisor} = Task.Supervisor.start_link([])

    #starting a supervised task
    {:ok, task_pid} = Task.Supervisor.start_child(supervisor, MyTask, :run, ["hello"])

    Process.sleep(100) # wait until everything is up

    agent1_pid = Process.whereis(Agent1)
    agent2_pid = Process.whereis(Agent2)

    assert Process.alive?(agent1_pid)
    assert Process.alive?(agent2_pid)

    Process.exit(agent1_pid, :kill)

    Process.sleep(100) # wait until everything is down
    refute Process.alive?(task_pid)

    refute Process.alive?(agent1_pid)
    refute Process.alive?(agent2_pid)

    IO.inspect(Process.alive?(agent1_pid), label: "is agent1 alive?")
  end

end


If you run this test, you see that if you manually kill the task process, the other processes crash too, since they are linked to it.

The Task.Supervisor starts children with the :temporary restart option by default, meaning they will not be restarted after a crash.
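If a restart is wanted, the restart option can be passed to Task.Supervisor.start_child itself. The following is a minimal sketch, not the code from the question: the job function and the registered name :restart_demo_job are made up, and the name is registered only so that the restarted process can be found again.

```elixir
{:ok, sup} = Task.Supervisor.start_link()

# Hypothetical long-running job standing in for the real task body.
job = fn ->
  Process.register(self(), :restart_demo_job)
  Process.sleep(:infinity)
end

# :transient means "restart only after an abnormal exit".
{:ok, first_pid} = Task.Supervisor.start_child(sup, job, restart: :transient)
Process.sleep(100) # give the task time to register its name

Process.exit(first_pid, :kill) # simulate a crash
Process.sleep(100) # give the supervisor time to restart the task

second_pid = Process.whereis(:restart_demo_job)

IO.inspect(is_pid(second_pid) and second_pid != first_pid,
  label: "restarted under a new pid?"
)
```

Without the restart option (the :temporary default), Process.whereis/1 would return nil after the kill, because nothing starts the job again.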

UPDATE: added a second test in which a process linked to the task is killed.


In general though, judging from what you describe about the dependencies between several nested processes, this might benefit from a bit of upfront design, possibly with a supervision hierarchy. Designing supervision hierarchies is not simple at the beginning, but there are good resources and established patterns once you clarify what you want to achieve, and the relationship between the processes.

Start by asking yourself which processes need to be restarted, which ones should “share the same destiny” and fail together, and which ones should run in parallel rather than sequentially. Map the information flow between them: who needs to communicate with whom? Can you identify sub-systems that could live under their own supervisor?
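As one illustration of “sharing the same destiny”, a supervisor with the :one_for_all strategy restarts all of its children when any one of them dies. This is only a sketch with two throwaway Agents; the ids :worker_a and :worker_b are made up and stand in for whatever processes belong together.

```elixir
children = [
  Supervisor.child_spec({Agent, fn -> :state_a end}, id: :worker_a),
  Supervisor.child_spec({Agent, fn -> :state_b end}, id: :worker_b)
]

{:ok, sup} = Supervisor.start_link(children, strategy: :one_for_all)

# Helper that maps each child id to its current pid.
pids = fn ->
  for {id, pid, _type, _modules} <- Supervisor.which_children(sup),
      into: %{},
      do: {id, pid}
end

before = pids.()

# Kill only worker_a; with :one_for_all the supervisor also stops
# worker_b and then starts both again.
Process.exit(before.worker_a, :kill)
Process.sleep(100)

after_restart = pids.()

IO.inspect(after_restart.worker_a != before.worker_a, label: "worker_a replaced?")
IO.inspect(after_restart.worker_b != before.worker_b, label: "worker_b replaced?")
```

With :one_for_one, only worker_a would get a new pid, which is the choice to make when the processes can fail independently.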


Thank you already for your beautiful input @lucaong & @alvises. Let’s talk source code to make it more specific. This is my issue, runnable, but “quick” and (certainly) dirty:

It needs Flow:

$ git clone https://github.com/plataformatec/flow.git
$ cd flow
$ mix deps.get

foo.exs :

require Logger

defmodule Source do
  use GenStage

  def start_link(_) do
    GenStage.start_link(__MODULE__, nil, name: __MODULE__)
  end

  @impl true
  def init(_) do
    {:producer, nil}
  end

  @impl true
  def handle_demand(_demand, _state) do
    Logger.info "Source handle_demand"
    :timer.sleep(1_000)
    {:noreply, [:rand.uniform(100)], nil}
  end
end

defmodule Server do
  use GenServer

  def start_link do
    Logger.info "Server start"
    GenServer.start_link __MODULE__, nil, name: __MODULE__
  end

  @impl true
  def init(_) do
    server = self()
    spawn_link(fn -> start_flow(server) end)
    {:ok, nil}
  end

  def start_flow(server) do
    Logger.info "Server start_flow"
    Flow.from_specs([Source], window: Flow.Window.periodic(1, :second), stages: 1)
    |> Flow.reduce(fn -> [] end, fn thing, acc -> [thing | acc] end)
    |> Flow.on_trigger(fn things -> add_things(server, things); {[], []} end)
    |> Flow.run

    raise "never exits"
  end

  def add_things(pid, things) do
    GenServer.cast pid, {:stuff, things}
  end

  @impl true
  def handle_cast({:stuff, things}, state) do
    Logger.info "Server stuff things=#{inspect things}"
    {:noreply, state}
  end
end

defmodule T do
  use Task # restart: :transient   # does not make a difference here !?

  def start_link do
    Logger.info "Task start"
    Task.Supervisor.start_child Foo.TaskSupervisor, __MODULE__, :run, [], restart: :transient  # defining restart here makes a difference
  end

  def run do
    Logger.info "Task run"
    case Server.start_link do
      {:ok, _pid} -> Logger.info "Task run server started successfully"
      error -> Logger.info "Task run: error starting server: error=#{inspect error}"
    end
    :timer.sleep :infinity  # long running task
  end
end


# Application fragment
#
children = [
  {Task.Supervisor, name: Foo.TaskSupervisor},
]
opts = [strategy: :one_for_one, name: __MODULE__]
Supervisor.start_link(children, opts)



{:ok, task} = T.start_link

# simulate task crash
:timer.kill_after(5_000, task)

:timer.sleep :infinity

To run it:

mix run foo.exs

Expected: Everything boots, runs smoothly, once the task fails (simulated after 5s) everything crashes and restarts correctly, and the system runs smoothly again.

Actual: Once the task fails, it cannot be restarted anymore because the GenStage is still running. It should have been killed as well upon task failure, so that it is not in the way when the task is restarted.

Thank you for your input—much, much appreciated!

First, spawning processes is cheap, but in general I would be really careful about spawning lots of processes, because it adds complexity, especially with spawn_link or nested spawning (in this case a task that starts a GenServer that spawns a process… it’s easy to lose track of the processes). A good article about it: “To spawn, or not to spawn?”.

Now you have

[Task Supervisor] ---> [ Task ] ----> [ Server ] ----> flow 

I’d do a bit of decoupling and redesign, trying to make things simpler and avoid the nested structure. Something like:

[Supervisor] -- [Server]
       |
     [ Flow ]

(Sorry, I’m terrible with ascii art :sweat_smile:)

I see that Server (a GenServer), when started, spawns a linked long-running process; start_flow is supposed to never return, right? Are you using the GenServer to call start_flow so that the flow part is supervised? Or do you actually need the GenServer?

Let’s say we need the Server to store the results of the flow and react by doing some other work. Then we just need Source and Server, but not Task. Instead, I would create a separate flow module that can be supervised:

defmodule MyFlow do
  use Flow

  def start_link(server) do
    Flow.from_specs([Source], window: Flow.Window.periodic(1, :second), stages: 1)
    |> Flow.reduce(fn -> [] end, fn thing, acc -> [thing | acc] end)
    |> Flow.on_trigger(fn things -> Server.add_things(server, things); {[], []} end)
    |> Flow.start_link()
  end
end

and I would supervise only Server and MyFlow (if both need to stay alive all the time). The argument of MyFlow is the server name.

children = [
  # Assumes Server defines start_link/1 (the Server in foo.exs only has start_link/0)
  {Server, []},
  {MyFlow, Server}
]

opts = [strategy: :one_for_one, name: MySupervisor]
Supervisor.start_link(children, opts)

This way MyFlow and Server are started (and restarted) independently by the supervisor. This is just a quick redesign; it is surely possible to make things cleaner… but I still don’t understand what you need to do :smile: What’s the purpose of Server?


I bet that’s where my ‘design’ breaks; I just don’t know how to fix it yet.

Correct

Both. It pulls data from the flow, stores it in its state, and then (the part that’s not in the example) serves requests based on that state.

I see where you are going with this, and I like it, but unfortunately I need a vehicle like Task to start this whole thing many, many times in parallel. Currently, using supervised Tasks allows me to do so, but handling subprocesses correctly seems to be tricky.

Interesting path. Perhaps I could use this and supervise it from a supervisor that is started dynamically from the Task and supervises Server & MyFlow. But then I’d have the battle of keeping Task and (sub-)Supervisor restarts in sync…? Another process on top of this already ugly vehicle… or?

Can you please explain this? You have an infinitely running flow with a Server process that acts as a consumer (by the way, is there any reason not to use a GenStage consumer?). When do you need to start these server/flow pairs in parallel?

Why do you want to start the supervisor from a task? If I understood what you want to do, you don’t need Task; you just need to start a separate Supervisor with Server and MyFlow for each instance. At this point it would be better to use a Registry instead of fixed names.

Also check out DynamicSupervisor:

[DynamicSupervisor] 
     -> Supervisor (1) -> [Server, Flow]
     -> Supervisor (2) -> [Server, Flow]

No, that would be the correct design. And since my quick and dirty solution can’t be easily fixed, I will now implement this part properly as a GenStage consumer.

That’s actually correct. Since my ‘tasks’ have sub processes which need supervision, I guess I should not use Task, but a custom Supervisor which then is started through DynamicSupervisor…

But where does the computation from my Task go then? Guess I can move that to just another process which will be part of the supervised tree.

[DynamicSupervisor] 
     -> Supervisor (1) -> [Server, Flow, TaskLogic]
     -> Supervisor (2) -> [Server, Flow, TaskLogic]
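A rough sketch of that layout, under some loud assumptions: Sketch.TaskLogic and Sketch.InstanceSupervisor are made-up names, an Agent stands in for Server, the Flow is left out entirely, and :one_for_all is just one possible strategy for “crash everything together”.

```elixir
defmodule Sketch.TaskLogic do
  # Hypothetical home for the computation that used to live in the Task.
  use Task, restart: :transient

  def start_link(store_id), do: Task.start_link(__MODULE__, :run, [store_id])

  def run(_store_id) do
    # Placeholder for the real long-running work.
    Process.sleep(:infinity)
  end
end

defmodule Sketch.InstanceSupervisor do
  # One of these per store; it owns everything belonging to that store.
  use Supervisor

  def start_link(store_id), do: Supervisor.start_link(__MODULE__, store_id)

  @impl true
  def init(store_id) do
    children = [
      {Agent, fn -> [] end},       # stand-in for Server
      {Sketch.TaskLogic, store_id} # the former Task body
    ]

    Supervisor.init(children, strategy: :one_for_all)
  end
end

{:ok, dyn} = DynamicSupervisor.start_link(strategy: :one_for_one)

# Start two independent instances, one per (hypothetical) store.
{:ok, sup_a} = DynamicSupervisor.start_child(dyn, {Sketch.InstanceSupervisor, :store_a})
{:ok, sup_b} = DynamicSupervisor.start_child(dyn, {Sketch.InstanceSupervisor, :store_b})

IO.inspect(DynamicSupervisor.count_children(dyn), label: "instances")
IO.inspect(length(Supervisor.which_children(sup_a)), label: "children of store_a")
```

Stopping an instance with DynamicSupervisor.terminate_child(dyn, sup_a) takes its whole subtree down, which is the behaviour the original Task was supposed to provide.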

Of course, just simplified for the example code. Cheers.

Oh boy, let me restructure this beast… :smiley:

Thanks a ton @alvises !

Something like this

defmodule Server do
  use GenServer

  # Each Server registers itself in the Registry named SupervisorsRegistry (started below)
  def name(sup_id) do
    {:via, Registry, {SupervisorsRegistry, {sup_id, Server}}}
  end

  def start_link(sup_id) do
    Logger.info "Starting Server with id #{sup_id}"
    GenServer.start_link(__MODULE__, sup_id, name: name(sup_id))
  end
  ...

end

defmodule MyFlow do
  use Flow

  def start_link(sup_id) do
    Logger.info "starting MyFlow with id #{sup_id}"
    
    server = Server.name(sup_id)
    ...
  end
end


defmodule ServerFlowSupervisor do
  use Supervisor

  def start_link(sup_id) do
    Supervisor.start_link(__MODULE__, sup_id)
  end

  @impl true
  def init(sup_id) do
    children = [
      {Server, sup_id},
      {MyFlow, sup_id}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end


## And then we start our registry and dynamic supervisor

children = [
  {Registry, [keys: :unique, name: SupervisorsRegistry]},
  {DynamicSupervisor, strategy: :one_for_one, name: MyDynamicSupervisor}
]

Supervisor.start_link(children, strategy: :one_for_one)

DynamicSupervisor.start_child(MyDynamicSupervisor, {ServerFlowSupervisor, 1})
Process.sleep 5_000
DynamicSupervisor.start_child(MyDynamicSupervisor, {ServerFlowSupervisor, 2})
DynamicSupervisor.start_child(MyDynamicSupervisor, {ServerFlowSupervisor, 3})

Process.sleep :infinity

In this example we are able to start our ServerFlowSupervisor dynamically instead of using Task. We assign a sup_id to each server/flow pair and use a Registry for naming; 1, 2 and 3 are the ids.

Starting Server with id 1
starting MyFlow with id 1
...
Server (1) stuff things='V'
...
Starting Server with id 2
starting MyFlow with id 2
...
Server (2) stuff things=[14]
Server (1) stuff things=[]
...

But without knowing exactly what this is about, there could be a better way to do what you need… In general, if you use the GenStage and Flow modules and functions as much as you can, you get a lot for free.

Independent Q (have not used a Registry yet): why not simply calc a name like def name(id), do: :"#{__MODULE__}[#{id}]" ? (I am not using IDs, but a limited set of [store] names)

This looks actually pretty good… :slight_smile:

Well, I have many user accounts (or rather, their stores). For each of them I have three streams (or better, flows) that need to be joined together. I avoid Flow.join since I broke Erlang with it and failed to join two streams the way I wanted; I feel I’d need a join with two separate windows, periodic on the left and global on the right. The resulting flow with all the information is batched and pushed to Google Spreadsheets. Not sure if this helps, though.

Will try your design now.

The name can be an atom, a {:global, term} tuple, or a {:via, module, term} tuple. You can use the last one to map a key of your choice in a Registry (a tuple as in my example, or a string) to the pid of the GenServer.

Since each entry is tied to the process that registered it, the entry is removed from the registry automatically if that process crashes. In our case, when the Server (with its sup_id) is restarted (with a new pid), the new pid is registered again under the same {sup_id, Server} key.
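A small sketch of that behaviour, using an Agent in place of the Server and a made-up registry name SketchRegistry. The process is stopped and started again by hand here; under a supervisor the restart would happen automatically.

```elixir
{:ok, _registry} = Registry.start_link(keys: :unique, name: SketchRegistry)

# Same shape as Server.name/1 above: the key is a {sup_id, tag} tuple.
name = {:via, Registry, {SketchRegistry, {1, :server}}}

{:ok, first_pid} = Agent.start_link(fn -> 0 end, name: name)

# The via tuple can be used anywhere a pid or atom name is accepted.
:ok = Agent.update(name, &(&1 + 1))
counter = Agent.get(name, & &1)

# The registry maps the key to the pid (names registered via a via tuple carry a nil value).
first_lookup = Registry.lookup(SketchRegistry, {1, :server})

# Stop the process, then start a replacement under the same key.
:ok = Agent.stop(name)
Process.sleep(50)

{:ok, second_pid} = Agent.start_link(fn -> 0 end, name: name)
resolved = GenServer.whereis(name)

IO.inspect(resolved == second_pid, label: "name resolves to the new pid?")
```

Callers that only hold the via tuple never need to learn the new pid, which is why MyFlow can keep using Server.name(sup_id) across restarts.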

Take a look at the Registry, it’s pretty useful for many things.

But once you get your result and push all the information to Google Spreadsheets, do you still need that Flow running?

Will be using one now based on your code, thanks!

Yes, since new events come in over time and shall be pushed somewhat near realtime to a sheet.

OK, then also take a look at Broadway.
