How to monitor/restart a dependent genserver producer

Hi, I’m trying to get my head around how to link/monitor and restart a couple of dependent genservers.

Imagine a simple application which

  • monitors a config file on disk.
  • when the file changes it is reloaded, parsed and an event message sent to interested listeners (simple producer)
  • So we need a FileSystem watcher keeping an eye on a file and sending us a message when it changes (simple consumer)
  • We need some state to hold our listeners
  • We need some state to hold the parsed file (and a call will return it)

I’m tempted to structure the app as:

defmodule Database.ConfigDB.Worker do
  use GenServer

  @doc false
  def start_link(args) do
    {opts, args} = Keyword.split(args, [:name])
    GenServer.start_link(__MODULE__, args, opts)
  end

  @doc false
  def init(args) do
    # FileSystem expects :dirs to be a list of paths
    dirs = ["/some/config/file"]
    args = Keyword.put(args, :dirs, dirs)

    with {:ok, watcher_pid} <- FileSystem.start_link(args),
        :ok <- FileSystem.subscribe(watcher_pid) do
      {:ok, %{watcher_pid: watcher_pid, subscribers: %{}}}
    else
      :ignore -> {:stop, "failed to start filewatcher"}
    end
  end

  def handle_call(:subscribe, {pid, _}, state) do
    ref = Process.monitor(pid)
    state = put_in(state, [:subscribers, ref], pid)
    {:reply, :ok, state}
  end

  def handle_info(
        {:file_event, watcher_pid, {_file_path, _events}},
        %{watcher_pid: watcher_pid} = state
      ) do
    # reload, alert subscribers
    # ...
    {:noreply, state}
  end
end

So above I start the FileSystem watcher with start_link, which I think means that if it crashes it will also take down ConfigDB.Worker? That is undesirable because I’ve chosen to store some state in the GenServer (only subscribers in the example, but let’s pretend there is more). I don’t want to lose this state; I just want to restart the file watcher.

My first thought was to introduce a DynamicSupervisor, have that start my FileSystem and set that up within my init(args) call above. The tricky part seems to be that I need to call:
FileSystem.subscribe(watcher_pid)

So that seems like I need to monitor my FileSystem, notice if it goes down and then keep calling subscribe until it comes back up again? This seems like a problematic solution?

So my next thought is to return to the original setup, so things are linked:

Supervisor -> ConfigDB.Worker -> FileSystem

Then I need to move my state somewhere else. Since the state right now is only a list of subscribers to alert, there might even be a well-worn path to handle this?
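One option that is often used for exactly this kind of subscriber list is Elixir's built-in Registry with duplicate keys, which lives in its own process and so survives a crash of the worker. The following is only a minimal sketch of that idea; the module name RegistrySketch, the registry name and the :config_changed topic are made up for illustration and are not part of the original setup.

```elixir
defmodule RegistrySketch do
  # Hypothetical names, chosen only for this example.
  @registry RegistrySketch.Events
  @topic :config_changed

  # In a real application the Registry would sit in the supervision tree;
  # it is started directly here to keep the sketch self-contained.
  def start_link do
    Registry.start_link(keys: :duplicate, name: @registry)
  end

  # Called by any process that wants to be told about config changes.
  # The Registry drops the entry automatically when that process exits.
  def subscribe do
    {:ok, _owner} = Registry.register(@registry, @topic, nil)
    :ok
  end

  # Called by the worker after it has reloaded and parsed the file.
  def broadcast(config) do
    Registry.dispatch(@registry, @topic, fn entries ->
      for {pid, _value} <- entries, do: send(pid, {:config_changed, config})
    end)
  end
end
```

With the subscriber list held by the Registry, the worker itself keeps no listener state, so restarting it together with the file watcher loses nothing on that front.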

This seems like it should be a simple problem to solve and most of the options above seem subtle and tricky to get right. So what’s the generic (and simple) solution for:

  • Some simple producer genserver (could be tailing a file, reading a serial port, spooling tweets, etc)
  • Consumer needs to run some kind of simple setup when the producer starts, eg subscribe to it
  • Desired behaviour is if the producer falls over it should be a non critical event, just keep trying to restart it and run the setup (subscribe) so that we get events again.
  • It’s assumed that failures are rare and not persistent, eg file watcher segfaults, not worried about handling reading Twitter which has gone away for a few hours.

Thanks for advice

Would perhaps a GenStage based solution work for you? It has quite a bit of crash handling in place, and there are higher level abstractions available on top of it (flow, broadway, …)

Sure - however, that is quite heavy weight. I think my use of “consumer/producer” probably made this sound a bit more grand.

I guess imagine you just find some off the shelf library, it watches something or listens to telegram, tweets, etc.

Obviously things can get infinitely complex, I’m thinking here more just bugs that cause a process to quit, rather than dealing with extended outages and stuff which won’t start up.

So in my case above, I’m positive the file exists; I just want to be notified when something happens to it. I grabbed a library which basically does all this, BUT it needs to be “initialised” after startup, i.e. in this case I need to call FileSystem.subscribe/1. How do I keep these two processes in sync?

My further reflection is that:

App Supervisor -> Dynamic Supervisor -> FileSystem (temporary, ie no autorestart)
App supervisor -> ConfigDB
ConfigDB monitor -> FileSystem 

By monitoring the process, I will get a notification if it goes down. At that point I can ask the DynamicSupervisor to start it again with start_child/2, and this step looks roughly identical to the init/1 in my ConfigDB process.
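A rough sketch of that layout, assuming a stand-in producer instead of the real FileSystem process. The MonitorSketch modules and their functions are hypothetical names used only for illustration:

```elixir
defmodule MonitorSketch.Producer do
  # Hypothetical stand-in for the file watcher; :temporary means the
  # supervisor never restarts it on its own.
  use GenServer, restart: :temporary

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
  def subscribe(pid), do: GenServer.call(pid, {:subscribe, self()})

  @impl true
  def init(_opts), do: {:ok, nil}

  @impl true
  def handle_call({:subscribe, pid}, _from, _state), do: {:reply, :ok, pid}
end

defmodule MonitorSketch.ConfigDB do
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, Keyword.fetch!(opts, :supervisor))
  end

  def producer(server), do: GenServer.call(server, :producer)

  @impl true
  def init(sup) do
    {:ok, attach(%{sup: sup, producer: nil, ref: nil, subscribers: %{}})}
  end

  @impl true
  def handle_call(:producer, _from, state), do: {:reply, state.producer, state}

  # The monitored producer went down: start a fresh one and subscribe again.
  # The subscribers map is untouched because this process never crashed.
  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, %{ref: ref} = state) do
    {:noreply, attach(state)}
  end

  # Same steps at startup and after every producer crash.
  defp attach(state) do
    {:ok, pid} = DynamicSupervisor.start_child(state.sup, {MonitorSketch.Producer, []})
    ref = Process.monitor(pid)
    :ok = MonitorSketch.Producer.subscribe(pid)
    %{state | producer: pid, ref: ref}
  end
end
```

This sketch restarts immediately and without a retry limit, which only fits the stated assumption that failures are rare and not persistent. If the producer dies between start_child and subscribe, the call exits and ConfigDB crashes too, so a more careful version would need to handle that window.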

Is this the right idiom to use with OTP processes?

Those are exactly the scenarios OTP excels in and are the reason why I abandoned working with Ruby, JS, PHP, Java, C/C++ and a few others.

However, I can’t think of anything ready-made for your needs off the top of my head. I suppose it’s deemed easy enough to do yourself and not easily generalizable as a library, so I suggest you give it a try and write your own small piece of OTP management code. More likely, though, you will just change restart policies and build the proper supervision tree for your needs.

I guess it depends on the exact use case… I’ve often used Flow to get things done where it would benefit and it’s just a few lines of code. Using raw gen_stage can indeed get more heavy on the implementation.

This is why start_monitor exists, as well as exit trapping. (more below)

Personally, I would:

  • Process.flag(:trap_exit, true) in ConfigDB
  • start_link the FileSystem process from ConfigDB
  • handle {:EXIT, from, reason} messages in ConfigDB.handle_info, and when the FileSystem process goes down, restart it there
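The three steps above could look roughly like the sketch below. It uses a hypothetical stand-in producer rather than the real FileSystem process, and the TrapExitSketch module names are invented for the example:

```elixir
defmodule TrapExitSketch.Producer do
  # Hypothetical stand-in for FileSystem: it only accepts a subscribe call.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
  def subscribe(pid), do: GenServer.call(pid, {:subscribe, self()})

  @impl true
  def init(_opts), do: {:ok, nil}

  @impl true
  def handle_call({:subscribe, pid}, _from, _state), do: {:reply, :ok, pid}
end

defmodule TrapExitSketch.ConfigDB do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
  def producer(server), do: GenServer.call(server, :producer)

  @impl true
  def init(_opts) do
    # Turn exits of linked processes into {:EXIT, pid, reason} messages
    # instead of letting them take this process down.
    Process.flag(:trap_exit, true)
    {:ok, start_producer(%{subscribers: %{}, producer: nil})}
  end

  @impl true
  def handle_call(:producer, _from, state), do: {:reply, state.producer, state}

  # The linked producer died: start and subscribe again, keeping our state.
  @impl true
  def handle_info({:EXIT, pid, _reason}, %{producer: pid} = state) do
    {:noreply, start_producer(state)}
  end

  # Any other linked process exiting is treated as fatal in this sketch.
  def handle_info({:EXIT, _other, reason}, state) do
    {:stop, reason, state}
  end

  defp start_producer(state) do
    {:ok, pid} = TrapExitSketch.Producer.start_link([])
    :ok = TrapExitSketch.Producer.subscribe(pid)
    %{state | producer: pid}
  end
end
```

Because the producer is still linked, it goes down together with ConfigDB if ConfigDB itself crashes, so no orphaned watcher is left behind. There is no backoff or retry limit here, which matches the assumption that failures are rare.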

Did you make a typo with “start_monitor”? I can’t find any reference to this in the docs (or with google)? Any further hints?

Your thought on the error trapping seems like a better solution. I think I will use that. Thanks!

I would be interested to see how you would use a super basic Flow module, though. Do you have a micro example, perhaps? Thank you for replying!

Sorry, yes, I had GenServer on the brain ;) It’s spawn_monitor.
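For reference, a tiny sketch of what spawn_monitor gives you: the process is spawned and monitored in one step, and its exit arrives as a :DOWN message. The module and function names are made up for the example:

```elixir
defmodule SpawnMonitorSketch do
  # Runs `fun` in a new monitored process and reports why it exited.
  def run_once(fun) do
    {pid, ref} = spawn_monitor(fun)

    receive do
      {:DOWN, ^ref, :process, ^pid, reason} -> {:down, reason}
    after
      1_000 -> :timeout
    end
  end
end
```

A monitor is one-way, so unlike a link the crash of the spawned process does not propagate to the caller; the caller only gets the message.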

That’s the nice thing about Flow: it looks a lot like regular function pipelining. You don’t need to write modules as with GenStage; it hides that behind wrapper functions. The docs have examples like this one:

File.stream!("path/to/some/file")
|> Flow.from_enumerable()
|> Flow.flat_map(&String.split(&1, " "))
|> Flow.partition()
|> Flow.reduce(fn -> %{} end, fn word, acc ->
  Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()

Depending on what you are needing to accomplish, this API is sometimes enough.
