What's the easiest way to handle the initial demand in GenStage producer?

I want to process messages from an AWS SQS queue. I want to limit the number of concurrently processed messages to N.

I’m looking into solving this with GenStage. I’ve set up a ConsumerSupervisor with min_demand of 1 and max_demand of N. My producer will fetch the messages from the SQS. Once things are running, the demand will generally be 1 which is very simple to implement. But I’m wondering what’s the easiest way to handle the initial demand where it can go way up to N. It may also be the case that the queue has less than N items when asking for the initial demand.

I know there’s the Broadcaster example, but I’m looking for something simpler.

Blocking handle_demand until it’s able to fetch D messages seems like a bad idea, because if the queue has C < D messages and no new ones are coming, the first C messages will only be processed when in total D messages will appear in the queue.

On the other hand, if handle_demand returns an empty list of events, the consumer will stop sending the demand.

So my idea is to do the following:

  • if SQS has no messages, make handle_demand a blocking call and poll for messages
  • if SQS has less messages than the demand, return those as events and keep track of the unfulfilled demand
  • next time the handle_demand is called, ask for up to demand + unfulfilled messages and return those

This means that I would sometimes return more events than demanded. It seems to work. But does it follow the GenStage protocol?

In GenStage the consumer should be generally unaware of any of the issues you mentioned. It just does its work and send demand when it sees fit.

Fulfilling demand is the responsibility of the producer alone and it can do that in whatever form it sees fit to fulfill the demand it received. The important bit it, that the producer can fulfill demand not only when the handle_demand callback is called, but also from all other callbacks of the underlying genserver, like e.g. handle_info. So if your handle_demand is called and SQS does not have enough messages you can still return the messages you currently got, store the excess demand in the genserver state and e.g. use Process.send_after to initiate polling for new messages. Then the callback handling that polling can return events as they trickle in.

1 Like

Thanks! I was guessing that it works that way, but I haven’t seen this written explicitly in the docs (or maybe I missed it…).