Hi,
I want to create a websocket API that pushes every message it receives to RabbitMQ. To publish the messages, I created a GenServer worker that establishes the connection in init. The websocket channel then casts each message to that worker. I am worried that when many ws connections send messages simultaneously, the single GenServer process will be overloaded, i.e. its process mailbox will fill up.
worker.ex
defmodule CflogsWeb.Worker do
  use GenServer
  require Logger

  def start_link do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    {:ok, connection} = AMQP.Connection.open
    {:ok, channel} = AMQP.Channel.open(connection)
    AMQP.Queue.declare(channel, "logs", durable: true)
    {:ok, %{channel: channel, connection: connection}}
  end

  def publish(data) do
    GenServer.cast(__MODULE__, {:publish, data})
  end

  def handle_cast({:publish, message}, state) do
    Logger.debug("Handling cast....")
    AMQP.Basic.publish(state.channel, "", "logs", message)
    Logger.debug("Handled cast")
    {:noreply, state}
  end

  def terminate(_reason, state) do
    AMQP.Connection.close(state.connection)
  end
end
log_channel.ex
defmodule CflogsWeb.LogChannel do
  use Phoenix.Channel

  def join("log:fe", _message, socket), do: {:ok, socket}
  def join("log" <> _anything_else, _params, _socket), do: {:error, %{reason: "Unauthorized"}}

  def handle_in("new_msg", %{"body" => body}, socket) do
    broadcast! socket, "new_msg", %{body: body}
    {:noreply, socket}
  end

  def handle_in("log", %{"body" => body}, socket) do
    Poison.encode!(body)
    |> CflogsWeb.Worker.publish
    # broadcast! socket, "new_msg", %{body: body}
    {:noreply, socket}
  end
end
I am using tsung to simulate multiple ws connections sending messages. Here is the config:
tsung.xml
<?xml version="1.0"?>
<!DOCTYPE tsung SYSTEM "/usr/local/Cellar/tsung/1.7.0/share/tsung/tsung-1.0.dtd" [] >
<tsung loglevel="info">
  <clients>
    <client host="localhost" use_controller_vm="true" maxusers="7000"/>
  </clients>
  <servers>
    <server host="localhost" port="4000" type="tcp"></server>
  </servers>
  <load>
    <arrivalphase phase="1" duration="10" unit="minute">
      <users maxnumber="7000" arrivalrate="500" unit="second" />
    </arrivalphase>
  </load>
  <options>
    <option name="websocket_path" value="/socket/websocket"></option>
  </options>
  <sessions>
    <session name="websocket" probability="100" type="ts_websocket">
      <request>
        <websocket type="connect" path="/socket/websocket"></websocket>
      </request>
      <request subst="true">
        <websocket type="message">
          {"topic":"log:fe", "event":"phx_join", "payload": {"event":"payload"}, "ref":"1"}
        </websocket>
      </request>
      <request subst="true">
        <websocket type="message">
          {"topic":"log:fe", "event":"log", "payload": {"body": {"hello": "world"}}, "ref":"1"}
        </websocket>
      </request>
      <thinktime min="2" max="10" random="true"></thinktime>
    </session>
  </sessions>
</tsung>
I start the application with these commands:
iex -S mix phx.server
:observer.start
I am using :observer.start so that I can monitor the worker process’s mailbox. However, when I run tsung start and look at the worker process’s details in the observer window, I do not see any increase in memory or any items in the Messages section.
Is there something I am doing wrong? How can I check when a process’s mailbox is overloaded and it starts dropping messages?
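For reference, the mailbox length can also be read directly from the iex shell instead of the observer window. This is only a minimal sketch: MailboxProbe and queue_len are hypothetical names that are not part of my app, and it relies on nothing beyond Process.whereis/1 and Process.info/2 from the standard library.

```elixir
defmodule MailboxProbe do
  # Hypothetical helper (not part of the app above): reads how many
  # messages are currently waiting in a local process's mailbox.

  # Accepts a registered name such as CflogsWeb.Worker.
  # Returns nil when no process is registered under that name.
  def queue_len(name) when is_atom(name) do
    case Process.whereis(name) do
      nil -> nil
      pid -> queue_len(pid)
    end
  end

  # Accepts a pid. Process.info/2 returns nil if the process is dead.
  def queue_len(pid) when is_pid(pid) do
    case Process.info(pid, :message_queue_len) do
      {:message_queue_len, len} -> len
      nil -> nil
    end
  end
end
```

While tsung is running, calling MailboxProbe.queue_len(CflogsWeb.Worker) repeatedly in iex should show whether messages are piling up in the worker. If the value stays at or near 0, the worker is presumably draining its mailbox as fast as the casts arrive.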