Batching persistence of GenServer state - flagging/updating their information in the Registry?

The Registry module can hold additional data per process / GenServer. Is it a good idea to use this for (micro-)batching the “save state to database” step of GenServers? I have the following idea:

  • Given: I have thousands or even tens of thousands of GenServers holding state of entities that are updated with frequent event messages.
  • Instead of saving each GenServer’s state to the database after every single message, I might flag them as dirty (unsaved changes). Could (or should) this be done in the Registry, by the GenServer process itself?
  • A separate “persistence worker” would then periodically (every 5 or 10 seconds or so) query the Registry for all dirty GenServers, ask those entity GenServers their current state and save them in bulk; afterwards telling the entity GenServers their state (as of when it was asked) has been saved.
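The dirty-flag part of this plan could look roughly like the following sketch. All names here (`EntityRegistry`, the `"item-*"` keys) are made up for illustration; note that `Registry.update_value/3` must be called by the process that owns the registration, which fits the idea of each GenServer flagging itself:

```elixir
# Hypothetical sketch: entities flag themselves dirty in their Registry
# value; a persistence worker finds them with Registry.select/2.
{:ok, _} = Registry.start_link(keys: :unique, name: EntityRegistry)

# Each entity registers itself with an initial :clean flag ...
{:ok, _} = Registry.register(EntityRegistry, "item-1", :clean)
{:ok, _} = Registry.register(EntityRegistry, "item-2", :clean)

# ... and marks itself dirty after applying an event.
{:dirty, :clean} = Registry.update_value(EntityRegistry, "item-1", fn _ -> :dirty end)

# The persistence worker periodically collects all dirty keys:
dirty =
  Registry.select(EntityRegistry, [
    {{:"$1", :"$2", :"$3"}, [{:==, :"$3", :dirty}], [:"$1"]}
  ])

IO.inspect(dirty) # => ["item-1"]
```

Since Registry supports partitioning, the select can be spread over partitions, which is what makes this attractive compared to a single bookkeeping process.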

Other patterns also seem possible, such as the entity GenServers actively sending their state to a central persistor that would hold it in a map until the next save. But that central process might become a bottleneck, whereas the Registry has partitioning built in. Or maybe I could use Broadway with a custom GenStage? I am quite unsure what the possibilities are and how to choose among them. Maybe even Mnesia could offer benefits regarding data safety in the event of a crash?

Also: How good or bad of an idea is it to have GenServers very frequently update their associated data in the Registry anyway? Would it even be a good idea to write a couple of additional bits of interest from the GenServer state into the Registry, e.g. for browsing the entity GenServers from a web app?


First of all, it smells like an XY problem to me, because it looks like you are trying to treat processes like mutable objects which represent rows in a database. I may be wrong about it, but I don’t have the context, so… It would be nice if you could provide more information about the original problem you’re solving with these processes which save their state to a database.


Your solution with Registry is async: the client calls the GenServer, the GenServer updates its state and replies, the client receives the reply, and only later is the state saved to the database. I don’t know whether that is an accepted limitation of your approach or whether it is really okay to silently lose some data in case of the server failing right before saving to the database.

It is a tricky thing to implement correctly. For example, you need to mark a dirty process as clean in the Registry once it was saved. If you do it in the persistence worker, you can have data races. If you do it in the entity process during the call when the persistence worker asks it for its state, you can end up in a situation where the entity process marked itself as clean, but then the persistence worker crashed, and this entity is now marked as clean even though it was never saved.

Then, you’d need to order the processes in the Registry and keep a cursor, to avoid a situation where persistence worker can’t get to save state of some entities, because the state of just saved entities has changed while persistence worker was saving it and this persistence worker picks recently saved entities again.


I propose a solution which can be sync, with batching and without a bottleneck

You have a GenServer which does this:

defmodule Entity do
  use GenServer

  # Client API: a sync update that only returns once the state was persisted.
  def update_state(pid, update_func), do: GenServer.call(pid, {:update_state, update_func})

  def init(some_state) do
    {:ok, %{state: some_state, reply_to: []}}
  end

  def handle_call({:update_state, update_func}, from, %{state: state, reply_to: reply_to}) do
    new_state = update_func.(state)
    {:noreply, %{state: new_state, reply_to: [from | reply_to]}, 0} # This 0 is important
  end

  def handle_info(:timeout, %{state: state, reply_to: reply_to}) do
    PersistenceServer.save(state)
    for from <- reply_to, do: GenServer.reply(from, :ok)
    {:noreply, %{state: state, reply_to: []}}
  end
end

And PersistenceServer applies the same pattern, with the 0 timeout in the handle_call return:

defmodule PersistenceServer do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  def save(state), do: GenServer.call(__MODULE__, {:save, state})

  def init(_opts) do
    {:ok, %{batch: [], reply_to: []}}
  end

  def handle_call({:save, state}, from, %{batch: batch, reply_to: reply_to}) do
    {:noreply, %{batch: [state | batch], reply_to: [from | reply_to]}, 0} # Again, this 0 is important
  end

  def handle_info(:timeout, %{batch: batch, reply_to: reply_to}) do
    Repo.insert_all(EntitySnapshot, batch) # bulk insert; EntitySnapshot is a placeholder for your schema
    for from <- reply_to, do: GenServer.reply(from, :ok)
    {:noreply, %{batch: [], reply_to: []}}
  end
end

This way you have both sync updates and batching.

Is PersistenceServer a bottleneck here? It is not, but let’s walk through the possible scenarios to make sure. Say we can save N states per second to the database, and all Entity processes together receive M updates per second. If N > M, it’s obvious there is no bottleneck. But what happens if M > N? Then there is backpressure. While PersistenceServer is still saving a batch, every call to it waits for that save to finish, including the calls from Entity processes. That means the clients calling Entity.update_state are blocked too. But once PersistenceServer has inserted the batch, all the pending updates are drained (because we use this {:noreply, ..., 0} pattern), so PersistenceServer receives the full accumulated batch, which unblocks all callers of the Entity processes.


Thanks for the sync suggestion. If I understand correctly, messages already in the mailbox are always handled before an (overdue) timeout fires; otherwise this would have no queueing effect with a timeout of 0.
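That is indeed how GenServer timeouts behave: the timeout only fires once the mailbox is empty. A small demo (BatchDemo is a made-up name; :sys.suspend/1 is used to queue several casts before any is processed, making the outcome deterministic):

```elixir
defmodule BatchDemo do
  use GenServer

  def start_link(parent), do: GenServer.start_link(__MODULE__, parent)
  def init(parent), do: {:ok, %{parent: parent, acc: []}}

  # Each cast re-arms the 0 timeout; it only fires once the mailbox is empty.
  def handle_cast({:add, x}, %{acc: acc} = s), do: {:noreply, %{s | acc: [x | acc]}, 0}

  def handle_info(:timeout, %{parent: parent, acc: acc} = s) do
    send(parent, {:batch, Enum.reverse(acc)})
    {:noreply, %{s | acc: []}}
  end
end

{:ok, pid} = BatchDemo.start_link(self())
:sys.suspend(pid)                       # let messages pile up unprocessed
for x <- 1..5, do: GenServer.cast(pid, {:add, x})
:sys.resume(pid)

receive do
  {:batch, batch} -> IO.inspect(batch)  # => [1, 2, 3, 4, 5]
end
```

All five casts end up in a single batch, because between casts the mailbox is never empty, so the 0 timeout keeps being postponed.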

It would be nice if you could provide more information about the original problem you’re solving with these processes which save their state to database.

Most basically, I am doing “Event Sourcing” (parsing and saving events from a message queue via Broadway) and additionally keeping materialized projections of entities (DDD “Aggregates”); my question is about the latter. The explicit real-world context is that those entities represent items being recognized/detected every couple of minutes as they move through our production plant. I opted for the GenServer, among other thoughts, because I want to leverage timeouts to react if an item is not detected any more for a predefined period of time.

I don’t know if that is just a limitation of your approach or if it is really okay to silently lose some data in case of server failing right before saving to the database.

It is okay to be eventually consistent, but in the end it is not okay to lose data. I thought I could get around this issue in the materialized projections this way:

  • The saving of plain events from the message queue is safe: messages are only acked in the queue after their batch has been saved (courtesy of Broadway).
  • The entities (DDD aggregates) hold a list of message IDs (together with timestamps of those message IDs), even those that are superseded by newer messages. After all, they even fulfill CRDT properties with this design; out-of-order updates don’t matter, neither do duplicate updates.
  • I can run another process (or cronjob) that periodically compares the saved event messages’ IDs+timestamps with the referenced message IDs+timestamps in the persisted entities. If there are missing message IDs, those messages could be sent again to the GenServers.
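The periodic comparison in that last bullet could boil down to a set difference. A sketch with purely illustrative names and data (a real version would query the event store and the persisted projections):

```elixir
# Hypothetical reconciliation check: which saved events are not yet
# reflected in the persisted entity? All names and data are made up.
defmodule Reconciler do
  def missing_ids(saved_events, entity) do
    saved = MapSet.new(saved_events, fn {id, _ts} -> id end)
    applied = MapSet.new(entity.applied_message_ids)
    MapSet.difference(saved, applied)
  end
end

saved_events = [{"msg-1", 100}, {"msg-2", 120}, {"msg-3", 130}]
entity = %{applied_message_ids: ["msg-1", "msg-3"]}

Reconciler.missing_ids(saved_events, entity) |> MapSet.to_list() |> IO.inspect()
# => ["msg-2"]
```

Any missing IDs would then be re-sent to the corresponding GenServer, which is safe given the idempotent update design described above.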

It is a tricky thing to implement correctly. For example, you need to mark dirty process as clean in the Registry once it was saved. If you do it in a persistence worker, you can have dataraces. If you do it in the entity process during the call when persistence worker asks it for it’s state, you can end up in a situation where entity process marked itself as a clean one, but then persistence worker crashed and this entity is now marked as clean, but it was never saved.

Yes, those are clearly not the way I would go about it.

Then, you’d need to order the processes in the Registry and keep a cursor, to avoid a situation where the persistence worker never gets around to saving the state of some entities, because the state of just-saved entities changed again while the worker was saving it, and the worker keeps picking those recently saved entities again.

I would not think of it as a cursor; my approach was to ask the entity GenServers for their state, and they would reply with a tuple of state + current time (which might be an internal counter in the GenServer, incremented on every state change). After a successful save, the async persistence worker would send that timestamp (or counter value) back to the entity GenServer, so the entity GenServer can tell whether it is really clean or whether there were intermittent updates.
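A sketch of that counter handshake (names hypothetical): the entity hands out its current version along with its state, and only clears the dirty flag if the acknowledged version still matches, so a stale acknowledgement after an intermittent update leaves it dirty:

```elixir
defmodule VersionedEntity do
  use GenServer

  def start_link(state), do: GenServer.start_link(__MODULE__, state)
  def init(state), do: {:ok, %{state: state, version: 0, dirty?: false}}

  def handle_cast({:apply_event, f}, data) do
    {:noreply, %{data | state: f.(data.state), version: data.version + 1, dirty?: true}}
  end

  # Persistence worker asks for state; entity replies with state + version.
  def handle_call(:snapshot, _from, data), do: {:reply, {data.state, data.version}, data}

  # Worker acknowledges the saved version; only clear the flag if no
  # intermittent update bumped the version in the meantime.
  def handle_cast({:saved, v}, %{version: v} = data), do: {:noreply, %{data | dirty?: false}}
  def handle_cast({:saved, _stale}, data), do: {:noreply, data}

  def handle_call(:dirty?, _from, data), do: {:reply, data.dirty?, data}
end

{:ok, pid} = VersionedEntity.start_link(0)
GenServer.cast(pid, {:apply_event, &(&1 + 1)})
{_state, v} = GenServer.call(pid, :snapshot)
GenServer.cast(pid, {:apply_event, &(&1 + 1)})   # update arrives mid-save
GenServer.cast(pid, {:saved, v})                 # stale ack: entity stays dirty
IO.inspect(GenServer.call(pid, :dirty?))         # => true
```

Because casts and calls from the same client are processed in order, the entity deterministically ends up dirty here, which is exactly the safety property wanted.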

This way you have both sync updates and batching.

Your suggestion is a valuable option, and to be honest I really like it, as it uses “batching without waiting”. What I have to weigh is that the GenServers block whenever they have unsaved changes. That would add potential latency to “asking their current state” if I add that additional use case down the road, as I hinted in my first post. It would be nice to build observability directly in here, instead of accessing the persisted data with another process (though that would also be an option).

Unless these items at your production plant are recognized/detected more frequently than the CPU frequency (which I highly doubt), you can just have a single GenServer which processes all entity states and handles timeouts for all entities.

Event Sourcing as an idea has a very narrow use case and very strict requirements. First, saving an event to persistent storage and updating the read/write models (you call them materialized projections or aggregates) needs to be done atomically, which is not the case for your proposed solution. Second, events need to be saved and processed in strictly linear order, one by one, which, again, is not true for your solution.

Overall, I don’t think that you need event sourcing in the first place. Event sourcing is a good fit for systems which require the ability to be able to replay some part of event stream and to get to the exact same state the system got into in runtime, without having even a single bit of difference. This is useful for projects which rely on the automated decision systems or work with money directly. For example, you want to detect fraudulent transactions inside your banking system, so you take out the event log for some day and you replay it until you find the transaction. Or you have a trading bot and you want to debug some incorrect decision which was made when market was in high-volatility.


But even if you stick to this solution:

I don’t quite understand that point about out-of-order updates. Consider we implement a counter.
I start with 0 as my state. Then I receive {:add, 1 = amount, 100 = timestamp}. My state is now 1. Then I receive an out-of-order {:add, 2 = amount, 50 = timestamp} (note that this message has a lower timestamp than the previous one). My state is now 3. But if I had received the updates in the correct order, I would first have been in state 2 and only then in state 3. So it doesn’t work for out-of-order updates.

If you want to support out-of-order delivery, you need a strictly increasing sequence number, and you have to buffer messages that arrive before their lower-numbered predecessors. That’s what TCP does internally. And you need this sequence number per GenServer, obviously.
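A purely functional sketch of such a reorder buffer (Reorder is a hypothetical module; per entity you would keep the next expected sequence number plus a buffer of early arrivals):

```elixir
defmodule Reorder do
  # Deliver messages strictly in sequence order, buffering early arrivals,
  # similar in spirit to TCP reassembly. push/3 returns
  # {messages_ready_to_apply, new_state}.
  def new, do: %{next: 1, buffer: %{}}

  # Duplicate of an already-delivered message: drop it.
  def push(%{next: next} = st, seq, _msg) when seq < next, do: {[], st}

  # Early arrival: buffer it until the gap is filled.
  def push(%{next: next} = st, seq, msg) when seq > next,
    do: {[], %{st | buffer: Map.put(st.buffer, seq, msg)}}

  # Expected message: deliver it plus any buffered successors.
  def push(%{next: next, buffer: buffer}, next, msg),
    do: flush([msg], %{next: next + 1, buffer: buffer})

  defp flush(acc, %{next: next, buffer: buffer} = st) do
    case Map.pop(buffer, next) do
      {nil, _} -> {Enum.reverse(acc), st}
      {msg, rest} -> flush([msg | acc], %{next: next + 1, buffer: rest})
    end
  end
end

st = Reorder.new()
{out1, st} = Reorder.push(st, 2, :b)  # early: buffered, nothing delivered
{out2, _st} = Reorder.push(st, 1, :a) # fills the gap: both are released
IO.inspect({out1, out2})              # => {[], [:a, :b]}
```

The GenServer would call Reorder.push/3 on every incoming event and apply only the returned, correctly ordered messages to its state.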



But if I had received the updates in the correct order, I would first have been in state 2 and only then in state 3. So it doesn’t work for out-of-order updates.

That’s what I mean by eventual consistency: arriving at the same final state no matter the ordering. I am paying close attention that my state update function does not introduce what I would call a “path dependency” (i.e., the state transition function is commutative). I am not interested in all intermediate states. Most importantly, I am only saying Event Sourcing and not CQRS. My system is not making any decisions, or receiving commands, in that domain. It is a “downstream” analytics system only, but with a high emphasis on (eventual) data completeness. So you are right that it does not have some of the needs that other, more typical Event Sourcing systems have. But I do want to be able to recompute my state from the saved events, and I want to be future-proof in that I may want to compute something in my projection in the future that I am not computing today, but with today’s data. In the end it’s a philosophical debate whether my system is an example of Event Sourcing or not.
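A toy version of that commutative, duplicate-tolerant design (hypothetical names): contributions are keyed by message ID, so reordering and redelivery both converge to the same final value:

```elixir
defmodule CommutativeProjection do
  # State keeps every contribution keyed by message ID, so applying an
  # event twice is a no-op and the ordering doesn't affect the final value.
  def new, do: %{}
  def apply_event(state, {msg_id, amount}), do: Map.put_new(state, msg_id, amount)
  def value(state), do: state |> Map.values() |> Enum.sum()
end

events = [{"m1", 1}, {"m2", 2}, {"m3", 4}]

in_order =
  Enum.reduce(events, CommutativeProjection.new(), &CommutativeProjection.apply_event(&2, &1))

# Reversed order plus a duplicate delivery of "m2":
shuffled =
  Enum.reduce(Enum.reverse(events) ++ [{"m2", 2}], CommutativeProjection.new(),
    &CommutativeProjection.apply_event(&2, &1))

IO.inspect({CommutativeProjection.value(in_order), CommutativeProjection.value(shuffled)})
# => {7, 7}
```

Intermediate states differ between the two runs, but the final states agree, which is the property being relied on here.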

Unless these items at your production plant are recognized/detected more frequently than the CPU frequency (which I highly doubt), you can just have a single GenServer which processes all entity states and handles timeouts for all entities

I want to compute the projections in near real time, and they take quite a few CPU cycles each - every incoming measurement is matched against the item’s production plan, which is also saved in the projection (and may change at any time, as another kind of event, which causes re-matching). Then there are a couple more kinds of events per item, such as creation/update/delete of manual annotations (which also come as events from the upstream system in my case).

Having only one GenServer for all items would require re-implementing bookkeeping functionality that the GenServer+Registry combination handles so nicely for me. Juggling thousands of semantic timeouts in one GenServer seems odd to me, and I would not really see the point in using Elixir if I were implementing all that by hand in a single GenServer. The “one GenServer per tracked item (aggregate)” approach in my solution looks like a good fit to me, and if OTP/BEAM holds up to its promises, the additional context switching between those Erlang processes should be a piece of cake for the BEAM.

I implemented your sync suggestion today, and I ran into timeouts (the default 5000 ms) on the Broadway side while dispatching to the GenServers, because the storage process may take some time due to file I/O (the backpressure you promised). I’ll keep it in the back of my mind as an option, but will now go for some middle ground between my original “async” plan and the “batching via a timeout of 0” technique, and try out setting values in the Registry. I’m still curious how much I should store in the Registry values (up to the entire state of multiple kilobytes?), or whether there’s a point in keeping it updated in an ETS table for other processes to read, etc. In particular, I will also implement a web app around it in which the “active” items can be browsed, with a detail page that automatically updates on state changes.
