[docs]classBucketBasedRateController(RateController,ABC):"""Mixin for rate controllers that use buckets."""
[docs]def__init__(self,*buckets:Bucket,should_enter_context:bool=True,max_concurrency:Optional[int]=None,**kwargs:Any,)->None:""" Args: buckets: The buckets that will be managed by the rate controller, optional. should_enter_context: Whether entering the context of the rate controller should also enter the context of the underlying buckets, if any. Defaults to True. max_concurrency: The maximum amount of concurrent requests allowed. Defaults to `None` (no limit). """super().__init__(**kwargs)validate_max_concurrency(max_concurrency)self._bucket=(Noneifnotbucketselsebuckets[0]iflen(buckets)==1elseBucketGroup(*buckets,should_enter_context=should_enter_context))self._should_enter_context=should_enter_contextself._max_concurrency=max_concurrencyself._concurrent_requests=0
[docs]@overrideasyncdef__aenter__(self)->Self:"""Enter the controller's context. Also enters the context of the underlying buckets, if the `should_enter_context` flag was set to `True`. """awaitsuper().__aenter__()ifself._should_enter_contextandself._bucketisnotNone:awaitself._bucket.__aenter__()returnself
[docs]@overrideasyncdef__aexit__(self,*exc_info:Any)->Optional[bool]:"""Exit the controller's context. Also exits the context of the underlying buckets, if the `should_enter_context` flag was set to `True`. """ifself._should_enter_contextandself._bucketisnotNone:awaitself._bucket.__aexit__(*exc_info)returnawaitsuper().__aexit__(*exc_info)
@propertydef_is_concurrency_limited(self)->bool:returnself._max_concurrencyisnotNoneandself._concurrent_requests>=self._max_concurrencydef_assert_can_acquire(self,tokens:float)->None:"""Make sure that the request for the given amount of tokens can be processed. Args: tokens: The amount of tokens to acquire. Raises: RateLimit: Cannot process the request for the given amount of tokens. """ifnotself.can_acquire(tokens):raiseRateLimit(f'Cannot process the request for {tokens} tokens.')@contextmanagerdef_hold_concurrency(self)->Iterator[None]:"""Context manager that handles concurrency management during the execution of a request."""self._concurrent_requests+=1try:yieldfinally:self._concurrent_requests-=1self._on_concurrency_release()def_on_concurrency_release(self)->None:"""Perform additional operations when the amount of concurrent requests lowers."""