Can a GenStage have multiple subscriptions?

Can a GenStage have multiple subscriptions? I’m pretty sure its possible but wanted to double check. The only hint in docs is ‘subscribe_to:’ accepts a list.

defmodule StageB do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, data)

  @impl true
  def init(data) do
    {:producer_consumer, data, subscribe_to: [{producer, partition: data["id"]}, {stageA, options}]}

  @impl true
  def handle_events(events, _from, data) do
    events =
      for event <- events do
    {:noreply, events, data}

With a simple pipeline Producer → A → B, I want to be able to skip A for certain types of events. Producer will use a partition dispatcher to send events directly to each stage if condition is met in producer logic.

Additional Edit:
GenStage.BroadcastDispatcher :selector functions could also be a solution to skip A. Not sure what the pros/cons are yet.

A follow up question: If there are 2 concurrent instances of stageA, would the producer act like a BroadcastDispatcher to that partition? For my use case I would not want events to be duplicated.

Yes, a GenStage :consumer or :producer_consumer can be subscribed to multiple producers. How producers deliver events to those consumers depends on the dispatcher, as you hinted in your question.

I can’t speak for your particular pipeline architecture, so if you have a particular use case in mind, we can try to work through it :wink:

As for the follow-up question: depends on the dispatcher. If you use a PartitionDispatcher, then it acts like a DemandDispatcher across consumers subscribed to each partition. I also opened a PR in GenStage to mention this in the docs for the PartitionDispatcher.

1 Like

Amazing thanks for that answer! Excatly what I was looking for :slight_smile:

“When multiple consumers subscribe to one partition, the producer
behaves like a GenStage.DemandDispatcher within that partition.”

Loving GenStage more and more as I learn about it. For fun I’m seeing how valid GenStage could be for use in a ‘low code’ / SOAR gui solution. Something like Apache Airflow maybe. The rough idea is GenStage consume events with some sort of data payload and then run custom GenStage actions.