[docs]classBaseRateBucket(TokenBasedBucket,ContextAware,Bucket,ABC):"""Base class for token buckets that refill at a certain rate."""
[docs]def__init__(self,capacity:float,delay:float,**kwargs:Any)->None:""" Args: capacity: The number of tokens that can be acquired within `delay`. delay: The refill delay in seconds. """super().__init__(capacity,**kwargs)validate_delay(delay)self._delay=delayself._refill_event=Event()
def_ensure_refill(self,tokens:float=1)->None:ifself._should_schedule_refill():try:self._task_group.start_soon(self._wait_and_refill,tokens)exceptAttributeErrorase:raiseRuntimeError(f"Make sure to enter the bucket's context using 'async with {self}'")frome@abstractmethoddef_should_schedule_refill(self)->bool:""" Returns: Whether a replenishment of the bucket should be scheduled. """asyncdef_wait_and_refill(self,tokens:float)->None:awaitsleep(self._delay)self._refill(tokens)self._refill_event.set()self._refill_event=Event()@abstractmethoddef_refill(self,tokens:float)->None:"""Add some tokens back to the bucket."""