Erleans now with Elixir and Ecto

Erleans is a port of Microsoft Orleans, a framework for building distributed applications to BEAM.

It originally got its start to fill a need in a now defunct project. I’ve finally taken the time to do some updates and redesign. Along with that I thought some life might be able to be breathed into it by adding an Elixir interface and Ecto persistence provider.

Orleans documentation about what Grains (virtual actors) are and how they are persisted is useful right now as I have written up similar detailed documentation for Erleans itself.

I put together a simple example in Elixir. With a grain looking like:

defmodule ErleansElixirExample do
  use Erleans.Grain,
    placement: :prefer_local,
    provider: :postgres,
    state: %{:counter => 0}

  def get(ref) do
    :erleans_grain.call(ref, :get)
  end

  def increment(ref) do
    :erleans_grain.cast(ref, :increment)
  end

  def handle_call(:get, from, state = %{:counter => counter}) do
    {:ok, state, [{:reply, from, counter}]}
  end

  def handle_cast(:increment, state = %{:counter => counter}) do
    new_state = %{state | :counter => counter + 1}
    {:ok, new_state, [:save_state]}
  end
end

There is still much to do and I plan to update this thread as I go about it over the coming weeks. First item up is making it so the Ecto MySQL support actually works, this requires small changes to Erleans itself for how providers work so the same code can be shared between the MySQL and Postgres Ecto providers.

Then new releases of partisan and lasp.

Lots of tests and documentation to write as well. Then can get to working on updating the streaming support.

So it is still early but I hope some might be curious and want to take a look.

29 Likes

On the partisan page you link to there is a comparison of latency of “Erlang Distribution” vs. Partisan. Completely omits the Erlang/OTP version the tests were made. I assume it was done before OTP22 which fixes the head of line blocking which is responsible for the favourable comparison.

Would consider going with normal Erlang Distribution nowadays instead.

3 Likes

Very good point. I’ll see if Chris can run those benchmarks again.

I’ve spoken to him about it before and the changes in OTP help but they aren’t all that is needed, nor all that partisan does to improve messaging.

Of course there is also the fact partisan supports non-full-mesh configurations. But that doesn’t apply to Erleans since we rely on a feature for lasp_pg that requires a full mesh.

2 Likes

Well Erlang distribution doesn’t require a fully connected mesh either if you don’t need to reach any node directly. Besides hacking around it with hidden node which had been there forever there now is also:

-connect_all false

If this flag is present, global will not maintain a fully connected network of distributed Erlang nodes, and then global name registration cannot be used. See global(3).

The only thing thats missing is that we forward messages to nodes not directly connected but for gossip protocol like applications thats not even necessary, would just work out of the box

Right, I know, but partisan handle the forwarding and gossiping about who is in the cluster :slight_smile:

Is it actually forwarding towards a target or rather distributing information via (hybrid) gossip over the whole network?

It forwards towards a target.

This is really neat! Thank you! :slight_smile:

I was looking at the get_grain function and the type spec suggests it never fails. Is the idea that act grain actor is defined by a module, and the grain actor for each module is always alive? With the underlying processes and any failures abstracted away?

1 Like

Correct, get_grain never fails. The grains are “virtual actors”, so they activate only when used and may deactivate later when not in use. If they crash they are either restarted or simply reactivated when another message for the grain is sent.

3 Likes

Thank you for the explaination.

I could see how this might be a nice fit for statically typed languages as you don’t need to pass around pids, something that is tricky with OTP supervision trees. I may have to have a go making a Gleam wrapper when I have time :slight_smile:

3 Likes

That would be awesome! And I’d be happy to help.

And F#'s Orleans wrapper http://orleanscontrib.github.io/Orleankka/ might provide some inspiration.

1 Like

This concept is interesting I started experimenting with it last month a little (was not aware of Erleans at the time).

Any good ideas on how to handle distribution problems? Like binding grains to clusters/georegions/uniquely across a globally distributed cluster.

I didnt push the implementation I brainstormed yet but its basically the new PG module to register actors, Mnesia for persistence, mnesia subscriptions for stopping/starting/hot_adding new grains. Example to stop a grain just delete it from mnesia. A GrainSupervisor with functions like start/stop tick (which is really a pause, the grain still runs and procs messages just does not tick). Every grain has a tick_interval by default which is 1000 ms.

Some other functionality I was thinking to add was keyed messages, so unique messages that any grain can pick up and work on, addressed to that type of grain. Example, persistent_msg = %{type: Order, key: 555444, meta: %{}}, if another order with the same key gets added to the persistent_msg queue, it gets dropped.

Process.get/put is used for temporary state that dies with the grain, and the return off the tick/1 or message/2 persists as the state.

defmodule Grain.OrderFeeder do
    use Actor

    def tick(state) do
        ts_m = :os.system_time(1000)
        if !Process.get(:inited) do
            Process.put(:inited, true)
            FileLogger.use_group_leader("/tmp/grain_orderfeeder")
        end

        cond do
          ts_m > (state[:next_sync]||0) ->
            log(state, "syncing")
            Orders.sync()
            put_in(state, [:next_sync], ts_m+30_000)
          true ->
           state
        end
    end
end

Grain.OrderFeeder.unique()
defmodule Actor do
  defmacro __using__(_) do
    quote do
      def new(params \\ %{}) do
        uuid = params[:uuid] || Db.uuid_base62()
        args = %{mod: __MODULE__, uuid: uuid}
        args = if !params[:name], do: args, else: Map.put(args, :name, params.name)
        args = if !params[:tick_interval],
            do: Map.put(args, :tick_interval, 1000),
            else: Map.put(args, :tick_interval, params.tick_interval)
        args = if !params[:state], do: args, else: Map.merge(args, params.state)
        Db.insert_keyed(Actor, args.uuid, args)
        args.uuid
      end

      def unique(params \\ %{}) do
        pattern = Map.merge(%{mod: __MODULE__}, params)
        case Db.match_object(Actor, :_, pattern) do
            [] -> new(params)
            _ -> nil
        end
      end

      def message(_message, state) do
        state
      end

      def tick(state) do
        state
      end

      def init(state) do
        pid = self()
        true = :erlang.register(:"#{state.uuid}", pid)
        :pg.join(PGActor, pid)
        :pg.join(PGActor, state.uuid, pid)
        if state[:name] do
            :pg.join(PGActor, state.name, pid)
        end
        :proc_lib.init_ack({:ok, pid})
        loop(state)
      end

      def loop(old_state) do
        state = loop_flush_messages(old_state)
        new_state = if state[:enabled] != false do
            tick(state)
        else
            state
        end

        cond do
            new_state == :erase ->
                Db.delete(Actor, state.uuid)

            old_state == new_state ->
                Process.sleep(Map.get(new_state, :tick_interval, 1000))
                __MODULE__.loop(new_state)

            old_state != new_state ->
                Db.merge(Actor, new_state.uuid, new_state)
                Process.sleep(Map.get(new_state, :tick_interval, 1000))
                __MODULE__.loop(new_state)
        end
      end

      def loop_flush_messages(state) do
        receive do
            {ActorMsg, :start} ->
                state = Map.delete(state, :enabled)
                loop_flush_messages(state)
            {ActorMsg, :stop} ->
                state = Map.put(state, :enabled, false)
                loop_flush_messages(state)
            {ActorMsg, :update, new_state} ->
                state = Db.merge_nested(state, new_state)
                loop_flush_messages(state)

            msg ->
                state = message(msg, state)
                loop_flush_messages(state)
        after 0 ->
            state
        end
      end

      def all() do
          Db.match_object(Actor, :_, %{mod: __MODULE__})
      end

      def start_all() do
          all_actors = all()
          Enum.each(all_actors, & ActorSupervisor.start(&1.uuid))
      end

      def stop_all() do
          all_actors = all()
          Enum.each(all_actors, & ActorSupervisor.stop(&1.uuid))
      end

      def delete_all() do
          all_actors = all()
          Enum.each(all_actors, & Db.delete(Actor, &1.uuid))
      end

      def log(state, line) do
          time = String.slice("#{NaiveDateTime.utc_now()}",0..-4)
          mod = "#{__MODULE__}" |> String.trim("Elixir.")
          IO.inspect "#{time} #{mod} | #{line}"
      end

      #defoverridable new: 1
      defoverridable message: 2
      defoverridable tick: 1
      defoverridable log: 2
    end
  end
end

Spawning Erlang procs gets very expensive especially if they need to allocate more memory than the default min_heap_size. But some kind of next_tick timeout will work better than always ticking every grain. This was just a quick draft.