Building a pipeline processing with GenStage

Hi everyone,

I’m wondering about building a pipeline with GenStage and RabbitMQ queues for processing data about the players which are searching for a game with similar skill (or rating) for one of my open source project, but slightly stuck on the design phase. The basic idea it’s pretty straightforward:

  1. Client put a message into the certain message queue as the request for search a game for him. In any of those messages will be specified reply_to that will be using in the last stage for getting understanding where to left the final response.

  2. And here starting the most interest part of the whole topic: a processing a data about the players. From the some point of view it is none trivial task, but I’ve tried to solve it with the following way:

    2.1. The client’s message from the first step is writting into the “generic queue” which is storing all requests for searching a game with opponents.
    2.2. The publisher/consumer workers is consuming a message, sending a request to the database for getting an additional information about the player and put the message into the next queue, depends on the rating or skill of the player.
    2.3. The next group of publishers/consumers are consuming messages only from the certain queue and processing them (for example a group of workers that processing the players only with an average skill):
    2.3.1 Because each worker is linked to the specific queue, it will extract message in sequence and try to analyze it. If the player, the information about which was specified in the message body, is according to the matchmaking algorithm, then the selected player will be saved in the memory of the worker and the extracted message deleted. This process is repeating until the worker have not enough players to fill the game lobby. And when it will completed, a list of players will be transferred as a message to the next queue.
    2.3.2 Otherwise the message will be published the special queue, created for requeuing players into generic queue.
    2.3.3 Special type of workers are re-publishing messages to the generic queue, which are coming from the publishing node from the 2.3.2 step.
    2.3.4. A worker is extracting the published message on the previous step, and creating a new game server (or choosing one from the already existing). After that it will broadcast the server IP-address, port and connection credentials to each player mentioned in the list via particular response queues, that were specified by clients in the first step.
    2.3.5 Each client is getting the response from response queue and connecting to the game.

The same thing but demonstrated with the picture:

  1. After when the processing will be done, the worker will extract the response and send it to the certain response queue (that was specified on the first step).

and I have a couple questions, that still raising while I’m designing it:

  • Does it a good idea to build this up? Or better to go an another way, when it will be splited up onto small applications?
  • It will be great to scale it up when will be necessary in runtime, because we actually don’t know how many requests will come for processing concurrently. Hovewer, it will be good to configure and use a backpressure of GenStage? Or it will be an overkill?
  • How to deal with a case when necessary to store a list of players in the workers while collecting players into one group before putting them into one game lobby?
2 Likes

It’s not like I developed a lot (or any for that matter) production ready processing queues, but I’m currently building one and my approach to it was a bit simpler I think, although at the same time I don’t know your constraints nor needs - perhaps genstage and several different queues is the way to go for you.

I’m not really answering your questions, just going to share the way I went about it, perhaps it gives you some idea.

I basically opted to have two types of ETS tables (actually there are a bit more but related to other parts),

one which holds the player info in a form such as:

{"player_id", data ... }

which basically is used to verify quickly if a user can or not join a queue, start a game, etc. You look it up with the player ID and then pattern match the tuple & contents to figure out if a player is already playing/has any restraint/etc.

And another ETS table for each type of “tournament” available. So for sake of illustrating this lets say there’s only one tournament type which has 3 matches length - this ETS table will have a few entries with each describing its own record track as the key (if there were more tournament types then there will be more ETS tables, one per each tournament type ofc):

{ "000", [ ] }
{ "w00", [ ] }
{ "l00", [ ] }
{ "wl0", [ ] }
{ "ww0", [ ] }
{ "ll0", [ ] }

With 3 matches length these are all possible track records for matching. When I mention “record” it’s about these “record” tracks and not a record in terms of data structure. The list in the 2nd term is actually the “queue”. Each character on the ETS “key” represents the result of the match, with 0 being not played, w won and l lost. The order of the won & lost doesn’t matter for queuing, so "wl0" is equal to "lw0" hence only "wl0" is used. This is built by an algorithm that takes the number of matches you want to play and produces all the possible relevant results, creating an ETS table where each entry is a record track. This particular one is created on app startup because these tournaments will always be up. Others could be created dynamically at runtime. They can also hold more information if needed.

Now when a user asks to join the play queue he has his own record track. For instance if he hasn’t played any match it will be "000". If he has played two and won both it will be "ww0". If he has played two and lost one and won another it will be "wl0" (because the record string is sorted in order it will always be w’s first, then l’s then 0’s - of course)

The request to join is cast to a genserver which basically just queries the relevant ETS table, looking up the track record that matches this player’s record track, so for instance it gets the record with key "000" in case the player hasn’t played yet any match. If there’s any player in the queue and they can duel each other then the game starts (there are some other details like you can cancel and be requeued, etc)

This genserver handles this activity asynchronously and then itself calls a supervisor to start an individual game genserver for the two matched players that actually handles the game creation and players “negotiation”/“acceptance” to start, update the ets table holding the player “status”, broadcast etc, so it shouldn’t be a bottle neck. The only thing it does is, effectively, looking up an ets table and going through the list in that record (which I expect to never really grow more than 3 or 4 items).

Nonetheless, if it becomes a bottleneck then it can be made so that instead of a single genserver handling all queue requests, it’s instead a genserver per “record track”. So basically a genserver will handle requests to join the record track of "000", other "w00", etc. Since it’s already using the “record” track system, it’s easy to send these requests to their respective genserver as well in case it moves in that direction but I’m not worrying about it rigth now

In this approach a player only ever gets enqueued if there’s no available opponent already waiting, or if the ones that are can’t play against him (they already played against each other this tournament). Otherwise he doesn’t even “get” in the queue. I think this is the easiest way to model it that creates less edge cases but I might be wrong.

There’s one last details which is a send_after message, that gets “scheduled” when a user is enqueued (meaning there wasn’t any available opponent), and basically contains the id of the player and the record track in which they got enqueued. In case it reaches that “time waiting” it will try to find an available opponent in the record tracks above and below the players one (so a player with 2 wins might get matched against a player with 1 win or 1 loss - but not against one player with 2 losses).

When a player is matched in any way their timer is cancelled and basically that’s it.

Hope you can get anything useful out of it, gl!

In general it looks very close to my current solution that was described, except that you’re using ETS tables for handling a user data and available game rooms. I had some thoughts about using it, however, but have a few caveats:

  • In your case everything is storing only on a single machine or a cluster?
  • Do you considered to use Mnesia / Cassandra / other databases for handling user data when you’re working with the cluster?

I was thinking about how handle case when the player is trying to re-queue himself or cancel the searching and, probably, using Mnesia (only for tracking an active players) will be good here. But for any other sort of thing I’d like to prefer to use external database, which, potentially, could be used for analytics later.

It’s not like I developed a lot (or any for that matter) production ready processing queues, but I’m currently building one and my approach to it was a bit simpler I think, although at the same time I don’t know your constraints nor needs - perhaps genstage and several different queues is the way to go for you.

The main reasons why I’m doing it are not something special or unique:

  • Build a cluster of machines where each of them is a microservice based on the message queues. Currently I’ve implemented the reverse proxy (an entry point to the system) and the auth/auth microservice (the general reason)
  • Split up the internal parts of the “searching players” functionality onto small blocks so that I could scale them up separately. Also it will be great to understand what is going on under the hood when something going wrong.
  • Dive into FP and Elixir, when you could represent this workflow as the sequential process.
  • Get an experience in using this feature for handling / processing a big amount of data concurrently.

Sincerely I haven’t thought of clusters nor anything yet as I have no idea about that - I imagine this will get me quite far before I need to do that (if I ever happen to have more than myself wanting to play the game that is!)

Personally I’m trying to keep a “proper” database out of the main “flow”. I use it for storing users and persisting games (although when being played the game state is served from a GenServer and the DB is only a persistence layer). Even the “elements” for the game are not in a DB but instead in ETS tables, that are built on app start from json files. The reason I’m doing it like this is because I imagine I can then more or less easily switch to mnesia or other solutions if needed, there’s no migrations, all available “elements” (in this case “scrolls”/“cards”) can be updated while running by just repopulating the ETS tables, etc.

I’m also trying to keep the logic and flow the simplest possible with the least amount of jumps between “points” so to keep my sanity- it seems that each level of indirection between a request and an action requires further considerations in terms of “did it work as expected or not” so I’m trying to keep it simple and diminish the possible points of failure.

Regarding your first point, if you build a cluster where each machine is a microservice wouldn’t this particularly fit the idea of running the machine as an in-memory db like ETS?

Your second point, perhaps is relative to your game and search functionality, so it makes sense, in my case there’s really not much search functionality, but the more you split the more I think is harder to understand what goes wrong when it goes? Or am I missing something.

Regarding your first point, if you build a cluster where each machine is a microservice wouldn’t this particularly fit the idea of running the machine as an in-memory db like ETS?

As I mentioned in the previous post, using the Mnesia (which is in general based on ETS tables) probably will be a good fit for storing data about active users (only their IDs and perhaps a couple of informational fields). And I think this is perfect for handling cases when user is cancel searching or abruptly lost a connection while he was in searching stage.

For everything else (like a full information about the user, his current statistics and so on) I’d prefer to use an external SQL or NoSQL storage. First of all because Mnesia has some limitations (like the size per node) and probably this data can be used for other stuff as well, like analytics or building a top players of the month.

Your second point, perhaps is relative to your game and search functionality, so it makes sense, in my case there’s really not much search functionality, but the more you split the more I think is harder to understand what goes wrong when it goes? Or am I missing something.

The basic idea about using microservices is split it the functionality onto small pieces, so that each part is doing his job well enough and can be scaled horizontally when it necessary. And the matchmaking microservice it is a good example of microservice. In this topic perhaps I’d described some things too details, but I’m doing it in order to any other people will understand what result I’d like to get.

If we will rethink what is representing the matchmaking, then in general you will describe it like this:

  1. You have an entry point where all players coming with request for a search a game for him.
  2. The server should somehow organize the queue, so that the time required for searching a game for the player will be minimal. Therefore, why not to split up the incoming players into smaller groups and use this advantage later, if we know who they are?
  3. Sequentially sort out players accordingly to your algorithm (for an instance, Elo-based / TrueSkill implementations). The matched players is group up into one list until it won’t be filled. When the list will be prepared, send the result to the next stage.
  4. Invite the prepared group of players into the game lobby and start the game.

It’s not related to the certain game, because I’m trying to generalize this solution. It means that potentially the final project can be used for everything that you needed (but with some sort of tweaking, of course).

I think we’re choosing to model the problem in different ways, mostly because you’re trying to generalise it while I’m modelling it to my own definitive use cases which is fairly easier to do.

I personally think that this is highly dependent on what you’re trying to build and probably having the building blocks (or in case you want even higher level tools like Rabbit, for some guarantees and less lines), and then gluing them according to your needs is better than an abstraction because, the logic for sorting and matching and what happens, before and after will always need to be written, and shoe-boxing the flow into a set of assumptions probably just renders it much more complex than it needs to be and much less flexible. Perhaps there are a whole set of games/situations that could benefit from it but I tend to think it would be rather rigid?

I personally think that this is highly dependent on what you’re trying to build and probably having the building blocks (or in case you want even higher level tools like Rabbit, for some guarantees and less lines), and then gluing them according to your needs is better than an abstraction because, the logic for sorting and matching and what happens, before and after will always need to be written, and shoe-boxing the flow into a set of assumptions probably just renders it much more complex than it needs to be and much less flexible.

I agree, that better to start from something and later solve the issues when they come with bottlenecks and so on. Over-engineering isn’t our friend in this case, honestly saying.
From another point of view, because each microservice will be using message queues while communicating with other parts of the system, we could re-use those queues/exchanges as much as possible and implement some sort of pipeline for processing data. That’s was the main reason why not to try out GenStage for those things, if it provides simple interface for building your own pipelines.

Perhaps there are a whole set of games/situations that could benefit from it but I tend to think it would be rather rigid?

Of course, I have some limitations around this microservice. For example, limitations around the creating and using sub-queues that are going to use only for the certain types of players. For the first versions it will be enough to me to use only one queue. Later, probably, their amount will be increased.