Implementing a distributed users counter

Hey everyone!

I’m struggling to get Phoenix.Presence to scale up sufficiently for our needs. I feel like Presence might be overkill in this situation, but I’m not 100% sure what the alternatives would be.

Our Issue

The Presence.track is timing out around 80-100k connections. Our load tests are joining and leaving LOTS of times every second. That creates lots of traffic for the presence diff. Eventually, the whole cluster just stops tracking until it has time to catch up.

Some Things We Have Tried

  • Reimplemented Phoenix.Tracker and took out the broadcast on join/leave
  • Implemented a dirty count that reaches into Phoenix.Tracker.Shard.dirty_list and then does not group them, but just counts them, to reduce cycles during the count

We’re basically just doing presence so that we can get a valid count of the connected users. What are some alternatives to phoenix presence when you only need to track a count?

5 Likes

If all you want is the connected users, the presense is definitely overkill, because you are sharing each connected user and their metadata across nodes when all you need is a counter.

A simpler solution is to share only the counter. Here is a high-level outline, broken in two steps of one possible approach. I will be glad to clarify any possible points and answers questions.

Step 1: basic setup

Every time a user connects, you send a message to a process with your PID. We will call the receiveing process the “LocalCounter”. The LocalCounter will bump its internal counter when it receives said message and monitor the PID. Once it receives the DOWN message, it decreases the counter.

You will also have a separate process, which is the GlobalCounter. The GlobalCounter will receive updates from other processes in the cluster. You will:

  1. Every X seconds, you will query the local counter and broadcast a message on a “global_counter” topic with your node name and your local counter

  2. Other nodes will receive your message and they should store: the time they received the message, the node name, and the counter

  3. The total counter is the sum of local counter with all other global counters

  4. After you broadcast, you should prune any dead node. You can consider a dead node to be any node that you haven’t received a broadcast from after N*X seconds. Alternatively, you can use :erlang.monitor_nodes() to see when nodes go up and down so you can remove those entries immediately

The choice of X is important. X means how frequently you will broadcast, too small means a lot of traffic but always up to date. For example, if X is 5 seconds, it means that you will stay behind from other nodes at most 5 seconds. X is also the maximum time it takes for a new node to receive all updates when it goes up.

Step 2: optimizing

The implementation above has one issue: the LocalCounter will likely become a bottleneck. We can address this by using the :counters module in Erlang and changing it to be a pool of processes. Here is what we will do:

  1. Instead of a single local counter, we will start N local counters. We will also create a use the :counters API to create a counter array of N entries. Each local counter will have an index inside the counter array and update said index.

  2. Now, when you need to track a given PID, you should do :erlang.phash2(pid, N) to select one of the existing local counters. You can use a Registry to track the local counters.

  3. Change the global counter to, instead of asking the local counter its current count, to traverse all indexes in the :counters reference, adding them all. That’s what you will broadcast now.

In pseudo-code, your CounterSupervisor’s init will look like this:

@count 8

def init(_) do
  counter = :counters.new(@count, [:write_concurrency])

  children = [
    {Registry, name: CounterRegistry, kind: :unique},
    {GlobalCounter, counter}
  ] ++
    Enum.map(1..count, fn index ->
      Supervisor.child_spec({LocalRegistry, counter: counter, index: index}, id: index)
    end)

The LocalCounter should register itself under CounterRegistry with the index.

When dispatching to a local counter, you will roughly do this:

def count_me do
  # phash is zero-based
  index = :erlang.phash2(self(), @count) + 1
  name = {:via, Registry, {CounterRegistry, index}}
  GenServer.cast(name, {:count, self()})
29 Likes

Btw, something else to consider is sharding your presence usage, but as I mentioned above, I think presence is the wrong solution to the problem.

3 Likes

Thanks Jose! Your response was very insightful.

We did encounter one hiccup: being able to have separate counts across different topics. The issue is when you receive a :DOWN event from a monitored pid, you don’t know which count to decrement. To solve this, we’ve decided to keep a basic key/value pair of pid to topic record in an ets table. Once you receive a :DOWN event, we’ll check which topic that pid belongs to, and then remove it from the ets table.

I’ll report back with some more code, or perhaps a gist of code for the benefit of future readers.

4 Likes

Yes, if you need to track topics, then instead of using :counters, you can use an ets table and use concurrent and atomic writes to update the topics. And then broadcast all of ets every X seconds.

It is worth adding that the topics approach has another issue: if you have a topic that no user is leaving and joining, i.e. it is constant, then you will broadcast the topic every X seconds, even though the data is the same. So if you have a long tail of topics, it means your payload may be really large, while the tracker version would be more optimized.

Therefore, there is another alternative here, which is to continue using Phoenix.Tracker, but you will track the local counter instead of each individual process. In a nutshell:

  1. Create N local counter processes. It has two keys in its state, the overall state and the diff

  2. Every time a local counter process receives a pid-topic pair to track or a DOWN message, you put that in the diff. If topic “foobar” receives two joins and one leave, the diff will be %{"foobar" => 1}

  3. Every X seconds, you will merge the diff into the overall state. If a new topic was added, you start tracking it from that local counter process. If a topic is removed, you start untracking. If the topic counter was updated, you update its tracking. The current counter will be the metadata.

This means Phoenix.Tracker becomes your “GlobalCounter”, which is quite efficient and optimized. In pseudo-code your counter supervisor would look like:

@count 8

def init(_) do
  children = [
    {Registry, name: CounterRegistry, kind: :unique},
    {Phoenix.Tracker, ...}
  ] ++
    Enum.map(1..count, fn index ->
      Supervisor.child_spec({LocalRegistry, counter: counter, index: index}, id: index)
    end)

The only last thing to consider is how you are going to hash the topic-pid pairs across local counters. You can:

  1. hash by pid, this means that if you have N local counter processes, they can all end-up tracking the same topic. This may increase the payload as the counters are split across local processes

  2. hash by topic, this decreases payload but it may increase contention if there is a topic with dozens of thousands of users

Finally, to get the number of users in a topic, you need to query the given topic and sum all of the members count metadata. If you are hashing by topic, the number of entries will be the number of nodes. If you are hashing by pid, it will be at most the number of nodes * N. In both cases, it should be fast enough to compute on demand. If it isn’t, you can use handle_diff to store the overall results directly on ETS.

5 Likes

We were able to implement our own incantation of this and use it live with over 120k users live. We peaked at around 15% cpu when large batches of users were joining, and leveled out at around ~7% cpu.

I’ve prepared a gist of the main parts of the code for the benefit of future readers.

I still think there are improvements to be made here. One downside is if the node that has the global gen_server running crashes for some reason, we won’t start up a new one.

If anyone ends up using this code and making improvements, please report back!

15 Likes

Hello,

After you broadcast, you should prune any dead node. You can consider a dead node to be any node that you haven’t received a broadcast from after N*X seconds. Alternatively, you can use :erlang.monitor_nodes() to see when nodes go up and down so you can remove those entries immediately

:net_kernel.monitor_nodes(true, [:nodedown_reason])

I’m actually dealing with those scenario where a node “might go down” or simply not be able to communicates with the others.

From what I understand net_kernel.monitor_nodes is not reliable because this isn’t really enough to know the state of a cluster. From any nodes view it’s just not possible to tell if another node has left for good, been autoscaled away, or been disconnected because it couldn’t keep up with health checks.

The question is at what point, should it be fine to untrack those counters because we’re " almost sure" the node did go down ?

I’m just scared I might untrack a bunch of counters from the tracker while the node was not really down.

Cheers

You are correct. monitor_nodes is not going to tell you if a node left, crashed, or if you just can’t talk to it temporarily. That’s an intrinsic problem of distributed systems and you need timeouts or group membership protocols, for example: do other nodes also see that node A is down? If only you think it is down, then it can still be up, etc.

But depending on the problem, you can just assume the node is gone and, when it is back up, you ask its latest copy of the data again. If you keep a buffer, you just need the diff (for example, you had revision 34 and the node has revision 42, you ask the changes from 35 to 42).

1 Like

Hey,

Thanks for getting back to me.

For the context, I’ve implemented your second proposal using Phoenix Tracker where each local counter is tracked.

The topic of the tracker is the node name concat to the local counter index:

"#{Node.self()}_local_counter_#{index_counter}",

The key room is the topic of the room

Phoenix.Tracker.update(
        Messages.Trackers.ChannelsTracker,
        self(),
        "#{Node.self()}_local_counter_#{index_counter}",
        room,
        %{counter: count}
      )

Tracker diff is like

diff #=> %{
  "test-cluster3@127.0.0.1_local_counter_2" => {[
     {"room_0",
      %{
        counter: 1,
      }}
   ], []}
}

I then store the added up counts for each room in each node in each local_counters in a :ets table

This is just for the context.

Please bear with me, and sorry in advance, if my questions sound stupid :slight_smile:

To me, I see essentially 2 scenarios.

  1. In a cluster, you could have during a brief moment, 2 nodes or more that cannot talk to each other, before the cluster is able to heal. In that scenario no node is really going down.

  2. In a cluster, you could have a node which at some point goes down for good and gets replaced by a new node in the cluster.

I assume there’s 2 different things to accomplish in either of those scenario.

1) Scenario 1:

During the “netsplit”, both nodes are going to do their own thing as usual, and there’s going to be changes in local counters of both nodes, that Phoenix Tracker could not have replicated across the cluster.

If I understand correctly, they might not be able to talk to each other during that time, but they might still be reachable by clients, and so we’re going to have discrepancies in each node.

Therefore when they are back up, meaning, when they are able to talk to each other again, I assume you’ll end up, with correct values in the local counters owned by each node, but not the ones belonging to the other node, right ?

For example let’s say you have 2 nodes "A"and “B”.
The counts stored in tracker for Node A could be different in Node B

Node “A” could read

"Node_A_local_counter_1" => {[
     {"room_0",
      %{
        counter: 10,
      }}

While Node B:

"Node_A_local_counter_1" => {[
     {"room_0",
      %{
        counter: 3,
      }}

Within that split, we got 7 more new users joining Node A in room_0 that could not be replicated to Node B, and then we get inconsistent values.

you ask its latest copy of the data again

How would you get local counters values from the other nodes ?

Does it mean doing like a RPC call to get the values stored in tracker of the other nodes ??

1) Scenario 2:

In this scenario a node is going down for good and is not coming back up, it gets “replaced” or not, by a new node in the cluster.

In this scenario, local counters of the node who died must be untracked.

You need timeouts or group membership protocols, for example: do other nodes also see that node A is down

My initial idea is to record the time, every time we do Tracker.update in local counters.
That way we can see the last time a node was broadcast.

When a node goes down I can compare the dates, see the oldest to find out about the dead node and after a X seconds, if that date stays unchanged, for that node I was suspicious about, I can start untracking the counters that node holds…

Thanks