We are building an application where we have a lot of import requests, which each spawn one or multiple processes, one per requested resource we want to import.
The concern of a resource-worker is:
get data from external API.
transform data into our internal data format.
return data to batch (which bundles all resources in a request together, before returning all of them at once to the requester).
The actual resources are requested from an external API. However, on a single API-account, you’re only allowed to do n API requests to the server per second, or do m queries every 15 minutes, etc.
What I was thinking, was to create a pool of n HTTP-client-workers, and have all resource-workers request data through this pool.
But here is the catch: What is the best way to communicate between the resource-workers and the pool?
Should all resource-workers try to open a pool-connection continuously? (i.e. a resource-worker loops until one can be established). This seems to result in an insane amount of message-passing in the application, as all resource-workers will send ‘can I be helped yet?’ messages all the time.
Should resource-workers be put in some sort of queue in the pool-server, and then get a worker-PID returned to them as soon as they are next in line? Caveat: What should a resource do while waiting? Should the resource monitor the pool to ensure that it will still be helped somewhere in the future? (because when the pool-server crashes, the queue will be gone)
@hubertlepicki: You are probably right. A generic pool implementation is very feature-rich, and therefore a difficult project to maintain. I do not want to implement my own pool, unless I have to. (That being said, the little Elixir & OTP Guidebook has building your custom pool as one of its exercises)
The problem I am having, however, is that many APIs apply rate limiting (An example: Twitter) to ensure that their API-endpoint is not DDoSed by a single API-consumer.
In for example the basic Twitter API, you can only perform 180 queries every 15 minutes. What I am looking for, is a pool-solution that keeps this in mind. What I am having trouble with, is how to either combine this system with an existing pool-solution such as Httpoison or Poolboy, or, if this is not possible, how to write my own simple pool that does this internally.
Also, how should such a pool interface with the rest of the application?
Should the pool yell ‘Yo! I ain’t doin’ nothin’ for another 14 minutes’ whenever someone tries to access a worker? (Make it the concern of the requester to try again later)
Or should the pool say ‘Ok, get in line, but be prepared to wait’. (Make it the concern of the pool)
Rather than conflating rate limiter with the pool, you could have a simple rate limiter which responds with yes/no. Then, each client process (in your case resource worker) would ask the limiter for permission. If the permission is given, the client can safely issue the API request (or any other limited job).
This means you don’t need a pool at all. Instead, the rate limiter ensures that at most n API requests are issued in a given time frame. You won’t be able to do more, since limiter won’t allow it.
There are a couple of libraries for this available. Quick googling revealed this one. I never tried it so can’t comment how it works. For my blog, I implemented a naive solution based on ETS counters. This one’s nice because permission questions are not going through the single process, so it should have better throughput. However, since it’ based on ETS tables, it can’t work as a cluster-wide (global) limiter.
It’s worth mentioning that these rate limiters are not 100% consistent, because things happen concurrently. Namely, when a limiter gives you permission, you still need to make the request. So I think it’s theoretically possible that in one second you end up issuing a bit more requests than allowed. A simple remedy for this would be to set the internal limit a bit lower than the one imposed by the external API. If you really want strong consistency I think the only option is to forward all calls through the single process. This would provide stronger guarantees, but might affect the throughput.
Thank you. That seems like a really sensible way to solve the problem!
Now, is it better to, on failure (when the limit has been reached), have clients call every (second? 1/10 second?) to check if they can already send a new request, or is it better to check how much time there is left until the bucket ‘rolls over’, and use a send_after to try again at that time?
My guess would be the second (it ‘feels’ more performant, i.e. less inter-process message spam), but on the other hand this means that the client processes are concerned with the internals of the rate limiter, which is maybe not so nice.
The thing is that both might cause latency variations or possibly even starvations under high loads. I think a more reliable mechanism would be to change the rate limiter a bit. The proposed approach of yes/no answer works better if you want to shed (drop) when the limit is exceeded. If you want to queue, it would be better to make the limiter always answers with a yes. If the rate is exceeded in the current interval, the limiter will respond later (with GenServer.reply), when the action is allowed. In this version, the limiter would need to monitor the caller. If the caller dies before it gets the slot, it should be removed.
You may also want to look into Ulf Wiger’s jobs which might support something out of the box. Never tried it personally, so can’t say for sure.
@sasajuric how about simething really simple like:
a GenServer that stores a list of X recent requests
gets called each time when request attempt is being made
if detects that X+1 requests are too fast in given time span, it sleeps for computed period of time, then returns
otherwise it returns from a call handler straight away
You are 100% certain there’s only one request handled by genserver at a time and I think you an compute exactly the sleep time you need to not go over rate limit, and would never starve any requests.
I.e. you never get “no” response, it’s always yes. But the call may be “hung up” for X seconds. That would work like a counting semaphore, basically.
Not sure if you mean to insert sleep directly in the GenServer. If yes, then I wouldn’t advise it, because GenServer should be responsive for subsequent requests and other system messages.
So the proper solution IMO would be to store a caller ref in an internal queue if we can’t issue more requests. Then, the server would tick in regular intervals (using e.g. Process.send_after or some other mechanism), and on every tick it would notify at most n oldest items from the queue using GenServer.reply.
Yeah, that’s basically what I suggested in my previous reply
R internally stores a queue of PIDs of registered caller processes.
R stores the timespan that is the rate limited time period: time_interval.
R stores the maximum amount of requests that is allowed in this timespan: max_requests
R stores, the amount of requests that have been handled during this interval: requests_done_in_interval
Now:
C tells R to inform it when it is allowed perform the wanted action.
R stores C’s PID in an internal queue.
R starts monitoring C to be sure that, when C crashes, it is removed from the internal queue.
R responds with :ok, to let C know that it has been registered.
C starts monitoring R, to be sure that, when R crashes, it knows and can either re-register itself with R’s replacement, or terminate itself. (Which of these two to pick is application-specific, I presume?)
⁕ If the limit max_requests hasn’t been reached yet, R sends a message to C, indicating that it is allowed to go ahead with the requested action. (R will now stop monitoring C, and vice-versa). R also increases the internally stored requests_done_in_interval
If the max_requestshas been reached, R will instead enqueue C in its internal queue.
every time_interval (using (Process.send_after/2), R will reset its request_done_in_interval to 0. Then, the top max_requests of the queue will be popped and handled sequentially using the procedure described at ⁕.
If you use GenServer.call to issue a request to the rate limiter it will simplify some things. In particular, you won’t need to monitor R from C (GenServer.call does that for you). You also don’t need to monitor C from R if you can immediately respond.
To sketch the idea, handle_call could look something like:
def handle_call(:request_permission, from, state) do
if can_grant?(state) do
{:reply, :ok, inc_issued_count(state)}
else
{:noreply, enqueue_caller(state, from)}
end
end
Where enqueue_caller would have to store the from tuple and setup a monitor to the caller, whose pid is the first element of the from tuple (see here).
Then, when you want to respond to the caller at the later point, you can use GenServer.reply, passing the dequeued from tuple. When doing this, you should also demonitor the corresponding caller.
I think you could also make it work without using Process.send_after, by relying on timeout values in response tuples and some juggling with monotonic time. This would allow you to reset the counter even when you’re highly loaded, because you could also reset it when handling an incoming request. However, that would probably be a more complex solution, so I’d start with send_after, and then maybe refine once everything else is in place.
Using GenServer.call with an infinite timeout (or at least something rather big) blocks the requesting process completely, until it has an answer, right?
So if some other process were then to ask for its status, this would not be handled until the first waiting call is done. Is that a correct assumption?
In any case, this is going to be a lot of fun to build .
The default timeout for GenServer.call is 5 secs. While the caller is waiting, it is blocked. If you want the caller to be responsive to requests by other processes, then issuing the call to rate limiter from a separate process (e.g. a Task) might help. Another option is to expose the status in some public or protected ETS table. This could then be read without querying the process. The property feature of gproc essentially does this.
However sbroker master also provides a framework for building custom job regulators, there is even a property testing framework for the testing it. Its Erlang code but the simplest one limits the number of concurrent tasks: https://github.com/fishcakez/sbroker/blob/master/src/sregulator_open_valve.erl. The regulator process plugs together a valve, a queue and a meter, and has 3 different ways to request to run, blocking with queue, blocking without queueing and asynchronous with queue. The queues and meters come with sbroker but you would need to write the valve. I would really like a PR adding this feature and can help with the property based testing.
By the way, would making use of GenStage make any sense? I’ve watched the Genstage presentation, and it seems that I has some overlap (but haven’t played with it yet, so I could totally missing the point ).
@swennemans In the end, as projects in companies tend to go, the project moved a lot slower because of other work that was deemed more important at the time. So above idea still is mostly theoretical; I have not implemented it yet.
GenStage seems like a perfect fit for this problem. I have not used it a lot myself yet, so I cannot be 100% sure, but it looks like it simplifies constructing these kinds of concurrent pipelines a lot.