Ensuring all consumers receive events

Suppose we have a project that uses Ecto with a connection pool of 20 connections.

Now the system produces events, but producing an event can take a long time, since it is a database query that occupies a connection. I have two problems to solve: rate limiting and congestion.

Rate limiting can be solved easily with Broadway/GenStage.

The second problem comes from the fact that all consumers should receive events (each consumer handles a different type of event). The problem appears when the producer emits a burst of events for a single consumer type: while those events are being processed for that consumer, the others don't receive anything.

A naive approach would be to create a queue with a separate list per consumer and round-robin across those lists. Is there a different, maybe more official way to do this?

If you want guaranteed at-least-once delivery you might be in the market for a proper message queue such as RabbitMQ or Kafka.


A naive approach would be to create a queue with a separate list per consumer and round-robin across those lists. Is there a different, maybe more official way to do this?

I think a GenStage producer using the demand dispatcher (the default) does what you want.

If all your consumers use the same values for max_demand and min_demand, it should distribute the events fairly across consumers:

  • GenStage.DemandDispatcher - dispatches the given batch of events to the consumer with the biggest demand in a FIFO ordering. This is the default dispatcher.
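As a minimal sketch (the module name and the counter-based event source are hypothetical stand-ins for your real producer), the dispatcher can be chosen explicitly in the producer's init/1 callback:

```elixir
defmodule EventProducer do
  use GenStage

  def start_link(_opts) do
    GenStage.start_link(__MODULE__, 0, name: __MODULE__)
  end

  @impl true
  def init(counter) do
    # DemandDispatcher is already the default; it is spelled out here
    # only to make the choice visible.
    {:producer, counter, dispatcher: GenStage.DemandDispatcher}
  end

  @impl true
  def handle_demand(demand, counter) when demand > 0 do
    # In the real system this would be the (slow) database query that
    # produces events; here it just emits consecutive integers.
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end
```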

You just have to be careful picking the values of max_demand and min_demand to ensure you’re not exhausting your DB pool. Keep in mind this is the behaviour of those parameters:

When implementing consumers, we often set the :max_demand and :min_demand on subscription. The :max_demand specifies the maximum amount of events that must be in flow while the :min_demand specifies the minimum threshold to trigger for more demand. For example, if :max_demand is 1000 and :min_demand is 750, the consumer will ask for 1000 events initially and ask for more only after it processes at least 250.
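To make that concrete, here is a sketch of a consumer subscribing with those options. EventProducer is a hypothetical registered producer name; the numbers are illustrative, not a recommendation:

```elixir
defmodule EventConsumer do
  use GenStage

  def start_link(_opts) do
    GenStage.start_link(__MODULE__, :ok)
  end

  @impl true
  def init(:ok) do
    # Asks for 10 events up front, then asks for more only after
    # processing max_demand - min_demand = 5 events.
    {:consumer, :ok,
     subscribe_to: [{EventProducer, max_demand: 10, min_demand: 5}]}
  end

  @impl true
  def handle_events(events, _from, state) do
    # This is where each event would check out a DB connection,
    # so max_demand bounds how much work is in flight per consumer.
    Enum.each(events, fn _event -> :ok end)
    {:noreply, [], state}
  end
end
```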


Ok, this seems like a step in the right direction. What I can't understand is how to limit the connection pool usage this way. In my understanding it would be implemented as follows: create a producer with a DemandDispatcher and one consumer for each type of message.

Now we will have some nasty problems:
If each consumer uses a database connection, we can limit the total number of connections only by the number of consumers. This is also very bad because each consumer will always be limited to one connection, so processing takes a long time when only one type of event is needed.

In my understanding, no GenStage mechanism alone can solve the congestion problem. The solution I think is suited for this case is a two-stage processing pipeline.

A custom producer will produce events in the correct order and send them via something like RabbitMQ (to guarantee delivery).

The second stage will be a Broadway pipeline with the number of processors equal to the maximum number of connections in the pool, so that all events get processed without exhausting the connection pool.
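That second stage could look roughly like this sketch, assuming the broadway and broadway_rabbitmq packages; the "events" queue name is a hypothetical placeholder:

```elixir
defmodule EventPipeline do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        # Stage one feeds this RabbitMQ queue.
        module: {BroadwayRabbitMQ.Producer, queue: "events"},
        concurrency: 1
      ],
      processors: [
        # Matches the Ecto pool size of 20, so at most 20 messages
        # (and therefore at most 20 DB connections) are in flight.
        default: [concurrency: 20]
      ]
    )
  end

  @impl true
  def handle_message(_processor, message, _context) do
    # Per-event DB work goes here; each invocation holds at most
    # one connection, bounded by the processors concurrency above.
    message
  end
end
```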