章工运维 章工运维
首页
  • linux
  • windows
  • 中间件
  • 监控
  • 网络
  • 存储
  • 安全
  • 防火墙
  • 数据库
  • 系统
  • docker
  • 运维工具
  • other
  • elk
  • K8S
  • ansible
  • Jenkins
  • GitLabCI_CD
  • 随笔
  • 面试
  • 工具
  • 收藏夹
  • Shell
  • python
  • golang
友链
  • 索引

    • 分类
    • 标签
    • 归档
    • 首页 (opens new window)
    • 关于我 (opens new window)
    • 图床 (opens new window)
    • 评论 (opens new window)
    • 导航栏 (opens new window)
周刊
GitHub (opens new window)

章工运维

业精于勤,荒于嬉
首页
  • linux
  • windows
  • 中间件
  • 监控
  • 网络
  • 存储
  • 安全
  • 防火墙
  • 数据库
  • 系统
  • docker
  • 运维工具
  • other
  • elk
  • K8S
  • ansible
  • Jenkins
  • GitLabCI_CD
  • 随笔
  • 面试
  • 工具
  • 收藏夹
  • Shell
  • python
  • golang
友链
  • 索引

    • 分类
    • 标签
    • 归档
    • 首页 (opens new window)
    • 关于我 (opens new window)
    • 图床 (opens new window)
    • 评论 (opens new window)
    • 导航栏 (opens new window)
周刊
GitHub (opens new window)
  • python

    • python基础

    • FastAPI

      • FastAPI-快速入门1
      • FastAPI-路由2
      • FastApi-参数提交3
      • FastApi-响应报文
      • FastApi-错误处理
      • FastApi-中间件
      • FastApi-跨域处理
      • 依赖注入之Depends
      • Redis与FastApi
        • 缓存接口数据,减轻数据库压力
        • 接口限流:控制用户访问频率
        • 使用 Redis 实现异步任务队列
        • 分布式锁防止任务重复执行
    • python每日练习脚本

    • python3给防火墙添加放行
    • python生成部署脚本
    • python将多个文件内容输出到一个文件中
    • 使用 Aligo 定时备份服务器文件
    • python监控日志文件并发送钉钉告警
    • python监控数据库脚本并发送钉钉告警
    • 使用python编写自动化发布脚本
    • 查询redis列表某个元素
    • centos7安装python3
    • python环境管理工具介绍
    • conda安装和镜像源配置
    • pip更换国内源
    • python爬虫
    • python环境启动服务报错缺少glibc库版本
    • 监控目录或文件变化
    • 批量更改文件
    • python引用数据库
    • python模块集合
  • shell

  • go

  • 编程
  • python
  • FastAPI
章工运维
2025-05-22
目录

Redis与FastApi

# 引言

Redis 以其超快的内存数据存储能力而闻名,是高性能 Web 应用程序的绝佳搭档。在本文中,我们将介绍如何将 Redis 与 FastAPI 集成,并通过实际示例和用例展示其强大功能。

# 为什么要在 FastAPI 中使用 Redis?

FastAPI 是一个现代的 Python 框架,旨在高效构建 API。将 FastAPI 与 Redis 结合使用可以带来诸多好处:

  • 缓存:通过缓存频繁访问的数据,减少数据库负载。
  • 会话管理:高效存储用户会话,用于身份验证。
  • 限流:防止 API 被滥用。
  • 发布/订阅(Pub/Sub):启用实时通信功能。 在本文中,我们将实现使用 Redis 的缓存功能,但在开始之前,我们先使用 Docker 设置 Redis。如果你尚未安装 Docker,请前往 Docker 官网 并为你的操作系统安装 Docker。

# 使用 Docker 设置 Redis

运行以下命令,拉取最新的 Redis 镜像(如果尚未下载),并在一个容器中启动 Redis:

docker run -d --name redis-stack -p 6379:6379 -p 8001:8001 redis/redis-stack:latest
1

要检查 Redis 容器是否正在运行,执行以下命令:

docker ps
1

# 将 Redis 与 FastAPI 集成

我们先设置一个简单的 FastAPI 应用程序,并集成 Redis。

## decorator.py
from functools import wraps
from hashlib import sha256
import json
import redis
redis_client = redis.StrictRedis(host='localhost', port=6379, decode_responses=True)
app_cache_key = "app_cache"

def cache_decorator(expire=3600):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # 根据函数名和参数生成唯一的缓存键
            key = sha256(json.dumps((func.__name__, args, kwargs), sort_keys=True).encode()).hexdigest()
            cached_data = redis_client.get(f"{key}_{app_cache_key}")
            if cached_data:
                return json.loads(cached_data)
            # 等待异步函数的结果
            result = await func(*args, **kwargs)
            # 序列化并缓存结果
            redis_client.set(f"{key}_{app_cache_key}", json.dumps(result), ex=expire)
            return result
        return wrapper
    return decorator
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
  • 这个装饰器会根据函数名和参数生成一个唯一的缓存键,检查 Redis 中是否有缓存数据,并返回缓存结果或计算并缓存结果,然后返回最新结果。
  • 每个缓存条目都可以设置过期时间,确保自动清理,也可以手动删除。
  • 你可能会好奇为什么我们在生成的哈希键后面添加了后缀。
  • 原因很简单:哈希键是根据函数名和参数生成的。在删除缓存时,重新构造确切的哈希键可能会很困难,因为虽然函数名已知,但重现确切的参数并不总是可行的。
## main.py
from fastapi import FastAPI, HTTPException
import asyncio
from decorator import cache_decorator, redis_client
app = FastAPI()

@app.post("/get_details")
@cache_decorator(expire=3600)
async def get_details(body: dict):
    await asyncio.sleep(2)
    return {"data": body}

@app.delete("/delete_keys_with_suffix/{suffix}")
async def delete_keys_with_suffix(suffix: str):
    # 使用 SCAN 查找以指定后缀结尾的键
    keys_to_delete = []
    cursor = 0
    while True:
        cursor, keys = redis_client.scan(cursor=cursor, match=f"*{suffix}")
        keys_to_delete.extend(keys)
        if cursor == 0:
            break
    if not keys_to_delete:
        raise HTTPException(status_code=404, detail=f"No keys ending with '{suffix}' found")
    # 删除这些键
    deleted_count = redis_client.delete(*keys_to_delete)
    return {"message": f"Deleted {deleted_count} keys ending with '{suffix}'"}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
  • /get_details 端点展示了如何通过缓存提升性能,模拟了一个耗时操作,并将结果存储在 Redis 中。
  • /delete_keys_with_suffix/{suffix} 端点提供了一个工具,用于删除所有以特定后缀结尾的 Redis 键。

# 应用场景

借助redis的持久化能力,我们可以应用在许多场景,如缓存、限流、任务队列、分布式锁等。下面看几个例子。

# 缓存接口数据,减轻数据库压力

@app.get("/user/{user_id}")
def get_user(user_id: int):
    cache_key = f"user:{user_id}"
    cached = redis_client.get(cache_key)
    if cached:
        return {"data": cached, "cached": True}

    # 模拟从数据库获取数据
    user_data = {"id": user_id, "name": f"User{user_id}"}
    
    redis_client.set(cache_key, str(user_data), ex=300)  # 缓存5分钟
    return {"data": user_data, "cached": False}
1
2
3
4
5
6
7
8
9
10
11
12

解析: • 先查缓存,命中即返回,避免查询数据库 • set(..., ex=300) 表示缓存 300 秒 • 用字符串模拟数据库返回,可配合 json.dumps()/json.loads() 存取复杂结构

# 接口限流:控制用户访问频率

@app.middleware("http")
async def rate_limiter(request: Request, call_next):
    ip = request.client.host
    key = f"rate:{ip}"
    
    count = redis_client.get(key)
    if count and int(count) >= 10:
        raise HTTPException(status_code=429, detail="请求过于频繁")
    
    redis_client.incr(key, amount=1)
    redis_client.expire(key, 10)

    return await call_next(request)
1
2
3
4
5
6
7
8
9
10
11
12
13

解析: • 每个 IP 一个独立的 Redis key:rate:127.0.0.1 • incr 自增访问次数;第一次访问 key 会自动创建 • 设置 expire,10 秒后自动重置访问次数 • 超过阈值直接返回 429 错误码(Too Many Requests) 可进一步扩展为滑动窗口或漏桶算法限流

# 使用 Redis 实现异步任务队列

@app.post("/register")
async def register_user(email: str):
    redis_client.lpush("mail_queue", email)
    return {"msg": "注册成功,稍后将发送通知邮件"}
1
2
3
4

解析: • lpush 将新邮件任务压入左侧(队头) • Redis 列表天然适合作为 FIFO 队列 • 异步执行邮件通知任务可以解耦业务流程,提高接口响应速度

配套的 Worker 示例:

def mail_worker():
    while True:
        email = redis_client.rpop("mail_queue")  # 从右侧弹出任务
        if email:
            print(f"给 {email} 发送注册成功邮件...")
        time.sleep(1)
1
2
3
4
5
6

解析: • rpop 从队列尾部取任务,实现先进先出 • 后台可用线程、子进程、或 Celery 实现并发 worker • 处理完毕即可销毁任务,天然具备幂等性和可追踪性

# 分布式锁防止任务重复执行

@app.get("/lock")
def do_task():
    lock_key = "lock:task"
    lock_id = str(uuid.uuid4())

    if redis_client.set(lock_key, lock_id, nx=True, ex=5):
        try:
            # 执行任务
            return {"status": "任务执行成功"}
        finally:
            if redis_client.get(lock_key) == lock_id:
                redis_client.delete(lock_key)
    else:
        return {"status": "任务进行中,请稍后再试"}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

解析: • set(..., nx=True):仅在不存在时设置,达到加锁效果 • ex=5:自动过期防止死锁 • 使用 uuid 避免误删其他任务的锁(分布式安全性保障) • 在 finally 中释放锁,确保任务结束就释放资源 ✅ 建议使用 Lua 脚本来实现「判断+删除」的原子性,避免并发误删。

# 结论

将 Redis 与 FastAPI 集成,为创建高性能和可扩展的应用程序开辟了可能性。无论是缓存、会话管理还是实时更新,Redis 都是一个强大的工具,能够无缝补充 FastAPI 的能力。


原文链接 (opens new window)

微信 支付宝
上次更新: 2025/05/22, 11:04:47

← 依赖注入之Depends 监控系统资源情况并发送邮件告警→

最近更新
01
个人开源项目推介平台汇总整理
06-27
02
英语学习技巧网站
06-27
03
5个开源流程图制作软件
06-27
更多文章>
Theme by Vdoing | Copyright © 2019-2025 | 点击查看十年之约 | 鄂ICP备2024072800号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式