Design / implementation of a minimal distributed pub/sub

  1. I am looking for something more useful than “just use Phoenix.PubSub”

  2. I understand there are various security issues. For our definition of ‘minimal’, let us consider the following operations:

// channel -> set of subscribed clients
control: HashMap<Channel, HashSet<Client>>

fn send_msg(c: Channel, msg: Msg) {
  // fan the message out to every client currently subscribed to the channel
  if let Some(clients) = control.get(&c) {
    for client in clients {
      client.send(msg);
    }
  }
}

fn add_channel(c: Channel) {
  control.insert(c, HashSet::new());
}

fn del_channel(c: Channel) {
  send_msg(c, "exit");
  control.remove(&c);
}

fn add_client(c: Channel, client: Client) {
  control.get_mut(&c).unwrap().insert(client);
}

fn del_client(c: Channel, client: Client) {
  control.get_mut(&c).unwrap().remove(&client);
}
  3. Now, here is the question. How do we make this distributed? How do we shard the ‘control’ map? How do messages get routed to the right place?

I am looking for things on the level of pseudocode/working-code (not informal English descriptions).

Any recommendations for tutorials / books that work through the design / implementation of such a system?

I’d recommend checking out the Erlang :pg module, which is what Phoenix PubSub uses under the hood in one of its adapters.
This module implements distributed process groups. A process can subscribe to a group (which could be your topic, following the Phoenix PubSub analogy), and you can fetch all the members of a group and send each of them a message to do a broadcast. The group list is managed by :pg itself, so you don’t have to worry about adding groups or cleaning them up.

A minimal implementation of your use case would be:

defmodule PubSub do

  @type channel :: any
  @type client :: pid

  @spec send_msg(channel, any) :: :ok
  def send_msg(channel, msg) do
    for client <- :pg.get_members(channel) do
      send(client, msg)
    end
    :ok
  end

  @spec add_client(channel, client) :: :ok
  def add_client(channel, client) do
    :pg.join(channel, client)
  end

  @spec del_client(channel, client) :: :ok
  def del_client(channel, client) do
    :pg.leave(channel, client)
  end
end

Note that add_channel and del_channel are not needed, as that’s handled automatically by :pg.
Distribution is handled automatically as well: you can send messages to pids on other nodes, and group membership is synchronized between nodes by :pg over Erlang’s distribution.
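To make that concrete, here is a hypothetical two-node iex session (the node names are made up; it assumes both nodes run the PubSub module above, and that the default :pg scope has been started on each node, e.g. via :pg.start_link/0):

# On node a@host (started with e.g. `iex --sname a`):
:pg.start_link()
PubSub.add_client("room:lobby", self())

# On node b@host:
:pg.start_link()
Node.connect(:"a@host")  # :pg syncs group membership shortly after the nodes connect
PubSub.send_msg("room:lobby", {:post, "hello"})

# Back on a@host, the subscribed shell process has received the message:
flush()  # => {:post, "hello"}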

If what you want is to implement something like that from scratch, then you’d need to implement the channel bookkeeping and whatever consistency guarantees you need; the rest is just regular message passing. I have never done this myself, so I don’t have any code snippets to share.
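For illustration only, here is a hypothetical sketch of that bookkeeping (the module and function names are invented, not code from this thread): hash each channel onto a node and forward every operation to the owning node. It assumes a fixed, fully connected cluster where every node runs this module, and it ignores node membership changes and failure handling:

defmodule ShardedPubSub do
  use Agent

  # Each node holds one shard of `control`: channel => MapSet of client pids.
  def start_link(_opts \\ []) do
    Agent.start_link(fn -> %{} end, name: __MODULE__)
  end

  # Deterministically pick the node that owns a channel.
  defp owner(channel) do
    nodes = Enum.sort([node() | Node.list()])
    Enum.at(nodes, :erlang.phash2(channel, length(nodes)))
  end

  # Run `fun` on the owning node (locally if that is us).
  defp on_owner(channel, fun) do
    case owner(channel) do
      n when n == node() -> fun.()
      n -> :erpc.call(n, fun)
    end
  end

  def add_client(channel, client) do
    on_owner(channel, fn ->
      Agent.update(__MODULE__, fn control ->
        Map.update(control, channel, MapSet.new([client]), &MapSet.put(&1, client))
      end)
    end)
  end

  def del_client(channel, client) do
    on_owner(channel, fn ->
      Agent.update(__MODULE__, fn control ->
        Map.update(control, channel, MapSet.new(), &MapSet.delete(&1, client))
      end)
    end)
  end

  def send_msg(channel, msg) do
    on_owner(channel, fn ->
      # Read this node's shard and fan the message out to the channel's clients.
      control = Agent.get(__MODULE__, & &1)
      Enum.each(Map.get(control, channel, MapSet.new()), &send(&1, msg))
      :ok
    end)
  end
end

Routing with :erlang.phash2/2 reshuffles almost every channel whenever the node list changes, which is why real systems reach for consistent hashing or a registry; it is used here only to show where the routing decision lives.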


For anyone curious: otp/lib/kernel/src/pg.erl at master · erlang/otp · GitHub – this looks really helpful. Thanks!


If someone more versed in Erlang/OTP than me could jump in: where exactly in otp/pg.erl at master · erlang/otp · GitHub (which line) is the ‘HashMap<Channel, HashSet<Client>>’ stored?

I guess it is implemented using ETS:

join_local_group(Scope, Group, Pid) when is_pid(Pid) ->
    case ets:lookup(Scope, Group) of
        [{Group, All, Local}] ->
            ets:insert(Scope, {Group, [Pid | All], [Pid | Local]});
        [] ->
            ets:insert(Scope, {Group, [Pid], [Pid]})
    end;

In pg, the ETS table is only a cache that circumvents message passing and speeds up get_members while keeping it highly concurrent.

The source of truth, the map “Group => [Pid1, Pid2, Pid3, …]”, is stored as part of the pg process state: otp/pg.erl at master · erlang/otp · GitHub

Since pg is expected to be used in highly distributed use cases, this map is kept per “scope process”, implementing overlay networks (see some older documentation here: spg/README.md at master · max-au/spg · GitHub).
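As a hypothetical illustration of scopes (the scope names are invented): each scope is a separate process with its own ETS table, so groups in different scopes are completely independent:

# Start two independent scopes; each gets its own process and ETS table.
{:ok, _} = :pg.start_link(:chat)
{:ok, _} = :pg.start_link(:presence)

:pg.join(:chat, "room:1", self())

:pg.get_members(:chat, "room:1")      # => [#PID<0.x.0>] (the calling process)
:pg.get_members(:presence, "room:1")  # => [] (same group name, different scope)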
