Implementing a distributed users counter

Hey everyone!

I’m struggling to get Phoenix.Presence to scale up sufficiently for our needs. I feel like Presence might be overkill in this situation, but I’m not 100% sure what the alternatives would be.

Our Issue

Presence.track is timing out at around 80-100k connections. Our load tests join and leave LOTS of times every second, which creates a lot of traffic for the presence diff. Eventually, the whole cluster just stops tracking until it has time to catch up.

Some Things We Have Tried

  • Reimplemented Phoenix.Tracker and took out the broadcast on join/leave
  • Implemented a dirty count that reaches into Phoenix.Tracker.Shard.dirty_list and, instead of grouping the entries, just counts them, to reduce cycles during the count

We’re basically just doing presence so that we can get a valid count of the connected users. What are some alternatives to phoenix presence when you only need to track a count?

4 Likes

If all you want is a count of connected users, then Presence is definitely overkill, because you are sharing each connected user and their metadata across nodes when all you need is a counter.

A simpler solution is to share only the counter. Here is a high-level outline of one possible approach, broken into two steps. I will be glad to clarify any points and answer questions.

Step 1: basic setup

Every time a user connects, you send a message with the user's PID to a process we will call the "LocalCounter". The LocalCounter bumps its internal counter when it receives said message and monitors the PID. Once it receives the :DOWN message, it decreases the counter.
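
For instance, the LocalCounter could be a GenServer along these lines (a minimal sketch; names are illustrative):

defmodule LocalCounter do
  use GenServer

  # Start with a name, e.g. LocalCounter.start_link(name: LocalCounter)
  def start_link(opts), do: GenServer.start_link(__MODULE__, :ok, opts)

  def track(server, pid), do: GenServer.cast(server, {:track, pid})
  def count(server), do: GenServer.call(server, :count)

  @impl true
  def init(:ok), do: {:ok, 0}

  @impl true
  def handle_cast({:track, pid}, count) do
    # Monitor the pid so we get a :DOWN message when it exits
    Process.monitor(pid)
    {:noreply, count + 1}
  end

  @impl true
  def handle_call(:count, _from, count), do: {:reply, count, count}

  @impl true
  def handle_info({:DOWN, _ref, :process, _pid, _reason}, count) do
    {:noreply, count - 1}
  end
end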

You will also have a separate process, the GlobalCounter, which receives updates from the other nodes in the cluster (see the sketch after this list). You will:

  1. Every X seconds, you will query the local counter and broadcast a message on a “global_counter” topic with your node name and your local counter

  2. Other nodes will receive your message and they should store: the time they received the message, the node name, and the counter

  3. The total count is the sum of your local counter and the counters received from all other nodes

  4. After you broadcast, you should prune dead nodes. You can consider a node dead if you haven't received a broadcast from it within N*X seconds. Alternatively, you can use :net_kernel.monitor_nodes/1 to see when nodes go up and down so you can remove those entries immediately
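
A minimal sketch of the GlobalCounter, assuming Phoenix.PubSub is running as MyApp.PubSub and the LocalCounter above is registered under its module name (all names and intervals are illustrative):

defmodule GlobalCounter do
  use GenServer

  @interval :timer.seconds(5)      # X
  @prune_after :timer.seconds(15)  # N*X

  def start_link(opts), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Local count plus the last known count of every other node
  def total(server), do: GenServer.call(server, :total)

  @impl true
  def init(:ok) do
    Phoenix.PubSub.subscribe(MyApp.PubSub, "global_counter")
    Process.send_after(self(), :broadcast, @interval)
    {:ok, %{}}
  end

  @impl true
  def handle_call(:total, _from, others) do
    local = LocalCounter.count(LocalCounter)
    total = Enum.reduce(others, local, fn {_node, {count, _at}}, acc -> acc + count end)
    {:reply, total, others}
  end

  @impl true
  def handle_info(:broadcast, others) do
    local = LocalCounter.count(LocalCounter)
    Phoenix.PubSub.broadcast(MyApp.PubSub, "global_counter", {:count, node(), local})
    Process.send_after(self(), :broadcast, @interval)
    # Prune nodes we haven't heard from within N*X seconds
    now = System.monotonic_time(:millisecond)
    {:noreply, Map.reject(others, fn {_node, {_count, at}} -> now - at > @prune_after end)}
  end

  def handle_info({:count, from, count}, others) when from != node() do
    {:noreply, Map.put(others, from, {count, System.monotonic_time(:millisecond)})}
  end

  # Ignore our own broadcast
  def handle_info({:count, _node, _count}, others), do: {:noreply, others}
end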

The choice of X is important, as it determines how frequently you broadcast: too small means a lot of traffic, but counts that are always up to date. For example, if X is 5 seconds, your view of other nodes will be at most 5 seconds stale. X is also the maximum time it takes for a new node to receive all updates when it joins the cluster.

Step 2: optimizing

The implementation above has one issue: the LocalCounter will likely become a bottleneck. We can address this by using Erlang's :counters module and turning the single LocalCounter into a pool of processes. Here is what we will do:

  1. Instead of a single local counter, we will start N local counters. We will also use the :counters API to create a counter array of N entries. Each local counter will have an index inside the counter array and update said index.

  2. Now, when you need to track a given PID, you should do :erlang.phash2(pid, N) to select one of the existing local counters. You can use a Registry to track the local counters.

  3. Change the global counter so that, instead of asking a local counter for its current count, it traverses all indexes in the :counters reference and adds them up. That's what you will broadcast now.
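
Summing the array could look like this (a sketch; `counter` is the :counters reference and @count the pool size from the supervisor shown next):

total =
  Enum.reduce(1..@count, 0, fn index, acc ->
    # :counters indexes are one-based
    acc + :counters.get(counter, index)
  end)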

In pseudo-code, your CounterSupervisor’s init will look like this:

@count 8

def init(_) do
  counter = :counters.new(@count, [:write_concurrency])

  children = [
    {Registry, name: CounterRegistry, keys: :unique},
    {GlobalCounter, counter}
  ] ++
    Enum.map(1..@count, fn index ->
      Supervisor.child_spec({LocalCounter, counter: counter, index: index}, id: index)
    end)

  Supervisor.init(children, strategy: :one_for_one)
end

The LocalCounter should register itself under CounterRegistry with the index.
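
Each pooled LocalCounter could then look something like this (a sketch under the same assumed names):

defmodule LocalCounter do
  use GenServer

  def start_link(opts) do
    index = Keyword.fetch!(opts, :index)
    name = {:via, Registry, {CounterRegistry, index}}
    GenServer.start_link(__MODULE__, opts, name: name)
  end

  @impl true
  def init(opts) do
    {:ok, %{counter: Keyword.fetch!(opts, :counter), index: Keyword.fetch!(opts, :index)}}
  end

  @impl true
  def handle_cast({:count, pid}, state) do
    # Monitor the pid and bump this process' slot in the shared array
    Process.monitor(pid)
    :counters.add(state.counter, state.index, 1)
    {:noreply, state}
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do
    :counters.add(state.counter, state.index, -1)
    {:noreply, state}
  end
end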

When dispatching to a local counter, you will roughly do this:

def count_me do
  # phash2 is zero-based, our Registry keys are 1..@count
  index = :erlang.phash2(self(), @count) + 1
  name = {:via, Registry, {CounterRegistry, index}}
  GenServer.cast(name, {:count, self()})
end
23 Likes

Btw, something else to consider is sharding your presence usage, but as I mentioned above, I think presence is the wrong solution to the problem.

2 Likes

Thanks Jose! Your response was very insightful.

We did encounter one hiccup: keeping separate counts across different topics. The issue is that when you receive a :DOWN event from a monitored pid, you don't know which count to decrement. To solve this, we keep a basic pid-to-topic mapping in an ETS table. When we receive a :DOWN event, we look up which topic the pid belongs to, decrement that topic's count, and remove the entry from the ETS table.
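
Roughly, the bookkeeping looks like this (a simplified sketch with made-up names):

defmodule TopicCounter do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, :ok, opts)

  def track(server, pid, topic), do: GenServer.cast(server, {:track, pid, topic})

  @impl true
  def init(:ok) do
    # counts maps topic -> count; the ETS table maps pid -> topic,
    # so a :DOWN message can find the topic to decrement
    table = :ets.new(:pid_topics, [:set, :private])
    {:ok, %{counts: %{}, table: table}}
  end

  @impl true
  def handle_cast({:track, pid, topic}, state) do
    Process.monitor(pid)
    :ets.insert(state.table, {pid, topic})
    {:noreply, update_in(state.counts[topic], &((&1 || 0) + 1))}
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
    # take/2 looks the entry up and deletes it in one call
    [{^pid, topic}] = :ets.take(state.table, pid)
    {:noreply, update_in(state.counts[topic], &(&1 - 1))}
  end
end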

I’ll report back with some more code, or perhaps a gist of code for the benefit of future readers.

4 Likes

Yes, if you need to track topics, then instead of using :counters, you can use an ETS table with concurrent, atomic writes to update the topic counts, and then broadcast the whole table every X seconds.
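
A sketch of the atomic updates (table name and options are illustrative):

# A public table of {topic, count} tuples, writable from any process
:ets.new(:topic_counts, [:named_table, :public, :set, write_concurrency: true])

topic = "room:lobby"

# Join: atomically bump the count, inserting {topic, 0} if the key is absent
:ets.update_counter(:topic_counts, topic, {2, 1}, {topic, 0})

# Leave: atomically decrement
:ets.update_counter(:topic_counts, topic, {2, -1}, {topic, 0})

# Every X seconds: read the whole table and broadcast it
:ets.tab2list(:topic_counts)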

It is worth adding that the topics approach has another issue: if a topic is constant, i.e. no user is joining or leaving it, you will still broadcast it every X seconds even though the data is the same. So if you have a long tail of topics, your payload may become really large, while the tracker version would be more optimized.

Therefore, there is another alternative here, which is to continue using Phoenix.Tracker, but track the local counters instead of each individual process. In a nutshell:

  1. Create N local counter processes. Each has two keys in its state: the overall counts and the diff

  2. Every time a local counter process receives a pid-topic pair to track or a :DOWN message, you put that in the diff. If topic “foobar” receives two joins and one leave, the diff will be %{"foobar" => 1}

  3. Every X seconds, you will merge the diff into the overall state. If a new topic was added, you start tracking it from that local counter process. If a topic was removed, you untrack it. If a topic's counter changed, you update its tracking. The current count is the tracked metadata (see the flush sketch below)
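
The flush in each local counter process could look roughly like this (a sketch; MyTracker, the :flush message, and @interval are assumptions, the key is the process' index, and we assume zero deltas are never recorded in the diff):

def handle_info(:flush, %{diff: diff, counts: counts, index: index} = state) do
  new_counts =
    Enum.reduce(diff, counts, fn {topic, delta}, acc ->
      old = Map.get(acc, topic, 0)
      new = old + delta

      cond do
        old == 0 ->
          # New topic for this process: start tracking it
          Phoenix.Tracker.track(MyTracker, self(), topic, index, %{count: new})
          Map.put(acc, topic, new)

        new == 0 ->
          # Topic dropped to zero: stop tracking it
          Phoenix.Tracker.untrack(MyTracker, self(), topic, index)
          Map.delete(acc, topic)

        true ->
          # Existing topic: update the tracked metadata
          Phoenix.Tracker.update(MyTracker, self(), topic, index, %{count: new})
          Map.put(acc, topic, new)
      end
    end)

  Process.send_after(self(), :flush, @interval)
  {:noreply, %{state | counts: new_counts, diff: %{}}}
end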

This means Phoenix.Tracker becomes your “GlobalCounter”, which is quite efficient and optimized. In pseudo-code, your counter supervisor would look like:

@count 8

def init(_) do
  children = [
    {Registry, name: CounterRegistry, keys: :unique},
    {Phoenix.Tracker, ...}
  ] ++
    Enum.map(1..@count, fn index ->
      Supervisor.child_spec({LocalCounter, index: index}, id: index)
    end)

  Supervisor.init(children, strategy: :one_for_one)
end

The last thing to consider is how you are going to hash the topic-pid pairs across local counters. You can:

  1. hash by pid: this means that if you have N local counter processes, they can all end up tracking the same topic. This may increase the payload, as the counters are split across local processes

  2. hash by topic: this decreases payload, but it may increase contention if there is a topic with tens of thousands of users

Finally, to get the number of users in a topic, you query the given topic and sum the count metadata of all entries. If you are hashing by topic, the number of entries will be the number of nodes. If you are hashing by pid, it will be at most the number of nodes * N. In both cases, it should be fast enough to compute on demand. If it isn't, you can use handle_diff to store the overall results directly in ETS.
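
Counting on demand could then be as simple as (assuming the %{count: n} metadata and MyTracker from above):

def count(topic) do
  MyTracker
  |> Phoenix.Tracker.list(topic)
  |> Enum.reduce(0, fn {_key, %{count: count}}, acc -> acc + count end)
end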

5 Likes

We were able to implement our own incarnation of this and run it live with over 120k concurrent users. We peaked at around 15% CPU when large batches of users were joining, and leveled out at around 7% CPU.

I’ve prepared a gist of the main parts of the code for the benefit of future readers.

I still think there are improvements to be made here. One downside is that if the node running the global GenServer crashes for some reason, we won't start a new one elsewhere.

If anyone ends up using this code and making improvements, please report back!

13 Likes