How to wrap Phoenix PubSub?

I am working on a small library that computes a moving average, which would work well with Phoenix PubSub, in that it could subscribe itself to a topic using Phoenix PubSub, and then handle the events as they come in.

However, users should be able to modify the exact logic that is going on inside, including what topics it is subscribed to. This is a bit of a problem though, because a PubSub-broadcasted message does not contain any identifying information (it is just a ‘plain’ message that will trigger your GenServer’s handle_info).

So, if I want to give users the ability to subscribe to varying PubSub topics, they need to be able to tap into the handle_info part of the GenServer that the library constructs, to allow matching on the proper messages.

What would be a proper way to do this?

I remember seeing https://github.com/JEG2/hanabi_umbrella, where I think phoenix pubsub was used to tie different umbrella apps together. You might want to look into hanabi_engine/game_manager.ex.

Maybe you can use pg2 without phoenix pubsub in order to add your custom logic?

Really? I thought a PubSub-broadcasted message was contained in a map/struct that contained all kinds of information such as the event, pubsub topic, data/payload, etc…?

What are you trying to do? Say via pseudo-code or so?

Sure, as long as the messages are sent through Phoenix. :stuck_out_tongue_winking_eye:

Basically Phoenix, not PhoenixPubSub is what adds that information.

That being said, Phoenix is probably one of the best examples of “How to implement application PubSub messaging.”

I don’t have a full picture of your use case from your description. Can you describe a little more what you mean by:

Maybe showing a pseudo client/server api that you’re thinking of would help. What I can say is if you need certain info to be a part of the message, you can make your own message contract that includes this info, just like the Phoenix.SocketBroadcast for channels contains extra info, your contract could be

{group, event, term}, for example:

{:stats, :new_average, %{value: 100}}
{:health, :cpu_spike, %{node: node(), load: ...}}

Make sense?
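
A minimal sketch of how a subscriber could match on such a contract. The module name ContractListener and its events/1 function are hypothetical, and plain send/2 stands in here for a PubSub broadcast, which reaches handle_info/2 in the same way:

```elixir
defmodule ContractListener do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, [], opts)

  # Returns the events seen so far, oldest first.
  def events(pid), do: GenServer.call(pid, :events)

  @impl true
  def init(events), do: {:ok, events}

  @impl true
  def handle_call(:events, _from, events), do: {:reply, Enum.reverse(events), events}

  # Messages that follow the {group, event, term} contract.
  @impl true
  def handle_info({:stats, :new_average, %{value: value}}, events) do
    {:noreply, [{:average, value} | events]}
  end

  def handle_info({:health, :cpu_spike, %{node: node}}, events) do
    {:noreply, [{:cpu_spike, node} | events]}
  end

  # Anything else is ignored instead of crashing the process.
  def handle_info(_other, events), do: {:noreply, events}
end

{:ok, pid} = ContractListener.start_link()
send(pid, {:stats, :new_average, %{value: 100}})
send(pid, :unrelated)
ContractListener.events(pid)
# => [average: 100]
```

The catch-all clause matters once a process subscribes to topics it does not fully control: without it, any message outside the contract would crash the GenServer.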

Thank you very much for the wonderful replies thus far.

Let me try to explain in a little more detail what I am trying to build:

The end result of what I want to make, is a library that facilitates the on-line calculation of sums, products and averages that can be used to draw pretty graphs and summaries.

In an application I am making (a kind of game), players can make events happen. These events are the buying and selling of in-game resources. So each event, or transaction, can be considered a struct like this: %Transaction{resource_id: integer, n_resource: integer, money: integer, created_at: DateTime.t}. For buying, n_resource is positive while money is negative. For selling, it is the opposite.

Now, I want to keep track of the average buy/selling price of the different items, as well as the amount of items bought/sold in total during this time. Not only the ‘current’ average, but a moving average over the past minute, fifteen minutes, hour, day, week (the different window types).

For each {resource_id, window_type}, I thus want to start a GenServer that receives the desired %Transaction{}s, keeps N queues, where each queue contains all items whose created_at is between the beginning and end of the window.

So for instance, the window size might be 15 minutes, and I might want to keep track of the scores in the last 36 15-minute intervals. Thus, there are 36 queues.
When a new item arrives, it is placed at the end of the newest queue.
Every few seconds, items are removed from the front of the oldest queue until the front item is newer than that queue’s cutoff. The same is done for the one-before-oldest queue, except that its stale items are not discarded but placed at the end of the oldest queue. This is repeated for each younger queue in turn, until every queue again contains only items that belong to its time window relative to ‘now’.
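
The shifting procedure described above can be sketched roughly as follows. This is only an illustration under assumptions, not the library’s actual code: the module name WindowQueues is hypothetical, items are {timestamp, value} tuples with integer timestamps, and the shift is assumed to run at least once per window interval, so an item never needs to move more than one queue per pass.

```elixir
defmodule WindowQueues do
  # queues: list of Erlang :queue values, newest window first.
  defstruct [:size, :queues]

  def new(count, size) do
    %__MODULE__{size: size, queues: List.duplicate(:queue.new(), count)}
  end

  # New items always go to the end of the newest queue.
  def add(%__MODULE__{queues: [newest | rest]} = w, item) do
    %{w | queues: [:queue.in(item, newest) | rest]}
  end

  # Walk from the oldest queue to the newest, moving stale items one queue back.
  def shift(%__MODULE__{queues: queues, size: size} = w, now) do
    shifted =
      queues
      |> Enum.with_index()
      |> Enum.reverse()
      |> Enum.reduce([], fn {queue, index}, older ->
        cutoff = now - (index + 1) * size
        {stale, fresh} = split_stale(queue, cutoff, [])

        case older do
          # The oldest queue simply drops its stale items.
          [] -> [fresh]
          # Younger queues append their stale items to the next older queue.
          [next | rest] -> [fresh, Enum.reduce(stale, next, &:queue.in/2) | rest]
        end
      end)

    %{w | queues: shifted}
  end

  def to_lists(%__MODULE__{queues: queues}), do: Enum.map(queues, &:queue.to_list/1)

  # Pops items from the front while they are at or before the cutoff.
  defp split_stale(queue, cutoff, stale) do
    case :queue.out(queue) do
      {{:value, {ts, _} = item}, rest} when ts <= cutoff ->
        split_stale(rest, cutoff, [item | stale])

      _ ->
        {Enum.reverse(stale), queue}
    end
  end
end

WindowQueues.new(3, 10)
|> WindowQueues.add({0, :a})
|> WindowQueues.add({5, :b})
|> WindowQueues.shift(12)
|> WindowQueues.to_lists()
# => [[{5, :b}], [{0, :a}], []]
```

Because items arrive in timestamp order, each queue stays sorted, so only the front of a queue ever needs to be inspected.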

As items are added and removed to the queues one-by-one, we can calculate sums and products (or any other commutative reversible operation) and averages (one sum divided by another sum) on-line:

# The state holds the running totals: %{money: ..., n_resources: ..., average: ...}
def add_transaction_to_calculation(state = %{money: _, n_resources: _}, tx = %Transaction{}) do
  state
  |> Map.put(:money, state.money + tx.money)
  |> Map.put(:n_resources, state.n_resources + tx.n_resource)
  |> recalculate_average()
end

def remove_transaction_from_calculation(state = %{money: _, n_resources: _}, tx = %Transaction{}) do
  state
  |> Map.put(:money, state.money - tx.money)
  |> Map.put(:n_resources, state.n_resources - tx.n_resource)
  |> recalculate_average()
end

# With no resources in the window there is no average (and no division by zero).
def recalculate_average(state = %{n_resources: 0}), do: Map.put(state, :average, nil)

def recalculate_average(state) do
  Map.put(state, :average, state.money / state.n_resources)
end

As this is an on-line procedure, at any given time it will probably be very fast to update any of the GenServer’s states.

This seems like the kind of thing that would be very useful in its own library.
However, making it easy but customizable to send events (like, in this case, new %Transaction{} structs) to the GenServer is where I am stuck. I would like to use Phoenix.PubSub, but would not:

  1. Want to force someone to use Phoenix; the library should only depend on the publish/subscribe functionality.
  2. Want the broadcaster to be mindful of implementation details of the receiver. This would be the case if the library provided its own means or wrapper for the messages to be broadcast to it (or its own alternative to Phoenix.PubSub). I would like the Moving Window GenServers to ‘tap into’ what is already being broadcast elsewhere.

1.) What is it exactly that makes you feel like you can’t use Phoenix.PubSub outside of Phoenix?

2.) I don’t understand your point here. You are in control of what you broadcast. If you want more information in your broadcasted messages then you have to broadcast that information. Phoenix builds a Broadcast struct for every broadcasted message.

@Azolo: The first point is a reply to what you yourself have said in reply to @OvermindDL1:

Sure, as long as the messages are sent through Phoenix. :stuck_out_tongue_winking_eye:
Basically Phoenix, not PhoenixPubSub is what adds that information.

So if you use Phoenix.PubSub directly, you do not get a %Broadcast{}.

The second point: While I am in control of what I broadcast when building this thing straight into my application, I would like to extract it into a separate library.

When it is a library, I am no longer in control of what the library-user broadcasts to the library’s GenServer. I would like to give users of the library the ability to handle arbitrary messages that are sent to the GenServer, but preferably without them needing to wade into the inner handle_info guts of the GenServer.

So to summarize: I’m not really sure how to, when extracting this functionality into a Library, enable the user to do proper handling of messages received at the Library-spawned GenServer.
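
One possible shape for this, sketched under assumptions rather than taken from any existing library: the library-owned GenServer accepts a user-supplied function that turns an arbitrary incoming message into either {:ok, item} or :ignore, so users never touch handle_info/2 themselves. The names TapServer, the :extract option and items/1 are hypothetical, and send/2 stands in for a PubSub broadcast.

```elixir
defmodule TapServer do
  use GenServer

  # opts[:extract] is a user-supplied function: message -> {:ok, item} | :ignore
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  # Returns the accepted items, oldest first.
  def items(pid), do: GenServer.call(pid, :items)

  @impl true
  def init(opts) do
    # A real server would also subscribe to the user's topics here.
    {:ok, %{extract: Keyword.fetch!(opts, :extract), items: []}}
  end

  @impl true
  def handle_call(:items, _from, state), do: {:reply, Enum.reverse(state.items), state}

  # Every plain message is passed through the user's function.
  @impl true
  def handle_info(message, state) do
    case state.extract.(message) do
      {:ok, item} -> {:noreply, %{state | items: [item | state.items]}}
      :ignore -> {:noreply, state}
    end
  end
end

# The user decides which messages count, in whatever shape they are broadcast.
extract = fn
  {:transfers, :completed, tx} -> {:ok, tx}
  _other -> :ignore
end

{:ok, pid} = TapServer.start_link(extract: extract)
send(pid, {:transfers, :completed, %{money: -30, n_resource: 3}})
send(pid, :noise)
TapServer.items(pid)
# => [%{money: -30, n_resource: 3}]
```

The broadcaster stays unaware of the receiver: the translation from "whatever is already being broadcast" to "an item for the window" lives entirely in the function the user hands to the library.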

I see, I consider that to be a good thing. When I used Phoenix.PubSub I didn’t have a need for the event field. So the Broadcast struct is a Phoenix implementation detail.

Well, you could broadcast a separate message for your tracking. Other than that the scenario that you’re describing sounds like my brother asking me to pick a number and me responding with banana. There’s not much recourse without forcing a contract on data coming out of your library.

They are basically no different from normal messages, so that then begs the question: How do you handle arbitrary messages sent to your process (any process, pubsub or not)?

Also I’m curious what you mean by this? An info message is just a non-genserver-wrapped message, very very common in GenServers?

I still don’t have a complete picture, especially around users making their own events, but let’s start with this code. Here we have a GameStore which is the boundary for the game system. You are correct that you want to wrap the pubsub mechanism so the caller doesn’t need to know about it. Given different actions in the system, like purchasing or transferring an item, you’ll broadcast events as part of the success cases. Meanwhile, you can expose subscribe/0, subscribe/1 and unsubscribe functions on the GameStore for other callers to consume. If they are interested in stats they can subscribe to the “stats” group. I show the caller example below the GameStore:

defmodule GameStore do
  # Item, Player, Transfer and transfer_ownership/2 are assumed to be defined elsewhere.

  def subscribe do
    Phoenix.PubSub.subscribe(GameStore.PubSub, "all")
  end

  # Phoenix.PubSub topics must be strings, so atom groups are converted.
  def subscribe(group) do
    Phoenix.PubSub.subscribe(GameStore.PubSub, to_string(group))
  end

  def transfer_item(%Item{} = item, %Player{} = purchaser) do
    case transfer_ownership(item, purchaser) do
      {:ok, item, %Transfer{} = transfer} ->
        broadcast({:transfers, :completed, transfer})
        {:ok, item, transfer}
      {:error, ...} -> ...
    end
  end

  # Every event goes to its own group topic and to the catch-all "all" topic.
  defp broadcast({group, event, term}) do
    Phoenix.PubSub.broadcast(GameStore.PubSub, to_string(group), {group, event, term})
    Phoenix.PubSub.broadcast(GameStore.PubSub, "all", {group, event, term})
  end
end


defmodule SomeServer do
  use GenServer

  def init(opts) do
    :ok = GameStore.subscribe(:transfers)
    :ok = GameStore.subscribe(:stats)
    ...
    {:ok, some_state}
  end

  # Broadcasts arrive as plain messages, so they are handled in handle_info/2.
  def handle_info({:stats, :moving_average, ave}, state) do
    IO.puts "the new moving average of the store is #{inspect ave}"
    {:noreply, state}
  end

  def handle_info({:transfers, :completed, trans}, state) do
    IO.puts "A transfer was completed from " <>
            "#{trans.from_player_id} to #{trans.to_player_id}"
    {:noreply, state}
  end
end

Does this put you on the right track?

Thank you very much for your help, everyone. :heart:

To make it more clear what I am struggling with, I have written it down now as a library: SlidingWindow. (Still very work-in-progress; it does not have a lot of documentation yet as things might still change a lot.)

It does what I want in a finite-state-automaton kind of way; that is, SlidingWindow does not care about GenServers or anything like that right now, but only about gathering and updating the information stored in the struct.

An example of a behaviour to calculate averages: (See also the file test_helper.exs)

defmodule TestSW do
  @behaviour SlidingWindow.Behaviour

  defmodule Transaction do
    defstruct [:value, :created_at]
    def new(value, created_at) do
      %__MODULE__{value: value, created_at: created_at}
    end
  end

  defstruct count: 0, sum: 0, product: 1

  def empty_aggregate() do
    %__MODULE__{}
  end

  def add_item(agg, %Transaction{value: int}) do
    %__MODULE__{agg |
      count: agg.count + 1,
      sum: agg.sum + int,
      product: agg.product * int
    }
  end

  def remove_item(agg, %Transaction{value: int}) do
    %__MODULE__{agg |
      count: agg.count - 1,
      sum: agg.sum - int,
      product: div(agg.product, int)
    }
  end

  def extract_timestamp(%Transaction{created_at: timestamp}) do
    timestamp
  end

end

And then you can create it using something like:

result =
  SlidingWindow.init(TestSW, 10, Timex.Duration.from_seconds(2), initial_data())
  |> SlidingWindow.shift_stale_items(ten_sec_future)
  |> SlidingWindow.add_item(Transaction.new(10, Timex.now()))
  |> SlidingWindow.add_item(Transaction.new(15, Timex.now()))
  # And at some point:
  |> SlidingWindow.get_aggregates()

# Result now contains something like:
result ==            %{0 => %TestSW{count: 0, product: 1, sum: 0},
                       1 => %TestSW{count: 0, product: 1, sum: 0},
                       2 => %TestSW{count: 0, product: 1, sum: 0},
                       3 => %TestSW{count: 1, product: 1, sum: 1},
                       4 => %TestSW{count: 2, product: 6, sum: 5},
                       5 => %TestSW{count: 2, product: 20, sum: 9},
                       6 => %TestSW{count: 2, product: 42, sum: 13},
                       7 => %TestSW{count: 2, product: 72, sum: 17},
                       8 => %TestSW{count: 2, product: 110, sum: 21},
                       9 => %TestSW{count: 2, product: 156, sum: 25}}

In the end, I think that the smartest decision I can make is to not wrap this inside a GenServer-like layer myself, as there are endless ways in which someone might want to manipulate the FSA. Rather, I’ll write some example code showing how it could be wrapped in your own GenServer, including a Process.send_after message to ensure that the data inside never becomes stale.
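
A rough sketch of what such a wrapper could look like. To keep it self-contained, the pure window is replaced by a hypothetical stand-in module (StandInWindow); in a real application the calls would go to SlidingWindow.add_item/2 and SlidingWindow.shift_stale_items/2 as used in the example above. The WindowServer name and its :max_age and :tick options are assumptions as well.

```elixir
defmodule StandInWindow do
  # Hypothetical stand-in for the pure window: a list of {timestamp_ms, value}.
  def new, do: []
  def add_item(items, item), do: [item | items]

  def shift_stale_items(items, now, max_age) do
    Enum.filter(items, fn {ts, _value} -> now - ts < max_age end)
  end
end

defmodule WindowServer do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts)
  def add(pid, value), do: GenServer.cast(pid, {:add, value})
  def values(pid), do: GenServer.call(pid, :values)

  @impl true
  def init(opts) do
    state = %{
      window: StandInWindow.new(),
      max_age: Keyword.get(opts, :max_age, 60_000),
      tick: Keyword.get(opts, :tick, 1_000)
    }

    schedule_tick(state)
    {:ok, state}
  end

  @impl true
  def handle_cast({:add, value}, state) do
    item = {System.monotonic_time(:millisecond), value}
    {:noreply, %{state | window: StandInWindow.add_item(state.window, item)}}
  end

  @impl true
  def handle_call(:values, _from, state) do
    {:reply, Enum.map(state.window, fn {_ts, value} -> value end), state}
  end

  # The periodic tick evicts stale items, then schedules the next tick.
  @impl true
  def handle_info(:tick, state) do
    now = System.monotonic_time(:millisecond)
    schedule_tick(state)
    {:noreply, %{state | window: StandInWindow.shift_stale_items(state.window, now, state.max_age)}}
  end

  defp schedule_tick(state), do: Process.send_after(self(), :tick, state.tick)
end

{:ok, pid} = WindowServer.start_link(max_age: 60_000, tick: 1_000)
WindowServer.add(pid, 10)
WindowServer.values(pid)
```

Re-arming the timer from inside handle_info/2 keeps exactly one tick pending at a time, even if a single shift takes longer than the tick interval.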

I too am struggling to get PubSub configured w/o phoenix. So I took the code in this example and wrote it into a small example project.

But something is not configured correctly. Any and all hints are appreciated.

https://github.com/mwindholtz/pub_sub_spike/blob/master/README.md

Looks like you need to start the pubsub. Probably using a Supervisor.

Here’s an example from one of my projects.

defmodule EventServer.Supervisor do
  @moduledoc false

  use Supervisor

  #
  # client
  #

  def start_link() do
    Supervisor.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def start_link([]), do: start_link()

  #
  # callbacks
  #

  def init(:ok) do
    Supervisor.init(children(), strategy: :one_for_one)
  end

  #
  # private
  #

  defp children() do
    [
      supervisor(Phoenix.PubSub.PG2, [ EventServer.PubSub, [] ])
    ]
  end
end
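
The example above targets the Phoenix.PubSub.PG2 adapter module of phoenix_pubsub 1.x. For comparison, a hedged sketch of the same supervisor assuming phoenix_pubsub 2.x is listed as a dependency, where the PubSub server is started through a child spec instead. The module name EventServer.PubSubSupervisor is made up for this illustration.

```elixir
# Assumes {:phoenix_pubsub, "~> 2.0"} in the deps of mix.exs.
defmodule EventServer.PubSubSupervisor do
  use Supervisor

  def start_link(arg \\ []) do
    Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
  end

  @impl true
  def init(_arg) do
    children = [
      # Starts a PubSub server registered under the name EventServer.PubSub.
      {Phoenix.PubSub, name: EventServer.PubSub}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end
```

Once this supervisor is running, Phoenix.PubSub.subscribe(EventServer.PubSub, "some_topic") and Phoenix.PubSub.broadcast(EventServer.PubSub, "some_topic", message) work without any other part of Phoenix.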

Thank you!

So I updated my code and now have a working example if anyone wants to see:

I deleted the previous broken spike code since, well it didn’t work.

updated code available here:
