I am designing a GenStage pipeline using a FIFO queue from Amazon SQS.
Right up front: unfortunately I can’t use Broadway’s SQS adapter because of my use case.
I have a
- producer (receiving messages from SQS)
- producer_consumer (to normalize incoming messages)
- consumer (to perform operations on a group of messages)
My end goal is to
- take incoming messages
- group them by an attribute on that message (Message Group Id), while maintaining original message order
- process messages in batches/groups
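For the grouping step itself, plain `Enum.group_by/2` already preserves per-group arrival order, so no extra bookkeeping is needed there. A minimal sketch (the message shape, a map with `:group_id` and `:body` keys, is an assumption for illustration):

```elixir
defmodule GroupDemo do
  # Group messages by their Message Group ID; Enum.group_by/2 keeps the
  # values within each group in their original order.
  def group_in_order(messages) do
    Enum.group_by(messages, & &1.group_id)
  end
end

messages = [
  %{group_id: "a", body: 1},
  %{group_id: "b", body: 2},
  %{group_id: "a", body: 3}
]

GroupDemo.group_in_order(messages)
# => %{"a" => [%{group_id: "a", body: 1}, %{group_id: "a", body: 3}],
#      "b" => [%{group_id: "b", body: 2}]}
```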
In my producer consumer, I’d like to dynamically spin up a new consumer process for each batch of messages (events).
For example, I have 10 messages received.
- 7 messages have message group ID “a”
- 3 messages have message group ID “b”
After initial processing in my producer_consumer, I would like to spin up 2 (n) consumer processes: one for processing messages with group “a” and another for processing messages with group “b”.
I was looking at making my consumer use ConsumerSupervisor, but this spins up a new consumer process per event.
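One way to keep ConsumerSupervisor while still getting one process per batch is to have the producer_consumer emit each *group* as a single event, so the ConsumerSupervisor child gets a whole ordered batch rather than one message. A rough sketch, assuming a `gen_stage` dependency; the module name and message shape are hypothetical:

```elixir
defmodule Pipeline.Batcher do
  use GenStage

  def start_link(opts), do: GenStage.start_link(__MODULE__, opts, name: __MODULE__)

  def init(_opts), do: {:producer_consumer, :ok}

  # Emit one {group_id, messages} event per Message Group ID, so a
  # downstream ConsumerSupervisor starts one child per batch, not per
  # message. Enum.group_by/2 keeps each group's arrival order.
  def handle_events(events, _from, state) do
    batches =
      events
      |> Enum.group_by(& &1.group_id)
      |> Map.to_list()

    {:noreply, batches, state}
  end
end
```

One caveat: two batches for the same group that arrive in separate `handle_events` calls would still go to two different children, which is exactly the race you describe below.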
I’m also considering the case where I receive 7 messages from group “a” and then later receive 3 more messages from group “a” (SQS only allows 10 messages in flight for a given Message Group ID). I need to make sure they are processed in the original order they were received; processing them in different consumers opens up a potential race condition.
My current idea is to do some sort of check in my producer consumer to see if a consumer process exists for message group “a”
- if it does, push new messages (genstage events) to this consumer process to be processed next
- otherwise create a new consumer process for group “a” and process all incoming messages for a certain window.
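That find-or-start check can be done with Elixir’s built-in `Registry` rather than hand-rolled bookkeeping. A sketch under assumptions: in the real pipeline the worker would be a GenStage consumer subscribed to the producer_consumer, but an `Agent` stands in here so the example is self-contained; `GroupRegistry` is a hypothetical name.

```elixir
# A unique-keyed Registry maps a Message Group ID to at most one worker pid.
{:ok, _} = Registry.start_link(keys: :unique, name: GroupRegistry)

defmodule GroupWorkers do
  # Return the pid for a group's worker, starting one if none exists.
  def find_or_start(group_id) do
    case Registry.lookup(GroupRegistry, group_id) do
      [{pid, _value}] ->
        pid

      [] ->
        name = {:via, Registry, {GroupRegistry, group_id}}
        {:ok, pid} = Agent.start_link(fn -> [] end, name: name)
        pid
    end
  end
end

pid_a = GroupWorkers.find_or_start("a")
^pid_a = GroupWorkers.find_or_start("a")  # same process on the second call
```

Routing all messages for a group through the single registered process is what preserves per-group ordering even when a second batch for “a” arrives later.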
This idea feels like I’m straying too far from what GenStage was intended for.
Any ideas for maximizing concurrency in my situation while staying true to the GenStage pattern?