I want to process messages from an AWS SQS queue. I want to limit the number of concurrently processed messages to N.
I’m looking into solving this with GenStage. I’ve set up a min_demand of 1 and a max_demand of N. My producer will fetch the messages from SQS. Once things are running, the demand will generally be 1, which is very simple to implement. But I’m wondering what’s the easiest way to handle the initial demand, which can be as high as N. It may also be the case that the queue has fewer than N items when the initial demand arrives.
I know there’s the Broadcaster example, but I’m looking for something simpler.
Blocking in handle_demand until it’s able to fetch D messages seems like a bad idea: if the queue has C < D messages and no new ones are coming, the first C messages will only be processed once a total of D messages have appeared in the queue.
On the other hand, if handle_demand returns an empty list of events, the consumer will stop sending demand.
So my idea is to do the following:
- if SQS has no messages, make handle_demand a blocking call and poll for messages
- if SQS has fewer messages than the demand, return those as events and keep track of the unfulfilled demand
- the next time handle_demand is called, ask for up to demand + unfulfilled messages and return those
This means that I would sometimes return more events than demanded. It seems to work. But does it follow the GenStage protocol?
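
To make the bookkeeping in the list above concrete, here is a minimal sketch of the demand tracking only. It is written as a plain module so it runs without the gen_stage dependency; the return tuple has the same shape as what the handle_demand/2 callback returns. The module name PendingDemand, the state keys, and the fetch function are all hypothetical: fetch is assumed to take a maximum count and return a list of at most that many messages (in a real producer it would wrap the SQS receive call). The blocking poll for the empty-queue case is left out.

```elixir
defmodule PendingDemand do
  # Hypothetical helper, not part of GenStage. It remembers demand that
  # could not be satisfied and adds it to the next request.

  # `fetch` is an assumed 1-arity function: given a maximum count, it
  # returns a list of at most that many messages (possibly empty).
  def init(fetch), do: %{fetch: fetch, pending: 0}

  # Same return shape as the GenStage handle_demand/2 callback.
  def handle_demand(demand, %{fetch: fetch, pending: pending} = state)
      when demand > 0 do
    # Ask for the new demand plus whatever is still owed from earlier calls.
    wanted = demand + pending
    events = fetch.(wanted)

    # Whatever could not be fetched stays owed for the next call.
    {:noreply, events, %{state | pending: wanted - length(events)}}
  end
end

# Usage with an in-memory stand-in for the queue (an assumption for the demo).
{:ok, queue} = Agent.start_link(fn -> [:a, :b] end)
fetch = fn max -> Agent.get_and_update(queue, &Enum.split(&1, max)) end

state = PendingDemand.init(fetch)

# Initial demand of 5, but only 2 messages are queued: 3 remain owed.
{:noreply, first, state} = PendingDemand.handle_demand(5, state)

# More messages arrive; a later demand of 1 is served together with the 3 owed.
Agent.update(queue, fn _ -> [:c, :d, :e, :f] end)
{:noreply, second, state} = PendingDemand.handle_demand(1, state)
```

In this sketch the second call returns four events for a demand of 1, so a single call can emit more than it was just asked for, but the total number of events emitted (6) never exceeds the total demand received (5 + 1).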