darkmarmot
Using Registry as a counter? (or better alternatives?)
I have many actor processes in my application with event counts associated with each actor.
I’d like to have a live sum of the total actor events across the node (basically map-reduce across the processes).
I was thinking that I could register each process in the Registry with a count – and store a separate sum in the Registry that gets incremented with new events and decremented when actors crash (using trap_exit?)
Does that sound like an appropriate solution in Elixir or is there a more canonical OTP way to handle this?
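For reference, the approach in the question can be sketched with Registry alone. This is a minimal sketch that assumes Elixir 1.9+ (for `Registry.select/2`); the module and function names are hypothetical. One detail simplifies the plan: Registry links to every process that registers and removes its entries when that process exits. So instead of maintaining a separate sum and trapping exits to decrement it, the total can be computed on demand from the entries that are still live:

```elixir
defmodule EventCounter do
  # Hypothetical module: each actor registers itself in a *unique* Registry
  # under its own key, storing its current event count as the value.
  @registry EventCounter.Registry

  def start_registry do
    Registry.start_link(keys: :unique, name: @registry)
  end

  # Must be called by the actor process itself (e.g. from its init/1),
  # because Registry entries are owned by the registering process.
  def track(actor_id) do
    {:ok, _owner} = Registry.register(@registry, actor_id, 0)
    :ok
  end

  # Also called by the owning actor; update_value/3 only works in unique
  # registries and only for keys registered by the calling process.
  def increment(actor_id, by \\ 1) do
    Registry.update_value(@registry, actor_id, &(&1 + by))
  end

  # Sums the counts of all actors that are still registered. Entries of
  # processes that have exited are removed by Registry itself.
  def total do
    @registry
    |> Registry.select([{{:_, :_, :"$1"}, [], [:"$1"]}])
    |> Enum.sum()
  end
end
```

The trade-off is that `total/0` scans every entry on each read, and because Registry removes entries asynchronously after an exit, a just-crashed actor's count may still be included for a brief moment.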
Thanks,
Scott S.
michalmuskala
This is my go to reference for when terminate/2 is called: https://gist.github.com/mrallen1/806fe5506132260574af33e99dadd499
sasajuric
This is a very good question. I personally mostly avoid terminate, because it won’t be invoked if the process crashes, or if it’s forcefully terminated (killed) from the outside. Thus, if some cleanup code must be executed, I prefer having another process to do it.
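A small experiment makes these rules concrete. In a GenServer, terminate/2 runs when the server stops itself (GenServer.stop/3, a `{:stop, ...}` return, or a raising callback) and, if it traps exits, when its parent sends it an exit signal. It does not run when the process is killed with `:kill`, or when an exit signal it doesn't trap takes it down. The module below is a hypothetical demo, not code from the thread:

```elixir
defmodule TerminateDemo do
  # Hypothetical demo server: terminate/2 reports to `reporter`, so the
  # caller can observe whether it ran for a given way of stopping.
  use GenServer

  @impl true
  def init({reporter, trap_exit?}) do
    Process.flag(:trap_exit, trap_exit?)
    {:ok, reporter}
  end

  @impl true
  def terminate(reason, reporter) do
    send(reporter, {:terminate_called, reason})
  end
end
```

Starting it with `GenServer.start(TerminateDemo, {self(), false})` and calling `GenServer.stop(pid)` leaves `{:terminate_called, :normal}` in the caller's mailbox. `Process.exit(pid, :kill)` leaves nothing, even when the server was started with `trap_exit?` set to `true`.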
However, using a cleanup process isn’t synchronous, since the cleanup code will run after the “main” process has terminated. Therefore, there are some special cases where terminate works better. For example, supervisor terminates children from the terminate callback. This ensures that when the supervisor goes down, its complete subtree is already down. I can’t think of a way to ensure such synchronism by using a separate cleanup process.
That said, this approach suffers from potential theoretical issues. If a supervisor process is brutally killed or if it crashes, then this guarantee doesn’t hold. The child processes will still be taken down eventually, but not immediately. If some descendant is trapping exits and ends up in an infinite loop, it might never happen. This in turn could prevent the restart of the crashed supervisor, which could take down the entire system, or it could lead to duplicate processes running, which could cause some strange behaviour of the system.
In practice, though, we can assume that the supervisor process is thoroughly tested and hopefully free from unexpected crashes. In addition, in a properly constructed supervision tree, a supervisor is never brutally killed (because a supervisor’s :shutdown defaults to :infinity), so I’d say that these issues are theoretical, and very unlikely to occur in practice.
So my take would be to use terminate to implement a synchronous cleanup (things are cleaned up before the process terminates). For example, I use it in Parent to terminate children, similarly to supervisor. In such cases, you probably want to keep the logic of the process simple, to reduce the chance of it crashing. You may also consider setting its shutdown option to :infinity to prevent its parent from brutally killing it.
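A minimal sketch of this synchronous-cleanup pattern, assuming a hypothetical `ResourceOwner` worker whose "cleanup" is just a message to a reporter process (real code would, for example, stop the processes it owns). It traps exits so that the supervisor's `:shutdown` exit signal is turned into a terminate/2 call, and it passes `shutdown: :infinity` to `use GenServer` so the generated child spec makes the supervisor wait for terminate/2 to finish instead of killing the worker after the default 5 seconds:

```elixir
defmodule ResourceOwner do
  # Hypothetical worker; shutdown: :infinity ends up in child_spec/1.
  use GenServer, shutdown: :infinity

  def start_link(reporter), do: GenServer.start_link(__MODULE__, reporter)

  @impl true
  def init(reporter) do
    # Trapping exits turns the parent's :shutdown signal into terminate/2.
    Process.flag(:trap_exit, true)
    {:ok, reporter}
  end

  @impl true
  def terminate(reason, reporter) do
    # Synchronous cleanup: this runs before the process exits, and therefore
    # before its supervisor considers the child stopped.
    send(reporter, {:cleaned_up, reason})
    :ok
  end
end
```

Under a supervisor, `Supervisor.start_link([{ResourceOwner, self()}], strategy: :one_for_one)` followed by `Supervisor.stop(sup)` sends `{:cleaned_up, :shutdown}` before `Supervisor.stop/1` returns. The flip side is the one described above: a misbehaving terminate/2 can now block its supervisor's shutdown indefinitely.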
If the synchronism is not required, and a cleanup code must be executed when a process goes down, regardless of how/why it goes down, I’d suggest using a separate process.
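A minimal sketch of such a separate cleanup process, using a plain monitor. `Janitor` and `watch/2` are hypothetical names, not a library API. Because the cleanup is driven by the `:DOWN` message rather than by the target's own code, it runs even if the target is killed with `:kill`:

```elixir
defmodule Janitor do
  # Hypothetical helper: spawns a separate process that monitors `target`
  # and runs `cleanup.(reason)` once `target` has exited, however it exited.
  def watch(target, cleanup) when is_pid(target) and is_function(cleanup, 1) do
    caller = self()

    janitor =
      spawn(fn ->
        ref = Process.monitor(target)
        # Acknowledge once the monitor is in place, so any later exit of
        # `target` is reported with its real reason (an already dead target
        # is reported as :noproc).
        send(caller, {:janitor_ready, self()})

        receive do
          {:DOWN, ^ref, :process, ^target, reason} -> cleanup.(reason)
        end
      end)

    receive do
      {:janitor_ready, ^janitor} -> {:ok, janitor}
    end
  end
end
```

As noted above, the cleanup runs only after the target is already gone, and this sketch leaves the janitor unsupervised, so in a real system it would itself need to live somewhere in the supervision tree.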
michalmuskala
A distributed counter is one of the use cases we’re considering for the Firenest.ReplicatedState abstraction we’re introducing in the firenest project. As an example, with the interface we have planned right now, a distributed counter could look like the following.
Each process can register itself for tracking and increment/decrement the counter. When the process goes down, its data is removed (when a node goes down, the other nodes remove the data for all processes from the dead node).
```elixir
defmodule DistributedCounter do
  alias Firenest.ReplicatedState
  @behaviour ReplicatedState

  def child_spec(opts) do
    Firenest.ReplicatedState.child_spec(
      topology: MyApp.FirenestTopology,
      name: opts[:name],
      handler: __MODULE__
    )
  end

  def track(server, key) do
    ReplicatedState.put(server, key, self(), 0)
    # calls local_put as the callback inside the server
  end

  def increment(server, key, by) when is_integer(by) do
    ReplicatedState.update(server, key, self(), {:increment, by})
    # calls local_update as the callback inside the server
  end

  def untrack(server, key) do
    ReplicatedState.delete(server, key, self())
    # calls local_delete as the callback inside the server
  end

  def get(server, key) do
    # list returns a value for each process tracking the state, both local and remote;
    # we just sum them to get the final value
    Enum.sum(ReplicatedState.list(server, key))
  end

  @impl true
  def init(_opts) do
    {0, _config = %{}}
  end

  @impl true
  def local_put(state, _config) do
    {:ok, state}
  end

  @impl true
  def local_update({:increment, by}, _delta, state, _config) do
    # we don't use precise data tracking, so we just use the state as our delta that will be
    # propagated to remote servers in the handle_remote_delta callback
    {_state = state + by, _delta = state + by}
  end

  @impl true
  def handle_remote_delta(remote_delta, _old_state, _config) do
    # since the remote delta is just the remote state, we don't need to do any
    # state mutation and the remote delta is the new state
    remote_delta
  end
end
```
The same abstraction will be used to re-implement Phoenix.Presence and possibly other things - it seems quite flexible. I’d recommend reading the docs in the linked PR - while the implementation is not ready, the docs should be close to the final thing we want. There’s also a mechanism for precise tracking of state changes (with the observe_remote_deltas callback which is not shown here).