Buckets

class rate_control.Bucket[source]

Bases: ABC

Abstract base class for buckets.

async __aenter__() Self[source]

Enter the bucket context.

It may for example start scheduling replenishments.

async __aexit__(*_: Any) bool | None[source]

Exit the bucket context.

It may for example cancel internal background tasks.

abstract acquire(tokens: float) None[source]

Acquire the given amount of tokens.

Parameters:

tokens (float) – The amount of tokens to acquire.

Raises:

RateLimit – Cannot acquire the given amount of tokens.

abstract can_acquire(tokens: float) bool[source]

Whether the given amount of tokens can be acquired.

Parameters:

tokens (float) – The amount of tokens that we want to acquire.

Returns:

Whether the given amount of tokens is available to consume.

abstract async wait_for_refill() None[source]

Wait until some tokens are replenished.

class rate_control.FixedWindowCounter[source]

Bases: BaseWindowedTokenBucket, CapacityUpdatingBucket

Bucket whose refill strategy follows the fixed window counter algorithm.

The bucket refills once every duration seconds, to cap its tokens back to capacity.

__init__(capacity: float, duration: float, **kwargs: Any) None[source]
Parameters:
  • capacity (float) – The number of tokens that can be acquired within duration.

  • duration (float) – The window duration in seconds.

acquire(tokens: float) None

Acquire the given amount of tokens.

Parameters:

tokens (float) – The amount of tokens to acquire.

Raises:

RateLimit – Cannot acquire the given amount of tokens.

can_acquire(tokens: float) bool

Whether the given amount of tokens can be acquired.

Parameters:

tokens (float) – The amount of tokens that we want to acquire.

Returns:

Whether the given amount of tokens is available to consume.

update_capacity(new_capacity: float) None

Update the bucket’s token capacity.

Changes take effect instantly, and the amount of remaining tokens is updated accordingly.

Parameters:

new_capacity (float) – The new token capacity of the bucket.

async wait_for_refill() None

Wait until some tokens are replenished.

class rate_control.LeakyBucket[source]

Bases: BaseRateBucket

Bucket whose refill strategy follows the leaky bucket algorithm.

Only one request can get executed every delay seconds.

__init__(delay: float, **kwargs: Any) None[source]
Parameters:

delay (float) – The delay before a new request can pass through.

acquire(tokens: float = 1) None[source]

Acquire the given amount of tokens.

Parameters:

tokens (float) – The amount of tokens to acquire.

Raises:

RateLimit – Cannot acquire the given amount of tokens.

can_acquire(tokens: float = 1) bool[source]

Whether the given amount of tokens can be acquired.

Parameters:

tokens (float) – The amount of tokens that we want to acquire.

Returns:

Whether the given amount of tokens is available to consume.

async wait_for_refill() None

Wait until some tokens are replenished.

class rate_control.SlidingWindowLog[source]

Bases: BaseWindowedTokenBucket, CapacityUpdatingBucket

Bucket whose refill strategy follows the sliding window log algorithm.

Every consumed tokens get replenished after duration seconds.

__init__(capacity: float, duration: float, **kwargs: Any) None
Parameters:
  • capacity (float) – The number of tokens that can be acquired within duration.

  • duration (float) – The window duration in seconds.

acquire(tokens: float) None

Acquire the given amount of tokens.

Parameters:

tokens (float) – The amount of tokens to acquire.

Raises:

RateLimit – Cannot acquire the given amount of tokens.

can_acquire(tokens: float) bool

Whether the given amount of tokens can be acquired.

Parameters:

tokens (float) – The amount of tokens that we want to acquire.

Returns:

Whether the given amount of tokens is available to consume.

update_capacity(new_capacity: float) None

Update the bucket’s token capacity.

Changes take effect instantly, and the amount of remaining tokens is updated accordingly.

Parameters:

new_capacity (float) – The new token capacity of the bucket.

async wait_for_refill() None

Wait until some tokens are replenished.

class rate_control.BucketGroup[source]

Bases: ContextAware, Bucket, Iterable[Bucket]

Composite bucket that aggregates other buckets.

__init__(*buckets: Bucket, should_enter_context: bool = True, **kwargs: Any) None[source]
Parameters:
  • buckets (Bucket) – The buckets to aggregate within this bucket group.

  • should_enter_context (bool) – Whether entering the context of the bucket group should also enter the context of the underlying buckets. Defaults to True.

acquire(tokens: float) None[source]

For each underlying bucket, acquire the given amount of tokens.

Parameters:

tokens (float) – The amount of tokens to acquire.

Raises:

RateLimit – Any of the underlying buckets cannot acquire the given amount of tokens.

can_acquire(tokens: float) bool[source]

Whether the given amount of tokens can be acquired.

Parameters:

tokens (float) – The amount of tokens that we want to acquire.

Returns:

Whether all the underlying buckets can acquire the given amount of tokens.

async wait_for_refill() None[source]

Wait until any of the underlying buckets refills.