Consumer overload protection in the event bus

I have a simple event bus as a means of communicating between event producers and consumers. It’s job is basically to broadcast certain events from producers to consumers subscribed to those event types (which in essence is done by ConsumerPid ! Event).

The problem I’m facing is that some consumers sometimes get overloaded and their inbox grows uncontrollably. I’m aware this is a known issue but I can’t come up with a good solution on how to implement it. Ideally I’d like to do the overload protection in the event bus module so I don’t have to write every consumer with this in mind.

given

  • this is an erlang project so GenStage is not an option
  • most of the producers are push-based (like incoming udp packets, http api requests)

here are the options that I considered:

  1. using a bounded queue per consumer in the event so that consumers can pull events at their convenience. But this means that an event will be copied to a queue first, possibly to multiple queues (if there are multiple consumers for that event type) and then it will have to be copied to each consumer’s process instead of going directly to consumers. I’m not sure if this is a good idea performance-wise which I guess I can only find out after it is done.
  2. using gen_sever:call instead of cast'ing. The obvious downside is that producers will get blocked and each event will go through consumers one by one instead of going to all consumers almost simultaneously. The latter is probably fine because there is enough of parallel stuff already happening beside this so it’s not like the system would go idle. But this approach seems kind of unsophisticated and I’m not sure if the system will become unresponsive.
  3. checking consumer’s inbox queue length in the event bus (with erlang:process_info(Pid, message_queue_len)) and dropping events when it exceeds some threshold. The issue with this is that calling process_info on a process too often seems like a bad strategy because it apparently locks the process for which info is requested.
  4. implementing the overload protection in each consumer for example using an ets counter. Counter can be checked and incremented before putting a message in the process inbox and decremented from inside the process after the message is process. Parts of this might be implemented in some behaviour but I believe the counter decrementing code would have to be dealt with in every consumer which is not so great.

I feel I got stuck in analysis paralysis so I’d be grateful for any advice :slight_smile:

1 Like