Questions about saving chat messages to database. Failed inserts? Spawning? More Efficient Way?

The spawn/1 function call will create a new process and the supplied function will run in that process and the rest of the logic in handle_in will continue concurrently.

From the blog post, this claims to run things faster, and that is true in terms of just the handle_in function as it would not have to wait for the message to be persisted. I would not use the method in a production system.

Firstly, a pure spawn function call will create an unlinked and unmonitored process. This means if that process dies, hangs or fails to save the message there is no-one there to know about it. It also sound like you would want every message to be persisted. Or at least try for a bit before giving up and spawn will not give you those guarantees.

Secondly, if there is a lot of messages you will be spawning lots of concurrent process. Each concurrent process will try to write to your database at the same time and I can foresee this being a bottleneck and this concurrency needs to be better managed.

In terms of architecture, I would (instead of the spawn) have a queue accepting all the messages and then writing to the database in batches. If all messages must be persisted then the queue should be a persistent queue and the calling process (handle_in in this case) should be synchronous.

To get things going, you could start with a prototype having the queue as a GenServer.

defmodule MyQueue do

  require Logger

  @interval :timer.seconds(5)

  # External API
  #
  def save_message(msg) do
    GenServer.call(__MODULE__, {:save, msg})
  end

  def start_link(_) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

# Callbacks
  def init(_) do
    state = [] # State in this case is a list of messages
    :timer.send_interval(@interval, __MODULE__, :flush_messages)
    {:ok, state}
  end

  def handle_call({:save, msg}, _from, state) do
    {:reply, :ok, [msg | state]}
  end

  def handle_info(:flush_messages, state) do
    case Db.insert_all(state) do
      {:ok, _} ->
        Logger.info("Saving to database")
        {:noreply, []} # Clear the messages
      {:error, _msg} ->
        Logger.error("Failed saving to database")
        {:noreply, state} # Keep messages in here and try again later
    end
  end
end

Notes regarding the GenServer:

  • The GenServer is a singleton, meaning all calls to it are serialized.
  • The handle_info call is blocking the database call. While this function is running the GenServer will not accept any new messages (one message at a time (see above)). The actual writing to the database may be delegated to another process so that the queue can accept more messages while waiting. This requires more code and coordination.
  • The save_message is handled in a handle_call. This is a synchronous call which should be the default when dealing with GenServers. If this was done using handle_cast then it is asynchronous but have the problem that the GenServer’s mailbox can easily be overloaded in case of a large number of messages coming in whilst it is writing to the database.
  • It uses :timer.send_interval, meaning that it will try to save messages every second. An alternative approach is to use :timer.send_after instead and you can manually start a new timer after every database write. The difference is that the first approach will run every second. The second approach will run every second + the amount of time it takes for the database call to complete. Both are useful depending on use case.
  • The Queue can be improved by either writing every X seconds, or if the number of messages are larger than a specific value.
  • If persistence is needed, you may want to store every messages in an interim persistent queue before writing to the database. For example you could use disk_log for this, or perhaps even mnesia. Again, more code, more coordination and more things that can go wrong. It is all about trade-off.
  • The GenServer should be hooked up to your applications supervision tree so that if it fails it gets restarted (but with current implementation it loses its current message queue if that would happen. Again trade-offs).

Once the queue is done you can do this in your handle_in

 def handle_in("new_msg", %{"body" => body}, socket) do
    if (socket.assigns.is_anon == 0) do
      IO.inspect("user is logged in - send message")
      :ok = MyQueue.save_message(%{body: body, userid: String.to_integer(socket.assigns.user_id), roomid: socket.topic})
      broadcast!(socket, "new_msg", %{body: body})
      {:noreply, socket}
    else
      IO.inspect("user is anonymous - do not send message")
      {:noreply, socket}
    end
  end

This looks like sequential code and the saving of the messages have been abstracted away.

One important thing here is to abstract away he MyQueue implementation. Then you can easily swap it out for another implementation later or use a different one for testing. (MyQueue might even be a bad name as queue in this case is an implementation detail, but naming things is hard)

5 Likes