Hi there, thanks for reading this! I’ll try to be as brief as I can.
I am relatively new to Phoenix and Elixir, but doing well I hope. I am working on a Phoenix ‘game’: several hundred people will log in simultaneously and start sharing messages. Not a huge workload, but enough to be careful with database storage. I created a similar game in Rails, but it was unreliable. One of the things that caused trouble was a peak load on the database during the first five minutes of the game.
This time, in Phoenix, I decided to use a GenServer to store all data in an ETS table. This is working great! But I would also like to store this data in a Postgres database, preferably in a non-blocking fashion. I got the idea to do this with another GenServer: every time I insert or update a ‘record’ in the cache, I call this GenServer to do the same thing in the database as well.
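For context, the cache side looks roughly like this (heavily stripped down; the module name, table name, and keys are made up for this example):

```elixir
defmodule GameCache do
  use GenServer

  # Client API

  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  # reads go straight to the ETS table, bypassing the GenServer process
  def get(key) do
    case :ets.lookup(:game_cache, key) do
      [{^key, value}] -> {:ok, value}
      [] -> :not_found
    end
  end

  # writes are serialised through the GenServer, which owns the table
  def put(key, value) do
    GenServer.cast(__MODULE__, {:put, key, value})
  end

  # Server callbacks

  def init(:ok) do
    table = :ets.new(:game_cache, [:named_table, :set, :protected, read_concurrency: true])
    {:ok, table}
  end

  def handle_cast({:put, key, value}, table) do
    :ets.insert(:game_cache, {key, value})
    {:noreply, table}
  end
end
```

Because the table is `:protected`, any process can read it directly while only the owning GenServer writes to it.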
Now here I started to get a bit enthusiastic… The idea is to collect changesets in a queue. Every time a new changeset arrives, I count the items in the queue. If I have enough items (at least the Repo’s pool_size) I execute a batch with Repo.insert or Repo.update wrapped in a Task.async. That also works. But! I will almost always end up with a few leftover elements in the queue (fewer than pool_size). To overcome this, I have a periodic task (using the tick function) that checks whether the remaining items in the queue need to be flushed. Also works.
One of the nice things is that I can check whether a certain record is updated twice in a single batch of changesets. If that is about to happen, I wait 0.2 seconds before I execute the second update, to avoid race conditions. Here is the code:
```elixir
defmodule NetworkLab.GenServers.DatabaseAssistant do
  use GenServer

  alias NetworkLab.Repo

  @tick_interval 1_000
  @pool_size Application.get_env(:network_lab, NetworkLab.Repo)[:pool_size]

  def start_link(_) do
    GenServer.start_link(__MODULE__, :queue.new(), name: __MODULE__)
  end

  # add a changeset to the queue; <changeset> is a map containing the actual
  # changeset plus a flag that indicates whether this is an insert or an update
  def add(changeset) do
    GenServer.cast(__MODULE__, {:add, changeset})
  end

  # list the queue
  def list() do
    GenServer.call(__MODULE__, :list)
  end

  # reset the queue
  def flush() do
    GenServer.call(__MODULE__, :flush)
  end

  def init(queue) do
    tick(queue)
    {:ok, queue}
  end

  # handle adding to the queue
  def handle_cast({:add, changeset}, queue) do
    # add changeset to the queue
    queue = :queue.in(changeset, queue)

    # if the queue holds pool_size items or more, then split off a batch
    queue =
      if queue_length(queue) >= @pool_size do
        # remove <pool_size> elements
        {removed, new_queue} = :queue.split(@pool_size, queue)
        # iterate over the removed elements
        execute_queue(removed)
        # return the remaining queue
        new_queue
      else
        queue
      end

    {:noreply, queue}
  end

  def handle_call(:flush, _from, _queue) do
    {:reply, :ok, :queue.new()}
  end

  def handle_call(:list, _from, queue) do
    {:reply, :queue.to_list(queue), queue}
  end

  def handle_info(:flush_remainder, queue) do
    # IO.puts("QUEUE SIZE: #{inspect(:queue.len(queue))}")
    if :queue.len(queue) > 0 && :queue.len(queue) < @pool_size do
      # flush the queue
      execute_queue(queue)
    end

    # schedule the next tick in @tick_interval milliseconds
    tick(queue)
    # continue with an empty queue
    {:noreply, :queue.new()}
  end

  # if we had enough items in the queue, execute the insertions/updates
  defp execute_queue(queue) do
    # loop over the queue items with a fresh MapSet
    Enum.reduce(:queue.to_list(queue), MapSet.new(), fn item, processed ->
      # create a summary of the changeset struct in the item
      summary = {item.changeset.data.id, item.changeset.data.__struct__}

      # if we already touched this record in this batch, wait a bit
      # to avoid race conditions
      if MapSet.member?(processed, summary) do
        # sleep for 0.2 sec
        :timer.sleep(200)
      end

      # execute the query
      execute(item)
      # and add the struct summary to the processed set
      MapSet.put(processed, summary)
    end)

    :ok
  end

  defp tick(queue) do
    # WHERE DO I PUT THIS!?
    NetworkLabWeb.Endpoint.broadcast("admin", "update", %{
      payload: %{selector: "dba", cell: "queue", id: 1, message: :queue.len(queue)}
    })

    Process.send_after(self(), :flush_remainder, @tick_interval)
  end

  defp queue_length(queue) do
    :queue.len(queue)
  end

  # hand the Repo call to a Task (note: awaiting it right away makes this
  # synchronous from this GenServer's point of view)
  defp execute(data) do
    task =
      Task.async(fn ->
        case data.action do
          "insert" -> Repo.insert(data.changeset)
          "update" -> Repo.update(data.changeset)
        end
      end)

    Task.await(task)
  end
end
```
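To show what the batching step does in isolation, here is a tiny standalone snippet (pool size hard-coded to 3, integers standing in for changesets):

```elixir
# a queue of five items, with a pretend pool_size of 3
pool_size = 3

queue = Enum.reduce(1..5, :queue.new(), fn item, q -> :queue.in(item, q) end)

# once the queue reaches pool_size, split off a batch and keep the rest
{batch, rest} = :queue.split(pool_size, queue)

:queue.to_list(batch) # => [1, 2, 3]  (this is what gets handed to execute_queue/1)
:queue.to_list(rest)  # => [4, 5]     (this stays in the GenServer state)
```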
I have two questions:
- I would like to send realtime data to an admin panel so I can see how large the queue is, so I broadcast the queue length to admin in the periodic tick function. But if I ‘test’ the GenServer by hitting it hard, I lose the ‘tick’: it seems to wait for the mayhem to pass and then picks up again after the test is done. Even when the GenServer is busy, I would like this tick to happen, no matter what, and broadcast info to admin.
- Is this setup ridiculous? I realise it is a bit much… I am getting pretty enthusiastic about Elixir and Phoenix, and the database buffer is a great pet project. But if it will be unreliable in production, I would like to know.
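To make the first question concrete, here is a stripped-down repro of what I think is happening (dummy GenServer, made-up names): the `:flush_remainder`-style timer message simply queues up in the mailbox behind the flood of casts and is only handled once the backlog drains.

```elixir
defmodule FloodDemo do
  use GenServer

  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    # schedule a "tick" 10 ms from now, like the real tick/1 does
    Process.send_after(self(), :tick, 10)
    {:ok, %{casts: 0, casts_before_tick: nil}}
  end

  def handle_cast(:work, state) do
    # pretend each message takes a little while to handle
    :timer.sleep(1)
    {:noreply, %{state | casts: state.casts + 1}}
  end

  def handle_info(:tick, state) do
    # record how many casts were handled before the tick got its turn
    {:noreply, %{state | casts_before_tick: state.casts}}
  end

  def handle_call(:report, _from, state) do
    {:reply, state.casts_before_tick, state}
  end
end

{:ok, _pid} = FloodDemo.start_link()

# flood the mailbox: all 100 casts arrive before the 10 ms timer even fires
for _ <- 1..100, do: GenServer.cast(FloodDemo, :work)

# give the timer a moment to fire and enqueue the :tick message
:timer.sleep(50)

# :tick was only handled after the whole backlog of 100 casts was drained
count = GenServer.call(FloodDemo, :report)
```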
Thanks again,
Cspr