Hi,
In AWS SQS, I have one FIFO queue for each user, with messages coming from another system. When a user joins their own Phoenix channel (e.g. user:42), I start a Broadway process that keeps pulling messages from the user’s SQS queue and pushing them to the user over the websocket.
My current implementation calls Broadway.start_link/2 from the join/3 callback of the Phoenix Channel, like this:
    Broadway.start_link(__MODULE__,
      name: String.to_atom("Broadway-" <> user_id),
      producer: [
        module: {BroadwaySQS.Producer,
          queue_url: queue_url(user_id)
        }
      ],
      processors: [
        default: [
          concurrency: 1
        ]
      ],
      batchers: [
        default: [
          batch_size: 10,
          batch_timeout: 1000
        ]
      ]
    )
The questions are:
- Is this right in terms of processes and supervision?
- When the user disconnects, I see that all of their Broadway-related processes are gone, which is good, but I’m wondering whether this is a proper stop/shutdown or whether I should implement something.
Best regards,
Roger
If you want Broadway to properly drain messages and perform a graceful shutdown, it should be started as a child of a supervision tree. See the bullet point “Graceful shutdown” here. Typically, people start Broadway as a child of the supervision tree of their main application.
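For per-user pipelines, one possible way to get that is a DynamicSupervisor in the application tree, with each pipeline started as its child rather than linked to the channel process. The following is a minimal sketch under assumptions, not a tested Broadway setup: `UserPipelines`, `pipeline_spec/2`, `start/1` and `stop/1` are hypothetical names, and the pipeline module is assumed to expose a `start_link/1` that takes the user id and calls `Broadway.start_link/2` itself.

```elixir
defmodule UserPipelines do
  @moduledoc "Hypothetical helper that runs per-user pipelines under a DynamicSupervisor."

  @supervisor __MODULE__.Supervisor

  # Lets the application list `UserPipelines` in its supervision tree.
  def child_spec(_opts) do
    DynamicSupervisor.child_spec(name: @supervisor, strategy: :one_for_one)
  end

  # Builds a child spec for one user's pipeline. `module` is assumed to
  # define start_link/1 taking the user id (an assumption of this sketch).
  def pipeline_spec(module, user_id) do
    %{
      id: {module, user_id},
      start: {module, :start_link, [user_id]},
      # Do not restart the pipeline once it has been stopped or has exited.
      restart: :temporary,
      # Give the pipeline as long as it needs to drain on shutdown.
      shutdown: :infinity
    }
  end

  # Starts a pipeline under the supervisor; returns {:ok, pid} on success.
  def start(child_spec), do: DynamicSupervisor.start_child(@supervisor, child_spec)

  # Asks the supervisor to shut the pipeline down and waits until it has exited.
  def stop(pid), do: DynamicSupervisor.terminate_child(@supervisor, pid)
end

# Possible use from the channel (hypothetical module names):
#
#   def join("user:" <> user_id, _params, socket) do
#     spec = UserPipelines.pipeline_spec(MyApp.UserPipeline, user_id)
#     {:ok, pid} = UserPipelines.start(spec)
#     {:ok, assign(socket, :pipeline, pid)}
#   end
#
#   def terminate(_reason, socket), do: UserPipelines.stop(socket.assigns.pipeline)
```

`DynamicSupervisor.terminate_child/2` honours the child's `shutdown` value, so the pipeline gets the chance to finish in-flight work. Note that a channel's `terminate/2` is not guaranteed to run if the channel process is killed abruptly, so some extra cleanup (for example, monitoring the channel process) may still be needed.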
Starting a new Broadway pipeline whenever a user joins a channel seems like overkill to me, as does using a separate SQS queue for each user (do you create them dynamically?). Without knowing the exact problem you’re trying to solve, I would probably try to have a constant number of queues (even just one) shared by all users, and a single pipeline that fetches messages and routes them to the destination channel.
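To make the routing idea concrete, here is a rough sketch of the part that maps a fetched message to its channel topic. It assumes each message has already been decoded into a map that carries the recipient under `:user_id`, which is an assumption about the payload, not something known about your system. `MessageRouter` is a hypothetical name, and the broadcast function is injected so the sketch does not depend on Phoenix.

```elixir
defmodule MessageRouter do
  @moduledoc "Hypothetical routing step for a single shared pipeline."

  # Derives the channel topic from the payload, matching the "user:42" naming.
  def topic_for(%{user_id: user_id}), do: "user:#{user_id}"

  # Hands the payload to `broadcast`, a 2-arity function of topic and payload.
  # In a Phoenix app this could wrap the endpoint, for example:
  #   fn topic, payload -> MyAppWeb.Endpoint.broadcast(topic, "new_message", payload) end
  # (MyAppWeb.Endpoint and the event name are placeholders.)
  def route(payload, broadcast) when is_function(broadcast, 2) do
    broadcast.(topic_for(payload), payload)
  end
end
```

The pipeline's `handle_message/3` callback would call `MessageRouter.route/2` for each message after decoding its data.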
Thanks for your inputs.
> it should be started as a child of a supervision tree

It is under the Phoenix supervision tree, but I’m not sure that means Broadway will perform a graceful shutdown.

> Typically, people start Broadway as a child of the supervision tree of their main application.

That’s exactly what I usually do too, but only when I know in advance which queue I’ll be reading from, which is not the case here.

> do you create them dynamically? [a queue per user]

Yes, that other external system creates the queue as part of the provisioning process for a new user.

> I would probably attempt to have a constant number of queues (1 is possible) shared by all users and a single pipeline that fetches messages and routes them to the destination channel.

The problem is that the destination channel (the user) might not be there to receive the fetched message, so we would have to leave the message(s) in the queue. After a short time they would show up again, but the destination might still be offline, and so on. Imagine we have hundreds of offline users: it seems odd to keep fetching messages that we can’t deliver. Also, online users might be penalized (in terms of delay), as their messages might be “buried” under a bunch of messages targeted at offline users.
The problem I’m trying to solve is: how to deliver user-specific enqueued messages to the users when they get online. Messages might be enqueued at any time, but dequeued only when the user is online.
Again, thanks a lot for your time, and let me know if I can be more exact or specific.
Thanks for the overview. What you’re sketching sounds a lot like email.
Have you thought about creating per-user mailboxes (for example in a DB) where you can store undeliverable messages? When you fetch a message for a user from the queue and the user is offline, you store the message in that user’s mailbox. The user then receives the stored messages when they come online.
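A minimal sketch of the mailbox idea, with an in-memory Agent standing in for the DB purely for illustration (a real mailbox would need durable storage). `Mailbox`, `store/2`, `drain/1` and `deliver_or_store/4` are hypothetical names, and how you decide whether a user is online (for example via Phoenix.Presence) is left to the caller.

```elixir
defmodule Mailbox do
  @moduledoc "Hypothetical in-memory mailbox; an Agent stands in for a database."
  use Agent

  # State is a map of user_id => list of pending messages, newest first.
  def start_link(_opts \\ []), do: Agent.start_link(fn -> %{} end, name: __MODULE__)

  # Stores a message for a user who cannot receive it right now.
  def store(user_id, message) do
    Agent.update(__MODULE__, fn boxes ->
      Map.update(boxes, user_id, [message], &[message | &1])
    end)
  end

  # Returns the user's pending messages, oldest first, and empties the mailbox.
  def drain(user_id) do
    Agent.get_and_update(__MODULE__, fn boxes ->
      {pending, rest} = Map.pop(boxes, user_id, [])
      {Enum.reverse(pending), rest}
    end)
  end

  # Pushes the message straight away if the user is online, otherwise stores it.
  # `push` is a 1-arity function supplied by the caller.
  def deliver_or_store(user_id, message, online?, push) when is_function(push, 1) do
    if online?, do: push.(message), else: store(user_id, message)
  end
end
```

The pipeline would call `Mailbox.deliver_or_store/4` for each fetched message, and the channel's `join/3` would call `Mailbox.drain/1` and push whatever comes back to the user.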