Supervision strategy for a stateful web client?

I wrote a small GenServer which performs a long-running web task (specifically device code authentication against an identity provider). When the server is launched, it fetches an initial value from the identity provider, and then regularly polls via HTTP for updates. So I have state (the values fetched initially), and a potentially crashing sequence of HTTP calls.

If both the state and the polling loop live in the same GenServer, a failing HTTP call wipes out my state. So I understand I need to keep that state in an Agent, have a process for polling (which uses the PID of the Agent to store/update/fetch state), and an overall process which supervises both the state Agent and the polling worker.

               SUP
              /   \
             /     \
            /       \
        State <---- Worker
        Agent       polling

My question is: Where should the API be implemented for interacting with the whole thing? For example, I want to check if the authentication was successful, so I need to read values from the state. The client only has the PID of the overall supervisor, which in turn has the PID of the state agent. So when my client wants to see parts of the state, I need to ask the overall supervisor, which then forwards the call to the state agent.

Is that how it’s supposed to be?

1 Like

Maybe store the state in a shared ETS table?

1 Like

Communicating with GenServers (and Agents) can also be done by looking them up in a Registry or by naming them.

The Agent documentation has a named agent as its first example https://hexdocs.pm/elixir/Agent.html#module-examples
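
For illustration, a minimal named agent for the sign-in state might look roughly like this (the module and keys are made up, not taken from your code):

defmodule SignIn.State do
  use Agent

  # Registering under a fixed name means callers never need the agent's PID.
  def start_link(_opts) do
    Agent.start_link(fn -> %{authenticated?: false} end, name: __MODULE__)
  end

  def authenticated?, do: Agent.get(__MODULE__, & &1.authenticated?)

  def mark_authenticated, do: Agent.update(__MODULE__, &Map.put(&1, :authenticated?, true))
end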

1 Like

Thanks for the pointers to ETS and named processes.

Hi @hauleth, given that the state is for a specific sign-in operation, I don’t want it to live “globally” in ETS, but co-located with the sign-in process.

Hi @jola, given that there are multiple concurrent sign-in operations, it doesn’t feel right to give the agent a “name”, because it’s only dedicated to a specific process.

:wave:

You might also be able to use a “single” process with supervised / retried HTTP requests. That way, when the polling fails, it wouldn’t crash the GenServer. You can keep the PID of that GenServer in a Registry under the device code.

1 Like

Right, but a Registry might still work; take a look at using via tuples ({:via, module, name}) to both register and call/cast. If an operation has any kind of ID (a request ID or whatever), you can use that to dynamically register processes and look them up. You can even pass the via tuple as an argument to GenServer.call etc.
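
For illustration, a sketch of that idea (MyApp.Registry and the :device_code key are assumptions, not something from your code):

# started somewhere in the supervision tree:
# {Registry, keys: :unique, name: MyApp.Registry}

defmodule DeviceAuth.Worker do
  use GenServer

  # Each sign-in worker registers itself under its device code.
  def start_link(device_code) do
    GenServer.start_link(__MODULE__, device_code, name: via(device_code))
  end

  # Callers address the worker by device code; no PID needed.
  def status(device_code), do: GenServer.call(via(device_code), :status)

  defp via(device_code), do: {:via, Registry, {MyApp.Registry, {:device_code, device_code}}}

  @impl true
  def init(device_code), do: {:ok, %{device_code: device_code, status: :pending}}

  @impl true
  def handle_call(:status, _from, state), do: {:reply, state.status, state}
end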

2 Likes

I think @jola is correct here. You want to name these processes somehow in order to look them up. Otherwise when your polling process crashes it won’t have access to the existing state. So you can either use a registry or give them well defined names. Without knowing more about your problem I’d build something similar to this:

defmodule Authenticator do
  def child_spec(args) do
    children = [
      Worker,
      {State, name: args[:name]},
    ]

    %{
      id: __MODULE__,
      type: :supervisor,
      start: {Supervisor, :start_link, [children, [strategy: :one_for_one]]}
    }
  end

  def lookup(name) do
    Agent.get(name, & &1)
  end
end

Now you can add Authenticator to your supervision tree. When you need to look things up it’ll go through the authenticator module and the details of how things are stored and accessed can be hidden away. For instance if you decide to go the ETS route because you need concurrent reads then this will be encapsulated in Authenticator and your callers won’t have to change.
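
For example, wiring it up could look roughly like this (MyApp.AuthState is just an illustrative name):

# in your application's supervision tree
children = [
  {Authenticator, name: MyApp.AuthState}
]

Supervisor.start_link(children, strategy: :one_for_one)

# later, from any process
Authenticator.lookup(MyApp.AuthState)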

3 Likes

+1 for naming them.

  • a :public table can only be accessed “globally” if it is named - otherwise any process accessing it somehow has to get hold of the table ID. So it isn’t uncommon for a supervisor to create a public table for one of its child processes and hand the child process the table ID. Then each time the process is restarted it takes over the existing table.

  • with protected tables you can play the heir - give_away game. A simple owner process transfers ownership to a requesting process but gets it back when that process dies.

Demo script:

# file: lib/demo.ex
#
defmodule Demo do
  def run do
    cycle(3)
  end

  defp cycle(n) when n < 1 do
    :ok
  end

  defp cycle(n) do
    increment(3)
    pid = kill_and_wait()
    if(pid, do: cycle(n - 1), else: :error)
  end

  defp increment(n) when n < 1 do
    :ok
  end

  defp increment(n) do
    increment()
    increment(n - 1)
  end

  defp increment do
    {:count, value} = DontLose.Counter.increment()
    IO.puts("#{value}")
  end

  defp kill_and_wait() do
    name = DontLose.Counter
    pid = Process.whereis(name)
    ref = Process.monitor(pid)
    Process.exit(pid, :kill)

    receive do
      {:DOWN, ^ref, :process, ^pid, :killed} ->
        :ok
    end

    wait(name, 10, 10)
  end

  defp wait(_, _, left) when left < 1 do
    nil
  end

  defp wait(name, timeout, left) do
    case Process.whereis(name) do
      nil ->
        Process.sleep(timeout)
        wait(name, timeout, left - 1)

      pid ->
        pid
    end
  end
end

Demo session:

Interactive Elixir (1.8.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> Demo.run()
1
2
3
4
5
6
7
8
9
:ok
iex(2)> 

Public table:

# file: lib/dont_lose/application.ex
#
defmodule DontLose.Application do
  use Application

  def start(_type, _args) do
    DontLose.Supervisor.start_link([])
  end
end

# file: lib/dont_lose/supervisor.ex
#
defmodule DontLose.Supervisor do
  use Supervisor

  def start_link(init_arg) do
    Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl true
  def init(_init_arg) do
    table = :ets.new(:counter_storage, [:set, :public])
    :ets.insert(table, {:counter, 0})

    children = [
      {DontLose.Counter, table}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end

# file: lib/dont_lose/counter.ex
#
defmodule DontLose.Counter do
  use GenServer

  def start_link(table) do
    GenServer.start_link(__MODULE__, table, name: __MODULE__)
  end

  @impl true
  def init(table) do
    # retrieve value from backup
    [{:counter, count}] = :ets.lookup(table, :counter)
    {:ok, {table, count}}
  end

  @impl true
  def handle_call(:increment, _from, {table, count}) do
    new_count = count + 1
    # backup value
    :ets.insert(table, {:counter, new_count})

    {:reply, {:count, new_count}, {table, new_count}}
  end

  # --- API
  def increment,
    do: GenServer.call(__MODULE__, :increment)
end

Protected “heir” table:

# file: lib/dont_lose/application.ex
#
defmodule DontLose.Application do
  use Application

  def start(_type, _args) do
    supervisor = DontLose.Supervisor

    children = [
      {DontLose.Keeper, nil},
      {DontLose.Counter, supervisor}
    ]

    opts = [strategy: :rest_for_one, name: supervisor]
    Supervisor.start_link(children, opts)
  end
end

# file: lib/dont_lose/keeper.ex
#
defmodule DontLose.Keeper do
  use GenServer

  def start_link(args) do
    GenServer.start_link(__MODULE__, args)
  end

  @impl true
  def init(_args) do
    # create and initialize table
    table = :ets.new(:counter_storage, [:set, :protected, {:heir, self(), :counter_heir}])
    :ets.insert(table, {:counter, 0})

    {:ok, table}
  end

  @impl true
  def handle_call({:request_table, pid}, _from, table) when not is_nil(table) do
    :ets.give_away(table, pid, :counter_transfer)
    {:reply, :ok, nil}
  end

  @impl true
  def handle_info({:"ETS-TRANSFER", table, _pid, :counter_heir}, _state) do
    {:noreply, table}
  end

  # --- API
  def request_table(keeper, pid),
    do: GenServer.call(keeper, {:request_table, pid})
end

# file: lib/dont_lose/counter.ex
#
defmodule DontLose.Counter do
  use GenServer

  alias DontLose.Keeper

  def start_link(sup) do
    GenServer.start_link(__MODULE__, sup, name: __MODULE__)
  end

  @impl true
  def init(sup) do
    state = []
    {:ok, state, {:continue, sup}}
  end

  @impl true
  def handle_continue(sup, _state) do
    children = Supervisor.which_children(sup)

    case find_keeper(children) do
      nil ->
        {:stop, :no_keeper, nil}

      pid ->
        # request table from keeper
        :ok = Keeper.request_table(pid, self())
        {:noreply, nil}
    end
  end

  @impl true
  def handle_call(:increment, _from, {table, count}) do
    new_count = count + 1
    # backup value
    :ets.insert(table, {:counter, new_count})

    {:reply, {:count, new_count}, {table, new_count}}
  end

  @impl true
  def handle_info({:"ETS-TRANSFER", table, _pid, :counter_transfer}, _state) do
    # retrieve current count
    [{:counter, count}] = :ets.lookup(table, :counter)
    {:noreply, {table, count}}
  end

  defp find_keeper([]) do
    nil
  end

  defp find_keeper([{DontLose.Keeper, pid, :worker, _modules} | _tail]) do
    pid
  end

  defp find_keeper([_ | tail]) do
    find_keeper(tail)
  end

  # --- API
  def increment,
    do: GenServer.call(__MODULE__, :increment)
end

The above is for simple demonstration only as not all edge cases are covered.

3 Likes

Yesterday I played a bit with the idea. I’m currently not using ETS, just one Supervisor, a GenServer (as worker) and an Agent. Essentially, the job of the worker is to increment a counter. The public API is implemented in the Supervisor.

iex(1)> [p1, p2] = Demo.demo()
Worker #PID<0.153.0> initialized. Supervisor #PID<0.151.0>. Initial count 0
Worker #PID<0.156.0> initialized. Supervisor #PID<0.154.0>. Initial count -10
[#PID<0.151.0>, #PID<0.154.0>]
iex(2)> [p1, p2] |> Demo.show()
["9", "38"]
iex(3)> [p1, p2] |> Demo.show()
["13", "55"]
iex(4)> p1 |> WorkerSupervisor.kill_worker
Worker #PID<0.160.0> initialized. Supervisor #PID<0.151.0>. Initial count 25
true
iex(5)> [p1, p2] |> Demo.show()           
["28", "131"]
iex(6)> p1 |> WorkerSupervisor.kill_state()
true
iex(7)> [p1, p2] |> Demo.show()            
["39", "188"]
iex(8)> p1 |> WorkerSupervisor.kill_worker()
true
Worker #PID<0.166.0> initialized. Supervisor #PID<0.151.0>. Initial count 46
iex(9)> [p1, p2] |> Demo.show()             
["49", "237"]
  • The WorkerSupervisor starts the Agent and the Worker (and passes its own supervisor PID to the Worker).
  • The Worker discovers (through the Supervisor) which is its associated State Agent process.
  • The Worker continuously overwrites the state in the Agent (upon state changes). Only single-directional writes from Worker to Agent.
  • When the Agent is killed, the Supervisor creates a new Agent, and the state in that new Agent is overwritten by the running Worker (which always discovers which is the proper agent).
  • When the Worker is killed or crashes, the re-started Worker fetches the current state from the Agent.

It would be interesting to hear your opinions on that approach.

#
# lib/demo.ex
#
defmodule Demo do
  def demo do
    { :ok, sup1 } = WorkerSupervisor.start_link()
    { :ok, sup2 } = WorkerSupervisor.start_link(%{interval: 200, counter: -10 })

    [ sup1, sup2 ]
  end

  def show(supervisors) do
    supervisors
    |> Enum.map(fn (sup) ->
      sup
      |> WorkerSupervisor.get_counter()
      |> Integer.to_string()
    end)
  end
end

#
# lib/state.ex
#
defmodule State do
  use Agent

  def start_link(state, options \\ []) do
    Agent.start_link(fn -> state end, options)
  end
end

#
# lib/worker.ex
#
defmodule Worker do
  use GenServer

  def start_link(state = %{supervisor_pid: supervisor_pid}, opts \\ [])
      when is_pid(supervisor_pid) do
    GenServer.start_link(__MODULE__, state, opts)
  end

  def init(state) do
    #
    # After the worker is started, it needs to fetch current state from state Agent.
    #
    {:ok, state, {:continue, :post_init}}
  end

  def handle_continue(:post_init, state) do
    state =
      state
      |> WorkerSupervisor.get_agent_state()

    IO.puts("Worker #{inspect(self())} initialized. Supervisor #{inspect(state.supervisor_pid)}. Initial count #{state.counter}")

    self()
    |> Process.send_after(:tick, state.interval)

    {:noreply, state}
  end

  def handle_info(:tick, state) do
    state =
      state
      |> Map.update!(:counter, &(&1 + 1))

    state
    |> WorkerSupervisor.set_agent_state()

    self()
    |> Process.send_after(:tick, state.interval)

    {:noreply, state}
  end
end

#
# lib/worker_supervisor.ex
#
defmodule WorkerSupervisor do
  use Supervisor

  defp get_child_pid(supervisor_pid, child_type)
       when is_pid(supervisor_pid) and child_type in [State, Worker] do
    supervisor_pid
    |> Supervisor.which_children()
    |> Enum.filter(fn {type, _pid, :worker, _} ->
      case type do
        ^child_type -> true
        _ -> false
      end
    end)
    |> hd()
    |> elem(1)
  end

  def get_state_pid(supervisor_pid), do: supervisor_pid |> get_child_pid(State)
  def get_worker_pid(supervisor_pid), do: supervisor_pid |> get_child_pid(Worker)

  defp kill_child(supervisor_pid, child_type)
       when is_pid(supervisor_pid) and child_type in [State, Worker] do
    supervisor_pid
    |> get_child_pid(child_type)
    |> Process.exit(:kill)
  end

  def kill_worker(supervisor_pid), do: supervisor_pid |> kill_child(Worker)
  def kill_state(supervisor_pid), do: supervisor_pid |> kill_child(State)

  def get_interval(supervisor_pid) when is_pid(supervisor_pid) do
    supervisor_pid
    |> get_state_pid()
    |> Agent.get(& &1.interval)
  end

  def get_counter(supervisor_pid) when is_pid(supervisor_pid) do
    supervisor_pid
    |> get_state_pid()
    |> Agent.get(& &1.counter)
  end

  def get_agent_state(%{supervisor_pid: supervisor_pid}) do
    supervisor_pid
    |> get_state_pid()
    |> Agent.get(& &1)
  end

  def set_agent_state(worker_state = %{supervisor_pid: supervisor_pid}) do
    supervisor_pid
    |> get_state_pid()
    |> Agent.update(fn _ -> worker_state end)
  end

  def start_link(state \\ %{interval: 1_000, counter: 0}) do
    Supervisor.start_link(__MODULE__, state)
  end

  @impl true
  def init(initial_state = %{interval: _, counter: _}) do
    supervisor_pid = self()

    children = [
      {State, initial_state |> Map.put(:supervisor_pid, supervisor_pid)},
      {Worker, %{supervisor_pid: supervisor_pid}}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end
1 Like

In my opinion, you are not far from a perfect solution, but there is room for improvement.
a) the agent can exist when the worker dies, but the worker can’t exist without the agent: this begs for the :rest_for_one strategy (see the supervisor sketch after the code below)
b) extracting the agent PID every time seems a little inefficient; it would be better to get the agent PID once and store it in the worker as part of its state.

A way to perform b) is this:

defmodule Worker do
  use GenServer

  def start_link(opts \\ []) do
    # note: self() here is the supervisor's PID, because start_link runs in the supervisor's context
    state = %{supervisor_pid: self()}
    GenServer.start_link(__MODULE__, state, opts)
  end

  def init(state) do
    {:ok, state, {:continue, :post_init}}
  end

  def handle_continue(:post_init, state) do
    # get_state_pid/1 from the WorkerSupervisor above
    agent_pid = WorkerSupervisor.get_state_pid(state.supervisor_pid)
    counter = Agent.get(agent_pid, & &1)
    {:noreply, Map.merge(state, %{agent_pid: agent_pid, counter: counter})}
  end

  def handle_info(:tick, state) do
    State.set_state(state.agent_pid, state.counter)
    {:noreply, state}
  end

  ...
end

defmodule State do
  use Agent

  def start_link ... #same as before

  def set_state(pid, counter), do: Agent.update(pid, fn _ -> counter end)
end 

I hope you get what I mean :slight_smile: I would leave Supervisor alone and put the logic inside Worker and State. The public API would be in the Worker module.
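
And for a), the only change needed is in the supervisor’s init/1; a minimal sketch, assuming the State and Worker modules sketched above:

@impl true
def init(initial_state) do
  children = [
    {State, initial_state},
    {Worker, []}
  ]

  # :rest_for_one - if State crashes, Worker (started after it) is restarted too;
  # if only Worker crashes, State keeps its data.
  Supervisor.init(children, strategy: :rest_for_one)
end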

There are of course other options. You’ve assumed that HTTP calls are potentially crashing, but most HTTP libraries don’t raise exceptions but instead return {:error, reason}. It might be OK to keep the state and the work in one process as @idi527 suggested. It is also very common to keep state in ETS tables, as @peerreynders suggested, and usually those are created in the supervisor.
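
For instance, a polling clause that handles {:error, reason} without crashing might look roughly like this (HTTPoison is used only for illustration; poll_url, interval, and apply_response/2 are placeholders):

def handle_info(:poll, state) do
  state =
    case HTTPoison.get(state.poll_url) do
      {:ok, %HTTPoison.Response{status_code: 200, body: body}} ->
        apply_response(state, body)

      {:ok, %HTTPoison.Response{}} ->
        # unexpected status: keep the current state and retry on the next tick
        state

      {:error, %HTTPoison.Error{}} ->
        # no exception raised: keep the current state and retry on the next tick
        state
    end

  Process.send_after(self(), :poll, state.interval)
  {:noreply, state}
end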

1 Like

Just to be clear, I suggested still letting the HTTP requests crash, but doing it in a supervised, non-linked task (or stream if there are multiple requests), so that the calling process that keeps the state isn’t linked and doesn’t crash with the failing HTTP request, while we still get the nice error message from the exception with the stacktrace.

# adapted the demo snippet from the docs for Task.Supervisor

  # etc ...
  def handle_info({:http_get, url}, %{tasks: tasks} = state) do
    task =
      Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn ->
        HTTPoison.get!(url) # <-- still using the `!` version to crash when appropriate
      end)

    {:noreply, %{state | tasks: Map.put(tasks, task.ref, %{task: task, url: url})}}
  end

  # The task completed successfully
  def handle_info({ref, %HTTPoison.Response{body: _body}}, %{tasks: %{^ref => %{url: url}} = tasks} = state) do
    # do something with the http reply
    Process.demonitor(ref, [:flush])
    {:noreply, %{state | tasks: Map.delete(tasks, ref)}}
  end

  def handle_info({_ref, _}, state) do
    # "response" from a "stray" task? Not sure when this can happen
    {:noreply, state}
  end

  # The task failed
  def handle_info({:DOWN, ref, :process, _pid, _reason}, %{tasks: %{^ref => %{url: url}} = tasks} = state) do
    # Log and possibly restart the task... (we still have the url for the request)
    {:noreply, %{state | tasks: Map.delete(tasks, ref)}}
  end

  def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do
    # I sometimes get the DOWN message even though the process has been demonitored, probably some internal race condition
    {:noreply, state}
  end

  # etc ...
1 Like

Thanks @tomekowal for the insights. I totally get the b) point about Worker.start_link/1 running in the context of the WorkerSupervisor, and therefore being able to capture the supervisor’s PID via self().

Regarding your a) suggestion to use a :rest_for_one strategy, and that the worker can’t exist without the agent: in principle I understand that the worker needs an agent to store its state. However, the implementation currently has an interesting quirk: the worker doesn’t store the agent’s PID, but always asks the parent supervisor for it. In a situation where the worker is humming along, it has an up-to-date copy of the current state. If the agent now crashes, the supervisor restarts a fresh (empty) agent. On the next ‘save’ operation (from the worker to the agent), the latest state gets ‘replicated’ from the worker to the agent. So by keeping the worker running when the agent dies, I still have a copy of the current state.

Does that make sense?

Thanks (!) @idi527 for the feedback. I need to look deeper into the behavior of my HTTP client (Tesla with :ibrowse). In my current Azure storage SDK code, I’ve integrated support for using Fiddler (Windows HTTP proxy) for debugging my calls. When Fiddler isn’t listening on 127.0.0.1:8080, the HTTP calls brought the process down, no friendly tuples and stuff :slight_smile: . Potentially I’m doing something wrong over there.


This prototype here is also for me to learn how to properly leverage the OTP pieces, sorry if I mix too many things together.

I have a demo repo with different tasks crashing without bringing down the caller. Hope it might help.

1 Like

I understand what you’re writing, but I don’t think it makes sense :wink: The worker can fail because it has complex work to do that can crash. Why would the Agent holding the state crash? If its only functions are set_state and get_state, it doesn’t do any computations. The only interactions it has are with the Worker. If the Agent crashed, that means the Worker did something terrible to it and potentially has a bad state itself. You don’t want to copy that state back. You’d instead restart both.

I guess the idea of guarding state in both processes comes from a wrong mode of thinking: “what happens if one of the processes randomly crashes?”. Processes don’t crash randomly. They crash when they are either doing complex work that can produce an unexpected/unhandled result or when they use external resources like files or the network.

The second rule of thumb is that “computation is cheap, but data is sacred”. It is usually better to wipe out potentially broken state than to let the error propagate. Your state process might guard itself against the wrong state by using an interface that crashes the worker process. E.g.

def set_state(pid, %{counter: counter, interval: interval} = state) when is_integer(counter) and is_integer(interval) do
  Agent.update(pid, fn _ -> state end)
end

You can do other checks before calling the Agent, and a failing check will crash the worker, guarding the data.

I believe a lot of fault tolerance in the BEAM stems from not letting the error propagate. You would continue with the initial state if something went wrong.

Let’s look at those two solutions in the light of the use case from the first post: authentication.
In the solution where the Agent starts with a clean state after a crash, the device will need to reauthenticate. That is an inconvenience but not a big deal.
In the solution that copies state back and forth, the crash could happen because the worker was poorly implemented and did something strange to the state. It then propagates that error back to the agent, and you could potentially authenticate someone else, let anyone connect, or end up in a dangerous security breach.

To sum up: I don’t think it is a good idea to get the state back from the worker when an agent crashes.

2 Likes

Ultimately that is my beef with Agents - their state is entirely at the mercy of the functions that are sent to them by other processes. At least with a GenServer it can be easily enforced that interactions with the state are simple.
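
For contrast, a counter held by a GenServer only ever sees a couple of fixed messages, so no caller can ship an arbitrary (and possibly state-corrupting) function into it; a quick sketch:

defmodule SafeCounter do
  use GenServer

  # The only operations on the state are these two messages.
  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, 0, opts)
  def value(pid), do: GenServer.call(pid, :value)
  def increment(pid), do: GenServer.call(pid, :increment)

  @impl true
  def init(count), do: {:ok, count}

  @impl true
  def handle_call(:value, _from, count), do: {:reply, count, count}
  def handle_call(:increment, _from, count), do: {:reply, count + 1, count + 1}
end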


Aside: In Programming Elixir 1.3 - Chapter 18: OTP Supervisors

This tree structure was used:

Supervisor
|__ Stash
|__ SubSupervisor
    |__ Server

https://media.pragprog.com/titles/elixir13/code/otp-supervisor/2/sequence/lib/sequence.ex
https://media.pragprog.com/titles/elixir13/code/otp-supervisor/2/sequence/lib/sequence/supervisor.ex
https://media.pragprog.com/titles/elixir13/code/otp-supervisor/2/sequence/lib/sequence/stash.ex
https://media.pragprog.com/titles/elixir13/code/otp-supervisor/2/sequence/lib/sequence/sub_supervisor.ex
https://media.pragprog.com/titles/elixir13/code/otp-supervisor/2/sequence/lib/sequence/server.ex

In Programming Elixir 1.6 that was replaced with a :rest_for_one strategy on

Supervisor
|__ Stash
|__ Server

https://media.pragprog.com/titles/elixir16/code/otp-supervisor/2/sequence/lib/sequence/application.ex

The primary issue with the server code was that the stash was only updated in the terminate callback, which won’t always run. The backing store needs to be updated whenever we are certain that the new state is consistent.
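
Roughly, the fix is to persist on every successful update instead of in terminate/2; a sketch, with Sequence.Stash.update/1 standing in for whatever the stash’s actual API is:

def handle_call(:next_number, _from, current_number) do
  # save to the stash on every successful state change; terminate/2 is not
  # guaranteed to run when the process is killed
  Sequence.Stash.update(current_number + 1)
  {:reply, current_number, current_number + 1}
end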

3 Likes

Great feedback, that was what I was looking for. I must admit I’m still wrapping my head around the intricacies of how actor-based systems should behave. When I read it in a book, it all makes sense, but the books always describe the happy path. The thorough feedback here in the Elixir Forum is awesome.

Thanks, everybody!

1 Like

I really think that you want to use names here instead of always going through the supervisor. This removes a bottleneck from your system and makes it easier to reason about crashes. I also want to add some nuance to what @tomekowal is saying about state.

Processes absolutely can crash through no fault of their own. This is typically due to a supervisor being restarted because of an unrelated crash. For instance, your worker and agent may be working fine, but their supervisor is managed by a supervisor with a :one_for_all strategy. If one of your supervisor’s siblings crashes, then your worker and agent will be restarted as well. This is pretty rare but worth keeping in mind.

But @tomekowal’s main point is correct. Most of the time a process will crash because it ends up in a bad state. When that happens it’s better to just allow the process to crash and come back in a good state.

The main issue with the way you’ve built your system currently is that the worker is responsible for pushing state into the agent. What that means is that if your worker state is bad then you’ll push bad state into the agent, and the agent will crash. This will continue happening repeatedly because the bad state is never being cleaned up. We aren’t allowing the agent to come back up in a known good state. These kinds of bugs crop up all the time, especially when people intermingle persistence with their process state. The bad state is persisted somewhere, the process crashes, the process restarts and loads the data into memory, the next message crashes it again, etc.

What I try to do is to isolate my state to a single process as much as possible. I then send commands to that process and allow that process to update its own internal state. If something gets into a bad state the crash will be isolated and I can restart in a good state.
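
In spirit, something like this sketch (module and message names are made up; the gist below has the actual rewrite):

defmodule SignIn.Session do
  use GenServer

  # All state lives in this one process; callers only send high-level commands.
  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  def poll_result(server, result), do: GenServer.cast(server, {:poll_result, result})
  def authenticated?(server), do: GenServer.call(server, :authenticated?)

  @impl true
  def init(:ok), do: {:ok, %{authenticated?: false, tokens: nil}}

  @impl true
  def handle_cast({:poll_result, {:ok, tokens}}, state),
    do: {:noreply, %{state | authenticated?: true, tokens: tokens}}

  # A failed poll doesn't corrupt the session; we simply keep the existing state.
  def handle_cast({:poll_result, {:error, _reason}}, state), do: {:noreply, state}

  @impl true
  def handle_call(:authenticated?, _from, state),
    do: {:reply, state.authenticated?, state}
end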

Here’s how I would re-write what you have so far: https://gist.github.com/keathley/36e536e7b55d7444752bd29fb8d25d4d

5 Likes

I’d just like to mention a good blog post on state isolation. It introduces the idea of an “error kernel”:

Erlang programs have a concept called the error kernel. The kernel is the part of the program which must be correct for its correct operation. Good Erlang design begins with identifying the error kernel of the system: What part must not fail or it will bring down the whole system? Once you have the kernel identified, you seek to make it minimal. Whenever the kernel is about to do an operation which is dangerous and might crash, you “outsource” that computation to another process, a dumb slave worker. If he crashes and is killed, nothing really bad has happened - since the kernel keeps going.

5 Likes