Combining Broadway and Oban for SQS heavy message processing

Hello everyone! I’m new to Elixir and this is my first post here.

I’m currently developing a web application that needs to process messages from a single AWS SQS queue. The queue contains two types of messages: fast messages that need to be processed quickly, and slow messages that can wait. Fast messages make up about 95% of the queue and slow messages the remaining 5%. Processing the fast messages is simple: I just need to extract some data from the notifications and store it in a database. Processing the slow messages, on the other hand, requires a lot of computation and IO, but it doesn’t need to happen in near real time.

Currently, I’m using BroadwaySQS.Producer to receive messages from AWS SQS and process the fast messages. For the slow messages, I plan to receive them in the same Broadway pipeline and create a job for each one using Oban. This way, the slow messages can be delegated to Oban as jobs that can be processed later. I’ll be using the free version of Oban.
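A minimal sketch of this plan, not a definitive implementation. It assumes the message body is JSON with a hypothetical "type" field that marks slow messages, that Jason is available for decoding, and that `MyApp.SlowWorker` (an Oban worker) and `MyApp.Notifications.store/1` exist in the application; all of those names are made up for illustration.

```elixir
defmodule MyApp.SqsPipeline do
  use Broadway

  alias Broadway.Message

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        # SQS_QUEUE_URL is an assumed environment variable.
        module: {BroadwaySQS.Producer, queue_url: System.fetch_env!("SQS_QUEUE_URL")},
        concurrency: 1
      ],
      processors: [default: [concurrency: 10]]
    )
  end

  @impl true
  def handle_message(_processor, %Message{data: body} = message, _context) do
    case Jason.decode(body) do
      # Assumption: slow messages carry "type" => "slow" in their payload.
      {:ok, %{"type" => "slow"} = payload} -> enqueue_slow(message, payload)
      {:ok, payload} -> store_fast(message, payload)
      {:error, reason} -> Message.failed(message, reason)
    end
  end

  # Persist the slow message as an Oban job. The SQS message is only
  # acknowledged if the insert succeeded.
  defp enqueue_slow(message, payload) do
    case payload |> MyApp.SlowWorker.new() |> Oban.insert() do
      {:ok, _job} -> message
      {:error, reason} -> Message.failed(message, reason)
    end
  end

  # Hypothetical context function that writes the extracted data to the database.
  defp store_fast(message, payload) do
    case MyApp.Notifications.store(payload) do
      {:ok, _record} -> message
      {:error, reason} -> Message.failed(message, reason)
    end
  end
end
```

Because the job row is written to the database before the message is acknowledged, a slow message should leave SQS quickly while the actual work stays durable in Oban.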

Do you see any issues with integrating Broadway pipelines with Oban jobs? Or is it possible to solve this problem using only Broadway?

Happy coding! 🙂

My first instinct is to ask why you would bring in Oban if you can use tasks and let deletion from the SQS queue determine whether something is complete or needs a retry. I am unsure whether my assumptions about SQS queues and deletion are correct.

I want to ensure that even if the whole app crashes or the server needs a reboot, those slow messages/jobs don’t get lost. However, I also want to remove them from the queue quickly because fast messages are altering the database and displaying content on the user interface (using Phoenix/LiveView).

Additionally, I want to control how many slow messages I’m dealing with at any given time. For instance, I may want to process just one slow message at a time. But I’m not sure how to accomplish all of this using only Broadway.
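With Oban, the "one slow message at a time" requirement could be expressed as a queue limit. This is only a sketch: the `:my_app` application name, `MyApp.Repo`, and the `:slow` queue name are assumptions.

```elixir
# config/config.exs
import Config

config :my_app, Oban,
  repo: MyApp.Repo,
  # At most one job from the :slow queue runs at a time on each node.
  queues: [slow: 1]
```

A worker would opt into that queue with `use Oban.Worker, queue: :slow`. Note that in open source Oban this limit applies per node, so running several nodes means several slow jobs could execute at once.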

@rump13 your plan seems sound to me. Broadway is meant for ingesting messages with back pressure, not as a general purpose background job system.

@kanishka this article explains why you’d want to use Oban instead of plain tasks: Oban Starts Where Tasks End · Oban Web+Pro

I have read that article. What seems strange to me here is using both SQS and Oban. I would use one or the other.

On "app crashes":

"Note however, that Broadway does not provide any sort of retries out of the box. This is left completely as a responsibility of the producer. For instance, if you are using Amazon SQS, the default behaviour is to retry unacknowledged messages after a user-defined timeout. If you don’t want unacknowledged messages to be retried, is your responsibility to configure a dead-letter queue as target for those messages."

I think that would cover the crash case?

On "control how many slow messages":

I don’t know Broadway well, but I assume there is a way to use different pools of workers and set the size of each pool.

You could likely use a separate SQS queue and broadway pipeline to process fast/slow messages.

  • SQS: fast.queue, slow.queue
  • Broadway: FastPipeline, SlowPipeline

Then publish the slow messages to slow.queue either from the current publisher or from FastPipeline whenever it comes across a “slow” message.

If it helps, you can control the concurrency for processors etc.
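As a rough illustration of the two-pipeline idea, the helper below builds the options that each pipeline could pass to `Broadway.start_link/2`. The module name, the concurrency numbers, and the queue URLs are assumptions, not values from this thread.

```elixir
defmodule MyApp.PipelineOptions do
  @moduledoc """
  Builds Broadway options for the fast and slow pipelines (illustrative only).
  """

  # Assumed processor concurrency per pipeline; tune for the real workload.
  @concurrency %{fast: 20, slow: 1}

  def build(kind, queue_url) when kind in [:fast, :slow] do
    [
      name: pipeline_name(kind),
      producer: [
        module: {BroadwaySQS.Producer, queue_url: queue_url},
        concurrency: 1
      ],
      processors: [default: [concurrency: Map.fetch!(@concurrency, kind)]]
    ]
  end

  defp pipeline_name(:fast), do: MyApp.FastPipeline
  defp pipeline_name(:slow), do: MyApp.SlowPipeline
end
```

Each pipeline module would then call something like `Broadway.start_link(__MODULE__, MyApp.PipelineOptions.build(:slow, url))`. With a slow pipeline, the SQS visibility timeout probably needs to be longer than the slowest job, otherwise a message may be redelivered while it is still being processed.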


those slow messages/jobs don’t get lost

It might also be worth mentioning that Broadway handles exceptions during processing and passes any failed messages to handle_failed/2, so you can log, alert, clean up, re-publish, etc. SQS messages won’t be deleted until they’re acknowledged, so there’s a low chance of them getting lost.
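A small sketch of what a handle_failed/2 callback could look like. The pipeline name, the `SQS_QUEUE_URL` environment variable, and the choice to only log are assumptions; re-publishing or alerting would go in the same place.

```elixir
defmodule MyApp.LoggingPipeline do
  use Broadway

  require Logger

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {BroadwaySQS.Producer, queue_url: System.fetch_env!("SQS_QUEUE_URL")}
      ],
      processors: [default: [concurrency: 10]]
    )
  end

  @impl true
  def handle_message(_processor, message, _context) do
    # Real processing would go here; raising or marking the message as
    # failed sends it to handle_failed/2.
    message
  end

  @impl true
  def handle_failed(messages, _context) do
    Enum.each(messages, fn message ->
      Logger.error("SQS message failed: #{inspect(message.status)}")
    end)

    # The callback must return the (possibly updated) messages.
    messages
  end
end
```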

We use RabbitMQ with Broadway and create dynamic queues for retries and exponential backoff. SQS seems to support delay queues, but I haven’t used them.

Oban is definitely an option (and an amazing tool, thanks @sorentwo ❤️), but it might be worth exploring what’s possible with your current setup.
