JWT 令牌验证
实战 FastAPI + JWT:如何优雅实现单设备登录与账号有效期控制
在开发多平台接入的综合管理系统时,统一的账号认证是整个架构的安全基石。对于前后端分离的项目,JSON Web Token (JWT) 是目前最主流的鉴权方案。
然而,标准的 JWT 存在一个天然的痛点:无状态。一旦服务端签发了 Token,在它自然过期之前,服务端很难直接主动废弃它。这会导致在处理“多端互踢(单设备登录)”以及“临时账号到期管控”时遇到麻烦。
本文将基于 FastAPI 和 PostgreSQL,详细拆解一套在真实生产环境中运行的权限控制方案。这套方案通过引入 session_version(会话版本号)机制,巧妙地融合了无状态的轻量感与有状态的强管控。
一 核心设计思路
为了打破纯 JWT 的无状态限制,我们在数据库的用户表中引入一个核心字段:session_version(会话版本号)。
-
登录埋点:用户每次成功登录时,服务端生成一个当前的时间戳作为新的
session_version,更新到数据库中,并将这个版本号一并打包写入 JWT 的 Payload 中。 -
鉴权校验:用户携带 Token 请求接口时,路由守卫不仅会校验 Token 的合法性和有效期,还会从数据库中查出该用户最新的
session_version。 - 互踢裁决:如果 Token 携带的版本号与数据库中的最新版本号不一致,说明该账号在其他地方进行了登录(旧 Token 被“顶号”),服务端直接拒绝请求。
-
安全退出:用户主动退出时,只需将数据库中的
session_version重置为 0,该用户持有的所有旧 Token 即可瞬间失效。
二 基础安全与环境配置
首先,集中管理我们的全局配置和密钥。在系统根目录下,我们需要定义 JWT 的加密密钥、算法以及 Token 的默认存活周期。
# core/config.py
from pathlib import Path
# JWT 与安全配置
SECRET_KEY = "your-super-secret-key" # 生产环境务必使用复杂随机字符串并从环境变量读取
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 # Token 有效期:24小时
三 登录接口:签发门禁卡与版本号更迭
登录接口承担着多项校验任务。除了常规的密码比对(推荐使用 bcrypt),还需要处理账号的禁用状态以及临时账号的到期拦截。
# api/auth.py
from fastapi import APIRouter, HTTPException, Depends
from datetime import timedelta, datetime, timezone
import time
from core.database import get_dash_db
from core.security import verify_password, create_access_token
router = APIRouter()
@router.post("/login")
def login(login_data: LoginRequest):
conn = get_dash_db()
cursor = conn.cursor()
cursor.execute("SELECT * FROM users WHERE username = %s", (login_data.username,))
user = cursor.fetchone()
# 1. 验证用户和密码
if not user or not verify_password(login_data.password, user['password']):
conn.close()
raise HTTPException(status_code=401, detail="用户名或密码错误")
# 2. 检查账户启用状态
if user['status'] != 1:
conn.close()
raise HTTPException(status_code=403, detail="账户已被禁用,请联系管理员")
# 3. 检查账户有效期 (针对临时账号)
if user['expire_time']:
current_utc_now = datetime.now(timezone.utc)
expire_time = user['expire_time']
# 容错处理:确保时间对象具备时区信息,避免时区天真(naive)导致的比对异常
if expire_time.tzinfo is None:
expire_time = expire_time.replace(tzinfo=timezone.utc)
if expire_time < current_utc_now:
conn.close()
raise HTTPException(status_code=403, detail="账户已过期,请联系管理员")
# 4. 生成并写入新的会话版本号,实现“顶号”逻辑
session_version = int(time.time())
cursor.execute(
"UPDATE users SET session_version = %s WHERE id = %s",
(session_version, user['id'])
)
conn.commit()
conn.close()
# 5. 将 session_version 塞进 JWT 的 payload 中
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={
"sub": user['username'],
"id": user['id'],
"role": user.get('role', 'user'),
"session_version": session_version
},
expires_delta=access_token_expires
)
return {
"success": True,
"access_token": access_token,
"token_type": "bearer",
"user": {"id": user['id'], "username": user['username']}
}
值得注意的是,在时间处理上,后端必须保持高度的一致性。Python 原生的 datetime 容易出现 Naive Time(无时区时间)与 Aware Time(带时区时间)比较报错的问题,使用 replace(tzinfo=timezone.utc) 强制对齐时区是一个极其稳妥的工程实践。
四 路由守卫:拦截器中的核心裁决
有了带版本号的 Token,接下来需要在所有受保护的路由前加装一层守卫(FastAPI 的 Depends 机制)。
这里的关键在于:不仅要解密 Token,还要回源数据库比对版本号。
# core/security.py
import jwt
from fastapi import HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from core.config import SECRET_KEY, ALGORITHM
from core.database import get_dash_db
security = HTTPBearer()
def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
token = credentials.credentials
try:
# 解密 Token 提取 Payload
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id: int = payload.get("id")
token_version: int = payload.get("session_version")
if user_id is None:
raise HTTPException(status_code=401, detail="无效的认证凭证")
# 查库比对版本号
conn = get_dash_db()
try:
cursor = conn.cursor()
cursor.execute("SELECT session_version FROM users WHERE id = %s", (user_id,))
row = cursor.fetchone()
finally:
cursor.close()
conn.close()
# 核心裁决:如果数据库里的版本号跟 Token 里的对不上
if not row or row['session_version'] != token_version:
# 抛出特定的 401 详情,前端可捕获 'KICKED_OUT' 来弹窗提示“您的账号已在别处登录”
raise HTTPException(status_code=401, detail="KICKED_OUT")
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="登录已过期,请重新登录")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="无效的 Token")
前端在全局请求拦截器中,只需捕获 HTTP 401 且 detail === 'KICKED_OUT' 的响应,就可以平滑地清除本地缓存并弹出友好的互踢提示。
五 安全退出:物理级失效
在纯 JWT 架构中,退出登录通常只能依赖前端删除本地存储的 Token,服务端无能为力。但在我们的版本号机制下,服务端的退出接口变得极具掌控力。
# api/auth.py
@router.post("/logout")
def logout(current_user: dict = Depends(get_current_user)):
user_id = current_user.get("id")
if user_id:
conn = get_dash_db()
cursor = conn.cursor()
# 将会话版本号置为 0,让该用户流通在外的所有 Token 瞬间失去校验资格
cursor.execute("UPDATE users SET session_version = 0 WHERE id = %s", (user_id,))
conn.commit()
conn.close()
return {"success": True, "message": "已安全退出"}
六 总结与进阶思考
这套基于 session_version 的鉴权方案,完美解决了 JWT 无法主动失效的顽疾,同时也实现了对多平台登录状态的精确控制。
性能考量: 当前的实现中,路由守卫每次拦截请求都需要进行一次 PostgreSQL 数据库查询。在常规的并发量下,数据库连接池加上单字段主键查询的速度完全可以胜任。但如果系统流量进一步膨胀,遇到高频次的空间数据接口或频繁的瓦片请求,鉴权层的查库操作可能会成为瓶颈。
进阶优化方向:
可以将 user_id : session_version 的键值对缓存到 Redis 中。登录时更新 Redis,鉴权时直接读 Redis,从而将 I/O 成本降到最低,进一步释放系统在高并发环境下的性能潜力。
版权声明: 如无特别声明,本文版权归 Yucol 所有,转载请注明本文链接。
(采用 CC BY-NC-SA 4.0 许可协议进行授权)
本文标题:《 实战 FastAPI + JWT:如何优雅实现单设备登录与账号有效期控制 》
本文链接:https://yucol.top/tech/JWT.html
本文最后一次更新为 天前,文章中的某些内容可能已过时!