The spawn/1 function call creates a new process. The supplied function runs in that process while the rest of the logic in handle_in continues concurrently.
The blog post claims this makes things faster, and that is true for the handle_in function itself, since it no longer has to wait for the message to be persisted. Even so, I would not use this method in a production system.
Firstly, a plain spawn call creates an unlinked and unmonitored process. This means that if the process dies, hangs, or fails to save the message, nothing is there to notice. It also sounds like you want every message to be persisted, or at least retried for a while before giving up, and spawn gives you no such guarantees.
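To make the first point concrete, here is a minimal sketch (not taken from the blog post) that compares a bare spawn with spawn_monitor. The module name SpawnDemo is made up for illustration, exit(:db_write_failed) stands in for a failed database write, and the timeouts are arbitrary.

```elixir
defmodule SpawnDemo do
  # Bare spawn: the caller gets a pid back and nothing else.
  # When the child exits abnormally, no message reaches the caller.
  def unmonitored do
    pid = spawn(fn -> exit(:db_write_failed) end)

    receive do
      msg -> {:got, msg}
    after
      200 -> {:no_notification, pid}
    end
  end

  # spawn_monitor: the caller receives a :DOWN message carrying the exit
  # reason, so it can log the failure, retry, or give up deliberately.
  def monitored do
    {pid, ref} = spawn_monitor(fn -> exit(:db_write_failed) end)

    receive do
      {:DOWN, ^ref, :process, ^pid, reason} -> {:down, reason}
    after
      1_000 -> :timeout
    end
  end
end
```

Monitoring only tells you that the write failed. Retrying and limiting concurrency still have to be built on top, which is what the queue suggested here is meant to address.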
Secondly, if there are a lot of messages you will be spawning lots of concurrent processes. Each of them will try to write to your database at the same time. I can foresee this becoming a bottleneck, so the concurrency needs to be better managed.
In terms of architecture, I would (instead of the spawn) have a queue that accepts all the messages and writes them to the database in batches. If all messages must be persisted, then the queue should be a persistent queue and the call from the calling process (handle_in in this case) should be synchronous.
To get things going, you could start with a prototype that implements the queue as a GenServer.
defmodule MyQueue do
  use GenServer
  require Logger

  @interval :timer.seconds(5)

  # External API

  def save_message(msg) do
    GenServer.call(__MODULE__, {:save, msg})
  end

  def start_link(_) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  # Callbacks

  @impl true
  def init(_) do
    state = [] # State in this case is a list of messages (newest first)
    # Send :flush_messages to this process every @interval milliseconds
    :timer.send_interval(@interval, self(), :flush_messages)
    {:ok, state}
  end

  @impl true
  def handle_call({:save, msg}, _from, state) do
    {:reply, :ok, [msg | state]}
  end

  @impl true
  def handle_info(:flush_messages, state) do
    # Db.insert_all/1 stands for your own persistence function.
    # Messages were prepended, so reverse them to restore arrival order.
    case Db.insert_all(Enum.reverse(state)) do
      {:ok, _} ->
        Logger.info("Saved messages to database")
        {:noreply, []} # Clear the messages

      {:error, _msg} ->
        Logger.error("Failed saving to database")
        {:noreply, state} # Keep messages in here and try again later
    end
  end
end
Notes regarding the GenServer:
- The GenServer is a singleton, meaning all calls to it are serialized.
- The handle_info callback blocks on the database call. While it is running, the GenServer will not process any new messages (one message at a time, see above). The actual writing to the database may be delegated to another process so that the queue can accept more messages while waiting. This requires more code and coordination.
- save_message is handled in a handle_call. This is a synchronous call, which should be the default when dealing with GenServers. If it were done using handle_cast it would be asynchronous, but then the GenServer's mailbox could easily be overloaded when a large number of messages come in while it is writing to the database.
- It uses :timer.send_interval, meaning that it will try to save messages every 5 seconds (the value of @interval). An alternative approach is to use :timer.send_after instead and manually start a new timer after every database write. The difference is that the first approach runs every 5 seconds, while the second runs every 5 seconds + the amount of time it takes for the database call to complete. Both are useful depending on use case.
- The queue can be improved by writing either every X seconds or as soon as the number of messages exceeds a specific value.
- If persistence is needed, you may want to store every message in an interim persistent queue before writing to the database. For example, you could use disk_log for this, or perhaps even mnesia. Again, more code, more coordination and more things that can go wrong. It is all about trade-offs.
- The GenServer should be hooked up to your application's supervision tree so that it gets restarted if it fails (but with the current implementation it loses its current message queue if that happens. Again, trade-offs).
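Two of the notes above (re-arming the timer after each write, and flushing once a size threshold is reached) can be sketched as a variant of the queue. This is only an illustration under some assumptions: the module name BatchQueue, the :writer, :interval and :max_batch options, and the default values are all made up here, and Process.send_after/3 is used in place of :timer.send_after because it is the more common choice in Elixir code.

```elixir
defmodule BatchQueue do
  use GenServer

  # Illustrative defaults, not recommendations.
  @interval :timer.seconds(5)
  @max_batch 100

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: Keyword.get(opts, :name, __MODULE__))
  end

  def save_message(server \\ __MODULE__, msg), do: GenServer.call(server, {:save, msg})

  @impl true
  def init(opts) do
    state = %{
      messages: [],
      count: 0,
      # Hypothetical writer: a 1-arity function returning {:ok, _} or {:error, _}
      writer: Keyword.fetch!(opts, :writer),
      interval: Keyword.get(opts, :interval, @interval),
      max_batch: Keyword.get(opts, :max_batch, @max_batch)
    }

    schedule_flush(state)
    {:ok, state}
  end

  @impl true
  def handle_call({:save, msg}, _from, state) do
    state = %{state | messages: [msg | state.messages], count: state.count + 1}
    # Flush early once the batch is large enough. If the write keeps failing,
    # every further save retries it, so a real system would want a backoff.
    state = if state.count >= state.max_batch, do: flush(state), else: state
    {:reply, :ok, state}
  end

  @impl true
  def handle_info(:flush_messages, state) do
    state = flush(state)
    # Re-arm only after the write, so flushes are spaced by
    # interval + the time the write took.
    schedule_flush(state)
    {:noreply, state}
  end

  defp flush(%{count: 0} = state), do: state

  defp flush(state) do
    # Messages are prepended, so reverse to restore arrival order.
    case state.writer.(Enum.reverse(state.messages)) do
      {:ok, _} -> %{state | messages: [], count: 0}
      {:error, _} -> state
    end
  end

  defp schedule_flush(state) do
    Process.send_after(self(), :flush_messages, state.interval)
  end
end
```

Because the module calls use GenServer, it gets a default child_spec/1, so it could be listed in a supervisor's children as {BatchQueue, writer: &Db.insert_all/1}, assuming Db.insert_all/1 returns {:ok, _} or {:error, _} as in the prototype above.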
Once the queue is done you can do this in your handle_in:
def handle_in("new_msg", %{"body" => body}, socket) do
  if socket.assigns.is_anon == 0 do
    IO.inspect("user is logged in - send message")

    :ok =
      MyQueue.save_message(%{
        body: body,
        userid: String.to_integer(socket.assigns.user_id),
        roomid: socket.topic
      })

    broadcast!(socket, "new_msg", %{body: body})
    {:noreply, socket}
  else
    IO.inspect("user is anonymous - do not send message")
    {:noreply, socket}
  end
end
This looks like sequential code, and the saving of the messages has been abstracted away.
One important thing here is to abstract away the MyQueue implementation. Then you can easily swap it out for another implementation later or use a different one for testing. (MyQueue might even be a bad name, as the queue is an implementation detail here, but naming things is hard.)
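One possible way to get that abstraction, sketched here under assumptions, is a behaviour plus a thin module that looks up the implementation in the application config. The names MessageStore, MessageStore.InMemory, the :my_app application and the :message_store key are all hypothetical. The in-memory implementation is only meant for tests.

```elixir
defmodule MessageStore do
  @moduledoc "Hypothetical facade: callers depend on this, not on MyQueue."

  @callback save_message(map()) :: :ok | {:error, term()}

  # The implementation is read from config at call time; MyQueue is assumed
  # to be the default in production (:my_app and the key are made-up names).
  def impl, do: Application.get_env(:my_app, :message_store, MyQueue)

  def save_message(msg), do: impl().save_message(msg)
end

defmodule MessageStore.InMemory do
  @moduledoc "Test-only implementation that keeps messages in an Agent."
  @behaviour MessageStore

  def start_link(_opts \\ []) do
    Agent.start_link(fn -> [] end, name: __MODULE__)
  end

  @impl MessageStore
  def save_message(msg), do: Agent.update(__MODULE__, &[msg | &1])

  # Returns the saved messages in arrival order.
  def all, do: Agent.get(__MODULE__, &Enum.reverse/1)
end
```

With this in place, handle_in would call MessageStore.save_message/1 instead of MyQueue.save_message/1, and the test configuration would point :message_store at MessageStore.InMemory.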