Design a chat room system with size limits that automatically switches users over when a room gets too big or too small

Hello,

I need some input to design a system of multiple chat rooms with
min size = 15 and max size = 100 users per room, where users would be automatically moved to a different channel when those criteria are not met.

The scenario is that users would initially join a channel “chat:room:lobby” and, based on the number of users already there, they would be switched over to:

  • A new channel, if the lobby has gotten too big and there are no other channels
  • Another channel that has not reached the limit

We also need to handle channels that drop below the minimum.

Obviously the goal is to always maintain a list of channels of roughly homogeneous size (around max/2).

Note that this would be used by hundreds of thousands of users, if not millions.

To achieve that, I first thought about tracking users joining and leaving channels with Phoenix.Tracker and storing the counts in a cache:

active_users_tracker.ex

def handle_diff(diff, state) do
  for {topic, {joins, leaves}} <- diff do
    for {_key, _meta} <- joins do
      # cache update
      update_total_players_per_channel(topic, "join")
    end

    for {_key, _meta} <- leaves do
      update_total_players_per_channel(topic, "leave")
    end
  end

  {:ok, state}
end

Then I need a process that periodically goes over the list of channels and their respective number of users. From those numbers I would broadcast to users on some of those channels to join other channels when:

  • The channel they are in is getting below the min size
  • The channel they are in is getting too big
  1. I would first get the list from the cache: Cachex.get("rooms:list", "rooms")

Example:

[
  {"chat:room:1", 4},
  {"chat:room:2", 6},
  {"chat:room:3", 7},
  {"chat:room:4", 10},
  {"chat:room:5", 40},
  {"chat:room:6", 70},
  ...
]
  2. I sort the list of channels "chat:room:" <> room_id by count, ascending

  3. Then I need to get the channels below the min limit and regroup them with other channels (sketched below, after the steps).

Starting from the beginning of the array, I would pick up any channel whose count is less than the min limit and accumulate their counts until I reach a number in the range of max/2.

That way I would gather groups of "origin" channels to merge into a new channel.

I could do the same starting from the end of the list, to gather groups of users from channels above the limit.

At the end, I would get a list indicating which users need to join other rooms.
Example:

%{ join: [ %{ from: ["chat:room:1", "chat:room:2",...], dest: "chat:room:x" } ] }
  4. Terminate channels whose count = 0 and remove them from the cache

  5. Broadcast a message to each topic in the "from" array, so that users join the "dest" topic

broadcast("chat:room:x", "join", %{ room: "chat:room:x" })

Any thoughts about that ? :slight_smile:

Cheers

2 Likes

:wave:

:ets.update_counter might be useful.
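
For example (a standalone sketch):

# a public table holding {room_topic, count} rows
table = :ets.new(:room_counts, [:set, :public, {:write_concurrency, true}])

# atomically increment on join, inserting {room, 0} first if the row doesn't exist yet
:ets.update_counter(table, "chat:room:lobby", {2, 1}, {"chat:room:lobby", 0})
#=> 1

# atomically decrement on leave, clamping at 0 so it never goes negative
:ets.update_counter(table, "chat:room:lobby", {2, -1, 0, 0})
#=> 0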

2 Likes

In what way is that better to use than Cachex?

What about the rest of my proposal in terms of scalability?

And the :atomics module in general.
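
For example:

# a fixed-size array of lock-free integer counters, addressed by index
ref = :atomics.new(1, signed: false)

:atomics.add_get(ref, 1, 1)
#=> 1
:atomics.sub_get(ref, 1, 1)
#=> 0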

1 Like

It might not be, indeed. We are offering you a few avenues through which you can attempt to find your solution.

1 Like

Why would ETS be a better approach for storing the counts than using a cache like Cachex?

Also I’m a beginner in that field and I don’t have the slightest idea of what :atomics is and how it could help me.
I was expecting a bit more than people throwing random words at me ^^.

Why is :atomics going to help?

TBH, I was first interested in knowing whether I was generally heading in the right direction.

Then there are the details of the implementation, i.e. how to store the user counts.
One says :ets, you say :atomics; well, maybe you could tell me why I should use one or the other?

Thanks

In this case: because cachex is specialized in storing data structures, not separate counters. So if you use ETS’ counters functionality you are (1) drinking from the source and (2) getting a feature that cachex does not (seem to) have.

2 Likes

Balancing cannot really be done distributed, given you need to work based on a single view of the world. So I’d start by asking if you need to share those counters at all. If you don’t then neither :ets nor :atomics will give you any benefit over simply storing the data locally in the process doing the balancing.

Once presence tracking becomes a bottleneck you can look at sharing the data between multiple processes (e.g. one presence tracker per channel + one balancer). At that point atomics/counters are useful because they're very performant options for sharing counters, but only counters. If you need to relate metadata then :ets.update_counter might be simpler, even if less performant. What you want to avoid here is individual processes updating counters needing to coordinate with peers. Serializing counter updates will likely kill any performance benefit over the single-process option above.

Generally I’d suggest building out the single process option first. Maybe write the code so balancing and counter updating can easily be separated and only move to the more complex options once things become actually slow. You’ll be in a much better position to make tradeoffs at that point in them rather than working with hypothesis today.

2 Likes

No, those counters are not for sharing; they are only used to balance the number of users among the channels.

Generally I’d suggest building out the single process option first

Well there’s basically 2 processes
1) One that increment/decrement counters (seen above in Phoenix Tracker) and stores in cache (Caching)
2) A server that periodically sort the counters, and broadcast the channel switch (Balancer)

If i understood you correctly, you’re suggesting for the moment to get rid of the Caching , and do everything in the balancer !?

That means I would have to refetch the channel counts directly before being able to proceed with the sorting, broadcast and so on…

i.e

channels_list = UsersTrack.list()

Or I could do a cast(:increment) from the UserTracker module to update the count in the Balancer and avoid refetching all the counts.

At the end of the day the counts do not have to be exact; they just need to be approximately right.

only move to the more complex options once things become actually slow

If Phoenix.Tracker becomes slow, :atomics looks good, but it only allows storing numbers…
and the update is done using the index of the counter in the array…

-spec put(Ref, Ix, Value) -> ok when Ref :: atomics_ref(), Ix :: integer(), Value :: integer().
Write `Value` to the atomic at index `Ix`.

So I would still have to store the list of channels and their respective indices somewhere!?

First of all I'd suggest dropping the idea of caching. You're not caching anything; you're calculating and maintaining a derived value based on the diffs shared by the tracker.

The second point is that this derived state would need to live in a place where its value is shared across many processes. Starting with a single process to hold onto that state is simpler.

To start with that single process could even be the tracker process, but generally you want to keep that process light on processing. You can send/forward the diffs to the balancer process for it to deal with.

That balancer process can just use the state it maintains whenever a tick for rebalancing comes around. All the necessary information will live in its internal state.

Either run stress tests on this system or wait until you actually have scale before trying to determine where this system fails, and even more so what potential solutions to it could be.
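
For example (a rough sketch; module names and the tick interval are made up), the tracker can stay thin and just forward its diffs, while the balancer owns the derived counts:

defmodule ActiveUsersTracker do
  use Phoenix.Tracker

  def start_link(opts) do
    opts = Keyword.merge([name: __MODULE__], opts)
    Phoenix.Tracker.start_link(__MODULE__, opts, opts)
  end

  def init(opts), do: {:ok, %{pubsub_server: Keyword.fetch!(opts, :pubsub_server)}}

  # keep the tracker light: just forward the diffs to the balancer
  def handle_diff(diff, state) do
    send(Balancer, {:diff, diff})
    {:ok, state}
  end
end

defmodule Balancer do
  use GenServer

  @tick :timer.seconds(10)

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(_opts) do
    Process.send_after(self(), :rebalance, @tick)
    {:ok, %{counts: %{}}}
  end

  # derive per-topic counts from the tracker diffs
  @impl true
  def handle_info({:diff, diff}, state) do
    counts =
      Enum.reduce(diff, state.counts, fn {topic, {joins, leaves}}, acc ->
        delta = length(joins) - length(leaves)
        Map.update(acc, topic, delta, &(&1 + delta))
      end)

    {:noreply, %{state | counts: counts}}
  end

  def handle_info(:rebalance, state) do
    # sort/group/broadcast based on state.counts here
    Process.send_after(self(), :rebalance, @tick)
    {:noreply, state}
  end
end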

4 Likes

ok got it thanks

1 Like

Sorry, I should’ve provided more details :slight_smile:

I was thinking about something like this when I gave that answer but was a bit lazy to put it in writing:

defmodule CrowdControl do
  @moduledoc """
  CrowdControl is like a Phoenix.PubSub with a gatekeeper.
  """

  @default_counter_table __MODULE__.Counter
  @default_pubsub __MODULE__.PubSub

  def child_spec(opts) do
    CrowdControl.Supervisor.child_spec(opts)
  end

  def find_or_start_room do
    find_room() || start_room()
  end

  def find_room(table \\ @default_counter_table) do
    # matches rooms with count between 15 and 100
    # :ets.fun2ms(fn {room, count} when count > 15 and count < 100 -> room end)
    ms = [{{:"$1", :"$2"}, [{:andalso, {:>, :"$2", 15}, {:<, :"$2", 100}}], [:"$1"]}]

    case :ets.select(table, ms) do
      [] ->
        # tries to get any non-empty room
        # :ets.fun2ms(fn {room, count} when count > 0 -> room end)
        case :ets.select(table, [{{:"$1", :"$2"}, [{:>, :"$2", 0}], [:"$1"]}]) do
          [] -> nil
          rooms -> Enum.random(rooms) # can be something else, random is just an example, maybe :ets.select(table, ms, _limit = 1) can be used to get just one room
        end

      rooms ->
        Enum.random(rooms) # same as above
    end
  end

  def start_room do
    "room:#{:rand.uniform(100_000)}"
  end

  @doc """
  Attempts to join a room. On success, subscribes the process to the room.

  Example usage:

      # in a Phoenix channel, right after joining the lobby
      def join_room do
        room = find_or_start_room()

        case attempt_join(room) do
          :ok ->  room
          :we_are_full -> join_room()
        end
      end

      # users send a message, it gets re-broadcasted to the room (fastlaned)
      def handle_in("message", %{"body" => _body} = params, socket) do
        room = socket.assigns.room

        # putting the room info in the payload so that clients can ignore stray
        # messages which can come in while they are moving rooms
        broadcast = %Phoenix.Socket.Broadcast{
          topic: room,
          event: "message",
          payload: Jason.encode_to_iodata!(Map.put(params, "room", room))
        }

        Phoenix.PubSub.broadcast_from!(CrowdControl.PubSub, self(), room, broadcast)
        {:noreply, socket}
      end

  """
  def attempt_join(table \\ @default_counter_table, pubsub \\ @default_pubsub, room) do
    count = :ets.update_counter(table, room, {2, 1, 100, 100}, {room, 0})

    if count < 100 do
      :ok = Phoenix.PubSub.subscribe(pubsub, room)
    else
      :we_are_full
    end
  end

  @doc """
  Explicitly leaves a room. If the room counter drops to 15 people, broadcasts a message asking everyone else to move.

  Example usage:

      # in a Phoenix channel
      def handle_info({CrowdControl, :please_move}, socket) do
        prev_room = socket.assigns.room
        new_room = join_room()

        unless new_room == prev_room do
          CrowdControl.leave(prev_room)
          push(socket, "new_room", new_room)
        end

        {:noreply, socket}
      end

  """
  def leave(pubsub \\ @default_pubsub, room) do
    # PubSub.unsubscribe/2 -> Registry.unregister/2 -> Registry notifies :listeners -> Counter.handle_info/2
    :ok = Phoenix.PubSub.unsubscribe(pubsub, room)
  end
end

defmodule CrowdControl.Supervisor do
  @moduledoc false
  use Supervisor

  def start_link(opts) do
    name = Keyword.get(opts, :name, CrowdControl)
    Supervisor.start_link(__MODULE__, opts, name: Module.concat(name, "Supervisor"))
  end

  @impl true
  def init(opts) do
    name = Keyword.get(opts, :name, CrowdControl)

    counter_name = Module.concat(name, "Counter")
    counter_clean_period = Keyword.get(opts, :clean_period, :timer.seconds(60))

    pubsub_name = Module.concat(name, "PubSub")
    pubsub_adapter_name = Module.concat(pubsub_name, "Adapter")

    pubsub_partitions =
      opts[:pool_size] ||
        System.schedulers_online() |> Kernel./(4) |> Float.ceil() |> trunc()

    # custom pubsub to monitor those who joined rooms and decrement counters when they exit
    # adapted from https://github.com/phoenixframework/phoenix_pubsub/blob/v2.1.3/lib/phoenix/pubsub/supervisor.ex
    # alternative is to monitor in the counter, which is probably simpler, but I wanted to explore this PubSub approach
    registry = [
      meta: [pubsub: {Phoenix.PubSub.PG2, pubsub_adapter_name}],
      partitions: pubsub_partitions,
      keys: :duplicate,
      name: pubsub_name,
      # the important bit, this lets us decrement the room counter when the user disconnects
      listeners: [counter_name]
    ]

    children = [
      {Registry, registry},
      {Phoenix.PubSub.PG2, Keyword.put(opts, :adapter_name, pubsub_adapter_name)},
      {CrowdControl.Counter,
       name: counter_name, table: counter_name, clean_period: counter_clean_period}
    ]

    Supervisor.init(children, strategy: :rest_for_one)
  end
end

defmodule CrowdControl.Cleaner do
  @moduledoc false
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: Keyword.fetch!(opts, :name))
  end

  @impl true
  def init(opts) do
    clean_period = Keyword.fetch!(opts, :clean_period)
    table = Keyword.fetch!(opts, :table)

    :ets.new(table, [
      :named_table,
      :set, # can be ordered set if find_room should prefer fuller / emptier rooms
      :public,
      {:read_concurrency, true},
      {:write_concurrency, true},
      {:decentralized_counters, true}
    ])

    schedule(clean_period)
    {:ok, %{table: table, clean_period: clean_period}}
  end

  @impl true
  def handle_info({:register, _pubsub, _room, _partition, _meta}, state) do
    {:noreply, state}
  end

  def handle_info({:unregister, pubsub, room, _partition}, state) do
    count = :ets.update_counter(state.table, room, {2, -1, 0, 0})

    if count == 15 do # this part can be improved with a MapSet of "dying" rooms to support `if count < 15 do`
      Phoenix.PubSub.broadcast!(pubsub, room, {CrowdControl, :please_move})
    end

    {:noreply, state}
  end

  def handle_info(:clean, state) do
    clean(state.table)
    schedule(state.clean_period)
    {:noreply, state}
  end

  defp schedule(clean_period) do
    Process.send_after(self(), :clean, clean_period)
  end

  # cleans abandoned rooms
  defp clean(table) do
    # :ets.fun2ms(fn {room, count} when count == 0 -> room end)
    ms = [{{:"$1", :"$2"}, [{:==, :"$2", 0}], [:"$1"]}]
    :ets.select_delete(table, ms)
  end
end

The problem with atomics per resource is that their mapping still needs to be stored somewhere, and if it's stored in ETS, using atomics becomes slower than :ets.update_counter: GitHub - ruslandoga/rate_limit
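
i.e. something like this (illustrative):

room = "chat:room:lobby"

# atomics per room: the room -> index mapping still has to live somewhere, e.g. ETS
index = :ets.new(:room_index, [:set, :public, {:read_concurrency, true}])
counters = :atomics.new(10_000, signed: false)
:ets.insert(index, {room, 1})

# two operations per update: look up the index, then bump the atomic
[{^room, ix}] = :ets.lookup(index, room)
:atomics.add_get(counters, ix, 1)

# versus a single call when the count is stored in the ETS row itself
counts = :ets.new(:room_counts, [:set, :public, {:write_concurrency, true}])
:ets.update_counter(counts, room, {2, 1}, {room, 0})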

3 Likes

The “API” I envisioned was

  1. User connects (e.g. joins a user:<id> channel)
  2. User tries to join a room, not a Phoenix Channel room, but a CrowdControl room
  3. on success, user’s channel process is now subscribed to PubSub and can receive messages
  4. communication happens via Phoenix.PubSub.broadcast, but could probably be replaced with just joining a room channel; that makes moving a bit more complicated though – the client would need to get involved
  5. once room gets low on members, user’s channel process receives please_move
  6. users attempt to move, but in case there are no other rooms, they just stay in the same one
  7. room counter increments / decrements can be broadcast across the cluster, which would make them eventually consistent, similar to how rate limiting works in Hexpm (see the sketch below)
  8. there is a cleanup process that deletes empty rooms from ets
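
For point 7, a minimal sketch of what I mean (the topic, message shape and module name are made up; it assumes the counter table and pubsub names from the snippet above):

defmodule CrowdControl.CounterSync do
  @moduledoc false
  use GenServer

  @topic "crowd_control:counter_deltas"
  @pubsub CrowdControl.PubSub
  @table CrowdControl.Counter

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  # called on the local node whenever someone joins (+1) or leaves (-1) a room
  def bump(room, delta) do
    apply_delta(room, delta)
    # tell every node (including this one); the handler below skips our own deltas
    Phoenix.PubSub.broadcast!(@pubsub, @topic, {:delta, node(), room, delta})
  end

  @impl true
  def init(_opts) do
    :ok = Phoenix.PubSub.subscribe(@pubsub, @topic)
    {:ok, %{}}
  end

  # deltas from other nodes get applied to the local counter table,
  # so every node converges on (roughly) the same counts
  @impl true
  def handle_info({:delta, from_node, room, delta}, state) do
    unless from_node == node(), do: apply_delta(room, delta)
    {:noreply, state}
  end

  defp apply_delta(room, delta) do
    :ets.update_counter(@table, room, {2, delta}, {room, 0})
  end
end
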
2 Likes

Thanks for your inputs

  2. User tries to join a room, not a Phoenix Channel room, but a CrowdControl room

Not sure what you mean by “CrowdControl room”.

If it’s not a channel then what it is ?

Because the user still has to join a channel to know where to go, right (preferably one where there's room to fill)?

What I envisioned is to have, as you mentioned, users joining a channel user:id first, and from there getting the room with the lowest count (findRoom()) and subscribing to it.

defmodule UsersChannel do
  use Phoenix.Channel

  def join("user:" <> _user_id, _message, socket) do
    # find a room with slots available and subscribe to it
    send(self(), :after_join)
    {:ok, socket}
  end

  def handle_info(:after_join, socket) do
    room_topic = find_room()
    :ok = Phoenix.PubSub.subscribe(socket.pubsub_server, room_topic)
    {:noreply, socket}
  end
end
  4. communication happens via Phoenix.PubSub.broadcast, but could probably be replaced with just joining a room channel; that makes moving a bit more complicated though – the client would need to get involved

Not sure I understand what you mean by

replaced with just joining a room channel

If you’re saying having one global room to receive the broadcast, i don’t think it would scale very well. I was thinking that managing more rooms of small size is better in terms of scaling !?

My idea is that the balancer server running every x seconds would run through the list of channels and equilibrate it.

From the results of that, I would broadcast() to each “room:channel:x” where users have to switch.

In the above user channel, I could then have

# handle switch
def handle_info({:switch_channel, switch_channel: channel, old: old_channel}, socket) do
  Phoenix.PubSub.unsubscribe(socket.pubsub_server, old_channel)
  :ok = Phoenix.PubSub.subscribe(socket.pubsub_server, channel)
  {:noreply, socket}
end
  7. room counter increments / decrements can be broadcast across the cluster, which would make them eventually consistent

Since my API will run on a cluster of nodes, I guess I can do a
local broadcast, right?

The problem with atomics per resource is that their mapping still needs to be stored somewhere, and if it’s stored in ets, using atomics become slower than :ets.update_counter:

I had thought about storing the counts using :atomics and the list of channels in :ets,
but maybe it would become way too complicated, and there's really nothing to gain?

If it’s not a channel then what it is ?

It’s a topic in a custom CrowdControl Phoenix.PubSub. It’s custom only in a sense that it decrements a subscription counter for a topic (room) when people disconnect. In your switch_channel example you seem to be doing a similar thing with subscribing and unsubscribing to topics, except you call topics channels. Note that channels are a topic subscription + a process managing messages to it. So what you and I are both doing is just adding and removing subscriptions in a PubSub, and Phoenix Channels are more than that.

Because the user still has to join a channel to know where to go, right (preferably one where there's room to fill)?

No, it can be just a subscription to a topic, but channels might be more idiomatic.

What I envisioned is to have, as you mentioned, users joining a channel user:id first, and from there getting the room with the lowest count (findRoom()) and subscribing to it.

We have the same thing.

If you’re saying having one global room to receive the broadcast, i don’t think it would scale very well. I was thinking that managing more rooms of small size is better in terms of scaling !?

That’s not what I was saying :slight_smile: The broadcasts from a room would go only to the selected topic, aka “CrowdControl” room (a topic and a counter which tracks how many people are subscribed).

My idea is that the balancer server running every x seconds would run through the list of channels and equilibrate it.

That might make people move across rooms unnecessarily, unless I misunderstand the meaning behind "equilibrate". I was under the assumption that if a room has between 15 and 100 people, they all stay there, and only once the room falls to 15 users do they move out. And if a room reaches 100, it simply doesn't accept anyone anymore until someone leaves or disconnects.

Since my API will run on a cluster of nodes, I guess I can do a
local broadcast, right?

Yes, but note that local broadcast in Phoenix PubSub terminology means broadcast within a single node. What you probably mean is a normal broadcast.
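
i.e. (assuming a pubsub named MyApp.PubSub):

# reaches subscribers of "chat:room:1" on every node in the cluster
Phoenix.PubSub.broadcast(MyApp.PubSub, "chat:room:1", {CrowdControl, :please_move})

# only reaches subscribers on the current node
Phoenix.PubSub.local_broadcast(MyApp.PubSub, "chat:room:1", {CrowdControl, :please_move})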

I had thought about storing the counts using :atomics and the list of channels in :ets,
but maybe it would become way too complicated, and there's really nothing to gain?

It’s just slower and more complicated since you need to write some things from scratch. Or at least it was at the time of GitHub - ruslandoga/rate_limit In that benchmark rate_limiter and atomic3 are using atomics and counter and plug_attack are using :ets.update_counter.

2 Likes

Oh, sorry, I just realised that in your previous post I had not seen the entirety of your code (did not see there was more to scroll) :man_facepalming:
It’s clearer now :slight_smile:

So you actually set up a registry which gets notified when a subscribe/unsubscribe occurs.
The unregister handler takes care of moving users around when a room reaches the minimum, and you also have periodic cleaning of empty rooms from the :ets table.

That’s quite smart! I’m not really familiar with the registry TBH.
It’s really cool to see how things can be done in a different way.

As stated in my earlier post, I just thought of Phoenix.Tracker to get notified of any join/leave.
I guess your solution is more performant/elegant !?

I have a couple of questions regarding your code though:

  1. Join Room
def join_room do
  room = find_or_start_room()
  case attempt_join(room) do

Even though we have already found a room with slots available in find_or_start_room(), we increment the count in attempt_join() and check the count again.

Is that because, in the meantime, some other user could have already joined?
attempt_join() kind of double-checks that there's still room available?

My first intuition would have been to increment the count directly in findRoom(), since the query already returns rooms within the right range (< 100).

  count = :ets.update_counter(table, room, {2, 1, 100, 100}, {room, 0})

    if count < 100 do
      :ok = Phoenix.PubSub.subscribe(pubsub, room)
  2. Broadcast vs BroadcastFrom
        # putting the room info in the payload so that clients can ignore stray messages which can come in while they are moving rooms
        broadcast = %Phoenix.Socket.Broadcast{topic: room, event: "message", payload: Jason.encode_to_iodata!(Map.put(params, "room", room))}
        Phoenix.PubSub.broadcast_from!(Messages.RoomControl.Pubsub, self(), room, broadcast)
        {:noreply, socket}
      end

The documentation says:

broadcast_from(from, topic, event, msg)
#Broadcasts a msg from the given from as event in the given topic to all nodes.

What’s the difference between your code and doing a normal broadcast ?

Phoenix.Socket.Broadcast{topic: room, event: "message", payload: Jason.encode_to_iodata(Map.put(params, "room", room))}
  3. Registry
children = [
      {CrowdControl.Counter,
       name: counter_name, table: counter_name, clean_period: counter_clean_period}
    ]

The child process CrowdControl.Counter should be CrowdControl.Cleaner, shouldn't it?

Yes, but note that local broadcast in Phoenix PubSub terminology means broadcast within a single node. What you probably mean is a normal broadcast.

Well, I think that's what I meant.

If I deploy my backend API on multiple nodes, the Cleaner, for example, is going to run on each of those nodes, so I assumed that it's only necessary to broadcast on the node where the Cleaner is running!?

Thanks for your time and your patience. Still learning :slight_smile:

I guess your solution is more performant/elegant !?

I think it should be reasonably performant (since it's basically Phoenix PubSub with :ets.update_counter in front; find_room is probably slow though), but it's not as elegant as I had thought when I had that idea. I think having a single process (possibly partitioned as well, e.g. with PartitionSupervisor — Elixir v1.17.3) would be easier to reason about and more customizable than a "patched" Phoenix PubSub. I'll update my snippet tomorrow.

As for Phoenix.Tracker, I've never used it, but just from the fact that it does more than incrementing and decrementing numbers – it does a full CRDT! – it would likely be less performant. But I guess it's better to benchmark both approaches to know for sure.

Is that because in the meantime, some other user could have already joined ?

Yes.

Incrementing it in findRoom would also work, I think. The important bit is to do the “joining” only after :ets.update_counter returns a suitable count (less than 100). It acts as the synchronization point.
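
To illustrate, with the counter acting as the synchronization point (made-up numbers):

table = :ets.new(:rooms, [:set, :public, {:write_concurrency, true}])
room = "room:abc"
:ets.insert(table, {room, 98})

# two channel processes found the same room via find_room and race to join it;
# the atomic counter decides who gets in
:ets.update_counter(table, room, {2, 1, 100, 100}, {room, 0})
#=> 99  -> passes the `count < 100` check and subscribes
:ets.update_counter(table, room, {2, 1, 100, 100}, {room, 0})
#=> 100 -> :we_are_full, picks another room and retries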

  2. Broadcast vs BroadcastFrom

I think it mostly depends on how the client handles the messages that it sends. Here I assumed that the client doesn’t need to receive their own messages and can render them in channel.push(message).receive(“ok”, () => { /* here */ }).

the child process CrowdControl.Counter should be CrowdControl.Cleaner shouldn’t it?

Yes, or rather the CrowdControl.Cleaner module should be renamed to CrowdControl.Counter. I changed its name multiple times while adding and removing logic from it…

to broadcast on the node where the Cleaner is running for example !?

Unless there is extra infrastructure in place, like a load balancer that does sticky sessions, I think it would be safer to broadcast to all nodes. And I thought that counters would be running on all nodes too, just like Phoenix.PubSub does.

  • node 1 gets a user in room:1, broadcasts to other nodes to increment their counter for room:1
  • node 2 gets a user in room:1, broadcasts to other nodes to increment their counter for room:1
  • a user connected to node 1 leaves room:1 (or disconnects from node 1), node 1 broadcasts to other nodes to decrement their counter for room:1
  • etc.

So you get eventually consistent counters, meaning there could be cases with rooms having 100 + n people, where n is the number of nodes, if they all joined at once while their local count was 99; if the "clamping" is removed from the :ets.update_counter call (the third and fourth elements in {2, 1, 100, 100}), then these rooms could be detected and the extra people moved out of them. But I would probably just leave them be; small overflows like that are probably unnoticeable.
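
For example, an unclamped join could detect the overflow and back out (just a sketch, not what the snippet above does; it reuses the table/pubsub defaults from that snippet):

defmodule CrowdControl.UnclampedJoin do
  @moduledoc false

  def attempt_join(table \\ CrowdControl.Counter, pubsub \\ CrowdControl.PubSub, room) do
    # no clamp: the count may exceed 100 when several nodes race, so overflow is visible
    count = :ets.update_counter(table, room, {2, 1}, {room, 0})

    if count <= 100 do
      :ok = Phoenix.PubSub.subscribe(pubsub, room)
      :ok
    else
      # undo our increment and let the caller pick another room
      :ets.update_counter(table, room, {2, -1})
      :we_are_full
    end
  end
end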

1 Like

Thanks a lot !!!

I’ve tested your proposal.

One thing that is missing though is detecting when a user leaves a channel due to a connection issue or something else…
In that case, the counter does not get decremented.
I was expecting the listener in the registry to catch it (:unregister), but it does not.

I think that’s why I had initially chosen PhoenixTracker…

Hm… Maybe I did something wrong there, I didn’t actually even run that version.

I did run and test this one though:

defmodule CrowdControl do
  @moduledoc """
  CrowdControl is like a Phoenix.PubSub with a gatekeeper.
  """

  use GenServer

  @default_counter __MODULE__
  @default_pubsub __MODULE__.PubSub

  @type counter :: atom

  @type start_option ::
          GenServer.option()
          # phoenix pubsub to use for message broadcasting
          | {:pubsub, Phoenix.PubSub.t()}
          # how often to clean abandoned rooms
          | {:clean_period, timeout}

  @spec start_link([start_option]) :: GenServer.on_start()
  def start_link(opts) do
    name = Keyword.get(opts, :name, @default_counter)
    opts = Keyword.put_new(opts, :name, name)
    opts = Keyword.put_new(opts, :pubsub, @default_pubsub)
    opts = Keyword.put_new(opts, :clean_period, :timer.seconds(60))
    opts = Keyword.put(opts, :table, name)

    {gen_opts, opts} =
      Keyword.split(opts, [:debug, :name, :timeout, :spawn_opt, :hibernate_after])

    GenServer.start_link(__MODULE__, opts, gen_opts)
  end

  @spec find_or_start_room() :: String.t()
  def find_or_start_room do
    find_room() || start_room()
  end

  @spec find_room(counter) :: String.t() | nil
  def find_room(counter \\ @default_counter) do
    # matches rooms with count between 15 and 100
    # generated with `:ets.fun2ms(fn {room, count} when count > 15 and count < 100 -> room end)`
    ms = [{{:"$1", :"$2"}, [{:andalso, {:>, :"$2", 15}, {:<, :"$2", 100}}], [:"$1"]}]
    limit = 5

    case :ets.select(counter, ms, limit) do
      {rooms, _continuation} ->
        Enum.random(rooms)

      :"$end_of_table" ->
        # tries to get any non-empty room (but also below half-full)
        # generated with `:ets.fun2ms(fn {room, count} when count > 0 and count < 50 -> room end)`
        ms = [{{:"$1", :"$2"}, [{:andalso, {:>, :"$2", 0}, {:<, :"$2", 50}}], [:"$1"]}]

        case :ets.select(counter, ms, limit) do
          {rooms, _continuation} -> Enum.random(rooms)
          :"$end_of_table" -> nil
        end
    end
  end

  @spec start_room :: String.t()
  def start_room do
    id =
      Base.hex_encode32(
        <<
          System.system_time(:second)::32,
          :erlang.phash2({node(), self()}, 65536)::16,
          :erlang.unique_integer()::16
        >>,
        case: :lower,
        padding: false
      )

    "room:#{id}"
  end

  @doc """
  Broadcasts a message to a room.

  Example usage:

      # user sends a message, it gets re-broadcasted to the room
      def handle_in("message", message, socket) do
        CrowdControl.fastlane_broadcast_from(socket.assigns.room, "room:message", params)
        {:noreply, socket}
      end

  """
  def fastlane_broadcast_from(pubsub \\ @default_pubsub, room, topic, message) do
    # fastlaning the message (i.e. encoding only once)
    broadcast = %Phoenix.Socket.Broadcast{
      topic: room,
      event: topic,
      payload: :json.encode(message)
    }

    Phoenix.PubSub.broadcast_from!(pubsub, self(), room, broadcast)
  end

  @doc """
  Attempts to join a room. On success, subscribes the process to the room.

  Once the caller process exits (e.g. if the caller process is a Phoenix Channel, it would exit on user disconnect),
  the room counter is automatically decremented.

  Example usage:

      defp find_and_join_room(socket) do
        room = CrowdControl.find_or_start_room()

        case CrowdControl.attempt_join(room) do
          {:ok, join_ref} ->
            push(socket, "room:joined", %{room: room})
            assign(socket, room: room, join_ref: join_ref)

          :we_are_full ->
            find_and_join_room(socket)
        end
      end

      def handle_info(:after_join, socket) do
        {:noreply, find_and_join_room(socket)}
      end

  """
  @spec attempt_join(counter, Phoenix.PubSub.t(), String.t()) :: {:ok, reference} | :we_are_full
  def attempt_join(counter \\ @default_counter, pubsub \\ @default_pubsub, room) do
    count = :ets.update_counter(counter, room, {2, 1, 100, 100}, {room, 0})

    if count < 100 do
      :ok = Phoenix.PubSub.subscribe(pubsub, room)
      {:ok, monitor(counter, room)}
    else
      :we_are_full
    end
  end

  @doc """
  Explicitly leaves a room.

  If the room is below 15 people, broadcasts a `{CrowdControl, :please_move}` message asking everyone else (in that room) to move.

  Example usage:

      defp leave_room(socket) do
        %{room: room, join_ref: join_ref} = socket.assigns
        CrowdControl.leave_room(room, join_ref)
        assign(socket, room: nil, join_ref: nil)
      end

      def handle_in("leave_room", _params, socket) do
        {:noreply, leave_room(socket)}
      end

      defp try_move_room(socket) do
        %{room: prev_room, join_ref: prev_join_ref} = socket.assigns
        new_room = CrowdControl.find_or_start_room()

        if new_room == prev_room do
          # didn't move to a new room, probably because it's the only non-empty room available right now,
          # and so we stay, we'll try again once someone else leaves
          socket
        else
          case CrowdControl.attempt_join(new_room) do
            {:ok, new_join_ref} ->
              # moved to a new room, note that this leave would trigger another `:please_move` broadcast
              # but that's ok, this way we make sure that everyone eventually moves out
              CrowdControl.leave_room(prev_room, prev_join_ref)
              push(socket, "room:moved", %{room: new_room})
              assign(socket, room: new_room, join_ref: new_join_ref)

            :we_are_full ->
              try_move_room(socket)
          end
        end
      end

      def handle_info({CrowdControl, :please_move}, socket) do
        {:noreply, try_move_room(socket)}
      end

  """
  @spec leave_room(counter, Phoenix.PubSub.t(), String.t(), reference) :: :ok
  def leave_room(counter \\ @default_counter, pubsub \\ @default_pubsub, room, join_ref) do
    demonitor(counter, join_ref)
    :ok = Phoenix.PubSub.unsubscribe(pubsub, room)
    count = :ets.update_counter(counter, room, {2, -1})

    if count < 15 do
      Phoenix.PubSub.broadcast!(pubsub, room, {CrowdControl, :please_move})
    end

    :ok
  end

  @spec monitor(counter, String.t()) :: reference
  defp monitor(counter, room) do
    GenServer.call(counter, {:monitor, self(), room})
  end

  @spec demonitor(counter, reference) :: :ok
  defp demonitor(counter, ref) do
    GenServer.cast(counter, {:demonitor, ref})
  end

  @impl true
  def init(opts) do
    clean_period = Keyword.fetch!(opts, :clean_period)
    table = Keyword.fetch!(opts, :table)
    pubsub = Keyword.fetch!(opts, :pubsub)

    :ets.new(table, [
      :named_table,
      # NOTE: using :ordered_set might (?) improve find_room performance
      :set,
      :public,
      {:read_concurrency, true},
      {:write_concurrency, true},
      {:decentralized_counters, true}
    ])

    schedule(clean_period)
    {:ok, %{table: table, pubsub: pubsub, clean_period: clean_period}}
  end

  @impl true
  def handle_call({:monitor, pid, room}, _from, state) do
    {:reply, Process.monitor(pid, tag: {:DOWN, room}), state}
  end

  @impl true
  def handle_cast({:demonitor, ref}, state) do
    Process.demonitor(ref, [:flush])
    {:noreply, state}
  end

  @impl true
  def handle_info(:clean, state) do
    clean(state.table)
    schedule(state.clean_period)
    {:noreply, state}
  end

  def handle_info({{:DOWN, room}, ref, :process, _pid, _reason}, state) do
    %{table: table, pubsub: pubsub} = state

    Process.demonitor(ref, [:flush])
    count = :ets.update_counter(table, room, {2, -1})

    if count < 15 do
      Phoenix.PubSub.broadcast!(pubsub, room, {CrowdControl, :please_move})
    end

    {:noreply, state}
  end

  defp schedule(clean_period) do
    Process.send_after(self(), :clean, clean_period)
  end

  # cleans abandoned rooms
  defp clean(table) do
    # :ets.fun2ms(fn {room, count} when count <= 0 -> room end)
    ms = [{{:"$1", :"$2"}, [{:<=, :"$2", 0}], [:"$1"]}]
    :ets.select_delete(table, ms)
  end
end