Monitoring messages in process mailboxes

I am looking for advice on how to monitor an application with many GenServers doing long-running work. We often want to know “how many messages are in the queue?” A process has a mailbox rather than a queue, but the idea is the same: how can we see what our app is doing? I have seen GenStage.estimate_buffered_count/2 and I am wondering if that is the best way. We have a mix of GenServer and GenStage processes, so I thought Process.info/1 could help with the GenServers, but this function seems to require a pid and does not accept a process name. Also, the process info does not seem to include the messages themselves, only message_queue_len. Is that expected?
Another idea: since we are using PubSub, we could add subscribers that count messages, but that feels a bit smelly.

Thank you for suggestions!

To find a pid by process name, you can use Process.whereis(:name).

Then you can see the messages via Process.info(pid, :messages). Be careful, though: this operation copies every message in the mailbox to the calling process, so if the mailbox is huge, the call can be expensive.
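
Putting those two calls together, a small helper could accept either a pid or a locally registered atom name. This is only a sketch: the module name MailboxPeek and its functions are hypothetical, and it simply returns nil when the name is not registered or the process is no longer alive.

```elixir
defmodule MailboxPeek do
  # Hypothetical helper: accepts a pid or a locally registered atom name.
  defp resolve(pid) when is_pid(pid), do: pid
  defp resolve(name) when is_atom(name), do: Process.whereis(name)

  # Cheap: only the length is returned, no messages are copied.
  def queue_len(name_or_pid) do
    with pid when is_pid(pid) <- resolve(name_or_pid),
         {:message_queue_len, len} <- Process.info(pid, :message_queue_len) do
      len
    else
      # Name not registered (whereis returned nil) or process not alive.
      _ -> nil
    end
  end

  # Potentially expensive: copies every queued message to the caller.
  def messages(name_or_pid) do
    with pid when is_pid(pid) <- resolve(name_or_pid),
         {:messages, msgs} <- Process.info(pid, :messages) do
      msgs
    else
      _ -> nil
    end
  end
end
```

For a named GenServer you would call MailboxPeek.queue_len(:my_server) and only reach for MailboxPeek.messages/1 when you really need the contents.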

A way to get all ‘monitor-able’ processes is to register them in a process group under the :pg module at startup. Then you’ll have a list of pids to call Process.info/1 on.

:pg.join(:message_length_procs, self())
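
A minimal sketch of that idea, assuming OTP 23 or later (where :pg is available) and that the default :pg scope gets started somewhere, for example in your supervision tree. MailboxMonitor and its functions are hypothetical names. It uses :pg.get_local_members/1 because Process.info/2 only works on local pids.

```elixir
defmodule MailboxMonitor do
  # Group name taken from the :pg.join example above.
  @group :message_length_procs

  # Starts the default :pg scope if it is not already running. In a real app
  # you would rather add %{id: :pg, start: {:pg, :start_link, []}} to a supervisor.
  def ensure_scope do
    case :pg.start_link() do
      {:ok, pid} -> {:ok, pid}
      {:error, {:already_started, pid}} -> {:ok, pid}
    end
  end

  # Call this from each process you want to watch, e.g. in its init/1.
  def register_self, do: :pg.join(@group, self())

  # {pid, message_queue_len} for every local member. Processes that exited in
  # the meantime (Process.info/2 returns nil) are skipped by the generator.
  def queue_lengths do
    for pid <- :pg.get_local_members(@group),
        {:message_queue_len, len} <- [Process.info(pid, :message_queue_len)] do
      {pid, len}
    end
  end
end
```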

If you need to do that kind of thing, a general pattern is to do the actual work in a side process of the GenServer (a Task process managed by the GenServer), while the GenServer itself just accepts messages and puts them in a queue (using the :queue module). Then you can do whatever you want with the queue: inspect it, filter it, replace it with a prioritized queue, etc.
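
A rough sketch of that pattern, under a few assumptions of my own: a job is just a zero-arity function, jobs run one at a time, and the module name JobQueue and its API are made up. Because Task.async/1 links the task to the GenServer, a crashing job takes the server down with it; Task.Supervisor.async_nolink/2 is the usual way to avoid that.

```elixir
defmodule JobQueue do
  # Hypothetical module: the GenServer only accepts and queues jobs;
  # the work itself runs in a Task that the GenServer starts and tracks.
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # A "job" is assumed to be a zero-arity function.
  def enqueue(server, job) when is_function(job, 0), do: GenServer.cast(server, {:enqueue, job})

  # Jobs still waiting in the :queue (the running job is not counted).
  def pending(server), do: GenServer.call(server, :pending)

  # Number of jobs that have finished so far.
  def completed(server), do: GenServer.call(server, :completed)

  @impl true
  def init(:ok), do: {:ok, %{queue: :queue.new(), running: nil, completed: 0}}

  @impl true
  def handle_cast({:enqueue, job}, state) do
    {:noreply, maybe_run_next(%{state | queue: :queue.in(job, state.queue)})}
  end

  @impl true
  def handle_call(:pending, _from, state), do: {:reply, :queue.len(state.queue), state}
  def handle_call(:completed, _from, state), do: {:reply, state.completed, state}

  # Task.async/1 delivers its result to this process as {ref, result}.
  @impl true
  def handle_info({ref, _result}, %{running: %Task{ref: ref}} = state) do
    # Drop the task's monitor and any :DOWN message already delivered.
    Process.demonitor(ref, [:flush])
    state = %{state | running: nil, completed: state.completed + 1}
    {:noreply, maybe_run_next(state)}
  end

  # Start the next job only when nothing is running.
  defp maybe_run_next(%{running: nil} = state) do
    case :queue.out(state.queue) do
      {{:value, job}, rest} -> %{state | queue: rest, running: Task.async(job)}
      {:empty, _} -> state
    end
  end

  defp maybe_run_next(state), do: state
end
```

With this split, JobQueue.pending/1 answers “how much work is waiting?” straight from the :queue, no matter how quickly the mailbox itself is drained.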

A simple way of finding the length of the message queue is Process.info(pid, :message_queue_len), which avoids copying the whole message queue. The default info includes only :message_queue_len, not the messages themselves, precisely so that the messages are NOT copied unless you explicitly ask for them.
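
Building on that, here is a sketch of a quick “which processes are backed up?” check over every local process. It only asks for :message_queue_len, so no messages are copied. The module and function names are hypothetical.

```elixir
defmodule MailboxTop do
  # Returns up to n {pid, message_queue_len} tuples for local processes,
  # longest mailbox first.
  def top(n \\ 10) do
    Process.list()
    |> Enum.flat_map(fn pid ->
      case Process.info(pid, :message_queue_len) do
        {:message_queue_len, len} -> [{pid, len}]
        # The process exited between Process.list/0 and Process.info/2.
        nil -> []
      end
    end)
    |> Enum.sort_by(fn {_pid, len} -> len end, :desc)
    |> Enum.take(n)
  end
end
```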

Thank you! These are helpful suggestions. I think I can make this work with Process.info(pid, :message_queue_len). I will look at the :pg and :queue modules to build a better solution.

Now that I am trying to make this work, I am not having any success. I tried to simplify things by starting a GenServer and sending it messages with send/2, but I never see any messages and message_queue_len is always zero.

Could you please show the code you try?

Here is the simple example in iex shell:

iex(1)> Process.info(self(), :message_queue_len)
{:message_queue_len, 0}
iex(2)> send(self(), :hello)
:hello
iex(3)> Process.info(self(), :message_queue_len)
{:message_queue_len, 1}
iex(4)> Process.info(self(), :messages)
{:messages, [:hello]}

Using self() works – I tried something a little more complicated where I named a GenServer like this:

defmodule Genfoo.Application do
  @moduledoc false

  use Application

  @impl true
  def start(_type, _args) do
    children = [
      Supervisor.child_spec(
        {Genfoo.Thing, %{name: :thing1}},
        id: :thing1
      )
    ]

    opts = [strategy: :one_for_one, name: Genfoo.Supervisor]
    Supervisor.start_link(children, opts)
  end
end


defmodule Genfoo.Thing do
  @moduledoc false
  use GenServer

  def start_link(%{name: name} = state) do
    GenServer.start_link(__MODULE__, state, name: name)
  end

  @impl true
  def init(state) do
    {:ok, state}
  end

  @impl true
  def handle_info(_msg, state) do
    IO.puts("handle_info")
    {:noreply, state}
  end

  @impl true
  def handle_call(_msg, _from, state) do
    IO.puts("handle_call")
    {:reply, "Called", state}
  end

  @impl true
  def handle_cast(_msg, state) do
    IO.puts("handle_cast")
    {:noreply, state}
  end
end

Then I tried this:

pid = Process.whereis(:thing1)
send(pid, "xx")
:thing1 |> Process.whereis() |> Process.info(:messages)

Sorry I must be doing something wrong. I expected to be able to inspect the named genserver process.

As soon as your process fetches the message from its mailbox (which it does before calling your handle_info), the message is no longer there. As a result, your mailbox gets emptied out pretty fast =)


To continue the simple example in iex:

iex(1)> Process.info(self(), :message_queue_len)
{:message_queue_len, 0}
iex(2)> send(self(), :hello)
:hello
iex(3)> Process.info(self(), :message_queue_len)
{:message_queue_len, 1}
iex(4)> Process.info(self(), :messages)
{:messages, [:hello]}

# fetch the first message from the message queue
iex(5)> receive do
...(5)>   any_message -> any_message
...(5)> end
:hello

# now the message queue is empty
iex(6)> Process.info(self(), :message_queue_len)
{:message_queue_len, 0}
iex(7)> Process.info(self(), :messages)
{:messages, []}

I see. I guess I did not fully understand how handle_info processes a message.
I added Process.sleep(5000) to handle_info, and I think I understand how it works now.

It is not really the handle_info callback that takes the message out of the mailbox. The GenServer has a top loop that sits and receives messages arriving at the process. When a message arrives, the loop checks the message format and calls the relevant callback to handle it. When the callback returns, it goes back into the top loop, waits for and processes the next message, then waits for the one after that, and so on. In your case, it calls the handle_info callback with the message.

This means that messages are not really buffered in the GenServer's message queue, so checking its length will generally return a small number. The only time the queue can grow long is when the GenServer is overloaded, that is, when messages arrive faster than its callbacks can handle them.
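
To see that overload case for yourself (similar to the Process.sleep experiment above), here is a small sketch with a deliberately slow handle_info. SlowWorker is a made-up name and the 100 ms delay is arbitrary.

```elixir
defmodule SlowWorker do
  use GenServer

  def start_link(delay_ms), do: GenServer.start_link(__MODULE__, delay_ms)

  @impl true
  def init(delay_ms), do: {:ok, delay_ms}

  # Each message keeps the process busy for delay_ms, so anything sent
  # meanwhile has to wait in the mailbox.
  @impl true
  def handle_info(_msg, delay_ms) do
    Process.sleep(delay_ms)
    {:noreply, delay_ms}
  end
end

{:ok, pid} = SlowWorker.start_link(100)
for n <- 1..5, do: send(pid, {:work, n})

# Right after the sends, 4 or 5 messages should still be waiting:
# at most one has been taken out of the mailbox and is being handled.
IO.inspect(Process.info(pid, :message_queue_len))

# After 5 x 100 ms plus some slack, the mailbox should be drained.
Process.sleep(700)
IO.inspect(Process.info(pid, :message_queue_len))
```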

Thank you for the explanation! This is making sense as I study it.

This is worth reading: [erlang-questions] The gen server simplified (how it works)

I made an Elixir version of the mini genserver for you, with small changes to demonstrate how handle_info works:

defmodule MiniGs do
  def start_link(module, arg) do
    pid = spawn_link(fn -> start(module, arg) end)
    {:ok, pid}
  end

  def call(pid, request, timeout \\ 5000) do
    ref = make_ref()
    send(pid, {:minigs_call, self(), ref, request})

    receive do
      {^ref, reply} -> reply
    after
      timeout -> exit({:timeout, {MiniGs, :call, [pid, request, timeout]}})
    end
  end

  defp start(module, arg) do
    {:ok, state} = module.init(arg)
    loop(module, state)
  end

  defp loop(module, state) do
    receive do
      {:minigs_call, from, ref, request} ->
        {:reply, reply, new_state} = module.handle_call(request, from, state)
        send(from, {ref, reply})
        loop(module, new_state)

      other_message ->
        {:noreply, new_state} = module.handle_info(other_message, state)
        loop(module, new_state)
    end
  end
end

defmodule Stack do
  def start_link(base) when is_list(base) do
    MiniGs.start_link(__MODULE__, base)
  end

  def push(pid, item) do
    :ok = MiniGs.call(pid, {:push, item})
  end

  def pop(pid) do
    MiniGs.call(pid, :pop)
  end

  def init(base) do
    {:ok, _stack = base}
  end

  def handle_call({:push, item}, _from, stack) do
    {:reply, :ok, [item | stack]}
  end

  def handle_call(:pop, _from, [item | stack]) do
    {:reply, {:ok, item}, stack}
  end

  def handle_call(:pop, _from, []) do
    {:reply, {:error, :empty}, []}
  end

  def handle_info(info, stack) do
    IO.puts("got info: #{inspect(info)}")
    {:noreply, stack}
  end
end

{:ok, pid} = Stack.start_link([])
:ok = Stack.push(pid, 1)
:ok = Stack.push(pid, 2)
:ok = Stack.push(pid, 3)
{:ok, 3} = Stack.pop(pid)
{:ok, 2} = Stack.pop(pid)
{:ok, 1} = Stack.pop(pid)
send(pid, "hello")
{:error, :empty} = Stack.pop(pid)

As an exercise, you could try to add support for returning a timeout, as in {:reply, reply, state, timeout}, so that the mini gen server calls module.handle_info(:timeout, state) when no message arrives within that time :smiley:
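
For anyone who wants to check their answer, here is one possible sketch of that exercise. It copies MiniGs under the hypothetical name MiniGsTimeout (so both can be loaded at once) and also lets handle_info return {:noreply, state, timeout}. Ticker is a made-up callback module used only to exercise the timeout.

```elixir
defmodule MiniGsTimeout do
  def start_link(module, arg) do
    pid = spawn_link(fn -> start(module, arg) end)
    {:ok, pid}
  end

  def call(pid, request, timeout \\ 5000) do
    ref = make_ref()
    send(pid, {:minigs_call, self(), ref, request})

    receive do
      {^ref, reply} -> reply
    after
      timeout -> exit({:timeout, {__MODULE__, :call, [pid, request, timeout]}})
    end
  end

  defp start(module, arg) do
    {:ok, state} = module.init(arg)
    loop(module, state, :infinity)
  end

  # `timeout` comes from the previous callback's return value; :infinity
  # means "wait forever", which is how the original loop behaves.
  defp loop(module, state, timeout) do
    receive do
      {:minigs_call, from, ref, request} ->
        case module.handle_call(request, from, state) do
          {:reply, reply, new_state} ->
            send(from, {ref, reply})
            loop(module, new_state, :infinity)

          {:reply, reply, new_state, new_timeout} ->
            send(from, {ref, reply})
            loop(module, new_state, new_timeout)
        end

      other_message ->
        handle_noreply(module, module.handle_info(other_message, state))
    after
      # No message arrived in time: deliver :timeout as an info message.
      timeout -> handle_noreply(module, module.handle_info(:timeout, state))
    end
  end

  defp handle_noreply(module, {:noreply, new_state}), do: loop(module, new_state, :infinity)
  defp handle_noreply(module, {:noreply, new_state, timeout}), do: loop(module, new_state, timeout)
end

defmodule Ticker do
  # Made-up callback module: :arm replies at once and asks for a 50 ms
  # timeout; the resulting :timeout is reported to the parent process.
  def init(parent), do: {:ok, parent}

  def handle_call(:arm, _from, parent), do: {:reply, :ok, parent, 50}

  def handle_info(:timeout, parent) do
    send(parent, :timed_out)
    {:noreply, parent}
  end

  def handle_info(_other, parent), do: {:noreply, parent}
end
```

After {:ok, pid} = MiniGsTimeout.start_link(Ticker, self()) and MiniGsTimeout.call(pid, :arm), the caller should receive :timed_out roughly 50 ms later, since nothing else is sent to the mini server in between.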

Wowwwwww, this is very educational! Thank you for the very thorough reply! I appreciate your efforts to educate me!
