Flow-Based Pipeline and Different Scenarios

Hello there!
I want to implement a flow-based algorithm (or pipeline/conveyor) for processing my files in the system. So:

  • Can I make separate pipelines for different scenarios?
  • Or do I need a single pipeline with more flexible functionality in each step?

Which way is better? Are there any tutorials?

You can have multiple pipelines for different scenarios, but in that case you will probably need to use GenStage directly so that you can use GenStage.BroadcastDispatcher.
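A minimal sketch of such a producer, assuming the gen_stage library is installed (the module name and state are hypothetical):

```elixir
defmodule MyApp.FileProducer do
  use GenStage

  def start_link(opts), do: GenStage.start_link(__MODULE__, :ok, opts)

  @impl true
  def init(:ok) do
    # BroadcastDispatcher sends a copy of every event to all subscribed
    # consumers, instead of load-balancing events between them
    {:producer, :no_state, dispatcher: GenStage.BroadcastDispatcher}
  end

  @impl true
  def handle_demand(_demand, state) do
    # In a real pipeline you would emit pending file events here
    {:noreply, [], state}
  end
end
```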


So it seems you mean handling the different scenarios in separate consumers, right?

And can a producer have several different types of consumers? If so, what should I read to implement this?

Yes, a producer can have multiple different consumers, but that will not make much sense with the default dispatcher (which load-balances, so each event is sent to only one of the consumers). As I said, you can use the broadcast dispatcher instead, and then every consumer subscribed to the producer will receive a copy of the same event.
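With the broadcast dispatcher, each consumer can also opt in to only the events it cares about via the :selector subscription option (supported by GenStage.BroadcastDispatcher). A sketch, assuming a producer module named MyApp.FileProducer:

```elixir
defmodule MyApp.PotatoConsumer do
  use GenStage

  def start_link(opts), do: GenStage.start_link(__MODULE__, :ok, opts)

  @impl true
  def init(:ok) do
    # :selector filters the broadcast stream, so this consumer only
    # receives events for which the function returns true
    {:consumer, :no_state,
     subscribe_to: [
       {MyApp.FileProducer, selector: fn event -> event.type == "potato" end}
     ]}
  end

  @impl true
  def handle_events(events, _from, state) do
    # Handle the "potato" scenario here
    {:noreply, [], state}
  end
end
```

You would start one such consumer per scenario, each with its own selector.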

Another solution is to wrap your data in a struct and add a field that you use to disambiguate. Then you can define multiple clauses of your processing function, each handling its own type of data. This effectively gives you “multiple consumers”. For example, you can have a struct called Container with a type field:

[%Container{type: "potato", data: some_data}, %Container{type: "carrot", data: some_other_data}]
|> Flow.from_enumerable()
|> Flow.map(&process_food/1)
|> Flow.run()

...

def process_food(%Container{type: "potato", data: data}) do
   ...
end

def process_food(%Container{type: "carrot", data: data}) do
  ...
end

This is the pattern suggested in https://www.youtube.com/watch?v=b342YwOzIjI

It can also be extended to do smart error handling etc.
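For instance, a catch-all clause can turn unknown types into error tuples instead of crashing the stage. A self-contained sketch (the Kitchen module name and return values are made up for illustration):

```elixir
defmodule Container do
  defstruct [:type, :data]
end

defmodule Kitchen do
  def process_food(%Container{type: "potato", data: data}), do: {:ok, {:baked, data}}
  def process_food(%Container{type: "carrot", data: data}), do: {:ok, {:chopped, data}}

  # Fallback clause: report unknown types instead of raising a FunctionClauseError
  def process_food(%Container{type: other}), do: {:error, {:unknown_type, other}}
end
```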