JWT 令牌验证

实战 FastAPI + JWT:如何优雅实现单设备登录与账号有效期控制

在开发多平台接入的综合管理系统时,统一的账号认证是整个架构的安全基石。对于前后端分离的项目,JSON Web Token (JWT) 是目前最主流的鉴权方案。

然而,标准的 JWT 存在一个天然的痛点:无状态。一旦服务端签发了 Token,在它自然过期之前,服务端很难直接主动废弃它。这会导致在处理“多端互踢(单设备登录)”以及“临时账号到期管控”时遇到麻烦。

本文将基于 FastAPI 和 PostgreSQL,详细拆解一套在真实生产环境中运行的权限控制方案。这套方案通过引入 session_version(会话版本号)机制,巧妙地融合了无状态的轻量感与有状态的强管控。

一 核心设计思路

为了打破纯 JWT 的无状态限制,我们在数据库的用户表中引入一个核心字段:session_version(会话版本号)。

  1. 登录埋点:用户每次成功登录时,服务端生成一个当前的时间戳作为新的 session_version,更新到数据库中,并将这个版本号一并打包写入 JWT 的 Payload 中。
  2. 鉴权校验:用户携带 Token 请求接口时,路由守卫不仅会校验 Token 的合法性和有效期,还会从数据库中查出该用户最新的 session_version
  3. 互踢裁决:如果 Token 携带的版本号与数据库中的最新版本号不一致,说明该账号在其他地方进行了登录(旧 Token 被“顶号”),服务端直接拒绝请求。
  4. 安全退出:用户主动退出时,只需将数据库中的 session_version 重置为 0,该用户持有的所有旧 Token 即可瞬间失效。

二 基础安全与环境配置

首先,集中管理我们的全局配置和密钥。在系统根目录下,我们需要定义 JWT 的加密密钥、算法以及 Token 的默认存活周期。

# core/config.py
from pathlib import Path

# JWT 与安全配置
SECRET_KEY = "your-super-secret-key"  # 生产环境务必使用复杂随机字符串并从环境变量读取
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 # Token 有效期:24小时

三 登录接口:签发门禁卡与版本号更迭

登录接口承担着多项校验任务。除了常规的密码比对(推荐使用 bcrypt),还需要处理账号的禁用状态以及临时账号的到期拦截。

# api/auth.py
from fastapi import APIRouter, HTTPException, Depends
from datetime import timedelta, datetime, timezone
import time
from core.database import get_dash_db
from core.security import verify_password, create_access_token

router = APIRouter()

@router.post("/login")
def login(login_data: LoginRequest):
    conn = get_dash_db()
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users WHERE username = %s", (login_data.username,))
    user = cursor.fetchone()
    
    # 1. 验证用户和密码
    if not user or not verify_password(login_data.password, user['password']):
        conn.close()
        raise HTTPException(status_code=401, detail="用户名或密码错误")
        
    # 2. 检查账户启用状态
    if user['status'] != 1:
        conn.close()
        raise HTTPException(status_code=403, detail="账户已被禁用,请联系管理员")

    # 3. 检查账户有效期 (针对临时账号)
    if user['expire_time']:
        current_utc_now = datetime.now(timezone.utc)
        expire_time = user['expire_time']
        # 容错处理:确保时间对象具备时区信息,避免时区天真(naive)导致的比对异常
        if expire_time.tzinfo is None:
            expire_time = expire_time.replace(tzinfo=timezone.utc)
   
        if expire_time < current_utc_now:
            conn.close()
            raise HTTPException(status_code=403, detail="账户已过期,请联系管理员")

    # 4. 生成并写入新的会话版本号,实现“顶号”逻辑
    session_version = int(time.time())
    cursor.execute(
        "UPDATE users SET session_version = %s WHERE id = %s", 
        (session_version, user['id'])
    )
    conn.commit()
    conn.close()

    # 5. 将 session_version 塞进 JWT 的 payload 中
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={
            "sub": user['username'], 
            "id": user['id'], 
            "role": user.get('role', 'user'),
            "session_version": session_version  
        },
        expires_delta=access_token_expires
    )

    return {
        "success": True,
        "access_token": access_token,
        "token_type": "bearer",
        "user": {"id": user['id'], "username": user['username']}
    }

值得注意的是,在时间处理上,后端必须保持高度的一致性。Python 原生的 datetime 容易出现 Naive Time(无时区时间)与 Aware Time(带时区时间)比较报错的问题,使用 replace(tzinfo=timezone.utc) 强制对齐时区是一个极其稳妥的工程实践。

四 路由守卫:拦截器中的核心裁决

有了带版本号的 Token,接下来需要在所有受保护的路由前加装一层守卫(FastAPI 的 Depends 机制)。

这里的关键在于:不仅要解密 Token,还要回源数据库比对版本号。

# core/security.py
import jwt
from fastapi import HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from core.config import SECRET_KEY, ALGORITHM
from core.database import get_dash_db

security = HTTPBearer()

def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    token = credentials.credentials
    try:
        # 解密 Token 提取 Payload
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user_id: int = payload.get("id")
        token_version: int = payload.get("session_version") 
        
        if user_id is None:
            raise HTTPException(status_code=401, detail="无效的认证凭证")

        # 查库比对版本号
        conn = get_dash_db()
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT session_version FROM users WHERE id = %s", (user_id,))
            row = cursor.fetchone()
        finally:
            cursor.close()
            conn.close()
        
        # 核心裁决:如果数据库里的版本号跟 Token 里的对不上
        if not row or row['session_version'] != token_version:
            # 抛出特定的 401 详情,前端可捕获 'KICKED_OUT' 来弹窗提示“您的账号已在别处登录”
            raise HTTPException(status_code=401, detail="KICKED_OUT")

        return payload
        
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="登录已过期,请重新登录")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="无效的 Token")

前端在全局请求拦截器中,只需捕获 HTTP 401 且 detail === 'KICKED_OUT' 的响应,就可以平滑地清除本地缓存并弹出友好的互踢提示。

五 安全退出:物理级失效

在纯 JWT 架构中,退出登录通常只能依赖前端删除本地存储的 Token,服务端无能为力。但在我们的版本号机制下,服务端的退出接口变得极具掌控力。

# api/auth.py
@router.post("/logout")
def logout(current_user: dict = Depends(get_current_user)):
    user_id = current_user.get("id")
    if user_id:
        conn = get_dash_db()
        cursor = conn.cursor()
        # 将会话版本号置为 0,让该用户流通在外的所有 Token 瞬间失去校验资格
        cursor.execute("UPDATE users SET session_version = 0 WHERE id = %s", (user_id,))
        conn.commit()
        conn.close()
    return {"success": True, "message": "已安全退出"}

六 总结与进阶思考

这套基于 session_version 的鉴权方案,完美解决了 JWT 无法主动失效的顽疾,同时也实现了对多平台登录状态的精确控制。

性能考量: 当前的实现中,路由守卫每次拦截请求都需要进行一次 PostgreSQL 数据库查询。在常规的并发量下,数据库连接池加上单字段主键查询的速度完全可以胜任。但如果系统流量进一步膨胀,遇到高频次的空间数据接口或频繁的瓦片请求,鉴权层的查库操作可能会成为瓶颈。

进阶优化方向: 可以将 user_id : session_version 的键值对缓存到 Redis 中。登录时更新 Redis,鉴权时直接读 Redis,从而将 I/O 成本降到最低,进一步释放系统在高并发环境下的性能潜力。

版权声明: 如无特别声明,本文版权归 Yucol 所有,转载请注明本文链接。

(采用 CC BY-NC-SA 4.0 许可协议进行授权)

本文标题:《 实战 FastAPI + JWT:如何优雅实现单设备登录与账号有效期控制 》

本文链接:https://yucol.top/tech/JWT.html

本文最后一次更新为 天前,文章中的某些内容可能已过时!