DGen - A distributed GenServer

I love GenServer. There are only 2 things stopping me from writing an entire app with them:

  1. Durability: The state is lost when the process goes down.
  2. High availability: The functionality is unavailable when the process goes down.

What if we could guarantee the GenServer never went down? Could we build a stateful application without a database?


Many Erlang and Elixir developers have this fantasy at some point in their journey. But how close can we actually get to the dream? I’d like to find out with DGen. This is v0.1.0 stuff, early days.

What DGen does

DGen provides a "distributed GenServer" (DGenServer). It's meant to work just like a GenServer, but the message queue and the module state are durably stored in FoundationDB, with other backends possible.

Quick example

Our simplest example looks almost exactly like a GenServer.

defmodule Counter do
  use DGenServer

  def start(tenant), do: DGenServer.start(__MODULE__, [], tenant: tenant)

  def increment(pid), do: DGenServer.cast(pid, :increment)
  def value(pid), do: DGenServer.call(pid, :value)

  @impl true
  def init([]), do: {:ok, 0}

  @impl true
  def handle_call(:value, _from, state), do: {:reply, state, state}

  @impl true
  def handle_cast(:increment, state), do: {:noreply, state + 1}
end

However, the state lives on beyond the lifetime of the original Elixir process.

{:ok, pid} = Counter.start(tenant)
Counter.increment(pid)
Counter.increment(pid)
2 = Counter.value(pid)

# Restart the process
Process.exit(pid, :kill)
{:ok, pid2} = Counter.start(tenant)
2 = Counter.value(pid2)  # State persisted!

The tenant argument is the only unique piece here. This tells DGenServer where to persist the queue and state in the datastore.

Beyond the basics

The simple example demonstrates durable state. But all the benefits inherited from a strictly serializable distributed system are here:

  • Start one DGenServer per node, with state mutations processed exactly once, without RPC coordination or process registration.
  • Separate where messages are pushed from where they are processed. Producers can run anywhere in the cluster, while consumers — the processes that mutate state — can be pinned to specific nodes or hardware.
  • Perform side effects such as sending emails or performing network requests, with similar transactional guarantees.

Embracing side effects

The simplest side effect is one that happens after a state mutation. For example, a log message is a side effect! Your callback can optionally return a function to be executed after the state change is committed.

def handle_cast(:increment, state) do
  action = &Logger.info("Counter is now #{&1}") # runs after commit
  {:noreply, state + 1, [action]}
end

On the other hand, when a side effect needs to update the state, we must lock the queue from processing messages while our side effect executes outside of the transaction.

def handle_cast(:send_email, state) do
  # executes inside the transaction
  {:lock, state}
end

def handle_locked(:cast, :send_email, state) do
  # executes outside of a transaction
  Req.post(...)
  {:noreply, %{state | sent: state.sent + 1}}
end

Under the hood - performance characteristics

Message Queue: The critical piece of DGenServer is the message queue. A caller must be able to push new messages onto the queue with serializability guarantees and high concurrency. This is achieved with versionstamped keys, which tie key order exactly to the underlying commit order.
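
The versionstamp idea can be sketched in plain Elixir, with no FoundationDB involved. Here a monotonically increasing integer stands in for the versionstamp assigned at commit time; `VersionstampQueueSketch` is illustrative only, not DGen's actual implementation.

```elixir
# Conceptual sketch (plain Elixir, no FoundationDB): a versionstamp is
# assigned at commit time, so writing messages under versionstamped keys
# makes key order equal to commit order.
defmodule VersionstampQueueSketch do
  # The queue maps a simulated versionstamp to a message.
  def new, do: %{store: %{}, next_vs: 0}

  # Simulated transactional push: the "versionstamp" is allocated at
  # commit time, so concurrent pushers can never collide on a key.
  def push(%{store: store, next_vs: vs} = queue, message) do
    %{queue | store: Map.put(store, vs, message), next_vs: vs + 1}
  end

  # Consumers read in key order, which is exactly commit order.
  def drain(%{store: store}) do
    store
    |> Enum.sort_by(fn {vs, _msg} -> vs end)
    |> Enum.map(fn {_vs, msg} -> msg end)
  end
end
```

Pushes drain in commit order; in FoundationDB the same property comes from the datastore filling in the versionstamp when the transaction commits.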

Writes: The module state could be stored as a single term_to_binary blob. However, doing so would amplify the number of writes needed for incremental changes. Instead, DGenServer adopts the design decisions of LiveView's assigns and component lists. A module state consisting of a map with atom keys or a list of string-id'd elements is optimized for incremental diffs on write. This means that standard Elixir structs are the preferred terms for the DGenServer module state.
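
The incremental-write idea can be illustrated with a small diff function: store each top-level map key under its own datastore key, and on update only write the keys that changed. `StateDiffSketch` is an illustration of the design, not DGen's storage format.

```elixir
# Illustration only: compute the minimal set of per-key writes and deletes
# needed to move the stored copy of a map-shaped state from old to new.
defmodule StateDiffSketch do
  def diff(old, new) when is_map(old) and is_map(new) do
    # Write only the keys whose values changed or were added.
    writes =
      for {k, v} <- new, Map.get(old, k, :__absent__) != v, into: %{}, do: {k, v}

    # Delete the keys that disappeared from the state.
    deletes = Map.keys(old) -- Map.keys(new)
    %{writes: writes, deletes: deletes}
  end
end
```

For example, `StateDiffSketch.diff(%{a: 1, b: 2}, %{a: 1, b: 3})` produces `%{writes: %{b: 3}, deletes: []}`: only `b` is written, even though the whole state passed through the callback.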

Reads: And finally, DGenServer will cache the module state in memory to improve performance, with perfect cache invalidation. A single hot consumer will never have to read the full state, unless it restarts.
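
Perfect invalidation can be sketched as a version check: the consumer keeps `{version, state}` in memory and only falls back to a full read when the stored version no longer matches. Again, this is illustrative, not DGen's actual mechanism.

```elixir
# Illustrative sketch of version-checked caching (not DGen's actual code).
defmodule CacheSketch do
  # Hit: the stored version matches the cached version, so the in-memory
  # state is known to be current and no full read is needed.
  def read(%{version: v}, {v, cached_state}), do: {:cache_hit, cached_state}

  # Miss: versions differ (or there is no cache), so fall back to a full
  # read of the stored state.
  def read(%{version: _, state: state}, _stale_cache), do: {:full_read, state}
end
```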

These components together allow for adequate performance. Still, you shouldn’t replace all your GenServers tomorrow. DGenServer should be reserved for stateful mutations that require durability and high availability guarantees, such that the performance tradeoff is acceptable.

Let it crash?

A DGenServer consumer can crash, just like any other Elixir process. But a key difference here is that the message queue is durable. If the crash is due to a poison message, a supervisor restart of a DGenServer consumer will simply try to process the same message again. DGen has yet to learn what this means in a production setting. Crash semantics themselves are well-defined, but system recovery is not automatic the way it is with a GenServer. An operator may have to manually delete a poison message from the queue - an operation that is not possible with a standard GenServer.

Other backends?

DGen requires a strictly serializable key-value datastore with transactions, like FoundationDB, to provide the consistency guarantees to match the semantics of a GenServer. The first backend implementation available in DGen is FoundationDB, via erlfdb. But, we hope that other datastores providing a similar featureset can be implemented as alternative backends (such as Hobbes, Bedrock, etc.). I’m very open to flexing the current backend behaviour (:dgen_backend) to support other projects.

Community input

So I'm continuing my obsession with exploring different kinds of state engines on the BEAM. I know there are like-minded folks around, so I'd love to hear thoughts and feedback about the approach. This project isn't meant to be integrated into your production app today, but I hope it can evolve into something useful.

15 Likes

Does this have the same bottleneck issues as a GenServer does?

Yes, there is a single message queue and messages are processed in serial. There is no throughput advantage.

Speaking of "Could we build a stateful application without a database?", there is an interesting presentation on this topic, or rather two versions of it:

4 Likes

That is so great that I’ll have a hard time not pulling away from my work to try it.

Before I get too child-like excited: are other storage backends offering fewer guarantees that make them bad candidates for DGen, or is it simply a matter of bandwidth on your part? I'd love to explore DGen backed by Postgres (and maybe SQLite3 in the future, if it even makes the cut).

1 Like

Interesting experimentation! Did you find any prior art?

I wonder if the team at Ericsson considered that when designing OTP…

1 Like

Bandwidth, mainly. :smiley: I think most modern storage engines are written with serializability in mind. Shoot, you could probably do it with S3’s Strong Consistency modes. However, FDB was specifically designed for clients to implement sophisticated and language-specific data structures, so it has all the abstractions to make it convenient and fun.

I think a sqlite backend would be totally doable and successful. Obviously you lose the distributed guarantees, but you still have the durability.

For Postgres, I've read that it takes real care to write a concurrency-safe queue.

FDB is an ideal datastore to go zero-to-one on an experiment like this. Going from one-to-two will take a serious rethink of the backend interface. Bedrock and Hobbes are both inspired by FDB, so they are prime candidates.

Even so, how do we generalize versionstamps and watches? That is still an open question.

I’m hoping to keep :dgen_backend’s key-value abstractions mostly in place, and to avoid making it too SQL-y. The tradeoff of generalization is that you lose the fidelity of the best-fitting implementation.

1 Like
  • ra: Raft-consensus state machine. ra is great, and provides the same basic ideas. You are responsible for forming the cluster with your BEAM nodes.
  • oban: Job processing with Postgres, MySQL, Sqlite, and others. It’s a message queue + your code. Universally loved, I hear it’s a great project.
  • Process registries such as global, Registry, and Horde
2 Likes

What about mnesia? I feel like that was kind of designed for persistent configuration state like what we keep in a GenServer.

1 Like

Yes, absolutely, I am with you.

I, like most commercial devs, simply have operational concerns and in this case – worries about a split-brain problem. I am desperately looking for something like Ash team’s Reactor or Flowstone that gives me an Oban-like persistent workflow orchestration with some extra bells and whistles, but that’s obviously the Y in the XY problem; persistent GenServers would do this just the same, if not better (I bet on the latter). Hence my interest in your library.

Or maybe there’s a way to embed FDB a la SQLite3? Can you give me some reading to do on what would it take to use FDB in an Erlang/Elixir project with operational concerns minimized?

And to circle back to the split-brain problem: we have a SaaS in our backend and I more or less hate it at this point – slow (45-60ms per request in prod, and we sometimes need multiple to do a full business operation), has async / eventual consistency traps, and the little value it brings is completely overshadowed by its drawbacks in general.

We need a good orchestrator before removing it though.

I’ll try DGen in the next few weeks and will let you know how it is holding up for us.

This is how I run FDB: ex_fdbmonitor | ExampleApp

It’s not an embedded database. You’re still self-hosting a service with this method, and going to production does require an operational investment in the technology, so I understand the hesitation.

2 Likes

Why is it a "distributed GenServer"? I don't see any "distributed" here; it's more like "durable" or even "persistent".

General idea of durable state machines is nice and very useful, but implementing it with GenServer semantics is a dead end. I think that it is a bad pattern and a very misleading approach.

If I were to make this library a good one, I’d think about it like this:

DGenServer does not work as a GenServer in any sense.

You can't safely restart this server to reset the state, which is a big deal given how OTP approaches failure safety. Then, performance is completely different. Then, timeout handling is completely different. Then, the interface is completely different (postponing side effects, etc.). Then, the use case is completely different. In the end, any comparison with GenServer is wrong and brings more harm than good for users who may think they can just replace their use GenServer with use DGenServer and go on.

So, I’d change the name to something like DurableStateMachine.

Messages

Even if you rename it and change the interface, there is still a problem: input comes from messages! That is very bad. Most messages are temporary and contain temporary data like pids and references. Monitors and links produce temporary data. And you save them.

So, I’d separate the state machine from BEAM’s message boxes and remove this process-based handle_info, handle_call, etc. semantic completely.

I’d have just handle_input with manual state handling, without any processes involved.

Diff

You do diffs to avoid saving the whole state, but your diff implementation is best-effort. If you gave the user the ability to handle the update themselves, it would bring a huge performance benefit in some cases.

Batching

And it would be nice to have the ability to batch changes, because FDB works much more efficiently with one transaction changing 100 keys than with 100 subsequent transactions each changing one key.


So, it would look something like this

defmodule MyStateMachine do
  use DurableStateMachine

  require Logger

  @impl true
  def init(id) do
    {:ok, %{id: id, status: :off, calls: 0}}
  end

  def switch(state) do
    with {:ok, state} <- handle_input(:switch, state) do
      Logger.info("Switched #{state.id} to #{state.status}")
      Logger.debug("#{state.id} received #{state.calls} calls so far")
      {:ok, state}
    end
  end

  @impl true
  def handle_input(:switch, state) do
    next_status =
      case state.status do
        :off -> :on
        :on -> :off
      end

    next_calls = state.calls + 1

    # Only status is saved, because `calls` is for debugging
    DurableStateMachine.set({state.id, :status}, next_status)

    {:ok, %{state | status: next_status, calls: next_calls}}
  end
end

# And used like this

id = # Some ID here
{:ok, state} = MyStateMachine.get_or_create_new(id)
{:ok, state} = MyStateMachine.switch(state)
IO.inspect state.status

But this functionality has no batching. If you were to add batching, you'd have to make the transaction explicit, and that would make the whole need for the library obsolete.

You might want to take a look at my approach to the problem: GitHub - am-kantox/peeper: Almost drop-in replacement for `GenServer` to preserve state between crashes

It uses a heir subprocess and preserves not only the state of the GenServer, but also its process dictionary and its ets tables.

3 Likes

I fear there may be a misunderstanding here. I blame myself for my poor delivery. DGen is not saving Erlang process messages to a datastore. It saves the cast/call request portion only. It's up to the programmer to avoid using temporary or localized data such as pids in their cast/call requests. For example, in the 'increment' cast, we store the term :increment. Also, DGen is in fact storing references in order to dispatch appropriately. However, as far as I know, references alone are neither temporary nor local. (A reference can refer to something that is local, but we're not doing that here.)

You are able to start several instances of a single DGenServer. For a given message, only one of them will consume it and process it. Thus, the state mutations are distributed across several nodes. The functionality provided by that DGenServer is highly available, as any number of them can go down, as long as a single consumer is still alive. They do not coordinate with each other, but rather they coordinate via the distributed message queue and take advantage of strict serializability.
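
The coordination-free, exactly-once consumption described above can be sketched in plain Elixir, with an Agent standing in for the strictly serializable datastore: every consumer races to atomically claim the next unprocessed message, and each message goes to exactly one claimant. Illustrative only, not DGen's implementation.

```elixir
# Sketch: consumers don't coordinate with each other; they each issue an
# atomic claim against shared state (here an Agent, in DGen the datastore).
defmodule ClaimSketch do
  def start(messages) do
    Agent.start_link(fn -> %{messages: messages, next: 0} end)
  end

  # Atomically claim the next message. For any given message, exactly one
  # caller receives {:ok, message}; once drained, callers get :empty.
  def claim(agent) do
    Agent.get_and_update(agent, fn %{messages: msgs, next: i} = state ->
      case Enum.at(msgs, i) do
        nil -> {:empty, state}
        msg -> {{:ok, msg}, %{state | next: i + 1}}
      end
    end)
  end
end
```

Two consumers claiming from the same queue each receive a different message, no matter how their calls interleave.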

This can be added as an enhancement if needed.

Thank you for the feedback, especially that the name is misleading. If the DGenServer module name is too bold, I welcome the requests to change it.

2 Likes

The Waterpark videos are excellent. Would love to see a full technical write up on how they do it. From memory, nothing is actually persisted to disk - they just have 10+ nodes in a cluster or something similar.

1 Like

Indeed. They just have the data in memory on multiple machines and count on the fact that they won’t run into a case where all copies of any individual piece of information drop out at the same time.

2 Likes

Man, I wouldn’t be able to sleep at night knowing that all data of my app is just in RAM (probably why the idea is so interesting and waterpark videos were indeed great)

2 Likes

I am 100% with you. Wouldn’t matter if it was 50 machines, I just want the data written to file somewhere. Probably irrational, but very hard for me to shake. Still, they have had zero downtime in 5 or 7 years or something, so they clearly know what they are doing.

After watching the first Waterpark talk, it sounds like they've implemented an eventually consistent storage engine. This is genuinely hard to get right: coordinating writes across nodes without a central authority, recovering gracefully from failures, all while doing regular hot code loading. Getting it wrong can lead to some hair-pulling at 2am.

Our ecosystem has some of the strongest eventual-consistency tooling: consider OpenRiak, an EEF working group and probably the most production-hardened EC system on the BEAM.

It's also possible to get EC semantics without a separate database by using riak_core_lite. You can get consistent hashing, vnodes, and conflict resolution directly in an Elixir application. A different use case than OpenRiak, but worth mentioning.

That said, EC comes with tradeoffs. Namely, eventual consistency at the key-value level doesn’t automatically compose into consistency at the application level. I’ve personally misused EC in the past, suffered the consequences (invariant violations), and landed on strict serializability systems to recover.

2 Likes

I don’t think I can recall the exact specifics, but it was something like this:

  • Each patient gets a unique process
  • Each process is replicated 3 or more times across nodes
  • The master location for each process was determinable by some sort of filter (bloom?) or hashing function.
  • Any node could independently work out which node should hold the master record using the same filter/function (so it was fully deterministic and didn't rely on a shared lookup table)

I really wish I had more details, because the system uses a genuinely unique architecture that seems to be working incredibly well.

1 Like