Redis与FastApi
# 引言
Redis 以其超快的内存数据存储能力而闻名,是高性能 Web 应用程序的绝佳搭档。在本文中,我们将介绍如何将 Redis 与 FastAPI 集成,并通过实际示例和用例展示其强大功能。
# 为什么要在 FastAPI 中使用 Redis?
FastAPI 是一个现代的 Python 框架,旨在高效构建 API。将 FastAPI 与 Redis 结合使用可以带来诸多好处:
- 缓存:通过缓存频繁访问的数据,减少数据库负载。
- 会话管理:高效存储用户会话,用于身份验证。
- 限流:防止 API 被滥用。
- 发布/订阅(Pub/Sub):启用实时通信功能。 在本文中,我们将实现使用 Redis 的缓存功能,但在开始之前,我们先使用 Docker 设置 Redis。如果你尚未安装 Docker,请前往 Docker 官网 并为你的操作系统安装 Docker。
# 使用 Docker 设置 Redis
运行以下命令,拉取最新的 Redis 镜像(如果尚未下载),并在一个容器中启动 Redis:
docker run -d --name redis-stack -p 6379:6379 -p 8001:8001 redis/redis-stack:latest
要检查 Redis 容器是否正在运行,执行以下命令:
docker ps
# 将 Redis 与 FastAPI 集成
我们先设置一个简单的 FastAPI 应用程序,并集成 Redis。
## decorator.py
from functools import wraps
from hashlib import sha256
import json
import redis
redis_client = redis.StrictRedis(host='localhost', port=6379, decode_responses=True)
app_cache_key = "app_cache"
def cache_decorator(expire=3600):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# 根据函数名和参数生成唯一的缓存键
key = sha256(json.dumps((func.__name__, args, kwargs), sort_keys=True).encode()).hexdigest()
cached_data = redis_client.get(f"{key}_{app_cache_key}")
if cached_data:
return json.loads(cached_data)
# 等待异步函数的结果
result = await func(*args, **kwargs)
# 序列化并缓存结果
redis_client.set(f"{key}_{app_cache_key}", json.dumps(result), ex=expire)
return result
return wrapper
return decorator
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
- 这个装饰器会根据函数名和参数生成一个唯一的缓存键,检查 Redis 中是否有缓存数据,并返回缓存结果或计算并缓存结果,然后返回最新结果。
- 每个缓存条目都可以设置过期时间,确保自动清理,也可以手动删除。
- 你可能会好奇为什么我们在生成的哈希键后面添加了后缀。
- 原因很简单:哈希键是根据函数名和参数生成的。在删除缓存时,重新构造确切的哈希键可能会很困难,因为虽然函数名已知,但重现确切的参数并不总是可行的。
## main.py
from fastapi import FastAPI, HTTPException
import asyncio
from decorator import cache_decorator, redis_client
app = FastAPI()
@app.post("/get_details")
@cache_decorator(expire=3600)
async def get_details(body: dict):
await asyncio.sleep(2)
return {"data": body}
@app.delete("/delete_keys_with_suffix/{suffix}")
async def delete_keys_with_suffix(suffix: str):
# 使用 SCAN 查找以指定后缀结尾的键
keys_to_delete = []
cursor = 0
while True:
cursor, keys = redis_client.scan(cursor=cursor, match=f"*{suffix}")
keys_to_delete.extend(keys)
if cursor == 0:
break
if not keys_to_delete:
raise HTTPException(status_code=404, detail=f"No keys ending with '{suffix}' found")
# 删除这些键
deleted_count = redis_client.delete(*keys_to_delete)
return {"message": f"Deleted {deleted_count} keys ending with '{suffix}'"}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
- /get_details 端点展示了如何通过缓存提升性能,模拟了一个耗时操作,并将结果存储在 Redis 中。
- /delete_keys_with_suffix/{suffix} 端点提供了一个工具,用于删除所有以特定后缀结尾的 Redis 键。
# 应用场景
借助redis的持久化能力,我们可以应用在许多场景,如缓存、限流、任务队列、分布式锁等。下面看几个例子。
# 缓存接口数据,减轻数据库压力
@app.get("/user/{user_id}")
def get_user(user_id: int):
cache_key = f"user:{user_id}"
cached = redis_client.get(cache_key)
if cached:
return {"data": cached, "cached": True}
# 模拟从数据库获取数据
user_data = {"id": user_id, "name": f"User{user_id}"}
redis_client.set(cache_key, str(user_data), ex=300) # 缓存5分钟
return {"data": user_data, "cached": False}
2
3
4
5
6
7
8
9
10
11
12
解析: • 先查缓存,命中即返回,避免查询数据库 • set(..., ex=300) 表示缓存 300 秒 • 用字符串模拟数据库返回,可配合 json.dumps()/json.loads() 存取复杂结构
# 接口限流:控制用户访问频率
@app.middleware("http")
async def rate_limiter(request: Request, call_next):
ip = request.client.host
key = f"rate:{ip}"
count = redis_client.get(key)
if count and int(count) >= 10:
raise HTTPException(status_code=429, detail="请求过于频繁")
redis_client.incr(key, amount=1)
redis_client.expire(key, 10)
return await call_next(request)
2
3
4
5
6
7
8
9
10
11
12
13
解析: • 每个 IP 一个独立的 Redis key:rate:127.0.0.1 • incr 自增访问次数;第一次访问 key 会自动创建 • 设置 expire,10 秒后自动重置访问次数 • 超过阈值直接返回 429 错误码(Too Many Requests) 可进一步扩展为滑动窗口或漏桶算法限流
# 使用 Redis 实现异步任务队列
@app.post("/register")
async def register_user(email: str):
redis_client.lpush("mail_queue", email)
return {"msg": "注册成功,稍后将发送通知邮件"}
2
3
4
解析: • lpush 将新邮件任务压入左侧(队头) • Redis 列表天然适合作为 FIFO 队列 • 异步执行邮件通知任务可以解耦业务流程,提高接口响应速度
配套的 Worker 示例:
def mail_worker():
while True:
email = redis_client.rpop("mail_queue") # 从右侧弹出任务
if email:
print(f"给 {email} 发送注册成功邮件...")
time.sleep(1)
2
3
4
5
6
解析: • rpop 从队列尾部取任务,实现先进先出 • 后台可用线程、子进程、或 Celery 实现并发 worker • 处理完毕即可销毁任务,天然具备幂等性和可追踪性
# 分布式锁防止任务重复执行
@app.get("/lock")
def do_task():
lock_key = "lock:task"
lock_id = str(uuid.uuid4())
if redis_client.set(lock_key, lock_id, nx=True, ex=5):
try:
# 执行任务
return {"status": "任务执行成功"}
finally:
if redis_client.get(lock_key) == lock_id:
redis_client.delete(lock_key)
else:
return {"status": "任务进行中,请稍后再试"}
2
3
4
5
6
7
8
9
10
11
12
13
14
解析: • set(..., nx=True):仅在不存在时设置,达到加锁效果 • ex=5:自动过期防止死锁 • 使用 uuid 避免误删其他任务的锁(分布式安全性保障) • 在 finally 中释放锁,确保任务结束就释放资源 ✅ 建议使用 Lua 脚本来实现「判断+删除」的原子性,避免并发误删。
# 结论
将 Redis 与 FastAPI 集成,为创建高性能和可扩展的应用程序开辟了可能性。无论是缓存、会话管理还是实时更新,Redis 都是一个强大的工具,能够无缝补充 FastAPI 的能力。

