DGen - A distributed GenServer

I love GenServer. There are only two things stopping me from writing an entire app with them:

  1. Durability: The state is lost when the process goes down.
  2. High availability: The functionality is unavailable when the process goes down.

What if we could guarantee the GenServer never went down? Could we build a stateful application without a database?


Many Erlang and Elixir developers have this fantasy at some point in their journey. But how close can we actually get to the dream? I’d like to find out with DGen. This is v0.1.0 stuff, early days.

What DGen does

DGen provides a “distributed GenServer” (DGenServer). It’s meant to work just like a GenServer, but the message queue and the module state are durably stored in FoundationDB, with other backends possible.

Quick example

Our simplest example looks almost exactly like a GenServer.

defmodule Counter do
  use DGenServer

  def start(tenant), do: DGenServer.start(__MODULE__, [], tenant: tenant)

  def increment(pid), do: DGenServer.cast(pid, :increment)
  def value(pid), do: DGenServer.call(pid, :value)

  @impl true
  def init([]), do: {:ok, 0}

  @impl true
  def handle_call(:value, _from, state), do: {:reply, state, state}

  @impl true
  def handle_cast(:increment, state), do: {:noreply, state + 1}
end

However, the state lives on beyond the lifetime of the original Elixir process.

{:ok, pid} = Counter.start(tenant)
Counter.increment(pid)
Counter.increment(pid)
2 = Counter.value(pid)

# Restart the process
Process.exit(pid, :kill)
{:ok, pid2} = Counter.start(tenant)
2 = Counter.value(pid2)  # State persisted!

The tenant argument is the only new piece here. It tells DGenServer where to persist the queue and state in the datastore.

Beyond the basics

The simple example demonstrates durable state. But the other benefits inherited from a serializable distributed datastore are all here:

  • Start one DGenServer per node, with state mutations processed exactly once, without RPC coordination or process registration.
  • Separate where messages are pushed from where they are processed. Producers can run anywhere in the cluster, while consumers — the processes that mutate state — can be pinned to specific nodes or hardware.
  • Perform side effects such as sending emails or performing network requests, with similar transactional guarantees.

Embracing side effects

The simplest side effect is one that happens after a state mutation. For example, a log message is a side effect! Your callback can optionally return a function to be executed after the state change is committed.

def handle_cast(:increment, state) do
  action = &Logger.info("Counter is now #{&1}") # runs after commit
  {:noreply, state + 1, [action]}
end

On the other hand, when a side effect needs to update the state, we must stop the queue from processing messages while the side effect executes outside of the transaction.

def handle_cast(:send_email, state) do
  # executes inside the transaction
  {:lock, state}
end

def handle_locked(:cast, :send_email, state) do
  # executes outside of a transaction
  Req.post(...)
  {:noreply, %{state | sent: state.sent + 1}}
end

Under the hood - performance characteristics

Message Queue: The critical piece of DGenServer is the message queue. A caller must be able to push new messages onto the queue with serializability guarantees and high concurrency. This is achieved with versionstamped keys, which tie the key order exactly to the underlying commit order.

Writes: The module state could be stored as a single term_to_binary blob. However, doing so would amplify the number of writes necessary for incremental changes. Instead, DGenServer adopts the design decisions of LiveView’s assigns and component lists. A module state consisting of a map with atom keys, or a list of elements with string IDs, is optimized for incremental diffs on write. This means that standard Elixir structs are the preferred terms for the DGenServer module state.
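To make the write-amplification point concrete, here is a minimal sketch of diffing an atom-keyed map into per-key writes. The `StateDiff` module is hypothetical, an illustration of the idea rather than DGen’s actual implementation:

```elixir
# Illustrative sketch only; `StateDiff` is a hypothetical module,
# not DGen's implementation.
defmodule StateDiff do
  # Diff two atom-keyed maps into the per-key writes and deletes a
  # key-value backend could persist incrementally.
  def diff(old, new) when is_map(old) and is_map(new) do
    puts =
      for {k, v} <- new, Map.get(old, k, :__absent__) != v, into: %{} do
        {k, v}
      end

    deletes = for {k, _v} <- old, not Map.has_key?(new, k), do: k

    {puts, deletes}
  end
end

# Incrementing one field of a larger state touches a single key:
old = %{count: 2, name: "c1", history: []}
new = %{old | count: 3}
StateDiff.diff(old, new)
# => {%{count: 3}, []}
```

A list of string-ID’d elements can be diffed the same way, keyed by ID, which is why those two shapes get the incremental-write optimization.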

Reads: Finally, DGenServer caches the module state in memory to improve performance, with perfect cache invalidation. A single hot consumer never has to read the full state unless it restarts.

These components together allow for adequate performance. Still, you shouldn’t replace all your GenServers tomorrow. DGenServer should be reserved for stateful mutations that require durability and high availability guarantees, such that the performance tradeoff is acceptable.

Let it crash?

A DGenServer consumer can crash, just like any other Elixir process. But a key difference here is that the message queue is durable. If the crash is due to a poison message, a supervisor restart of a DGenServer consumer will simply try to process the same message again. DGen has yet to learn what this means in a production setting. Crash semantics themselves are well-defined, but recovery is not automatic the way it is with a GenServer, whose restart starts from a clean state. An operator may have to manually delete a poison message from the queue, an operation that has no equivalent in a standard GenServer.

Other backends?

DGen requires a strictly serializable key-value datastore with transactions, like FoundationDB, to provide the consistency guarantees to match the semantics of a GenServer. The first backend implementation available in DGen is FoundationDB, via erlfdb. But, we hope that other datastores providing a similar featureset can be implemented as alternative backends (such as Hobbes, Bedrock, etc.). I’m very open to flexing the current backend behaviour (:dgen_backend) to support other projects.

Community input

So I’m continuing my obsession with exploring different kinds of state engines on the BEAM. I know there are like-minded folks around, so I’d love to hear thoughts and feedback about the approach. This project isn’t meant to be integrated into your production app today, but hoping it can evolve into something useful.


Does this have the same bottleneck issues as a GenServer does?

Yes, there is a single message queue and messages are processed serially. There is no throughput advantage.
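One common workaround, offered here as my own illustration rather than anything DGen provides: since each DGenServer queue is independent, unrelated keys can be sharded across several instances. The `Shards` module and its tenant-naming scheme below are entirely hypothetical:

```elixir
# Hypothetical sharding helper; DGen itself has no such API.
# Unrelated keys are routed to one of N independent DGenServer
# queues, so serial processing only applies within a shard.
defmodule Shards do
  @shards 8

  # :erlang.phash2/2 gives a stable hash in 0..@shards-1.
  def shard_for(key), do: :erlang.phash2(key, @shards)

  # e.g. derive a per-shard tenant name to pass to DGenServer.start
  def tenant_name(key), do: "counter-shard-#{shard_for(key)}"
end
```

Messages for the same key always land on the same queue, so per-key ordering is preserved while different keys proceed in parallel.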

Speaking of “Could we build a stateful application without a database?”, there is an interesting presentation on this topic, or rather two versions of it:

Waterpark by Bryan Hunter

Waterpark by Bryan Hunter (version 2)


That is so great that I’ll have a hard time not pulling away from my work to try it.

Before I get too child-like excited: are other storage backends offering fewer guarantees that would make them bad candidates for DGen, or is it simply a matter of bandwidth on your part? I’d love to explore DGen backed by Postgres (and maybe SQLite3 in the future, if it even makes the cut).


Interesting experimentation! Did you find any prior art?

I wonder if the team at Ericsson considered that when designing OTP…


Bandwidth, mainly. :smiley: I think most modern storage engines are written with serializability in mind. Shoot, you could probably do it with S3’s Strong Consistency modes. However, FDB was specifically designed for clients to implement sophisticated and language-specific data structures, so it has all the abstractions to make it convenient and fun.

I think an SQLite backend would be totally doable and successful. Obviously you lose the distributed guarantees, but you still get the durability.

For Postgres, I’ve read that it takes careful work to write a concurrency-safe queue.

FDB is an ideal datastore to go zero-to-one on an experiment like this. Going from one-to-two will take a serious rethink of the backend interface. Bedrock and Hobbes are both inspired by FDB, so they are prime candidates.

Even still, how to generalize versionstamps and watches remains an open question.

I’m hoping to keep :dgen_backend’s key-value abstractions mostly in place, and to avoid making it too SQL-y. The tradeoff of generalization is that you lose the fidelity of the best-fitting implementation.

Some prior art:

  • ra: Raft-consensus state machine. ra is great, and provides the same basic ideas. You are responsible for forming the cluster with your BEAM nodes.
  • oban: Job processing with Postgres, MySQL, SQLite, and others. It’s a message queue + your code. Universally loved; I hear it’s a great project.
  • Process registries such as global, Registry, and Horde.

What about Mnesia? I feel like that was kind of designed for persistent configuration state, like what we keep in a GenServer.


Yes, absolutely, I am with you.

I, like most commercial devs, simply have operational concerns, in this case worries about a split-brain problem. I am desperately looking for something like the Ash team’s Reactor or Flowstone that gives me Oban-like persistent workflow orchestration with some extra bells and whistles, but that’s obviously the Y in the XY problem; persistent GenServers would do this just the same, if not better (I bet on the latter). Hence my interest in your library.

Or maybe there’s a way to embed FDB a la SQLite3? Can you give me some reading to do on what it would take to use FDB in an Erlang/Elixir project with operational concerns minimized?

And to circle back to the split-brain problem: we have a SaaS in our backend and I more or less hate it at this point: it’s slow (45-60ms per request in prod, and we sometimes need multiple requests for a full business operation), it has async/eventual-consistency traps, and the little value it brings is completely overshadowed by its drawbacks.

We need a good orchestrator before removing it though.

I’ll try DGen in the next few weeks and will let you know how it is holding up for us.

This is how I run FDB: ex_fdbmonitor | ExampleApp

It’s not an embedded database. You’re still self-hosting a service with this method, and going to production does require an operational investment in the technology, so I understand the hesitation.


Why is it a “distributed GenServer”? I don’t see anything “distributed” here; it’s more like “durable” or even “persistent”.

General idea of durable state machines is nice and very useful, but implementing it with GenServer semantics is a dead end. I think that it is a bad pattern and a very misleading approach.

If I were to make this library a good one, I’d think about it like this:

DGenServer does not work as a GenServer in any sense.

You can’t safely restart this server to reset the state, which is a big deal for how OTP approaches failure safety. Then, performance is completely different. Then, timeout handling is completely different. Then, the interface is completely different (postponing side effects, etc.). Then, the use case is completely different. In the end, any comparison with GenServer is wrong and brings more harm than good for users who may think they can just replace their use GenServer with use DGenServer and go on.

So, I’d change the name to something like DurableStateMachine.

Messages

Even if you rename it and change the interface, there is still a problem: input comes from messages! That is very bad. Most messages are ephemeral and contain transient data like pids and references. Monitors and links produce transient data too. And you save them.

So, I’d separate the state machine from the BEAM’s mailboxes and remove the process-based handle_info, handle_call, etc. semantics completely.

I’d have just handle_input with manual state handling, without any processes involved.
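To make the point about transient terms concrete, here is a small illustrative helper. It is my own sketch, part of neither library, that screens a term for runtime-only values before it would be persisted:

```elixir
# Illustrative helper (not part of DGen or any library): reject terms
# that lose their meaning once persisted, such as pids, references,
# ports, and functions.
defmodule TermCheck do
  def persistable?(term)
      when is_pid(term) or is_reference(term) or is_port(term) or
             is_function(term),
      do: false

  def persistable?(term) when is_list(term),
    do: Enum.all?(term, &persistable?/1)

  def persistable?(term) when is_tuple(term),
    do: term |> Tuple.to_list() |> Enum.all?(&persistable?/1)

  def persistable?(term) when is_map(term) do
    term
    |> Map.to_list()
    |> Enum.all?(fn {k, v} -> persistable?(k) and persistable?(v) end)
  end

  def persistable?(_other), do: true
end

TermCheck.persistable?({:reply, self()}) # => false
TermCheck.persistable?(%{count: 1})      # => true
```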

Diff

You do diffs to avoid saving the whole state, but your diff implementation is best-effort. If you gave the user the ability to handle the update themselves, it would bring a huge performance benefit in some cases.

Batching

And it would be nice to have the ability to batch changes, because FDB works much more efficiently with one transaction changing 100 keys than with 100 subsequent transactions changing one key each.


So, it would look something like this

defmodule MyStateMachine do
  use DurableStateMachine

  @impl true
  def init(id) do
    {:ok, %{id: id, status: :off, calls: 0}}
  end

  def switch(state) do
    with {:ok, state} <- handle_input(:switch, state) do
      Logger.info("Switched #{state.id} to #{state.status}")
      Logger.debug("#{state.id} received #{state.calls} so far")
      {:ok, state}
    end
  end

  @impl true
  def handle_input(:switch, state) do
    next_status =
      case state.status do
        :off -> :on
        :on -> :off
      end

    next_calls = state.calls + 1

    # Only status is saved, because `calls` is for debugging
    DurableStateMachine.set({state.id, :status}, next_status)

    {:ok, %{state | status: next_status, calls: next_calls}}
  end
end

# And used like this

id = # Some ID here
{:ok, state} = MyStateMachine.get_or_create_new(id)
{:ok, state} = MyStateMachine.switch(state)
IO.inspect state.status

But this functionality has no batching. If you were to add batching, you’d have to make the transaction explicit, and that would obviate the need for the library altogether.

You might want to take a look at my approach to the problem: GitHub - am-kantox/peeper: Almost drop-in replacement for `GenServer` to preserve state between crashes

It uses a heir subprocess and preserves not only the state of the GenServer, but also its process dictionary and its ETS tables.
