DNA - Cloudflare Durable Objects style stateful actors for Elixir

DNA is a building block for stateful distributed applications, inspired by Cloudflare’s Durable Objects and Microsoft Orleans. The library streamlines the development of distributed applications while offering a simple, user-friendly API for creating stateful actors.

It uses ScyllaDB both for optional actor state persistence and for clustered actor registration that is resilient against net splits (via Scylla lightweight transactions, LWT). This is required to guarantee that only a single actor works with a given storage bucket at a time.
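As a rough illustration of the LWT pattern (a hedged sketch only, not DNA’s actual schema or code; the keyspace, table, column names, and the use of the Xandra driver are assumptions here):

{:ok, conn} = Xandra.start_link(nodes: ["127.0.0.1:9042"])

# Claim ownership of a storage bucket with a lightweight transaction.
{:ok, claim} =
  Xandra.prepare(conn, """
  INSERT INTO dna.actor_owners (bucket, node_id)
  VALUES (?, ?)
  IF NOT EXISTS
  """)

# The insert succeeds only if no other node already owns the bucket,
# which is what enforces a single writer per storage bucket.
{:ok, page} = Xandra.execute(conn, claim, ["test/0/room-1", "node-a"])

# applied? == false means another node already holds this bucket.
[%{"[applied]" => applied?}] = Enum.to_list(page)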

It’s still in early development and a work in progress, so there’s no Hex package yet, but the code is available on GitHub: cleaton/dna (Durable Named Actors (DNA), a framework for building stateful distributed applications).
There is also a demo Phoenix application: https://dna-demo.fly.dev/

Any suggestions, feedback, or help with development is greatly appreciated!
Thanks!

Example usage:

defmodule MyActor do
  use Dna.Actor
  alias Dna.Storage.KV

  defmodule API do
    alias Dna.Server
    # Define put, put_cast, and get functions for MyActor
    def put(name, key, value) do
      Server.call(MyActor, actor_name(name), {:put, key, value})
    end

    def put_cast(name, key, value) do
      Server.cast(MyActor, actor_name(name), {:put, key, value})
    end

    def get(name, key) do
      Server.call(MyActor, actor_name(name), {:get, key})
    end

    # Generate a unique actor name
    defp actor_name(name) do
      # namespace, actor_id, name
      {"test", 0, name}
    end
  end

  # Define the storage modules used by the actor
  def storage() do
    %{
      kv: KV.new()
    }
  end

  # Initialize in-memory state for the actor
  def init(_actorname, _storage) do
    {:ok, %{}}
  end

  # Handle call/cast events. Internally batched for performance.
  # You can choose to reply to the caller only after storage has been persisted.

  # Reply immediately as no storage is mutated
  def handle_call({:get, key}, _, state, %{kv: kv}) do
    {:reply, KV.read(kv, key), state}
  end

  # Reply after storage is persisted (end of each batch, 1~100msg)
  def handle_call({:put, key, value}, _, state, %{kv: kv}) do
    {:reply_sync, :ok, state, %{kv: KV.write(kv, key, value)}}
  end

  # Storage not immediately persisted, will persist at end of batch (1~100msg)
  def handle_cast({:put, key, value}, state, %{kv: kv}) do
    {:noreply, state, %{kv: KV.write(kv, key, value)}}
  end
end
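A quick call-site sketch for the API above (this assumes the DNA application is already running on the node and that Server.call returns the handler’s reply; the exact value returned by get/2 depends on KV.read/2):

# Replies only after the write has been persisted (reply_sync).
:ok = MyActor.API.put("room-1", "greeting", "hello")

# Fire-and-forget; the write is persisted at the end of the current batch.
MyActor.API.put_cast("room-1", "counter", 1)

# Read-only, so the actor replies immediately.
value = MyActor.API.get("room-1", "greeting")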

Thanks for sharing!

After looking through the example app and some of the library code, I feel like there’s a lot of reinventing the wheel in terms of the public API. I wonder if there would be a way to use existing OTP idioms/abstractions. Some thoughts and examples of unfamiliar API:

  • Batched event handling instead of separate handle_call/cast/info. In the example in the OP, I’m not sure this actually provides any real “optimization”, since KV writes are happening separately in do_event calls anyway. DNA is dispatching the events and could be calling the appropriate functions, only persisting after it has dispatched the entire batch of events. I’m not sure this needs to be exposed to the user at all.
  • Related to the above, but somewhat strange reply handling. In the example app, for instance, the actor has to manually aggregate and dispatch replies. This also feels like something that should be handled by the abstraction.
  • Would it be possible to handle naming through the Registry API and use :via tuples to refer to actors?
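For reference, the standard Registry + :via pattern looks roughly like this (a sketch with placeholder module names, not anything DNA provides today):

# A unique-keys Registry somewhere in the supervision tree:
{Registry, keys: :unique, name: MyApp.ActorRegistry}

# Starting a GenServer under a :via name:
name = {:via, Registry, {MyApp.ActorRegistry, {"test", 0, "room-1"}}}
GenServer.start_link(MyApp.SomeServer, [], name: name)

# Addressing it later by the same tuple:
GenServer.call(name, {:get, "greeting"})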

From the perspective of adoption, “custom abstraction” is a much harder sell than “your GenServer now has superpowers”. If I could write and test a normal GenServer, then replace use GenServer with use Dna.Server and get distributed named servers with persistence and durability, that would be pretty compelling :slight_smile:

Thank you for the feedback! Indeed, the current API needs some work. I’ll try to give some reasoning for its current state:

  1. Each actor’s “storage module” collects all insert/delete statements and flushes them to a single ScyllaDB partition as one batched statement. Storage modules are flushed after each call/cast, which causes a significant single-actor throughput loss when events are not batched (though it is probably still fast enough for many, if not most, applications). Another option would be asynchronous persistence, but that causes a lot of other problems: Durable Objects: Easy, Fast, Correct — Choose three

  2. Since there’s state involved, there are side effects (replying, calling an external service, etc.) that you only want to perform after the storage module has been persisted. That’s what the example does, which makes it more complicated. It’s a great idea to add abstractions for common use cases like this one.

  3. That was indeed something I considered; as I remember, the biggest blocker was that I wanted to automatically “start” an actor if it was not running, which seemed impossible with the current Registry API.
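One common workaround for the “start it if it isn’t running” case is a lookup-or-start helper on top of Registry and DynamicSupervisor; a sketch with placeholder names (not what DNA does today):

defmodule MyApp.ActorLookup do
  # Assumes MyApp.SomeServer registers itself under a :via name in MyApp.ActorRegistry.
  # Find an actor's pid, starting it on demand if it isn't registered yet.
  def whereis_or_start(name) do
    case Registry.lookup(MyApp.ActorRegistry, name) do
      [{pid, _value}] ->
        pid

      [] ->
        case DynamicSupervisor.start_child(MyApp.ActorSupervisor, {MyApp.SomeServer, name}) do
          {:ok, pid} -> pid
          # Another caller may have won the race; reuse its pid.
          {:error, {:already_started, pid}} -> pid
        end
    end
  end
end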

Making it completely compatible with a regular GenServer implementation might restrict features too much, but getting as close as possible is the goal. I’m not there yet :slight_smile:

There are many challenges to solve, like migrating or rebalancing actors across the cluster :slight_smile:

That’s an interesting project. However, after reading the code and examples, I have some questions and notes:

  1. Cluster assumes that nodes have synchronized time (e.g., via NTP)

This claim will be a show-stopper for most distributed systems. As you may know, NTP does not make the time identical on all nodes; time synchronization algorithms can only guarantee that the time difference between nodes is never more than 2 * maximum_latency.

  2. dna/lib/dna/db/db.ex at main · cleaton/dna · GitHub
    This function is very smelly.

  3. You can use the GenServer.on_start() type as the result type of start_link functions like this one: dna/lib/dna/db/db.ex at main · cleaton/dna · GitHub

  4. There is a problem with Partition starting Registry and DynamicSupervisor linked directly into it; I think they would be better placed under a separate supervisor, with the Partition server alongside them, using the one_for_all strategy.

  5. You should never update persistent_term at runtime, since it will trigger a stop-the-world GC in every process on the local node: dna/lib/dna/server/cluster.ex at main · cleaton/dna · GitHub

  6. The whole Cluster module is unnecessary, since the Erlang runtime automatically heartbeats nodes, and there is built-in node monitoring functionality:
    https://www.erlang.org/doc/man/net_kernel.html#monitor_nodes-1

  7. You should never use System.system_time/0 for time synchronization, since it is not monotonic and can change drastically in different environments. For monotonic time, you must always use :erlang.monotonic_time(): dna/lib/dna/server/cluster.ex at main · cleaton/dna · GitHub

  8. The architectural problem here is that ScyllaDB is also part of the cluster, and it can be disconnected from some nodes while remaining connected to others. This kind of net split should be the main concern in a DNA application. As far as I can see, a disconnected or overloaded database will leave part of the cluster unavailable.

  9. I think you should also invest some time into code quality. The mix format, mix credo, and mix dialyzer trinity is the common choice for any application.

But I love the idea of the project. If I were writing something like this, I would:

  1. Separate distributed supervisor/registry from actor state persistence.
  2. Use rabbitmq’s ra library, which implements the Raft consensus protocol. But plugging into it directly can be nontrivial, so I’d use a key-value store implemented on top of ra (I can’t quite recall the exact name right now).
  3. Use net_kernel for connectivity and cluster topology management in distributed supervisor/registry.
  4. Leave state persistence for the user to implement.

Orleans also allows users to provide their own state storage. The company I work for used Orleans at one point, and I wrote our own adapter; it was very easy, because anything that can work as a KV store works. If someone is running only a single node, even SQLite might make a lot of sense.

Not always. And never is a strong word. Probably shouldn’t do it in a library though.

When a (complex) term is deleted (using erase/1) or replaced by another (using put/2), a global garbage collection is initiated. It works like this:

  • All processes in the system will be scheduled to run a scan of their heaps for the term that has been deleted. While such scan is relatively light-weight, if there are many processes, the system can become less responsive until all processes have scanned their heaps.
  • If the deleted term (or any part of it) is still used by a process, that process will do a major (fullsweep) garbage collection and copy the term into the process. However, at most two processes at a time will be scheduled to do that kind of garbage collection.

Deletion of atoms and other terms that fit in one machine word is specially optimized to avoid doing a global GC. It is still not recommended to update persistent terms with such values too frequently because the hash table holding the keys is copied every time a persistent term is updated.
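In practice the guidance boils down to “write once at startup, read many times”; a minimal sketch:

# Write once at application start; avoid updating the term again at runtime,
# since every put/2 copies the key table and updates of complex terms
# trigger the global scan described above.
:persistent_term.put({MyApp, :node_id}, "node-a")

# Reads on hot paths are cheap and do not copy the term to the caller's heap.
node_id = :persistent_term.get({MyApp, :node_id})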


Thank you for reviewing the project, you bring up fair points.

  1. I don’t agree that NTP synchronization is a show-stopper for most distributed systems; there are many large systems that require it, such as CockroachDB (see their Operational FAQs). For DNA, heartbeats and ScyllaDB LWT are used to create an aliveness lease, and other nodes are not allowed to take over a node’s actors while its lease holds, even if they cannot connect to it. Because of this, time synchronization does not need to be super strict if larger timeouts are configured. I am considering whether adding HLCs would help, but I’m not convinced; at minimum there should be a mechanism to detect and error out on large clock skews. Another alternative I’ve considered is to bind the node ID to the storage bucket and use ScyllaDB conditional batches to prevent duplicate node writes to the same bucket, but that might cause significant DB overhead.

  2. This is a macro to automatically detect new table modules and run their migration functions, without having to manually add them to the library init as well. What do you find very smelly about it?

  3. Thanks for sharing, it didn’t occur to me to check whether GenServer provided these type aliases.

  4. I opted not to add one more supervisor here, and instead link directly to Partition, to reduce the number of layers and keep it simpler. What problems do you foresee with this approach if I don’t intend to add other children in the future?

  5. Fair, persistent_term might not be a good fit for libraries. In this case the Cluster module only writes to it on init to store its ID, and on a crash/restart of Cluster, which I would consider a stop-the-world event anyway since it also triggers the whole supervision tree to restart. I’m not aware of a general guideline for how libraries should use persistent_term.

  6. The Cluster module is essentially membership discovery plus binding aliveness to each node’s ability to access the storage. The reason I bind it to storage is that I want nodes that are unable to access the DB to disconnect and remove themselves. It still uses net_kernel for the cluster networking and automatic reconnects. Even the ra Raft library that you mentioned does not rely on net_kernel for aliveness and timeouts; instead it uses rabbitmq/aten (an adaptive accrual node failure detection library for Elixir and Erlang).

  7. Indeed, system_time might skew. I think it’s OK as long as the skew can be controlled and monitored within a threshold; if someone has an idea of how to avoid wall-clock time completely, I’m very interested to hear it. As for monotonic_time, I believe it can only be used to measure durations within a single runtime instance, not across nodes or even across runtimes on the same node.

  8. Yes, for now this is a conscious decision. I consider loss of storage access a critical situation in which the server should restart and retry. If ScyllaDB is overloaded or unavailable, actors that want to use their state will be unavailable as well.

  9. Yeah, it’s on my todo list. For now I’m focusing on experimenting with the API and functionality; once things stabilize I’ll work on improving the linting. Maybe these days asking ChatGPT to rewrite/improve a module is also part of the cleanup workflow… :slight_smile:

Thanks!

For now it’s closer to Durable Objects than Orleans. A generic implementation like Orleans creates a lot of complexity; I first want to have something that is a plug-and-play, opinionated solution. That said, I think it should be fine to load state from PostgreSQL or another store on init, and just use DNA for event routing etc.
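That could look roughly like this inside an actor module (a sketch; MyApp.StateStore.load_state/1 is a hypothetical function backed by PostgreSQL or any other store, and the name tuple shape follows the earlier example):

# Hydrate in-memory state from an external store when the actor starts,
# then use DNA only for routing, batching, and lifecycle.
def init({_namespace, _actor_id, name}, _storage) do
  state = MyApp.StateStore.load_state(name)
  {:ok, state}
end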

Not sure if it’s of any help to you, but have you looked at another durable actors project from Microsoft, called Dapr?


I don’t think I’ve looked at it in a long time. Thanks for sharing, it looks like they have been busy!

So you don’t actually need NTP synchronization then. You’re using a single-source-of-truth model, so you can just use ScyllaDB’s built-in timestamps, which are guaranteed to be consistent for every query running against that single source. There’s no reason to use Elixir’s or Erlang’s timestamps. You could also use a notification model, where you notify every member of the cluster on every update to the actors table, and each node spins up its own takeover timer. Does ScyllaDB support listeners? I think it does.
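For example, CQL can expose the server-assigned write time of a column via WRITETIME (a hedged sketch assuming the Xandra driver and hypothetical keyspace/table/column names):

{:ok, conn} = Xandra.start_link(nodes: ["127.0.0.1:9042"])

# Read the database-assigned write timestamp (microseconds since the epoch)
# instead of comparing Erlang wall-clock values across nodes.
{:ok, page} =
  Xandra.execute(conn, """
  SELECT node_id, WRITETIME(node_id) AS written_at
  FROM dna.actor_owners
  WHERE bucket = 'test/0/room-1'
  """)

rows = Enum.to_list(page)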

Why not add them in the library init? You’re not even saving any lines, but you are introducing implicit behaviour. This code assumes that module names are just a CamelCase version of the file names, which is not obvious. It also assumes that all these table modules live in that subdirectory, which is not obvious either. And what’s the advantage? Listing all the table modules explicitly is even shorter, easier to read, and more explicit.

Timeouts. A supervisor sets up a restart limit within a time window, while in your case that limit may never trigger as intended. For example, in my solution the supervisor will exit when any single child keeps restarting every N seconds, while in your solution the supervisor only exits when the combined restarts of all children exceed the limit within N seconds. That’s a common problem with this kind of solution, and it is not difficult to refactor, to be honest.
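Concretely, that refactor could look roughly like this (a sketch with placeholder module names, not the library’s actual code):

defmodule MyApp.PartitionGroup do
  use Supervisor

  def start_link(arg), do: Supervisor.start_link(__MODULE__, arg, name: __MODULE__)

  @impl true
  def init(_arg) do
    children = [
      {Registry, keys: :unique, name: MyApp.ActorRegistry},
      {DynamicSupervisor, name: MyApp.ActorSupervisor, strategy: :one_for_one},
      # Placeholder for the partition server itself.
      MyApp.Partition
    ]

    # If any child crash-loops, the whole group restarts together, and the
    # max_restarts/max_seconds window is enforced here for each child.
    Supervisor.init(children, strategy: :one_for_all, max_restarts: 3, max_seconds: 5)
  end
end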

Why do you believe so? monotonic_time is adjusted to follow system time, but in a monotonic fashion. And you can’t use it to measure time on a single node, since monotonic_time can end up ticking slower or faster than actual time. You need monotonicity for your algorithms, otherwise you’ll get interesting bugs. Imagine your cluster gets disconnected from the internet for a week and then reconnects: time gets synchronized with an external NTP server, one node goes backwards in time first and writes a timestamp, and another node checks that timestamp, sees that it apparently happened a long time ago, and takes over the actors.
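For reference, per node, Erlang system time is defined as monotonic time plus a VM-managed time offset; a quick sketch of that relationship:

# Per node: Erlang system time = Erlang monotonic time + time offset.
mono = :erlang.monotonic_time(:millisecond)
offset = :erlang.time_offset(:millisecond)
sys = :erlang.system_time(:millisecond)

# sys is approximately mono + offset (up to the tiny gap between the reads).
# Depending on the time warp mode, OS clock adjustments show up either as
# changes to the offset or as a slewed monotonic clock rate.
IO.inspect(sys - (mono + offset))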

First of all, your Cluster module is not aten. As you may have found out while reading aten’s source code, aten uses probabilistic analysis to decide whether a node is disconnected, combining a custom heartbeat with net_kernel monitoring, while you’re using a very primitive heartbeat without net_kernel monitoring. The main mechanism there is still net_kernel monitoring; the heartbeat works alongside it, providing probability markers for RabbitMQ’s SLA.
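For completeness, subscribing to node up/down events via net_kernel takes only a few lines (a minimal GenServer sketch with placeholder names):

defmodule MyApp.NodeWatcher do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(_opts) do
    # Subscribe this process to {:nodeup, node} / {:nodedown, node} messages.
    :ok = :net_kernel.monitor_nodes(true)
    {:ok, %{}}
  end

  @impl true
  def handle_info({:nodeup, node}, state) do
    IO.puts("node up: #{node}")
    {:noreply, state}
  end

  def handle_info({:nodedown, node}, state) do
    IO.puts("node down: #{node}")
    {:noreply, state}
  end
end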

If you’re interested in developing this project without ScyllaDB or any other CQL/SQL database, I can offer some help with development and testing.


I’ve had some time to add a simplified API that still does the same style of batching internally. It’s now much more in line with a regular GenServer; here’s the updated example:

defmodule MyActor do
  use Dna.Actor
  alias Dna.Storage.KV

  # Define the storage modules used by the actor
  def storage() do
    %{
      kv: KV.new()
    }
  end

  # Initialize in-memory state for the actor
  def init(_actorname, _storage) do
    {:ok, %{}}
  end

  # Handle call/cast events. Internally batched for performance.
  # You can choose to reply to the caller only after storage has been persisted.

  # Reply immediately as no storage is mutated
  def handle_call({:get, key}, _, state, %{kv: kv}) do
    {:reply, KV.read(kv, key), state}
  end

  # Reply after storage is persisted (end of each batch, 1~100msg)
  def handle_call({:put, key, value}, _, state, %{kv: kv}) do
    {:reply_sync, :ok, state, %{kv: KV.write(kv, key, value)}}
  end

  # Storage not immediately persisted, will persist at end of batch (1~100msg)
  def handle_cast({:put, key, value}, state, %{kv: kv}) do
    {:noreply, state, %{kv: KV.write(kv, key, value)}}
  end
end