First of all, it smells like an XY problem to me, because it looks like you are trying to treat processes like mutable objects which represent rows in a database. I may be wrong about that, since I don’t have the full context, so it would be nice if you could provide more information about the original problem you’re solving with these processes that save their state to the database.
Your solution with the Registry is async with respect to persistence: the client calls the GenServer, the GenServer updates its state and replies, the client receives the reply, and only later is the state saved to the database. I don’t know whether that is just a limitation of your approach or whether it is really okay to silently lose some data if the server fails right before saving to the database.
It is a tricky thing to implement correctly. For example, you need to mark a dirty process as clean in the Registry once it has been saved. If you do that in the persistence worker, you can get data races. If you do it in the entity process, during the call in which the persistence worker asks it for its state, you can end up in a situation where the entity process has marked itself clean, but then the persistence worker crashes, and the entity is now marked clean even though it was never saved.
Then you’d need to order the processes in the Registry and keep a cursor, to avoid a situation where the persistence worker never gets around to saving the state of some entities: the state of the just-saved entities changes again while the worker is busy saving, so the worker keeps picking those recently saved entities over and over.
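To make the crash scenario above concrete, here is a rough sketch of the "mark clean during the call" variant. Registry.update_value/3 is the real API, but MyRegistry, the :clean value and the :get_state_for_persistence message are names I just made up for illustration:

defmodule DirtyEntity do
  use GenServer

  def init(state), do: {:ok, state}

  def handle_call(:get_state_for_persistence, _from, %{id: id} = state) do
    # Only the owning process may update its own Registry value, so the flag
    # has to be flipped here, before anyone knows whether the save will succeed...
    {_new, _old} = Registry.update_value(MyRegistry, id, fn _dirty_flag -> :clean end)
    # ...so if the persistence worker crashes after receiving this reply but before
    # inserting, the entity stays :clean and this state is silently lost.
    {:reply, state, state}
  end
end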
Instead, I propose a solution which can be sync, with batching, and without a bottleneck.
You have a GenServer which does this:
defmodule Entity do
  use GenServer

  def init(some_state) do
    {:ok, %{state: some_state, reply_to: []}}
  end

  def handle_call({:update_state, update_func}, from, %{state: state, reply_to: reply_to}) do
    new_state = update_func.(state)
    # Don't reply yet: remember the caller and return with a 0 timeout.
    # This 0 is important: :timeout fires as soon as the mailbox is empty.
    {:noreply, %{state: new_state, reply_to: [from | reply_to]}, 0}
  end

  def handle_info(:timeout, %{state: state, reply_to: reply_to}) do
    PersistenceServer.save(state)
    # Only now unblock every caller whose update is included in the saved state
    for from <- reply_to, do: GenServer.reply(from, :ok)
    {:noreply, %{state: state, reply_to: []}}
  end
end
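The module above only has the server side; the client-facing part could be a couple of small wrappers (start_link/1 and update_state/2 are just illustrative names, pick whatever fits your registration scheme):

# To be added to the Entity module above
def start_link(initial_state) do
  GenServer.start_link(__MODULE__, initial_state)
end

def update_state(server, update_func) do
  # Returns :ok only after PersistenceServer has written the batch
  # that contains this update
  GenServer.call(server, {:update_state, update_func})
end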
And PersistenceServer follows the same pattern, with the 0 timeout in the handle_call return. The trick is that the :timeout message arrives only when the mailbox is empty, so the server first drains every pending :save call into the batch and only then hits the :timeout clause:
defmodule PersistenceServer do
  use GenServer

  # Client API used by Entity above; assumes the server is registered under its module name
  def save(state) do
    GenServer.call(__MODULE__, {:save, state})
  end

  def init(_opts) do
    {:ok, %{batch: [], reply_to: []}}
  end

  def handle_call({:save, state}, from, %{batch: batch, reply_to: reply_to}) do
    # Again, this 0 is important
    {:noreply, %{batch: [state | batch], reply_to: [from | reply_to]}, 0}
  end

  def handle_info(:timeout, %{batch: batch, reply_to: reply_to}) do
    # One query for the whole batch ("entities" is a placeholder table name,
    # use whatever bulk insert or upsert fits your schema)
    Repo.insert_all("entities", batch)
    for from <- reply_to, do: GenServer.reply(from, :ok)
    {:noreply, %{batch: [], reply_to: []}}
  end
end
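Wiring it together would look roughly like this. It’s just a sketch: in a real app both processes would sit in your supervision tree, the Entity would be registered via your Registry, and Repo has to actually exist for the insert to go through:

# Start the shared PersistenceServer under its module name, then one Entity
{:ok, _} = GenServer.start_link(PersistenceServer, [], name: PersistenceServer)
{:ok, entity} = GenServer.start_link(Entity, %{counter: 0})

# This call (or the update_state/2 wrapper sketched earlier) is unblocked only
# after PersistenceServer has inserted the batch containing this update
:ok = GenServer.call(entity, {:update_state, fn state -> %{state | counter: state.counter + 1} end})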
This way you have both sync updates and batching.
Is PersistenceServer a bottleneck here? It is not, but let’s walk through the possible scenarios to make sure. Let’s say we can save N states per second to the database and all Entity processes together receive M updates per second. If N > M, then obviously there is no bottleneck. But what happens if M > N? Then you get backpressure. While PersistenceServer is still inserting the current batch, every call to it simply waits for that insert to finish, including the calls from the Entity processes, and that means the processes calling Entity.update_state are blocked too. But once PersistenceServer inserts the batch, all the pending updates get processed (because we use this {:noreply, ..., 0} pattern), PersistenceServer receives the next full batch, and all callers of the Entity processes are unblocked. In other words, the slower the database is, the bigger the batches become, but each batch still costs a single insert.
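And if you want to see that behaviour with your own eyes, something along these lines should do. Task.async_stream is just a quick way to simulate many concurrent callers; the timeouts are :infinity because the callers deliberately block while a batch is being written, and Repo still has to point at a real (or stubbed) database:

# Same setup as before, then lots of concurrent updates against one Entity
{:ok, _} = GenServer.start_link(PersistenceServer, [], name: PersistenceServer)
{:ok, entity} = GenServer.start_link(Entity, %{counter: 0})

1..10_000
|> Task.async_stream(
  fn _ ->
    # Each caller blocks until the batch containing its update has been inserted
    GenServer.call(entity, {:update_state, fn s -> %{s | counter: s.counter + 1} end}, :infinity)
  end,
  max_concurrency: 100,
  timeout: :infinity
)
|> Stream.run()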