Can a GenStage PartitionDispatcher consumer subscribe to multiple partitions? (because no one wants a grape pie)

Let’s say I have a FruitProducer set up something like {:producer, state, dispatcher: {GenStage.PartitionDispatcher, partitions: [:apple, :cherry, :grape]}}, and I have three consumers: FruitConsumer, FruitConsumerPieProducer, and FruitConsumerSauceProducer. I want FruitConsumer to subscribe to all three partitions, FruitConsumerPieProducer to subscribe to the :apple and :cherry partitions, and FruitConsumerSauceProducer to subscribe only to :apple. Is this possible, or is this an incorrect use of the PartitionDispatcher? The docs suggest that only a single partition can be subscribed to, but this is not explicitly stated:
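For concreteness, a FruitProducer along those lines might look like the sketch below. It is hypothetical: events are assumed to be {fruit, payload} tuples handed in through a made-up push/2 helper, and the :hash function routes each event to the partition named by its fruit. Some :hash function is needed with atom partition names, because the documented default picks an integer partition with :erlang.phash2/2.

```elixir
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule FruitProducer do
  use GenStage

  def start_link(opts \\ []), do: GenStage.start_link(__MODULE__, :ok, opts)

  # Hypothetical helper: hand events to the producer from outside.
  # Each event is assumed to be a {fruit, payload} tuple.
  def push(producer, events), do: GenStage.cast(producer, {:push, events})

  @impl true
  def init(:ok) do
    dispatcher =
      {GenStage.PartitionDispatcher,
       partitions: [:apple, :cherry, :grape],
       # Route each event to the partition named by its fruit. The default
       # hash picks an integer partition, which would not match atom names.
       hash: fn {fruit, _payload} = event -> {event, fruit} end}

    {:producer, :ok, dispatcher: dispatcher}
  end

  # Emit pushed events right away; events for a partition without pending
  # demand are held back (in the producer's buffer or the partition's queue)
  # until that partition's subscriber asks for more.
  @impl true
  def handle_cast({:push, events}, state), do: {:noreply, events, state}

  # Nothing is generated on demand in this sketch; events only come from push/2.
  @impl true
  def handle_demand(_demand, state), do: {:noreply, [], state}
end
```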

When subscribing to a GenStage with a partition dispatcher the following option is required:

  • :partition - the name of the partition. The partition must be one of the partitions specified in :partitions above.

Looking at the source code, it looks like a given partition can only have one subscriber (see gen_stage/partition_dispatcher.ex on the main branch of elixir-lang/gen_stage on GitHub):


```elixir
# Excerpt from GenStage.PartitionDispatcher.subscribe/3
partition = Keyword.get(opts, :partition)

case partitions do
  %{^partition => {nil, nil, demand_or_queue}} ->
    partitions = Map.put(partitions, partition, {pid, ref, demand_or_queue})
    references = Map.put(references, ref, partition)
    {:ok, 0, {tag, hash, waiting, pending, partitions, references, infos}}

  %{^partition => {pid, _, _}} ->
    raise ArgumentError, "the partition #{partition} is already taken by #{inspect(pid)}"

  _ when is_nil(partition) ->
    raise ArgumentError,
          "the :partition option is required when subscribing to a producer with partition dispatcher"

  _ ->
    keys = Map.keys(partitions)
    raise ArgumentError, ":partition must be one of #{inspect(keys)}, got: #{partition}"
end
```

So for your situation to work, I think you’d need multiple FruitProducers. And to have a single consumer subscribe to multiple partitions, it would need to use the subscribe_to option to pass the same producer multiple times, each with a different :partition value.
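A minimal sketch of that last point, assuming the same hypothetical {fruit, payload} events and push/2 helper: FruitConsumer lists the producer once per partition in subscribe_to, so it ends up holding three separate subscriptions, one for each partition. The report_to pid is only there to make the result observable.

```elixir
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule FruitProducer do
  use GenStage

  def start_link(opts \\ []), do: GenStage.start_link(__MODULE__, :ok, opts)
  # Hypothetical helper; events are assumed to be {fruit, payload} tuples.
  def push(producer, events), do: GenStage.cast(producer, {:push, events})

  @impl true
  def init(:ok) do
    {:producer, :ok,
     dispatcher:
       {GenStage.PartitionDispatcher,
        partitions: [:apple, :cherry, :grape],
        hash: fn {fruit, _payload} = event -> {event, fruit} end}}
  end

  @impl true
  def handle_cast({:push, events}, state), do: {:noreply, events, state}

  @impl true
  def handle_demand(_demand, state), do: {:noreply, [], state}
end

defmodule FruitConsumer do
  use GenStage

  def start_link(producer, partitions, report_to) do
    GenStage.start_link(__MODULE__, {producer, partitions, report_to})
  end

  @impl true
  def init({producer, partitions, report_to}) do
    # One subscription per partition: the same producer is listed several
    # times, each entry with a different :partition value.
    subscriptions = for partition <- partitions, do: {producer, partition: partition}
    {:consumer, report_to, subscribe_to: subscriptions}
  end

  @impl true
  def handle_events(events, _from, report_to) do
    send(report_to, {:consumed, events})
    {:noreply, [], report_to}
  end
end

# Usage: one consumer owning all three partitions.
{:ok, producer} = FruitProducer.start_link()
{:ok, _consumer} = FruitConsumer.start_link(producer, [:apple, :cherry, :grape], self())
FruitProducer.push(producer, [{:apple, 1}, {:cherry, 2}, {:grape, 3}])
```

Because each partition is its own subscription, handle_events/3 receives a different from ({producer_pid, subscription_ref}) per partition; a consumer that needs to know which fruit a batch came from could record that reference in handle_subscribe/4, whose options include the :partition it subscribed with.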

I’m pretty sure that’s not correct. I think you may have meant “a subscriber can only subscribe to one partition”, which I think is true but not well documented. The code you extracted just shows that the :partition key in the opts keyword list is used to look up the partition the consumer is attempting to subscribe to. Looking at the docs again, this is what makes me wonder whether you can pass a list instead of a single partition:

The partition function can be given either on init's subscribe_to:

{:consumer, :ok, subscribe_to: [{producer, partition: 0}]}

That suggests to me you could maybe have {:consumer, :ok, subscribe_to: [{producer, partition: 0}, {producer, partition: 1}]}, but that’s what I’m trying to clarify.

But what does this part show, then?

```elixir
%{^partition => {pid, _, _}} ->
  raise ArgumentError, "the partition #{partition} is already taken by #{inspect(pid)}"
```

Ah, it does look like you are correct. What would be the reasoning behind that? Does this mean that, to accomplish the flow I outlined in the OP, you would have FruitProducer use a PartitionDispatcher and then feed each partition into a producer_consumer that uses a BroadcastDispatcher or a DemandDispatcher?
So something like:

FruitProducer ---+
                 +-ApplePartitionDispatcher ---+----------------------+
                 |                             |---PieProducer----+   |
                 |                             |                  |   |
                 +-CherryPartitionDispatcher --+-----------------FoodConsumer
                 |                                                   |
                 +-GrapePartitionDispatcher -------------------------+

Not the clearest ASCII graphic, but I think it shows what I’m trying to express. If a given partition can only have a single subscriber, then to have multiple end processors for that partition, the subscriber must be an intermediate stage (a producer_consumer), which can itself have multiple subscribers.
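The intermediate-stage layout could be sketched as follows. PartitionRelay and FruitSink are hypothetical names: each relay is a producer_consumer that owns exactly one partition of FruitProducer and re-emits its events through GenStage.BroadcastDispatcher, so any number of end stages can subscribe to it. FruitSink stands in for FruitConsumer, FruitConsumerPieProducer and FruitConsumerSauceProducer and only reports what it receives; the {fruit, payload} event shape and push/2 are assumptions as before.

```elixir
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule FruitProducer do
  use GenStage

  def start_link(opts \\ []), do: GenStage.start_link(__MODULE__, :ok, opts)
  # Hypothetical helper; events are assumed to be {fruit, payload} tuples.
  def push(producer, events), do: GenStage.cast(producer, {:push, events})

  @impl true
  def init(:ok) do
    {:producer, :ok,
     dispatcher:
       {GenStage.PartitionDispatcher,
        partitions: [:apple, :cherry, :grape],
        hash: fn {fruit, _payload} = event -> {event, fruit} end}}
  end

  @impl true
  def handle_cast({:push, events}, state), do: {:noreply, events, state}

  @impl true
  def handle_demand(_demand, state), do: {:noreply, [], state}
end

defmodule PartitionRelay do
  # Hypothetical stage: the single subscriber of one partition, which
  # broadcasts every event to all of its own subscribers.
  use GenStage

  def start_link(producer, partition), do: GenStage.start_link(__MODULE__, {producer, partition})

  @impl true
  def init({producer, partition}) do
    {:producer_consumer, partition,
     subscribe_to: [{producer, partition: partition}],
     dispatcher: GenStage.BroadcastDispatcher}
  end

  # Pass events through unchanged.
  @impl true
  def handle_events(events, _from, state), do: {:noreply, events, state}
end

defmodule FruitSink do
  # Hypothetical end stage that reports every batch it receives.
  use GenStage

  def start_link(name, relays, report_to), do: GenStage.start_link(__MODULE__, {name, relays, report_to})

  @impl true
  def init({name, relays, report_to}), do: {:consumer, {name, report_to}, subscribe_to: relays}

  @impl true
  def handle_events(events, _from, {name, report_to} = state) do
    send(report_to, {:consumed, name, events})
    {:noreply, [], state}
  end
end

# Wiring for the flow from the question: sinks subscribe before events flow.
{:ok, producer} = FruitProducer.start_link()
{:ok, apple} = PartitionRelay.start_link(producer, :apple)
{:ok, cherry} = PartitionRelay.start_link(producer, :cherry)
{:ok, grape} = PartitionRelay.start_link(producer, :grape)

{:ok, _} = FruitSink.start_link(:fruit, [apple, cherry, grape], self())
{:ok, _} = FruitSink.start_link(:pie, [apple, cherry], self())
{:ok, _} = FruitSink.start_link(:sauce, [apple], self())

FruitProducer.push(producer, [{:apple, 1}, {:cherry, 2}, {:grape, 3}])
```

With BroadcastDispatcher, a subscriber only sees events dispatched after it subscribed, and each relay moves at the pace of its slowest subscriber. A relay using DemandDispatcher would instead split a partition's events among its subscribers rather than copy them to each one.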

I can’t say why it was designed this way, but my best guess is that it would be more complex to implement a “sub-dispatcher” within each partition, and they probably didn’t have enough reason to add that logic.

What you are describing looks like it would work for your purpose. Another option is to have multiple producers instead of a single producer with partitions, which would remove the need for an intermediate producer_consumer. I’m not sure whether that works for your case, though.
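A sketch of the multiple-producer alternative, again with hypothetical names (SingleFruitProducer, FruitSink, push/2). Each fruit gets its own producer, and each end stage subscribes directly to the producers it cares about. Because the apple producer has three subscribers here, it uses GenStage.BroadcastDispatcher; with the default DemandDispatcher, each apple event would go to only one of them.

```elixir
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule SingleFruitProducer do
  # Hypothetical: one producer per fruit, so no partitions are needed.
  use GenStage

  def start_link(fruit), do: GenStage.start_link(__MODULE__, fruit)
  def push(producer, events), do: GenStage.cast(producer, {:push, events})

  @impl true
  def init(fruit) do
    # Broadcast so every subscriber sees every event; the default
    # DemandDispatcher would hand each event to only one subscriber.
    {:producer, fruit, dispatcher: GenStage.BroadcastDispatcher}
  end

  @impl true
  def handle_cast({:push, events}, fruit), do: {:noreply, events, fruit}

  @impl true
  def handle_demand(_demand, fruit), do: {:noreply, [], fruit}
end

defmodule FruitSink do
  # Hypothetical end stage that reports every batch it receives.
  use GenStage

  def start_link(name, producers, report_to), do: GenStage.start_link(__MODULE__, {name, producers, report_to})

  @impl true
  def init({name, producers, report_to}), do: {:consumer, {name, report_to}, subscribe_to: producers}

  @impl true
  def handle_events(events, _from, {name, report_to} = state) do
    send(report_to, {:consumed, name, events})
    {:noreply, [], state}
  end
end

{:ok, apple} = SingleFruitProducer.start_link(:apple)
{:ok, cherry} = SingleFruitProducer.start_link(:cherry)
{:ok, grape} = SingleFruitProducer.start_link(:grape)

# Subscribe every sink before any events are pushed.
{:ok, _} = FruitSink.start_link(:fruit, [apple, cherry, grape], self())
{:ok, _} = FruitSink.start_link(:pie, [apple, cherry], self())
{:ok, _} = FruitSink.start_link(:sauce, [apple], self())

SingleFruitProducer.push(apple, [{:apple, 1}])
SingleFruitProducer.push(cherry, [{:cherry, 2}])
SingleFruitProducer.push(grape, [{:grape, 3}])
```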