Hi,
I created a Phoenix application with one WebSocket API that takes data and pushes it to RabbitMQ. Here is the code:
worker.ex (the worker, a GenServer that starts when the application starts and pushes data to RabbitMQ whenever its publish function is called)
defmodule CflogsWeb.Worker do
  use GenServer
  require Logger

  def start_link do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    {:ok, connection} = AMQP.Connection.open
    {:ok, channel} = AMQP.Channel.open(connection)
    AMQP.Queue.declare(channel, "logs", durable: true)
    {:ok, %{channel: channel, connection: connection}}
  end

  def publish(data) do
    GenServer.cast(__MODULE__, {:publish, data})
  end

  def handle_cast({:publish, message}, state) do
    Logger.debug("Handling cast....")
    AMQP.Basic.publish(state.channel, "", "logs", message)
    Logger.debug("Handled cast")
    {:noreply, state}
  end

  def terminate(_reason, state) do
    AMQP.Connection.close(state.connection)
  end
end
application.ex (Phoenix main)
defmodule Cflogs.Application do
  use Application

  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  def start(_type, _args) do
    import Supervisor.Spec

    # Define workers and child supervisors to be supervised
    children = [
      # Start the endpoint when the application starts
      supervisor(CflogsWeb.Endpoint, []),
      worker(CflogsWeb.Worker, [])
      # Start your own worker by calling: Cflogs.Worker.start_link(arg1, arg2, arg3)
      # worker(Cflogs.Worker, [arg1, arg2, arg3]),
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: Cflogs.Supervisor]
    Supervisor.start_link(children, opts)
  end

  # Tell Phoenix to update the endpoint configuration
  # whenever the application is updated.
  def config_change(changed, _new, removed) do
    CflogsWeb.Endpoint.config_change(changed, removed)
    :ok
  end
end
log_channel.ex (takes the data received from the client and calls the worker to send it to RabbitMQ)
defmodule CflogsWeb.LogChannel do
  use Phoenix.Channel

  def join("log:fe", _message, socket), do: {:ok, socket}
  def join("log" <> _anything_else, _params, _socket), do: {:error, %{reason: "Unauthorized"}}

  def handle_in("log", %{"body" => body}, socket) do
    body
    |> Poison.encode!()
    |> CflogsWeb.Worker.publish()

    # broadcast! socket, "new_msg", %{body: body}
    {:noreply, socket}
  end
end
All of this works fine. But as the number of WebSocket connections increases, I want to be able to start many worker processes that can push data. Can I achieve that with the current setup, or is there a better way to go about this?
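To make "many worker processes" concrete, here is a rough sketch of one possible shape: a fixed-size pool of named GenServers under a supervisor, with each caller hashed to one worker. This is only an illustration under assumptions, not a tested production setup. The names PoolSketch, PoolSketch.Worker, publish_fun and the pool size of 4 are all hypothetical. publish_fun stands in for the AMQP.Basic.publish call so the sketch runs without a broker; in a real version each worker would presumably open its own AMQP channel in init/1. It also assumes Elixir 1.5 or later, since it uses child spec maps rather than Supervisor.Spec.

```elixir
defmodule PoolSketch.Worker do
  use GenServer

  # `publish_fun` is a hypothetical stand-in for the AMQP publish call,
  # so this sketch can run without RabbitMQ.
  def start_link(index, publish_fun) do
    GenServer.start_link(__MODULE__, publish_fun, name: name(index))
  end

  # Each worker registers under its own name, derived from its index.
  def name(index), do: :"pool_sketch_worker_#{index}"

  def init(publish_fun), do: {:ok, publish_fun}

  def handle_cast({:publish, message}, publish_fun) do
    publish_fun.(self(), message)
    {:noreply, publish_fun}
  end
end

defmodule PoolSketch do
  # Hypothetical pool size; a real value would need tuning.
  @pool_size 4

  def pool_size, do: @pool_size

  # Starts @pool_size workers under one supervisor.
  def start_link(publish_fun) do
    children =
      for index <- 0..(@pool_size - 1) do
        %{
          id: {PoolSketch.Worker, index},
          start: {PoolSketch.Worker, :start_link, [index, publish_fun]}
        }
      end

    Supervisor.start_link(children, strategy: :one_for_one)
  end

  # Hash the calling process (for example a channel process) to a worker,
  # so messages from one caller always go through the same worker.
  def publish(message) do
    index = :erlang.phash2(self(), @pool_size)
    GenServer.cast(PoolSketch.Worker.name(index), {:publish, message})
  end
end
```

With something like this, handle_in would call PoolSketch.publish/1 instead of CflogsWeb.Worker.publish/1. Hashing on the caller's pid is just one choice; it keeps the messages from a single connection in order, at the cost of a possibly uneven spread across workers.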