PYTHON
Implement Leaky Bucket Rate Limiting for API Consumers
Control outgoing API request rates using a Leaky Bucket algorithm with Python's asyncio to prevent hitting external API limits and ensure smooth operations.
import asyncio
import time
from collections import deque
class LeakyBucketRateLimiter:
    """Asyncio rate limiter based on the leaky bucket algorithm.

    Every admitted request occupies one slot in ``bucket``. Slots drain
    ("leak") at a constant ``leak_rate`` per second, so up to ``capacity``
    requests may be admitted in a burst, after which callers are admitted
    at the leak rate.
    """

    def __init__(self, capacity: int, leak_rate: float):
        """
        Initializes a Leaky Bucket Rate Limiter.

        :param capacity: The maximum number of requests the bucket can hold.
        :param leak_rate: How many requests "leak" (are allowed) per second.
        :raises ValueError: If ``capacity`` is below 1 or ``leak_rate`` is
            not positive (either would make ``acquire`` wait forever or
            divide by zero).
        """
        if capacity < 1:
            raise ValueError("capacity must be at least 1")
        if leak_rate <= 0:
            raise ValueError("leak_rate must be greater than 0")
        self.capacity = capacity
        self.leak_rate = leak_rate  # requests per second
        self.bucket = deque()  # Stores timestamps of when requests were added
        # Start of the current leak interval; the next slot frees one
        # interval (1 / leak_rate seconds) after this moment.
        self.last_leak_time = time.monotonic()
        self.lock = asyncio.Lock()

    async def _leak(self):
        """Removes requests from the bucket based on leak_rate.

        Only whole requests are removed. The leak clock is advanced by
        exactly the time those requests account for, so the unused fraction
        of an interval carries over to the next call instead of being lost.
        """
        now = time.monotonic()
        if not self.bucket:
            # Nothing to drain: an empty bucket does not bank leak time.
            self.last_leak_time = now
            return

        time_passed = now - self.last_leak_time
        leaked = min(int(time_passed * self.leak_rate), len(self.bucket))
        for _ in range(leaked):
            self.bucket.popleft()  # Remove the oldest request

        if self.bucket:
            # Keep the fractional remainder of the interval for next time.
            self.last_leak_time += leaked / self.leak_rate
        else:
            self.last_leak_time = now

    async def acquire(self):
        """
        Acquires a slot in the bucket. If the bucket is full, it waits.

        The lock is held for the whole wait: while the bucket is full no
        other caller could be admitted anyway, and queuing on the lock
        keeps waiters from racing each other for the freed slot.
        """
        async with self.lock:
            while True:
                await self._leak()  # Attempt to leak before adding a new request
                if len(self.bucket) < self.capacity:
                    self.bucket.append(time.monotonic())
                    return  # Successfully acquired

                # Bucket is full: the next slot frees one leak interval
                # after the current leak clock.
                time_until_leak = (
                    self.last_leak_time + 1 / self.leak_rate
                ) - time.monotonic()
                # Always yield to the event loop, even for a zero delay, so
                # a rounding edge case cannot turn into a busy spin.
                await asyncio.sleep(max(time_until_leak, 0))

    async def make_api_request(self, request_id: int):
        """Waits for a free slot, then performs a simulated API call.

        :param request_id: Identifier included in the printed log line.
        """
        await self.acquire()
        print(f"[{time.monotonic():.2f}] Making API request {request_id}. Bucket size: {len(self.bucket)}/{self.capacity}")
        await asyncio.sleep(0.1)  # Simulate network latency or actual API call
async def main():
    """Demo driver: fire 15 simulated API requests through one limiter.

    The limiter is configured for bursts of up to 5 requests and a
    sustained rate of 2 requests per second.
    """
    rate_limiter = LeakyBucketRateLimiter(capacity=5, leak_rate=2)
    print("Sending 15 requests rapidly...")

    # Build every request coroutine up front so they all contend for the
    # limiter at once when gathered.
    pending = []
    for request_id in range(1, 16):
        pending.append(rate_limiter.make_api_request(request_id))

    await asyncio.gather(*pending)
    print("All requests initiated.")
if __name__ == "__main__":
    # Run the demo only when executed as a script, not when imported.
    asyncio.run(main())
How it works: This Python snippet implements a Leaky Bucket algorithm for rate-limiting outgoing API requests using `asyncio`. The `LeakyBucketRateLimiter` class models a bucket with a fixed `capacity` that "leaks" at a constant `leak_rate` (requests per second). Before making an API call, the `acquire` method tries to add an entry (representing one request) to the bucket. If the bucket is full, it calculates the delay until the next slot frees up through leaking and waits asynchronously, without blocking the event loop. This keeps the application within an external API's rate limits, preventing errors and potential IP bans, while still allowing short bursts of requests up to the bucket's capacity.