Receiving realtime information from a database-updating GenServer

Hi there, thanks for reading this! I’ll try to be as brief as I can.

I am relatively new to Phoenix and Elixir, but doing well, I hope. I am working on a Phoenix ‘game’: several hundred people will log in simultaneously and start sharing messages. Not a huge workload, but enough to be careful with database storage. I created a similar game in Rails, but it was unreliable. One of the things that caused trouble was a peak load on the database during the first 5 minutes of the game.

This time, in Phoenix, I decided to use a GenServer to store all data in an ETS table. This is working great! But I would also like to store this data in a Postgres database, preferably in a non-blocking fashion. I got the idea to do this with another GenServer: every time I have to insert or update a ‘record’ in the cache, I call this GenServer to do the same thing in the database as well.

Now here I started to get a bit enthusiastic… The idea is to collect changesets in a queue. Every time a new changeset arrives, I count the number of items in the queue. If I have enough items (more than Postgres’s pool_size), I execute a batch of Repo.insert or Repo.update calls wrapped in Task.async. That also works. But! I will almost always end up with a few last elements in the queue (fewer than pool_size). To overcome this, I have a periodic task (using a tick function) that checks whether I have to flush the remaining items in the queue. That works too.

One of the nice things is that I can check whether a certain record is about to be updated twice within a single batch of changesets. If that is the case, I wait 0.2 seconds before executing the second update, to avoid race conditions. Here is the code:

```elixir
defmodule NetworkLab.GenServers.DatabaseAssistant do
  use GenServer

  alias NetworkLab.Repo

  @tick_interval 1_000
  @pool_size Application.get_env(:network_lab, NetworkLab.Repo)[:pool_size]

  def start_link(_) do
    GenServer.start_link(__MODULE__, :queue.new(), name: __MODULE__)
  end

  # add a changeset to the queue; <changeset> is a map containing the actual
  # changeset plus a flag that indicates whether this is an insert or an update
  def add(changeset) do
    GenServer.cast(__MODULE__, {:add, changeset})
  end

  # list the queue
  def list() do
    GenServer.call(__MODULE__, :list)
  end

  # reset the queue
  def flush() do
    GenServer.call(__MODULE__, :flush)
  end

  def init(queue) do
    tick(queue)
    {:ok, queue}
  end

  # handle adding to the queue
  def handle_cast({:add, changeset}, queue) do
    # add the changeset to the queue
    queue = :queue.in(changeset, queue)

    # if the queue length reaches pool_size, drain a batch
    queue =
      if queue_length(queue) >= @pool_size do
        # remove <pool_size> elements
        {removed, new_queue} = :queue.split(@pool_size, queue)
        # iterate over the removed elements
        execute_queue(removed)
        # return the remaining queue
        new_queue
      else
        queue
      end

    {:noreply, queue}
  end

  # if we had enough items in the queue, execute the insertions/updates
  defp execute_queue(queue) do
    # loop over the queue items with a fresh MapSet
    Enum.reduce(:queue.to_list(queue), MapSet.new(), fn item, processed ->
      # create a summary of the changeset struct in the item
      summary = {item.changeset.data.id, item.changeset.data.__struct__}

      # if this is a record we have already encountered, wait a bit to avoid
      # race conditions
      if MapSet.member?(processed, summary) do
        # sleep for 0.2 sec
        :timer.sleep(200)
      end

      # execute the query
      execute(item)
      # and add the struct summary to the processed set
      MapSet.put(processed, summary)
    end)

    :ok
  end

  def handle_call(:flush, _from, _queue) do
    {:reply, :ok, :queue.new()}
  end

  def handle_call(:list, _from, queue) do
    {:reply, :queue.to_list(queue), queue}
  end

  def handle_info(:flush_remainder, queue) do
    # IO.puts("QUEUE SIZE: #{inspect(:queue.len(queue))}")
    if :queue.len(queue) > 0 && :queue.len(queue) < @pool_size do
      # flush the queue
      execute_queue(queue)
    end

    # set a tick to check again in @tick_interval milliseconds
    tick(queue)
    # empty the queue
    {:noreply, :queue.new()}
  end

  defp tick(queue) do
    # WHERE DO I PUT THIS!?
    NetworkLabWeb.Endpoint.broadcast("admin", "update", %{
      payload: %{selector: "dba", cell: "queue", id: 1, message: :queue.len(queue)}
    })

    Process.send_after(self(), :flush_remainder, @tick_interval)
  end

  defp queue_length(queue) do
    :queue.len(queue)
  end

  # execute async with Repo
  defp execute(data) do
    task =
      Task.async(fn ->
        case data.action do
          "insert" -> Repo.insert(data.changeset)
          "update" -> Repo.update(data.changeset)
        end
      end)

    Task.await(task)
  end
end
```

I have two questions:

  • I would like to send realtime data to an admin panel so I can see how large the queue is. I broadcast the queue length to the admin channel in the periodic tick function. If I ‘test’ the GenServer by hitting it hard, I lose the ‘tick’ function; it seems to wait for the mayhem to pass and then picks up after the test is done. If the GenServer is busy, I would still like this tick to happen, no matter what, and broadcast the info to admin.
  • Is this setup ridiculous? I realise it is a bit much… I am getting pretty enthusiastic about Elixir and Phoenix, and the database buffer is a great pet project. But if it will be unreliable in production, I would like to know.

Thanks again,

Cspr

For sending out notifications you’ll usually opt for a pub/sub mechanism. If you’re using Phoenix you should already have phoenix_pubsub installed, which gives you that ability.
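A minimal sketch of that mechanism. In a real Phoenix app a PubSub instance is already running in your supervision tree (typically named `<App>.PubSub`); here one is started by hand so the snippet stands alone, and `NetworkLab.PubSub` and the topic name are just illustrative:

```elixir
# Start a standalone PubSub instance (in a Phoenix app this already exists).
{:ok, _} =
  Supervisor.start_link([{Phoenix.PubSub, name: NetworkLab.PubSub}],
    strategy: :one_for_one
  )

# Any process can subscribe to a topic...
:ok = Phoenix.PubSub.subscribe(NetworkLab.PubSub, "admin")

# ...and any other process can broadcast to it without knowing the subscribers.
:ok = Phoenix.PubSub.broadcast(NetworkLab.PubSub, "admin", {:queue_size, 42})

# Each subscriber receives the message in its mailbox.
receive do
  {:queue_size, n} -> IO.puts("queue size: #{n}")
end
```

The point is decoupling: the GenServer doing the database work never needs to know who is watching; it just broadcasts, and the admin channel (or nobody) picks it up.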


Hi @casperkaandorp and welcome! This is a very interesting problem space, and it’s great to see that you’ve got a pretty solid handle on how GenServers work, which can sometimes be challenging for new people.

As a bit of a tangent, we solved a similar problem at our company by leveraging the Event Sourcing pattern. Actions that users would take would open a transaction, write an event to an events table, and then derive and execute changeset(s) to other tables on the basis of that event.

What was nice about this is that when we wanted real time push updates via channels, we just added an asynchronous reader that walked through the events table and would publish to different topics depending on what the event was and what it referenced. The reader tracked the last event id it had published on, which we could compare to the most recent event id, allowing us a sense of whether it was getting behind or not.
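In outline, that reader could look like this. It is a hypothetical sketch: `fetch_after` stands in for a query along the lines of "events with id greater than X, in order", and `publish` for a PubSub broadcast; both are passed in so the sketch stays self-contained.

```elixir
defmodule EventReader do
  use GenServer

  @poll_interval 100

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  def init(opts) do
    state = %{
      last_id: 0,
      fetch_after: Keyword.fetch!(opts, :fetch_after),
      publish: Keyword.fetch!(opts, :publish)
    }

    send(self(), :poll)
    {:ok, state}
  end

  def handle_info(:poll, state) do
    # read any events newer than the last one we published, in order
    events = state.fetch_after.(state.last_id)
    Enum.each(events, state.publish)

    # remember the highest id we have seen; comparing it to the newest
    # event id tells you whether the reader is falling behind
    last_id =
      case events do
        [] -> state.last_id
        _ -> List.last(events).id
      end

    Process.send_after(self(), :poll, @poll_interval)
    {:noreply, %{state | last_id: last_id}}
  end
end
```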

Getting back to your implementation, I’ve got a few questions.

Storing data in Postgres is good, since you’ll avoid data loss. Can you elaborate on the non-blocking bit, though? Is the concern that Postgres won’t be able to keep up? When I first came from Rails I had this mantra that “the database is always slow”. As I wrote more Elixir I came to realize that a lot of that perceived slowness has to do with poor access patterns that Rails makes really easy to fall into. Postgres can be really quite fast if you use it right.

The tradeoff here is about guarantees. What is your tolerance for data loss? What happens if your GenServer can’t keep up and your server runs out of memory? The BEAM will crash and you’ll lose all the changesets. Is that acceptable? I don’t know, with your use case.

Notably, unless you transform the set of changesets into rows you can insert_all with, or updates you can update_all with, doing N changesets’ worth of data isn’t really faster in 1 process than in N client HTTP processes. It might be faster in 1 if you use Repo.checkout or Repo.transaction so that you have a single connection. On the other hand, you won’t have any concurrency in the inserts, which you would if you let the client request processes make the inserts.

Basically, you unblock the clients, but you might lose their data.
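The insert_all route could be sketched like this (a hypothetical helper, not your actual code; one caveat is that `Repo.insert_all` bypasses changeset validations and autogenerated timestamps, so those have to be filled in by hand):

```elixir
defmodule BatchWriter do
  # Flatten a list of insert-changesets into the plain maps that
  # Repo.insert_all expects, adding the timestamps insert_all won't generate.
  def to_rows(changesets, now) do
    Enum.map(changesets, fn cs ->
      cs.changes
      |> Map.put(:inserted_at, now)
      |> Map.put(:updated_at, now)
    end)
  end
end

# In the flush step, one statement instead of N round-trips
# (schema name is illustrative):
#
#   rows = BatchWriter.to_rows(batch, DateTime.utc_now())
#   Repo.insert_all(NetworkLab.Message, rows)
```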

I’m not entirely sure I see how this helps with race conditions. The GenServer can’t race with itself; it’s a single thread doing one change at a time. It can race with any other updates or inserts that aren’t serialized through this GenServer, but delaying the insert 0.2 seconds won’t change that.

All in all, I think you really have two distinct issues you’re trying to solve:

  1. How can you avoid overloading the server?
  2. How can you get real time updates?

Buffering as a solution to #1 is tricky. If the peak load fits in your buffer, it can help. If it lasts longer than you expect, or is more intense than you expect, your buffer will eat all your memory and everything will crash. Personally, I’d start with the simple approach where each client hits the database, do some benchmarks, and if that turns out to be too slow, measure to figure out what is slow and optimize. For data you don’t mind losing a bit of reliability on (like chat messages), I think there are better ways to shed that load.

As for #2, I think this can be achieved within the client request process as well. Instead of pushing changesets to a central GenServer and then having that GenServer broadcast to everyone, the client processes are just as able to do broadcasts. If nobody is listening, broadcasting is basically a no-op. If people are listening, then the client process will need to spend the time to send the messages, but those are quite fast operations, and more to the point, if the client is making your server do work, the client should pay the latency penalty for that. If that latency gets too high, you optimize. Buffering is a high-risk, quasi-high-reward optimization. It feels super fast (high reward) until the buffer is full, and then everything blows up.


Hi @benwilson512, thanks for your lengthy reply, I appreciate it!

Oh! Well, I have/had the same mantra. The Rails application was a failure; the database could not keep up and I got pretty paranoid about it. It was traumatic. No kidding here. This is exactly the reason why I created the GenServer. I have a gut feeling that this GenServer is a bit too much, but I would like to program it anyway, and I think it gives me a place to avoid race conditions.

The use case: I have between 400 and 700 people who will show up within a period of minutes. They get paid to play. Once they have signed in and are assigned to a slot, they will produce and update around 150 records per user. These records will not contain massive amounts of data; we’re talking a couple of fields containing small strings, dates and integers. That adds up to 60,000 to 105,000 database requests, much of which is processed within the first 5 to 10 minutes.

I have no feel for performance here… it would surprise me if these 105,000 records would cost a lot of memory, and, again, it sounds like a database should be able to handle this load.

My tolerance for data loss is very low. Afterwards I will collect all cached data and store it in a CSV, but I would like to have the same data in a database as well. It is redundant, but the data is important, and I don’t want to take chances here. If anything crashes, something will be better than nothing.

I do have to be careful about race conditions. During testing I found out that in certain cases signing in and slot assignment were executed almost ‘simultaneously’, causing weird records in the database. A user was assigned to his/her slot, but the status of this person wasn’t updated, probably because the same record was still in the ‘transaction’ of the sign-in. The problems were immediately solved by simply postponing the slot assignment. So that rattled my cage again. By switching to a GenServer with a queue, I can check whether a batch from the queue is about to update the same database record multiple times. If a second (or third) update is discovered in the batch, I delay it by 0.2 seconds. I guess that’s enough. No science here, just ball-parking it. Note that I execute these updates wrapped in a Task.async. I thought this ensures that the GenServer moves on to the next batch once it has ordered the executions, without waiting for them.

The update_all and Repo.transaction suggestions make sense, but since I am comparing changesets before updating/inserting, it seems to me they would complicate things. If I find that I am about to update the same record twice, I would have to remove the second changeset and put it back at the front of the queue. If I can keep things simple by just adding a bit of delay, I would prefer that.

Phew, that’s a lot of writing. I changed things a bit yesterday. I now use the ticks differently: at every tick (10 per second) I process a batch from the queue. That also solved the real-time updates of the queue state to the one admin user (separate channel). Let me know if you would like to see the new code.
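In outline, the tick now drains a fixed-size batch on every interval (a simplified sketch, not my actual module; `on_batch` stands in for the real Repo work):

```elixir
defmodule TickedQueue do
  use GenServer

  @tick_interval 100   # 10 ticks per second
  @batch_size 10

  def start_link(on_batch),
    do: GenServer.start_link(__MODULE__, on_batch, name: __MODULE__)

  def add(item), do: GenServer.cast(__MODULE__, {:add, item})

  def init(on_batch) do
    Process.send_after(self(), :tick, @tick_interval)
    {:ok, {:queue.new(), on_batch}}
  end

  # adding never triggers a flush; it just enqueues
  def handle_cast({:add, item}, {queue, on_batch}) do
    {:noreply, {:queue.in(item, queue), on_batch}}
  end

  # every tick drains at most @batch_size items, in arrival order
  def handle_info(:tick, {queue, on_batch}) do
    {batch, rest} =
      if :queue.len(queue) > @batch_size do
        :queue.split(@batch_size, queue)
      else
        {queue, :queue.new()}
      end

    unless :queue.is_empty(batch), do: on_batch.(:queue.to_list(batch))

    Process.send_after(self(), :tick, @tick_interval)
    {:noreply, {rest, on_batch}}
  end
end
```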

So @benwilson512, it feels like I am all over the place. Suffering from PTSD, and probably enthusiasm as well, I would like to continue with the GenServer unless it is a very bad idea. I would appreciate some feedback on the numbers I have mentioned (peanuts or concern?) and on the way I handle the race condition. And again, thank you for your time.

Cspr


IMO you are overthinking and overdoing it.

Firstly, with proper authentication, there’s no way ever to execute a DB transaction for a logged-in user before the auth framework gives you a session cookie. So how exactly did the code manage to do what you described? Pretty weird.

Secondly, using Ecto already means pooled DB connections, proper concurrency and resource management, and transactions if/when you need them. Nothing you slap on top will make it better; on the contrary, it might overcomplicate things and you might end up creating new problems.

Thirdly, it’s pretty easy to introduce an event-queue-like table in Postgres and just have a background job (maybe that should be your GenServer) process them in order.

Perhaps I am not reading you well but it definitely does look like you are trying to be more clever than some very solid and proven frameworks like Phoenix and Ecto.

Hey @dimitarvp,

Fair enough! Thanks.

This is not your average sign-in setup. All user accounts exist beforehand. People who will participate get an access token. They go to a page, fill out a very small form and send the form data, along with the token, to a session controller. After token verification, I store the form data along with a status update (I want to know if people signed in but didn’t participate). After updating the user account, the user record is passed to another GenServer.

Now this GenServer is not a frivolous pet project. It counts how many people are waiting to be assigned to a slot. In some cases, if I have enough people of a certain type, they get a special slot. If these slots are unavailable they get a different slot, causing a new update of the user account: the status is changed, plus information is added about the slot where they are playing.

Testing the above led to a database table in which a small number of users were assigned to a slot but did not receive the proper, hard-coded status update that goes along with it. If I delayed the slot assignment by calling other necessary functions first, the problem was solved. Weird? Yes! A seasoned Phoenix programmer blamed race conditions.

Got it.

Thanks for your candid and direct reply. As a Phoenix/Elixir novice, the above sounds a bit funny. I am pretty stupid when it comes to this framework… I saw the weird thing happening in my database, and I -am- worried about another database clusterf*ck, so I am exploring ways to overcome these problems. Definitely not trying to be a smart-#ss here.

cheers

Cspr


To be clear, I’m not suggesting that introducing the delay didn’t fix your issue. If that’s what you observed then I’m sure it did. Rather my point is that introducing delays to solve race conditions will only happen to solve them under certain conditions. If the conditions change, and certain operations take a little longer than before, the problem will reappear. Delays at best make race conditions less likely, they don’t solve them categorically. To solve it, you need to identify what is racing with what and introduce some kind of coordination mechanism.
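One such coordination mechanism, in a minimal (and hypothetical) form: funnel the racing operations through a single process, so a sign-in and a slot assignment for the same user can never interleave, no matter how long either takes:

```elixir
defmodule UserSerializer do
  use GenServer

  # Illustrative sketch: all callers hand their work to one process, which
  # runs the functions strictly one at a time. Real code would do the Repo
  # work inside the passed-in function, and you'd likely want one such
  # process per user or per slot rather than one global one.
  def start_link(_), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__)

  def run(fun), do: GenServer.call(__MODULE__, {:run, fun})

  def init(state), do: {:ok, state}

  def handle_call({:run, fun}, _from, state) do
    {:reply, fun.(), state}
  end
end
```

Unlike a sleep, this holds regardless of timing: the second operation simply waits in the mailbox until the first one returns.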

To be clear, I think using a GenServer per slot or something is a perfectly fine mechanism for grouping people. I think it gets a little tricky across nodes, but even that is solvable. I’m just cautioning against using a single genserver for this, you introduce an application wide bottleneck, and increase the cost of failure.

Are you running multiple nodes by the way or just one?

Right. I have absolutely no objection to trying to use genservers to handle your “group people into buckets” logic. I think that’s totally fine. What I’m objecting to is the use of genservers as a database write optimization. 105k writes is 105k writes. It isn’t faster done in a single process vs several. 105k records over 5 minutes is ~250 records per second. That definitely sounds like something that would give rails trouble, and won’t be an issue for Phoenix / Ecto.

I guess my main point is this: You had a problem in rails, you assumed it would be a problem in Elixir, and you wrote a relatively complicated solution to this assumed problem. This solution, while pretty impressive for someone who is just starting with the frameworks, has introduced an application wide bottleneck, and prevents horizontal scalability (how would it work multi-node?). What I think @dimitarvp and I are suggesting is that the best way to solve this would be to start more simply with ordinary writes in the requesting process, solve race conditions with ordinary coordination mechanisms, optimize this as you measure slow points, and then if you want to move the actual bucketing process to memory go for it.

Moving the buckets to memory by the way should probably involve a genserver per bucket, so that you can get some concurrency happening and minimize the impact of any issues.


So it seems I partially misunderstood you. Apologies!

What you describe sounds like a good candidate for event-driven code: the user-facing actions should merely queue up DB-persisted events, which one or more background workers will execute in the right order, updating the database as they go.

You are right to introduce a GenServer in this scenario. But IMO don’t involve it in the DB work itself. It should just be pulling and processing events.

An added bonus of such an approach is that it also allows for synchronous actions (it’s otherwise asynchronous in nature): namely, if the user’s browser has to get a response with the result immediately, you can make a function that executes all related events in order and eventually yields the result.


Completely agree. The primary user interaction could just write a row to an event queue indicating the action they want to take. Then you can have a genserver work through those and perform other operations asynchronously. Ideally the event row would have some concept of a “group” that the action belonged to, because then you could actually have N genservers, and shard the groups over the set of genservers, allowing for concurrent processing of the event queue, while still maintaining serial guarantees per group.
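A hypothetical sketch of that sharding idea, using `:erlang.phash2/2` to map each group to one of N worker names (all names here are made up):

```elixir
defmodule GroupShards do
  # Route each group to one of @shards worker GenServers. Events for the
  # same group always land on the same worker (serial per group), while
  # different groups can be processed concurrently by different workers.
  @shards 4

  def worker_name(group) do
    :"group_worker_#{:erlang.phash2(group, @shards)}"
  end
end
```

You would start `@shards` workers under a supervisor with those names, and then dispatch with something like `GenServer.cast(GroupShards.worker_name(group), {:event, event})`. Because `:erlang.phash2/2` is deterministic, a given group always hashes to the same worker.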


@benwilson512, @dimitarvp,

I am sorry for the late reply. My sincere apologies! I have rearranged my GenServer and it works great. I do acknowledge I am probably making things overly complicated. But guys, I am walking in a candy store here and I just can’t resist. Besides that: I don’t have enough skills to benchmark the app, nor the financial means to conduct an elaborate pilot to check whether a simpler implementation would be enough.

Anyway, I want to thank you both! I have learned a lot. Since there is no single obvious solution here, I haven’t checked a Solution box (I can’t check two replies). If that needs changing, please let me know.

cheers

Cspr


That’s completely fine, I am not here to collect trophies. :slight_smile:

If you like, you can post a relevant code snippet that adequately shows how you tackled your problem and then self-accept that as the solution.
