How do I check the number of mailbox entries for a GenServer process?

Hi,

I want to create a WebSocket API that pushes all the messages it receives to RabbitMQ. To push the messages, I created a GenServer worker that establishes the connection in init/1. I then cast the messages from the channel's handle_in callback. I am worried that when there are too many WebSocket connections sending messages simultaneously, the single GenServer process will be overloaded, i.e. its process mailbox will fill up.

worker.ex

defmodule CflogsWeb.Worker do
  use GenServer

  require Logger

  def start_link do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    {:ok, connection} = AMQP.Connection.open
    {:ok, channel} = AMQP.Channel.open(connection)
    AMQP.Queue.declare(channel, "logs", durable: true)
    {:ok, %{channel: channel, connection: connection} }
  end

  def publish(data) do
    GenServer.cast(__MODULE__, {:publish, data})
  end

  def handle_cast({:publish, message}, state) do
    Logger.debug("Handling cast....")
    AMQP.Basic.publish(state.channel, "", "logs", message)
    Logger.debug("Handled cast")

    {:noreply, state}
  end

  def terminate(_reason, state) do
    AMQP.Connection.close(state.connection)
  end

end

log_channel.ex

defmodule CflogsWeb.LogChannel do
  use Phoenix.Channel
  def join("log:fe", _messsage, socket), do: {:ok, socket}
  def join("log" <> _anything_else, _params, _socket), do: {:error, %{reason: 'Unauthorized'}}

  def handle_in("new_msg", %{"body" => body}, socket) do
    broadcast! socket, "new_msg", %{body: body}
    {:noreply, socket}
  end

  def handle_in("log", %{"body" => body}, socket) do
    body
    |> Poison.encode!()
    |> CflogsWeb.Worker.publish()

    # broadcast! socket, "new_msg", %{body: body}
    {:noreply, socket}
  end
end

I am using Tsung to simulate multiple WebSocket connections sending messages. Here is the config:

tsung.xml

<?xml version="1.0"?>
<!DOCTYPE tsung SYSTEM "/usr/local/Cellar/tsung/1.7.0/share/tsung/tsung-1.0.dtd" [] >
<tsung loglevel="info">
    <clients>
      <client host="localhost" use_controller_vm="true" maxusers="7000"/>
    </clients>

    <servers>
        <server host="localhost" port="4000" type="tcp"></server>
    </servers>

    <load>
        <arrivalphase phase="1" duration="10" unit="minute">
            <users maxnumber="7000" arrivalrate="500" unit="second" />
        </arrivalphase>
    </load>

    <options>
        <option name="websocket_path" value="/socket/websocket"></option>
    </options>

    <sessions>
        <session name="websocket" probability="100" type="ts_websocket">
            <request>
                 <websocket type="connect" path="/socket/websocket"></websocket>
            </request>

            <request subst="true">
                <websocket type="message">
                    {"topic":"log:fe", "event":"phx_join", "payload": {"event":"payload"}, "ref":"1"}
                </websocket>
            </request>

            <request subst="true">
                <websocket type="message">
                    {"topic":"log:fe", "event":"log", "payload": {"body": {"hello": "world"}}, "ref":"1"}
                </websocket>
            </request>

            <thinktime min="2" max="10" random="true"></thinktime>
        </session>
    </sessions>
</tsung>

I am running the application and Observer with these commands:

iex -S mix phx.server
:observer.start()

I am using :observer.start() so that I can monitor the worker process’s mailbox. However, when I run tsung start and look at the worker process’s details in the Observer window, I cannot see any increase in memory or any items in the messages section.

Is there something I am doing wrong? How can I check whether a process’s mailbox is overloaded and whether it starts dropping messages?

Maybe Process.info(pid, :message_queue_len) would help.
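Since your worker is registered under its module name, a quick check from the iex shell (i.e. from a different process) might look like this sketch:

# from the iex shell, not from inside the worker itself
CflogsWeb.Worker
|> Process.whereis()
|> Process.info(:message_queue_len)
# => {:message_queue_len, 0}   (illustrative value)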

and it starts dropping messages?

It probably won’t do that unless you add such functionality.
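If you want to watch a mailbox actually back up (and confirm nothing gets dropped), a throwaway experiment in iex, unrelated to the AMQP worker, is enough. A rough sketch:

# A deliberately slow receiver whose mailbox we can watch grow.
slow =
  spawn(fn ->
    loop = fn loop ->
      receive do
        _msg -> Process.sleep(10)
      end
      loop.(loop)
    end
    loop.(loop)
  end)

# Flood it from the shell...
for i <- 1..10_000, do: send(slow, {:msg, i})

# ...and sample its queue length from here (a different process).
Process.info(slow, :message_queue_len)
# => {:message_queue_len, 9842}   (illustrative value)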

2 Likes

BEAM’s scheduler penalizes processes sending messages to processes with large mailboxes. This gives the target process a chance to catch up, so the mailbox can never increase indefinitely.

This may change in the future, though: [erlang-questions] Sender punishment removed

2 Likes

BEAM’s scheduler penalizes processes sending messages to processes with large mailboxes.

Does it cause the message to be dropped or postponed?

No, at least not as a result of the target mailbox being ‘full’ or ‘big’. It (currently) gradually throttles back the scheduler allocation for the sending process(es) after the message has been sent.

1 Like

The process won’t drop any messages, but it certainly can be overloaded by messages and become a bottleneck of the system. If not monitored, this can be really bad because these problems tend to get worse over time (for example, overloaded systems often start to receive even more requests as clients retry timed-out requests).

Process.info(pid, :message_queue_len) will give you the message queue length for a single process. Note that you should do this from a different process, because Process.info(self(), :message_queue_len) will, most of the time, give you false results.

In my opinion, a good practice is to monitor the sum of the largest message queues in the system. You can use the recon library (https://github.com/ferd/recon) and do something like this periodically:

msg_queue_length =
  :message_queue_len
  |> :recon.proc_count(50)
  |> Enum.map(&elem(&1, 1))
  |> Enum.sum()

and then report this value to the metric service of your choice (and set up some monitors). If this value grows too much, it’s generally a good indicator that something is wrong.
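For example, a minimal periodic reporter could look like the sketch below (MyApp.QueueMonitor, the 10-second interval, and logging via Logger are placeholder choices; swap the log line for a push to your metrics client):

defmodule MyApp.QueueMonitor do
  # Periodically sums the 50 largest message queues via :recon and reports the result.
  use GenServer
  require Logger

  @interval :timer.seconds(10)

  def start_link(_opts), do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)

  def init(:ok) do
    schedule()
    {:ok, %{}}
  end

  def handle_info(:report, state) do
    total =
      :message_queue_len
      |> :recon.proc_count(50)
      |> Enum.map(&elem(&1, 1))
      |> Enum.sum()

    # Replace this log line with a push to your metrics service.
    Logger.info("sum of 50 largest message queues: #{total}")

    schedule()
    {:noreply, state}
  end

  defp schedule, do: Process.send_after(self(), :report, @interval)
end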

3 Likes