A distributed counter is one of the use cases, we’re thinking about when introducing the Firenest.ReplicatedState
abstraction in the firnest project. As an example, with the interface we have planned right now, a distributed counter could look like the following.
Each process can register itself for tracking and increment/decrement the counter. When the process goes down, its data is removed (when a node goes down other nodes remove data for all processes from the dead node).
defmodule DistributedCounter do
alias Firenest.ReplicatedState
@behaviour ReplicatedState
def child_spec(opts) do
Firenest.ReplicatedState.child_spec(topology: MyApp.FirenestTopology, name: opts[:name], handler: __MODULE__)
end
def track(server, key) do
ReplicatedState.put(server, key, self(), 0)
# calls local_put as the callback inside the server
end
def increment(server, key, by) when is_integer(by) do
ReplicatedState.update(server, key, self(), {:increment, by})
# calls local_update as the callback inside the server
end
def untrack(server, key) do
ReplicatedState.delete(server, key, self())
# calls local_delete as the callback inside the server
end
def get(server, key) do
# list returns a value for each process tracking the state - both local and remote,
# we just sum them to get the final value
Enum.sum(ReplicatedState.list(server, key))
end
@impl true
def init(_opts) do
{0, _config = %{}}
end
@impl true
def local_put(state, _config) do
{:ok, state}
end
@impl true
def local_update({:increment, by}, _delta, state, _config) do
# we don't use precise data tracking, so we just use the state as out delta that will be
# propagated to remote servers in the handle_remote_delta callback
{_state = state + by, _delta = state + by}
end
@impl true
def handle_remote_delta(remote_delta, _old_state, _config) do
# since the remote delta is just the remote state, we don't need to do any
# state mutation and the remote delta is the new state
remote_delta
end
end
The same abstraction will be used to re-implement Phoenix.Presence
and possibly other things - it seems quite flexible. I’d recommend reading the docs in the linked PR - while the implementation is not ready, the docs should be close to the final thing we want. There’s also a mechanism for precise tracking of state changes (with the observe_remote_deltas
callback which is not shown here).