Using bucket groups
Chaining multiple buckets
Sometimes you may want to enforce several rate limit constraints at once. For example, many API gateways define both a long-term rate limit and one that watches a shorter period of time.

If you want to interact with the GitHub API, for instance, its rate limit specifications include:

- 5,000 requests are allowed per hour.
- 900 points are allowed per minute per REST API endpoint.

Below is how you could implement such a rate-limiting strategy using Rate Control. Shorter periods of time are used so that you can run this example at home:

- 2 requests are allowed every second.
- 3 requests are allowed over each 2-second window.
**asyncio**

```python
from asyncio import run, sleep

from rate_control import Duration, FixedWindowCounter, RateLimit, RateLimiter


async def main() -> None:
    first_bucket = FixedWindowCounter(2, Duration.SECOND)
    second_bucket = FixedWindowCounter(3, 2 * Duration.SECOND)
    async with RateLimiter(first_bucket, second_bucket) as rate_limiter:
        async with rate_limiter.request():
            print('First request passes')
        async with rate_limiter.request():
            print('Second request passes')
        try:
            async with rate_limiter.request(): ...
        except RateLimit:
            print('First bucket is empty')
        await sleep(Duration.SECOND)
        await sleep(0)  # yield control to the buckets
        async with rate_limiter.request():
            print('New request passes after replenishment')
        try:
            async with rate_limiter.request(): ...
        except RateLimit:
            print('Now second bucket is empty')

run(main())
```
**trio**

```python
from trio import run, sleep
from trio.lowlevel import checkpoint

from rate_control import Duration, FixedWindowCounter, RateLimit, RateLimiter


async def main() -> None:
    first_bucket = FixedWindowCounter(2, Duration.SECOND)
    second_bucket = FixedWindowCounter(3, 2 * Duration.SECOND)
    async with RateLimiter(first_bucket, second_bucket) as rate_limiter:
        async with rate_limiter.request():
            print('First request passes')
        async with rate_limiter.request():
            print('Second request passes')
        try:
            async with rate_limiter.request(): ...
        except RateLimit:
            print('First bucket is empty')
        await sleep(Duration.SECOND)
        await checkpoint()  # yield control to the buckets
        async with rate_limiter.request():
            print('New request passes after replenishment')
        try:
            async with rate_limiter.request(): ...
        except RateLimit:
            print('Now second bucket is empty')

run(main)
```
**anyio**

```python
from anyio import run, sleep
from anyio.lowlevel import checkpoint

from rate_control import Duration, FixedWindowCounter, RateLimit, RateLimiter


async def main() -> None:
    first_bucket = FixedWindowCounter(2, Duration.SECOND)
    second_bucket = FixedWindowCounter(3, 2 * Duration.SECOND)
    async with RateLimiter(first_bucket, second_bucket) as rate_limiter:
        async with rate_limiter.request():
            print('First request passes')
        async with rate_limiter.request():
            print('Second request passes')
        try:
            async with rate_limiter.request(): ...
        except RateLimit:
            print('First bucket is empty')
        await sleep(Duration.SECOND)
        await checkpoint()  # yield control to the buckets
        async with rate_limiter.request():
            print('New request passes after replenishment')
        try:
            async with rate_limiter.request(): ...
        except RateLimit:
            print('Now second bucket is empty')

run(main)
```
The output is the same in all three cases:

```
First request passes
Second request passes
First bucket is empty
New request passes after replenishment
Now second bucket is empty
```
Composite buckets
What happens under the hood in the above example is that the rate limiter groups the two buckets in a `BucketGroup`, which does the job of watching the replenishments of each of the underlying buckets. `BucketGroup` is a subclass of `Bucket`, so everything you can do with buckets, you can also do with bucket groups, be it consuming tokens, waiting for a refill, or even forming bucket groups of bucket groups!
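The grouping idea can be sketched without the library. The `Bucket` and `BucketGroup` classes below are simplified, synchronous stand-ins (not the actual `rate_control` implementations): a group grants a request only when every member bucket can, and since the group exposes the same interface as a single bucket, groups nest naturally.

```python
import time


class Bucket:
    """Simplified fixed-window counter with a minimal bucket interface."""

    def __init__(self, capacity: int, window_seconds: float) -> None:
        self.capacity = capacity
        self.window = window_seconds
        self.used = 0
        self.window_start = time.monotonic()

    def _maybe_reset(self) -> None:
        # Start a fresh window once the current one has elapsed.
        if time.monotonic() - self.window_start >= self.window:
            self.used = 0
            self.window_start = time.monotonic()

    def can_acquire(self, tokens: int = 1) -> bool:
        self._maybe_reset()
        return self.used + tokens <= self.capacity

    def acquire(self, tokens: int = 1) -> None:
        self._maybe_reset()
        self.used += tokens


class BucketGroup(Bucket):
    """A group of buckets that is itself a bucket, so groups can nest."""

    def __init__(self, *buckets: Bucket) -> None:
        self.buckets = buckets

    def can_acquire(self, tokens: int = 1) -> bool:
        # Grant a request only if every underlying bucket can.
        return all(bucket.can_acquire(tokens) for bucket in self.buckets)

    def acquire(self, tokens: int = 1) -> None:
        for bucket in self.buckets:
            bucket.acquire(tokens)


# Same constraints as the example above: 2 per second, 3 per 2 seconds.
group = BucketGroup(Bucket(2, 1.0), Bucket(3, 2.0))

results = []
for _ in range(3):
    if group.can_acquire():
        group.acquire()
        results.append('pass')
    else:
        results.append('limited')

print(results)  # ['pass', 'pass', 'limited'] -- the 1-second bucket runs out first
```

The third request is rejected because the 2-per-second bucket is exhausted, mirroring the "First bucket is empty" step in the examples above.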