Hello,
(Sorry, this post is kind of a mind dump, there is no precise question, just my
thoughts. I just hope it will resonate with some people here. Edit: yeah, it’s huge!)
I would like to discuss a pattern that I have found myself using quite often,
but that does not feel quite right.
From a distant point of view, the pattern is to look up a registered process before delivering a message, starting the process if it was not found, and then delivering the message.
First, let’s see the reasons why I do this, and why the pattern does not fit
very well.
Imagine we are implementing a gaming service like Board Game
Arena. There are lots of games to play and lots
of users. A game party is called a “table” (at least in the French version that
I use), meaning the actual piece of furniture where you can play board games in
real life. Note that this discussion is not intended to focus on gaming; there are different use cases where the pattern could apply.
If I am not mistaken, their implementation is PHP/MySQL, meaning that each
player action leads to an execution of the PHP script and the game state is
persisted in MySQL and concurrency is handled with transactions. Imagine now
that we have so many game tables that the database is overloaded.
In Elixir there is a simple solution for that: spin up a process that will receive the player actions and keep the game state, and have that process persist the state regularly. Saving less often lets the database catch up.
We can lose state if the game process crashes but for now let’s say that the
code is pretty stable and handles edge cases correctly.
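A minimal sketch of that idea, before any timeout or registration concerns (the `Persistence` module and `handle_action/2` are placeholders of mine):

```elixir
defmodule Game.Server do
  use GenServer

  @save_every :timer.seconds(30)

  def start_link(initial_state) do
    GenServer.start_link(__MODULE__, initial_state)
  end

  @impl true
  def init(initial_state) do
    # Flush the state to storage on a fixed interval instead of on
    # every player action, so the database can catch up.
    :timer.send_interval(@save_every, :save)
    {:ok, initial_state}
  end

  @impl true
  def handle_call({:player_action, action}, _from, state) do
    {:reply, :ok, handle_action(state, action)}
  end

  @impl true
  def handle_info(:save, state) do
    :ok = Persistence.save(state)
    {:noreply, state}
  end

  # Real game logic would go here.
  defp handle_action(state, _action), do: state
end
```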
Doing this lets you develop quickly, as you don’t actually need any database in the beginning; at least while developing, you can just remove the persistence layer, tell the process to stay alive until the game party ends, and the games
still work. There was a good article about this from Chris McCord IIRC but I
could not find it (if someone has the link please post it below as it is
interesting).
The first problem is that with many users, all current game states cannot fit in memory. We start our journey with a single, cheap server. So what we want to do is start the game process when there is a player action, and keep the process around because some games will receive many player actions in a short period of time. Then we use a timeout to let the process terminate when no action has been received for a couple of seconds.
Now, how to do that? The pattern is a bit convoluted but conceptually simple. First, we need a way to identify a game table process that also allows starting it, so we need an ID and the game type. We will use a `game_ref` function that wraps the type/ID pair as a single value, and a `via` function that returns a via tuple.

Now, when sending a message to the process, we can use a function like the following. Our process runs code that is able to reply to `GenServer.call/3`.
```elixir
def game_ref(game_type, game_id) do
  {game_type, game_id}
end

def player_action(gref, action) do
  call(gref, {:player_action, action})
end

def call(gref, msg) do
  with {:ok, pid} <- fetch_process(gref) do
    GenServer.call(pid, msg)
  end
end

def via(gref) do
  # ...
end

defp fetch_process({game_type, game_id} = gref) do
  child_spec = {__MODULE__, game_type: game_type, id: game_id, name: via(gref)}

  # Starting an already-running child returns its pid, so this
  # doubles as a lookup.
  case MyGameSupervisor.start_child(child_spec) do
    {:ok, pid} -> {:ok, pid}
    {:error, {:already_started, pid}} -> {:ok, pid}
    {:error, _} = err -> err
  end
end
```
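The `via/1` function is left empty above. One possible implementation, assuming a unique-keys `Registry` named `GameRegistry` is started in the supervision tree (with `{Registry, keys: :unique, name: GameRegistry}`), could be:

```elixir
def via(gref) do
  {:via, Registry, {GameRegistry, gref}}
end
```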
On initialization, the game process will load the state from the persistence layer using the game ID. When creating a game, the initial state is written to the database before the process is started, i.e. the `INSERT` is not done from the process, so when initializing there will always be an existing state in the database (or the game does not exist, which is a genuine exception).
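A sketch of that flow, with a hypothetical `Games.Store` persistence module:

```elixir
def create_game(game_type, game_id, params) do
  # Write the initial state first, so init/1 can always read it back.
  {:ok, _} = Games.Store.insert_initial_state(game_type, game_id, params)
  game_ref(game_type, game_id)
end

# In the game process, a missing row is a genuine exception,
# so a raising fetch is fine here.
def init(opts) do
  game_type = Keyword.fetch!(opts, :game_type)
  game_id = Keyword.fetch!(opts, :id)
  state = Games.Store.fetch_state!(game_type, game_id)
  {:ok, state}
end
```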
To allow the process to terminate, we return a timeout from `handle_call/3`. In this example the timeout can be derived from the state (depending on what game we play). The problem is that while terminating, the process can still receive new messages, so we do not just “save and quit”. We save, and return a zero timeout to be able to handle more messages. The hypothetical `handle_action/2` function marks the state as not persisted.
```elixir
def handle_call({:player_action, action}, _, state) do
  state = handle_action(state, action)
  publish_state_to_websockets_or_whatever(state)
  {:reply, :ok, state, timeout(state)}
end

def handle_info(:timeout, state) do
  if game_is_persisted?(state) do
    {:stop, {:shutdown, :server_timeout}, state}
  else
    # Save, then return a zero timeout: if no message arrived in the
    # meantime, the next :timeout will take the stop branch above.
    state = persist_state(state)
    {:noreply, state, 0}
  end
end
```
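For completeness, the hypothetical helpers could look like this, assuming the state is a map carrying a `:persisted?` flag (set to `false` by `handle_action/2`) and a `:game_type`:

```elixir
# Illustrative values: the idle delay depends on the game being played.
defp timeout(%{game_type: :chess}), do: :timer.minutes(5)
defp timeout(_state), do: :timer.seconds(30)

defp persist_state(state) do
  :ok = Games.Store.save(state)
  %{state | persisted?: true}
end

defp game_is_persisted?(state), do: state.persisted?
```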
Finally, there is a race condition where we may want to deliver a message while the process is terminating. Depending on what is used for registration, we can fetch the pid right before the process stops with `{:shutdown, :server_timeout}`, or right after it has stopped but is not yet unregistered. (We can unregister explicitly but that does not fix the race condition.)
So we rewrite our `call` function to handle that case:

```elixir
def call(gref, msg) do
  {:ok, pid} = fetch_process(gref)
  GenServer.call(pid, msg)
catch
  # Retry when the process stopped on its idle timeout,
  # or when its registration was stale.
  :exit, {{:shutdown, :server_timeout}, _} -> call(gref, msg)
  :exit, {:noproc, _} -> call(gref, msg)
end
```
That works well in many cases and has some pros:

- On a single hardware server, there is no need to manage database transactions, as all calls to a single game are serialized through the game process. That allows using different persistence layers depending on the use case. (Again, this is not a topic about game servers specifically.)
- Once the `call/2` function is written, handling edge cases and limiting the recursion on too many errors, the rest of the code can be written as if there were always a running server for any game table. That is very valuable to me.
Now, why use a process for this? Why not just use a short-lived cache for game state and apply the game state changes directly from the calling process (for instance, a Phoenix controller or socket channel process)? Here are the reasons:

- We want the application to be able to run code on its own, after the calling process is done. For instance, in a game, you want to deliver a player action and reply OK fast, but then the game logic can start timers or run AI moves. In a payment solution you want to return “I am processing” to the caller but then call a couple of microservices to handle stuff, etc.
- Keeping the state around in a process prevents too much copying between the caller and the game process: only the action is copied. If the calling process ran the game actions, we would have to copy the state to it first.
- We may need to wait for a message from an external source (let’s say a global timer service that synchronizes games in a tournament), so we basically need a pid. We cannot use the pid of a Phoenix request for that, as it is short-lived.
Finally, let’s see some cons of the pattern:

- Depending on the timeout duration and the load (number of interacting users), we may end up with all processes running constantly, as if they were persistent rather than transient, which makes the pattern useless.
- Removing the pattern and using a cache will still keep all game states in memory, so we need a cache cleanup and we must handle concurrency at the cache level.
- It works on a single machine. If we want to write a distributed application, we will have to use global registration for the `:via` tuples. So we have to use `global`, `gproc`, or `Horde`. It can work but it comes with problems: sync latency, passing the player requests from node to node if the load balancer does not direct a request to the same node where the game process is running, and of course netsplits.
- With netsplits, we will have to use global locking, or database transactions, or at least optimistic writes. And that means loading the game state from the persistence layer before running every action, and either running the game logic in a transaction or ensuring that the state was not modified from elsewhere when writing (e.g. comparing an iteration count). In that case the pattern adds burden but does not solve anything. We can do the same from the calling process (Phoenix request), or just start a supervised `Task` if we need to continue processing after the request is served.
- Calling `start_link` before each call just to check if the server is running does not feel right. It’s slower, and the supervisor can be a bottleneck (the recent `PartitionSupervisor` is a neat addition, but it also adds a layer of indirection when we want to use a `DynamicSupervisor` underneath). We can reverse the logic by passing the via tuple to `call`, catching `:noproc`, and only then calling `start_child` on our supervisor (see the sketch after this list), but that requires extracting the data to create the child spec from the via tuple; that is not always possible, or it just adds burden to the registration system.
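For illustration, here is a sketch of that reversed logic, reusing the `via/1` and `fetch_process/1` functions from above:

```elixir
def call(gref, msg) do
  # Optimistically call through the via tuple; only hit the
  # supervisor when the name is not registered.
  GenServer.call(via(gref), msg)
catch
  :exit, {:noproc, _} ->
    # This only works because fetch_process/1 can rebuild the
    # child spec from the gref; that is the constraint mentioned above.
    {:ok, _pid} = fetch_process(gref)
    call(gref, msg)
end
```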
So I see two actual use cases here.

First, with a persistence layer and concurrency problems:

- One solution is to embrace database transactions, and just run whatever code is needed from the calling process directly, or from a supervised task.
- Another is to balance the load with a hash system that ensures all external events for a given game are sent to a specific node. This is external to the BEAM runtime and requires parsing the event to know which node to use (hopefully just parsing a URL segment for an ID). Within the BEAM, the same hash algorithm can be used to start processes on the right node (see the sketch below). Both of these requirements need a static set of named nodes.
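As a sketch, the hash can be as simple as `:erlang.phash2/2` over a static node list; the node names and the `:erpc` dispatch below are illustrative assumptions:

```elixir
# Static set of named nodes; changing this list remaps the IDs.
@nodes [:"game1@host-a", :"game2@host-b", :"game3@host-c"]

def node_for(game_id) do
  Enum.at(@nodes, :erlang.phash2(game_id, length(@nodes)))
end

# The HTTP layer (or any node) can forward the call to the owner node.
def call_on_owner({_game_type, game_id} = gref, msg) do
  :erpc.call(node_for(game_id), __MODULE__, :call, [gref, msg])
end
```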
The other use case is when the requirements are not about state and persistence, but about the need for an actual process. For instance, we have a data processing pipeline tool that lets users send many requests to our service (let’s say 100). But depending on their plan, users can only run 10 pipelines concurrently, for instance. So pipeline requests are stored. We allow sending a single HTTP request to register several jobs, to avoid doing 100 HTTP requests in a row, but the jobs are executed one by one.

When a request (or a batch of requests) is stored, we must check if there is a running process for that user that will pull from the available jobs; if not, start it. That process will then pull from the batch and supervise a fixed-size set of workers (10) to run it. (That “process” is more likely a controller, a worker supervisor, and a feeder, that is, a “group”, but not “group” as in pg/pg2. It can also be a `GenStage` pipeline.) If another batch is created, we do not want to start a duplicate process group for the same user, just ensure that it is started and consequently will pull work from the stored requests. The group will exit after a while when there is no more work to do, thanks to the significant children feature. So basically those are queues, but with concurrency defined at the queue level, and we do not want to have the queues running 24/7 just to handle a couple of batches once in a while.
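For reference, here is a sketch of such a per-user group using significant children (OTP 24+, exposed in Elixir 1.13+); the `Pipelines.*` module names are made up:

```elixir
defmodule Pipelines.UserGroup do
  use Supervisor

  def start_link(user_id) do
    Supervisor.start_link(__MODULE__, user_id)
  end

  @impl true
  def init(user_id) do
    children = [
      # Fixed-size pool of workers for this user.
      {Pipelines.WorkerSup, user_id},
      # The feeder pulls stored jobs; it exits normally when the queue
      # is empty, taking the whole group down with it.
      Supervisor.child_spec({Pipelines.Feeder, user_id},
        restart: :transient,
        significant: true
      )
    ]

    Supervisor.init(children,
      strategy: :one_for_one,
      auto_shutdown: :any_significant
    )
  end
end
```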
In that case the pattern works. There is still ID-to-node hashing required when distributed, though, and the call/catch-exit pattern still feels risky in some cases.
To sum up, I would say that the pattern described here always seems like a good solution in the beginning, kind of a silver bullet, but when digging into the requirements and the different problems to handle, it very rarely ends up being the right solution. Handling persistent or temporary processes is easy with OTP, but I feel tools are lacking (or my knowledge is lacking, of course) regarding the architecture of transient, identified workloads.

Thanks for reading, any thoughts are appreciated!