zampino
Selective Broadcasts
This proposal probably arises from my limited experience with GenStage,
but what if, at subscribe time, a consumer could specify a selector, i.e. a function
(event :: any -> boolean) that filters the events sent to it? Something like:
GenStage.sync_subscribe(consumer,
to: producer,
selector: fn %{key: key} -> key =~ "foo" end
)
Now, when the producer emits an event, say %{key: key}, it will be dispatched to the consumer
only if key contains "foo".
This is probably similar to the partition dispatcher, but with extra control over the hashing function. Is that a fair way to put it?
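To make the intended semantics concrete, here is a minimal sketch in plain Elixir. It is only a model of the idea, not GenStage code: the module name `SelectiveBroadcastSketch` and the `dispatch/2` function are hypothetical, and subscriptions are represented simply as `{ref, selector}` pairs.

```elixir
defmodule SelectiveBroadcastSketch do
  # Hypothetical model of the proposal, not part of GenStage.
  # `subscriptions` is a list of {ref, selector} pairs, where each
  # selector is a function (event -> boolean).
  # Returns a map from each ref to the events that ref would receive.
  def dispatch(events, subscriptions) do
    for {ref, selector} <- subscriptions, into: %{} do
      {ref, Enum.filter(events, selector)}
    end
  end
end

events = [%{key: "foo-1"}, %{key: "bar-1"}, %{key: "a-foo"}]

subscriptions = [
  # Receives only events whose key contains "foo".
  {:consumer_a, fn %{key: key} -> key =~ "foo" end},
  # A selector that accepts everything behaves like a plain broadcast.
  {:consumer_b, fn _event -> true end}
]

delivered = SelectiveBroadcastSketch.dispatch(events, subscriptions)
```

Unlike a partitioning scheme, where each event goes to exactly one consumer, the same event here can reach any number of consumers, or none at all, depending on their selectors.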
I have a very naïve proof of concept in the form of a private pull request,
in which I also modified the gen_event example.
What does the community think of this feature?
Is there already a way to achieve this without any extra work?
Most Liked
benwilson512
This seems to be essentially what Partition Dispatcher does: https://hexdocs.pm/gen_stage/Experimental.GenStage.PartitionDispatcher.html#content
zampino
…to give some further motivation:
we're working with a dynamic population of stages which all subscribe to some producer
with broadcast dispatch.
Consumer stages get created and updated continuously based on a stream of user events.
Each node might observe other nodes for changes that result from their receiving
events from the source.
This is why I'd need to specify which events a consumer is interested in, assuming that such events carry the identifier of the node to which they're primarily addressed.
At this point a question arises: what if I need to update the subscription options
(not necessarily the :selector, but in general)?
At present, dispatchers save their consumers (demands) in a list
and not in a map (by ref). What is the intention behind this design, maybe @josevalim?
Can I update subscription options without canceling and subscribing again?
If I'm not wrong, if I subscribe twice to a producer, I will receive events twice. Is that correct?
zampino
I agree, that could cause problems,
especially with the demand auto-adjustment.
If I wish to change the selector, then,
I could first re-subscribe with the new selector
and subsequently cancel the old ref. In this case a consumer
might receive each message at most twice
until the dispatcher cancels the old demand.
Would you suggest doing so?
Better to guard against duplicates in the stage
than to possibly lose events by first canceling and then re-subscribing, right?
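As an illustration of what guarding against duplicates in the stage could look like, here is a minimal sketch in plain Elixir. It assumes that every event carries a unique `:id` field, which is not something stated above, and the `DedupSketch` module and `reject_seen/2` function are hypothetical names. In a real consumer the set of seen ids would live in the stage state and be consulted when events arrive.

```elixir
defmodule DedupSketch do
  # Hypothetical helper, assuming each event has a unique :id field.
  # Drops events whose id is already in `seen` and returns
  # {fresh_events, updated_seen}, preserving the original event order.
  def reject_seen(events, seen) do
    {fresh, seen} =
      Enum.reduce(events, {[], seen}, fn %{id: id} = event, {acc, seen} ->
        if MapSet.member?(seen, id) do
          {acc, seen}
        else
          {[event | acc], MapSet.put(seen, id)}
        end
      end)

    {Enum.reverse(fresh), seen}
  end
end

seen = MapSet.new()

# Batch arriving through the old subscription.
{first, seen} = DedupSketch.reject_seen([%{id: 1}, %{id: 2}], seen)

# Overlapping batch arriving through the new subscription.
{second, _seen} = DedupSketch.reject_seen([%{id: 2}, %{id: 3}], seen)
```

The set of seen ids grows without bound in this form, so it would probably make sense to keep it only for the window in which both subscriptions are active and discard it once the old one is canceled.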







