跳到主要内容

第47章:代码安全与API防护

🌟 章节导入:走进网络安全防护中心

亲爱的朋友们,欢迎来到我们的网络安全防护中心!这是一个充满安全意识和防护技术的专业安全中心,在这里,我们将见证如何通过安全开发、API防护和数据加密技术,构建坚不可摧的应用安全体系,就像从普通建筑升级到银行金库级别的安全防护一样。

🏛️ 网络安全防护中心全景

想象一下,你正站在一个现代化的网络安全防护中心门口,眼前是四座风格迥异但又紧密相连的建筑群:

🛡️ 安全开发基础大厦

这是我们的第一站,一座充满安全意识的安全开发大厦。在这里:

  • OWASP安全标准室里,安全专家们正在研究最新的安全威胁和防护标准
  • 安全编码规范部的专家们专注于制定安全编码的最佳实践
  • 代码审计工具中心如同专业的安检系统,自动检测代码中的安全隐患

🔐 API安全防护堡垒

这座建筑闪烁着红色的光芒,象征着API安全防护的坚固堡垒

  • 认证授权中心里,JWT、OAuth等认证机制正在保护API访问
  • 输入验证工段负责过滤和验证所有API输入数据
  • 速率限制系统如同一道智能防护墙,防止API滥用和攻击

🔒 数据加密技术中心

这是一座充满密码学魅力的数据加密技术中心

  • 对称加密实验室里,AES等对称加密算法正在保护数据
  • 非对称加密中心提供RSA等非对称加密解决方案
  • 数字签名验证室确保数据的完整性和真实性

🌐 安全API网关平台

最令人兴奋的是这座未来感十足的安全API网关平台

  • 多重认证中心如同企业总部的多重身份验证系统
  • 威胁检测系统实时检测和防御各种安全威胁
  • 安全审计日志中心记录和分析所有安全事件

🚀 安全革命的见证者

在这个网络安全防护中心,我们将见证应用安全的三大革命:

🛡️ 安全开发革命

从普通开发到安全开发,我们将掌握:

  • OWASP Top 10安全威胁防护
  • 安全编码规范和最佳实践
  • 自动化代码审计工具

🔐 API安全革命

从基础API到安全API,我们将实现:

  • 完善的认证授权机制
  • 全面的输入验证和过滤
  • 智能的速率限制和防护

🔒 数据加密革命

从明文传输到加密保护,我们将建立:

  • 对称和非对称加密体系
  • 数字签名和验证机制
  • 端到端的数据保护

🎯 学以致用的企业级项目

在本章的最后,我们将综合运用所学的所有技术,构建一个完整的安全API网关系统。这不仅仅是一个学习项目,更是一个具备实际商业部署价值的企业级应用:

  • 企业应用可以集成这个系统,实现统一的API安全管理和防护
  • 微服务架构可以基于这个系统,实现安全的服务间通信
  • 云原生应用可以利用这个系统,实现企业级的安全防护
  • 技术服务商可以基于这个系统为客户提供定制化的安全API网关解决方案

🔥 准备好了吗?

现在,让我们戴上安全帽,穿上防护服,一起走进这个充满安全魅力的网络安全防护中心。在这里,我们不仅要学习最前沿的安全技术,更要将这些技术转化为真正有价值的安全应用!

准备好迎接这场安全革命了吗?让我们开始这激动人心的学习之旅!


🎯 学习目标(SMART目标)

完成本章学习后,学生将能够:

📚 知识目标

  • 安全开发体系:深入理解OWASP Top 10安全威胁、安全编码规范、代码审计工具等安全开发核心概念
  • API安全防护技术:掌握认证授权机制、输入验证过滤、速率限制实现等API安全防护关键技术
  • 数据加密技术:理解对称加密算法、非对称加密应用、数字签名验证等数据加密核心技术
  • 安全API网关理念:综合运用多重认证、威胁检测、安全审计等企业级安全防护技术

🛠️ 技能目标

  • 安全开发能力:能够遵循安全编码规范,使用代码审计工具发现和修复安全隐患
  • API安全防护能力:具备设计和实现安全API的实战能力,包括认证、授权、输入验证等
  • 数据加密应用能力:掌握使用加密算法保护敏感数据的实践能力
  • 安全系统开发能力:能够构建企业级安全API网关,具备大规模安全系统开发的工程实践能力

💡 素养目标

  • 安全意识:培养安全第一的开发思维模式
  • 安全设计理念:建立安全设计在系统架构中的重要性意识
  • 防护意识:注重多层防护和纵深防御的实践
  • 合规意识:理解安全合规在企业发展中的重要性

📝 知识导图


47.1 安全开发基础

想象一下,您正在建造一座现代化的银行大楼。在开始施工之前,您需要了解可能面临的各种安全威胁,制定严格的安全施工规范,并使用专业的检测工具确保每个环节都符合安全标准。

在软件开发的世界里,安全开发基础就是我们的"安全施工规范"。它帮助我们识别常见的安全威胁,遵循安全编码规范,并使用自动化工具检测代码中的安全隐患。

🔍 OWASP Top 10安全威胁

OWASP(Open Web Application Security Project)是一个专注于Web应用安全的非营利组织。OWASP Top 10列出了当前最严重的10种Web应用安全风险:

# 示例1:OWASP Top 10安全威胁演示
"""
OWASP Top 10安全威胁识别和防护
包含:
- 注入攻击(Injection)
- 失效的身份认证(Broken Authentication)
- 敏感数据暴露(Sensitive Data Exposure)
- XML外部实体(XXE)
- 失效的访问控制(Broken Access Control)
- 安全配置错误(Security Misconfiguration)
- 跨站脚本(XSS)
- 不安全的反序列化(Insecure Deserialization)
- 使用含有已知漏洞的组件(Using Components with Known Vulnerabilities)
- 不足的日志记录和监控(Insufficient Logging & Monitoring)
"""
import re
import html
import json
import hashlib
from typing import Dict, List, Optional, Any
from datetime import datetime
from dataclasses import dataclass
@dataclass
class SecurityThreat:
"""安全威胁信息"""
threat_id: str
name: str
category: str
severity: str # critical, high, medium, low
description: str
examples: List[str]
mitigation: List[str]
class OWASPTop10Analyzer:
"""OWASP Top 10安全威胁分析器"""
def __init__(self):
"""初始化OWASP Top 10威胁库"""
self.threats = self._load_owasp_top10()
print("🛡️ OWASP Top 10安全威胁分析器启动成功!")
def _load_owasp_top10(self) -> Dict[str, SecurityThreat]:
"""加载OWASP Top 10威胁定义"""
threats = {
"A01": SecurityThreat(
threat_id="A01",
name="注入攻击(Injection)",
category="注入",
severity="critical",
description="将不受信任的数据作为命令或查询的一部分发送到解释器",
examples=[
"SQL注入:' OR '1'='1",
"命令注入:; rm -rf /",
"LDAP注入:)(&(uid=*))(|(uid=*))"
],
mitigation=[
"使用参数化查询或预编译语句",
"输入验证和输出编码",
"最小权限原则",
"使用ORM框架"
]
),
"A02": SecurityThreat(
threat_id="A02",
name="失效的身份认证(Broken Authentication)",
category="认证",
severity="high",
description="应用程序的身份认证机制存在缺陷",
examples=[
"弱密码策略",
"会话固定攻击",
"密码明文存储",
"会话超时设置不当"
],
mitigation=[
"实施强密码策略",
"使用安全的会话管理",
"实施多因素认证",
"密码哈希存储"
]
),
"A03": SecurityThreat(
threat_id="A03",
name="敏感数据暴露(Sensitive Data Exposure)",
category="数据保护",
severity="high",
description="应用程序未充分保护敏感数据",
examples=[
"信用卡号明文传输",
"密码未加密存储",
"API密钥硬编码",
"敏感信息记录在日志中"
],
mitigation=[
"使用HTTPS传输",
"数据加密存储",
"密钥安全管理",
"敏感数据脱敏"
]
),
"A04": SecurityThreat(
threat_id="A04",
name="XML外部实体(XXE)",
category="注入",
severity="high",
description="XML解析器处理外部实体引用时存在漏洞",
examples=[
"XXE文件读取",
"XXE SSRF攻击",
"XXE拒绝服务攻击"
],
mitigation=[
"禁用外部实体处理",
"使用JSON替代XML",
"输入验证和过滤",
"使用安全的XML解析器"
]
),
"A05": SecurityThreat(
threat_id="A05",
name="失效的访问控制(Broken Access Control)",
category="访问控制",
severity="high",
description="应用程序的访问控制机制存在缺陷",
examples=[
"水平权限提升",
"垂直权限提升",
"直接对象引用",
"目录遍历攻击"
],
mitigation=[
"实施最小权限原则",
"服务器端访问控制",
"定期权限审计",
"使用访问控制框架"
]
),
"A06": SecurityThreat(
threat_id="A06",
name="安全配置错误(Security Misconfiguration)",
category="配置",
severity="medium",
description="应用程序、框架、服务器等配置不当",
examples=[
"默认账户未修改",
"错误页面泄露信息",
"不必要的服务开启",
"安全头未配置"
],
mitigation=[
"安全配置检查清单",
"自动化配置管理",
"定期安全审计",
"最小化攻击面"
]
),
"A07": SecurityThreat(
threat_id="A07",
name="跨站脚本(XSS)",
category="注入",
severity="high",
description="应用程序未对用户输入进行适当验证和编码",
examples=[
"反射型XSS:<script>alert('XSS')</script>",
"存储型XSS:恶意脚本存储在数据库中",
"DOM型XSS:客户端脚本操作DOM"
],
mitigation=[
"输出编码",
"内容安全策略(CSP)",
"输入验证和过滤",
"使用安全框架"
]
),
"A08": SecurityThreat(
threat_id="A08",
name="不安全的反序列化(Insecure Deserialization)",
category="注入",
severity="high",
description="应用程序反序列化不可信数据时存在漏洞",
examples=[
"对象注入攻击",
"远程代码执行",
"拒绝服务攻击"
],
mitigation=[
"避免反序列化不可信数据",
"使用安全的序列化格式",
"输入验证",
"最小权限反序列化"
]
),
"A09": SecurityThreat(
threat_id="A09",
name="使用含有已知漏洞的组件",
category="依赖",
severity="medium",
description="应用程序使用了含有已知漏洞的组件",
examples=[
"过时的框架版本",
"存在CVE漏洞的库",
"未更新的依赖包"
],
mitigation=[
"定期更新依赖",
"漏洞扫描工具",
"依赖管理策略",
"安全公告关注"
]
),
"A10": SecurityThreat(
threat_id="A10",
name="不足的日志记录和监控",
category="监控",
severity="low",
description="应用程序缺乏足够的日志记录和监控",
examples=[
"未记录安全事件",
"日志信息不足",
"缺乏实时监控",
"告警机制缺失"
],
mitigation=[
"全面的日志记录",
"实时监控系统",
"安全事件告警",
"日志分析工具"
]
)
}
return threats
def analyze_code(self, code: str) -> List[Dict[str, Any]]:
"""分析代码中的安全威胁"""
findings = []
# 检测SQL注入风险
sql_patterns = [
r"execute\s*\(\s*['\"].*%s.*['\"]",
r"query\s*\(\s*['\"].*\+.*['\"]",
r"format\s*\(\s*['\"].*%s.*['\"]"
]
for pattern in sql_patterns:
if re.search(pattern, code, re.IGNORECASE):
findings.append({
"threat_id": "A01",
"threat_name": "注入攻击(Injection)",
"severity": "critical",
"location": "代码中可能存在SQL注入风险",
"recommendation": "使用参数化查询或ORM框架"
})
# 检测XSS风险
xss_patterns = [
r"innerHTML\s*=\s*.*\+",
r"document\.write\s*\(",
r"eval\s*\("
]
for pattern in xss_patterns:
if re.search(pattern, code, re.IGNORECASE):
findings.append({
"threat_id": "A07",
"threat_name": "跨站脚本(XSS)",
"severity": "high",
"location": "代码中可能存在XSS风险",
"recommendation": "使用输出编码和内容安全策略"
})
# 检测敏感数据暴露
sensitive_patterns = [
r"password\s*=\s*['\"][^'\"]+['\"]",
r"api_key\s*=\s*['\"][^'\"]+['\"]",
r"secret\s*=\s*['\"][^'\"]+['\"]"
]
for pattern in sensitive_patterns:
if re.search(pattern, code, re.IGNORECASE):
findings.append({
"threat_id": "A03",
"threat_name": "敏感数据暴露(Sensitive Data Exposure)",
"severity": "high",
"location": "代码中可能存在敏感数据硬编码",
"recommendation": "使用环境变量或密钥管理服务"
})
return findings
def get_threat_info(self, threat_id: str) -> Optional[SecurityThreat]:
"""获取威胁详细信息"""
return self.threats.get(threat_id)
def get_all_threats(self) -> List[SecurityThreat]:
"""获取所有威胁列表"""
return list(self.threats.values())
def print_threat_summary(self):
"""打印威胁摘要"""
print("\n" + "="*60)
print("🛡️ OWASP Top 10安全威胁摘要")
print("="*60)
for threat_id, threat in sorted(self.threats.items()):
severity_icon = {
"critical": "🔴",
"high": "🟠",
"medium": "🟡",
"low": "🟢"
}.get(threat.severity, "⚪")
print(f"\n{severity_icon} {threat.name} ({threat_id})")
print(f" 类别: {threat.category}")
print(f" 严重性: {threat.severity}")
print(f" 描述: {threat.description}")
print(f" 防护措施:")
for mitigation in threat.mitigation:
print(f" - {mitigation}")
# 运行演示
if __name__ == "__main__":
analyzer = OWASPTop10Analyzer()
analyzer.print_threat_summary()
# 代码安全分析示例
print("\n" + "="*60)
print("📝 代码安全分析示例")
print("="*60)
vulnerable_code = """
def get_user(username):
query = "SELECT * FROM users WHERE username = '" + username + "'"
result = execute(query) # SQL注入风险
user_data = {"name": username}
document.innerHTML = user_data["name"] # XSS风险
api_key = "sk-1234567890abcdef" # 敏感数据暴露
return result
"""
findings = analyzer.analyze_code(vulnerable_code)
print(f"\n发现 {len(findings)} 个安全问题:")
for finding in findings:
print(f"\n🔍 {finding['threat_name']} ({finding['severity']})")
print(f" 位置: {finding['location']}")
print(f" 建议: {finding['recommendation']}")

📋 安全编码规范

安全编码规范是安全开发的基础,它定义了编写安全代码的标准和最佳实践:

# 示例2:安全编码规范检查器
"""
安全编码规范检查器
包含:
- 输入验证规范
- 输出编码规范
- 错误处理规范
- 密码安全规范
"""
import re
import hashlib
import secrets
from typing import Dict, List, Optional, Any
from enum import Enum
class SecurityLevel(Enum):
"""安全级别"""
CRITICAL = "critical"
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
class SecurityCodeChecker:
"""安全编码规范检查器"""
def __init__(self):
"""初始化检查规则"""
self.rules = self._load_security_rules()
print("✅ 安全编码规范检查器启动成功!")
def _load_security_rules(self) -> Dict[str, Dict]:
"""加载安全编码规则"""
return {
"input_validation": {
"name": "输入验证规范",
"rules": [
{
"id": "IV001",
"description": "所有用户输入必须验证",
"pattern": r"def\s+\w+\s*\([^)]*\):",
"check": self._check_input_validation,
"severity": SecurityLevel.CRITICAL
},
{
"id": "IV002",
"description": "使用白名单验证而非黑名单",
"pattern": r"if\s+.*not\s+in\s+\[",
"check": self._check_whitelist_validation,
"severity": SecurityLevel.HIGH
}
]
},
"output_encoding": {
"name": "输出编码规范",
"rules": [
{
"id": "OE001",
"description": "输出到HTML必须编码",
"pattern": r"innerHTML\s*=|document\.write",
"check": self._check_html_encoding,
"severity": SecurityLevel.CRITICAL
}
]
},
"error_handling": {
"name": "错误处理规范",
"rules": [
{
"id": "EH001",
"description": "错误信息不应泄露敏感信息",
"pattern": r"except\s+.*:\s*print",
"check": self._check_error_handling,
"severity": SecurityLevel.HIGH
}
]
},
"password_security": {
"name": "密码安全规范",
"rules": [
{
"id": "PS001",
"description": "密码必须哈希存储",
"pattern": r"password\s*=\s*['\"][^'\"]+['\"]",
"check": self._check_password_storage,
"severity": SecurityLevel.CRITICAL
},
{
"id": "PS002",
"description": "使用强密码策略",
"pattern": r"password\s*=\s*input",
"check": self._check_password_strength,
"severity": SecurityLevel.HIGH
}
]
}
}
def _check_input_validation(self, code: str, match) -> Optional[Dict]:
"""检查输入验证"""
# 检查函数参数是否被验证
if "validate" not in code.lower() and "sanitize" not in code.lower():
return {
"rule_id": "IV001",
"message": "函数参数缺少输入验证",
"severity": SecurityLevel.CRITICAL
}
return None
def _check_whitelist_validation(self, code: str, match) -> Optional[Dict]:
"""检查白名单验证"""
return {
"rule_id": "IV002",
"message": "建议使用白名单验证而非黑名单",
"severity": SecurityLevel.HIGH
}
def _check_html_encoding(self, code: str, match) -> Optional[Dict]:
"""检查HTML编码"""
if "escape" not in code.lower() and "encode" not in code.lower():
return {
"rule_id": "OE001",
"message": "HTML输出缺少编码处理",
"severity": SecurityLevel.CRITICAL
}
return None
def _check_error_handling(self, code: str, match) -> Optional[Dict]:
"""检查错误处理"""
if "password" in code.lower() or "secret" in code.lower():
return {
"rule_id": "EH001",
"message": "错误信息可能泄露敏感信息",
"severity": SecurityLevel.HIGH
}
return None
def _check_password_storage(self, code: str, match) -> Optional[Dict]:
"""检查密码存储"""
return {
"rule_id": "PS001",
"message": "密码不应明文存储,应使用哈希算法",
"severity": SecurityLevel.CRITICAL
}
def _check_password_strength(self, code: str, match) -> Optional[Dict]:
"""检查密码强度"""
return {
"rule_id": "PS002",
"message": "应实施强密码策略验证",
"severity": SecurityLevel.HIGH
}
def check_code(self, code: str) -> List[Dict[str, Any]]:
"""检查代码是否符合安全编码规范"""
findings = []
for category, category_info in self.rules.items():
for rule in category_info["rules"]:
pattern = rule["pattern"]
matches = re.finditer(pattern, code, re.MULTILINE | re.IGNORECASE)
for match in matches:
result = rule["check"](code, match)
if result:
findings.append({
"category": category_info["name"],
**result
})
return findings
def get_security_guidelines(self) -> Dict[str, List[str]]:
"""获取安全编码指南"""
return {
"输入验证": [
"验证所有用户输入的数据类型、长度、格式",
"使用白名单验证而非黑名单",
"在服务器端进行验证,不要仅依赖客户端验证",
"对特殊字符进行转义或过滤"
],
"输出编码": [
"根据输出上下文选择合适的编码方式",
"HTML输出使用HTML实体编码",
"JavaScript输出使用JavaScript编码",
"URL输出使用URL编码"
],
"错误处理": [
"不要向用户泄露系统内部信息",
"记录详细的错误日志供开发人员使用",
"使用通用的错误消息",
"不要将错误信息用于身份验证"
],
"密码安全": [
"使用强密码策略(长度、复杂度)",
"密码必须哈希存储,使用bcrypt或Argon2",
"实施密码过期和重置机制",
"使用多因素认证增强安全性"
],
"会话管理": [
"使用安全的会话标识符",
"实施会话超时机制",
"在登出时销毁会话",
"使用HTTPS传输会话cookie"
],
"加密传输": [
"使用HTTPS传输敏感数据",
"实施证书固定(Certificate Pinning)",
"使用TLS 1.2或更高版本",
"配置安全的安全头"
]
}
# 运行演示
if __name__ == "__main__":
checker = SecurityCodeChecker()
# 获取安全编码指南
print("\n" + "="*60)
print("📋 安全编码指南")
print("="*60)
guidelines = checker.get_security_guidelines()
for category, rules in guidelines.items():
print(f"\n{category}:")
for rule in rules:
print(f" • {rule}")
# 代码安全检查示例
print("\n" + "="*60)
print("🔍 代码安全检查示例")
print("="*60)
sample_code = """
def login(username, password):
# 缺少输入验证
query = "SELECT * FROM users WHERE username = '" + username + "'"
result = execute(query)
# 密码明文存储
user.password = password
# HTML输出未编码
document.innerHTML = username
# 错误信息泄露
try:
authenticate(user)
except Exception as e:
print(f"Error: {e}") # 可能泄露敏感信息
"""
findings = checker.check_code(sample_code)
print(f"\n发现 {len(findings)} 个安全问题:")
for finding in findings:
severity_icon = {
SecurityLevel.CRITICAL: "🔴",
SecurityLevel.HIGH: "🟠",
SecurityLevel.MEDIUM: "🟡",
SecurityLevel.LOW: "🟢"
}.get(finding["severity"], "⚪")
print(f"\n{severity_icon} [{finding['category']}] {finding['message']}")
print(f" 规则ID: {finding['rule_id']}")

🔧 代码审计工具

代码审计工具可以自动检测代码中的安全隐患,提高安全开发效率:

# 示例3:代码审计工具集成
"""
代码审计工具集成
包含:
- 静态代码分析
- 依赖漏洞扫描
- 安全配置检查
"""
import subprocess
import json
import re
from typing import Dict, List, Optional, Any
from pathlib import Path
class CodeAuditTool:
"""代码审计工具"""
def __init__(self):
"""初始化审计工具"""
self.tools = {
"bandit": self._check_bandit,
"safety": self._check_safety,
"semgrep": self._check_semgrep
}
print("🔧 代码审计工具启动成功!")
def _check_bandit(self, code_path: str) -> List[Dict[str, Any]]:
"""使用Bandit进行Python代码安全扫描"""
# Bandit是Python代码安全扫描工具
# 实际使用需要安装: pip install bandit
findings = []
# 模拟Bandit扫描结果
common_issues = [
{
"test_id": "B101",
"severity": "LOW",
"issue": "assert_used",
"message": "Use of assert detected. The enclosed code will be removed when compiling to optimised byte code."
},
{
"test_id": "B602",
"severity": "HIGH",
"issue": "shell_injection_subprocess",
"message": "subprocess call with shell=True identified, security issue."
}
]
# 检查代码中的常见问题
with open(code_path, 'r', encoding='utf-8') as f:
code = f.read()
if "subprocess" in code and "shell=True" in code:
findings.append(common_issues[1])
if "assert " in code:
findings.append(common_issues[0])
return findings
def _check_safety(self, requirements_path: str) -> List[Dict[str, Any]]:
"""使用Safety检查Python依赖漏洞"""
# Safety是Python依赖漏洞扫描工具
# 实际使用需要安装: pip install safety
findings = []
# 模拟Safety扫描结果
if Path(requirements_path).exists():
with open(requirements_path, 'r', encoding='utf-8') as f:
requirements = f.read()
# 检查已知漏洞的包
vulnerable_packages = {
"django": ["<2.2.0", "CVE-2020-9402"],
"requests": ["<2.25.0", "CVE-2020-26137"],
"urllib3": ["<1.26.0", "CVE-2020-26137"]
}
for package, (version, cve) in vulnerable_packages.items():
if package in requirements.lower():
findings.append({
"package": package,
"installed_version": "unknown",
"vulnerability": cve,
"severity": "HIGH",
"message": f"{package} has known security vulnerabilities"
})
return findings
def _check_semgrep(self, code_path: str) -> List[Dict[str, Any]]:
"""使用Semgrep进行代码模式匹配扫描"""
# Semgrep是代码模式匹配工具
# 实际使用需要安装: pip install semgrep
findings = []
# 定义安全规则模式
security_patterns = [
{
"id": "SEC001",
"pattern": r"eval\s*\(",
"message": "Use of eval() detected, potential code injection risk",
"severity": "HIGH"
},
{
"id": "SEC002",
"pattern": r"pickle\.loads\s*\(",
"message": "Use of pickle.loads() detected, potential deserialization risk",
"severity": "HIGH"
},
{
"id": "SEC003",
"pattern": r"md5\s*\(",
"message": "Use of MD5 detected, use SHA-256 or bcrypt for security",
"severity": "MEDIUM"
}
]
if Path(code_path).exists():
with open(code_path, 'r', encoding='utf-8') as f:
code = f.read()
for pattern_info in security_patterns:
if re.search(pattern_info["pattern"], code, re.IGNORECASE):
findings.append({
"rule_id": pattern_info["id"],
"message": pattern_info["message"],
"severity": pattern_info["severity"]
})
return findings
def audit_project(self, project_path: str) -> Dict[str, Any]:
"""审计整个项目"""
results = {
"bandit_findings": [],
"safety_findings": [],
"semgrep_findings": [],
"summary": {}
}
project_path_obj = Path(project_path)
# Bandit扫描
python_files = list(project_path_obj.rglob("*.py"))
for py_file in python_files[:5]: # 限制扫描文件数量
findings = self._check_bandit(str(py_file))
results["bandit_findings"].extend(findings)
# Safety扫描
requirements_file = project_path_obj / "requirements.txt"
if requirements_file.exists():
results["safety_findings"] = self._check_safety(str(requirements_file))
# Semgrep扫描
for py_file in python_files[:5]:
findings = self._check_semgrep(str(py_file))
results["semgrep_findings"].extend(findings)
# 生成摘要
results["summary"] = {
"total_findings": len(results["bandit_findings"]) +
len(results["safety_findings"]) +
len(results["semgrep_findings"]),
"high_severity": sum(1 for f in results["bandit_findings"] +
results["semgrep_findings"]
if f.get("severity") == "HIGH"),
"medium_severity": sum(1 for f in results["bandit_findings"] +
results["semgrep_findings"]
if f.get("severity") == "MEDIUM"),
"low_severity": sum(1 for f in results["bandit_findings"] +
results["semgrep_findings"]
if f.get("severity") == "LOW")
}
return results
def generate_report(self, results: Dict[str, Any]) -> str:
"""生成审计报告"""
report = []
report.append("="*60)
report.append("🔍 代码安全审计报告")
report.append("="*60)
report.append(f"\n📊 审计摘要:")
report.append(f" 总发现问题: {results['summary']['total_findings']}")
report.append(f" 高危问题: {results['summary']['high_severity']}")
report.append(f" 中危问题: {results['summary']['medium_severity']}")
report.append(f" 低危问题: {results['summary']['low_severity']}")
if results['bandit_findings']:
report.append(f"\n🔧 Bandit扫描结果 ({len(results['bandit_findings'])} 个问题):")
for finding in results['bandit_findings']:
severity_icon = {
"HIGH": "🔴",
"MEDIUM": "🟠",
"LOW": "🟡"
}.get(finding.get("severity", "UNKNOWN"), "⚪")
report.append(f" {severity_icon} [{finding.get('test_id', 'N/A')}] {finding.get('message', 'N/A')}")
if results['safety_findings']:
report.append(f"\n📦 Safety依赖扫描结果 ({len(results['safety_findings'])} 个问题):")
for finding in results['safety_findings']:
report.append(f" 🔴 [{finding.get('package', 'N/A')}] {finding.get('message', 'N/A')}")
report.append(f" CVE: {finding.get('vulnerability', 'N/A')}")
if results['semgrep_findings']:
report.append(f"\n🔍 Semgrep扫描结果 ({len(results['semgrep_findings'])} 个问题):")
for finding in results['semgrep_findings']:
severity_icon = {
"HIGH": "🔴",
"MEDIUM": "🟠",
"LOW": "🟡"
}.get(finding.get("severity", "UNKNOWN"), "⚪")
report.append(f" {severity_icon} [{finding.get('rule_id', 'N/A')}] {finding.get('message', 'N/A')}")
return "\n".join(report)
# 运行演示
if __name__ == "__main__":
auditor = CodeAuditTool()
# 创建示例项目文件
import tempfile
import os
with tempfile.TemporaryDirectory() as temp_dir:
# 创建示例Python文件
sample_code = """
import subprocess
import pickle
import hashlib
def unsafe_function(user_input):
# 不安全的代码示例
subprocess.call(user_input, shell=True) # Shell注入风险
data = pickle.loads(user_input) # 反序列化风险
hash_value = hashlib.md5(user_input.encode()).hexdigest() # 弱哈希
return hash_value
"""
sample_file = Path(temp_dir) / "unsafe_code.py"
sample_file.write_text(sample_code, encoding='utf-8')
# 创建requirements.txt
requirements_file = Path(temp_dir) / "requirements.txt"
requirements_file.write_text("django==2.1.0\nrequests==2.20.0\n", encoding='utf-8')
# 执行审计
print("🔍 开始代码安全审计...")
results = auditor.audit_project(temp_dir)
# 生成报告
report = auditor.generate_report(results)
print(report)

47.2 API安全防护

API安全防护是保护API免受各种攻击的关键技术。就像银行需要多重身份验证、严格的访问控制和实时监控一样,API也需要完善的认证授权、输入验证和速率限制机制。

🔐 认证授权机制

认证(Authentication)是验证用户身份的过程,授权(Authorization)是确定用户权限的过程:

# 示例4:JWT令牌认证系统
"""
JWT令牌认证实现
包含:
- Token生成
- Token验证
- 刷新令牌机制
- 权限控制
"""
import jwt
import time
from datetime import datetime, timedelta
from typing import Dict, Optional, List
from dataclasses import dataclass
from enum import Enum
class TokenType(Enum):
"""令牌类型"""
ACCESS = "access"
REFRESH = "refresh"
@dataclass
class TokenPayload:
"""令牌载荷"""
user_id: int
username: str
roles: List[str]
token_type: TokenType
exp: datetime
iat: datetime
class JWTAuthService:
"""JWT认证服务"""
def __init__(self, secret_key: str, algorithm: str = "HS256"):
"""初始化JWT服务"""
self.secret_key = secret_key
self.algorithm = algorithm
self.access_token_expiry = timedelta(minutes=15) # 访问令牌15分钟
self.refresh_token_expiry = timedelta(days=7) # 刷新令牌7天
print("🔐 JWT认证服务启动成功!")
def generate_access_token(self, user_id: int, username: str,
roles: List[str] = None) -> str:
"""生成访问令牌"""
now = datetime.utcnow()
payload = {
"user_id": user_id,
"username": username,
"roles": roles or [],
"token_type": TokenType.ACCESS.value,
"iat": now,
"exp": now + self.access_token_expiry
}
token = jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
print(f"✅ 生成访问令牌: user_id={user_id}, username={username}")
return token
def generate_refresh_token(self, user_id: int, username: str) -> str:
"""生成刷新令牌"""
now = datetime.utcnow()
payload = {
"user_id": user_id,
"username": username,
"token_type": TokenType.REFRESH.value,
"iat": now,
"exp": now + self.refresh_token_expiry
}
token = jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
print(f"✅ 生成刷新令牌: user_id={user_id}")
return token
def verify_token(self, token: str, token_type: TokenType = TokenType.ACCESS) -> Optional[Dict]:
"""验证令牌"""
try:
payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
# 检查令牌类型
if payload.get("token_type") != token_type.value:
print(f"❌ 令牌类型不匹配: 期望{token_type.value}, 实际{payload.get('token_type')}")
return None
# 检查是否过期(jwt.decode会自动检查,但我们可以额外验证)
exp = datetime.utcfromtimestamp(payload["exp"])
if exp < datetime.utcnow():
print("❌ 令牌已过期")
return None
print(f"✅ 令牌验证成功: user_id={payload.get('user_id')}")
return payload
except jwt.ExpiredSignatureError:
print("❌ 令牌已过期")
return None
except jwt.InvalidTokenError as e:
print(f"❌ 无效令牌: {str(e)}")
return None
def refresh_access_token(self, refresh_token: str) -> Optional[str]:
"""使用刷新令牌获取新的访问令牌"""
payload = self.verify_token(refresh_token, TokenType.REFRESH)
if not payload:
return None
# 生成新的访问令牌
return self.generate_access_token(
payload["user_id"],
payload["username"],
payload.get("roles", [])
)
def extract_user_info(self, token: str) -> Optional[Dict]:
"""从令牌中提取用户信息"""
payload = self.verify_token(token)
if payload:
return {
"user_id": payload.get("user_id"),
"username": payload.get("username"),
"roles": payload.get("roles", [])
}
return None
class RoleBasedAccessControl:
"""基于角色的访问控制(RBAC)"""
def __init__(self):
"""初始化RBAC系统"""
self.roles_permissions = {
"admin": ["read", "write", "delete", "admin"],
"editor": ["read", "write"],
"viewer": ["read"]
}
print("🔐 RBAC系统启动成功!")
def has_permission(self, user_roles: List[str], required_permission: str) -> bool:
"""检查用户是否有特定权限"""
for role in user_roles:
permissions = self.roles_permissions.get(role, [])
if required_permission in permissions or "admin" in permissions:
return True
return False
def check_access(self, user_roles: List[str], resource: str, action: str) -> bool:
"""检查用户对资源的访问权限"""
# 资源权限映射
resource_permissions = {
"users": {
"read": ["viewer", "editor", "admin"],
"write": ["editor", "admin"],
"delete": ["admin"]
},
"posts": {
"read": ["viewer", "editor", "admin"],
"write": ["editor", "admin"],
"delete": ["admin"]
}
}
allowed_roles = resource_permissions.get(resource, {}).get(action, [])
return any(role in allowed_roles for role in user_roles) or "admin" in user_roles
# 运行演示
if __name__ == "__main__":
jwt_service = JWTAuthService(secret_key="your-secret-key-here")
# 生成令牌
access_token = jwt_service.generate_access_token(
user_id=1,
username="alice",
roles=["admin", "user"]
)
refresh_token = jwt_service.generate_refresh_token(
user_id=1,
username="alice"
)
print(f"\n访问令牌: {access_token[:50]}...")
print(f"刷新令牌: {refresh_token[:50]}...")
# 验证令牌
payload = jwt_service.verify_token(access_token)
if payload:
print(f"\n令牌载荷: {payload}")
# 提取用户信息
user_info = jwt_service.extract_user_info(access_token)
print(f"\n用户信息: {user_info}")
# RBAC权限检查
rbac = RoleBasedAccessControl()
print(f"\n权限检查:")
print(f" admin角色是否有write权限: {rbac.has_permission(['admin'], 'write')}")
print(f" viewer角色是否有write权限: {rbac.has_permission(['viewer'], 'write')}")
print(f" editor角色是否可以删除users: {rbac.check_access(['editor'], 'users', 'delete')}")
print(f" admin角色是否可以删除users: {rbac.check_access(['admin'], 'users', 'delete')}")

🛡️ 输入验证过滤

输入验证是API安全的第一道防线,可以有效防止注入攻击、XSS攻击等安全威胁:

# 示例5:API输入验证和过滤系统
"""
API输入验证和过滤
包含:
- SQL注入防护
- XSS攻击防护
- 命令注入防护
- 路径遍历防护
"""
import re
import html
from typing import Dict, List, Optional, Any
from urllib.parse import quote, unquote
class InputValidator:
"""输入验证器"""
def __init__(self):
"""初始化验证规则"""
self.validation_rules = self._load_validation_rules()
print("🛡️ 输入验证器启动成功!")
def _load_validation_rules(self) -> Dict[str, Dict]:
"""加载验证规则"""
return {
"email": {
"pattern": r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$',
"max_length": 254
},
"phone": {
"pattern": r'^1[3-9]\d{9}$', # 中国手机号
"max_length": 11
},
"username": {
"pattern": r'^[a-zA-Z0-9_]{3,20}$',
"max_length": 20
},
"password": {
"pattern": r'^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[@$!%*?&])[A-Za-z\d@$!%*?&]{8,}$',
"min_length": 8,
"max_length": 128
}
}
def validate_email(self, email: str) -> bool:
"""验证邮箱格式"""
if not email or len(email) > self.validation_rules["email"]["max_length"]:
return False
return bool(re.match(self.validation_rules["email"]["pattern"], email))
def validate_phone(self, phone: str) -> bool:
"""验证手机号格式"""
if not phone or len(phone) != 11:
return False
return bool(re.match(self.validation_rules["phone"]["pattern"], phone))
def validate_username(self, username: str) -> bool:
"""验证用户名格式"""
if not username:
return False
if len(username) < 3 or len(username) > 20:
return False
return bool(re.match(self.validation_rules["username"]["pattern"], username))
def validate_password(self, password: str) -> bool:
"""验证密码强度"""
if not password:
return False
if len(password) < 8 or len(password) > 128:
return False
return bool(re.match(self.validation_rules["password"]["pattern"], password))
def sanitize_string(self, value: str, max_length: int = 1000) -> str:
"""清理字符串输入"""
if not isinstance(value, str):
value = str(value)
# 移除前后空格
value = value.strip()
# 限制长度
if len(value) > max_length:
value = value[:max_length]
# HTML转义(防止XSS)
value = html.escape(value)
return value
class SQLInjectionProtection:
"""SQL注入防护"""
def __init__(self):
"""初始化SQL注入检测模式"""
self.sql_patterns = [
r"(union|select|insert|update|delete|drop|create|alter|exec|execute)",
r"(--|#|/\*|\*/)",
r"(\bor\b|\band\b)\s+\d+\s*=\s*\d+",
r"(\bor\b|\band\b)\s+['\"].*['\"]",
r"(sleep|benchmark|waitfor)\s*\(",
r"(\'|(\\\')|(;)|(\\)|(\|))"
]
print("🛡️ SQL注入防护启动成功!")
def detect_sql_injection(self, value: str) -> bool:
"""检测SQL注入攻击"""
if not isinstance(value, str):
return False
value_lower = value.lower()
for pattern in self.sql_patterns:
if re.search(pattern, value_lower, re.IGNORECASE):
return True
return False
def sanitize_sql_input(self, value: str) -> str:
"""清理SQL输入"""
# 移除SQL关键字
sql_keywords = ['select', 'insert', 'update', 'delete', 'drop', 'create', 'alter']
value_lower = value.lower()
for keyword in sql_keywords:
value = re.sub(rf'\b{keyword}\b', '', value, flags=re.IGNORECASE)
# 转义特殊字符
value = value.replace("'", "''")
value = value.replace(";", "")
value = value.replace("--", "")
return value
class XSSProtection:
"""XSS攻击防护"""
def __init__(self):
"""初始化XSS检测模式"""
self.xss_patterns = [
r"<script[^>]*>.*?</script>",
r"javascript:",
r"on\w+\s*=", # 事件处理器
r"<iframe[^>]*>",
r"<object[^>]*>",
r"<embed[^>]*>",
r"<img[^>]*onerror",
r"<svg[^>]*onload"
]
print("🛡️ XSS防护启动成功!")
def detect_xss(self, value: str) -> bool:
"""检测XSS攻击"""
if not isinstance(value, str):
return False
for pattern in self.xss_patterns:
if re.search(pattern, value, re.IGNORECASE):
return True
return False
def sanitize_html(self, html_content: str) -> str:
"""清理HTML内容"""
# HTML转义
sanitized = html.escape(html_content)
return sanitized
class APISecurityMiddleware:
"""API安全中间件"""
def __init__(self):
"""初始化安全中间件"""
self.validator = InputValidator()
self.sql_protection = SQLInjectionProtection()
self.xss_protection = XSSProtection()
print("🛡️ API安全中间件启动成功!")
def validate_request(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""验证请求数据"""
errors = []
sanitized_data = {}
for key, value in data.items():
if isinstance(value, str):
# SQL注入检测
if self.sql_protection.detect_sql_injection(value):
errors.append(f"字段 {key} 包含潜在的SQL注入攻击")
continue
# XSS检测
if self.xss_protection.detect_xss(value):
errors.append(f"字段 {key} 包含潜在的XSS攻击")
continue
# 清理输入
sanitized_data[key] = self.validator.sanitize_string(value)
else:
sanitized_data[key] = value
return {
"valid": len(errors) == 0,
"errors": errors,
"data": sanitized_data
}
# 运行演示
if __name__ == "__main__":
security = APISecurityMiddleware()
# 测试输入验证
test_data = {
"username": "alice",
"email": "alice@example.com",
"phone": "13800138000",
"description": "<script>alert('XSS')</script>",
"query": "'; DROP TABLE users; --"
}
result = security.validate_request(test_data)
print("请求验证结果:")
print(f"有效: {result['valid']}")
if result['errors']:
print("错误:")
for error in result['errors']:
print(f" - {error}")
print(f"清理后的数据: {result['data']}")
# 测试邮箱验证
print(f"\n邮箱验证: {security.validator.validate_email('alice@example.com')}")
print(f"手机号验证: {security.validator.validate_phone('13800138000')}")

⏱️ 速率限制实现

速率限制可以防止API滥用和DDoS攻击,保护系统资源:

# 示例6:API速率限制系统
"""
API速率限制实现
包含:
- 令牌桶算法
- 漏桶算法
- 滑动窗口算法
- IP白名单机制
"""
import time
from typing import Dict, List, Optional
from collections import deque
from datetime import datetime, timedelta
from threading import Lock
class TokenBucket:
"""令牌桶算法"""
def __init__(self, capacity: int, refill_rate: float):
"""
初始化令牌桶
capacity: 桶容量(最大令牌数)
refill_rate: 每秒补充的令牌数
"""
self.capacity = capacity
self.refill_rate = refill_rate
self.tokens = capacity
self.last_refill = time.time()
self.lock = Lock()
def consume(self, tokens: int = 1) -> bool:
"""消费令牌"""
with self.lock:
# 补充令牌
now = time.time()
elapsed = now - self.last_refill
tokens_to_add = elapsed * self.refill_rate
self.tokens = min(self.capacity, self.tokens + tokens_to_add)
self.last_refill = now
# 检查是否有足够的令牌
if self.tokens >= tokens:
self.tokens -= tokens
return True
return False
def get_available_tokens(self) -> int:
"""获取可用令牌数"""
with self.lock:
now = time.time()
elapsed = now - self.last_refill
tokens_to_add = elapsed * self.refill_rate
self.tokens = min(self.capacity, self.tokens + tokens_to_add)
self.last_refill = now
return int(self.tokens)
class LeakyBucket:
"""漏桶算法"""
def __init__(self, capacity: int, leak_rate: float):
"""
初始化漏桶
capacity: 桶容量
leak_rate: 每秒漏出的请求数
"""
self.capacity = capacity
self.leak_rate = leak_rate
self.queue = deque()
self.last_leak = time.time()
self.lock = Lock()
def add_request(self) -> bool:
"""添加请求"""
with self.lock:
# 漏出请求
now = time.time()
elapsed = now - self.last_leak
requests_to_leak = int(elapsed * self.leak_rate)
for _ in range(min(requests_to_leak, len(self.queue))):
self.queue.popleft()
self.last_leak = now
# 检查是否有空间
if len(self.queue) < self.capacity:
self.queue.append(now)
return True
return False
def get_queue_size(self) -> int:
"""获取队列大小"""
with self.lock:
# 漏出请求
now = time.time()
elapsed = now - self.last_leak
requests_to_leak = int(elapsed * self.leak_rate)
for _ in range(min(requests_to_leak, len(self.queue))):
self.queue.popleft()
self.last_leak = now
return len(self.queue)
class SlidingWindow:
"""滑动窗口算法"""
def __init__(self, window_size: int, max_requests: int):
"""
初始化滑动窗口
window_size: 窗口大小(秒)
max_requests: 窗口内最大请求数
"""
self.window_size = window_size
self.max_requests = max_requests
self.requests = deque()
self.lock = Lock()
def add_request(self) -> bool:
"""添加请求"""
with self.lock:
now = time.time()
# 移除窗口外的请求
while self.requests and self.requests[0] < now - self.window_size:
self.requests.popleft()
# 检查是否超过限制
if len(self.requests) < self.max_requests:
self.requests.append(now)
return True
return False
def get_request_count(self) -> int:
"""获取窗口内的请求数"""
with self.lock:
now = time.time()
# 移除窗口外的请求
while self.requests and self.requests[0] < now - self.window_size:
self.requests.popleft()
return len(self.requests)
class RateLimiter:
"""速率限制器"""
def __init__(self, algorithm: str = "token_bucket", **kwargs):
"""
初始化速率限制器
algorithm: 算法类型 (token_bucket, leaky_bucket, sliding_window)
"""
self.algorithm = algorithm
self.limiters: Dict[str, any] = {}
self.whitelist: set = set()
self.lock = Lock()
# 根据算法创建限流器
if algorithm == "token_bucket":
self.capacity = kwargs.get("capacity", 100)
self.refill_rate = kwargs.get("refill_rate", 10.0)
elif algorithm == "leaky_bucket":
self.capacity = kwargs.get("capacity", 100)
self.leak_rate = kwargs.get("leak_rate", 10.0)
elif algorithm == "sliding_window":
self.window_size = kwargs.get("window_size", 60)
self.max_requests = kwargs.get("max_requests", 100)
print(f"⏱️ 速率限制器启动成功!算法: {algorithm}")
def add_to_whitelist(self, identifier: str):
"""添加到白名单"""
with self.lock:
self.whitelist.add(identifier)
print(f"✅ 添加到白名单: {identifier}")
def remove_from_whitelist(self, identifier: str):
"""从白名单移除"""
with self.lock:
self.whitelist.discard(identifier)
print(f"❌ 从白名单移除: {identifier}")
def is_whitelisted(self, identifier: str) -> bool:
"""检查是否在白名单中"""
return identifier in self.whitelist
def _get_limiter(self, identifier: str):
"""获取或创建限流器"""
if identifier not in self.limiters:
if self.algorithm == "token_bucket":
self.limiters[identifier] = TokenBucket(self.capacity, self.refill_rate)
elif self.algorithm == "leaky_bucket":
self.limiters[identifier] = LeakyBucket(self.capacity, self.leak_rate)
elif self.algorithm == "sliding_window":
self.limiters[identifier] = SlidingWindow(self.window_size, self.max_requests)
return self.limiters[identifier]
def check_rate_limit(self, identifier: str) -> Dict[str, any]:
"""检查速率限制"""
# 白名单检查
if self.is_whitelisted(identifier):
return {
"allowed": True,
"remaining": -1, # 无限制
"reset_time": None
}
# 获取限流器
limiter = self._get_limiter(identifier)
# 根据算法检查
if self.algorithm == "token_bucket":
allowed = limiter.consume()
remaining = limiter.get_available_tokens()
return {
"allowed": allowed,
"remaining": remaining,
"reset_time": None
}
elif self.algorithm == "leaky_bucket":
allowed = limiter.add_request()
queue_size = limiter.get_queue_size()
return {
"allowed": allowed,
"remaining": self.capacity - queue_size,
"reset_time": None
}
elif self.algorithm == "sliding_window":
allowed = limiter.add_request()
request_count = limiter.get_request_count()
return {
"allowed": allowed,
"remaining": self.max_requests - request_count,
"reset_time": time.time() + self.window_size
}
return {"allowed": False, "remaining": 0, "reset_time": None}
# 运行演示
if __name__ == "__main__":
# 令牌桶算法演示
print("="*60)
print("令牌桶算法演示")
print("="*60)
token_bucket = RateLimiter("token_bucket", capacity=10, refill_rate=2.0)
for i in range(15):
result = token_bucket.check_rate_limit("user1")
status = "✅ 允许" if result["allowed"] else "❌ 拒绝"
print(f"请求 {i+1}: {status} (剩余: {result['remaining']})")
time.sleep(0.1)
# 滑动窗口算法演示
print("\n" + "="*60)
print("滑动窗口算法演示")
print("="*60)
sliding_window = RateLimiter("sliding_window", window_size=60, max_requests=5)
for i in range(10):
result = sliding_window.check_rate_limit("user2")
status = "✅ 允许" if result["allowed"] else "❌ 拒绝"
print(f"请求 {i+1}: {status} (剩余: {result['remaining']})")
time.sleep(0.5)

47.3 数据加密技术

数据加密是保护敏感数据的关键技术。就像银行金库需要多重锁和密码保护一样,我们的数据也需要多层加密保护。

🔐 对称加密算法

对称加密使用相同的密钥进行加密和解密,速度快,适合大量数据加密:

# 示例7:对称加密算法实现
"""
对称加密算法实现
包含:
- AES加密算法
- 密钥管理策略
- 加密模式选择
- 性能优化
"""
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import padding, hashes
from cryptography.hazmat.backends import default_backend
import os
import base64
from typing import Tuple
class AESEncryption:
"""AES加密算法实现"""
def __init__(self, key: bytes = None):
"""
初始化AES加密器
key: 32字节密钥(AES-256)
"""
if key is None:
key = os.urandom(32) # 生成随机密钥
elif len(key) != 32:
raise ValueError("密钥必须是32字节(AES-256)")
self.key = key
self.backend = default_backend()
print("🔐 AES加密器启动成功!")
def encrypt(self, plaintext: str) -> Tuple[bytes, bytes]:
"""
加密数据
返回: (加密数据, 初始化向量)
"""
# 生成随机IV
iv = os.urandom(16)
# 创建加密器
cipher = Cipher(
algorithms.AES(self.key),
modes.CBC(iv),
backend=self.backend
)
encryptor = cipher.encryptor()
# 填充数据
padder = padding.PKCS7(128).padder()
padded_data = padder.update(plaintext.encode('utf-8'))
padded_data += padder.finalize()
# 加密
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
return ciphertext, iv
def decrypt(self, ciphertext: bytes, iv: bytes) -> str:
"""
解密数据
"""
# 创建解密器
cipher = Cipher(
algorithms.AES(self.key),
modes.CBC(iv),
backend=self.backend
)
decryptor = cipher.decryptor()
# 解密
padded_plaintext = decryptor.update(ciphertext) + decryptor.finalize()
# 去除填充
unpadder = padding.PKCS7(128).unpadder()
plaintext = unpadder.update(padded_plaintext)
plaintext += unpadder.finalize()
return plaintext.decode('utf-8')
class FernetEncryption:
"""Fernet对称加密(基于AES)"""
def __init__(self, key: bytes = None):
"""
初始化Fernet加密器
key: Fernet密钥(32字节)
"""
if key is None:
key = Fernet.generate_key()
elif isinstance(key, str):
key = key.encode()
self.key = key
self.cipher = Fernet(key)
print("🔐 Fernet加密器启动成功!")
def encrypt(self, plaintext: str) -> str:
"""加密数据"""
encrypted = self.cipher.encrypt(plaintext.encode('utf-8'))
return base64.urlsafe_b64encode(encrypted).decode('utf-8')
def decrypt(self, encrypted_data: str) -> str:
"""解密数据"""
encrypted_bytes = base64.urlsafe_b64decode(encrypted_data.encode('utf-8'))
decrypted = self.cipher.decrypt(encrypted_bytes)
return decrypted.decode('utf-8')
# 运行演示
if __name__ == "__main__":
# AES加密演示
print("="*60)
print("AES加密演示")
print("="*60)
aes = AESEncryption()
plaintext = "这是需要加密的敏感数据"
ciphertext, iv = aes.encrypt(plaintext)
print(f"原文: {plaintext}")
print(f"密文: {ciphertext.hex()[:50]}...")
print(f"IV: {iv.hex()}")
decrypted = aes.decrypt(ciphertext, iv)
print(f"解密: {decrypted}")
print(f"加密成功: {'✅' if plaintext == decrypted else '❌'}")
# Fernet加密演示
print("\n" + "="*60)
print("Fernet加密演示")
print("="*60)
fernet = FernetEncryption()
plaintext = "这是需要加密的敏感数据"
encrypted = fernet.encrypt(plaintext)
print(f"原文: {plaintext}")
print(f"密文: {encrypted[:50]}...")
decrypted = fernet.decrypt(encrypted)
print(f"解密: {decrypted}")
print(f"加密成功: {'✅' if plaintext == decrypted else '❌'}")

🔑 非对称加密应用

非对称加密使用公钥和私钥对,公钥用于加密,私钥用于解密,适合密钥交换和数字签名:

# 示例8:非对称加密和数字签名实现
"""
非对称加密和数字签名实现
包含:
- RSA加密算法
- 密钥交换机制
- 数字签名生成和验证
"""
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.backends import default_backend
import hashlib
from typing import Tuple
class RSAEncryption:
"""RSA非对称加密实现"""
def __init__(self, key_size: int = 2048):
"""
初始化RSA加密器
key_size: 密钥长度(2048或4096位)
"""
self.private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=key_size,
backend=default_backend()
)
self.public_key = self.private_key.public_key()
print(f"🔑 RSA加密器启动成功!密钥长度: {key_size}位")
def get_public_key_pem(self) -> str:
"""获取公钥PEM格式"""
pem = self.public_key.serialize(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
return pem.decode('utf-8')
def encrypt(self, plaintext: str, public_key=None) -> bytes:
"""
使用公钥加密数据
"""
if public_key is None:
public_key = self.public_key
ciphertext = public_key.encrypt(
plaintext.encode('utf-8'),
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
return ciphertext
def decrypt(self, ciphertext: bytes) -> str:
"""
使用私钥解密数据
"""
plaintext = self.private_key.decrypt(
ciphertext,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
return plaintext.decode('utf-8')
class DigitalSignature:
"""数字签名实现"""
def __init__(self, private_key=None, public_key=None):
"""
初始化数字签名器
"""
if private_key is None:
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
self.private_key = private_key
self.public_key = public_key or private_key.public_key()
print("✍️ 数字签名器启动成功!")
def sign(self, message: str) -> bytes:
"""
生成数字签名
"""
signature = self.private_key.sign(
message.encode('utf-8'),
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
return signature
def verify(self, message: str, signature: bytes) -> bool:
"""
验证数字签名
"""
try:
self.public_key.verify(
signature,
message.encode('utf-8'),
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
return True
except Exception:
return False
# 运行演示
if __name__ == "__main__":
# RSA加密演示
print("="*60)
print("RSA加密演示")
print("="*60)
rsa_enc = RSAEncryption()
plaintext = "这是需要加密的敏感数据"
ciphertext = rsa_enc.encrypt(plaintext)
print(f"原文: {plaintext}")
print(f"密文: {ciphertext.hex()[:50]}...")
decrypted = rsa_enc.decrypt(ciphertext)
print(f"解密: {decrypted}")
print(f"加密成功: {'✅' if plaintext == decrypted else '❌'}")
# 数字签名演示
print("\n" + "="*60)
print("数字签名演示")
print("="*60)
signer = DigitalSignature()
message = "这是需要签名的消息"
signature = signer.sign(message)
print(f"消息: {message}")
print(f"签名: {signature.hex()[:50]}...")
is_valid = signer.verify(message, signature)
print(f"签名验证: {'✅ 有效' if is_valid else '❌ 无效'}")
# 篡改检测
tampered_message = "这是被篡改的消息"
is_valid_tampered = signer.verify(tampered_message, signature)
print(f"篡改后验证: {'✅ 有效' if is_valid_tampered else '❌ 无效(检测到篡改)'}")

47.4 综合项目:安全API网关

在本章的最后,我们将综合运用所学的所有安全技术,构建一个完整的安全API网关系统。这个系统将集成多重认证、威胁检测、安全审计等功能。

# 示例9:安全API网关系统
"""
安全API网关系统
包含:
- 多重认证机制
- 威胁检测系统
- 安全审计日志
- 统一安全策略
"""
from typing import Dict, List, Optional, Any
from datetime import datetime
from dataclasses import dataclass, asdict
import json
@dataclass
class SecurityEvent:
"""安全事件"""
event_id: str
timestamp: datetime
event_type: str
user_id: Optional[str]
ip_address: str
endpoint: str
severity: str
details: Dict[str, Any]
class SecureAPIGateway:
"""安全API网关"""
def __init__(self):
"""初始化安全API网关"""
# 导入之前定义的组件
# 在实际项目中,这些应该从独立的模块导入
self.jwt_auth = None # JWTAuthService实例
self.rate_limiter = None # RateLimiter实例
self.security_middleware = None # APISecurityMiddleware实例
self.encryption = None # FernetEncryption实例
self.security_events: List[SecurityEvent] = []
self.blocked_ips: set = set()
self.failed_attempts: Dict[str, int] = {}
print("🌐 安全API网关启动成功!")
def authenticate_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""认证请求"""
# 检查IP是否被封锁
client_ip = request.get("ip_address", "unknown")
if client_ip in self.blocked_ips:
self._log_security_event(
"access_blocked",
None,
client_ip,
request.get("endpoint", ""),
"high",
{"reason": "IP被封锁"}
)
return {
"authenticated": False,
"error": "IP地址已被封锁"
}
# JWT认证
auth_header = request.get("headers", {}).get("Authorization", "")
if auth_header.startswith("Bearer "):
token = auth_header[7:]
# 这里应该调用JWT验证
# payload = self.jwt_auth.verify_token(token)
# 为演示目的,模拟验证
payload = {"user_id": "1", "username": "alice", "roles": ["user"]}
if payload:
return {
"authenticated": True,
"user_id": payload.get("user_id"),
"username": payload.get("username"),
"roles": payload.get("roles", [])
}
# 记录失败尝试
self.failed_attempts[client_ip] = self.failed_attempts.get(client_ip, 0) + 1
if self.failed_attempts[client_ip] >= 5:
self.blocked_ips.add(client_ip)
self._log_security_event(
"ip_blocked",
None,
client_ip,
request.get("endpoint", ""),
"critical",
{"reason": "多次认证失败"}
)
return {
"authenticated": False,
"error": "认证失败"
}
def check_rate_limit(self, request: Dict[str, Any]) -> bool:
"""检查速率限制"""
client_ip = request.get("ip_address", "unknown")
# 这里应该调用RateLimiter
# result = self.rate_limiter.check_rate_limit(client_ip)
# 为演示目的,模拟检查
return True
def validate_input(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""验证输入"""
# 这里应该调用APISecurityMiddleware
# result = self.security_middleware.validate_request(request.get("data", {}))
# 为演示目的,模拟验证
return {
"valid": True,
"errors": [],
"data": request.get("data", {})
}
def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""处理API请求"""
# 1. 认证
auth_result = self.authenticate_request(request)
if not auth_result.get("authenticated"):
return {
"status_code": 401,
"error": auth_result.get("error", "未授权")
}
# 2. 速率限制
if not self.check_rate_limit(request):
self._log_security_event(
"rate_limit_exceeded",
auth_result.get("user_id"),
request.get("ip_address", "unknown"),
request.get("endpoint", ""),
"medium",
{}
)
return {
"status_code": 429,
"error": "请求过于频繁"
}
# 3. 输入验证
validation_result = self.validate_input(request)
if not validation_result.get("valid"):
self._log_security_event(
"input_validation_failed",
auth_result.get("user_id"),
request.get("ip_address", "unknown"),
request.get("endpoint", ""),
"high",
{"errors": validation_result.get("errors", [])}
)
return {
"status_code": 400,
"error": "输入验证失败",
"details": validation_result.get("errors", [])
}
# 4. 处理业务逻辑
self._log_security_event(
"request_processed",
auth_result.get("user_id"),
request.get("ip_address", "unknown"),
request.get("endpoint", ""),
"low",
{}
)
return {
"status_code": 200,
"data": {
"message": "请求处理成功",
"user": auth_result.get("username")
}
}
def _log_security_event(self, event_type: str, user_id: Optional[str],
ip_address: str, endpoint: str, severity: str,
details: Dict[str, Any]):
"""记录安全事件"""
event = SecurityEvent(
event_id=f"evt_{datetime.now().timestamp()}",
timestamp=datetime.now(),
event_type=event_type,
user_id=user_id,
ip_address=ip_address,
endpoint=endpoint,
severity=severity,
details=details
)
self.security_events.append(event)
severity_icon = {
"critical": "🔴",
"high": "🟠",
"medium": "🟡",
"low": "🟢"
}.get(severity, "⚪")
print(f"{severity_icon} 安全事件: {event_type} | IP: {ip_address} | 端点: {endpoint}")
def get_security_report(self) -> Dict[str, Any]:
"""获取安全报告"""
recent_events = [e for e in self.security_events
if (datetime.now() - e.timestamp).total_seconds() < 3600]
event_counts = {}
for event in recent_events:
event_counts[event.event_type] = event_counts.get(event.event_type, 0) + 1
return {
"total_events_1h": len(recent_events),
"event_types": event_counts,
"blocked_ips": len(self.blocked_ips),
"failed_attempts": sum(self.failed_attempts.values())
}
# 运行演示
if __name__ == "__main__":
gateway = SecureAPIGateway()
# 模拟API请求
print("="*60)
print("安全API网关演示")
print("="*60)
# 正常请求
normal_request = {
"endpoint": "/api/users",
"method": "GET",
"ip_address": "192.168.1.100",
"headers": {
"Authorization": "Bearer valid_token_here"
},
"data": {
"username": "alice"
}
}
response = gateway.handle_request(normal_request)
print(f"\n正常请求响应: {response['status_code']}")
# 未认证请求
unauth_request = {
"endpoint": "/api/users",
"method": "GET",
"ip_address": "192.168.1.200",
"headers": {},
"data": {}
}
response = gateway.handle_request(unauth_request)
print(f"未认证请求响应: {response['status_code']} - {response.get('error')}")
# 安全报告
print("\n" + "="*60)
print("安全报告")
print("="*60)
report = gateway.get_security_report()
print(json.dumps(report, indent=2, ensure_ascii=False, default=str))

📚 实践练习

练习1:OWASP Top 10威胁分析

分析一个Web应用,识别其中可能存在的OWASP Top 10安全威胁,并制定相应的防护措施。

练习2:安全编码规范检查

使用代码审计工具检查一个Python项目,修复发现的安全问题。

练习3:JWT认证系统实现

实现一个完整的JWT认证系统,包括令牌生成、验证、刷新等功能。

练习4:API速率限制

实现一个基于滑动窗口的API速率限制系统,支持不同用户的不同限制策略。

练习5:数据加密应用

使用对称和非对称加密算法,实现一个敏感数据加密存储系统。


🤔 思考题

  1. 安全与性能的平衡:如何在保证安全性的同时,不影响系统性能?
  2. 多层防护策略:为什么需要多层安全防护?各层防护的作用是什么?
  3. 密钥管理:在分布式系统中,如何安全地管理和分发加密密钥?
  4. 安全审计:安全审计日志应该记录哪些信息?如何保护审计日志本身的安全?
  5. 威胁检测:如何设计一个有效的威胁检测系统,能够实时发现和响应安全威胁?

📖 拓展阅读

  1. OWASP官方文档https://owasp.org/
  2. OWASP Top 10:了解最新的Web应用安全威胁
  3. JWT规范:RFC 7519 - JSON Web Token
  4. 加密算法标准:AES、RSA等加密算法的详细规范
  5. API安全最佳实践:RESTful API安全设计指南

✅ 检查清单

完成本章学习后,请检查以下内容:

知识掌握

  • 理解OWASP Top 10安全威胁及其防护措施
  • 掌握安全编码规范和最佳实践
  • 了解代码审计工具的使用方法
  • 理解JWT认证机制和RBAC权限控制
  • 掌握API输入验证和过滤技术
  • 理解速率限制算法(令牌桶、漏桶、滑动窗口)
  • 掌握对称和非对称加密算法
  • 理解数字签名的原理和应用

技能应用

  • 能够识别和修复常见的安全漏洞
  • 能够实现JWT认证和授权系统
  • 能够实现API输入验证和速率限制
  • 能够使用加密算法保护敏感数据
  • 能够设计和实现安全API网关

实践项目

  • 完成安全编码规范检查练习
  • 完成JWT认证系统实现
  • 完成API速率限制系统实现
  • 完成数据加密应用开发
  • 完成安全API网关综合项目

恭喜您完成第47章的学习! 您已经掌握了代码安全和API防护的核心技术,可以开始构建安全的企业级应用了。在下一章,我们将学习数据隐私与AI伦理,探索如何在AI应用中保护用户隐私和遵循伦理规范。 <|tool▁calls▁begin|><|tool▁call▁begin|> todo_write