Questions about saving chat messages to database. Failed inserts? Spawning? More Efficient Way?

I am planning on building multiple chat rooms with different names (“room:lobby”, “room:1”, “room:2”, etc.)

In my room_channel.ex file, I set up something simple like this to save chat messages to the database:

def handle_in("new_msg", %{"body" => body}, socket) do
  if socket.assigns.is_anon == 0 do
    IO.inspect("user is logged in - send message")
    spawn(fn -> save_message(%{body: body, userid: String.to_integer(socket.assigns.user_id), roomid: socket.topic}) end)
    broadcast!(socket, "new_msg", %{body: body})
    {:noreply, socket}
  else
    IO.inspect("user is anonymous - do not send message")
    {:noreply, socket}
  end
end

defp save_message(message) do
  # Uses a schema called SaveChats (defined in the schemas folder).
  Exchat.Repo.insert(%SaveChats{roomid: message.roomid, userid: message.userid, message: message.body})
end

The first question: does the spawn call around save_message look appropriate? I got this advice from an online guide (Adding a Persistence layer), so I don’t know how long ago it was written or whether this approach is outdated. If there are millions of people chatting at once, I would like the database saves to happen on the side, in their own process, to avoid slowing everything down (as the guide stated).

Second question: as you can see in the spawn call, I grab the userid from socket.assigns and convert it to an integer, and the roomid from socket.topic. Is this a solid way of getting this information, or am I supposed to send the roomid and userid from the client side along with the message? I’m using socket.assigns.user_id to prevent malicious users from injecting fake user ids from the client side. Also, does socket.topic actually reflect the roomid at the time the message is sent (an accurate roomid every time), is it fixed to a single value when joining a room, or can it accumulate a list of room ids, or topics?

How would you obtain this info in your own app?

Last question: once in a while the database can go down, or an insert might fail to execute (MySQL server down, aborted connection, etc.). Is there a way to save these “inserts” on the Phoenix server and run them later, when the database comes back up? When I first read the Elixir docs, I recall something about mailboxes that can store messages in the system for a while. I was wondering if Phoenix had some simple way of using this with MySQL, i.e. saving the failed queries so they can be run again once the database is back up.

The spawn/1 call will create a new process; the supplied function runs in that process while the rest of the logic in handle_in continues concurrently.

The blog post claims this makes things run faster, and that is true in terms of the handle_in function itself, since it no longer has to wait for the message to be persisted. I would not use this method in a production system, though.

Firstly, a plain spawn call creates an unlinked and unmonitored process. This means that if that process dies, hangs, or fails to save the message, there is no one around to notice. It also sounds like you want every message to be persisted, or at least retried for a while before giving up, and spawn gives you neither guarantee.

Secondly, if there are a lot of messages, you will be spawning a lot of concurrent processes. Each of them will try to write to your database at the same time; I can foresee this becoming a bottleneck, so the concurrency needs to be better managed.
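As a small improvement over a bare spawn (not yet a full solution), the work can at least be run under a Task.Supervisor so that crashes are logged and the tasks are part of the supervision tree. A minimal sketch, assuming a supervisor named Exchat.TaskSupervisor (hypothetical name) has been added to the application's children:

```elixir
# In application.ex, add to the children list (hypothetical name):
#   {Task.Supervisor, name: Exchat.TaskSupervisor}

# Then, instead of a bare spawn/1 in handle_in:
Task.Supervisor.start_child(Exchat.TaskSupervisor, fn ->
  save_message(%{body: body, userid: user_id, roomid: roomid})
end)
```

This still offers no retries or backpressure, so the queue approach below is the one to pursue; but at least failures become visible in the logs instead of vanishing silently.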

In terms of architecture, I would (instead of the spawn) have a queue accepting all the messages and then writing to the database in batches. If all messages must be persisted then the queue should be a persistent queue and the calling process (handle_in in this case) should be synchronous.

To get things going, you could start with a prototype having the queue as a GenServer.

defmodule MyQueue do
  use GenServer

  require Logger

  @interval :timer.seconds(5)

  # External API
  #
  def save_message(msg) do
    GenServer.call(__MODULE__, {:save, msg})
  end

  def start_link(_) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  # Callbacks
  def init(_) do
    state = [] # State in this case is a list of messages
    :timer.send_interval(@interval, __MODULE__, :flush_messages)
    {:ok, state}
  end

  def handle_call({:save, msg}, _from, state) do
    {:reply, :ok, [msg | state]}
  end

  def handle_info(:flush_messages, state) do
    case Db.insert_all(state) do  # Db is a placeholder for your persistence layer
      {:ok, _} ->
        Logger.info("Saving to database")
        {:noreply, []} # Clear the messages
      {:error, _msg} ->
        Logger.error("Failed saving to database")
        {:noreply, state} # Keep messages in here and try again later
    end
  end
end

Notes regarding the GenServer:

  • The GenServer is a singleton, meaning all calls to it are serialized.
  • The handle_info callback blocks on the database call. While it is running, the GenServer will not accept any new messages (one message at a time, see above). The actual writing to the database could be delegated to another process so that the queue can accept more messages while waiting, but this requires more code and coordination.
  • save_message is handled in a handle_call. This is a synchronous call, which should be the default when dealing with GenServers. If it were done with handle_cast it would be asynchronous, but then the GenServer’s mailbox can easily be overloaded when a large number of messages comes in while it is writing to the database.
  • It uses :timer.send_interval, meaning it will try to save messages every @interval (5 seconds here). An alternative is :timer.send_after, where you manually start a new timer after every database write. The difference is that the first approach runs once every interval, while the second runs every interval plus however long the database call takes. Both are useful, depending on the use case.
  • The queue can be improved by flushing either every X seconds or whenever the number of queued messages exceeds a threshold.
  • If persistence is needed, you may want to store every message in an interim persistent queue before writing to the database. For example, you could use disk_log for this, or perhaps even mnesia. Again: more code, more coordination, and more things that can go wrong. It is all about trade-offs.
  • The GenServer should be hooked into your application’s supervision tree so that it gets restarted if it fails (though with the current implementation it would lose its in-memory message queue when that happens; again, trade-offs).
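Hooking the GenServer into the supervision tree might look like this (module names and file path are assumed from a standard Phoenix layout, and this assumes MyQueue calls `use GenServer` so it gets a default child_spec/1):

```elixir
# lib/exchat/application.ex (hypothetical path)
def start(_type, _args) do
  children = [
    Exchat.Repo,
    ExchatWeb.Endpoint,
    MyQueue  # start the queue under the app supervisor so it restarts on crash
  ]

  Supervisor.start_link(children, strategy: :one_for_one, name: Exchat.Supervisor)
end
```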

Once the queue is done you can do this in your handle_in

def handle_in("new_msg", %{"body" => body}, socket) do
  if socket.assigns.is_anon == 0 do
    IO.inspect("user is logged in - send message")
    :ok = MyQueue.save_message(%{body: body, userid: String.to_integer(socket.assigns.user_id), roomid: socket.topic})
    broadcast!(socket, "new_msg", %{body: body})
    {:noreply, socket}
  else
    IO.inspect("user is anonymous - do not send message")
    {:noreply, socket}
  end
end

This reads like sequential code, and the saving of the messages has been abstracted away.

One important thing here is to abstract away the MyQueue implementation. Then you can easily swap it out for another implementation later, or use a different one for testing. (MyQueue might even be a bad name, since "queue" is an implementation detail here, but naming things is hard.)
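One way to get that abstraction is a small behaviour plus a dispatch module that resolves the implementation from config; the names here are illustrative, not from the original code:

```elixir
defmodule Exchat.MessageStore do
  @callback save_message(map()) :: :ok | {:error, term()}

  # Callers use Exchat.MessageStore.save_message/1 and never mention MyQueue.
  def save_message(msg), do: impl().save_message(msg)

  # Resolve the implementation from app config so tests can swap in a stub.
  defp impl do
    Application.get_env(:exchat, :message_store, MyQueue)
  end
end
```

In test config you could then set `config :exchat, :message_store, Exchat.MessageStoreStub` and never touch the real queue.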


spawn will certainly do that; under high load, instead of dropping connections because the DB is overwhelmed it will keep accepting messages and then lose them all when the system runs out of processes.

Whether this is preferable depends on your specific application. :man_shrugging:


It smells like premature optimization to me. For a non-busy server this should not matter, and you might as well save the messages in-process, which is much simpler. Under high load, weird things can happen, such as messages being saved out of order.


This is a really awesome script and explanation, thanks a lot, I greatly appreciate it! I was able to learn quite a bit more about Elixir and GenServer today while trying to integrate it into my app. It’s exactly what I was looking for and it seems so lightweight and simple.

This script works great, except for a case error that crashes the script at every interval:

[error] GenServer ExchatWeb.SaveChatsQueue terminating
** (CaseClauseError) no case clause matching: {1, nil}
    (exchat 0.1.0) lib/exchat_web/genservers/savechatsqueue.ex:35: ExchatWeb.SaveChatsQueue.handle_info/2
    (stdlib 3.15.2) gen_server.erl:695: :gen_server.try_dispatch/4
    (stdlib 3.15.2) gen_server.erl:771: :gen_server.handle_msg/6
    (stdlib 3.15.2) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Last message: :flush_messages
State: [%{message: "fasdfasdf", roomid: "room:lobby", timestamp: ~U[2021-11-24 03:00:54Z], userid: 299}]

[error] GenServer ExchatWeb.SaveChatsQueue terminating
** (CaseClauseError) no case clause matching: {0, nil}
    (exchat 0.1.0) lib/exchat_web/genservers/savechatsqueue.ex:35: ExchatWeb.SaveChatsQueue.handle_info/2
    (stdlib 3.15.2) gen_server.erl:695: :gen_server.try_dispatch/4
    (stdlib 3.15.2) gen_server.erl:771: :gen_server.handle_msg/6
    (stdlib 3.15.2) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Last message: :flush_messages
State: []

It looks like the script attempts to insert into the database every 5 seconds, whether there is a message or not, and the call returns the number of inserts together with nil instead of a struct. I made a slight change to skip the database call and the case expression when the state is an empty list []. What do you think of the result below? Also, is there a way to return {:noreply, _} and leave the state untouched, instead of {:noreply, []}, so the state isn’t emptied if a message comes in while handle_info is running? If I try using an underscore, I get a compilation error: invalid use of _. "_" represents a value to be ignored in a pattern and cannot be used in expressions.

Also, I’m not sure how to fix the case matching error, as it seems like it happens even if the database call is successful. Here is my work so far:

defmodule ExchatWeb.SaveChatsQueue do
  # Details on how this works: https://elixirforum.com/t/questions-about-saving-chat-messages-to-database-failed-inserts-spawning-more-efficient-way/44027/2
  use GenServer
  require Logger

  @interval :timer.seconds(5)

  # External API
  #
  def save_message(msg) do
    GenServer.call(__MODULE__, {:save, msg})
  end

  def start_link(_) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

# Callbacks
  def init(_) do
    state = [] # State in this case is a list of messages
    :timer.send_interval(@interval, __MODULE__, :flush_messages)
    {:ok, state}
  end

  def handle_call({:save, msg}, _from, state) do
    {:reply, :ok, [msg | state]}
  end

  def handle_info(:flush_messages, []) do
    Logger.info("Nothing to save - Do nothing")
    {:noreply, []}
  end

  def handle_info(:flush_messages, state) do
    case Exchat.Repo.insert_all(SaveChats, state) do
      {:ok, _} ->
        Logger.info("Saving to database")
        {:noreply, []} # Clear the messages
      {:error, _msg} ->
        Logger.error("Failed saving to database")
        {:noreply, state} # Keep messages in here and try again later
    end
  end
end

Lastly, how would you delegate the database call to another process in this example and keep the state accurate? This is definitely important, because we will want to continue accepting messages while the database is inserting previous ones, which can certainly take a long time if there is some kind of database slowdown happening.

As you’ve noticed, this won’t work - Repo.insert_all returns a tuple shaped like {integer(), term()}.

If you want to return the previous state unchanged, write that: {:noreply, state}. There’s no special syntax because there’s no need for any.

Keep in mind that messages are placed into the process mailbox if they are received while handle_info is running; the GenServer is the only process that can change state.

“Accurate” gets blurrier the more interacting processes you have - yay distributed systems - so you want to avoid that sort of thing for as long as possible.

Let’s imagine the behavior of two systems under heavy database load:

  • a GenServer that does insert_all before updating state. If DB writes are slow, its mailbox accumulates messages that haven’t been written yet. Uses one process and one database connection.

  • a GenServer that spawns to call insert_all. If DB writes are slow, it spawns more and more processes. Uses as many processes and database connections as are available.

Extreme overload in the first case will abort the GenServer and lose the messages, but then recover. Extreme overload in the second case will exhaust all system resources before crashing the entire BEAM.
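To delegate the write while keeping the single GenServer as the sole owner of the state, one common pattern is Task.Supervisor.async_nolink/2: the queue hands the current batch to a task, keeps accepting {:save, msg} calls, and only discards the batch once the task reports success. A rough sketch, assuming a Task.Supervisor named Exchat.TaskSupervisor in the supervision tree and a state shaped as %{pending: [], flushing: nil} in init/1 (both assumptions, not from the original code):

```elixir
def handle_info(:flush_messages, %{pending: [], flushing: nil} = state) do
  {:noreply, state} # nothing to do
end

def handle_info(:flush_messages, %{pending: pending, flushing: nil} = state) do
  # Hand the batch to a supervised task; the GenServer stays responsive.
  task =
    Task.Supervisor.async_nolink(Exchat.TaskSupervisor, fn ->
      Exchat.Repo.insert_all(SaveChats, pending)
    end)

  {:noreply, %{state | pending: [], flushing: {task.ref, pending}}}
end

def handle_info(:flush_messages, state) do
  # A flush is already in progress; skip this tick.
  {:noreply, state}
end

def handle_info({ref, _result}, %{flushing: {ref, _batch}} = state) do
  # The task succeeded; stop monitoring it and forget the batch.
  Process.demonitor(ref, [:flush])
  {:noreply, %{state | flushing: nil}}
end

def handle_info({:DOWN, ref, :process, _pid, _reason}, %{flushing: {ref, batch}} = state) do
  # The task crashed (e.g. DB down); put the batch back to retry next tick.
  {:noreply, %{state | pending: batch ++ state.pending, flushing: nil}}
end
```

New messages that arrive during the flush accumulate in pending; the only state mutations still happen inside this one GenServer, which keeps the accounting accurate.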

I think I might understand how the case matching works a little better. I’m still new at Elixir, so I’m still figuring things out. This is what I have so far:

  def handle_info(:flush_messages, []) do
    Logger.info("Nothing to save - Do nothing")
    {:noreply, []}
  end

  def handle_info(:flush_messages, state) do
    case Exchat.Repo.insert_all(SaveChats,state) do
      {0, _} ->
        Logger.error("Failed saving to database")
        {:noreply, state} # Keep messages in here and try again later
      {_, _} ->
        Logger.info("Saving to database")
        {:noreply, []} # Clear the messages
    end
  end

In the first clause, def handle_info(:flush_messages, []) do, I’m pattern matching on an empty state [] to avoid any database call when there are no messages in the queue. I did try using {:noreply, state}, but I get this error, because state is never actually bound in that function clause:

warning: variable "state" does not exist and is being expanded to "state()", please use parentheses to remove the ambiguity or change the variable name

Again, I’m still learning, so maybe there is a different or simpler way to do this. Preferably, if the state is empty, I’d like to be “hands off” with the state and database, and do as little processing as possible.

The second handle_info clause is set up to run only if the state has items in it, and to make the database call. I should mention that I am using MySQL. From what the docs describe (Ecto.Repo — Ecto v3.7.1), it sounds like the database call will return the number of items inserted, whether there is an error or not. The docs aren’t clear about what exactly gets returned in case of an error. They mention something about “attempted” inserts, but I’m not sure if that only applies when the :on_conflict option is set (which I am not using). Is it even possible to know if there was an error during inserts?

My thinking is that if the second handle_info clause is called, there should be at least one insert. So if it returns 0, something must have gone wrong; otherwise everything is good.
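One thing worth knowing about error detection: Ecto's Repo.insert_all/3 raises exceptions on most failures (for example, a lost connection raises DBConnection.ConnectionError) rather than returning an error tuple, so a 0 return is not the only failure mode. A hedged sketch that treats exceptions as "keep the batch and retry later" (the exact exception modules depend on your adapter; MyXQL.Error is MySQL's):

```elixir
def handle_info(:flush_messages, state) do
  try do
    {_count, _returning} = Exchat.Repo.insert_all(SaveChats, state)
    Logger.info("Saved #{length(state)} message(s) to database")
    {:noreply, []} # Clear the messages
  rescue
    e in [DBConnection.ConnectionError, MyXQL.Error] ->
      Logger.error("Failed saving to database: #{Exception.message(e)}")
      {:noreply, state} # Keep the messages and retry on the next tick
  end
end
```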

Let me know what you think, or how you would write this up?

A SQL database is intended to store transactional data, i.e. store items as they happen, synchronously. The database is an integral part of the system; if it fails, the whole thing should fail.

If you are only interested in a backup store, you may as well use a NoSQL-style datastore that is optimized for this sort of thing, like Google BigQuery or Amazon Redshift.