I have to imagine that the answer I’m going to end up with is “you need a different approach”, but I figured I’d ask as a last-ditch before I gave up on something I was working on.
I’m working to optimize an event processor at work. We get a large volume of events from RabbitMQ, which are divvied up into worker processes for processing. These worker processes query Elasticsearch (among other things) to collect the requisite data to complete processing. One problem we ran into is that Elastic’s search queue gets overwhelmed during times of heavy load; the details aren’t important, but the TL;DR is that in Elastic will only allow a certain number of unprocessed queries to build up before it starts rate limiting clients. As a mitigation, I set up a GenServer which would collect query requests from each worker process, and on a fixed interval (or once a high-water mark was reached), perform a single bulk query with Elastic. I then post-process this bulk response and distribute the query results to the requesting processes (using a simple
While this strategy worked for mitigating the rate limits I mentioned before, the records we pull from Elastic are sizable chunks of JSON (topping out around 1M, I’d say), so copying the data back to the requesting processes is a non-trivial endeavor. Unfortunately, this produces an unacceptable amount of latency in our event processing.
Herein lies the question: is there a way I can somehow share these chunks of data with the “querying” processes? If I were using a lower level language, I would ideally just be passing pointers to the existing data around, but that’s not the world of BEAM
One idea I had was to abuse the fact that Erlang will pass references to large binary blobs, but I don’t think this will be fruitful, and the process of encoding the “split up” result from the bulk query to JSON, sending it, then decoding it, is just a copy operation with extra steps. Writing the data to ETS is also likely not a solution, for similar reasons.
Have you tried scaling out your ES cluster by adding more nodes and sharding and/or replicating the index across them?
:jobs. They invert the flow of “checking out” stuff from a pool which means that your querying processes will work directly with the 1M JSON object and will not copy it. This does not come completely for free and I recommend reading the
NimblePool intro that I linked, it’s excellently explained and demonstrated.
:jobs does something similar with some more features, though it also offers automatic limiting of requests if you need it (I think you don’t, you seem to have a fixed amount of Elastic and Rabbit workers).
NimblePool seems like a really interesting option, but I’m not clear how that would work out here. It seems to me that this will serialize access to a resource (much like a GenServer), and will provide access to the responses within a callback, thereby avoiding the copying. However, it seems to me that there would be no way to “fan out” the response from NimblePool, right? In other words, if I have 250 workers performing queries, they will be forced to operate on the responses one by one, eliminating concurrency (I wouldn’t be able to move the resource back to the worker process without copying). Am I misunderstanding this?
It’s something we’ve talked about. We’ve been looking to optimize the code first before scaling our infra.
Not one by one, no.
You can have 250 workers waiting their turn to use e.g. 20 connections to Elastic. It helps with that.
Ah, I see. This would avoid the need for the “batching” that I’m doing. I’m not sure how that would work out at first blush but let me give it some thought. Thank you!
Yeah it’s not completely for free, you have to do a thing or two manually, but the selling point of both options I gave you is basically this: they don’t copy stuff.
Interested to hear about the solution you’ll settle on btw, hope you post it in the future.
If it ends up being anything less than a rearchitecture, I’d be happy to
Instead of copying the data to the processors, could the processors send functions to the data holders?
Heh, so yes, they can (and do; I actually use a callback mechanism to introduce the
send calls). However, dispatching that data to be processed non-serially would require copying, no? It’s kind of a catch-22; either I process them serially and take that performance hit, or I take the performance hit of copying everything and then run my processing concurrently. One could avoid copying by just doing something like
Enum.each(data, callback), but that would be run serially. If I used something like
Task.async_stream/2, the data would still be copied when spawning those procs.
Are you sure that such a size is an actual issue here? I would tentatively expect the binary format of Erlang terms to be more compact that JSON (if keys are atoms), but even it they were not, copying one megabyte from process to process should take very little compared to its download and deserialization - that you have to do anyway.
No, not necessarily. It’s just what I’ve narrowed down after a number of investigations
To give some details
- The “batch” operation that each thread waits on (inclusive of any I/O and JSON deserialization) takes very little time compared to how long the worker processes wait.
- The scheduler utilization is low, <= 10% on any scheduler. I don’t believe this means I’m waiting on a context switch.
- The work that the workers are doing is taking much more than what would seem to be the “sum of their parts”, as it were.
- This latency only happens when I apply a large amount of load to the service (i.e. many batched operations at once)
Ergo, by elimination, the only thing that makes any sense to me is that this a problem with the message passing taking too long.
Just wild thought. Instead of tossing tons of data around, there is another option, another point of view. If Mohammed cannot get to the mountain, mountain has to go to the Mohammed: Don’t send data to work, send work to the data…
You could have a process holding the requested data and have it receive a function which would process the data…
(I have no idea if it would actually be beneficial… but just wanted to point it out, that it could be a way as well)
totally unrelated, but…
do you know if jobs is still maintained? there are several PRs from the maintainer that are not merged. i’d expect at least dependencies update and stuff like that.
My apologies, I have no idea. I wouldn’t think such a library needs regular updates once it reaches feature completion though.
I tried the library once, about a year ago, and it got the job done. Liked it, would use again.
Passing refc binaries is a very reasonable solution that should be tested before being discounted. You’ll have a single reference instead of many small terms polluting in a process’s heap as they’re collected before sending.
You could likely do two direct tests to check if that’s the case:
- “stuff” the map you are sending with a large, known map, that you know it’s approx a megabyte, maybe adding a large known response under
:uselesskey, so that every message you send is 2x as big as it was before.
- do the correct fetching etc, but then send over the smallest possible map, and see if it makes a difference
My gut feeling is that on a single-box BEAM it should make very little difference, if the sending is 1:N - and even if you hit the network. Slow, maybe, but latency is where the BEAM supposedly shines, because most things end up being “context switches”.
Honest question - you are 100% sure that you are NOT sending all the batch to a single worker, where latency would be caused by it consuming messages in sequence?