This concept is interesting; I started experimenting with it a little last month (I was not aware of Erleans at the time).
Any good ideas on how to handle distribution problems, such as binding grains to clusters or georegions, or keeping them unique across a globally distributed cluster?
I haven't pushed the implementation I brainstormed yet, but it basically uses the new pg module to register actors, Mnesia for persistence, and Mnesia subscriptions for stopping, starting, and hot-adding grains. For example, to stop a grain you just delete it from Mnesia. There is a GrainSupervisor with functions like start/stop tick (which is really a pause: the grain keeps running and processing messages, it just does not tick). Every grain has a tick_interval, which defaults to 1000 ms.
Some other functionality I was thinking of adding is keyed messages: unique messages, addressed to a type of grain, that any grain of that type can pick up and work on. For example, given persistent_msg = %{type: Order, key: 555444, meta: %{}}, if another order with the same key is added to the persistent_msg queue, it gets dropped.
Process.get/put is used for temporary state that dies with the grain, while the return value of tick/1 or message/2 is persisted as the state.
defmodule Grain.OrderFeeder do
  @moduledoc """
  Grain that calls `Orders.sync/0` at most once every 30 seconds.

  The next allowed sync time is kept in the grain state under `:next_sync`
  (OS time in milliseconds), so it is part of whatever `tick/1` returns.
  The "logger already set up" flag lives in the process dictionary instead,
  so it is lost whenever the grain process dies.
  """
  use Actor

  # Minimum spacing between two syncs, in milliseconds.
  @sync_every_ms 30_000

  # Invoked with the current grain state; returns the state to carry forward.
  # When a sync is due it logs, syncs and pushes `:next_sync` 30 s into the
  # future; otherwise the state is returned untouched.
  def tick(state) do
    now_ms = :os.system_time(:millisecond)
    ensure_log_redirected()

    if now_ms > (state[:next_sync] || 0) do
      log(state, "syncing")
      Orders.sync()
      put_in(state, [:next_sync], now_ms + @sync_every_ms)
    else
      state
    end
  end

  # One-time, per-process setup: hands this process's output to FileLogger.
  # Guarded by the `:inited` process-dictionary flag so it runs only on the
  # first tick of each grain process.
  defp ensure_log_redirected do
    if Process.get(:inited) do
      :ok
    else
      Process.put(:inited, true)
      FileLogger.use_group_leader("/tmp/grain_orderfeeder")
    end
  end
end
# Insert an OrderFeeder grain record only when no record with
# `mod: Grain.OrderFeeder` is stored yet; returns the new uuid, or nil if one
# already exists.
Grain.OrderFeeder.unique()
defmodule Actor do
  @moduledoc """
  Minimal "grain" runtime, injected into a module with `use Actor`.

  The using module gets:

    * record management backed by `Db` (`new/1`, `unique/1`, `all/0`,
      `delete_all/0`),
    * bulk start/stop through `ActorSupervisor` (`start_all/0`, `stop_all/0`),
    * a process body (`init/1` -> `loop/1`) that drains the mailbox, calls
      `tick/1`, persists the state when it changed, and sleeps for
      `:tick_interval` milliseconds (default 1000),
    * overridable callbacks `tick/1`, `message/2` and `log/2`.

  Returning `:erase` from `tick/1` deletes the grain record and ends the loop.
  """

  defmacro __using__(_) do
    quote do
      # Inserts a new grain record and returns its uuid.
      #
      # Recognised keys in `params` (a map): `:uuid` (generated when absent),
      # `:name`, `:tick_interval` (default 1000) and `:state`, a map that is
      # merged into the TOP LEVEL of the record rather than nested.
      def new(params \\ %{}) do
        uuid = params[:uuid] || Db.uuid_base62()

        args = %{mod: __MODULE__, uuid: uuid, tick_interval: params[:tick_interval] || 1000}
        args = if params[:name], do: Map.put(args, :name, params.name), else: args
        args = if params[:state], do: Map.merge(args, params.state), else: args

        Db.insert_keyed(Actor, args.uuid, args)
        args.uuid
      end

      # Like `new/1`, but only when no stored record matches `params` for this
      # module. Returns the new uuid, or nil when a matching grain exists.
      #
      # `new/1` flattens `params.state` into the record, so the lookup pattern
      # must be flattened the same way; matching on a literal `:state` key
      # could never succeed and would insert a duplicate on every call.
      def unique(params \\ %{}) do
        flattened = Map.merge(Map.delete(params, :state), params[:state] || %{})
        pattern = Map.merge(%{mod: __MODULE__}, flattened)

        case Db.match_object(Actor, :_, pattern) do
          [] -> new(params)
          _ -> nil
        end
      end

      # Default handler for messages that are not control messages: ignore the
      # message and keep the state. Overridable.
      def message(_message, state) do
        state
      end

      # Default tick: do nothing. Overridable. Return the next state, or
      # `:erase` to delete the grain.
      def tick(state) do
        state
      end

      # Process entry point: registers the process, joins its pg groups,
      # acknowledges the start to the `:proc_lib` parent and enters the loop.
      # Does not return until the loop ends.
      def init(state) do
        pid = self()

        # The `true =` match makes a second process for the same uuid crash
        # here instead of running twice on this node.
        # NOTE(review): this creates one atom per uuid and atoms are never
        # garbage collected; kept because external code may look the process
        # up by this name.
        true = :erlang.register(:"#{state.uuid}", pid)

        # NOTE(review): `:pg.join/2` uses the DEFAULT pg scope with group
        # `PGActor`, while the `:pg.join/3` calls below use scope `PGActor`.
        # Confirm both scopes are started and that this split is intended.
        :pg.join(PGActor, pid)
        :pg.join(PGActor, state.uuid, pid)

        if state[:name] do
          :pg.join(PGActor, state.name, pid)
        end

        :proc_lib.init_ack({:ok, pid})
        loop(state)
      end

      # One iteration: drain the mailbox, tick unless paused
      # (`enabled: false`), persist when the state differs from the one this
      # iteration started with, sleep, then recurse.
      def loop(old_state) do
        state = loop_flush_messages(old_state)
        new_state = if state[:enabled] != false, do: tick(state), else: state

        if new_state == :erase do
          # The grain asked to be removed: drop its record and let the
          # process finish.
          Db.delete(Actor, state.uuid)
        else
          if new_state != old_state do
            Db.merge(Actor, new_state.uuid, new_state)
          end

          Process.sleep(Map.get(new_state, :tick_interval, 1000))
          # Fully-qualified call so a newly loaded version of the module is
          # picked up on the next iteration.
          __MODULE__.loop(new_state)
        end
      end

      # Handles every message currently in the mailbox without blocking and
      # returns the resulting state. Control messages toggle or update the
      # state; anything else goes to `message/2`.
      def loop_flush_messages(state) do
        receive do
          {ActorMsg, :start} ->
            state |> Map.delete(:enabled) |> loop_flush_messages()

          {ActorMsg, :stop} ->
            state |> Map.put(:enabled, false) |> loop_flush_messages()

          {ActorMsg, :update, new_state} ->
            state |> Db.merge_nested(new_state) |> loop_flush_messages()

          msg ->
            loop_flush_messages(message(msg, state))
        after
          0 -> state
        end
      end

      # All stored grain records whose `:mod` is this module.
      def all() do
        Db.match_object(Actor, :_, %{mod: __MODULE__})
      end

      # Asks ActorSupervisor to start every stored grain of this module.
      def start_all() do
        Enum.each(all(), fn actor -> ActorSupervisor.start(actor.uuid) end)
      end

      # Asks ActorSupervisor to stop every stored grain of this module.
      def stop_all() do
        Enum.each(all(), fn actor -> ActorSupervisor.stop(actor.uuid) end)
      end

      # Deletes the stored record of every grain of this module.
      def delete_all() do
        Enum.each(all(), fn actor -> Db.delete(Actor, actor.uuid) end)
      end

      # Prints `line` prefixed with a millisecond-precision UTC timestamp and
      # the module name. The state argument is unused by this default but is
      # part of the overridable signature.
      def log(_state, line) do
        time =
          NaiveDateTime.utc_now()
          |> NaiveDateTime.truncate(:millisecond)
          |> NaiveDateTime.to_string()

        IO.inspect("#{time} #{inspect(__MODULE__)} | #{line}")
      end

      # `new/1` is deliberately not overridable for now.
      # defoverridable new: 1
      defoverridable message: 2
      defoverridable tick: 1
      defoverridable log: 2
    end
  end
end
Spawning Erlang processes gets very expensive, especially if they need to allocate more memory than the default min_heap_size. But some kind of next_tick timeout would work better than always ticking every grain. This was just a quick draft.