第13章 - 文件操作与数据持久化
🎯 本章目标: 掌握Python中的文件操作、数据序列化和数据库操作,建立完整的数据持久化技能体系
🏠 生活化引入
想象一下,你是一个图书馆管理员:
- 文件操作 就像管理图书馆的藏书:分类、整理、查找、借阅
- 数据序列化 就像把书籍信息记录在卡片上:保存和读取书籍详情
- 数据库 就像图书馆的电子管理系统:高效存储和检索大量信息
本章将教你成为一个优秀的"数据管理员"!
📚 知识地图
13.1 文件系统操作详解
🎓 核心概念:图书馆管理系统
文件系统就像一个巨大的图书馆:
- 文件 = 图书馆中的每一本书
- 目录 = 图书馆的书架和分类区域
- 路径 = 找到特定书籍的地址(几楼几排几号书架)
- 文件属性 = 书籍的基本信息(作者、出版日期、页数等)
📝 现代化路径操作:pathlib模块
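在阅读下面完整的 SmartFileManager 之前,先看一个最小的 pathlib 用法示意(其中的目录和文件名仅为假设的示例):

```python
from pathlib import Path

# 用 / 运算符拼接路径,就像描述"几楼几排几号书架"
book_path = Path("library") / "science" / "python_guide.txt"

print(book_path.name)    # 文件名:python_guide.txt
print(book_path.suffix)  # 扩展名:.txt
print(book_path.parent)  # 父目录

# 查询"这本书"是否存在,以及它的基本属性
if book_path.exists():
    info = book_path.stat()
    print(f"大小: {info.st_size} 字节")
else:
    print("文件不存在,可以先用 mkdir() / write_text() 创建")
```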
```python
from pathlib import Path
import os
import shutil
import time
from datetime import datetime
from typing import List, Optional


class SmartFileManager:
    """智能文件管理器 - 像图书馆管理员一样管理文件"""

    def __init__(self, base_path: str = "."):
        """初始化文件管理器,base_path 为基础工作目录,默认为当前目录"""
        self.base_path = Path(base_path).resolve()
        print(f"📁 文件管理器初始化完成,工作目录: {self.base_path}")

    def get_file_info(self, file_path: str) -> dict:
        """获取文件详细信息 - 就像查看图书的基本信息卡片,返回包含文件信息的字典"""
        path = Path(file_path)
        if not path.exists():
            return {"error": "文件不存在"}

        stat = path.stat()
        info = {
            "name": path.name,                                  # 文件名
            "parent": str(path.parent),                         # 父目录
            "size": stat.st_size,                               # 文件大小(字节)
            "size_human": self._format_size(stat.st_size),      # 人类可读的大小
            "created": datetime.fromtimestamp(stat.st_ctime),   # 创建时间
            "modified": datetime.fromtimestamp(stat.st_mtime),  # 修改时间
            "is_file": path.is_file(),                          # 是否为文件
            "is_dir": path.is_dir(),                            # 是否为目录
            "suffix": path.suffix,                              # 文件扩展名
            "absolute_path": str(path.absolute())               # 绝对路径
        }
        return info

    def _format_size(self, size_bytes: int) -> str:
        """将字节数转换为人类可读的格式"""
        for unit in ['B', 'KB', 'MB', 'GB']:
            if size_bytes < 1024:
                return f"{size_bytes:.1f} {unit}"
            size_bytes /= 1024
        return f"{size_bytes:.1f} TB"

    def create_directory_structure(self, structure: dict, base_path: Optional[str] = None):
        """创建目录结构 - 就像在图书馆中设置新的书架分类"""
        if base_path is None:
            base_path = self.base_path
        else:
            base_path = Path(base_path)

        def create_recursive(struct: dict, current_path: Path):
            for name, content in struct.items():
                new_path = current_path / name
                if isinstance(content, dict):
                    # 创建目录
                    new_path.mkdir(parents=True, exist_ok=True)
                    print(f"📁 创建目录: {new_path}")
                    create_recursive(content, new_path)
                else:
                    # 创建文件
                    new_path.parent.mkdir(parents=True, exist_ok=True)
                    with open(new_path, 'w', encoding='utf-8') as f:
                        f.write(str(content))
                    print(f"📄 创建文件: {new_path}")

        create_recursive(structure, base_path)

    def find_files(self, pattern: str, search_path: Optional[str] = None) -> List[Path]:
        """查找文件 - 就像在图书馆中按关键词搜索书籍,pattern 支持通配符"""
        if search_path is None:
            search_path = self.base_path
        else:
            search_path = Path(search_path)

        # 使用glob模式匹配
        matches = list(search_path.rglob(pattern))
        print(f"🔍 搜索模式 '{pattern}' 找到 {len(matches)} 个匹配项")
        return matches

    def organize_files_by_type(self, source_dir: str, target_dir: str):
        """按文件类型整理文件 - 就像按主题分类图书"""
        source_path = Path(source_dir)
        target_path = Path(target_dir)

        # 定义文件类型分类
        file_categories = {
            'images': ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.svg'],
            'documents': ['.pdf', '.doc', '.docx', '.txt', '.rtf', '.odt'],
            'spreadsheets': ['.xls', '.xlsx', '.csv', '.ods'],
            'presentations': ['.ppt', '.pptx', '.odp'],
            'code': ['.py', '.js', '.html', '.css', '.java', '.cpp', '.c'],
            'archives': ['.zip', '.rar', '.7z', '.tar', '.gz'],
            'audio': ['.mp3', '.wav', '.flac', '.aac', '.ogg'],
            'video': ['.mp4', '.avi', '.mkv', '.mov', '.wmv']
        }

        # 创建分类目录(包括默认的 others 分类,避免移动未知类型文件时目录不存在)
        for category in list(file_categories) + ['others']:
            category_path = target_path / category
            category_path.mkdir(parents=True, exist_ok=True)

        # 整理文件
        moved_count = 0
        for file_path in source_path.rglob('*'):
            if file_path.is_file():
                file_extension = file_path.suffix.lower()

                # 查找文件所属分类
                target_category = 'others'  # 默认分类
                for category, extensions in file_categories.items():
                    if file_extension in extensions:
                        target_category = category
                        break

                # 移动文件
                target_file_path = target_path / target_category / file_path.name

                # 处理文件名冲突
                counter = 1
                original_name = target_file_path.stem
                while target_file_path.exists():
                    target_file_path = target_file_path.parent / f"{original_name}_{counter}{file_path.suffix}"
                    counter += 1

                shutil.move(str(file_path), str(target_file_path))
                print(f"📦 移动文件: {file_path.name} → {target_category}/")
                moved_count += 1

        print(f"✅ 文件整理完成,共移动 {moved_count} 个文件")

    def clean_empty_directories(self, path: str):
        """清理空目录 - 就像移除图书馆中的空书架"""
        path = Path(path)
        removed_count = 0

        # 从最深层开始清理(自底向上)
        for dir_path in sorted(path.rglob('*'), key=lambda p: len(p.parts), reverse=True):
            if dir_path.is_dir():
                try:
                    # 尝试删除空目录
                    dir_path.rmdir()
                    print(f"🗑️ 删除空目录: {dir_path}")
                    removed_count += 1
                except OSError:
                    # 目录不为空,跳过
                    pass

        print(f"✅ 清理完成,共删除 {removed_count} 个空目录")


# 使用示例
def demo_file_manager():
    """文件管理器演示"""
    print("=== 智能文件管理器演示 ===\n")

    # 创建文件管理器
    manager = SmartFileManager("./demo_files")

    # 1. 创建演示目录结构
    print("1. 创建演示目录结构")
    demo_structure = {
        "documents": {
            "reports": {
                "2025_report.txt": "这是2025年的报告内容",
                "2023_report.txt": "这是2023年的报告内容"
            },
            "manuals": {
                "user_manual.txt": "用户手册内容",
                "admin_manual.txt": "管理员手册内容"
            }
        },
        "images": {
            "photos": {},
            "icons": {}
        },
        "temp_files": {
            "temp1.tmp": "临时文件1",
            "temp2.tmp": "临时文件2"
        }
    }
    manager.create_directory_structure(demo_structure)
    print()

    # 2. 获取文件信息
    print("2. 获取文件信息")
    file_info = manager.get_file_info("./demo_files/documents/reports/2025_report.txt")
    for key, value in file_info.items():
        print(f"  {key}: {value}")
    print()

    # 3. 查找文件
    print("3. 查找所有.txt文件")
    txt_files = manager.find_files("*.txt", "./demo_files")
    for file_path in txt_files:
        print(f"  📄 {file_path}")
    print()

    # 4. 清理空目录
    print("4. 清理空目录")
    manager.clean_empty_directories("./demo_files")
    print()


if __name__ == "__main__":
    demo_file_manager()
```
🛠️ 实战项目:智能文件整理工具
```python
import os
import hashlib
from collections import defaultdict
from datetime import datetime       # 报告中的生成时间需要用到
from typing import Dict, List


class DuplicateFileFinder:
    """重复文件查找器 - 找出图书馆中的重复藏书"""

    def __init__(self):
        self.file_hashes: Dict[str, List[str]] = defaultdict(list)
        self.processed_count = 0

    def calculate_file_hash(self, file_path: str) -> str:
        """计算文件的MD5哈希值 - 就像给每本书一个唯一的身份证号"""
        hash_md5 = hashlib.md5()
        try:
            with open(file_path, "rb") as f:
                # 分块读取,避免大文件占用过多内存
                for chunk in iter(lambda: f.read(4096), b""):
                    hash_md5.update(chunk)
            return hash_md5.hexdigest()
        except (IOError, OSError) as e:
            print(f"⚠️ 无法读取文件 {file_path}: {e}")
            return ""

    def scan_directory(self, directory: str) -> Dict[str, List[str]]:
        """扫描目录查找重复文件,返回 {哈希值: [文件路径, ...]} 形式的字典"""
        print(f"🔍 开始扫描目录: {directory}")

        for root, dirs, files in os.walk(directory):
            for file in files:
                file_path = os.path.join(root, file)
                file_hash = self.calculate_file_hash(file_path)

                if file_hash:
                    self.file_hashes[file_hash].append(file_path)
                    self.processed_count += 1

                    if self.processed_count % 100 == 0:
                        print(f"  已处理 {self.processed_count} 个文件...")

        # 只返回有重复的文件
        duplicates = {
            hash_val: paths for hash_val, paths in self.file_hashes.items()
            if len(paths) > 1
        }

        print(f"✅ 扫描完成,共处理 {self.processed_count} 个文件")
        print(f"🔍 发现 {len(duplicates)} 组重复文件")
        return duplicates

    def generate_report(self, duplicates: Dict[str, List[str]]) -> str:
        """生成重复文件报告"""
        report_lines = ["# 重复文件报告\n"]
        total_duplicates = 0
        total_wasted_space = 0

        for i, (file_hash, file_paths) in enumerate(duplicates.items(), 1):
            report_lines.append(f"## 重复组 {i}")
            report_lines.append(f"**文件哈希**: `{file_hash}`")
            report_lines.append(f"**重复数量**: {len(file_paths)}")

            # 获取文件大小
            try:
                file_size = os.path.getsize(file_paths[0])
                wasted_space = file_size * (len(file_paths) - 1)
                total_wasted_space += wasted_space
                report_lines.append(f"**文件大小**: {self._format_size(file_size)}")
                report_lines.append(f"**浪费空间**: {self._format_size(wasted_space)}")
            except OSError:
                pass

            report_lines.append("**文件路径**:")
            for path in file_paths:
                report_lines.append(f"- `{path}`")
            report_lines.append("")

            total_duplicates += len(file_paths) - 1

        # 添加总结
        summary = f"""## 📊 总结报告

- **重复文件组数**: {len(duplicates)}
- **重复文件总数**: {total_duplicates}
- **浪费存储空间**: {self._format_size(total_wasted_space)}
- **建议**: 保留每组中的一个文件,删除其余重复文件

---
*报告生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*
"""
        report_lines.append(summary)
        return "\n".join(report_lines)

    def _format_size(self, size_bytes: int) -> str:
        """格式化文件大小"""
        for unit in ['B', 'KB', 'MB', 'GB']:
            if size_bytes < 1024:
                return f"{size_bytes:.1f} {unit}"
            size_bytes /= 1024
        return f"{size_bytes:.1f} TB"


# 使用示例
def demo_duplicate_finder():
    """重复文件查找器演示"""
    print("=== 重复文件查找器演示 ===\n")

    finder = DuplicateFileFinder()

    # 扫描当前目录
    duplicates = finder.scan_directory(".")

    if duplicates:
        # 生成报告
        report = finder.generate_report(duplicates)

        # 保存报告到文件
        with open("duplicate_files_report.md", "w", encoding="utf-8") as f:
            f.write(report)
        print("📄 重复文件报告已保存到 duplicate_files_report.md")

        # 显示前几组重复文件
        print("\n🔍 发现的重复文件(前3组):")
        for i, (file_hash, file_paths) in enumerate(list(duplicates.items())[:3], 1):
            print(f"\n重复组 {i}:")
            for path in file_paths:
                print(f"  📄 {path}")
    else:
        print("✅ 未发现重复文件")


if __name__ == "__main__":
    demo_duplicate_finder()
```
13.2 文件读写与编码处理
🎓 核心概念:借书还书系统
文件读写就像图书馆的借书还书过程:
- 打开文件 = 从书架上取下一本书
- 读取内容 = 翻阅书籍内容
- 写入内容 = 在笔记本上记录内容
- 关闭文件 = 把书放回书架
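对应上面四个步骤,先看一个最小示意(假设当前目录可写,notes.txt 是示例文件名):

```python
# "借书":打开文件并写入内容;with 语句会在结束时自动"还书",即关闭文件
with open("notes.txt", "w", encoding="utf-8") as f:
    f.write("第一行笔记\n第二行笔记\n")

# "翻阅":再次打开并逐行读取
with open("notes.txt", "r", encoding="utf-8") as f:
    for line in f:
        print(line.rstrip("\n"))
```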
📝 高效文件处理系统
```python
import os
import re
import csv
import json
import shutil
import tempfile
import gzip
from collections import defaultdict
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator, Any, Optional


class AdvancedFileProcessor:
    """高级文件处理器 - 像专业图书管理员一样处理各种文件"""

    def __init__(self):
        self.encoding = 'utf-8'
        self.buffer_size = 8192  # 8KB缓冲区

    @contextmanager
    def safe_file_operation(self, file_path: str, mode: str, **kwargs):
        """安全的文件操作上下文管理器"""
        file_handle = None
        try:
            file_handle = open(file_path, mode, encoding=self.encoding, **kwargs)
            yield file_handle
        except IOError as e:
            print(f"❌ 文件操作失败: {e}")
            raise
        finally:
            if file_handle:
                file_handle.close()

    def read_large_file_in_chunks(self, file_path: str, chunk_size: int = None) -> Iterator[str]:
        """分块读取大文件 - 就像一页一页地翻阅厚重的百科全书"""
        if chunk_size is None:
            chunk_size = self.buffer_size

        with self.safe_file_operation(file_path, 'r') as file:
            while True:
                chunk = file.read(chunk_size)
                if not chunk:
                    break
                yield chunk

    def read_lines_efficiently(self, file_path: str) -> Iterator[str]:
        """高效逐行读取 - 就像逐条阅读图书目录"""
        with self.safe_file_operation(file_path, 'r') as file:
            for line_number, line in enumerate(file, 1):
                yield line.rstrip('\n\r')  # 移除行尾换行符

    def write_with_backup(self, file_path: str, content: str, backup: bool = True):
        """安全写入文件(带备份)- 就像修改重要文档时先做备份"""
        file_path = Path(file_path)

        # 创建备份
        if backup and file_path.exists():
            backup_path = file_path.with_suffix(file_path.suffix + '.backup')
            shutil.copy2(file_path, backup_path)
            print(f"📋 创建备份: {backup_path}")

        # 使用临时文件确保原子性写入
        with tempfile.NamedTemporaryFile(
            mode='w',
            encoding=self.encoding,
            dir=file_path.parent,
            delete=False
        ) as temp_file:
            temp_file.write(content)
            temp_path = temp_file.name

        # 原子性替换
        shutil.move(temp_path, file_path)
        print(f"✅ 文件写入完成: {file_path}")

    def detect_encoding(self, file_path: str) -> str:
        """检测文件编码 - 就像识别不同语言的书籍"""
        # 常见编码列表
        encodings = ['utf-8', 'gbk', 'gb2312', 'utf-16', 'ascii', 'latin-1']

        for encoding in encodings:
            try:
                with open(file_path, 'r', encoding=encoding) as file:
                    file.read(1024)  # 读取前1KB进行测试
                return encoding
            except UnicodeDecodeError:
                continue

        return 'utf-8'  # 默认返回UTF-8

    def convert_encoding(self, input_path: str, output_path: str,
                         source_encoding: str, target_encoding: str = 'utf-8'):
        """转换文件编码 - 就像翻译不同语言的书籍"""
        try:
            with open(input_path, 'r', encoding=source_encoding) as source_file:
                content = source_file.read()

            with open(output_path, 'w', encoding=target_encoding) as target_file:
                target_file.write(content)

            print(f"✅ 编码转换完成: {source_encoding} → {target_encoding}")
        except UnicodeDecodeError as e:
            print(f"❌ 编码转换失败: {e}")
            raise

    def compress_text_file(self, input_path: str, output_path: str):
        """压缩文本文件 - 就像把书籍装进压缩袋节省空间"""
        with open(input_path, 'rb') as input_file:
            with gzip.open(output_path, 'wb') as output_file:
                shutil.copyfileobj(input_file, output_file)

        # 计算压缩率
        original_size = os.path.getsize(input_path)
        compressed_size = os.path.getsize(output_path)
        ratio = (1 - compressed_size / original_size) * 100

        print(f"✅ 文件压缩完成")
        print(f"  原始大小: {self._format_size(original_size)}")
        print(f"  压缩大小: {self._format_size(compressed_size)}")
        print(f"  压缩率: {ratio:.1f}%")

    def _format_size(self, size_bytes: int) -> str:
        """格式化文件大小"""
        for unit in ['B', 'KB', 'MB', 'GB']:
            if size_bytes < 1024:
                return f"{size_bytes:.1f} {unit}"
            size_bytes /= 1024
        return f"{size_bytes:.1f} TB"


class LogAnalyzer:
    """日志分析工具 - 像专业的数据分析师一样处理日志文件"""

    def __init__(self):
        self.processor = AdvancedFileProcessor()
        self.stats = {
            'total_lines': 0,
            'error_lines': 0,
            'warning_lines': 0,
            'info_lines': 0,
            'ip_addresses': defaultdict(int),
            'status_codes': defaultdict(int),
            'timestamps': []
        }

    def analyze_log_file(self, log_path: str) -> dict:
        """分析日志文件 - 就像研究图书借阅记录找出规律,返回分析结果字典"""
        print(f"📊 开始分析日志文件: {log_path}")

        # 重置统计信息
        self.stats = {
            'total_lines': 0,
            'error_lines': 0,
            'warning_lines': 0,
            'info_lines': 0,
            'ip_addresses': defaultdict(int),
            'status_codes': defaultdict(int),
            'timestamps': []
        }

        # 逐行分析日志
        for line in self.processor.read_lines_efficiently(log_path):
            self._analyze_line(line)
            self.stats['total_lines'] += 1

            if self.stats['total_lines'] % 10000 == 0:
                print(f"  已分析 {self.stats['total_lines']} 行...")

        # 生成分析报告
        return self._generate_analysis_report()

    def _analyze_line(self, line: str):
        """分析单行日志"""
        line_lower = line.lower()

        # 统计日志级别
        if 'error' in line_lower:
            self.stats['error_lines'] += 1
        elif 'warning' in line_lower or 'warn' in line_lower:
            self.stats['warning_lines'] += 1
        elif 'info' in line_lower:
            self.stats['info_lines'] += 1

        # 提取IP地址(简单正则匹配)
        ip_pattern = r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'
        ips = re.findall(ip_pattern, line)
        for ip in ips:
            self.stats['ip_addresses'][ip] += 1

        # 提取HTTP状态码
        status_pattern = r'\b[1-5][0-9]{2}\b'
        statuses = re.findall(status_pattern, line)
        for status in statuses:
            self.stats['status_codes'][status] += 1

    def _generate_analysis_report(self) -> dict:
        """生成分析报告"""
        report = {
            'summary': {
                'total_lines': self.stats['total_lines'],
                'error_lines': self.stats['error_lines'],
                'warning_lines': self.stats['warning_lines'],
                'info_lines': self.stats['info_lines'],
                'error_rate': self.stats['error_lines'] / max(self.stats['total_lines'], 1) * 100
            },
            'top_ips': dict(sorted(self.stats['ip_addresses'].items(),
                                   key=lambda x: x[1], reverse=True)[:10]),
            'status_codes': dict(self.stats['status_codes'])
        }

        print(f"✅ 日志分析完成")
        print(f"  总行数: {report['summary']['total_lines']}")
        print(f"  错误行数: {report['summary']['error_lines']}")
        print(f"  错误率: {report['summary']['error_rate']:.2f}%")
        return report

    def save_analysis_report(self, report: dict, output_path: str):
        """保存分析报告到文件"""
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)
        print(f"📄 分析报告已保存到: {output_path}")


# 使用示例
def demo_file_processing():
    """文件处理演示"""
    print("=== 高级文件处理演示 ===\n")

    processor = AdvancedFileProcessor()

    # 1. 创建演示文件
    print("1. 创建演示文件")
    demo_content = """这是第一行内容
这是第二行内容,包含中文字符
This is the third line with English
这是第四行,测试编码处理
最后一行内容"""

    with open("demo.txt", "w", encoding="utf-8") as f:
        f.write(demo_content)
    print("  ✅ 演示文件创建完成")

    # 2. 逐行读取文件
    print("\n2. 逐行读取文件")
    for i, line in enumerate(processor.read_lines_efficiently("demo.txt"), 1):
        print(f"  第{i}行: {line}")

    # 3. 检测文件编码
    print("\n3. 检测文件编码")
    encoding = processor.detect_encoding("demo.txt")
    print(f"  检测到的编码: {encoding}")

    # 4. 安全写入文件
    print("\n4. 安全写入文件(带备份)")
    new_content = demo_content + "\n这是新添加的内容"
    processor.write_with_backup("demo.txt", new_content)

    # 5. 压缩文件
    print("\n5. 压缩文件")
    processor.compress_text_file("demo.txt", "demo.txt.gz")

    print("\n✅ 文件处理演示完成")


if __name__ == "__main__":
    demo_file_processing()
```
13.3 数据序列化与格式转换
🎓 核心概念:数据的"冷冻"与"解冻"
数据序列化就像食物的保存过程:
- 序列化 = 把新鲜食物放进冷冻室保存(将数据转换为可存储的格式)
- 反序列化 = 把冷冻食物解冻使用(将存储的数据恢复为程序可用的对象)
- 不同格式 = 不同的保存方法(冷冻、腌制、晒干等)
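先用标准库 json 做一个最小的"冷冻/解冻"示意(其中的数据内容为虚构示例):

```python
import json

# "冷冻":把 Python 字典序列化为 JSON 字符串
snack = {"name": "饺子", "count": 12, "frozen": True}
text = json.dumps(snack, ensure_ascii=False)
print(text)  # {"name": "饺子", "count": 12, "frozen": true}

# "解冻":把 JSON 字符串反序列化回 Python 对象
restored = json.loads(text)
print(restored["name"], restored["count"])
```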
📝 通用数据转换系统
```python
import json
import pickle
import csv
import xml.etree.ElementTree as ET
import configparser
import sqlite3
import yaml  # 需要安装: pip install pyyaml
from dataclasses import dataclass, asdict
from datetime import datetime, date
from pathlib import Path
from typing import Any, Dict, List, Union


@dataclass
class Student:
    """学生信息类 - 演示对象序列化"""
    id: int
    name: str
    age: int
    grades: List[float]
    enrollment_date: str

    def __post_init__(self):
        """初始化后处理"""
        if isinstance(self.enrollment_date, str):
            self.enrollment_date = datetime.strptime(self.enrollment_date, "%Y-%m-%d")


class UniversalDataConverter:
    """通用数据转换器 - 像万能翻译官一样处理各种数据格式"""

    def __init__(self):
        self.supported_formats = ['json', 'pickle', 'csv', 'xml', 'yaml', 'ini']
        print("🔄 通用数据转换器初始化完成")
        print(f"  支持格式: {', '.join(self.supported_formats)}")

    def save_data(self, data: Any, file_path: str, format_type: str = None):
        """保存数据到文件 - 选择最适合的保存方法;格式不指定时从文件扩展名推断"""
        if format_type is None:
            format_type = self._detect_format_from_path(file_path)

        print(f"💾 保存数据到 {file_path} (格式: {format_type})")

        try:
            if format_type == 'json':
                self._save_json(data, file_path)
            elif format_type == 'pickle':
                self._save_pickle(data, file_path)
            elif format_type == 'csv':
                self._save_csv(data, file_path)
            elif format_type == 'xml':
                self._save_xml(data, file_path)
            elif format_type == 'yaml':
                self._save_yaml(data, file_path)
            elif format_type == 'ini':
                self._save_ini(data, file_path)
            else:
                raise ValueError(f"不支持的格式: {format_type}")

            print(f"✅ 数据保存成功")
        except Exception as e:
            print(f"❌ 数据保存失败: {e}")
            raise

    def load_data(self, file_path: str, format_type: str = None) -> Any:
        """从文件加载数据 - 选择正确的解冻方法;格式不指定时从文件扩展名推断"""
        if format_type is None:
            format_type = self._detect_format_from_path(file_path)

        print(f"📂 从 {file_path} 加载数据 (格式: {format_type})")

        try:
            if format_type == 'json':
                data = self._load_json(file_path)
            elif format_type == 'pickle':
                data = self._load_pickle(file_path)
            elif format_type == 'csv':
                data = self._load_csv(file_path)
            elif format_type == 'xml':
                data = self._load_xml(file_path)
            elif format_type == 'yaml':
                data = self._load_yaml(file_path)
            elif format_type == 'ini':
                data = self._load_ini(file_path)
            else:
                raise ValueError(f"不支持的格式: {format_type}")

            print(f"✅ 数据加载成功")
            return data
        except Exception as e:
            print(f"❌ 数据加载失败: {e}")
            raise

    def convert_format(self, input_path: str, output_path: str,
                       input_format: str = None, output_format: str = None):
        """格式转换 - 像翻译官一样在不同格式间转换"""
        print(f"🔄 格式转换: {input_path} → {output_path}")

        # 加载原始数据
        data = self.load_data(input_path, input_format)

        # 保存为新格式
        self.save_data(data, output_path, output_format)

        print(f"✅ 格式转换完成")

    def _detect_format_from_path(self, file_path: str) -> str:
        """从文件路径检测格式"""
        extension = Path(file_path).suffix.lower()
        format_map = {
            '.json': 'json',
            '.pkl': 'pickle',
            '.pickle': 'pickle',
            '.csv': 'csv',
            '.xml': 'xml',
            '.yaml': 'yaml',
            '.yml': 'yaml',
            '.ini': 'ini',
            '.cfg': 'ini'
        }
        return format_map.get(extension, 'json')

    def _save_json(self, data: Any, file_path: str):
        """保存JSON格式"""
        def json_serializer(obj):
            """JSON序列化器 - 处理特殊对象"""
            if isinstance(obj, datetime):
                return obj.isoformat()
            elif isinstance(obj, date):
                return obj.isoformat()
            elif hasattr(obj, '__dict__'):
                return asdict(obj) if hasattr(obj, '__dataclass_fields__') else obj.__dict__
            raise TypeError(f"无法序列化对象类型: {type(obj)}")

        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False, default=json_serializer)

    def _load_json(self, file_path: str) -> Any:
        """加载JSON格式"""
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def _save_pickle(self, data: Any, file_path: str):
        """保存Pickle格式"""
        with open(file_path, 'wb') as f:
            pickle.dump(data, f)

    def _load_pickle(self, file_path: str) -> Any:
        """加载Pickle格式"""
        with open(file_path, 'rb') as f:
            return pickle.load(f)

    def _save_csv(self, data: Any, file_path: str):
        """保存CSV格式"""
        if not isinstance(data, list):
            raise ValueError("CSV格式需要列表类型的数据")
        if not data:
            return

        # 获取字段名
        if isinstance(data[0], dict):
            fieldnames = data[0].keys()
        elif hasattr(data[0], '__dict__'):
            fieldnames = data[0].__dict__.keys()
        elif hasattr(data[0], '__dataclass_fields__'):
            fieldnames = data[0].__dataclass_fields__.keys()
        else:
            raise ValueError("不支持的数据类型用于CSV格式")

        with open(file_path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            for item in data:
                if isinstance(item, dict):
                    writer.writerow(item)
                elif hasattr(item, '__dict__'):
                    if hasattr(item, '__dataclass_fields__'):
                        writer.writerow(asdict(item))
                    else:
                        writer.writerow(item.__dict__)

    def _load_csv(self, file_path: str) -> List[Dict]:
        """加载CSV格式"""
        data = []
        with open(file_path, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            for row in reader:
                data.append(dict(row))
        return data

    def _save_yaml(self, data: Any, file_path: str):
        """保存YAML格式"""
        with open(file_path, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True)

    def _load_yaml(self, file_path: str) -> Any:
        """加载YAML格式"""
        with open(file_path, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)

    def _save_xml(self, data: Any, file_path: str):
        """保存XML格式(简单实现)"""
        root = ET.Element("data")
        self._dict_to_xml(data, root)
        tree = ET.ElementTree(root)
        tree.write(file_path, encoding='utf-8', xml_declaration=True)

    def _dict_to_xml(self, data: Any, parent: ET.Element):
        """将字典转换为XML"""
        if isinstance(data, dict):
            for key, value in data.items():
                element = ET.SubElement(parent, str(key))
                self._dict_to_xml(value, element)
        elif isinstance(data, list):
            for i, item in enumerate(data):
                element = ET.SubElement(parent, f"item_{i}")
                self._dict_to_xml(item, element)
        else:
            parent.text = str(data)

    def _load_xml(self, file_path: str) -> Dict:
        """加载XML格式"""
        tree = ET.parse(file_path)
        root = tree.getroot()
        return self._xml_to_dict(root)

    def _xml_to_dict(self, element: ET.Element) -> Any:
        """将XML转换为字典"""
        if len(element) == 0:
            return element.text

        result = {}
        for child in element:
            if child.tag.startswith('item_'):
                # 处理列表项
                if 'list' not in result:
                    result['list'] = []
                result['list'].append(self._xml_to_dict(child))
            else:
                result[child.tag] = self._xml_to_dict(child)
        return result

    def _save_ini(self, data: Dict, file_path: str):
        """保存INI格式"""
        config = configparser.ConfigParser()
        for section_name, section_data in data.items():
            config.add_section(str(section_name))
            for key, value in section_data.items():
                config.set(str(section_name), str(key), str(value))

        with open(file_path, 'w', encoding='utf-8') as f:
            config.write(f)

    def _load_ini(self, file_path: str) -> Dict:
        """加载INI格式"""
        config = configparser.ConfigParser()
        config.read(file_path, encoding='utf-8')

        result = {}
        for section_name in config.sections():
            result[section_name] = dict(config.items(section_name))
        return result


class ConfigurationManager:
    """配置管理系统 - 像管家一样管理应用的各种配置"""

    def __init__(self, config_dir: str = "./config"):
        self.config_dir = Path(config_dir)
        self.config_dir.mkdir(exist_ok=True)
        self.converter = UniversalDataConverter()
        self.configs = {}
        print(f"⚙️ 配置管理系统初始化完成")
        print(f"  配置目录: {self.config_dir}")

    def create_default_config(self, app_name: str) -> Dict:
        """创建默认配置"""
        default_config = {
            "app": {
                "name": app_name,
                "version": "1.0.0",
                "debug": False,
                "log_level": "INFO"
            },
            "database": {
                "host": "localhost",
                "port": 5432,
                "name": f"{app_name}_db",
                "user": "admin",
                "password": "changeme"
            },
            "api": {
                "host": "0.0.0.0",
                "port": 8000,
                "timeout": 30,
                "max_connections": 100
            },
            "cache": {
                "enabled": True,
                "ttl": 3600,
                "max_size": 1000
            },
            "logging": {
                "file": f"logs/{app_name}.log",
                "max_size": "10MB",
                "backup_count": 5,
                "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
            }
        }
        return default_config

    def save_config(self, app_name: str, config: Dict, format_type: str = 'yaml'):
        """保存配置文件"""
        config_path = self.config_dir / f"{app_name}.{format_type}"
        self.converter.save_data(config, str(config_path), format_type)
        self.configs[app_name] = config
        print(f"💾 配置已保存: {config_path}")

    def load_config(self, app_name: str, format_type: str = 'yaml') -> Dict:
        """加载配置文件"""
        config_path = self.config_dir / f"{app_name}.{format_type}"

        if not config_path.exists():
            print(f"⚠️ 配置文件不存在,创建默认配置: {config_path}")
            config = self.create_default_config(app_name)
            self.save_config(app_name, config, format_type)
        else:
            config = self.converter.load_data(str(config_path), format_type)
            self.configs[app_name] = config

        return config

    def get_config_value(self, app_name: str, key_path: str, default=None):
        """获取配置值 - 支持点号分隔的路径,如 'database.host'"""
        if app_name not in self.configs:
            self.load_config(app_name)

        config = self.configs[app_name]
        keys = key_path.split('.')

        for key in keys:
            if isinstance(config, dict) and key in config:
                config = config[key]
            else:
                return default
        return config

    def set_config_value(self, app_name: str, key_path: str, value: Any):
        """设置配置值"""
        if app_name not in self.configs:
            self.load_config(app_name)

        config = self.configs[app_name]
        keys = key_path.split('.')

        # 导航到目标位置
        for key in keys[:-1]:
            if key not in config:
                config[key] = {}
            config = config[key]

        # 设置值
        config[keys[-1]] = value

        # 保存配置
        self.save_config(app_name, self.configs[app_name])
        print(f"⚙️ 配置已更新: {key_path} = {value}")


# 使用示例
def demo_data_serialization():
    """数据序列化演示"""
    print("=== 数据序列化与格式转换演示 ===\n")

    # 创建演示数据
    students = [
        Student(1, "张三", 20, [85.5, 92.0, 78.5], "2023-09-01"),
        Student(2, "李四", 21, [88.0, 85.5, 91.0], "2023-09-01"),
        Student(3, "王五", 19, [92.5, 89.0, 94.5], "2023-09-01")
    ]

    # 创建转换器
    converter = UniversalDataConverter()

    # 1. 保存为不同格式
    print("1. 保存为不同格式")

    # 转换为字典格式以便序列化
    students_dict = [asdict(student) for student in students]

    formats = ['json', 'yaml', 'csv']
    for fmt in formats:
        file_path = f"students.{fmt}"
        converter.save_data(students_dict, file_path, fmt)
        print(f"  ✅ 保存为 {fmt} 格式: {file_path}")
    print()

    # 2. 加载并比较数据
    print("2. 加载并验证数据")
    for fmt in formats:
        file_path = f"students.{fmt}"
        loaded_data = converter.load_data(file_path, fmt)
        print(f"  📂 从 {fmt} 格式加载了 {len(loaded_data)} 条记录")
    print()

    # 3. 格式转换
    print("3. 格式转换演示")
    converter.convert_format("students.json", "students_converted.yaml", "json", "yaml")
    print("  ✅ JSON → YAML 转换完成")

    # 4. 配置管理演示
    print("\n4. 配置管理演示")
    config_manager = ConfigurationManager()

    # 创建应用配置
    app_config = config_manager.load_config("my_app")
    print(f"  📋 应用配置加载完成")

    # 获取配置值
    db_host = config_manager.get_config_value("my_app", "database.host")
    print(f"  🔍 数据库主机: {db_host}")

    # 修改配置值
    config_manager.set_config_value("my_app", "database.port", 3306)
    print(f"  ⚙️ 数据库端口已更新")

    print("\n✅ 数据序列化演示完成")


if __name__ == "__main__":
    demo_data_serialization()
```
13.4 数据库操作与持久化存储
🎓 核心概念:银行金库管理系统
数据库就像银行的金库管理系统:
- 数据表 = 不同类型的保险箱(储蓄箱、贵重物品箱等)
- 记录 = 保险箱中的每一个物品
- 字段 = 物品的各种属性(价值、存放时间、所有者等)
- 事务 = 完整的存取操作(要么全部成功,要么全部失败)
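下面用 sqlite3 的内存数据库做一个最小示意,演示"要么全部成功,要么全部失败"的事务特性(表名和数据均为假设的演示内容):

```python
import sqlite3

conn = sqlite3.connect(":memory:")   # 内存数据库,仅作演示
conn.execute("CREATE TABLE vault (owner TEXT, amount REAL)")

try:
    with conn:  # with 块就是一个事务:正常结束则提交,出现异常则回滚
        conn.execute("INSERT INTO vault VALUES (?, ?)", ("张三", 100.0))
        conn.execute("INSERT INTO vault VALUES (?, ?)", ("李四", -50.0))
        raise ValueError("模拟中途出错")  # 触发回滚,上面两条插入都不会生效
except ValueError:
    pass

print(conn.execute("SELECT COUNT(*) FROM vault").fetchone()[0])  # 输出 0
```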
📝 企业级数据库管理系统
```python
import sqlite3
import hashlib  # 补充导入:PersonalFinanceApp 的密码哈希需要用到
import json
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional, Any, Dict, Tuple


@dataclass
class Transaction:
    """交易记录类"""
    id: Optional[int] = None
    user_id: int = 0
    amount: float = 0.0
    transaction_type: str = ""  # income, expense
    category: str = ""
    description: str = ""
    date: str = ""

    def __post_init__(self):
        if not self.date:
            self.date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")


class DatabaseManager:
    """数据库管理器 - 像银行金库管理员一样管理数据"""

    def __init__(self, db_path: str = "finance.db"):
        self.db_path = db_path
        self.init_database()
        print(f"🏦 数据库管理器初始化完成: {db_path}")

    @contextmanager
    def get_connection(self):
        """获取数据库连接 - 安全的连接管理"""
        conn = None
        try:
            conn = sqlite3.connect(self.db_path)
            conn.row_factory = sqlite3.Row  # 启用字典式访问
            yield conn
        except sqlite3.Error as e:
            if conn:
                conn.rollback()
            print(f"❌ 数据库操作失败: {e}")
            raise
        finally:
            if conn:
                conn.close()

    def init_database(self):
        """初始化数据库表结构"""
        with self.get_connection() as conn:
            # 创建用户表
            conn.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT UNIQUE NOT NULL,
                    email TEXT UNIQUE NOT NULL,
                    password_hash TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            # 创建交易表
            conn.execute('''
                CREATE TABLE IF NOT EXISTS transactions (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER NOT NULL,
                    amount REAL NOT NULL,
                    transaction_type TEXT NOT NULL CHECK (transaction_type IN ('income', 'expense')),
                    category TEXT NOT NULL,
                    description TEXT,
                    date TIMESTAMP NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (user_id) REFERENCES users (id)
                )
            ''')

            # 创建分类表
            conn.execute('''
                CREATE TABLE IF NOT EXISTS categories (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT UNIQUE NOT NULL,
                    type TEXT NOT NULL CHECK (type IN ('income', 'expense')),
                    description TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            # 创建预算表
            conn.execute('''
                CREATE TABLE IF NOT EXISTS budgets (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER NOT NULL,
                    category TEXT NOT NULL,
                    amount REAL NOT NULL,
                    period TEXT NOT NULL CHECK (period IN ('monthly', 'yearly')),
                    start_date DATE NOT NULL,
                    end_date DATE NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (user_id) REFERENCES users (id)
                )
            ''')

            conn.commit()
            print("✅ 数据库表结构初始化完成")

    def create_user(self, username: str, email: str, password_hash: str) -> int:
        """创建用户 - 在银行开户"""
        with self.get_connection() as conn:
            cursor = conn.execute(
                'INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)',
                (username, email, password_hash)
            )
            conn.commit()
            user_id = cursor.lastrowid
            print(f"👤 用户创建成功: {username} (ID: {user_id})")
            return user_id

    def get_user(self, user_id: int) -> Optional[Dict]:
        """获取用户信息"""
        with self.get_connection() as conn:
            cursor = conn.execute('SELECT * FROM users WHERE id = ?', (user_id,))
            row = cursor.fetchone()
            return dict(row) if row else None

    def add_transaction(self, transaction: Transaction) -> int:
        """添加交易记录 - 在金库中存放新物品"""
        with self.get_connection() as conn:
            cursor = conn.execute('''
                INSERT INTO transactions
                    (user_id, amount, transaction_type, category, description, date)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (
                transaction.user_id,
                transaction.amount,
                transaction.transaction_type,
                transaction.category,
                transaction.description,
                transaction.date
            ))
            conn.commit()
            transaction_id = cursor.lastrowid
            print(f"💰 交易记录添加成功 (ID: {transaction_id})")
            return transaction_id

    def get_transactions(self, user_id: int, limit: int = 100) -> List[Dict]:
        """获取交易记录"""
        with self.get_connection() as conn:
            cursor = conn.execute('''
                SELECT * FROM transactions
                WHERE user_id = ?
                ORDER BY date DESC
                LIMIT ?
            ''', (user_id, limit))
            return [dict(row) for row in cursor.fetchall()]

    def get_balance(self, user_id: int) -> Dict[str, float]:
        """获取账户余额 - 查看金库总价值"""
        with self.get_connection() as conn:
            # 计算收入总额
            cursor = conn.execute('''
                SELECT COALESCE(SUM(amount), 0) as total_income
                FROM transactions
                WHERE user_id = ? AND transaction_type = 'income'
            ''', (user_id,))
            total_income = cursor.fetchone()['total_income']

            # 计算支出总额
            cursor = conn.execute('''
                SELECT COALESCE(SUM(amount), 0) as total_expense
                FROM transactions
                WHERE user_id = ? AND transaction_type = 'expense'
            ''', (user_id,))
            total_expense = cursor.fetchone()['total_expense']

            balance = total_income - total_expense
            return {
                'total_income': total_income,
                'total_expense': total_expense,
                'balance': balance
            }

    def get_category_summary(self, user_id: int, days: int = 30) -> List[Dict]:
        """获取分类汇总 - 按类别统计金库物品"""
        with self.get_connection() as conn:
            cursor = conn.execute('''
                SELECT
                    category,
                    transaction_type,
                    COUNT(*) as transaction_count,
                    SUM(amount) as total_amount,
                    AVG(amount) as avg_amount
                FROM transactions
                WHERE user_id = ?
                    AND date >= datetime('now', '-{} days')
                GROUP BY category, transaction_type
                ORDER BY total_amount DESC
            '''.format(days), (user_id,))
            return [dict(row) for row in cursor.fetchall()]

    def create_budget(self, user_id: int, category: str, amount: float,
                      period: str = 'monthly') -> int:
        """创建预算 - 设置支出限额"""
        start_date = datetime.now().strftime('%Y-%m-%d')
        if period == 'monthly':
            # 计算月末日期
            end_date = datetime.now().replace(day=28).strftime('%Y-%m-%d')
        else:  # yearly
            end_date = datetime.now().replace(month=12, day=31).strftime('%Y-%m-%d')

        with self.get_connection() as conn:
            cursor = conn.execute('''
                INSERT INTO budgets (user_id, category, amount, period, start_date, end_date)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (user_id, category, amount, period, start_date, end_date))
            conn.commit()
            budget_id = cursor.lastrowid
            print(f"📊 预算创建成功: {category} - {amount} ({period})")
            return budget_id

    def check_budget_status(self, user_id: int) -> List[Dict]:
        """检查预算状态 - 监控支出是否超限"""
        with self.get_connection() as conn:
            cursor = conn.execute('''
                SELECT
                    b.id,
                    b.category,
                    b.amount as budget_amount,
                    b.period,
                    COALESCE(SUM(t.amount), 0) as spent_amount,
                    (b.amount - COALESCE(SUM(t.amount), 0)) as remaining_amount,
                    CASE
                        WHEN COALESCE(SUM(t.amount), 0) > b.amount THEN 'OVER'
                        WHEN COALESCE(SUM(t.amount), 0) > b.amount * 0.8 THEN 'WARNING'
                        ELSE 'OK'
                    END as status
                FROM budgets b
                LEFT JOIN transactions t ON (
                    t.user_id = b.user_id
                    AND t.category = b.category
                    AND t.transaction_type = 'expense'
                    AND t.date >= b.start_date
                    AND t.date <= b.end_date
                )
                WHERE b.user_id = ?
                GROUP BY b.id, b.category, b.amount, b.period
            ''', (user_id,))
            return [dict(row) for row in cursor.fetchall()]

    def backup_database(self, backup_path: str):
        """备份数据库 - 制作金库清单副本"""
        with self.get_connection() as conn:
            with open(backup_path, 'w', encoding='utf-8') as f:
                for line in conn.iterdump():
                    f.write('%s\n' % line)
        print(f"💾 数据库备份完成: {backup_path}")

    def get_statistics(self, user_id: int) -> Dict:
        """获取统计信息 - 生成金库管理报告"""
        balance_info = self.get_balance(user_id)
        category_summary = self.get_category_summary(user_id)
        budget_status = self.check_budget_status(user_id)

        with self.get_connection() as conn:
            # 获取交易数量
            cursor = conn.execute(
                'SELECT COUNT(*) as total_transactions FROM transactions WHERE user_id = ?',
                (user_id,)
            )
            total_transactions = cursor.fetchone()['total_transactions']

            # 获取最近交易
            cursor = conn.execute('''
                SELECT * FROM transactions
                WHERE user_id = ?
                ORDER BY date DESC
                LIMIT 5
            ''', (user_id,))
            recent_transactions = [dict(row) for row in cursor.fetchall()]

        return {
            'balance': balance_info,
            'total_transactions': total_transactions,
            'category_summary': category_summary,
            'budget_status': budget_status,
            'recent_transactions': recent_transactions,
            'generated_at': datetime.now().isoformat()
        }


class PersonalFinanceApp:
    """个人财务管理应用 - 完整的金库管理系统"""

    def __init__(self, db_path: str = "personal_finance.db"):
        self.db = DatabaseManager(db_path)
        self.current_user_id = None
        print("💼 个人财务管理应用启动完成")

    def register_user(self, username: str, email: str, password: str) -> bool:
        """用户注册"""
        try:
            # 简单的密码哈希(实际应用中应使用更安全的方法)
            password_hash = hashlib.sha256(password.encode()).hexdigest()
            user_id = self.db.create_user(username, email, password_hash)

            # 初始化默认分类
            self._init_default_categories()
            return True
        except Exception as e:
            print(f"❌ 用户注册失败: {e}")
            return False

    def login(self, username: str, password: str) -> bool:
        """用户登录"""
        password_hash = hashlib.sha256(password.encode()).hexdigest()

        with self.db.get_connection() as conn:
            cursor = conn.execute(
                'SELECT id FROM users WHERE username = ? AND password_hash = ?',
                (username, password_hash)
            )
            user = cursor.fetchone()

            if user:
                self.current_user_id = user['id']
                print(f"✅ 登录成功: {username}")
                return True
            else:
                print("❌ 用户名或密码错误")
                return False

    def add_income(self, amount: float, category: str, description: str = ""):
        """添加收入"""
        if not self.current_user_id:
            print("❌ 请先登录")
            return

        transaction = Transaction(
            user_id=self.current_user_id,
            amount=amount,
            transaction_type='income',
            category=category,
            description=description
        )
        self.db.add_transaction(transaction)
        print(f"💰 收入记录添加成功: +{amount} ({category})")

    def add_expense(self, amount: float, category: str, description: str = ""):
        """添加支出"""
        if not self.current_user_id:
            print("❌ 请先登录")
            return

        transaction = Transaction(
            user_id=self.current_user_id,
            amount=amount,
            transaction_type='expense',
            category=category,
            description=description
        )
        self.db.add_transaction(transaction)
        print(f"💸 支出记录添加成功: -{amount} ({category})")

    def show_balance(self):
        """显示账户余额"""
        if not self.current_user_id:
            print("❌ 请先登录")
            return

        balance_info = self.db.get_balance(self.current_user_id)
        print("\n💰 账户余额信息:")
        print(f"  总收入: ¥{balance_info['total_income']:,.2f}")
        print(f"  总支出: ¥{balance_info['total_expense']:,.2f}")
        print(f"  当前余额: ¥{balance_info['balance']:,.2f}")

    def show_category_summary(self, days: int = 30):
        """显示分类汇总"""
        if not self.current_user_id:
            print("❌ 请先登录")
            return

        summary = self.db.get_category_summary(self.current_user_id, days)
        print(f"\n📊 最近{days}天分类汇总:")
        for item in summary:
            type_symbol = "💰" if item['transaction_type'] == 'income' else "💸"
            print(f"  {type_symbol} {item['category']}: ¥{item['total_amount']:,.2f} "
                  f"({item['transaction_count']}笔)")

    def create_monthly_budget(self, category: str, amount: float):
        """创建月度预算"""
        if not self.current_user_id:
            print("❌ 请先登录")
            return

        self.db.create_budget(self.current_user_id, category, amount, 'monthly')

    def check_budgets(self):
        """检查预算状态"""
        if not self.current_user_id:
            print("❌ 请先登录")
            return

        budget_status = self.db.check_budget_status(self.current_user_id)
        print("\n📊 预算状态:")
        for budget in budget_status:
            status_symbol = {
                'OK': '✅',
                'WARNING': '⚠️',
                'OVER': '❌'
            }.get(budget['status'], '❓')
            print(f"  {status_symbol} {budget['category']}: "
                  f"¥{budget['spent_amount']:,.2f} / ¥{budget['budget_amount']:,.2f} "
                  f"(剩余: ¥{budget['remaining_amount']:,.2f})")

    def generate_report(self) -> str:
        """生成财务报告"""
        if not self.current_user_id:
            print("❌ 请先登录")
            return ""

        stats = self.db.get_statistics(self.current_user_id)

        report = f"""# 个人财务报告

## 📊 账户概览

- **总收入**: ¥{stats['balance']['total_income']:,.2f}
- **总支出**: ¥{stats['balance']['total_expense']:,.2f}
- **当前余额**: ¥{stats['balance']['balance']:,.2f}
- **交易总数**: {stats['total_transactions']}

## 💰 分类汇总

"""
        for item in stats['category_summary']:
            type_name = "收入" if item['transaction_type'] == 'income' else "支出"
            report += f"- **{item['category']}** ({type_name}): ¥{item['total_amount']:,.2f}\n"

        report += "\n## 📊 预算状态\n"
        for budget in stats['budget_status']:
            status_text = {
                'OK': '正常',
                'WARNING': '警告',
                'OVER': '超支'
            }.get(budget['status'], '未知')
            report += f"- **{budget['category']}**: {status_text} "
            report += f"(¥{budget['spent_amount']:,.2f} / ¥{budget['budget_amount']:,.2f})\n"

        report += f"\n---\n*报告生成时间: {stats['generated_at']}*\n"
        return report

    def _init_default_categories(self):
        """初始化默认分类"""
        default_categories = [
            ('工资', 'income'),
            ('奖金', 'income'),
            ('投资收益', 'income'),
            ('餐饮', 'expense'),
            ('交通', 'expense'),
            ('购物', 'expense'),
            ('娱乐', 'expense'),
            ('医疗', 'expense'),
            ('教育', 'expense'),
            ('住房', 'expense')
        ]

        with self.db.get_connection() as conn:
            for name, category_type in default_categories:
                try:
                    conn.execute(
                        'INSERT OR IGNORE INTO categories (name, type) VALUES (?, ?)',
                        (name, category_type)
                    )
                except sqlite3.Error:
                    pass  # 忽略重复插入错误
            conn.commit()


# 使用示例
def demo_database_operations():
    """数据库操作演示"""
    print("=== 个人财务管理系统演示 ===\n")

    # 创建应用实例
    app = PersonalFinanceApp("demo_finance.db")

    # 1. 用户注册和登录
    print("1. 用户注册和登录")
    app.register_user("张三", "zhangsan@example.com", "password123")
    app.login("张三", "password123")
    print()

    # 2. 添加收入记录
    print("2. 添加收入记录")
    app.add_income(8000, "工资", "2025年1月工资")
    app.add_income(1000, "奖金", "项目奖金")
    print()

    # 3. 添加支出记录
    print("3. 添加支出记录")
    app.add_expense(1200, "餐饮", "日常用餐")
    app.add_expense(800, "交通", "地铁公交费用")
    app.add_expense(2000, "购物", "日用品采购")
    print()

    # 4. 查看余额
    print("4. 查看账户余额")
    app.show_balance()
    print()

    # 5. 查看分类汇总
    print("5. 查看分类汇总")
    app.show_category_summary()
    print()

    # 6. 创建预算
    print("6. 创建月度预算")
    app.create_monthly_budget("餐饮", 1500)
    app.create_monthly_budget("交通", 1000)
    print()

    # 7. 检查预算状态
    print("7. 检查预算状态")
    app.check_budgets()
    print()

    # 8. 生成报告
    print("8. 生成财务报告")
    report = app.generate_report()

    # 保存报告到文件
    with open("finance_report.md", "w", encoding="utf-8") as f:
        f.write(report)
    print("📄 财务报告已保存到 finance_report.md")

    # 9. 数据库备份
    print("\n9. 数据库备份")
    app.db.backup_database("finance_backup.sql")

    print("\n✅ 数据库操作演示完成")


if __name__ == "__main__":
    demo_database_operations()
```
🏋️♀️ 实践练习
📝 基础练习(Level 1-2)
练习1:文件信息统计器
创建一个程序,统计指定目录下所有文件的信息:
- 文件总数和目录总数
- 各种文件类型的数量
- 总占用空间
- 最大和最小的文件
```python
# 参考实现思路
def analyze_directory(path):
    """分析目录统计信息"""
    # 1. 遍历目录
    # 2. 统计文件类型和大小
    # 3. 生成统计报告
    pass
```
练习2:文本文件合并工具
编写程序将多个文本文件合并为一个文件:
- 支持指定文件顺序
- 在合并时添加文件分隔标识
- 处理不同编码的文件
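参考实现思路(示意代码,函数名与参数均为假设;不同编码的处理可复用 13.2 节 detect_encoding 的思路):

```python
def merge_text_files(file_paths, output_path, encoding="utf-8"):
    """按给定顺序合并多个文本文件,并在每个文件前写入分隔标识"""
    with open(output_path, "w", encoding=encoding) as out:
        for path in file_paths:
            out.write(f"\n===== 来自文件: {path} =====\n")
            # 提示:若各文件编码不一致,可先检测编码后再分别打开
            with open(path, "r", encoding=encoding) as src:
                out.write(src.read())
```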
练习3:CSV数据处理器
创建一个CSV数据处理工具:
- 读取CSV文件并显示基本统计信息
- 支持数据筛选和排序
- 将处理结果保存为新的CSV文件
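参考实现思路(示意代码,函数名为假设):

```python
import csv

def summarize_csv(csv_path):
    """读取CSV文件并返回基本统计信息(行数、列名)"""
    with open(csv_path, "r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        rows = list(reader)
        columns = reader.fieldnames
    return {"rows": len(rows), "columns": columns}

# 进一步:用 sorted(rows, key=...) 排序、用列表推导式筛选,
# 再用 csv.DictWriter 把结果写入新的CSV文件
```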
🚀 进阶练习(Level 3-4)
练习4:智能备份系统
设计一个智能备份系统:
- 支持增量备份(只备份修改过的文件)
- 压缩备份文件以节省空间
- 生成备份报告和恢复脚本
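参考实现思路(示意代码,只演示"按修改时间做增量"这一核心判断,函数名为假设):

```python
import shutil
from pathlib import Path

def incremental_backup(source_dir, backup_dir):
    """只复制比备份副本更新(或尚未备份)的文件"""
    source_dir, backup_dir = Path(source_dir), Path(backup_dir)
    for src in source_dir.rglob("*"):
        if not src.is_file():
            continue
        dst = backup_dir / src.relative_to(source_dir)
        if not dst.exists() or src.stat().st_mtime > dst.stat().st_mtime:
            dst.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(src, dst)  # copy2 会保留修改时间,便于下次比较
```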
练习5:多格式数据转换器
扩展UniversalDataConverter类:
- 添加对Excel文件的支持
- 实现数据验证功能
- 支持批量转换多个文件
练习6:个人知识库系统
基于SQLite创建个人知识库:
- 支持文章的分类和标签
- 实现全文搜索功能
- 提供数据导入导出功能
🎯 挑战练习(Level 5)
练习7:分布式文件同步工具
创建一个文件同步工具:
- 监控文件夹变化
- 支持多设备间同步
- 处理文件冲突和版本控制
练习8:数据库迁移工具
开发数据库迁移工具:
- 支持不同数据库间的数据迁移
- 自动处理数据类型转换
- 提供迁移进度和错误报告
📚 本章总结
🎯 核心知识回顾
通过本章学习,我们掌握了Python中数据持久化的完整技能体系:
1. 文件系统操作 (⭐⭐⭐⭐⭐)
- pathlib模块: 现代化的路径操作方式
- 文件属性管理: 获取和修改文件的各种属性
- 目录操作: 创建、删除、遍历目录结构
- 文件搜索: 使用glob模式进行高效文件查找
2. 文件读写处理 (⭐⭐⭐⭐⭐)
- 编码处理: 正确处理UTF-8、GBK等不同编码
- 性能优化: 使用缓冲和流式处理大文件
- 安全操作: 使用上下文管理器确保资源释放
- 错误处理: 优雅处理文件操作中的各种异常
3. 数据序列化 (⭐⭐⭐⭐⭐)
- JSON格式: 轻量级、跨语言的数据交换格式
- Pickle序列化: Python对象的完整状态保存
- CSV处理: 结构化表格数据的标准格式
- 配置管理: YAML、INI等配置文件的处理