GenStage safely and automatically demands more events

Hi guys,

We want to set up a pipeline like like

S3/SQS -> [Producer] -> [Processor] -> [Consumer] -> Database

We want to setup in a way that when some one place a file in S3, it will notify SQS.

[Producer] will read from SQS, get object_key then read the actual content in S3.

Our problem is that, when we first start the stack, everything runs correctly.

[Consumer] demands [Processor], [Processor] demands [Producer], [Producer] check SQS then read from S3, for example 10k lines. Because the max_demand is 1k, it takes a while to actually exhaust the whole content from S3.

Our problem is that after first run, [Consumer] doesn’t seem to demand any more. We placed a new file into S3, we got notification into SQS. But since [Consumer] doesn’t demand anymore, [Producer] doesn’t go and take a look into SQS so nothing enters the system.

Do we implement and understand GenStage incorrectly?

We read about buffering events and buffering demand but cannot figure out how it fits into our use case.

Is there a way to keep asking for more data safely?

We don’t want to go over producer’s buffer_size, i.e. if we have 10 files in the SQS, we don’t want to read them all in but read 1, wait until it is safe to read the next one, i.e. in case database is down we don’t want to keep content of 10 files in memory.


1 Like

I think you need to buffer demand until consumers are ready to consume more events, and you need to keep the :min_demand very low in order to only ask for few (or one) files at a time.

So, the producer would start and not ready any files out of SQS. When the consumer starts and is ready to consume a file, it will send the initial demand. If you want, you can set this very low (for example, max_demand: 2) so that initially the consumer only asks for two events. In handle_demand/2 in the producer, you can then fetch as many files as demand asks: the first time this will be 2 but then it will be mostly 1. This will ensure that as soon as the consumer processes the first file, it will ask for one more file and the producer can fetch the new file while the consumer is processing the second file. This way, you can take advantage of concurrency because producer and consumer are working asynchronously. After the first two files, the producer will never fetch more than two files in memory anyways so you achieve the “safety” you mentioned.

Does this make things clearer? Ask away if it doesn’t, I am no GenStage guru (yet!). :slight_smile:

1 Like

I think yesterday we tried with min_demand = 0, I think the demand = max_demand - min_demand…

I can try a bit more with those parameters, but to have the context.

The issue is the files are managed by different people, and they can drop the files in any time they want. And we want to process the data, when 1. there is new data 2. and without dropping any data

A few solutions we can thing of are

  1. Is there any configuration that [Consumer] keep asking for events indefinitely?

  2. Or is there any way [Producer] can know the current buffer size so that it will only poll for new data when