Scheduling requests¶
Overview¶
Anticipating rate limit errors in your client is useful, but wouldn't it be neat if you could work within these constraints and schedule rate-limited requests to run once the restrictions loosen? This is the purpose of Rate Control's `Scheduler`.

If the underlying bucket allows the request to be executed, it is fired instantly. Otherwise, the request is placed in a queue and executed as soon as possible.
Usage¶
Here is how the `Scheduler` can be used:
asyncio:

```python
from asyncio import gather, get_running_loop, run

from rate_control import Duration, FixedWindowCounter, RateController, Scheduler


def current_time() -> float:
    loop = get_running_loop()
    return loop.time()


async def request_print(controller: RateController, start_time: float) -> None:
    async with controller.request():
        print(f'Elapsed: {current_time() - start_time:.1f} seconds')


async def main() -> None:
    bucket = FixedWindowCounter(capacity=2, duration=Duration.SECOND)
    async with Scheduler(bucket) as scheduler:
        await gather(*(
            request_print(scheduler, current_time())
            for _ in range(3)
        ))


run(main())
```
trio:

```python
from trio import current_time, open_nursery, run

from rate_control import Duration, FixedWindowCounter, RateController, Scheduler


async def request_print(controller: RateController, start_time: float) -> None:
    async with controller.request():
        print(f'Elapsed: {current_time() - start_time:.1f} seconds')


async def main() -> None:
    bucket = FixedWindowCounter(capacity=2, duration=Duration.SECOND)
    async with Scheduler(bucket) as scheduler, open_nursery() as nursery:
        for _ in range(3):
            nursery.start_soon(request_print, scheduler, current_time())


run(main)
```
anyio:

```python
from anyio import create_task_group, current_time, run

from rate_control import Duration, FixedWindowCounter, RateController, Scheduler


async def request_print(controller: RateController, start_time: float) -> None:
    async with controller.request():
        print(f'Elapsed: {current_time() - start_time:.1f} seconds')


async def main() -> None:
    bucket = FixedWindowCounter(capacity=2, duration=Duration.SECOND)
    async with Scheduler(bucket) as scheduler, create_task_group() as task_group:
        for _ in range(3):
            task_group.start_soon(request_print, scheduler, current_time())


run(main)
```
Each variant prints:

```
Elapsed: 0.0 seconds
Elapsed: 0.0 seconds
Elapsed: 1.0 seconds
```
Similarly, the `Scheduler` can postpone jobs when the specified `max_concurrency` has been reached; they are processed once another request exits the `request()` context.
Fill or kill¶
You can fall back to the simple rate limiting behavior for a given request by passing `fill_or_kill=True` to `request()`. If the request cannot be processed instantly, a `RateLimit` exception is raised.
Request prioritization¶
Queuing requests¶
By default, scheduled jobs are placed in a queue and executed in order of ascending cost; jobs of equal cost run in the order they were received.
A complete reference of the queue algorithms offered by Rate Control can be found on this page.
Specifying a Priority¶
Sometimes, some functionalities of your application are more critical than others. To schedule important requests ahead of the others, `request()` can take a `priority` argument.
Under the hood, there is one request queue per available priority level. Requests with a higher priority therefore bypass the queue algorithms that apply within a single priority level.