FastAPI is a modern, high-performance web framework for building APIs with Python. Controlling the rate at which clients can access your API endpoints is essential to prevent abuse, manage server load, and ensure fair use of resources. It is a best practice not just for externally exposed applications but for internal ones as well.
Implementing rate limiting in FastAPI might seem daunting to some developers. However, fear not, as we're about to explore a quick and straightforward approach to integrating rate limiting into your FastAPI projects.
We will use a simple Python module called fastapi-limiter.
Install dependencies
$ pip install fastapi fastapi-limiter uvicorn
Run Redis in a Docker container (I am assuming you have Docker or Rancher Desktop installed); fastapi-limiter uses Redis to maintain and update the rate limit state.
$ docker run --name my-redis-server -p 6379:6379 -d redis redis-server --loglevel warning
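To confirm the container is accepting connections, you can ping it with the redis-cli bundled in the image:

$ docker exec my-redis-server redis-cli ping
PONG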
The full Python file is at the end of the post but first I will discuss the specific parts.
First, we create a FastAPI object with a lifespan argument to ensure the Redis connection and the FastAPILimiter object are created and torn down properly. Two arguments deserve attention: identifier and http_callback. identifier determines how requests are grouped for rate limiting; by default, limits are applied per client IP, but we will use a custom identifier for illustrative purposes. http_callback is the callback invoked when access is denied; the default raises an HTTPException with a 429 status code. Again, we will use a custom callback for illustrative purposes.
@asynccontextmanager
async def lifespan(_: FastAPI):
    redis_connection = redis.from_url(REDIS_URL, encoding="utf8")
    await FastAPILimiter.init(
        redis=redis_connection,
        identifier=service_name_identifier,
        http_callback=custom_callback,
    )
    yield
    await FastAPILimiter.close()

app = FastAPI(lifespan=lifespan)
Our identifier reads a custom Service-Name header. The custom_callback modifies the default message to also tell the caller when to retry.
async def service_name_identifier(request: Request):
    # Group requests by the Service-Name header rather than by client IP.
    service = request.headers.get("Service-Name")
    return service

async def custom_callback(request: Request, response: Response, pexpire: int):
    """
    Callback invoked when a client exceeds the rate limit.
    :param request:
    :param pexpire: remaining milliseconds until the limit resets
    :param response:
    :return:
    """
    expire = ceil(pexpire / 1000)
    raise HTTPException(
        status.HTTP_429_TOO_MANY_REQUESTS,
        f"Too Many Requests. Retry after {expire} seconds.",
        headers={"Retry-After": str(expire)},
    )
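For example, if pexpire comes back as 5400 milliseconds, ceil(5400 / 1000) gives 6, so the client is told to retry after 6 seconds.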
Let's create three endpoints: one without any rate limit and two with different limits. Assume here that the /upload endpoint is hit harder and is more costly for the server than the /search endpoint.
@app.get("/")
async def index():
return {"msg": "This endpoint has no limits."}
@app.get("/search", dependencies=[Depends(RateLimiter(times=2, seconds=5))])
async def search_handler(request: Request):
return {"msg": "This endpoint has a rate limit of 2 requests per 5 seconds."}
@app.get("/upload", dependencies=[Depends(RateLimiter(times=2, seconds=10))])
async def upload_handler(request: Request):
return {"msg": "This endpoint has a rate limit of 2 requests per 10 seconds."}
Testing
Here's a simple script that sends 5 requests:
#!/bin/bash

ENDPOINT=${1:-"/"}
SLEEP=${2:-"1"}

# Send a request every $SLEEP seconds
for i in {1..5}; do
    curl -H "Service-Name: service-1" http://127.0.0.1:8000${ENDPOINT}
    echo
    sleep $SLEEP
done
If you run this script you will see the expected behavior. For example, sending a request to /upload every 2 seconds means your first 2 requests go through but the rest run into the rate limit.
$ ./testing.sh /upload 2
{"msg":"This endpoint has a rate limit of 2 requests per 10 seconds."}
{"msg":"This endpoint has a rate limit of 2 requests per 10 seconds."}
{"detail":"Too Many Requests. Retry after 6 seconds."}
{"detail":"Too Many Requests. Retry after 4 seconds."}
{"detail":"Too Many Requests. Retry after 2 seconds."}
If you send requests with different Service-Name headers in parallel, you'll find the rate limits are applied to each service separately.
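To see this yourself, here's a small variation on the test script (a sketch, not part of the original): it alternates two Service-Name values against /search, and because each name gets its own allowance of 2 requests per 5 seconds, you'll see four successful responses instead of two.

#!/bin/bash
# Each Service-Name has its own counter, so throttling service-1
# does not consume service-2's quota.
for i in {1..5}; do
    curl -s -H "Service-Name: service-1" http://127.0.0.1:8000/search
    echo
    curl -s -H "Service-Name: service-2" http://127.0.0.1:8000/search
    echo
done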
Full Python file
from contextlib import asynccontextmanager
from math import ceil

import redis.asyncio as redis
import uvicorn
from fastapi import Depends, FastAPI, HTTPException, Request, Response, status
from fastapi_limiter import FastAPILimiter
from fastapi_limiter.depends import RateLimiter

REDIS_URL = "redis://127.0.0.1:6379"


async def service_name_identifier(request: Request):
    # Group requests by the Service-Name header rather than by client IP.
    service = request.headers.get("Service-Name")
    return service


async def custom_callback(request: Request, response: Response, pexpire: int):
    """
    Callback invoked when a client exceeds the rate limit.
    :param request:
    :param pexpire: remaining milliseconds until the limit resets
    :param response:
    :return:
    """
    expire = ceil(pexpire / 1000)
    raise HTTPException(
        status.HTTP_429_TOO_MANY_REQUESTS,
        f"Too Many Requests. Retry after {expire} seconds.",
        headers={"Retry-After": str(expire)},
    )


@asynccontextmanager
async def lifespan(_: FastAPI):
    redis_connection = redis.from_url(REDIS_URL, encoding="utf8")
    await FastAPILimiter.init(
        redis=redis_connection,
        identifier=service_name_identifier,
        http_callback=custom_callback,
    )
    yield
    await FastAPILimiter.close()


app = FastAPI(lifespan=lifespan)


@app.get("/")
async def index():
    return {"msg": "This endpoint has no limits."}


@app.get("/search", dependencies=[Depends(RateLimiter(times=2, seconds=5))])
async def search_handler(request: Request):
    return {"msg": "This endpoint has a rate limit of 2 requests per 5 seconds."}


@app.get("/upload", dependencies=[Depends(RateLimiter(times=2, seconds=10))])
async def upload_handler(request: Request):
    return {"msg": "This endpoint has a rate limit of 2 requests per 10 seconds."}


if __name__ == "__main__":
    uvicorn.run(app="server:app", reload=True)
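Since uvicorn.run points at server:app, save the file as server.py and start it with:

$ python server.py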