Increasing workers dynamically based on load

Hi,

I created a phoenix application which has one websocket API that takes the data and pushes it to rabbitmq. Here is the code:

worker.ex (the worker (genserver) that starts on application start and pushes the data to rabbitmq whenever it’s publish method is called.)

defmodule CflogsWeb.Worker do
  use GenServer

  require Logger

  def start_link do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    {:ok, connection} = AMQP.Connection.open
    {:ok, channel} = AMQP.Channel.open(connection)
    AMQP.Queue.declare(channel, "logs", durable: true)
    {:ok, %{channel: channel, connection: connection} }
  end

  def publish(data) do
    GenServer.cast(__MODULE__, {:publish, data})
  end

  def handle_cast({:publish, message}, state) do
    Logger.debug("Handling cast....")
    AMQP.Basic.publish(state.channel, "", "logs", message)
    Logger.debug("Handled cast")

    {:noreply, state}
  end

  def terminate(_reason, state) do
    AMQP.Connection.close(state.connection)
  end

end

application.ex (phoenix main)

defmodule Cflogs.Application do
  use Application

  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  def start(_type, _args) do
    import Supervisor.Spec

    # Define workers and child supervisors to be supervised
    children = [
      # Start the endpoint when the application starts
      supervisor(CflogsWeb.Endpoint, []),
      worker(CflogsWeb.Worker, [])
      # Start your own worker by calling: Cflogs.Worker.start_link(arg1, arg2, arg3)
      # worker(Cflogs.Worker, [arg1, arg2, arg3]),
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: Cflogs.Supervisor]
    Supervisor.start_link(children, opts)
  end

  # Tell Phoenix to update the endpoint configuration
  # whenever the application is updated.
  def config_change(changed, _new, removed) do
    CflogsWeb.Endpoint.config_change(changed, removed)
    :ok
  end
end

log_channel.ex (that takes the data received from the client and calls the worker to send the data to rabbitmq)

defmodule CflogsWeb.LogChannel do
  use Phoenix.Channel
  def join("log:fe", _messsage, socket), do: {:ok, socket}
  def join("log" <> _anything_else, _params, _socket), do: {:error, %{reason: 'Unauthorized'}}

  def handle_in("log", %{"body" => body}, socket) do
    Poison.encode!(body)
    |> CflogsWeb.Worker.publish

    # broadcast! socket, "new_msg", %{body: body}
    {:noreply, socket}
  end
end

All this is working fine. But when the no. of web-socket connection increases, I want to able to start many worker processes that can push data. Can I achieve that in the current setup? Or is there any better way to go about this?

1 Like

If I get it right You fear that CflogsWeb.Worker.publish would get overflooded in case of traffic increase? In that case could You handle this part outside of your worker?

Logger.debug("Handling cast....")
AMQP.Basic.publish(state.channel, "", "logs", message)
Logger.debug("Handled cast")

It seems You don’t really care about result of operation. So maybe a really simplistic solution would be to spawn a Task instead?

Task.async(fn ->
  Logger.debug("Handling cast....")
  AMQP.Basic.publish(state.channel, "", "logs", message) 
  Logger.debug("Handled cast")
end)

There a many solutions for managing load increase, some are more complicated than others.

1 Like

Yes, I can spawn a new task every time I push a message. But the downside to this solution is that it would create a new connection for every data push. I will need to create a new channel for every data push. And with the kind of application this is, I think that even a single websocket connection would push thousands of messages. Thus, opening a new connection for every message seems to me a bad idea.

Instead, Can I do something like (just off the top of my head), there is one process that is listening to the no. of open connections? And when the no. of connections reach a particular threshold. It spawns a new worker (worker.ex) that pushes the data to rabbitmq. And I can have another wrapper process that the socket view (log_channel.ex) uses that would call one of the available workers (instead of calling via the worker process itself) to push the data.

However I am not sure how to go about doing this. Also, I am wondering whether there is a simpler way.

Poolboy alows resizing queue in case of traffic increase… The mecanism is described in the little elixir otp guidebook. Although syntax has changed with Elixir 1.6.x

The idea is to have a certain number of workers, that can be allocated to a certain task.

If the load increase, the number of workers is adjusted.

2 Likes

BTW What do You mean by?

it would create a new connection for every data push

In fact with Task it would be

  def handle_cast({:publish, message}, state) do
    Task.async(fn ->
      Logger.debug("Handling cast....")
      AMQP.Basic.publish(state.channel, "", "logs", message) 
      Logger.debug("Handled cast")
    end)
    {:noreply, state}
  end

There would be one entry point, how You dispatch behind is your implementation detail.

Oh…I thought instead of having worker.ex, you meant I use Task.async(..) directly.

Anyways, I think the problem with this approach too is that a single connection is doing the data push. There will be huge lag when the no. of requests are substantially high, and the rabbitmq connection doing the data push is just one.

poolboy seems interesting though. I will read more about it and try it out. Thanks!

I think Rabbits Web STOMP plugin may be useful to you here. Instead of using Phoenix Channels to push to Rabbit, you can just push directly to rabbit.

Thanks for sharing. This looks interesting.

The only problem is that it works only with websocket-capable clients. This means that it won’t work with older browsers and safari. The advantage phoenix-channel gives it that it fallbacks to long polling if the client doesn’t support web-sockets.