Quick and easy rate limiting for FastAPI apps

Bilal
4 min read · Mar 4, 2024


FastAPI, a modern web framework for building APIs with Python, offers a robust platform for developing high-performance web applications. Managing the rate at which clients can access your API endpoints is essential to prevent abuse, manage server load, and ensure fair usage of resources. It is a best practice not just for externally exposed applications but also for internal ones.

Implementing rate limiting in FastAPI might seem like a daunting task for some developers. However, fear not, as we’re about to explore a quick and straightforward approach to integrating rate limiting into your FastAPI projects.


We will use a simple Python module called fastapi-limiter.

Install dependencies

$ pip install fastapi-limiter uvicorn

Run Redis in a Docker container (I am assuming you have Docker or Rancher Desktop installed). fastapi-limiter uses Redis to maintain and update the rate-limit state.

$ docker run --name my-redis-server -p 6379:6379 -d redis redis-server  --loglevel warning

The full Python file is at the end of the post, but first I will walk through the key parts.

First, we create a FastAPI object with a lifespan argument to ensure proper creation and cleanup of the Redis connection and the FastAPI limiter. Two important arguments to pay attention to are identifier and http_callback. identifier determines the key under which each route's limit is tracked; by default, limits are applied per client IP, but we will use a custom identifier for illustrative purposes. http_callback is the callback invoked when access is denied; the default raises an HTTPException with a 429 status code. Again, we will use a custom callback for illustrative purposes.

@asynccontextmanager
async def lifespan(_: FastAPI):
    redis_connection = redis.from_url(REDIS_URL, encoding="utf8")
    await FastAPILimiter.init(
        redis=redis_connection,
        identifier=service_name_identifier,
        http_callback=custom_callback,
    )
    yield
    await FastAPILimiter.close()


app = FastAPI(lifespan=lifespan)

Our identifier uses a custom header, Service-Name. The custom_callback modifies the default message to also tell the client when to retry.

async def service_name_identifier(request: Request):
    service = request.headers.get("Service-Name")
    return service


async def custom_callback(request: Request, response: Response, pexpire: int):
    """
    Custom callback invoked on too many requests.
    :param request:
    :param pexpire: The remaining milliseconds until the limit resets
    :param response:
    :return:
    """
    expire = ceil(pexpire / 1000)

    raise HTTPException(
        status.HTTP_429_TOO_MANY_REQUESTS,
        f"Too Many Requests. Retry after {expire} seconds.",
        headers={"Retry-After": str(expire)},
    )

Let’s create three endpoints: one without any rate limits and two with different rate limits. Let’s assume that the /upload endpoint will be more heavily used and more costly for the server than the /search endpoint.

@app.get("/")
async def index():
    return {"msg": "This endpoint has no limits."}


@app.get("/search", dependencies=[Depends(RateLimiter(times=2, seconds=5))])
async def search_handler(request: Request):
    return {"msg": "This endpoint has a rate limit of 2 requests per 5 seconds."}


@app.get("/upload", dependencies=[Depends(RateLimiter(times=2, seconds=10))])
async def upload_handler(request: Request):
    return {"msg": "This endpoint has a rate limit of 2 requests per 10 seconds."}

Testing

Here’s a simple bash script that sends 5 requests:

#!/bin/bash

ENDPOINT=${1:-"/"}
SLEEP=${2:-"1"}

# Send five requests, one every $SLEEP seconds
for i in {1..5}; do
    curl -H "Service-Name: service-1" http://127.0.0.1:8000${ENDPOINT}
    echo
    sleep $SLEEP
done

If you run this script, you will see the expected behavior. For example, sending a request to /upload every 2 seconds means your first 2 requests go through but the rest run into rate limiting.

$ ./testing.sh /upload 2
{"msg":"This endpoint has a rate limit of 2 requests per 10 seconds."}
{"msg":"This endpoint has a rate limit of 2 requests per 10 seconds."}
{"detail":"Too Many Requests. Retry after 6 seconds."}
{"detail":"Too Many Requests. Retry after 4 seconds."}
{"detail":"Too Many Requests. Retry after 2 seconds."}

If you send requests with different Service-Name headers in parallel, you’ll find that rate limits are applied to each of them separately.
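To see the per-service buckets in action, you can probe the server as two "services" at once. A sketch using only the standard library, assuming the demo server from this post is running on 127.0.0.1:8000:

```python
import urllib.error
import urllib.request
from concurrent.futures import ThreadPoolExecutor

BASE = "http://127.0.0.1:8000"  # the demo server from this post


def probe(service_name: str, n: int = 3) -> list[int]:
    """Send n requests tagged with a Service-Name and collect the HTTP status codes."""
    codes = []
    for _ in range(n):
        req = urllib.request.Request(
            f"{BASE}/search", headers={"Service-Name": service_name}
        )
        try:
            with urllib.request.urlopen(req) as resp:
                codes.append(resp.status)
        except urllib.error.HTTPError as exc:
            codes.append(exc.code)
    return codes


if __name__ == "__main__":
    with ThreadPoolExecutor() as pool:
        # Each Service-Name has its own bucket, so each list should show
        # two 200s before the 429s start, independently of the other service
        print(list(pool.map(probe, ["service-1", "service-2"])))
```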

Full Python file

from math import ceil
import redis.asyncio as redis
import uvicorn
from contextlib import asynccontextmanager
from fastapi import Depends, FastAPI, HTTPException, Request, Response
from fastapi import status
from fastapi_limiter import FastAPILimiter
from fastapi_limiter.depends import RateLimiter

REDIS_URL = "redis://127.0.0.1:6379"


async def service_name_identifier(request: Request):
    service = request.headers.get("Service-Name")
    return service


async def custom_callback(request: Request, response: Response, pexpire: int):
    """
    Custom callback invoked on too many requests.
    :param request:
    :param pexpire: The remaining milliseconds until the limit resets
    :param response:
    :return:
    """
    expire = ceil(pexpire / 1000)

    raise HTTPException(
        status.HTTP_429_TOO_MANY_REQUESTS,
        f"Too Many Requests. Retry after {expire} seconds.",
        headers={"Retry-After": str(expire)},
    )


@asynccontextmanager
async def lifespan(_: FastAPI):
    redis_connection = redis.from_url(REDIS_URL, encoding="utf8")
    await FastAPILimiter.init(
        redis=redis_connection,
        identifier=service_name_identifier,
        http_callback=custom_callback,
    )
    yield
    await FastAPILimiter.close()


app = FastAPI(lifespan=lifespan)


@app.get("/")
async def index():
    return {"msg": "This endpoint has no limits."}


@app.get("/search", dependencies=[Depends(RateLimiter(times=2, seconds=5))])
async def search_handler(request: Request):
    return {"msg": "This endpoint has a rate limit of 2 requests per 5 seconds."}


@app.get("/upload", dependencies=[Depends(RateLimiter(times=2, seconds=10))])
async def upload_handler(request: Request):
    return {"msg": "This endpoint has a rate limit of 2 requests per 10 seconds."}


if __name__ == "__main__":
    uvicorn.run(app="server:app", reload=True)
