How to implement token bucket rate limiting with FastAPI

by SkillAiNest

APIs power everything from mobile apps to enterprise platforms, silently handling millions of requests every day. Without security measures in place, a misconfigured client or a burst of automated traffic can overwhelm your service, causing poor performance for everyone.

Rate limiting prevents this. It controls how many requests a client can make within a given time, protecting your infrastructure from both intentional misuse and accidental overload.

Among the various algorithms used for rate limiting, the token bucket stands out for its balance of simplicity and flexibility. Unlike fixed window counters that reset abruptly, token buckets allow short bursts of traffic while enforcing a sustainable long-term rate. This makes it a practical choice for APIs where clients occasionally need to send a quick burst of requests without penalty.

In this guide, you will implement a token bucket rate limiter in a FastAPI application. You’ll build the algorithm from scratch as a Python class, wire it into FastAPI as middleware with per-user tracking, add standard rate-limit headers to your responses, and test everything with a simple script. By the end, you’ll have a working rate limiter that you can add to any FastAPI project.

What we will cover:

  1. Prerequisites

  2. Understanding the Token Bucket Algorithm

  3. Setting up a FastAPI project

  4. Implementing the TokenBucket class

  5. Adding Per-User Rate-Limiting Middleware

  6. Checking the rate limit

  7. Where rate limiting fits into your architecture

  8. Conclusion

Prerequisites

To follow this tutorial, you will need:

  • Python 3.9 or later installed on your machine. You can verify your version by running python --version.

  • Familiarity with Python and basic knowledge of how HTTP APIs work.

  • A text editor like VS Code, Vim, or any editor you prefer.

Understanding the Token Bucket Algorithm

Before writing code, it helps to understand the algorithm you’re building.

The token bucket algorithm models rate limiting with two simple concepts: a bucket that holds tokens, and a refill process that adds tokens at a fixed rate.

Here’s how it works:

  1. The bucket starts full, containing a fixed maximum number of tokens (capacity).

  2. Each incoming request costs one token. If tokens are available in the bucket, the request is allowed and one token is removed.

  3. If the bucket is empty, the request is rejected with a 429 Too Many Requests response.

  4. Tokens are added back to the bucket at a constant refill rate, regardless of how many requests come in. The bucket never exceeds its maximum capacity.

Capacity determines how large a burst the system can absorb. The refill rate defines the sustained throughput. For example, a bucket with a capacity of 10 and a refill rate of 2 tokens per second allows a client to fire 10 requests immediately, but after that it can sustain only 2 requests per second until traffic slows and the bucket refills.

This two-parameter design gives you precise control over:

Parameter                  Controls                   Example
Capacity (max tokens)      Maximum burst size         10 tokens = 10 requests at once
Refill rate                Sustained throughput       2 tokens/sec = 2 requests/sec long term
Refill interval            Granularity of the refill  1.0 second = tokens added every second
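To make those numbers concrete, here is a tiny standalone simulation of the capacity-10, 2-tokens-per-second example (hypothetical code; for simplicity it uses a continuous refill rather than the interval-based refill built later in this guide):

```python
# Capacity 10, refill 2 tokens/sec -- the example from the text.
capacity, refill_rate = 10, 2.0
tokens, last = float(capacity), 0.0

def allow(t: float) -> bool:
    """Try to consume one token at simulated time t (seconds)."""
    global tokens, last
    tokens = min(capacity, tokens + (t - last) * refill_rate)  # refill for elapsed time
    last = t
    if tokens >= 1:
        tokens -= 1
        return True
    return False

burst = [allow(0.0) for _ in range(10)]  # 10 requests at once: all pass (bucket starts full)
r1 = allow(0.1)                          # denied: only 0.2 tokens have refilled
r2 = allow(0.5)                          # allowed: a full token has accumulated by now
print(sum(burst), r1, r2)                # 10 False True
```

The burst drains the bucket instantly, after which requests succeed only as fast as tokens trickle back in.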

Compared to other rate-limiting algorithms:

  • Fixed window counters reset at fixed intervals (for example, every minute), which can allow up to double the intended rate at the edges of the window. The token bucket has no such edge effect.

  • Sliding window counters are more accurate but more complex to implement and maintain.

  • Leaky bucket processes requests at a fixed rate and queues the rest. A token bucket is similar, but allows bursts instead of forcing a constant output rate.

Token buckets are widely used in production systems. AWS API Gateway, NGINX, and Stripe all use variations of it.
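The fixed-window edge effect described above is easy to demonstrate with a short, self-contained sketch (hypothetical numbers: a limit of 10 requests per 1-second window):

```python
from collections import defaultdict

# Fixed window: at most LIMIT requests per WINDOW-second window.
LIMIT, WINDOW = 10, 1.0
counts = defaultdict(int)

def fixed_window_allow(t: float) -> bool:
    """Allow a request at simulated time t if its window isn't full."""
    window = int(t // WINDOW)       # requests are counted per whole window
    if counts[window] < LIMIT:
        counts[window] += 1
        return True
    return False

burst1 = [fixed_window_allow(0.9) for _ in range(10)]  # end of window 0: all pass
burst2 = [fixed_window_allow(1.1) for _ in range(10)]  # start of window 1: all pass
print(sum(burst1) + sum(burst2))  # 20 requests allowed within 0.2 seconds
```

Twenty requests land within 0.2 seconds even though the intended rate is 10 per second. A token bucket with capacity 10 would reject the second burst until tokens refill.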

Setting up a FastAPI project

Create the project directory and install the dependencies:

mkdir fastapi-ratelimit && cd fastapi-ratelimit

Create and activate the virtual environment:

python -m venv venv

On Linux/macOS:

source venv/bin/activate

On Windows:

venv\Scripts\activate

Install FastAPI and Uvicorn:

pip install fastapi uvicorn

Create the project file structure:

fastapi-ratelimit/
├── main.py
└── ratelimiter.py

Create main.py with a minimal FastAPI application:

from fastapi import FastAPI

app = FastAPI()


@app.get("/")
async def root():
    return {"message": "Hello, world!"}

Start the server to verify the setup:

uvicorn main:app --reload

You should see output similar to this:

INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO:     Started reloader process

Open http://127.0.0.1:8000 in your browser, or run curl http://127.0.0.1:8000. You should receive:

{"message": "Hello, world!"}

With the project running, you can proceed to create the rate limiter.

Implementing the TokenBucket class

Open ratelimiter.py in your editor and add the following code. This class implements the token bucket algorithm with thread-safe operations:

import time
import threading


class TokenBucket:
    """
    Token Bucket rate limiter.

    Each bucket starts full at `max_tokens` and refills `refill_rate`
    tokens every `interval` seconds, up to the maximum capacity.
    """

    def __init__(self, max_tokens: int, refill_rate: int, interval: float):
        """
        Initialize a new Token Bucket.

        :param max_tokens: Maximum number of tokens the bucket can hold (burst capacity).
        :param refill_rate: Number of tokens added per refill interval.
        :param interval: Time in seconds between refills.
        """
        assert max_tokens > 0, "max_tokens must be positive"
        assert refill_rate > 0, "refill_rate must be positive"
        assert interval > 0, "interval must be positive"

        self.max_tokens = max_tokens
        self.refill_rate = refill_rate
        self.interval = interval

        self.tokens = max_tokens
        self.refilled_at = time.time()
        self.lock = threading.Lock()

    def _refill(self):
        """Add tokens based on elapsed time since the last refill."""
        now = time.time()
        elapsed = now - self.refilled_at

        if elapsed >= self.interval:
            num_refills = int(elapsed // self.interval)
            self.tokens = min(
                self.max_tokens,
                self.tokens + num_refills * self.refill_rate
            )
            # Advance the timestamp by the number of full intervals consumed,
            # not to `now`, so partial intervals aren't lost.
            self.refilled_at += num_refills * self.interval

    def allow_request(self, tokens: int = 1) -> bool:
        """
        Attempt to consume `tokens` from the bucket.

        Returns True if the request is allowed, False if the bucket
        does not have enough tokens.
        """
        with self.lock:
            self._refill()

            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def get_remaining(self) -> int:
        """Return the current number of available tokens."""
        with self.lock:
            self._refill()
            return self.tokens

    def get_reset_time(self) -> float:
        """Return the Unix timestamp when the next refill occurs."""
        with self.lock:
            return self.refilled_at + self.interval

The class has three public methods:

  • allow_request() is the core method. It refills the bucket based on elapsed time, then tries to consume a token. It returns True if the request is allowed and False if the bucket is empty.

  • get_remaining() returns the number of tokens the client has left. You’ll use this for the response headers.

  • get_reset_time() returns the Unix timestamp of the next refill. This is also surfaced in the response headers.

The threading.Lock ensures that concurrent requests do not cause race conditions when reading or modifying the token count. This is important because FastAPI runs request handlers concurrently.

Note: This implementation stores bucket state in memory. If you restart the server, all buckets reset. For persistence across restarts or across multiple server instances, you’d store the token counts in Redis or a similar external store. The in-memory approach is sufficient for single-instance deployments.
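Before wiring the class into FastAPI, it is worth exercising it directly. The following quick check uses a condensed copy of the class above (duplicated here only so the snippet is self-contained), with a small capacity and a short interval so the refill is easy to observe:

```python
import time
import threading

class TokenBucket:
    """Condensed copy of the TokenBucket class above, for a standalone demo."""
    def __init__(self, max_tokens: int, refill_rate: int, interval: float):
        self.max_tokens = max_tokens
        self.refill_rate = refill_rate
        self.interval = interval
        self.tokens = max_tokens
        self.refilled_at = time.time()
        self.lock = threading.Lock()

    def _refill(self):
        elapsed = time.time() - self.refilled_at
        if elapsed >= self.interval:
            n = int(elapsed // self.interval)
            self.tokens = min(self.max_tokens, self.tokens + n * self.refill_rate)
            self.refilled_at += n * self.interval

    def allow_request(self, tokens: int = 1) -> bool:
        with self.lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

# Capacity 3, one token added back every 0.2 seconds.
bucket = TokenBucket(max_tokens=3, refill_rate=1, interval=0.2)
burst = [bucket.allow_request() for _ in range(4)]
print(burst)                  # [True, True, True, False] -- burst of 3, then empty
time.sleep(0.25)              # wait one refill interval
after_refill = bucket.allow_request()
print(after_refill)           # True -- one token was added back
```

The fourth request in the burst fails, and after one refill interval a request succeeds again, which is exactly the behavior the middleware will rely on.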

Adding Per-User Rate-Limiting Middleware

A global bucket would throttle all clients together: one heavy user could exhaust the limit for everyone. Instead, assign each client a separate bucket, identified by IP address.

Add the following to ratelimiter.py, below the TokenBucket class:

from collections import defaultdict


class RateLimiterStore:
    """
    Manages per-user Token Buckets.

    Each unique client key (e.g., IP address) gets its own bucket
    with identical parameters.
    """

    def __init__(self, max_tokens: int, refill_rate: int, interval: float):
        self.max_tokens = max_tokens
        self.refill_rate = refill_rate
        self.interval = interval
        self._buckets: dict[str, TokenBucket] = {}
        self._lock = threading.Lock()

    def get_bucket(self, key: str) -> TokenBucket:
        """
        Return the TokenBucket for a given client key.
        Creates a new bucket if one does not exist yet.
        """
        with self._lock:
            if key not in self._buckets:
                self._buckets[key] = TokenBucket(
                    max_tokens=self.max_tokens,
                    refill_rate=self.refill_rate,
                    interval=self.interval,
                )
            return self._buckets[key]

Now open main.py and replace its contents with the complete application, including the rate-limiting middleware:

import time

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

from ratelimiter import RateLimiterStore

app = FastAPI()

# Configure rate limits: 10 requests burst, 2 tokens added every 1 second.
limiter = RateLimiterStore(max_tokens=10, refill_rate=2, interval=1.0)


@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    """
    Middleware that enforces per-IP rate limiting on every request.
    Adds standard rate limit headers to every response.
    """
    # Identify the client by IP address.
    client_ip = request.client.host
    bucket = limiter.get_bucket(client_ip)

    # Check if the client has tokens available.
    if not bucket.allow_request():
        retry_after = bucket.get_reset_time() - time.time()
        return JSONResponse(
            status_code=429,
            content={"detail": "Too many requests. Try again later."},
            headers={
                "Retry-After": str(max(1, int(retry_after))),
                "X-RateLimit-Limit": str(bucket.max_tokens),
                "X-RateLimit-Remaining": str(bucket.get_remaining()),
                "X-RateLimit-Reset": str(int(bucket.get_reset_time())),
            },
        )

    # Request is allowed. Process it and add rate limit headers to the response.
    response = await call_next(request)
    response.headers["X-RateLimit-Limit"] = str(bucket.max_tokens)
    response.headers["X-RateLimit-Remaining"] = str(bucket.get_remaining())
    response.headers["X-RateLimit-Reset"] = str(int(bucket.get_reset_time()))
    return response


@app.get("/")
async def root():
    return {"message": "Hello, world!"}


@app.get("/data")
async def get_data():
    return {"data": "Some important information"}


@app.get("/health")
async def health():
    return {"status": "ok"}

The middleware does the following on each incoming request:

  1. Extracts the client’s IP address from request.client.host.

  2. Retrieves (or creates) this client’s token bucket from the store.

  3. Calls allow_request(). If the bucket is empty, it returns a 429 response with a Retry-After header telling the client how long to wait.

  4. If tokens are available, it processes the request normally and appends the rate limit headers to the response.

The three X-RateLimit-* headers follow a widely adopted convention:

Header                   Meaning
X-RateLimit-Limit        Maximum burst capacity (max_tokens)
X-RateLimit-Remaining    Tokens left in the client's bucket
X-RateLimit-Reset        Unix timestamp of the next refill

These headers allow well-behaved clients to throttle themselves before reaching the limit.
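For example, a client could use these headers to pause before the bucket empties. A minimal sketch (seconds_until_allowed is a hypothetical helper, not part of the server code; it works on any mapping of response headers):

```python
import time

def seconds_until_allowed(headers: dict) -> float:
    """How long a client should wait before its next request,
    based on the X-RateLimit-* headers described above."""
    remaining = int(headers.get("X-RateLimit-Remaining", "1"))
    if remaining > 0:
        return 0.0                            # tokens left: send immediately
    reset = float(headers.get("X-RateLimit-Reset", time.time()))
    return max(0.0, reset - time.time())      # sleep until the next refill

# With tokens remaining, no wait is needed:
print(seconds_until_allowed({"X-RateLimit-Remaining": "5"}))  # 0.0
```

A client would call time.sleep(seconds_until_allowed(response.headers)) between requests to stay under the limit without ever receiving a 429.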

Checking the rate limit

If the server is not already running, restart it:

uvicorn main:app --reload

Manual testing with curl

Manual testing with curl is useful during development when you want to quickly verify that your middleware is working. A single request lets you confirm that the rate limit headers are present, that the values are correct, and that a token was consumed as expected.

This approach is fast and requires no additional setup, making it ideal for spot checking your configuration after making changes.

Send a request and inspect the response:

curl -i http://127.0.0.1:8000/data

You should see a 200 response with headers like:

HTTP/1.1 200 OK
x-ratelimit-limit: 10
x-ratelimit-remaining: 9
x-ratelimit-reset: 1739836801

Automatic burst test

While curl confirms that the rate limiter is active, it cannot easily confirm that the limiter actually blocks requests once the bucket is empty. For that, you need to send requests faster than the refill rate and observe 429 responses. An automated burst test is useful before deploying to production, after changing your bucket parameters, or whenever you need to verify both blocking and refill behavior.

Create a file named test_ratelimit.py in your project directory:

import requests
import time


def test_burst():
    """Send 15 rapid requests to trigger the rate limit."""
    url = "http://127.0.0.1:8000/data"
    results = []

    for i in range(15):
        response = requests.get(url)
        remaining = response.headers.get("X-RateLimit-Remaining", "N/A")
        results.append((i + 1, response.status_code, remaining))
        print(f"Request {i+1:2d} | Status: {response.status_code} | Remaining: {remaining}")

    print()

    allowed = sum(1 for _, status, _ in results if status == 200)
    blocked = sum(1 for _, status, _ in results if status == 429)
    print(f"Allowed: {allowed}, Blocked: {blocked}")


def test_refill():
    """Exhaust tokens, wait for a refill, then confirm requests succeed again."""
    url = "http://127.0.0.1:8000/data"

    print("\n--- Exhausting tokens ---")
    for i in range(12):
        response = requests.get(url)
        print(f"Request {i+1:2d} | Status: {response.status_code}")

    print("\n--- Waiting 3 seconds for refill ---")
    time.sleep(3)

    print("\n--- Sending requests after refill ---")
    for i in range(5):
        response = requests.get(url)
        remaining = response.headers.get("X-RateLimit-Remaining", "N/A")
        print(f"Request {i+1:2d} | Status: {response.status_code} | Remaining: {remaining}")


if __name__ == "__main__":
    print("=== Burst Test ===")
    test_burst()

    # Allow bucket to refill before next test
    time.sleep(6)

    print("\n=== Refill Test ===")
    test_refill()

Install the requests library if you don’t have it:

pip install requests

Run the test:

python test_ratelimit.py

You should see output similar to this:

=== Burst Test ===
Request  1 | Status: 200 | Remaining: 9
Request  2 | Status: 200 | Remaining: 8
Request  3 | Status: 200 | Remaining: 7
...
Request 10 | Status: 200 | Remaining: 0
Request 11 | Status: 429 | Remaining: 0
Request 12 | Status: 429 | Remaining: 0
...
Request 15 | Status: 429 | Remaining: 0

Allowed: 10, Blocked: 5

The first 10 requests succeed, each consuming one token from the full bucket. Requests 11 through 15 are rejected because the bucket is empty. The refill test then verifies that after waiting, tokens reappear and requests succeed again.

Note: The exact split between allowed and blocked requests may vary slightly between runs, since tokens can refill between requests. This is expected behavior.

Where rate limiting fits into your architecture

The implementation in this tutorial runs inside your application process, which is the simplest approach and works well for single-instance deployments. In large systems, rate limiting usually occurs at multiple layers:

  • API Gateway level (NGINX, Kong, Traefik, Envoy): A coarse global rate limit applied to all traffic before it reaches your application. It protects against massive abuse and DDoS.

  • Application level (this tutorial): Fine-grained per-user or per-endpoint limits within your service. This is useful for enforcing different quotas across API tiers.

  • Both: Many production systems combine a gateway-level global limiter with an in-app per-user limiter. The gateway absorbs floods while the application enforces business rules.

For multi-instance deployments (multiple server processes behind a load balancer), the in-memory RateLimiterStore will not share state between instances. In that case, replace the in-memory dictionary with Redis. The token bucket logic stays the same; only the storage layer changes.

Conclusion

In this guide, you built a token bucket rate limiter from scratch and integrated it into a FastAPI application with per-user tracking and standard rate limit response headers. You also tested the implementation to verify that burst capacity and refill behavior work as expected.

The token bucket algorithm gives you two levers of control, capacity for burst tolerance and refill rate for sustained throughput, which covers the majority of rate-limiting requirements.

From here, you can expand the base:

  • Replace the in-memory store with Redis for multi-instance deployments.

  • Apply different rate limits per endpoint by creating separate RateLimiterStore instances.

  • Use authenticated user IDs instead of IP addresses for more accurate client identification.

  • Add metrics and logging to track how often clients are throttled.
