Throttling+Distinct GenStage Help!

I have a router where I get input from various protocols and output to various other protocols (network protocols such as SMPP, XMPP etc…). Some of the sinks require that I send at a specific rate which may vary for each sink.

Additionally I have many producers and and some of them may send very large amounts of messages. I don’t want a single producer to hog any of the sinks and allow some sort of fairness to other producers. In the past I was doing store and forward and doing a distinct call every now and then to get small bundles of messages from each producer.

So here is my question how can I accomplish this with GenStage? What components that already exist can help me accomplish this?

NOTE: I am looking for an at least once type system.

GenStage manages the demand of every producer independently. This means it is not for you to throttle based on the producer using GenStage defaults. However, one option is to use the handle_subscribe callback and return :manual so you can manually manage the demand for some of your producers. More info here: gen_stage/lib/gen_stage.ex at main · elixir-lang/gen_stage · GitHub