Thanks for your helpful input, @lucaong & @alvises. Let's look at source code to make this more specific. Here is my issue as a runnable, if "quick" and (certainly) dirty, example:
It needs Flow:
$ git clone https://github.com/plataformatec/flow.git
$ cd flow
$ mix deps.get
foo.exs:
require Logger

defmodule Source do
  use GenStage

  def start_link(_) do
    GenStage.start_link(__MODULE__, nil, name: __MODULE__)
  end

  @impl true
  def init(_) do
    {:producer, nil}
  end

  @impl true
  def handle_demand(_demand, _state) do
    Logger.info "Source handle_demand"
    :timer.sleep(1_000)
    {:noreply, [:rand.uniform(100)], nil}
  end
end
defmodule Server do
  use GenServer

  def start_link do
    Logger.info "Server start"
    GenServer.start_link __MODULE__, nil, name: __MODULE__
  end

  @impl true
  def init(_) do
    server = self()
    spawn_link(fn -> start_flow(server) end)
    {:ok, nil}
  end

  def start_flow(server) do
    Logger.info "Server start_flow"
    Flow.from_specs([Source], window: Flow.Window.periodic(1, :second), stages: 1)
    |> Flow.reduce(fn -> [] end, fn thing, acc -> [thing | acc] end)
    |> Flow.on_trigger(fn things -> add_things(server, things); {[], []} end)
    |> Flow.run
    raise "never exits"
  end

  def add_things(pid, things) do
    GenServer.cast pid, {:stuff, things}
  end

  @impl true
  def handle_cast({:stuff, things}, state) do
    Logger.info "Server stuff things=#{inspect things}"
    {:noreply, state}
  end
end
defmodule T do
  use Task # restart: :transient # does not make a difference here !?

  def start_link do
    Logger.info "Task start"
    Task.Supervisor.start_child Foo.TaskSupervisor, __MODULE__, :run, [], restart: :transient # defining restart here makes a difference
  end

  def run do
    Logger.info "Task run"
    case Server.start_link do
      {:ok, _pid} -> Logger.info "Task run server started successfully"
      error -> Logger.info "Task run: error starting server: error=#{inspect error}"
    end
    :timer.sleep :infinity # long running task
  end
end
# Application fragment
#
children = [
  {Task.Supervisor, name: Foo.TaskSupervisor},
]
opts = [strategy: :one_for_one, name: __MODULE__]
Supervisor.start_link(children, opts)

{:ok, task} = T.start_link

# simulate task crash
:timer.kill_after(5_000, task)

:timer.sleep :infinity
To run it:
mix run foo.exs
Expected: Everything boots and runs smoothly. Once the task fails (simulated after 5 s), everything crashes and restarts correctly, and the system runs smoothly again.
Actual: Once the task fails, it cannot be restarted because the GenStage is still running. It should have been killed along with the task, so that it is not in the way when the task restarts.
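To show what I mean by "already running" in isolation, here is a minimal sketch of the name clash. It assumes the failure comes from the stage being registered under `name: __MODULE__`; `NamedWorker` is a hypothetical stand-in for `Source` and is not part of foo.exs. It only shows the symptom, not why the old process survives in my example.

```elixir
# Hypothetical stand-in for the named Source stage (not part of foo.exs).
defmodule NamedWorker do
  use GenServer

  # Registers the process under the module name, like Source does.
  def start_link(_arg) do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  @impl true
  def init(state), do: {:ok, state}
end

# First start succeeds and claims the name.
{:ok, first} = NamedWorker.start_link(nil)

# A second start under the same name fails while the first process is alive.
second_attempt = NamedWorker.start_link(nil)
IO.inspect(second_attempt, label: "second start while first is alive")

# Once the old process is really gone, the name is free again.
:ok = GenServer.stop(first)
{:ok, third} = NamedWorker.start_link(nil)
IO.inspect(third != first, label: "restarted as a new process")
```

So if the old stage outlives the crashed task, any restart that tries to start it again under the same name would fail like the second attempt above.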
Thank you for your input—much, much appreciated!