This proposal probably stems from my limited experience with GenStage,
but what if, at subscribe time, a consumer could specify a selector, i.e. a function (event :: any -> boolean) that filters the events sent to it? Something like:
    GenStage.sync_subscribe(consumer,
      to: producer,
      selector: fn %{key: key} -> key =~ "foo" end
    )
Now, when the producer emits an event such as %{key: key}, it is dispatched to this consumer
only if key contains "foo".
This seems similar to the partition dispatcher, but with extra control over the hashing function. Is that a fair way to put it?
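To make the intended semantics concrete, here is a minimal sketch in plain Elixir, with no GenStage involved. The module `SelectorSketch` and its `events_for/2` function are hypothetical names used only for illustration; they just show which events a subscription with the selector above would end up receiving.

```elixir
defmodule SelectorSketch do
  # Hypothetical helper, not part of GenStage: returns the events that a
  # subscription with the given selector would receive.
  def events_for(events, selector) when is_function(selector, 1) do
    Enum.filter(events, selector)
  end
end

# Same selector as in the proposal: keep events whose key contains "foo".
selector = fn %{key: key} -> key =~ "foo" end

events = [%{key: "foobar"}, %{key: "bar"}, %{key: "a foo"}]

SelectorSketch.events_for(events, selector)
# => [%{key: "foobar"}, %{key: "a foo"}]
```

Note that a selector written with a pattern in its head, like this one, raises on events that do not have a `:key`, so a catch-all clause may be worth adding in practice.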
I have a very naïve proof of concept in the form of a private pull request here,
where I also modified the gen_event example:
What would the community think of this feature?
Is there already a way to achieve this with no need of extra work?
The partition dispatcher has its formats specified upfront and it does not allow sharing. For example, I can't have three consumers where one consumes A, B, D, another A, B, C and the third B, C and D.
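The overlapping case is easy to picture with selectors. The sketch below is plain Elixir and only simulates the routing; `OverlapSketch.route/2` and the consumer names `:one`, `:two`, `:three` are made up for illustration and are not GenStage APIs.

```elixir
defmodule OverlapSketch do
  # Hypothetical stand-in for a broadcast with per-subscription selectors:
  # every consumer sees every event, then keeps only what its selector accepts.
  def route(events, selectors) do
    Map.new(selectors, fn {consumer, selector} ->
      {consumer, Enum.filter(events, selector)}
    end)
  end
end

selectors = %{
  one: &(&1 in [:a, :b, :d]),
  two: &(&1 in [:a, :b, :c]),
  three: &(&1 in [:b, :c, :d])
}

OverlapSketch.route([:a, :b, :c, :d], selectors)
# => %{one: [:a, :b, :d], three: [:b, :c, :d], two: [:a, :b, :c]}
```

Event `:b` reaches all three consumers here, which is exactly the sharing that a fixed partitioning cannot express.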
@zampino can you please send a pull request? Your broadcaster code looks good, the only changes I would make are:
- Document the selector option.
- Instead of Keyword.get(opts, :selector, fn _event -> true end), the selector should be nil if there is no selector. This way we don't need to call Enum.filter(..., selector) when there is no selector; we just return the list as is.
- Do not change the examples folder for now.
- Tests. Unit tests in broadcast_dispatcher_test.exs are fine.
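The nil-selector suggestion could look roughly like the following. This is only a sketch of the idea, not the code from the pull request; `NilSelectorSketch`, `selector_from/1` and `maybe_filter/2` are hypothetical names.

```elixir
defmodule NilSelectorSketch do
  # Read the option without a default function: an absent selector is nil.
  def selector_from(opts), do: Keyword.get(opts, :selector)

  # No selector: hand the list back untouched, without traversing it.
  def maybe_filter(events, nil), do: events

  # With a selector: keep only the events it accepts.
  def maybe_filter(events, selector) when is_function(selector, 1),
    do: Enum.filter(events, selector)
end

events = [1, 2, 3, 4]

NilSelectorSketch.maybe_filter(events, NilSelectorSketch.selector_from([]))
# => [1, 2, 3, 4]

NilSelectorSketch.maybe_filter(
  events,
  NilSelectorSketch.selector_from(selector: &(rem(&1, 2) == 0))
)
# => [2, 4]
```

Compared with defaulting to `fn _event -> true end`, the nil clause avoids one function call per event for every subscription that does not use a selector.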
But as far as I understand, with PartitionDispatcher
one needs to specify the hashing range and the hash function upfront,
hence the number and the shape of the consumers must be known before starting the producer.
My approach above would allow subscribers to decide, somewhat dynamically,
which events they are interested in.
In addition, with the selector function I describe above, multiple consumers could receive the same message, if I am not wrong…
…to give some further motivation:
we're working with a dynamic population of stages which all subscribe to some producer
with broadcast dispatch.
Consumer stages get created and updated continuously based on a stream of user events.
Each node might observe some other node for changes derived from receiving
events from the source.
This is why I'd need to specify which events I am interested in, assuming that such events carry the identifier of the node to which they're primarily addressed.
At this point a question arises: what if I need to update the subscription options
(not necessarily the :selector, but in general)?
At present, dispatchers save their consumers (demands) in a list:
and not in a map (by ref). What is the intention behind this behaviour, maybe @josevalim?
Can I update subscription options without canceling and subscribing again?
If I'm not wrong, if I subscribe twice to a producer, I will receive events twice. Is that correct?
You cannot update subscriptions. I can see it becoming really complex given
certain behaviors: what if you update to a different max/min demand? What
if you try to change a partition?
I agree, especially with demand auto-adjustment,
that could cause problems.
If I wish to change the selector, then,
I could first subscribe again with a new selector
and subsequently cancel the old ref. In this case a consumer
might receive some messages twice
until the dispatcher cancels the old demand.
Would you suggest doing so?
Better to guard against duplicates in the stage
than to possibly lose events by canceling first and re-subscribing afterwards, right?
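Guarding against duplicates in the stage could be sketched as below. It assumes every event carries a unique `:id`, which is an assumption made for illustration and not something stated in this thread; `DedupSketch.dedup/2` is likewise a hypothetical helper that a consumer might call from its event handler while keeping `seen` in its state.

```elixir
defmodule DedupSketch do
  # Assumes each event is a map with a unique :id (illustrative assumption).
  # Returns {events not seen before, updated set of seen ids}.
  def dedup(events, seen \\ MapSet.new()) do
    {fresh, seen} =
      Enum.reduce(events, {[], seen}, fn %{id: id} = event, {acc, seen} ->
        if MapSet.member?(seen, id) do
          # Already delivered through the other subscription: drop it.
          {acc, seen}
        else
          {[event | acc], MapSet.put(seen, id)}
        end
      end)

    {Enum.reverse(fresh), seen}
  end
end

# Batch delivered through the old subscription.
{first, seen} = DedupSketch.dedup([%{id: 1}, %{id: 2}])
# first => [%{id: 1}, %{id: 2}]

# Overlapping batch delivered through the new subscription.
{second, _seen} = DedupSketch.dedup([%{id: 2}, %{id: 3}], seen)
# second => [%{id: 3}]
```

The set of seen ids grows without bound in this form, so it would only need to be kept for the short window in which both subscriptions are active.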
What we could do is support async_resubscribe and sync_resubscribe. The idea is that it cancels a previous subscription and starts a new one atomically. This way you won't receive duplicate messages or lose any, and the semantics make it clear that it is a new subscription altogether. What do you think? If that fits your use case, please open an issue and I can tackle it this week.