What abstraction to use for a producer-consumer model with a queue implementation

Hi! I am trying to build a solution and I need guidance as to what type of abstractions to use.

I have a pool of users and operators connected to my Phoenix app via channels. I have entries in my database for each operator and user along with their current state. Now I need to implement a basic FIFO queue of users. Whenever an operator’s state is ‘available’, I need to allocate the first user in the queue to this operator and remove that user from the queue. The operator then moves to a different state and works with this user through channels. At any time there can be fewer or more users than operators.
I’d like to know whether I should use an external library like OPQ https://github.com/fredwu/opq, implement GenStage on my own, or whether there’s a better solution to this entire problem.
Also, to keep processing fast, I’m thinking of maintaining the states of operators and users in an Agent and updating the database asynchronously.

Any sort of guidance will be really helpful.

Have you looked at Oban?

Or Broadway?


That’s an interesting problem statement. At first reading it seems like it might be good to model this with processes (using either GenServer or Agent). Can you explain more about why you need to persist the operators’ state in the database? Could it instead be stored entirely in memory?

Based on my understanding of your description, OPQ doesn’t seem like a good fit because it simply has generic worker processes, whereas you need to assign users from the queue to a specific available operator.

I think what you primarily need is a FIFO queue of users; whenever an operator becomes available, it can request the next user from the queue. That way the operators don’t need to talk to each other, they just need to pull items off the shared queue. Depending on your scaling needs, a single GenServer holding the queue of users should work fine (look at the Erlang :queue module).
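A minimal sketch of that idea, assuming a hypothetical `UserQueue` module and plain user ids as queue items (neither comes from the original post). It only shows the FIFO mechanics with `:queue.in/2` and `:queue.out/1`; persistence and channel integration are left out.

```elixir
defmodule UserQueue do
  # Hypothetical module: a single process that owns a FIFO queue of user ids.
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, :queue.new(), opts)
  end

  # Add a user to the back of the queue.
  def enqueue(server, user_id), do: GenServer.cast(server, {:enqueue, user_id})

  # Called on behalf of an operator that just became available.
  # Returns {:ok, user_id} or :empty.
  def next_user(server), do: GenServer.call(server, :next_user)

  @impl true
  def init(queue), do: {:ok, queue}

  @impl true
  def handle_cast({:enqueue, user_id}, queue) do
    {:noreply, :queue.in(user_id, queue)}
  end

  @impl true
  def handle_call(:next_user, _from, queue) do
    case :queue.out(queue) do
      {{:value, user_id}, rest} -> {:reply, {:ok, user_id}, rest}
      {:empty, queue} -> {:reply, :empty, queue}
    end
  end
end
```

Because every request goes through one process, two operators can never be handed the same user. An operator that receives `:empty` would still need some way to be notified when a user arrives later, which this sketch does not cover.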


It’s not really a producer-consumer problem. I think it’s closer to limited resource access, where the operators are the limited resources. In that case I would look for a design like poolboy, with checkin/checkout.
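To make the checkin/checkout idea concrete, here is a rough sketch. It is not poolboy’s API; the `Dispatcher` module, its function names, and the return tuples are all hypothetical. It keeps two FIFO queues so that whichever side is in surplus (users or operators) waits for the other.

```elixir
defmodule Dispatcher do
  # Hypothetical pure data structure: one FIFO queue per side.
  defstruct users: :queue.new(), operators: :queue.new()

  # A user arrives: pair with a waiting operator if there is one, otherwise wait.
  def user_joined(%__MODULE__{} = d, user_id) do
    case :queue.out(d.operators) do
      {{:value, op_id}, ops} ->
        {{:assigned, op_id, user_id}, %{d | operators: ops}}

      {:empty, _} ->
        {:queued, %{d | users: :queue.in(user_id, d.users)}}
    end
  end

  # An operator becomes available ("checks in"): take the first waiting user,
  # or wait idle until one arrives.
  def operator_available(%__MODULE__{} = d, op_id) do
    case :queue.out(d.users) do
      {{:value, user_id}, users} ->
        {{:assigned, op_id, user_id}, %{d | users: users}}

      {:empty, _} ->
        {:idle, %{d | operators: :queue.in(op_id, d.operators)}}
    end
  end
end
```

In a real app this struct would probably live as the state of a single GenServer, so that assignments are serialized; the pure version here just keeps the matching logic easy to test.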

There is the Erlang :queue module for managing a queue.

If you want to be fast, you shouldn’t think in terms of the database. As mentioned by @axelson, there are many ways to go faster than the db. At most, I would use the db to load the initial state for a GenServer. When I really need speed, I reach for ETS.
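As a small illustration of keeping operator state in ETS, assuming a hypothetical table name and hypothetical state atoms (`:available`, `:busy`):

```elixir
# Hypothetical table holding {operator_id, state} tuples.
table = :ets.new(:operator_states, [:set, :public, read_concurrency: true])

:ets.insert(table, {"op1", :available})
:ets.insert(table, {"op2", :busy})

# Look up a single operator by key.
[{"op1", state}] = :ets.lookup(table, "op1")

# Collect the ids of all available operators using a match pattern.
available = :ets.match(table, {:"$1", :available})
```

The table is owned by the process that creates it and disappears when that process exits, so in practice a long-lived process (for example a supervised GenServer) would create it.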

I don’t think GenStage is a good fit; it’s more suited to a pipeline with successive steps.

It looks like a call center model, or a helpdesk service.

There are similar ideas in the Asterisk VoIP server: https://www.voip-info.org/asterisk-agents/

Thanks for the reply! Our business actually requires us to store the various states in the database because they will be used for billing and also for analytics (I know we could use other approaches for analytics). The number of users can go up to 5000 at peak.

Thank you! Looking into ETS.

Oban also has generic workers, which won’t fit my use case.

Whenever an operator’s state is ‘available’, I need to allocate the first member of the user queue to this operator and pop him out of the queue.

The operator sounds like a RabbitMQ consumer that can only handle one task at a time, and the user queue sounds like RabbitMQ messages queued up under a certain topic (channel). This is more of a routing problem than a consumer/producer problem. I don’t have a concrete implementation to suggest, but RabbitMQ is open source, so you could look at their code.