Selective Broadcasts

This proposal probably arises from my limited experience with GenStage,
but what if, at subscribe time, a consumer could specify a selector, i.e. a function
(event :: any -> boolean) that filters the events being sent to it? Something like:

GenStage.sync_subscribe(consumer, 
  to: producer, 
  selector: fn %{key: key} -> key =~ "foo" end
)

Now when the producer emits an event, say %{key: key}, it will be dispatched to the consumer
only if key contains "foo".
This is probably similar to the partition dispatcher, but with extra control over the hashing function. Is that a fair way to put it?
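
To make the intended semantics concrete, here is a minimal sketch in plain Elixir, with no GenStage involved. It only assumes that the dispatcher would apply the selector to each event before delivery; the sample events are made up for illustration.

```elixir
# The selector from the example above: true when key contains "foo".
selector = fn %{key: key} -> key =~ "foo" end

# Hypothetical events emitted by the producer.
events = [%{key: "foo-1"}, %{key: "bar-2"}, %{key: "a-foo"}]

# Only events for which the selector returns true would reach this consumer.
delivered = Enum.filter(events, selector)

IO.inspect(delivered)
# prints [%{key: "foo-1"}, %{key: "a-foo"}]
```

Note that a selector written with a pattern in its head, like this one, raises a FunctionClauseError for events that do not have a :key field.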

I have a very naïve proof of concept in the form of a private pull request here.

where I also modified the gen_event example:

What would the community think of this feature?
Is there already a way to achieve this without extra work?

This seems to be essentially what Partition Dispatcher does: https://hexdocs.pm/gen_stage/Experimental.GenStage.PartitionDispatcher.html#content

The partition dispatcher has formats specified upfront and it does not allow sharing. For example, I can’t have three consumers where one consumes A, B, D, another A, B, C and the third B, C and D.
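
As a rough illustration of that overlapping case, the sketch below models each consumer as a selector function and computes who would receive each event. The consumer names and the use of atoms for the events A to D are assumptions made for the example, and nothing here touches GenStage itself.

```elixir
# Hypothetical selectors, one per consumer.
selectors = %{
  consumer_1: fn event -> event in [:a, :b, :d] end,
  consumer_2: fn event -> event in [:a, :b, :c] end,
  consumer_3: fn event -> event in [:b, :c, :d] end
}

# For every event, collect the consumers whose selector accepts it.
routing =
  for event <- [:a, :b, :c, :d], into: %{} do
    recipients = for {name, selector} <- selectors, selector.(event), do: name
    {event, Enum.sort(recipients)}
  end

IO.inspect(routing)
```

Event :b ends up with all three consumers, which is the kind of sharing a hash-based partition cannot express.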

@zampino can you please send a pull request? Your broadcaster code looks good; the only changes I would make are:

  1. Document the selector option
  2. Instead of Keyword.get(opts, :selector, fn _event -> true end), the selector should be nil if there is no selector. This way we don’t need to call Enum.filter(..., selector) if there is no selector, just return the list as is
  3. Do not change the examples folder for now
  4. Tests. Unit tests for broadcast_dispatcher_test.exs are fine :slight_smile:
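
Point 2 could look roughly like the following. This is only a sketch of the idea: SelectorSketch and select/2 are hypothetical names, not the actual BroadcastDispatcher code.

```elixir
defmodule SelectorSketch do
  # No selector given: return the events untouched, skipping Enum.filter/2.
  def select(events, nil), do: events

  # A selector was given at subscribe time: keep only the matching events.
  def select(events, selector) when is_function(selector, 1) do
    Enum.filter(events, selector)
  end
end

events = [1, 2, 3, 4]

# The subscription options are a keyword list; a missing :selector yields nil.
unfiltered = SelectorSketch.select(events, Keyword.get([], :selector))
filtered = SelectorSketch.select(events, fn event -> rem(event, 2) == 0 end)

IO.inspect({unfiltered, filtered})
# prints {[1, 2, 3, 4], [2, 4]}
```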

Hi Ben,

thanks for the reply,
indeed that’s almost it.

But as far as I understand, using PartitionDispatcher
one needs to specify upfront the hashing range and the hash function,
hence the number and the shape of consumers must be known prior to starting the producer.

My approach above would allow subscribers to decide which events they are
interested in, somewhat dynamically.

In addition, with the selector function I describe above, multiple consumers could receive the same message, if I am not wrong…

Hi @josevalim,

thanks for the feedback,
I’ll be glad to submit a PR with your suggested changes,

cheers,
Andrea

…to give some further motivation:
we’re working with a dynamic population of stages which all subscribe to some producer
with broadcast dispatch.
Consumer stages get created and updated continuously based on a stream of user events.

Each node might observe some others for changes derived from receiving
events from the source.
This is why I’d need to specify which events I’m interested in, assuming that such events carry the identifier of the node to which they’re primarily addressed.


At this point a question arises: what if I need to update the subscription options
(not necessarily the :selector, but in general)?
At present dispatchers save their consumers (demands) in a list:

https://github.com/elixir-lang/gen_stage/blob/accec040079e93cbc1b9a57f0d22a365bcb4771e/lib/gen_stage/broadcast_dispatcher.ex#L82-L84

and not in a map (by ref). What is the intention behind this behaviour, maybe @josevalim?
Can I update subscription options without canceling and subscribing again?
If I’m not wrong, if I subscribe twice to a producer, I will receive events twice. Is that correct?

You cannot update subscriptions. I can see it becoming really complex given
certain behaviors: what if you update to a different max/min demand? What
if you try to change a partition?

I agree, the demand auto-adjustment in particular
could cause problems.

If I wish to change the selector, then
I could first re-subscribe with a new selector
and subsequently cancel the old ref. In this case a consumer
might receive each message at most twice
until the dispatcher cancels the old demand.

Would you suggest doing so?

Better to guard against duplicates in the stage
than to possibly lose events by first canceling and then re-subscribing, right?
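
Guarding against duplicates inside the stage could be sketched as below. It assumes that every event carries a unique :id field, which the thread does not state; DedupSketch and reject_seen/2 are hypothetical names, and a real consumer would keep the seen set in its stage state.

```elixir
defmodule DedupSketch do
  # Returns {events_not_seen_before, updated_set_of_seen_ids}.
  def reject_seen(events, seen) do
    {fresh, seen} =
      Enum.reduce(events, {[], seen}, fn event, {fresh, seen} ->
        if MapSet.member?(seen, event.id) do
          # Already delivered through the other subscription: drop it.
          {fresh, seen}
        else
          {[event | fresh], MapSet.put(seen, event.id)}
        end
      end)

    # Events were prepended while reducing, so restore the original order.
    {Enum.reverse(fresh), seen}
  end
end

seen = MapSet.new()

# The first batch arrives via the old subscription, the second via the new one.
{batch1, seen} = DedupSketch.reject_seen([%{id: 1}, %{id: 2}], seen)
{batch2, _seen} = DedupSketch.reject_seen([%{id: 2}, %{id: 3}], seen)

IO.inspect({batch1, batch2})
# prints {[%{id: 1}, %{id: 2}], [%{id: 3}]}
```

The seen set grows without bound in this form, so it would have to be pruned or discarded once the old subscription is gone.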

What we could do is to support async_resubscribe and sync_resubscribe. The idea is that it cancels a previous subscription and starts a new one atomically. This way you won’t receive duplicated messages nor lose messages and the semantics make it clear it is a new subscription altogether. What do you think? If that fits your use case, please open up an issue and I can tackle it this week.

Great, I’ll open an issue for resubscriptions
(the name itself sounds terrific :slight_smile:)

Ah, and that won’t block the :selector PR #112, right?

thanks again,
A

No, it won’t. :slight_smile: