Hello, I have a demand on my app where I need to process multiple pipelines that depend on the results of other pipelines.
To do that, I was thinking of using GenStage, I created a sample project that you can get here: https://github.com/sezaru/test-genserver
producer.exis my producer, some other service will send messages to run;pc_a.ex,pc_b.ex,pc_c.ex,pc_d.exare producers_consumers;c_a.ex,c_b.ex,c_c.ex,c_d.exare consumers;
If you run it and send a message to the producer, for example Producer.sync_notify 20 the whole pipeline will run.
- Sending some value to the Producer will trigger
PcAandPcBto receive the value; PcAhas it’s own consumerCAandPcBhaveCB;PcCreceives bothPcAandPcBresults and process them in pairs;PcChas it’s own consumerCC;PcDreceives bothPcBandPcCresults and process them in pairs;PcDhas it’s own consumerCD.
Which gives me something like this diagram:
Each arrow shows the subscription route.
As you can see, in my pipeline I need to send the result from a producer or consumer_producer to many consumers or consumer_producers, so I use GenStage.BroadcastDispatcher for that.
Also, some consumer_producers may need to receive the response of more than one consumer_producer, so I need some way to sync their results and make sure I’m processing then in the correct pairs, for that I have an implementation using the state of PcC and PcD to keep the sync correct.
This seems to work great for my case and I can’t see any obvious issue with that approach.
But I’m not sure if this is the best way to do it and I’m open to some other suggestion and concerns.
Thanks!























