Hello, I have a requirement in my app where I need to process multiple pipelines that depend on the results of other pipelines.
To do that, I was thinking of using GenStage. I created a sample project that you can get here: https://github.com/sezaru/test-genserver
- `producer.ex` is my producer; some other service will send it messages to run;
- `pc_a.ex`, `pc_b.ex`, `pc_c.ex`, `pc_d.ex` are producer_consumers;
- `c_a.ex`, `c_b.ex`, `c_c.ex`, `c_d.ex` are consumers.
If you run it and send a message to the producer, for example `Producer.sync_notify 20`, the whole pipeline will run:
- Sending some value to the `Producer` will trigger `PcA` and `PcB` to receive the value;
- `PcA` has its own consumer `CA`, and `PcB` has `CB`;
- `PcC` receives both `PcA` and `PcB` results and processes them in pairs;
- `PcC` has its own consumer `CC`;
- `PcD` receives both `PcB` and `PcC` results and processes them in pairs;
- `PcD` has its own consumer `CD`.
Which gives me something like this diagram:
Each arrow shows the subscription route.
As you can see, in my pipeline I need to send the result from a producer or producer_consumer to many consumers or producer_consumers, so I use `GenStage.BroadcastDispatcher` for that.
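For readers who have not opened the repo, here is a minimal sketch of what such a broadcast setup could look like. It is not the code from the project: the `Sketch.*` module names, the `notify/1` helper and the `* 2` transformation are placeholders I made up for illustration, and the `Mix.install` line only makes sense in a standalone script.

```elixir
# Standalone-script setup (assumption). Inside a Mix project, list
# {:gen_stage, "~> 1.2"} in mix.exs instead and drop this line.
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule Sketch.Producer do
  use GenStage

  def start_link(_opts \\ []), do: GenStage.start_link(__MODULE__, :ok, name: __MODULE__)

  # Hypothetical entry point: push one value into the pipeline.
  def notify(value), do: GenStage.cast(__MODULE__, {:notify, value})

  @impl true
  def init(:ok) do
    # BroadcastDispatcher delivers every event to all subscribed stages.
    {:producer, :ok, dispatcher: GenStage.BroadcastDispatcher}
  end

  @impl true
  def handle_cast({:notify, value}, state), do: {:noreply, [value], state}

  # Events only enter through notify/1, so demand alone produces nothing.
  @impl true
  def handle_demand(_demand, state), do: {:noreply, [], state}
end

defmodule Sketch.PcA do
  use GenStage

  def start_link(_opts \\ []), do: GenStage.start_link(__MODULE__, :ok, name: __MODULE__)

  @impl true
  def init(:ok) do
    # Subscribes upstream and broadcasts its own results downstream,
    # so both a plain consumer and another producer_consumer can listen.
    {:producer_consumer, :ok,
     subscribe_to: [Sketch.Producer], dispatcher: GenStage.BroadcastDispatcher}
  end

  @impl true
  def handle_events(events, _from, state) do
    # Placeholder work: tag each result with the stage that produced it.
    {:noreply, Enum.map(events, &{:a, &1 * 2}), state}
  end
end
```

A second stage shaped like `Sketch.PcA` and subscribed to the same producer would receive the same values, which is the fan-out the arrows describe.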
Also, some producer_consumers may need to receive the results of more than one upstream producer_consumer, so I need some way to sync their results and make sure I’m processing them in the correct pairs. For that, I have an implementation that uses the state of `PcC` and `PcD` to keep the sync correct.
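To make the pairing idea concrete, here is a rough sketch of one way such state could be kept. It is not the implementation from the repo: `PairSync` and its functions are hypothetical names, and it assumes each upstream stage delivers its results in order, so the n-th result from one source belongs with the n-th result from the other.

```elixir
defmodule PairSync do
  @moduledoc "Hypothetical helper: pairs results from two upstream sources in arrival order."

  # One FIFO queue per upstream source.
  def new, do: %{a: :queue.new(), b: :queue.new()}

  # Enqueues `value` from `source` (:a or :b) and returns
  # {pairs_that_became_ready, new_state}.
  def push(state, source, value) when source in [:a, :b] do
    state
    |> Map.update!(source, &:queue.in(value, &1))
    |> drain([])
  end

  # Pops one item from each queue for as long as both queues are non-empty.
  defp drain(%{a: qa, b: qb} = state, acc) do
    with {{:value, a}, qa} <- :queue.out(qa),
         {{:value, b}, qb} <- :queue.out(qb) do
      drain(%{state | a: qa, b: qb}, [{a, b} | acc])
    else
      # At least one side is empty: keep the state as it was and stop.
      _ -> {Enum.reverse(acc), state}
    end
  end
end

state = PairSync.new()
{[], state} = PairSync.push(state, :a, 1)
{[], state} = PairSync.push(state, :a, 2)
{[{1, :x}], state} = PairSync.push(state, :b, :x)
{[{2, :y}], _state} = PairSync.push(state, :b, :y)
```

In a producer_consumer, the `source` could be derived from the `from` argument of `handle_events/3`, and the ready pairs would be what the stage processes and emits downstream.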
This seems to work great for my case, and I can’t see any obvious issue with this approach.
But I’m not sure if this is the best way to do it, so I’m open to other suggestions and concerns.
Thanks!