APIs power everything from mobile apps to enterprise platforms, silently handling millions of requests every day. Without security measures in place, a misconfigured client or a burst of automated traffic can overwhelm your service, causing poor performance for everyone.
Rate limiting prevents this. It controls how many requests a client can make within a given time, protecting your infrastructure from both intentional misuse and accidental overload.
Among the various algorithms used for rate limiting, the token bucket stands out for its balance of simplicity and flexibility. Unlike fixed window counters that reset abruptly, token buckets allow short bursts of traffic while enforcing a sustainable long-term rate. This makes the algorithm a practical choice for APIs where clients occasionally need to send a quick burst of requests without penalty.
In this guide, you will implement a token bucket rate limiter in a FastAPI application. You’ll build the algorithm from scratch as a Python class, wire it into FastAPI as middleware with per-user tracking, add standard rate-limit headers to your responses, and test everything with a simple script. By the end, you’ll have a working rate limiter that you can add to any FastAPI project.
Prerequisites
To follow this tutorial, you will need:
Python 3.9 or later installed on your machine. You can verify your version by running python --version.
Familiarity with Python and basic knowledge of how HTTP APIs work.
A text editor like VS Code, Vim, or any editor you prefer.
Understanding the Token Bucket Algorithm
Before writing code, it helps to understand the mechanism you're building.
The token bucket algorithm models rate limiting with two simple concepts: a bucket that holds tokens, and a refill process that adds tokens at a fixed rate.
Here’s how it works:
The bucket starts full, containing a fixed maximum number of tokens (capacity).
Each incoming request costs one token. If there are tokens available in the bucket, the request is allowed, and a token will be removed.
If the bucket is empty, the request is rejected with a 429 Too Many Requests response.
Tokens are added back to the bucket at a constant refill rate, regardless of how many requests arrive. The bucket never exceeds its maximum capacity.
Capacity determines how large a burst the system can absorb. The refill rate defines the sustained throughput. For example, a bucket with a capacity of 10 and a refill rate of 2 tokens per second allows a client to fire 10 requests immediately, but after that, it can only make 2 requests per second until the bucket is refilled.
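To make the numbers in this example concrete, here is a minimal sketch that replays request timestamps against a bucket with a capacity of 10 and a refill rate of 2 tokens per second. The `simulate` function and its simulated clock are illustrative assumptions, not part of the code you'll build later; it only mirrors the rules listed above.

```python
def simulate(capacity, refill_rate, interval, request_times):
    """Replay request timestamps (in seconds) against a token bucket.

    Returns a list of booleans: True where the request was allowed.
    """
    tokens = capacity      # the bucket starts full
    refilled_at = 0.0      # the simulated clock starts at t = 0
    decisions = []
    for now in request_times:
        # Credit every full interval that has elapsed since the last refill.
        num_refills = int((now - refilled_at) // interval)
        if num_refills > 0:
            tokens = min(capacity, tokens + num_refills * refill_rate)
            refilled_at += num_refills * interval
        # Each request costs one token.
        if tokens >= 1:
            tokens -= 1
            decisions.append(True)
        else:
            decisions.append(False)
    return decisions


# 12 requests at t = 0, then 3 more one second later.
decisions = simulate(10, 2, 1.0, [0.0] * 12 + [1.0] * 3)
print(decisions.count(True), decisions.count(False))  # prints: 12 3
```

The first 10 requests drain the bucket, the next 2 are rejected, and after one second exactly 2 new tokens are available, so 2 of the 3 later requests succeed.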
This two-parameter design gives you precise control over:
| Parameter | Controls | Example |
|---|---|---|
| Capacity (max tokens) | Maximum burst size | 10 tokens = 10 requests at once |
| Refill rate | Sustained throughput | 2 tokens/sec = 2 requests/sec long term |
| Refill interval | Granularity of the refill | 1.0 seconds = tokens added every second |
Compared to other rate-limiting algorithms:
Fixed window counters reset at fixed intervals (for example, every minute), which can allow up to double the intended rate around the edges of a window. The token bucket has no such window boundary.
Sliding window counters are more accurate but more complex to implement and maintain.
Leaky bucket processes requests at a fixed rate and queues the rest. The token bucket is similar, but allows bursts instead of forcing a constant pace.
Token buckets are widely used in production systems. AWS API Gateway and Stripe both use variations of the algorithm, while NGINX's request limiting uses the closely related leaky bucket method.
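The fixed window edge case mentioned above is easy to reproduce. The following sketch is a simplified fixed window counter written only for illustration (the function name and parameters are assumptions); it shows a client getting 20 requests through in about one second despite a limit of 10 per minute.

```python
def fixed_window_allowed(limit, window, request_times):
    """Count how many requests a fixed window counter lets through."""
    allowed = 0
    current_window = None
    count = 0
    for now in request_times:
        window_id = int(now // window)
        if window_id != current_window:
            # A new window started: the counter resets abruptly.
            current_window, count = window_id, 0
        if count < limit:
            count += 1
            allowed += 1
    return allowed


# Limit: 10 requests per 60-second window.
# The client sends 10 requests at t = 59 s and 10 more at t = 60 s.
times = [59.0] * 10 + [60.0] * 10
print(fixed_window_allowed(10, 60.0, times))  # prints: 20
```

Both batches are accepted because they fall into different windows, even though they arrive one second apart.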
Setting Up a FastAPI Project
Create the project directory:
mkdir fastapi-ratelimit && cd fastapi-ratelimit
Create and activate the virtual environment:
python -m venv venv
On Linux/macOS:
source venv/bin/activate
On Windows:
venv\Scripts\activate
Install FastAPI and Uvicorn:
pip install fastapi uvicorn
Create the project file structure:
fastapi-ratelimit/
├── main.py
└── ratelimiter.py
Create main.py with a minimal FastAPI application:
from fastapi import FastAPI

app = FastAPI()


@app.get("/")
async def root():
    return {"message": "Hello, world!"}
Start the server to verify the setup:
uvicorn main:app --reload
You should see output similar to this:
INFO: Uvicorn running on (Press CTRL+C to quit)
INFO: Started reloader process
Open the address shown in the Uvicorn output in your browser, or request it with curl. You should receive:
{"message": "Hello, world!"}
With the project running, you can proceed to create the rate limiter.
Implementing the TokenBucket class
Open ratelimiter.py in your editor and add the following code. This class implements the token bucket algorithm with thread-safe operations:
import time
import threading


class TokenBucket:
    """
    Token Bucket rate limiter.

    Each bucket starts full at `max_tokens` and refills `refill_rate`
    tokens every `interval` seconds, up to the maximum capacity.
    """

    def __init__(self, max_tokens: int, refill_rate: int, interval: float):
        """
        Initialize a new Token Bucket.

        :param max_tokens: Maximum number of tokens the bucket can hold (burst capacity).
        :param refill_rate: Number of tokens added per refill interval.
        :param interval: Time in seconds between refills.
        """
        assert max_tokens > 0, "max_tokens must be positive"
        assert refill_rate > 0, "refill_rate must be positive"
        assert interval > 0, "interval must be positive"
        self.max_tokens = max_tokens
        self.refill_rate = refill_rate
        self.interval = interval
        self.tokens = max_tokens
        self.refilled_at = time.time()
        self.lock = threading.Lock()

    def _refill(self):
        """Add tokens based on elapsed time since the last refill."""
        now = time.time()
        elapsed = now - self.refilled_at
        if elapsed >= self.interval:
            num_refills = int(elapsed // self.interval)
            self.tokens = min(
                self.max_tokens,
                self.tokens + num_refills * self.refill_rate
            )
            # Advance the timestamp by the number of full intervals consumed,
            # not to `now`, so partial intervals aren't lost.
            self.refilled_at += num_refills * self.interval

    def allow_request(self, tokens: int = 1) -> bool:
        """
        Attempt to consume `tokens` from the bucket.

        Returns True if the request is allowed, False if the bucket
        does not have enough tokens.
        """
        with self.lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def get_remaining(self) -> int:
        """Return the current number of available tokens."""
        with self.lock:
            self._refill()
            return self.tokens

    def get_reset_time(self) -> float:
        """Return the Unix timestamp when the next refill occurs."""
        with self.lock:
            return self.refilled_at + self.interval
The class has three public methods:
allow_request() is the core method. It refills tokens based on the elapsed time, then tries to consume one. It returns True if the request is allowed, False if the bucket is empty.
get_remaining() returns the number of tokens the client has left. You'll use this for the response headers.
get_reset_time() returns when the next tokens are added. This is also reflected in the response headers.
The threading.Lock ensures that concurrent requests do not cause race conditions when reading or modifying the token count. This is important because FastAPI runs request handlers concurrently.
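As a rough illustration of what the lock guarantees, the sketch below hammers a stripped-down bucket from several threads. `LockedBucket` and `hammer` are hypothetical names used only here; the bucket deliberately has no refill, so the number of allowed requests must equal the starting token count.

```python
import threading


class LockedBucket:
    """Stripped-down bucket with no refill, used only to illustrate the lock."""

    def __init__(self, tokens):
        self.tokens = tokens
        self.lock = threading.Lock()

    def allow_request(self):
        # The check and the decrement happen as one atomic step.
        with self.lock:
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return False


def hammer(bucket, attempts, results):
    """Try `attempts` requests and record how many were allowed."""
    results.append(sum(bucket.allow_request() for _ in range(attempts)))


bucket = LockedBucket(tokens=100)
results = []
threads = [
    threading.Thread(target=hammer, args=(bucket, 50, results))
    for _ in range(8)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

total_allowed = sum(results)
print(total_allowed)  # prints: 100
```

Eight threads make 400 attempts in total, but exactly 100 succeed because no two threads can check and decrement the counter at the same time.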
Note: This implementation stores the state of the bucket in memory. If you restart the server, all buckets are reset. To persist state across restarts or share it across multiple server instances, you would store the token count in Redis or a similar external store. The in-memory approach is sufficient for single-instance deployments.
Adding Per-User Rate-Limiting Middleware
A single global bucket would throttle all clients together: one heavy user could exhaust the limit for everyone. Instead, you'll assign each client its own bucket, identified by IP address.
Add the following to ratelimiter.py, below the TokenBucket class:
class RateLimiterStore:
    """
    Manages per-user Token Buckets.

    Each unique client key (e.g., IP address) gets its own bucket
    with identical parameters.
    """

    def __init__(self, max_tokens: int, refill_rate: int, interval: float):
        self.max_tokens = max_tokens
        self.refill_rate = refill_rate
        self.interval = interval
        self._buckets: dict[str, TokenBucket] = {}
        self._lock = threading.Lock()

    def get_bucket(self, key: str) -> TokenBucket:
        """
        Return the TokenBucket for a given client key.

        Creates a new bucket if one does not exist yet.
        """
        with self._lock:
            if key not in self._buckets:
                self._buckets[key] = TokenBucket(
                    max_tokens=self.max_tokens,
                    refill_rate=self.refill_rate,
                    interval=self.interval,
                )
            return self._buckets[key]
Now open main.py and replace its contents with the complete application, including the rate-limiting middleware:
import time

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

from ratelimiter import RateLimiterStore

app = FastAPI()

# Configure rate limits: bursts of up to 10 requests, 2 tokens added every 1 second.
limiter = RateLimiterStore(max_tokens=10, refill_rate=2, interval=1.0)


@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    """
    Middleware that enforces per-IP rate limiting on every request.

    Adds standard rate limit headers to every response.
    """
    # Identify the client by IP address (request.client may be None for some transports).
    client_ip = request.client.host if request.client else "unknown"
    bucket = limiter.get_bucket(client_ip)

    # Check if the client has tokens available.
    if not bucket.allow_request():
        retry_after = bucket.get_reset_time() - time.time()
        return JSONResponse(
            status_code=429,
            content={"detail": "Too many requests. Try again later."},
            headers={
                "Retry-After": str(max(1, int(retry_after))),
                "X-RateLimit-Limit": str(bucket.max_tokens),
                "X-RateLimit-Remaining": str(bucket.get_remaining()),
                "X-RateLimit-Reset": str(int(bucket.get_reset_time())),
            },
        )

    # Request is allowed. Process it and add rate limit headers to the response.
    response = await call_next(request)
    response.headers["X-RateLimit-Limit"] = str(bucket.max_tokens)
    response.headers["X-RateLimit-Remaining"] = str(bucket.get_remaining())
    response.headers["X-RateLimit-Reset"] = str(int(bucket.get_reset_time()))
    return response


@app.get("/")
async def root():
    return {"message": "Hello, world!"}


@app.get("/data")
async def get_data():
    return {"data": "Some important information"}


@app.get("/health")
async def health():
    return {"status": "ok"}
The middleware does the following on each incoming request:
Extracts the client's IP address from request.client.host.
Retrieves (or creates) this client's token bucket from the store.
Calls allow_request(). If the bucket is empty, it returns a 429 response with a Retry-After header telling the client how long to wait.
If tokens are available, it processes the request normally and appends the rate limit headers to the response.
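The rejected and allowed paths in the steps above build almost the same set of headers. One possible way to avoid the duplication is a small helper like the sketch below. `rate_limit_headers` is a hypothetical name, not part of FastAPI, and it rounds Retry-After up with `math.ceil` rather than truncating, which is a slightly more conservative choice than the middleware's `int()`.

```python
import math
from typing import Optional


def rate_limit_headers(limit: int, remaining: int, reset_at: float,
                       now: Optional[float] = None) -> dict:
    """Build the rate limit headers for a response.

    Pass `now` only for rejected requests; it adds a Retry-After header.
    """
    headers = {
        "X-RateLimit-Limit": str(limit),
        "X-RateLimit-Remaining": str(remaining),
        "X-RateLimit-Reset": str(int(reset_at)),
    }
    if now is not None:
        # Seconds until the next refill, rounded up, never less than 1.
        headers["Retry-After"] = str(max(1, math.ceil(reset_at - now)))
    return headers


# Allowed request: three headers, no Retry-After.
ok_headers = rate_limit_headers(10, 9, 105.0)

# Rejected request 2.5 seconds before the next refill.
blocked_headers = rate_limit_headers(10, 0, 105.0, now=102.5)
print(blocked_headers["Retry-After"])  # prints: 3
```

In the middleware, the values would come from `bucket.max_tokens`, `bucket.get_remaining()`, and `bucket.get_reset_time()`.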
The three X-RateLimit-* headers follow a widely adopted convention:
| Header | Meaning |
|---|---|
| X-RateLimit-Limit | Maximum burst capacity (max tokens) |
| X-RateLimit-Remaining | Tokens left in the client's bucket |
| X-RateLimit-Reset | Unix timestamp when the next refill occurs |
These headers allow well-behaved clients to throttle themselves before reaching the limit.
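On the client side, honoring these headers can be as simple as the following sketch. `seconds_to_wait` is a hypothetical helper, and it assumes Retry-After carries a number of seconds, as the middleware above sends it; any other value falls back to a default pause.

```python
def seconds_to_wait(status_code: int, headers: dict, default: float = 1.0) -> float:
    """Return how long a client should pause before retrying a request."""
    if status_code != 429:
        return 0.0  # not rate limited, no need to wait
    value = headers.get("Retry-After")
    try:
        return max(0.0, float(value))
    except (TypeError, ValueError):
        # Header missing or not a plain number of seconds.
        return default


print(seconds_to_wait(200, {}))                     # prints: 0.0
print(seconds_to_wait(429, {"Retry-After": "2"}))   # prints: 2.0
print(seconds_to_wait(429, {}))                     # prints: 1.0
```

A client would call `time.sleep()` with the returned value before sending the request again. Note that a plain dict lookup is case-sensitive, so with a real HTTP library you would pass its own headers object.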
Testing the Rate Limiter
If the server is not already running, start it:
uvicorn main:app --reload
Manual testing with curl
Manual testing with curl is useful during development when you want to quickly verify that your middleware is working. A single request lets you confirm that the rate limit headers are present, the values are correct, and that a token has been consumed as expected.
This approach is fast and requires no additional setup, making it ideal for spot checking your configuration after making changes.
Send a request and inspect the response:
curl -i /data
You should see a 200 response with headers like:
HTTP/1.1 200 OK
x-ratelimit-limit: 10
x-ratelimit-remaining: 9
x-ratelimit-reset: 1739836801
Automated burst test
While curl confirms that the rate limiter is active, it cannot confirm that the limiter actually blocks requests when the bucket is empty. For that, you need to send requests faster than the refill rate and observe the 429 responses. An automated burst test is essential before deploying to production, after changing your bucket parameters, or when you need to verify both blocking and refill behavior.
Create a file named test_ratelimit.py in your project directory:
import time

import requests

# Assumption: the server runs on Uvicorn's default address. Adjust if yours differs.
BASE_URL = "http://127.0.0.1:8000"


def test_burst():
    """Send 15 rapid requests to trigger the rate limit."""
    url = f"{BASE_URL}/data"
    results = []
    for i in range(15):
        response = requests.get(url)
        remaining = response.headers.get("X-RateLimit-Remaining", "N/A")
        results.append((i + 1, response.status_code, remaining))
        print(f"Request {i+1:2d} | Status: {response.status_code} | Remaining: {remaining}")
    print()
    allowed = sum(1 for _, status, _ in results if status == 200)
    blocked = sum(1 for _, status, _ in results if status == 429)
    print(f"Allowed: {allowed}, Blocked: {blocked}")


def test_refill():
    """Exhaust tokens, wait for a refill, then confirm requests succeed again."""
    url = f"{BASE_URL}/data"
    print("\n--- Exhausting tokens ---")
    for i in range(12):
        response = requests.get(url)
        print(f"Request {i+1:2d} | Status: {response.status_code}")
    print("\n--- Waiting 3 seconds for refill ---")
    time.sleep(3)
    print("\n--- Sending requests after refill ---")
    for i in range(5):
        response = requests.get(url)
        remaining = response.headers.get("X-RateLimit-Remaining", "N/A")
        print(f"Request {i+1:2d} | Status: {response.status_code} | Remaining: {remaining}")


if __name__ == "__main__":
    print("=== Burst Test ===")
    test_burst()
    # Allow the bucket to refill before the next test.
    time.sleep(6)
    print("\n=== Refill Test ===")
    test_refill()
Install the requests library if you don't have it:
pip install requests
Run the test:
python test_ratelimit.py
You should see output similar to this:
=== Burst Test ===
Request 1 | Status: 200 | Remaining: 9
Request 2 | Status: 200 | Remaining: 8
Request 3 | Status: 200 | Remaining: 7
...
Request 10 | Status: 200 | Remaining: 0
Request 11 | Status: 429 | Remaining: 0
Request 12 | Status: 429 | Remaining: 0
...
Request 15 | Status: 429 | Remaining: 0
Allowed: 10, Blocked: 5
The first 10 requests succeed, each consuming one token from the full bucket. Requests 11 through 15 are rejected because the bucket is empty. The refill test then verifies that after waiting, tokens are replenished and requests succeed again.
Note: The exact split between allowed and blocked requests may vary slightly depending on timing, because tokens can refill between requests. This is expected behavior.
Where Rate Limiting Fits in Your Architecture
The implementation in this tutorial runs inside your application process, which is the simplest approach and works well for single-instance deployments. In large systems, rate limiting usually occurs at multiple layers:
API Gateway level (NGINX, Kong, Traefik, Envoy): A coarse global rate limit applied to all traffic before it reaches your application. It protects against massive abuse and DDoS.
Application level (this tutorial): Fine-grained per-user or per-endpoint limits within your service. This is useful for enforcing different quotas for different API tiers.
Both: Many production systems combine a gateway-level global limiter with an in-app per-user limiter. The gateway absorbs floods and the application enforces the business rules.
For multi-instance deployments (multiple server processes behind a load balancer), the in-memory RateLimiterStore will not share state across instances. In this case, replace the in-memory dictionary with Redis. The token bucket logic remains the same; only the storage layer changes.
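To illustrate how the logic can stay the same while storage changes, here is a rough sketch that routes all state access through a small store object. `InMemoryStateStore`, its `load` and `save` methods, and the standalone `allow_request` function are hypothetical names for illustration only. A Redis-backed store would expose the same two operations, but it would also need to make the load-modify-save step atomic (for example with a server-side script), which this sketch does not attempt.

```python
from typing import Dict, Tuple


class InMemoryStateStore:
    """Hypothetical storage layer: maps a client key to (tokens, refilled_at)."""

    def __init__(self) -> None:
        self._state: Dict[str, Tuple[int, float]] = {}

    def load(self, key: str, default: Tuple[int, float]) -> Tuple[int, float]:
        return self._state.get(key, default)

    def save(self, key: str, state: Tuple[int, float]) -> None:
        self._state[key] = state


def allow_request(store, key: str, now: float, max_tokens: int = 10,
                  refill_rate: int = 2, interval: float = 1.0) -> bool:
    """Token bucket decision that only touches state through `store`."""
    # A client seen for the first time starts with a full bucket.
    tokens, refilled_at = store.load(key, (max_tokens, now))
    num_refills = int((now - refilled_at) // interval)
    if num_refills > 0:
        tokens = min(max_tokens, tokens + num_refills * refill_rate)
        refilled_at += num_refills * interval
    allowed = tokens >= 1
    if allowed:
        tokens -= 1
    store.save(key, (tokens, refilled_at))
    return allowed


store = InMemoryStateStore()
first_burst = [allow_request(store, "client-a", now=100.0) for _ in range(11)]
print(first_burst.count(True))                         # prints: 10
print(allow_request(store, "client-a", now=101.0))     # prints: True
```

Swapping storage then means providing another object with the same `load` and `save` methods, while `allow_request` stays untouched.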
Conclusion
In this guide, you created a token bucket rate limiter from scratch and integrated it into a FastAPI application with per-user tracking and standard rate limit response headers. You also tested the implementation to verify that the burst capacity and refill behavior work as expected.
The token bucket algorithm gives you two levers: capacity for burst tolerance and refill rate for sustained throughput. Together they cover the majority of rate-limiting requirements.
From here, you can extend this foundation by:
Replacing the in-memory store with Redis for multi-instance deployments.
Applying different rate limits per endpoint by creating separate RateLimiterStore instances.
Using authenticated user IDs instead of IP addresses for more accurate client identification.
Adding logging and metrics to track how often clients are being throttled.
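As a starting point for two of these extensions, the sketch below shows one possible way to pick per-endpoint settings and to prefer an authenticated user ID over the IP address. `ENDPOINT_LIMITS`, `limit_for`, and `client_key` are hypothetical names and the numbers are placeholders; how you obtain the user ID depends on your authentication setup.

```python
from typing import Optional, Tuple

# Hypothetical per-endpoint settings: (max_tokens, refill_rate, interval).
ENDPOINT_LIMITS = {
    "/data": (10, 2, 1.0),
    "/health": (100, 50, 1.0),
}
DEFAULT_LIMIT = (10, 2, 1.0)


def limit_for(path: str) -> Tuple[int, int, float]:
    """Look up the bucket parameters for a request path."""
    return ENDPOINT_LIMITS.get(path, DEFAULT_LIMIT)


def client_key(user_id: Optional[str], client_ip: str) -> str:
    """Prefer the authenticated user ID; fall back to the IP address."""
    if user_id:
        return f"user:{user_id}"
    return f"ip:{client_ip}"


print(limit_for("/health"))               # prints: (100, 50, 1.0)
print(client_key(None, "203.0.113.7"))    # prints: ip:203.0.113.7
print(client_key("42", "203.0.113.7"))    # prints: user:42
```

Each entry in the table could back its own RateLimiterStore, and the key returned by `client_key` would be passed to `get_bucket()` in place of the raw IP address.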