I’d recommend checking out Erlang’s :pg module, which is what Phoenix PubSub uses under the hood in one of its adapters.
This module implements distributed process groups. A process can join a group (which could be your topic, following the Phoenix PubSub analogy), and you can fetch all the members of a group and send each of them a message to do a broadcast. The list of groups is managed by :pg itself, so you don’t have to worry about creating groups or cleaning them up.
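To make the join / get members / automatic cleanup cycle concrete, here is a rough sketch of the raw :pg calls. It assumes OTP 23 or later (where :pg is available) and that nothing else has already started the default scope; the group name :my_topic is made up for the example.

```elixir
# Start the default :pg scope (normally done once, e.g. under a supervisor).
{:ok, _scope} = :pg.start_link()

# Spawn a process that waits for a single message, then join it to a group.
pid =
  spawn(fn ->
    receive do
      :stop -> :ok
    end
  end)

:ok = :pg.join(:my_topic, pid)
[^pid] = :pg.get_members(:my_topic)

# Let the member exit. :pg monitors its members, so it removes the pid
# from the group on its own (shortly after the exit, not necessarily instantly).
ref = Process.monitor(pid)
send(pid, :stop)

receive do
  {:DOWN, ^ref, :process, ^pid, _reason} -> :ok
end
```

Note that nobody calls :pg.leave/2 here; the group simply becomes empty once the process is gone.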
A minimal implementation of your use case would be:
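defmodule PubSub do
  # Thin wrapper around :pg. The default :pg scope must already be running,
  # e.g. via :pg.start_link/0 or a child in your supervision tree.

  @type channel :: any
  @type client :: pid

  @spec send_msg(channel, any) :: :ok
  def send_msg(channel, msg) do
    # get_members/1 returns the pids in the group on all connected nodes.
    for client <- :pg.get_members(channel) do
      send(client, msg)
    end

    :ok
  end

  @spec add_client(channel, client) :: :ok
  def add_client(channel, client) do
    # :pg.join/2 only accepts pids that live on the local node.
    :pg.join(channel, client)
  end

  # :pg.leave/2 returns :not_joined if the pid was not a member.
  @spec del_client(channel, client) :: :ok | :not_joined
  def del_client(channel, client) do
    :pg.leave(channel, client)
  end
end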
Note that add_channel and del_channel are not needed, as that’s handled automatically by :pg.
Distribution comes for free: :pg shares group membership across connected nodes, and send/2 works transparently with pids on other nodes because that is handled by Erlang’s distribution.
If what you want is to implement something like that from scratch, then you’d need to implement the channels bookkeeping and whatever kind of consistency guarantees you need, as the rest is just regular message passing. I have never done this myself so I don’t have any code snippets to share.
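For what it’s worth, the bookkeeping part on a single node could look roughly like the sketch below. This is not how :pg does it; it is just a GenServer holding a map of channel => MapSet of pids, with monitors for cleanup. The module name LocalPubSub and its functions are hypothetical, and there is no cross-node replication or consistency handling here at all.

```elixir
defmodule LocalPubSub do
  # Hypothetical single-node sketch: state is %{channel => MapSet of pids}.
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)
  def add_client(server, channel, pid), do: GenServer.call(server, {:add, channel, pid})
  def del_client(server, channel, pid), do: GenServer.call(server, {:del, channel, pid})
  def send_msg(server, channel, msg), do: GenServer.call(server, {:send, channel, msg})

  @impl true
  def init(:ok), do: {:ok, %{channels: %{}, monitors: %{}}}

  @impl true
  def handle_call({:add, channel, pid}, _from, state) do
    # Monitor each pid only once so we can clean up when it dies.
    monitors = Map.put_new_lazy(state.monitors, pid, fn -> Process.monitor(pid) end)
    channels = Map.update(state.channels, channel, MapSet.new([pid]), &MapSet.put(&1, pid))
    {:reply, :ok, %{state | channels: channels, monitors: monitors}}
  end

  def handle_call({:del, channel, pid}, _from, state) do
    # The monitor is kept: the pid may still be subscribed to other channels.
    {:reply, :ok, %{state | channels: drop(state.channels, channel, pid)}}
  end

  def handle_call({:send, channel, msg}, _from, state) do
    state.channels |> Map.get(channel, MapSet.new()) |> Enum.each(&send(&1, msg))
    {:reply, :ok, state}
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
    # A subscriber died: remove it from every channel it was in.
    channels = Enum.reduce(Map.keys(state.channels), state.channels, &drop(&2, &1, pid))
    {:noreply, %{state | channels: channels, monitors: Map.delete(state.monitors, pid)}}
  end

  # Remove pid from a channel, deleting the channel once it is empty.
  defp drop(channels, channel, pid) do
    members = channels |> Map.get(channel, MapSet.new()) |> MapSet.delete(pid)

    if MapSet.size(members) == 0 do
      Map.delete(channels, channel)
    else
      Map.put(channels, channel, members)
    end
  end
end

# Usage: subscribe the current process and broadcast to it.
{:ok, ps} = LocalPubSub.start_link()
:ok = LocalPubSub.add_client(ps, "news", self())
:ok = LocalPubSub.send_msg(ps, "news", {:hello, 1})

receive do
  {:hello, n} -> IO.puts("got #{n}")
end
```

Everything goes through one process here, which keeps the state consistent but also makes it a bottleneck; the hard part you would still have to design is keeping several nodes in agreement.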
If someone more versed in Erlang/OTP than me could jump in: where exactly in otp/pg.erl at master · erlang/otp · GitHub (which line) is the ‘HashMap<Channel, HashSet>’ stored?
Since pg is expected to be used for highly distributed use cases, this map is kept per “scope process”, which is how pg implements overlay networks (see some older documentation here: spg/README.md at master · max-au/spg · GitHub )
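A quick way to see the per-scope bookkeeping from the outside is to start two scopes and join the same group name in only one of them. This is just a sketch; the scope names :scope_a and :scope_b and the group :my_topic are made up.

```elixir
# Two independent scopes, each with its own scope process and group table.
{:ok, _} = :pg.start_link(:scope_a)
{:ok, _} = :pg.start_link(:scope_b)

# Join the calling process to :my_topic in :scope_a only.
:ok = :pg.join(:scope_a, :my_topic, self())

# The same group name is tracked separately in each scope.
members_a = :pg.get_members(:scope_a, :my_topic)
members_b = :pg.get_members(:scope_b, :my_topic)
IO.inspect({members_a, members_b})
```

Only nodes that run a scope with the same name exchange membership for it, which is what lets you build separate overlays on top of one cluster.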