Let’s say I have a FruitProducer set up something like {:producer, state, dispatcher: {GenStage.PartitionDispatcher, partitions: [:apple, :cherry, :grape]}}, and I have three consumers: FruitConsumer, FruitConsumerPieProducer, and FruitConsumerSauceProducer. I want the FruitConsumer to subscribe to all three partitions, FruitConsumerPieProducer to subscribe to the :apple and :cherry partitions, and the FruitConsumerSauceProducer to subscribe to only :apple. Is this possible, or is this an incorrect use of the PartitionDispatcher? The docs suggest that only a single partition can be subscribed to, but this is not explicitly stated:
When subscribing to a GenStage with a partition dispatcher the following option is required:
:partition - the name of the partition. The partition must be one of the partitions specified in :partitions above.
Looking at the source code, it looks like a given partition can only have one subscriber. Extract from partition_dispatcher.ex (elixir-lang/gen_stage on GitHub, main branch):
partition = Keyword.get(opts, :partition)

case partitions do
  %{^partition => {nil, nil, demand_or_queue}} ->
    partitions = Map.put(partitions, partition, {pid, ref, demand_or_queue})
    references = Map.put(references, ref, partition)
    {:ok, 0, {tag, hash, waiting, pending, partitions, references, infos}}

  %{^partition => {pid, _, _}} ->
    raise ArgumentError, "the partition #{partition} is already taken by #{inspect(pid)}"

  _ when is_nil(partition) ->
    raise ArgumentError,
          "the :partition option is required when subscribing to a producer with partition dispatcher"

  _ ->
    keys = Map.keys(partitions)
    raise ArgumentError, ":partition must be one of #{inspect(keys)}, got: #{partition}"
end
So for your situation to work, I think you’d need multiple FruitProducers. And for a single consumer to subscribe to multiple partitions, it would need to list the same producer several times in its subscribe_to option, each time with a different :partition value.
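A rough, untested sketch of that last idea, assuming the gen_stage dependency is available (in a standalone script, Mix.install([{:gen_stage, "~> 1.2"}]) is one way to get it). PieConsumer, the start_link helpers, the list-backed producer state, and the hash function are illustrative choices, not something taken from the thread or the docs:

```elixir
defmodule FruitProducer do
  use GenStage

  def start_link(fruits), do: GenStage.start_link(__MODULE__, fruits, name: __MODULE__)

  def init(fruits) do
    dispatcher =
      {GenStage.PartitionDispatcher,
       partitions: [:apple, :cherry, :grape],
       # Assumption: every event is an atom that doubles as its partition name.
       hash: fn fruit -> {fruit, fruit} end}

    {:producer, fruits, dispatcher: dispatcher}
  end

  # Hand out as many queued fruits as were demanded; keep the rest as state.
  def handle_demand(demand, fruits) do
    {events, rest} = Enum.split(fruits, demand)
    {:noreply, events, rest}
  end
end

defmodule PieConsumer do
  use GenStage

  def start_link(_opts), do: GenStage.start_link(__MODULE__, :ok)

  def init(:ok) do
    # The same producer appears twice, once per partition this consumer wants.
    {:consumer, :ok,
     subscribe_to: [
       {FruitProducer, partition: :apple},
       {FruitProducer, partition: :cherry}
     ]}
  end

  def handle_events(fruits, _from, state) do
    IO.inspect(fruits, label: "pie")
    {:noreply, [], state}
  end
end
```

Each entry creates its own subscription, so the consumer ends up holding two partitions. A second, different consumer asking for :apple would still hit the "already taken" error from the extract above.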
I’m pretty sure that’s not correct. I think you may have meant “a subscriber can only subscribe to one partition”, which I think is true but not well documented. The code you extracted just shows that the :partition key in the opts keyword list is used to retrieve the identifier of the partition the consumer is attempting to subscribe to. Looking at the docs again, this is what makes me wonder whether you can pass a list instead of a single partition:
Examples
The partition function can be given either on init’s subscribe_to:
{:consumer, :ok, subscribe_to: [{producer, partition: 0}]}
That suggests to me you could maybe have {:consumer, :ok, subscribe_to: [{producer, partition: 0}, {producer, partition: 1}]}, but that’s what I’m trying to clarify.
But what does this part show?
  %{^partition => {pid, _, _}} ->
    raise ArgumentError, "the partition #{partition} is already taken by #{inspect(pid)}"
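To make that clause easier to reason about, here is a deliberately simplified model in plain Elixir. PartitionModel is a hypothetical module, not part of GenStage: it keeps only the "partition to subscriber" map, uses atoms in place of pids, and returns error tuples where the real dispatcher raises.

```elixir
defmodule PartitionModel do
  # Each partition maps to its subscriber, or to nil while it is still free.
  def new(names), do: Map.new(names, &{&1, nil})

  def subscribe(partitions, partition, subscriber) do
    case partitions do
      # Free partition: record the subscriber.
      %{^partition => nil} ->
        {:ok, Map.put(partitions, partition, subscriber)}

      # Partition exists but already has a subscriber: refuse.
      %{^partition => taken_by} ->
        {:error, {:already_taken, taken_by}}

      # Partition was never declared.
      _ ->
        {:error, :unknown_partition}
    end
  end
end

partitions = PartitionModel.new([:apple, :cherry, :grape])
{:ok, partitions} = PartitionModel.subscribe(partitions, :apple, :pie)

# A second subscriber on the same partition is rejected:
PartitionModel.subscribe(partitions, :apple, :sauce)
# => {:error, {:already_taken, :pie}}

# The same subscriber may still take another, free partition:
{:ok, _partitions} = PartitionModel.subscribe(partitions, :cherry, :pie)
```

In this model the limit is per partition (one subscriber each), not per subscriber.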
Ah, it does look like you are correct. What would be the reasoning behind that? Does this mean that, to accomplish the flow I outlined in the OP, you would have FruitProducer with a PartitionDispatcher, and then each partition would flow into a producer-consumer that uses a BroadcastDispatcher or DemandDispatcher?
So something like:
FruitProducer ---+
                 +-ApplePartitionDispatcher ---+----------------------+
                 |                             |---PieProducer----+   |
                 |                             |                  |   |
                 +-CherryPartitionDispatcher --+-----------------FoodConsumer
                 |                                                    |
                 +-GrapePartitionDispatcher --------------------------+
Not the clearest ASCII graphic, but I think it shows what I’m trying to express. If a given partition can only have a single subscriber, then to have multiple end processors for that partition, the subscriber must be an intermediate stage, which can itself have multiple subscribers.
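A minimal, untested sketch of such an intermediate stage, again assuming gen_stage is installed and that a partitioned producer is registered under the name FruitProducer. ApplePartition and SauceConsumer are made-up names, and GenStage.BroadcastDispatcher is one possible choice for the fan-out:

```elixir
defmodule ApplePartition do
  use GenStage

  def start_link(_opts), do: GenStage.start_link(__MODULE__, :ok, name: __MODULE__)

  def init(:ok) do
    # Sole subscriber of the :apple partition; re-dispatches to any number of stages.
    {:producer_consumer, :ok,
     subscribe_to: [{FruitProducer, partition: :apple}],
     dispatcher: GenStage.BroadcastDispatcher}
  end

  # Pass events through unchanged; the broadcast dispatcher sends them to every subscriber.
  def handle_events(apples, _from, state), do: {:noreply, apples, state}
end

defmodule SauceConsumer do
  use GenStage

  def start_link(_opts), do: GenStage.start_link(__MODULE__, :ok)

  # Subscribes to the intermediate stage, so no :partition option is needed here.
  def init(:ok), do: {:consumer, :ok, subscribe_to: [ApplePartition]}

  def handle_events(apples, _from, state) do
    IO.inspect(apples, label: "sauce")
    {:noreply, [], state}
  end
end
```

Any other stage that wants apples would subscribe to ApplePartition in the same way. Swapping in GenStage.DemandDispatcher would instead split the apples between the subscribers rather than copy them to each one.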
I can’t say why it was designed this way, but my best guess is that implementing a “sub-dispatcher” within each partition would be more complex, and there probably wasn’t a strong enough reason to add that logic.
What you are describing looks like it would work for your purpose. Another way to go is to have multiple producers instead of a single producer with partitions. This would remove the need for an intermediary producer_consumer, but I’m not sure whether that works for your case.
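For comparison, a hedged sketch of the multiple-producer variant, assuming gen_stage is installed. Everything here is illustrative: SingleFruitProducer, PieSubscriber, and the registered names :apple_producer and :cherry_producer are invented, and the use of GenStage.BroadcastDispatcher is an assumption about how each producer would serve several consumers.

```elixir
defmodule SingleFruitProducer do
  use GenStage

  # `name` is a hypothetical registered name such as :apple_producer.
  def start_link({name, fruits}), do: GenStage.start_link(__MODULE__, fruits, name: name)

  # One producer per fruit; broadcasting lets several consumers see every event.
  def init(fruits), do: {:producer, fruits, dispatcher: GenStage.BroadcastDispatcher}

  def handle_demand(demand, fruits) do
    {events, rest} = Enum.split(fruits, demand)
    {:noreply, events, rest}
  end
end

defmodule PieSubscriber do
  use GenStage

  def start_link(_opts), do: GenStage.start_link(__MODULE__, :ok)

  # Subscribes only to the producers it cares about; no partitions involved.
  def init(:ok), do: {:consumer, :ok, subscribe_to: [:apple_producer, :cherry_producer]}

  def handle_events(fruits, _from, state) do
    IO.inspect(fruits, label: "pie")
    {:noreply, [], state}
  end
end
```

The trade-off is that whatever feeds the fruit in has to route each item to the right producer itself, which is the job the PartitionDispatcher's hash function would otherwise do.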