ExWaiter - Helper for waiting on asynchronous conditions to be met

I just released the first version of ExWaiter - A helper for waiting on asynchronous conditions to be met.

https://hexdocs.pm/ex_waiter/ExWaiter.html
https://github.com/baldwindavid/ex_waiter

Here is a quick intro from the documentation:

Why This Exists

In some testing scenarios there is no obvious way to ensure that asynchronous side effects have taken place without continuously checking for successful completion. For example, perhaps an assertion is needed on click data being asynchronously persisted to the database. It is not difficult to write a recursive function to handle this one-off, but there is a bit of ceremony involved.

Additionally, perhaps it is desirable to configure the amount of delay prior to each check, the total number of attempts, a convention for handling exhausted retries, an easy way to inject callbacks, and a record of the history of each attempt.

This simple package provides all that and more! Well, actually just that.

A Walkthrough

The package provides await/2 and await!/2 functions. Each requires an anonymous function that may return {:ok, value}, :ok, or true for a successful attempt, or {:error, value}, :error, or false for a failed attempt. The tagged tuples must be used if you need a return value or want to track the history of value changes. Additional options are available for setting the desired number of attempts and a custom delay between attempts.

Let’s use await!/2 to check the database for the most recently persisted click.

click = ExWaiter.await!(fn ->
  case Clicks.most_recent() do
    %Click{} = click ->
      {:ok, click}

    value ->
      # This is a good place for a callback you might want to run each
      # time the condition is unmet (e.g. flushing jobs).
      {:error, value}
  end
end)

By default, this will check the database up to 5 times spaced out over 150ms. If, at some point, the condition is met, the %Click{} will be returned. If retries are exhausted, an exception will be raised that looks something like:

 ** (ExWaiter.Exceptions.RetriesExhausted) Tried 5 times over 150ms, but condition was never met.

 %ExWaiter.Waiter{
   attempt_num: 5,
   attempts: [
     %ExWaiter.Attempt{attempt_num: 1, delay_before: 10, fulfilled?: false, value: nil},
     %ExWaiter.Attempt{attempt_num: 2, delay_before: 20, fulfilled?: false, value: nil},
     %ExWaiter.Attempt{attempt_num: 3, delay_before: 30, fulfilled?: false, value: nil},
     %ExWaiter.Attempt{attempt_num: 4, delay_before: 40, fulfilled?: false, value: nil},
     %ExWaiter.Attempt{attempt_num: 5, delay_before: 50, fulfilled?: false, value: nil}
   ],
   attempts_left: 0,
   delay_before_fn: #Function<...>,
   fulfilled?: false,
   checker_fn: #Function<...>,
   num_attempts: 5,
   total_delay: 150,
   value: nil
 }

This displays a Waiter struct, which includes a recording of everything that happened during attempts.

The await/2 function would return either {:ok, %Click{}, %Waiter{}} or {:error, nil, %Waiter{}}. It can be helpful to inspect this Waiter struct for debugging and for insight into timing.
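
For completeness, here is a minimal sketch of the non-raising variant, reusing the hypothetical Clicks lookup from the walkthrough above:

result =
  ExWaiter.await(fn ->
    case Clicks.most_recent() do
      %Click{} = click -> {:ok, click}
      value -> {:error, value}
    end
  end)

case result do
  {:ok, %Click{} = click, %ExWaiter.Waiter{} = waiter} ->
    # Condition met; the Waiter records timing and attempt history.
    IO.inspect(waiter.total_delay, label: "ms waited")
    click

  {:error, _value, %ExWaiter.Waiter{} = waiter} ->
    # Retries exhausted; the Waiter is handy for debugging.
    IO.inspect(waiter.attempts, label: "attempts")
    nil
end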

Additional Options

  • :delay_before_fn - Takes a function that receives the %Waiter{} struct at that moment and returns the number of milliseconds to delay prior to performing the next attempt. The default is fn waiter -> waiter.attempt_num * 10 end.
  • :num_attempts - The number of attempts before retries are exhausted. The default is 5. A sketch of passing both options follows this list.
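
A quick sketch of passing both options, again using the hypothetical Clicks lookup and assuming options are given as a keyword list in the second argument:

ExWaiter.await!(
  fn ->
    case Clicks.most_recent() do
      %Click{} = click -> {:ok, click}
      value -> {:error, value}
    end
  end,
  # 10 attempts with a slightly slower backoff than the default
  num_attempts: 10,
  delay_before_fn: fn waiter -> waiter.attempt_num * 25 end
)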

Based upon feedback, a number of changes and additions have been made and 0.2.1 has been released.

  • await!/2 has been introduced as an alternative to await/2. It returns ONLY the “value” or raises an exception.
  • The exception_on_retries_exhausted? option has been removed; it is no longer needed with the introduction of await!/2.
  • The anonymous “checker” function passed as an argument now supports true or :ok for success and false or :error for failure (in addition to the existing {:ok, value} or {:error, value}). This means that no “value” can be returned, but it is useful if all you care about is whether an exception is raised or an error tuple is returned (see the sketch after this list).
  • A number of documentation updates and additions have been made.
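
For example, a sketch of a checker that only returns a boolean; the spawned process is just a stand-in for some asynchronous side effect:

# No return value is captured here; we only care that the condition
# eventually holds (or that an exception is raised if it never does).
pid = spawn(fn -> Process.sleep(50) end)
ExWaiter.await!(fn -> not Process.alive?(pid) end)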

This package has changed quite a lot since the original release. The latest includes the following changes/additions:

receive_next/1 and receive_next!/1

The package now includes receive_next/receive_next! functions which, as you might imagine, receive the next message(s) within a specified timeout. This can be handy in tests as a streamlined way to gather multiple messages and assert on their order.

send(self(), :hello)
send(self(), :hi)
assert {:ok, [:hello, :hi]} = ExWaiter.receive_next(2)

Rename await to poll

The await/await! terminology has changed to poll/poll! to better describe what they are intended to do.

new_poller

All polling operations start out by building a Poller struct via new_poller/2 to track the status of a polling session. This struct can then be passed around while the actual polling is kicked off separately (e.g. new_poller(...) |> poll()).
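
A sketch of that flow, using the same hypothetical Projects.get/1 lookup as the poll_once/1 example further down (see the hexdocs for the exact return value of poll/1):

ExWaiter.new_poller(fn ->
  case Projects.get(1) do
    %Project{} = project -> {:ok, project}
    _ -> :error
  end
end)
|> ExWaiter.poll()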

Configurable delay and max attempts

The delay in between attempts can be set as an integer (in ms) or a function to dynamically configure backoff. The max_attempts can be an integer, :infinity, or a function to dynamically configure whether to retry based upon the current conditions just after a failed attempt. This could also be used for throttling and rate limiting scenarios.
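
A sketch of how that might look, assuming :delay and :max_attempts are the option names accepted by new_poller/2 (check the hexdocs for the exact API):

ExWaiter.new_poller(
  fn -> if Projects.get(1), do: :ok, else: :error end,
  # Delay grows linearly with the attempt number (10ms, 20ms, 30ms, ...)
  delay: fn poller -> poller.attempt_num * 10 end,
  # Keep retrying until 250ms of total delay has accumulated
  max_attempts: fn poller -> poller.total_delay < 250 end
)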

poll_once/1

Previously, only synchronous polling was supported. This was fine for tests, but blocking for long periods in production code is often not what you’re looking for. More complex asynchronous scenarios (using Process.send_after, multiple processes, etc) are now supported via the poll_once/1 function.

Below is a contrived example of scheduling retries. In practice, you might use a GenServer with handle_info, sending either to self() or to a different process that notifies the caller when finished.

poller =
  ExWaiter.new_poller(fn ->
    case Projects.get(1) do
      %Project{} = project -> {:ok, project}
      _ -> :error
    end
  end)

Suppose the first attempt fails…

assert {:error, :attempt_failed, poller} = ExWaiter.poll_once(poller)

The returned Poller struct includes the default delay for the first retry of
10 milliseconds. This can be used to schedule a later retry.

assert poller.next_delay == 10
Process.send_after(self(), {:retry, poller}, poller.next_delay)

Using the receive_next!/2 function built into this package, we receive the
{:retry, poller} message sent via Process.send_after.

assert {:retry, poller} = ExWaiter.receive_next!()

We try another attempt that fails, but there are still retries available.

assert {:error, :attempt_failed, poller} = ExWaiter.poll_once(poller)

The default delay for a second retry is 20 milliseconds and we use that to
schedule another retry.

assert poller.next_delay == 20
Process.send_after(self(), {:retry, poller}, poller.next_delay)

We again receive our scheduled message and kick off another poll attempt. This
time our project is there and we can read it from the value attribute of the
returned Poller struct.

assert {:retry, poller} = ExWaiter.receive_next!()
assert {:ok, poller} = ExWaiter.poll_once(poller)
assert %{
  attempt_num: 3,
  next_delay: nil,
  total_delay: 30,
  value: %Project{}
} = poller

Rate limiting functionality has been added to this package. The limit_rate/2 function enforces a configurable rate limit using the token bucket algorithm.

“Token Bucket” is a common algorithm for enforcing rate limits like “5 requests per second”. A “bucket” contains “tokens” (just a name representing a count) and the time the last token was added to the bucket. The number of tokens indicates how many requests can be made in a given period.

This function takes a “bucket” tuple {token_count, last_updated_timestamp} and configuration options and checks whether the bucket has the required token(s) to make a request. The resulting updated “bucket” is then passed in for enforcement of the next request…rinse and repeat. In each case, if the required tokens are present, you make the request; otherwise you don’t and wait until you can. This function intentionally has no constraints or opinions around bucket storage or state management (see “Storage and State” below for a simple example of state management with a GenServer).
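
A minimal sketch of that loop; the bucket here is built inline, but in practice it would come from wherever you store it:

# A bucket with 5 tokens, last updated "now" (unix milliseconds)
bucket = {5, System.os_time(:millisecond)}

case ExWaiter.limit_rate(bucket, refill_rate: 5) do
  {:ok, updated_bucket, _limiter} ->
    # Sufficient tokens: make the request, then store updated_bucket
    updated_bucket

  {:error, updated_bucket, _limiter} ->
    # Rate limited: skip the request, store updated_bucket, retry later
    updated_bucket
end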

Each enforcement of a given bucket first checks the amount of time elapsed since the last tokens were “refilled” in the bucket. For “5 requests per second”, 5 is the refill_rate while 1 second is the interval. The bucket will be refilled based upon the number of intervals that have passed. If 2 seconds have passed, the user will have gained 10 more tokens (2 seconds x 5 refill rate). If less than a second has passed, the bucket would not be refilled with any tokens.

Let’s assume the user already had 3 tokens and those 2 seconds have passed; they would now have 13 tokens. After adding the tokens, 1 token is then subtracted for the request and something like {:ok, {12, 1678657185594}, %Limiter{...}} is returned. The second element in the tuple is the updated “bucket”. The first element in that bucket (12) is the number of remaining tokens and the second element is the unix timestamp (in milliseconds) as the new updated time. This bucket can then be passed as the first argument of a future call of this function. The last element in the tuple is a Limiter struct which contains full details of the result (see details below). Had the user made a request with no remaining tokens and no tokens were refilled then something like {:error, {0, 1678657185530}, %Limiter{...}} would be returned. The timestamp would not be updated in this case because no tokens were refilled.

This function allows configuration of both the refill_rate and interval. That may be enough in many cases, but it also supports burst_limit and cost options. “Burst limit” is the maximum amount of tokens that can be accumulated in a bucket (i.e. the bucket size). If the refill rate were 5 per second and no requests were made through the night, you wouldn’t want the user to continually fill the bucket and then be able to make a burst of 144k requests in a small window of time, right? By default, the burst_limit is equal to the refill_rate, but it can be configured separately if you want the burst limit to be higher than the refill rate (for infrequent bursts). The cost is simply the number of tokens to be “paid” (subtracted) during enforcement. By default the cost is 1, but it is configurable in case a single enforcement actually represents, say, 5 requests.
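
To make the math above concrete, here is a small illustrative sketch of the refill/pay bookkeeping just described. This is not ExWaiter's internal implementation; it simply mirrors the behavior in the last few paragraphs.

defmodule TokenBucketSketch do
  # Not ExWaiter's code, just an illustration of the described math.
  def check({tokens, updated_at}, now, refill_rate, interval, burst_limit, cost) do
    intervals_passed = div(now - updated_at, interval)
    refilled = intervals_passed * refill_rate

    # Refills are capped at the burst limit (the bucket size).
    available = min(tokens + refilled, burst_limit)

    # The timestamp only moves forward when tokens were actually refilled.
    new_updated_at = if refilled > 0, do: now, else: updated_at

    if available >= cost do
      {:ok, {available - cost, new_updated_at}}
    else
      {:error, {available, new_updated_at}}
    end
  end
end

# Worked numbers from above: 3 existing tokens, 2 intervals passed at a refill
# rate of 5, a burst limit high enough not to cap, and a cost of 1 leaves
# 3 + 10 - 1 = 12 tokens.
{:ok, {12, _}} = TokenBucketSketch.check({3, 0}, 2_000, 5, 1_000, 100, 1)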

Options

  • :refill_rate - The number of tokens to refill in the bucket per “interval” since the last refill. The default is 1.
  • :burst_limit - The maximum amount of tokens that can be accumulated in a bucket (i.e. the bucket size). The default is equal to the refill_rate. A burst limit that is higher than the refill rate would support infrequent bursts, whereas the default enforces more of a consistent limit.
  • :interval - The time window that the rate limit enforces in milliseconds. For example, for a rate limit of “5 requests per second” the interval would be 1000 milliseconds (i.e. 1 second). The default is 1000.
  • :cost - The number of tokens to remove from the bucket upon enforcement of the rate limit in case a single enforcement represents more than 1 API request. The default is 1.

Limiter struct

This function returns a tuple of the format {:ok, {remaining_tokens, updated_at}, %Limiter{...}} or {:error, {remaining_tokens, updated_at}, %Limiter{...}}. The Limiter struct contains additional details about refilled tokens, paid tokens, when the next tokens will be refilled, etc. This can be helpful for scheduling future requests and understanding the result of various configurations. Below is an example with a “bucket” containing 3 tokens and a last update timestamp of 1678822656122. That bucket tuple, along with the specified configuration options, returns the Limiter struct below it. The resulting bucket can then be passed to the next call.

result =
  ExWaiter.limit_rate({3, 1678822656122},
    refill_rate: 3,
    interval: 50,
    burst_limit: 5,
    cost: 2
  )

{:ok, {1, 1678822656122},
  %ExWaiter.RateLimiting.Limiter{
    # How many tokens to add with each passing interval
    refill_rate: 3,
    # Milliseconds between token refills
    interval: 50,
    # Max amount of tokens that can be in a bucket
    burst_limit: 5,
    # Amount of tokens to subtract
    cost: 2,
    # Timestamp when this limit was checked
    checked_at: 1678822656124,
    # If there is no bucket to pass in (e.g. user's first request ever)
    # this will be equal to checked_at; otherwise nil
    created_at: nil,
    # The timestamp passed into the check or nil if there is no bucket
    # to pass to the function
    previous_updated_at: 1678822656122,
    # Updated timestamp after the check - Equal to checked_at
    # if tokens are refilled; otherwise equal to previous_updated_at
    updated_at: 1678822656122,
    # Timestamp of next refill - If tokens were refilled this will
    # be equal to checked_at + interval; otherwise equal to
    # previous_updated_at + interval
    next_refill_at: 1678822656172,
    # Milliseconds until the next refill
    ms_until_next_refill: 48,
    # Number of tokens passed into the check or nil if there is no bucket
    # to pass to the function
    previous_tokens: 3,
    # Number of tokens refilled in this check
    refilled_tokens: 0,
    # Number of tokens in the bucket after refilling,
    # but prior to paying the cost - Equal to
    # previous_tokens + refilled_tokens
    tokens_after_refill: 3,
    # Number of tokens subtracted in this check
    paid_tokens: 2,
    # Number of tokens remaining - Equal to
    # previous_tokens + refilled_tokens - paid_tokens
    tokens_after_paid: 1
  }} = result

Examples

By default 1 token is refilled every 1 second. Below we enforce on a nil bucket; imagine this is a bucket for a user that has not made any requests before. The bucket will automatically get 1 token (equal to the “burst limit”) so the first enforcement is successful.

{:ok, {0, _} = bucket, %Limiter{}} = ExWaiter.limit_rate(nil)

The first element of the bucket is the number of tokens left. The second element is a unix timestamp in milliseconds. An example bucket with a single token might be {1, 1678669778198}.

We then pass that updated bucket and try to enforce again. Since the bucket now has no tokens we’ll get back an error.

{:error, {0, _} = bucket, %Limiter{}} = ExWaiter.limit_rate(bucket)

We then enforce again, but given a second has now passed, the bucket will get refilled with another token. The token is used and we get a successful return showing that the bucket again has no tokens.

Process.sleep(1000)
{:ok, {0, _}, %Limiter{}} = ExWaiter.limit_rate(bucket)

Configure Refill Rate

The default refill_rate is 1, but this is configurable. This is the number of tokens to refill in the bucket per “interval” that has passed since the last refill.

opts = [refill_rate: 3]
{:ok, {2, _} = bucket, %Limiter{}} = ExWaiter.limit_rate(nil, opts)
{:ok, {1, _} = bucket, %Limiter{}} = ExWaiter.limit_rate(bucket, opts)
{:ok, {0, _} = bucket, %Limiter{}} = ExWaiter.limit_rate(bucket, opts)
{:error, {0, _} = bucket, %Limiter{}} = ExWaiter.limit_rate(bucket, opts)
Process.sleep(1000)
{:ok, {2, _}, %Limiter{}} = ExWaiter.limit_rate(bucket, opts)

Configure Interval

The default interval is 1 second, but this is configurable. This is the time window that the rate limit enforces in milliseconds. For example, for a rate limit of “5 requests per second” the interval would be 1000 milliseconds.

opts = [interval: 50]
{:ok, {0, _} = bucket, %Limiter{}} = ExWaiter.limit_rate(nil, opts)
{:error, {0, _} = bucket, %Limiter{}} = ExWaiter.limit_rate(bucket, opts)
Process.sleep(50)
{:ok, {0, _}, %Limiter{}} = ExWaiter.limit_rate(bucket, opts)

Configure Burst Limit

The default burst_limit is equal to the refill_rate, but this is configurable. This is the maximum amount of tokens that can be accumulated in a bucket (i.e. the bucket size). A burst limit that is higher than the refill rate would support infrequent bursts, whereas the default enforces more of a consistent limit. Note that passing in nil for the first argument results in a new “full” bucket with a number of tokens equal to the burst limit.

opts = [refill_rate: 3, burst_limit: 5]
{:ok, {4, _} = bucket, %Limiter{}} = ExWaiter.limit_rate(nil, opts)
{:ok, {3, _}, %Limiter{}} = ExWaiter.limit_rate(bucket, opts)

Configure Cost

The cost defaults to 1, but this is configurable. This is the number of tokens to remove from the bucket upon enforcement of the rate limit in case a single enforcement represents more than 1 API request.

opts = [cost: 3, burst_limit: 10]
{:ok, {7, _} = bucket, %Limiter{}} = ExWaiter.limit_rate(nil, opts)
{:ok, {4, _} = bucket, %Limiter{}} = ExWaiter.limit_rate(bucket, opts)
{:ok, {1, _} = bucket, %Limiter{}} = ExWaiter.limit_rate(bucket, opts)
{:error, {1, _}, %Limiter{}} = ExWaiter.limit_rate(bucket, opts)

Storage and State

This function intentionally has no constraints or opinions around bucket storage or state management. It just takes a bucket and rate limit configuration and returns an updated bucket with a yay or nay on whether sufficient tokens exist for the request to be made. This provides a lot of flexibility; you can manage a collection of user buckets with a GenServer, ETS, Redis, etc. The only thing it requires is that the “buckets” passed to it are either nil (no bucket for the user yet) or a tuple containing the token count and the last update as a unix timestamp in milliseconds. The function will both produce (if the bucket is nil) and update those bucket tuples; you don’t have to do any of that. You just need to store the bucket somewhere and return it later in the same format. Below is an example of a GenServer that stores a map of buckets keyed by username in its state.

defmodule RateLimitServer do
  use GenServer

  alias ExWaiter.RateLimiting.Limiter

  def init(buckets) do
    {:ok, buckets}
  end

  def handle_call({:enforce, bucket_key}, _from, buckets) do
    bucket = Map.get(buckets, bucket_key)

    {status, updated_bucket, %Limiter{}} =
      ExWaiter.limit_rate(bucket,
        refill_rate: 2,
        interval: 100
      )

    # Reply with the status and updated bucket (matching the patterns below)
    # and store the updated bucket for the next enforcement.
    {:reply, {status, updated_bucket}, Map.put(buckets, bucket_key, updated_bucket)}
  end
end

{:ok, server} = GenServer.start_link(RateLimitServer, %{})

{:ok, {1, _}} = GenServer.call(server, {:enforce, "jane"})
{:ok, {0, _}} = GenServer.call(server, {:enforce, "jane"})
{:ok, {1, _}} = GenServer.call(server, {:enforce, "bill"})
{:error, {0, _}} = GenServer.call(server, {:enforce, "jane"})
{:ok, {0, _}} = GenServer.call(server, {:enforce, "bill"})
{:error, {0, _}} = GenServer.call(server, {:enforce, "bill"})
Process.sleep(100)
{:ok, {1, _}} = GenServer.call(server, {:enforce, "bill"})
{:ok, {1, _}} = GenServer.call(server, {:enforce, "pam"})
{:ok, {1, _}} = GenServer.call(server, {:enforce, "jane"})
{:ok, {0, _}} = GenServer.call(server, {:enforce, "bill"})
{:error, {0, _}} = GenServer.call(server, {:enforce, "bill"})

%{
  "jane" => {1, _},
  "bill" => {0, _},
  "pam" => {1, _}
} = :sys.get_state(server)