
Implement Leaky Bucket Rate Limiting for API Consumers

Control outgoing API request rates with a Leaky Bucket algorithm built on Python's asyncio, so your client stays under an external API's rate limits while still allowing short bursts.

import asyncio
import time

class LeakyBucketRateLimiter:
    def __init__(self, capacity: int, leak_rate: float):
        """
        Initializes a Leaky Bucket Rate Limiter.
        :param capacity: The maximum number of requests the bucket can hold.
        :param leak_rate: How many requests "leak" (drain) from the bucket per second.
        """
        if capacity < 1 or leak_rate <= 0:
            raise ValueError("capacity must be >= 1 and leak_rate must be > 0")
        self.capacity = capacity
        self.leak_rate = leak_rate  # requests per second
        self.level = 0.0  # Current fill level; a float so partial leaks are not lost
        self.last_leak_time = time.monotonic()
        self.lock = asyncio.Lock()

    def _leak(self):
        """Drains the bucket in proportion to the time elapsed since the last leak."""
        now = time.monotonic()
        time_passed = now - self.last_leak_time
        self.last_leak_time = now
        # Do not truncate with int(): if acquire() runs more often than every
        # 1/leak_rate seconds, truncation discards the elapsed time and the
        # bucket never drains.
        self.level = max(0.0, self.level - time_passed * self.leak_rate)

    async def acquire(self):
        """
        Acquires a slot in the bucket. If the bucket is full, it waits.
        """
        # asyncio.Lock is not reentrant, so retry in a loop instead of calling
        # acquire() recursively while holding the lock (that would deadlock).
        # Sleeping while holding the lock also serves waiters in FIFO order.
        async with self.lock:
            while True:
                self._leak()
                if self.level + 1 <= self.capacity:
                    self.level += 1
                    return  # Successfully acquired
                # Bucket is full: wait until exactly one slot has drained
                await asyncio.sleep((self.level + 1 - self.capacity) / self.leak_rate)

    async def make_api_request(self, request_id: int):
        await self.acquire()
        print(f"[{time.monotonic():.2f}] Making API request {request_id}. Bucket level: {self.level:.2f}/{self.capacity}")
        await asyncio.sleep(0.1)  # Simulate network latency or actual API call

async def main():
    # Allow bursts of up to 5 requests, then a sustained 2 requests per second
    limiter = LeakyBucketRateLimiter(capacity=5, leak_rate=2) 
    
    print("Sending 15 requests rapidly...")
    tasks = [limiter.make_api_request(i) for i in range(1, 16)]
    await asyncio.gather(*tasks)
    print("All requests completed.")

if __name__ == "__main__":
    asyncio.run(main())

How it works: This Python snippet implements a Leaky Bucket algorithm for rate-limiting outgoing API requests with `asyncio`. The `LeakyBucketRateLimiter` class models a bucket with a fixed `capacity` that drains ("leaks") at a constant `leak_rate`, measured in requests per second. Before each API call, `acquire` first drains the bucket by the elapsed time multiplied by `leak_rate`, then adds one unit for the new request if there is room. If the bucket is full, it computes how long it will take for one slot to drain, sleeps for that long, and checks again. A burst of up to `capacity` requests therefore goes out immediately, after which requests are spaced 1/`leak_rate` seconds apart (0.5 s with the settings in `main`). This keeps the application under an external API's rate limit, avoiding rejected requests and potential IP bans.
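To check the burst-then-steady-rate behavior without waiting on real timers, the same admission rule can be replayed against a list of arrival times. The sketch below is a hypothetical, clock-free model (the `admission_times` helper is not part of the snippet above); it assumes requests are served in arrival order, as they are when waiters queue on a single lock.

```python
from typing import List


def admission_times(arrivals: List[float], capacity: int, leak_rate: float) -> List[float]:
    """Return the time at which each request is admitted by a leaky bucket.

    Hypothetical helper: a pure model of the limiter's rule, with no real sleeping.
    `arrivals` must be sorted ascending; requests are admitted in FIFO order.
    """
    level = 0.0  # current fill level of the bucket
    last = 0.0   # time of the most recent leak computation
    admitted = []
    for t in arrivals:
        now = max(t, last)  # FIFO: never admit before an earlier request
        level = max(0.0, level - (now - last) * leak_rate)  # leak since last step
        last = now
        if level + 1 > capacity:
            # Bucket full: wait until exactly one slot has drained.
            wait = (level + 1 - capacity) / leak_rate
            now += wait
            level -= wait * leak_rate
            last = now
        level += 1  # the admitted request occupies one slot
        admitted.append(now)
    return admitted


print(admission_times([0] * 8, capacity=5, leak_rate=2))
# → [0, 0, 0, 0, 0, 0.5, 1.0, 1.5]
```

With the settings from `main` (`capacity=5`, `leak_rate=2`), eight simultaneous requests are admitted as five at t = 0 and then one each at 0.5, 1.0 and 1.5 seconds. To match a documented limit such as 100 requests per minute, `leak_rate` would be `100 / 60`; how large a `capacity` (burst) the external API tolerates is an assumption to verify against its documentation.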
