Can a GenStage have multiple subscriptions? I’m pretty sure it’s possible but wanted to double check. The only hint in the docs is that ‘subscribe_to:’ accepts a list.
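defmodule StageB do
  use GenStage

  def start_link(data) do
    GenStage.start_link(__MODULE__, data)
  end

  @impl true
  def init(data) do
    # Producer and StageA stand for the upstream stages (assumed to be
    # registered under these names). The :partition option picks which
    # partition of Producer's PartitionDispatcher this stage reads from.
    subscriptions = [{Producer, partition: data["id"]}, {StageA, []}]
    {:producer_consumer, data, subscribe_to: subscriptions}
  end

  @impl true
  def handle_events(events, _from, data) do
    events =
      for event <- events do
        # something
        event
      end

    {:noreply, events, data}
  end
end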
Motivation:
With a simple pipeline Producer → A → B, I want to be able to skip A for certain types of events. Producer will use a partition dispatcher to send events directly to each stage if condition is met in producer logic.
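To make the idea concrete, here is a rough sketch of the producer side. It assumes events are maps carrying a hypothetical `:skip_a?` flag and that the partitions are named `:stage_a` and `:stage_b`; both names are made up for illustration. The `:partitions` and `:hash` options are the ones documented for GenStage.PartitionDispatcher.

```elixir
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule RoutingProducer do
  # Hypothetical producer that routes each event to a named partition.
  use GenStage

  def start_link(events), do: GenStage.start_link(__MODULE__, events)

  @impl true
  def init(events) do
    dispatcher =
      {GenStage.PartitionDispatcher, partitions: [:stage_a, :stage_b], hash: &route/1}

    {:producer, events, dispatcher: dispatcher}
  end

  # The :hash function returns {event, partition} for every event.
  # :skip_a? is an assumed field on the event, not part of GenStage.
  def route(%{skip_a?: true} = event), do: {event, :stage_b}
  def route(event), do: {event, :stage_a}

  @impl true
  def handle_demand(demand, events) do
    # Emit at most `demand` events and keep the rest for later.
    {now, later} = Enum.split(events, demand)
    {:noreply, now, later}
  end
end

IO.inspect(RoutingProducer.route(%{id: 1, skip_a?: true}), label: "skips A")
IO.inspect(RoutingProducer.route(%{id: 2, skip_a?: false}), label: "goes through A")
```

A would then subscribe with `partition: :stage_a`, and B would subscribe both to the producer with `partition: :stage_b` and to A.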
Additional Edit:
GenStage.BroadcastDispatcher :selector functions could also be a solution to skip A. Not sure what the pros/cons are yet.
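Roughly what I have in mind for the selector variant, as a sketch only. The module names and the `:skip_a?` field are hypothetical, and a single consumer stands in for A. The `:selector` subscription option is the one documented for GenStage.BroadcastDispatcher.

```elixir
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule BroadcastProducer do
  # Hypothetical producer that broadcasts every event to all subscribers.
  use GenStage

  def start_link(events), do: GenStage.start_link(__MODULE__, events)

  @impl true
  def init(events) do
    {:producer, events, dispatcher: GenStage.BroadcastDispatcher}
  end

  @impl true
  def handle_demand(demand, events) do
    {now, later} = Enum.split(events, demand)
    {:noreply, now, later}
  end
end

defmodule SelectiveConsumer do
  # Hypothetical consumer that only receives events matching its selector.
  use GenStage

  def start_link(producer, selector, parent) do
    GenStage.start_link(__MODULE__, {producer, selector, parent})
  end

  @impl true
  def init({producer, selector, parent}) do
    {:consumer, parent, subscribe_to: [{producer, selector: selector}]}
  end

  @impl true
  def handle_events(events, _from, parent) do
    # Forward the batch to the parent process so it can be inspected.
    send(parent, {:events, events})
    {:noreply, [], parent}
  end
end

events = [%{id: 1, skip_a?: false}, %{id: 2, skip_a?: true}, %{id: 3, skip_a?: false}]
{:ok, producer} = BroadcastProducer.start_link(events)

# Stand-in for A: it only wants events that should not skip it.
{:ok, _a} = SelectiveConsumer.start_link(producer, fn event -> not event.skip_a? end, self())

receive do
  {:events, received} -> IO.inspect(received, label: "stand-in for A received")
after
  1_000 -> IO.puts("no events received")
end
```

B would subscribe to the same producer with the opposite selector, plus a normal subscription to A.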
A follow-up question: if there are 2 concurrent instances of stageA, would the producer act like a BroadcastDispatcher to that partition? For my use case I would not want events to be duplicated.
Yes, a GenStage :consumer or :producer_consumer can be subscribed to multiple producers. How producers deliver events to those consumers depends on the dispatcher, as you hinted in your question.
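As a minimal sketch of a single consumer with two subscriptions: the module names ListProducer and Collector are made up for this example, and both producers use the default dispatcher.

```elixir
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule ListProducer do
  # Hypothetical producer that emits the items of a fixed list on demand.
  use GenStage

  def start_link(items), do: GenStage.start_link(__MODULE__, items)

  @impl true
  def init(items), do: {:producer, items}

  @impl true
  def handle_demand(demand, items) do
    {events, rest} = Enum.split(items, demand)
    {:noreply, events, rest}
  end
end

defmodule Collector do
  # Hypothetical consumer subscribed to several producers at once.
  use GenStage

  def start_link(producers, parent) do
    GenStage.start_link(__MODULE__, {producers, parent})
  end

  @impl true
  def init({producers, parent}) do
    # One subscription is created per entry in the list.
    {:consumer, parent, subscribe_to: producers}
  end

  @impl true
  def handle_events(events, {producer_pid, _ref}, parent) do
    # `from` identifies which subscription delivered this batch.
    send(parent, {:batch, producer_pid, events})
    {:noreply, [], parent}
  end
end

{:ok, letters} = ListProducer.start_link([:a, :b, :c])
{:ok, numbers} = ListProducer.start_link([1, 2, 3])
{:ok, _collector} = Collector.start_link([letters, numbers], self())

for _ <- 1..2 do
  receive do
    {:batch, pid, events} -> IO.inspect(events, label: "from #{inspect(pid)}")
  after
    1_000 -> IO.puts("timed out waiting for a batch")
  end
end
```

Each subscription carries its own demand, so the consumer receives batches from both producers and can tell them apart through the `from` argument of handle_events/3.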
I can’t speak for your particular pipeline architecture, so if you have a specific use case in mind, we can try to work through it.
As for the follow-up question: it depends on the dispatcher. If you use a PartitionDispatcher, then it acts like a DemandDispatcher across the consumers subscribed to each partition. I also opened a PR in GenStage to mention this in the docs for the PartitionDispatcher.
Amazing, thanks for that answer! Exactly what I was looking for:
“When multiple consumers subscribe to one partition, the producer behaves like a GenStage.DemandDispatcher within that partition.”
Loving GenStage more and more as I learn about it. For fun I’m seeing how viable GenStage could be for use in a ‘low code’ / SOAR GUI solution. Something like Apache Airflow maybe. The rough idea is that GenStage stages consume events with some sort of data payload and then run custom GenStage actions.