Debouncing and aggregating events across cluster nodes

I have a system that emits events, such as:

"record_updated", id: 6, company_id: 1
"record_updated", id: 2, company_id: 2
"record_updated", id: 4, company_id: 1

These events are currently dispatched through Phoenix PubSub from nodes in an Elixir cluster and delivered to clients through either Channels or GraphQL subscriptions (Absinthe). The clients then react appropriately to these updates.

The problem is the amount of traffic and the number of updates.

I would like to debounce these events, group them by company_id, and only emit them when the updates stop or a configured time period is reached.

So, if the events happen constantly, we would emit one event per company_id once every 10s or so, with a list of IDs in the payload rather than a single ID.

If there was just one event, I think I’d like to wait about a second and then emit it to the client as soon as possible.

For example, given the above 3 events, I would like to transform them and emit just two:

"records_updated", ids: [2], company_id: 2
"records_updated", ids: [6, 4], company_id: 1
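
To make the target shape concrete, here is a minimal sketch of just the grouping step, leaving timing aside. The module name `EventGrouping`, the function `aggregate/1`, and the map shape of the events are assumptions made for illustration; they are not part of the existing system.

```elixir
defmodule EventGrouping do
  # Hypothetical helper: collapses a list of "record_updated" events into one
  # "records_updated" event per company_id, keeping IDs in arrival order.
  def aggregate(events) do
    events
    |> Enum.group_by(& &1.company_id, & &1.id)
    |> Enum.map(fn {company_id, ids} ->
      %{event: "records_updated", company_id: company_id, ids: Enum.uniq(ids)}
    end)
    |> Enum.sort_by(& &1.company_id)
  end
end

events = [
  %{event: "record_updated", id: 6, company_id: 1},
  %{event: "record_updated", id: 2, company_id: 2},
  %{event: "record_updated", id: 4, company_id: 1}
]

# Two events come back: ids [6, 4] for company 1 and ids [2] for company 2.
EventGrouping.aggregate(events)
```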

Any hints on libraries/tools I could use to simplify this task? Should I be looking at GenStage/Broadway and implementing this from scratch, or are there tools out there that do something similar to my requirements that you can recommend?


GenStage would probably work great for this use case. See the GenStage docs (gen_stage v1.2.1).

Broadway is built on top of GenStage and could simplify a few things for you, but I think it definitely helps to have an understanding of the underlying concepts. See the Broadway introduction (Broadway v1.1.0).


Thanks. I have used GenStage (lightly) in the past and I can see it working, and I have been looking at Broadway just now. In both cases I think these pipelines operate within a single node of the cluster, or you can “fan out” events from the source/queue to multiple parallel pipelines. The one thing I cannot seem to figure out is how to have a single pipeline for the whole cluster, which would make more sense in my use case. I think it may not be a good fit in the end.

About a year ago I rolled my own GenServer for something that sounds at least 90% the same as what you need, and it worked perfectly, no library needed. To be more accurate, it was one GenServer per something (can’t remember what) and they were properly supervised. It was running in production with something like ~200 of them working in parallel, and I never noticed a single failure (but then again the main app node never went down either).

Its only weakness was that it accumulated state in memory and never persisted it before flushing it after the configured period of inactivity or the maximum hold time. I found that tradeoff good but maybe you would not?
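
For readers following along, here is a rough sketch of what such a debouncing GenServer could look like. It is not the code mentioned above. The module name `Debouncer`, the options `:idle_ms`, `:max_ms` and `:emit`, and the payload shape are all assumptions made for illustration. It keeps one in-memory buffer per company_id and flushes it when either the quiet period or the maximum hold time expires, so it shares the weakness described above: buffered IDs are lost if the process dies.

```elixir
defmodule Debouncer do
  use GenServer

  # Hypothetical options: :idle_ms (quiet period), :max_ms (maximum hold time),
  # :emit (1-arity function that receives each aggregated event).
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  def record_updated(pid, company_id, id) do
    GenServer.cast(pid, {:updated, company_id, id})
  end

  @impl true
  def init(opts) do
    {:ok,
     %{
       idle_ms: Keyword.get(opts, :idle_ms, 1_000),
       max_ms: Keyword.get(opts, :max_ms, 10_000),
       emit: Keyword.fetch!(opts, :emit),
       buffers: %{}
     }}
  end

  @impl true
  def handle_cast({:updated, company_id, id}, state) do
    buffer =
      case Map.get(state.buffers, company_id) do
        nil ->
          # First event for this company: start the maximum-hold timer.
          max_timer = :erlang.start_timer(state.max_ms, self(), {:flush, company_id})
          %{ids: [id], max_timer: max_timer, idle_timer: nil}

        %{idle_timer: idle_timer} = existing ->
          # More activity: the quiet period starts over.
          :erlang.cancel_timer(idle_timer)
          %{existing | ids: [id | existing.ids]}
      end

    idle_timer = :erlang.start_timer(state.idle_ms, self(), {:flush, company_id})
    buffers = Map.put(state.buffers, company_id, %{buffer | idle_timer: idle_timer})
    {:noreply, %{state | buffers: buffers}}
  end

  @impl true
  def handle_info({:timeout, ref, {:flush, company_id}}, state) do
    case Map.pop(state.buffers, company_id) do
      {%{idle_timer: idle, max_timer: max, ids: ids}, rest} when ref == idle or ref == max ->
        # Whichever timer fired first wins; cancel the other one.
        :erlang.cancel_timer(idle)
        :erlang.cancel_timer(max)
        ids = ids |> Enum.reverse() |> Enum.uniq()
        state.emit.(%{event: "records_updated", company_id: company_id, ids: ids})
        {:noreply, %{state | buffers: rest}}

      _stale ->
        # Timer belongs to a buffer that was already flushed: ignore it.
        {:noreply, state}
    end
  end
end
```

In a real application the `:emit` function would presumably wrap a `Phoenix.PubSub.broadcast/3` call. Note that this sketch only debounces events seen by the node it runs on; making it cluster-wide would still need some form of global registration or routing.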

In any case, I can copy-paste the code here later today after I edit it a little bit (and I really should not be on ElixirForum right now but oh well, you know about revenge procrastination, right?) and you can tell me if it works well for your purposes.

I don’t think I want to use GenServers and buffer these events in memory. There are several things I dislike about this approach: from the need for cluster-wide process registration and some sort of reconciliation, to the fact that it is sort of the wrong usage pattern, since we’d be using processes, which are runtime entities, to partition data, with all the inefficiency that this brings.

I am now thinking I’ll go with a PostgreSQL- or Redis-based solution, where I atomically write the IDs of records that need to be broadcast to clients as updated, and then have a recurring job on each node that atomically fetches and clears those IDs and broadcasts them to clients over PubSub.
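
The contract such a store would need can be sketched as two operations: an atomic "mark" and an atomic "take everything and clear". The sketch below uses an in-memory Agent purely as a stand-in so that it runs without any dependencies; it is single-node and not persistent, so it is not the PostgreSQL or Redis solution itself. The module name `PendingStore` and its functions are hypothetical.

```elixir
defmodule PendingStore do
  use Agent

  # Stand-in for a shared store (PostgreSQL or Redis in the plan above).
  # State is a map of company_id => MapSet of pending record IDs.
  def start_link(_opts \\ []), do: Agent.start_link(fn -> %{} end)

  # Atomically record that a record of the given company was updated.
  def mark(store, company_id, id) do
    Agent.update(store, fn pending ->
      Map.update(pending, company_id, MapSet.new([id]), &MapSet.put(&1, id))
    end)
  end

  # Atomically fetch all pending IDs and clear them in one step, so no ID
  # marked in between is lost and none is handed out twice.
  def take_all(store) do
    Agent.get_and_update(store, fn pending -> {pending, %{}} end)
  end
end
```

A recurring job would then call `take_all/1` on each tick and broadcast one "records_updated" event per company_id in the returned map. With a real shared store, the same "fetch and clear in one step" property would have to come from the database itself, for example from a transaction.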


I see, I suspected you needed something more scalable.

That’s precisely what I need, but I strongly suspect that a GenServer per company is actually quite wasteful in terms of both memory and CPU.

Maybe there’s something to learn from how Nx.Serving deals with batch size and batch timeout:

https://hexdocs.pm/nx/Nx.Serving.html#module-stateful-process-workflow

But yeah, it looks like something Broadway could help with. See the Broadway docs (Broadway v1.1.0).
