Thank you very much for the wonderful replies thus far.
Let me try to explain in a little more detail what I am trying to build:
The end result I have in mind is a library that facilitates the on-line calculation of sums, products and averages, which can then be used to draw pretty graphs and summaries.
In an application I am making (a kind of game), players can make events happen. These events are the buying and selling of in-game resources. So each event, or transaction, can be considered a struct like this: %Transaction{resource_id: integer, n_resource: integer, money: integer, created_at: DateTime.t}. For buying, n_resource is positive while money is negative. For selling, it is the opposite.
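To make the shape of these events concrete, here is a minimal sketch of such a struct. The field names come from the description above; the `buy?/1` and `sell?/1` helpers are only hypothetical illustrations of the sign convention, not something the application necessarily has.

```elixir
defmodule Transaction do
  @moduledoc "Sketch of the event struct described above."

  @enforce_keys [:resource_id, :n_resource, :money, :created_at]
  defstruct [:resource_id, :n_resource, :money, :created_at]

  @type t :: %__MODULE__{
          resource_id: integer(),
          n_resource: integer(),
          money: integer(),
          created_at: DateTime.t()
        }

  # Hypothetical helpers: buying gains resources and spends money,
  # selling does the opposite.
  def buy?(%__MODULE__{n_resource: n, money: m}), do: n > 0 and m < 0
  def sell?(%__MODULE__{n_resource: n, money: m}), do: n < 0 and m > 0
end

# struct!/2 is used here so the snippet also runs as a standalone script.
buy = struct!(Transaction, resource_id: 1, n_resource: 3, money: -30, created_at: DateTime.utc_now())
sell = struct!(Transaction, resource_id: 1, n_resource: -2, money: 24, created_at: DateTime.utc_now())

IO.inspect({Transaction.buy?(buy), Transaction.sell?(sell)})
```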
Now, I want to keep track of the average buying/selling price of the different items, as well as the total number of items bought/sold, over a given period. Not only the ‘current’ average, but a moving average over the past minute, fifteen minutes, hour, day and week (the different window types).
For each {resource_id, window_type}, I thus want to start a GenServer that receives the relevant %Transaction{} structs and keeps N queues, where each queue contains all items whose created_at lies between the beginning and end of that queue's window.
So for instance, the window size might be 15 minutes, and I might want to keep track of the scores in the last 36 15-minute intervals. Thus, there are 36 queues.
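One possible way to get one process per {resource_id, window_type} pair is to name each GenServer through Elixir's built-in Registry. The following is only a sketch under that assumption: the module name `MovingWindow.Worker`, the registry name `MovingWindow.Registry` and the window type atoms are all made up for illustration.

```elixir
defmodule MovingWindow.Worker do
  use GenServer

  # Hypothetical registry name; it has to be started before any worker.
  @registry MovingWindow.Registry

  def start_link({_resource_id, _window_type} = key) do
    GenServer.start_link(__MODULE__, key, name: via(key))
  end

  # Looks the worker up by its {resource_id, window_type} key.
  def key(resource_id, window_type) do
    GenServer.call(via({resource_id, window_type}), :key)
  end

  defp via(key), do: {:via, Registry, {@registry, key}}

  @impl true
  def init(key), do: {:ok, %{key: key}}

  @impl true
  def handle_call(:key, _from, state), do: {:reply, state.key, state}
end

{:ok, _} = Registry.start_link(keys: :unique, name: MovingWindow.Registry)

# One worker per combination of resource and window type.
for resource_id <- [1, 2], window_type <- [:minute, :fifteen_minutes, :hour] do
  {:ok, _pid} = MovingWindow.Worker.start_link({resource_id, window_type})
end

IO.inspect(MovingWindow.Worker.key(2, :hour))
```

In a real application the workers would more likely be started under a supervisor; that part is left out to keep the sketch short.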
When a new item arrives, it is placed at the end of the newest queue.
Every few seconds, items are removed from the front of the oldest queue until the front item is newer than that queue's cutoff. This is repeated for the second-oldest queue, but instead of being discarded, its stale items are placed at the end of the oldest queue. The procedure continues through the newer queues in the same way, until every queue again contains only items that belong to its time window relative to ‘now’.
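The maintenance pass described above could be sketched roughly as follows, using Erlang's `:queue` module. This is an illustration under my own assumptions, not a finished design: the module name `MovingWindow.Queues` is hypothetical, the queues are kept in a list ordered oldest-first, and items are assumed to be maps or structs with a `created_at` DateTime.

```elixir
defmodule MovingWindow.Queues do
  @moduledoc "Sketch: N time-bucketed queues, stored oldest-first."

  def new(n) when n > 0, do: List.duplicate(:queue.new(), n)

  # A new item is placed at the end of the newest (last) queue.
  def push(queues, item), do: List.update_at(queues, -1, &:queue.in(item, &1))

  # One maintenance pass, oldest queue first. Returns {queues, dropped_items}.
  def rotate([oldest | newer], now, window_seconds) do
    n = length(newer) + 1
    # The oldest queue discards everything older than n windows.
    {oldest, dropped} = pop_stale(oldest, now, n * window_seconds)

    {processed, _} =
      Enum.reduce(newer, {[oldest], n - 1}, fn queue, {[older | done], k} ->
        # Stale items of this queue move to the end of the next-older queue.
        {queue, stale} = pop_stale(queue, now, k * window_seconds)
        older = Enum.reduce(stale, older, &:queue.in/2)
        {[queue, older | done], k - 1}
      end)

    {Enum.reverse(processed), dropped}
  end

  # Pops items from the front while they are at least cutoff_seconds old.
  defp pop_stale(queue, now, cutoff_seconds, stale \\ []) do
    with {:value, item} <- :queue.peek(queue),
         true <- DateTime.diff(now, item.created_at, :second) >= cutoff_seconds do
      pop_stale(:queue.drop(queue), now, cutoff_seconds, [item | stale])
    else
      _ -> {queue, Enum.reverse(stale)}
    end
  end
end

now = DateTime.utc_now()

queues =
  MovingWindow.Queues.new(3)
  |> MovingWindow.Queues.push(%{id: :a, created_at: DateTime.add(now, -150, :second)})
  |> MovingWindow.Queues.push(%{id: :b, created_at: DateTime.add(now, -50, :second)})

# With 100-second windows, :a moves one queue back and :b stays in the newest queue.
{queues, dropped} = MovingWindow.Queues.rotate(queues, now, 100)
IO.inspect({Enum.map(queues, &:queue.to_list/1), dropped})
```

Note that in this sketch a single pass moves an item back by at most one queue, so the pass has to run more often than once per window (or be repeated) for the queues to settle.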
As items are added to and removed from the queues one by one, we can calculate sums and products (or any other commutative, invertible operation) and averages (one sum divided by another sum) on-line:
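# The running totals live in `state`; `average` is derived from them.
def add_transaction_to_calculation(state = %{money: _, n_resource: _}, tx = %Transaction{}) do
  state
  |> Map.put(:money, state.money + tx.money)
  |> Map.put(:n_resource, state.n_resource + tx.n_resource)
  |> recalculate_average()
end

# The inverse of the function above: subtracts the transaction from the totals.
def remove_transaction_from_calculation(state = %{money: _, n_resource: _}, tx = %Transaction{}) do
  state
  |> Map.put(:money, state.money - tx.money)
  |> Map.put(:n_resource, state.n_resource - tx.n_resource)
  |> recalculate_average()
end

# Average money per unit of resource; nil while the total is zero,
# to avoid dividing by zero.
def recalculate_average(state = %{n_resource: 0}), do: Map.put(state, :average, nil)

def recalculate_average(state) do
  Map.put(state, :average, state.money / state.n_resource)
end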
As this is an on-line procedure, each update only touches the items that enter or leave a queue, so updating any of the GenServers’ states should be very fast at any given time.
This seems like the kind of thing that would be very useful in its own library.
However, making it easy but customizable to send events (like, in this case, new %Transaction{} structs) to the GenServer is where I am stuck. I would like to use Phoenix.PubSub, but would not:
- Want to force anyone to use Phoenix; the library should only depend on the publish/subscribe functionality.
- Want the broadcaster to have to be mindful of implementation details of the receiver. This would be the case if the library provided its own means of sending, or its own wrapper around, the messages broadcast to it (or its own alternative to Phoenix.PubSub). I would like the Moving Window GenServers to ‘tap into’ what is already being broadcast elsewhere.
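To illustrate the kind of decoupling I mean, here is a rough sketch in which the GenServer is handed two functions at start-up: one that subscribes it to whatever publish/subscribe mechanism the application already uses, and one that translates the application's own messages into something the calculation understands. Everything here is an assumption for the sake of the example: the module name `MovingWindow.Server`, the `:subscribe` and `:extract` options, the `{:new_transaction, tx}` message shape, and the use of the standard library's Registry as a stand-in for the real PubSub.

```elixir
defmodule MovingWindow.Server do
  use GenServer

  # Hypothetical options:
  #   :subscribe - zero-arity fun, run inside the server, that subscribes it
  #   :extract   - fun turning any incoming message into {:ok, tx} or :ignore
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  def totals(server), do: GenServer.call(server, :totals)

  @impl true
  def init(opts) do
    subscribe = Keyword.fetch!(opts, :subscribe)
    extract = Keyword.fetch!(opts, :extract)
    subscribe.()
    {:ok, %{extract: extract, money: 0, n_resource: 0}}
  end

  @impl true
  def handle_call(:totals, _from, state) do
    {:reply, Map.take(state, [:money, :n_resource]), state}
  end

  @impl true
  def handle_info(message, state) do
    case state.extract.(message) do
      {:ok, tx} ->
        {:noreply,
         %{state | money: state.money + tx.money, n_resource: state.n_resource + tx.n_resource}}

      :ignore ->
        {:noreply, state}
    end
  end
end

# A Registry with duplicate keys stands in for the application's PubSub.
{:ok, _} = Registry.start_link(keys: :duplicate, name: Demo.PubSub)

{:ok, server} =
  MovingWindow.Server.start_link(
    subscribe: fn -> Registry.register(Demo.PubSub, "transactions", []) end,
    extract: fn
      {:new_transaction, %{resource_id: 1} = tx} -> {:ok, tx}
      _other -> :ignore
    end
  )

# The broadcaster sends its own message shape and knows nothing about the server.
Registry.dispatch(Demo.PubSub, "transactions", fn entries ->
  for {pid, _value} <- entries do
    send(pid, {:new_transaction, %{resource_id: 1, money: -30, n_resource: 3}})
  end
end)

IO.inspect(MovingWindow.Server.totals(server))
```

With Phoenix.PubSub, the `:subscribe` function could instead call `Phoenix.PubSub.subscribe/2` with the application's own PubSub name and topic, while the library itself would not need Phoenix as a dependency.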