[docs]classBucketGroup(ContextAware,Bucket,Iterable[Bucket]):"""Composite bucket that aggregates other buckets."""
[docs]def__init__(self,*buckets:Bucket,should_enter_context:bool=True,**kwargs:Any)->None:""" Args: buckets: The buckets to aggregate within this bucket group. should_enter_context: Whether entering the context of the bucket group should also enter the context of the underlying buckets. Defaults to `True`. """super().__init__(**kwargs)self._buckets=bucketsself._should_enter_context=should_enter_contextself._send_stream,self._recv_stream=create_memory_object_stream[Bucket]()
[docs]@overrideasyncdefwait_for_refill(self)->None:"""Wait until any of the underlying buckets refills."""withsuppress(WouldBlock):whileTrue:refilled_bucket=self._recv_stream.receive_nowait()self._listen_for_refill(refilled_bucket)awaitself._recv_stream.receive()
[docs]@overridedefcan_acquire(self,tokens:float)->bool:"""Whether the given amount of tokens can be acquired. Args: tokens: The amount of tokens that we want to acquire. Returns: Whether all the underlying buckets can acquire the given amount of tokens. """returnall(bucket.can_acquire(tokens)forbucketinself._buckets)
[docs]@overridedefacquire(self,tokens:float)->None:"""For each underlying bucket, acquire the given amount of tokens. Args: tokens: The amount of tokens to acquire. Raises: RateLimit: Any of the underlying buckets cannot acquire the given amount of tokens. """self._assert_can_acquire(tokens)forbucketinself._buckets:bucket.acquire(tokens)