Consuming multiple APIs with GenStage

I am having a little trouble planning out my first GenStage api consuming thingy :slight_smile: For every event that producer gets I want to call 3 external APIs, 2 immediately and last one based on results from “External API #2” in picture below:

Am I trying to use single GenStage producer for too many things? Should I create new producer that gets notified with new data from “External API #2” consumers/workers ?

++ What is the best way to deal with results from consumers? I need to update DB and Users GenServer cache(green box on left), should this be done from consumers? I am assuming producer (DataController in diagram above) and producer_consumer in between shouldn’t care about results at all, correct?

But in this great video about GenStage he is sending results from consumers back to producer.

What about dealing with failure ? If let’s say API goes down, or credits run out… Should every consumer(worker) have to run into it and pause sending demand after few retries?

A single process with 2 external HTTP calls (1 and 2 are done in parallel, while 3 might happen or not happen) definitely looks like a possible bottleneck. I’m not an expert by any means but would consider to adding a producer-consumer layer for that last possible call.
Regarding dealing with failure take a look at “Buffering demand” section at https://hexdocs.pm/gen_stage/Experimental.GenStage.html, however, as it was said in the talk the actual recovery scenario is tailored to the specific situation: I imagine there should be some kind of checkpoints that you can rely on to check if job was complete.

1 Like

One of the reasons we use GenStage is to leverage concurrency. The questions you need to ask yourself is: how in this schema can I leverage concurrency and will it be enough to leave my machine considerably busy?

If events can be processed out of order, all you need to do is to have a single producer which receive events and immediately dispatches each event to N consumers where N is the number of cores. If they are mostly doing IO, you can likely increase N to a bigger value (but make sure you won’t make your external API providers angry by doing more requests than they can handle). If this approach is enough, then each consumer for each event will contact APIs 1, 2 and 3 directly. No need to make each external API a separate process with different consumers each.

If the above is true, I would likely implement this by having a Producer -> DynamicSupervisor, where the DynamicSupervisor will start a new process per event and have at most max_demand children. This way you also get the supervision guarantees and a failure in processing any of the events won’t bring the current stage down.

5 Likes