跳到主要内容

第44章:数据管道与ETL流程

🌟 章节导入:走进数据工程工厂

亲爱的朋友们,欢迎来到我们的数据工程工厂!这是一个充满数据魅力的智能化数据处理中心,在这里,我们将见证原始数据如何通过ETL流程,转化为有价值的数据资产,就像将原材料加工成精品的现代化工厂一样。

🏭 数据工程工厂全景

想象一下,你正站在一个现代化的数据处理园区门口,眼前是四座风格迥异但又紧密相连的建筑群:

📊 数据工程基础中心

这是我们的第一站,一座标准化的数据工程基础中心。在这里:

  • 数据架构设计室里,架构师们正在设计数据流转的架构蓝图
  • 数据质量管理部的专家们专注于确保数据的准确性和完整性
  • 元数据管理中心如同专业的档案室,管理着所有数据的元信息

🔄 ETL流程车间

这座建筑闪烁着蓝色的光芒,象征着自动化的数据处理流水线

  • 数据抽取工段里,工程师们正在从各种数据源抽取数据
  • 数据转换工段的专家们对数据进行清洗、转换和标准化
  • 数据加载工段负责将处理好的数据加载到目标系统

⏰ 工作流调度中心

这是一座充满节奏感的智能调度中心

  • 任务编排系统如同工厂的生产调度系统,管理着所有数据处理任务
  • 依赖管理引擎确保任务按照正确的顺序执行
  • 错误处理机制自动处理异常情况,确保流程稳定运行

🏢 数据仓库系统

最令人兴奋的是这座未来感十足的数据仓库大厦

  • 维度建模设计如同仓库的货架布局,优化数据存储和查询
  • 增量数据处理确保只处理新增和变更的数据,提高效率
  • 数据血缘追踪记录数据的来源和流转路径,确保可追溯性

🚀 技术革命的见证者

在这个数据工程工厂,我们将见证数据处理的三大革命:

📊 数据工程革命

从原始数据到数据资产,我们将掌握:

  • 数据架构设计和规划
  • 数据质量管理和保障
  • 元数据管理和治理

🔄 ETL流程革命

从手动处理到自动化ETL,我们将实现:

  • 高效的数据抽取技术
  • 智能的数据转换规则
  • 可靠的数据加载策略

⏰ 调度自动化革命

从手动执行到自动化调度,我们将建立:

  • 灵活的工作流编排
  • 智能的依赖管理
  • 完善的错误处理

🎯 学以致用的企业级项目

在本章的最后,我们将综合运用所学的所有技术,构建一个完整的数据仓库系统。这不仅仅是一个学习项目,更是一个具备实际商业价值的企业级应用:

  • 企业应用可以基于这个系统,实现数据的集中管理和分析
  • 数据分析团队可以利用这个系统,进行高效的数据分析
  • 业务团队可以基于这个系统,获得准确的数据支持
  • 技术管理者可以利用这个系统,全面了解数据资产状况

🔥 准备好了吗?

现在,让我们戴上安全帽,穿上工作服,一起走进这个充满数据魅力的数据工程工厂。在这里,我们不仅要学习最前沿的数据处理技术,更要将这些技术转化为真正有价值的数据资产!

准备好迎接这场数据革命了吗?让我们开始这激动人心的学习之旅!


🎯 学习目标(SMART目标)

完成本章学习后,学生将能够:

📚 知识目标

  • 数据工程基础体系:深入理解数据架构设计、数据质量管理、元数据管理等核心概念
  • ETL流程技术:掌握数据抽取、数据转换、数据加载等ETL关键技术
  • 工作流调度技术:理解Airflow框架使用、任务依赖管理、错误处理机制等工作流调度技术
  • 数据仓库理念:综合运用维度建模、增量处理、数据血缘等数据仓库技术

🛠️ 技能目标

  • 数据架构设计能力:能够独立设计数据架构,实现数据的合理流转和存储
  • ETL流程开发能力:具备开发完整ETL流程的实战能力
  • 工作流调度能力:掌握Airflow等工具的使用,实现自动化数据管道
  • 数据仓库构建能力:能够构建企业级数据仓库系统,具备大规模数据处理工程实践能力

💡 素养目标

  • 数据工程思维:培养数据驱动的工程思维模式
  • 数据质量意识:建立数据质量管理和保障的意识
  • 自动化理念:注重数据处理的自动化和可维护性
  • 数据治理意识:理解数据治理和元数据管理的重要性

📝 知识导图


🎓 理论讲解

44.1 数据工程基础

想象一下,您走进了一家现代化的数据处理工厂。首先映入眼帘的是数据工程基础中心——这里的工程师们正在设计数据流转的架构蓝图,确保数据能够高效、可靠地从源头流向目标,就像工厂的生产线设计一样重要。

在数据处理的世界里,数据工程就是我们的"工厂设计图纸"。它定义了数据的来源、流转路径、存储方式和质量标准,确保数据处理的每个环节都有章可循。

📊 数据架构设计

数据架构是数据工程的基础:

# 示例1:数据架构设计系统
"""
数据架构设计
包含:
- 数据源识别
- 数据流转设计
- 存储架构设计
- 计算架构设计
"""
from typing import Dict, List, Set
from dataclasses import dataclass, field
from enum import Enum
class DataSourceType(Enum):
"""数据源类型"""
DATABASE = "数据库"
FILE = "文件"
API = "API接口"
STREAM = "数据流"
CLOUD_STORAGE = "云存储"
class DataStorageType(Enum):
"""存储类型"""
RELATIONAL_DB = "关系型数据库"
NOSQL_DB = "NoSQL数据库"
DATA_WAREHOUSE = "数据仓库"
DATA_LAKE = "数据湖"
OBJECT_STORAGE = "对象存储"
@dataclass
class DataSource:
"""数据源定义"""
name: str
source_type: DataSourceType
connection_info: Dict
schema: Dict = None
update_frequency: str = "daily"
@dataclass
class DataFlow:
"""数据流定义"""
name: str
source: str
target: str
transformation: List[str] = field(default_factory=list)
frequency: str = "daily"
class DataArchitectureDesigner:
"""数据架构设计师"""
def __init__(self):
"""初始化架构设计师"""
self.data_sources: Dict[str, DataSource] = {}
self.data_flows: List[DataFlow] = []
self.storage_systems: Dict[str, DataStorageType] = {}
print("📊 数据架构设计师启动成功!")
def identify_data_sources(self) -> Dict[str, DataSource]:
"""识别数据源"""
print("\n" + "="*60)
print("🔍 数据源识别")
print("="*60)
sources = {
"用户数据库": DataSource(
name="用户数据库",
source_type=DataSourceType.DATABASE,
connection_info={
"type": "PostgreSQL",
"host": "user-db.example.com",
"database": "userdb"
},
update_frequency="real-time"
),
"订单API": DataSource(
name="订单API",
source_type=DataSourceType.API,
connection_info={
"url": "https://api.example.com/orders",
"auth": "OAuth2"
},
update_frequency="hourly"
),
"日志文件": DataSource(
name="日志文件",
source_type=DataSourceType.FILE,
connection_info={
"path": "/var/log/app",
"format": "json"
},
update_frequency="daily"
),
"实时数据流": DataSource(
name="实时数据流",
source_type=DataSourceType.STREAM,
connection_info={
"type": "Kafka",
"topics": ["user-events", "order-events"]
},
update_frequency="real-time"
)
}
for name, source in sources.items():
self.data_sources[name] = source
print(f"\n{name}:")
print(f" 类型: {source.source_type.value}")
print(f" 更新频率: {source.update_frequency}")
return sources
def design_data_flow(self) -> List[DataFlow]:
"""设计数据流"""
print("\n" + "="*60)
print("🔄 数据流设计")
print("="*60)
flows = [
DataFlow(
name="用户数据流",
source="用户数据库",
target="数据仓库",
transformation=["数据清洗", "标准化", "去重"],
frequency="daily"
),
DataFlow(
name="订单数据流",
source="订单API",
target="数据仓库",
transformation=["格式转换", "数据验证", "聚合计算"],
frequency="hourly"
),
DataFlow(
name="日志数据流",
source="日志文件",
target="数据湖",
transformation=["解析", "结构化", "分区"],
frequency="daily"
),
DataFlow(
name="实时事件流",
source="实时数据流",
target="实时分析系统",
transformation=["过滤", "转换", "窗口聚合"],
frequency="real-time"
)
]
self.data_flows = flows
print("📋 数据流列表:")
for flow in flows:
print(f"\n{flow.name}:")
print(f" 源: {flow.source}")
print(f" 目标: {flow.target}")
print(f" 转换: {', '.join(flow.transformation)}")
print(f" 频率: {flow.frequency}")
return flows
def design_storage_architecture(self) -> Dict:
"""设计存储架构"""
print("\n" + "="*60)
print("💾 存储架构设计")
print("="*60)
architecture = {
"数据仓库": {
"type": DataStorageType.DATA_WAREHOUSE,
"technology": "Snowflake/Redshift",
"purpose": "结构化数据,OLAP分析",
"data_types": ["用户数据", "订单数据", "产品数据"]
},
"数据湖": {
"type": DataStorageType.DATA_LAKE,
"technology": "S3/HDFS",
"purpose": "原始数据,大数据分析",
"data_types": ["日志数据", "原始文件", "非结构化数据"]
},
"操作数据库": {
"type": DataStorageType.RELATIONAL_DB,
"technology": "PostgreSQL",
"purpose": "事务处理,实时查询",
"data_types": ["用户信息", "订单信息"]
},
"缓存层": {
"type": DataStorageType.NOSQL_DB,
"technology": "Redis",
"purpose": "快速访问,会话存储",
"data_types": ["热点数据", "会话数据"]
}
}
print("📦 存储架构:")
for name, config in architecture.items():
print(f"\n{name}:")
print(f" 类型: {config['type'].value}")
print(f" 技术: {config['technology']}")
print(f" 用途: {config['purpose']}")
print(f" 数据类型: {', '.join(config['data_types'])}")
return architecture
# 运行演示
if __name__ == "__main__":
designer = DataArchitectureDesigner()
# 识别数据源
sources = designer.identify_data_sources()
# 设计数据流
flows = designer.design_data_flow()
# 设计存储架构
architecture = designer.design_storage_architecture()

运行结果:

📊 数据架构设计师启动成功!

============================================================
🔍 数据源识别
============================================================

用户数据库:
类型: 数据库
更新频率: real-time

订单API:
类型: API接口
更新频率: hourly
...

数据质量管理

数据质量是数据工程的核心:

# 示例2:数据质量管理系统
"""
数据质量管理
包含:
- 数据质量维度
- 质量检测规则
- 质量修复策略
- 质量监控告警
"""
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
from datetime import datetime
class QualityDimension(Enum):
"""数据质量维度"""
COMPLETENESS = "完整性"
ACCURACY = "准确性"
CONSISTENCY = "一致性"
TIMELINESS = "及时性"
VALIDITY = "有效性"
UNIQUENESS = "唯一性"
@dataclass
class QualityRule:
"""质量规则"""
name: str
dimension: QualityDimension
rule_type: str # check, fix, alert
rule_definition: str
threshold: float = 0.95
@dataclass
class QualityReport:
"""质量报告"""
data_source: str
timestamp: datetime
overall_score: float
dimension_scores: Dict[QualityDimension, float]
issues: List[Dict]
class DataQualityManager:
"""数据质量管理器"""
def __init__(self):
"""初始化质量管理器"""
self.quality_rules: List[QualityRule] = []
self.quality_reports: List[QualityReport] = []
print("✅ 数据质量管理器启动成功!")
def define_quality_rules(self) -> List[QualityRule]:
"""定义质量规则"""
print("\n" + "="*60)
print("📋 数据质量规则定义")
print("="*60)
rules = [
QualityRule(
name="完整性检查-用户ID",
dimension=QualityDimension.COMPLETENESS,
rule_type="check",
rule_definition="user_id IS NOT NULL",
threshold=1.0
),
QualityRule(
name="准确性检查-邮箱格式",
dimension=QualityDimension.ACCURACY,
rule_type="check",
rule_definition="email REGEXP '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Z|a-z]{2,}$'",
threshold=0.95
),
QualityRule(
name="唯一性检查-用户ID",
dimension=QualityDimension.UNIQUENESS,
rule_type="check",
rule_definition="COUNT(DISTINCT user_id) = COUNT(user_id)",
threshold=1.0
),
QualityRule(
name="及时性检查-数据延迟",
dimension=QualityDimension.TIMELINESS,
rule_type="alert",
rule_definition="CURRENT_TIMESTAMP - update_time > INTERVAL '24 hours'",
threshold=0.9
)
]
self.quality_rules = rules
print("📋 质量规则列表:")
for rule in rules:
print(f"\n{rule.name}:")
print(f" 维度: {rule.dimension.value}")
print(f" 类型: {rule.rule_type}")
print(f" 阈值: {rule.threshold*100:.0f}%")
return rules
def check_data_quality(self, data_source: str, data: List[Dict]) -> QualityReport:
"""检查数据质量"""
print(f"\n🔍 检查数据质量: {data_source}")
dimension_scores = {}
issues = []
# 完整性检查
total_records = len(data)
if total_records > 0:
complete_records = sum(1 for record in data if record.get('user_id'))
completeness = complete_records / total_records
dimension_scores[QualityDimension.COMPLETENESS] = completeness
if completeness < 1.0:
issues.append({
"dimension": QualityDimension.COMPLETENESS.value,
"issue": f"缺失user_id的记录: {total_records - complete_records}条",
"severity": "high"
})
# 准确性检查(邮箱格式)
if total_records > 0:
import re
email_pattern = r'^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}$'
valid_emails = sum(1 for record in data
if record.get('email') and re.match(email_pattern, record.get('email', '')))
accuracy = valid_emails / total_records if 'email' in data[0] else 1.0
dimension_scores[QualityDimension.ACCURACY] = accuracy
if accuracy < 0.95:
issues.append({
"dimension": QualityDimension.ACCURACY.value,
"issue": f"邮箱格式错误: {total_records - valid_emails}条",
"severity": "medium"
})
# 计算总体质量分数
overall_score = sum(dimension_scores.values()) / len(dimension_scores) if dimension_scores else 0.0
report = QualityReport(
data_source=data_source,
timestamp=datetime.now(),
overall_score=overall_score,
dimension_scores=dimension_scores,
issues=issues
)
self.quality_reports.append(report)
print(f"\n📊 质量报告:")
print(f" 总体分数: {overall_score*100:.1f}%")
for dimension, score in dimension_scores.items():
print(f" {dimension.value}: {score*100:.1f}%")
if issues:
print(f"\n⚠️ 发现 {len(issues)} 个问题:")
for issue in issues:
print(f" - [{issue['severity']}] {issue['dimension']}: {issue['issue']}")
return report
def fix_data_quality_issues(self, report: QualityReport) -> Dict:
"""修复数据质量问题"""
print(f"\n🔧 修复数据质量问题: {report.data_source}")
fixes_applied = []
for issue in report.issues:
if issue['dimension'] == QualityDimension.COMPLETENESS.value:
print(f" 修复: 填充缺失的user_id")
fixes_applied.append("填充缺失值")
elif issue['dimension'] == QualityDimension.ACCURACY.value:
print(f" 修复: 清理无效的邮箱格式")
fixes_applied.append("数据清洗")
return {
"data_source": report.data_source,
"fixes_applied": fixes_applied,
"fixed_at": datetime.now()
}
# 运行演示
if __name__ == "__main__":
manager = DataQualityManager()
# 定义质量规则
rules = manager.define_quality_rules()
# 检查数据质量
sample_data = [
{"user_id": "1", "email": "user1@example.com", "name": "User 1"},
{"user_id": "2", "email": "user2@example.com", "name": "User 2"},
{"user_id": None, "email": "invalid-email", "name": "User 3"}, # 质量问题
{"user_id": "4", "email": "user4@example.com", "name": "User 4"}
]
report = manager.check_data_quality("用户数据", sample_data)
# 修复质量问题
if report.issues:
manager.fix_data_quality_issues(report)

元数据管理系统

元数据是数据的"档案":

# 示例3:元数据管理系统
"""
元数据管理
包含:
- 元数据分类
- 元数据采集
- 元数据存储
- 元数据查询
"""
from typing import Dict, List, Optional
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
class MetadataType(Enum):
"""元数据类型"""
TECHNICAL = "技术元数据"
BUSINESS = "业务元数据"
OPERATIONAL = "操作元数据"
LINEAGE = "血缘元数据"
@dataclass
class Metadata:
"""元数据定义"""
name: str
metadata_type: MetadataType
description: str
schema: Dict = None
source: str = None
target: str = None
created_at: datetime = field(default_factory=datetime.now)
updated_at: datetime = field(default_factory=datetime.now)
tags: List[str] = field(default_factory=list)
class MetadataManager:
"""元数据管理器"""
def __init__(self):
"""初始化元数据管理器"""
self.metadata_registry: Dict[str, Metadata] = {}
print("📚 元数据管理器启动成功!")
def register_metadata(self, metadata: Metadata):
"""注册元数据"""
self.metadata_registry[metadata.name] = metadata
print(f"✅ 元数据已注册: {metadata.name} ({metadata.metadata_type.value})")
def search_metadata(self, keyword: str) -> List[Metadata]:
"""搜索元数据"""
results = []
keyword_lower = keyword.lower()
for name, metadata in self.metadata_registry.items():
if (keyword_lower in name.lower() or
keyword_lower in metadata.description.lower() or
any(keyword_lower in tag.lower() for tag in metadata.tags)):
results.append(metadata)
print(f"\n🔍 搜索 '{keyword}' 找到 {len(results)} 条元数据:")
for metadata in results:
print(f" - {metadata.name}: {metadata.description}")
return results
def get_lineage(self, data_asset: str) -> Dict:
"""获取数据血缘"""
print(f"\n🔗 获取数据血缘: {data_asset}")
# 模拟血缘关系
lineage = {
"asset": data_asset,
"upstream": [
{"name": "用户数据库", "type": "source"},
{"name": "订单API", "type": "source"}
],
"downstream": [
{"name": "数据仓库-用户表", "type": "target"},
{"name": "BI报表-用户分析", "type": "target"}
],
"transformations": [
"数据清洗",
"数据标准化",
"数据聚合"
]
}
print("📊 血缘关系:")
print(" 上游数据源:")
for upstream in lineage["upstream"]:
print(f" - {upstream['name']} ({upstream['type']})")
print(" 下游数据目标:")
for downstream in lineage["downstream"]:
print(f" - {downstream['name']} ({downstream['type']})")
print(" 转换过程:")
for transformation in lineage["transformations"]:
print(f" - {transformation}")
return lineage
# 运行演示
if __name__ == "__main__":
manager = MetadataManager()
# 注册元数据
user_table_metadata = Metadata(
name="用户表",
metadata_type=MetadataType.TECHNICAL,
description="存储用户基本信息的表",
schema={"user_id": "string", "email": "string", "name": "string"},
source="用户数据库",
target="数据仓库",
tags=["用户", "基础数据"]
)
manager.register_metadata(user_table_metadata)
# 搜索元数据
manager.search_metadata("用户")
# 获取血缘
manager.get_lineage("用户表")

44.2 ETL流程设计

欢迎来到我们数据工程工厂的第二站——ETL流程车间!这座现代化的车间专门负责数据的抽取、转换和加载,就像工厂的生产流水线,将原材料加工成成品。

🔄 数据抽取技术

数据抽取是ETL的第一步:

# 示例4:数据抽取系统
"""
数据抽取系统
包含:
- 全量抽取
- 增量抽取
- CDC变更捕获
- 数据源适配
"""
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
class ExtractType(Enum):
"""抽取类型"""
FULL = "全量抽取"
INCREMENTAL = "增量抽取"
CDC = "变更数据捕获"
@dataclass
class ExtractConfig:
"""抽取配置"""
source_name: str
extract_type: ExtractType
table_name: str = None
query: str = None
timestamp_column: str = None
last_extract_time: datetime = None
class DataExtractor:
"""数据抽取器"""
def __init__(self):
"""初始化抽取器"""
self.extract_history: List[Dict] = []
print("🔄 数据抽取器启动成功!")
def full_extract(self, config: ExtractConfig) -> List[Dict]:
"""全量抽取"""
print(f"\n📥 全量抽取: {config.source_name}.{config.table_name}")
# 模拟全量抽取
query = f"SELECT * FROM {config.table_name}"
print(f" 执行查询: {query}")
# 模拟数据
data = [
{"id": 1, "name": "User 1", "email": "user1@example.com"},
{"id": 2, "name": "User 2", "email": "user2@example.com"},
{"id": 3, "name": "User 3", "email": "user3@example.com"}
]
print(f" ✅ 抽取完成: {len(data)} 条记录")
self.extract_history.append({
"type": "full",
"source": config.source_name,
"records": len(data),
"timestamp": datetime.now()
})
return data
def incremental_extract(self, config: ExtractConfig) -> List[Dict]:
"""增量抽取"""
print(f"\n📥 增量抽取: {config.source_name}.{config.table_name}")
if config.last_extract_time:
query = f"""
SELECT * FROM {config.table_name}
WHERE {config.timestamp_column} > '{config.last_extract_time}'
"""
print(f" 执行查询: {query}")
print(f" 上次抽取时间: {config.last_extract_time}")
else:
print(" ⚠️ 首次抽取,执行全量抽取")
return self.full_extract(config)
# 模拟增量数据
data = [
{"id": 4, "name": "User 4", "email": "user4@example.com", "updated_at": datetime.now()}
]
print(f" ✅ 增量抽取完成: {len(data)} 条新记录")
self.extract_history.append({
"type": "incremental",
"source": config.source_name,
"records": len(data),
"timestamp": datetime.now()
})
return data
def cdc_extract(self, config: ExtractConfig) -> List[Dict]:
"""变更数据捕获(CDC)"""
print(f"\n📥 CDC抽取: {config.source_name}.{config.table_name}")
print(" 监听数据库变更日志...")
# 模拟CDC变更
changes = [
{
"operation": "INSERT",
"data": {"id": 5, "name": "User 5", "email": "user5@example.com"},
"timestamp": datetime.now()
},
{
"operation": "UPDATE",
"data": {"id": 1, "name": "User 1 Updated", "email": "user1@example.com"},
"timestamp": datetime.now()
},
{
"operation": "DELETE",
"data": {"id": 2},
"timestamp": datetime.now()
}
]
print(f" ✅ 捕获到 {len(changes)} 个变更:")
for change in changes:
print(f" - {change['operation']}: {change['data']}")
return changes
# 运行演示
if __name__ == "__main__":
extractor = DataExtractor()
# 全量抽取
full_config = ExtractConfig(
source_name="用户数据库",
extract_type=ExtractType.FULL,
table_name="users"
)
extractor.full_extract(full_config)
# 增量抽取
incremental_config = ExtractConfig(
source_name="用户数据库",
extract_type=ExtractType.INCREMENTAL,
table_name="users",
timestamp_column="updated_at",
last_extract_time=datetime(2025, 1, 1)
)
extractor.incremental_extract(incremental_config)
# CDC抽取
cdc_config = ExtractConfig(
source_name="用户数据库",
extract_type=ExtractType.CDC,
table_name="users"
)
extractor.cdc_extract(cdc_config)

数据转换规则

数据转换是ETL的核心:

# 示例5:数据转换系统
"""
数据转换系统
包含:
- 数据清洗
- 数据标准化
- 数据计算
- 数据验证
"""
from typing import Dict, List, Callable, Optional
from dataclasses import dataclass
from datetime import datetime
@dataclass
class TransformationRule:
"""转换规则"""
name: str
rule_type: str # clean, standardize, calculate, validate
function: Callable
description: str
class DataTransformer:
"""数据转换器"""
def __init__(self):
"""初始化转换器"""
self.transformation_rules: List[TransformationRule] = []
self._register_default_rules()
print("🔄 数据转换器启动成功!")
def _register_default_rules(self):
"""注册默认转换规则"""
# 数据清洗规则
self.transformation_rules.append(TransformationRule(
name="去除空值",
rule_type="clean",
function=self._remove_null_values,
description="移除包含空值的记录"
))
self.transformation_rules.append(TransformationRule(
name="去除重复",
rule_type="clean",
function=self._remove_duplicates,
description="基于主键去除重复记录"
))
# 数据标准化规则
self.transformation_rules.append(TransformationRule(
name="标准化邮箱",
rule_type="standardize",
function=self._standardize_email,
description="将邮箱转换为小写"
))
self.transformation_rules.append(TransformationRule(
name="标准化日期",
rule_type="standardize",
function=self._standardize_date,
description="统一日期格式为YYYY-MM-DD"
))
# 数据计算规则
self.transformation_rules.append(TransformationRule(
name="计算年龄",
rule_type="calculate",
function=self._calculate_age,
description="根据出生日期计算年龄"
))
def _remove_null_values(self, data: List[Dict], key: str) -> List[Dict]:
"""去除空值"""
return [record for record in data if record.get(key) is not None]
def _remove_duplicates(self, data: List[Dict], key: str) -> List[Dict]:
"""去除重复"""
seen = set()
result = []
for record in data:
if record.get(key) not in seen:
seen.add(record.get(key))
result.append(record)
return result
def _standardize_email(self, data: List[Dict]) -> List[Dict]:
"""标准化邮箱"""
for record in data:
if 'email' in record and record['email']:
record['email'] = record['email'].lower().strip()
return data
def _standardize_date(self, data: List[Dict], date_column: str) -> List[Dict]:
"""标准化日期"""
for record in data:
if date_column in record and record[date_column]:
# 简化处理,实际需要更复杂的日期解析
if isinstance(record[date_column], str):
record[date_column] = record[date_column][:10] # 取前10个字符
return data
def _calculate_age(self, data: List[Dict], birth_date_column: str) -> List[Dict]:
"""计算年龄"""
current_year = datetime.now().year
for record in data:
if birth_date_column in record and record[birth_date_column]:
# 简化处理,实际需要更复杂的日期计算
birth_year = int(str(record[birth_date_column])[:4])
record['age'] = current_year - birth_year
return data
def transform(self, data: List[Dict], rules: List[str] = None) -> List[Dict]:
"""执行数据转换"""
print(f"\n🔄 开始数据转换: {len(data)} 条记录")
transformed_data = data.copy()
# 应用所有规则或指定规则
rules_to_apply = [r for r in self.transformation_rules
if rules is None or r.name in rules]
for rule in rules_to_apply:
print(f" 应用规则: {rule.name} ({rule.rule_type})")
if rule.rule_type == "clean":
if "去除空值" in rule.name:
transformed_data = rule.function(transformed_data, "id")
elif "去除重复" in rule.name:
transformed_data = rule.function(transformed_data, "id")
elif rule.rule_type == "standardize":
if "邮箱" in rule.name:
transformed_data = rule.function(transformed_data)
elif "日期" in rule.name:
transformed_data = rule.function(transformed_data, "created_at")
elif rule.rule_type == "calculate":
if "年龄" in rule.name:
transformed_data = rule.function(transformed_data, "birth_date")
print(f" ✅ 转换完成: {len(transformed_data)} 条记录")
return transformed_data
# 运行演示
if __name__ == "__main__":
transformer = DataTransformer()
# 示例数据
sample_data = [
{"id": 1, "email": "USER1@EXAMPLE.COM", "name": "User 1", "created_at": "2025-01-01"},
{"id": 2, "email": "user2@example.com", "name": "User 2", "created_at": "2025-01-02"},
{"id": 1, "email": "user1@example.com", "name": "User 1", "created_at": "2025-01-01"}, # 重复
{"id": None, "email": "user3@example.com", "name": "User 3", "created_at": "2025-01-03"} # 空值
]
# 执行转换
transformed = transformer.transform(sample_data)
print(f"\n转换后数据: {transformed}")

数据加载策略

数据加载是ETL的最后一步:

# 示例6:数据加载系统
"""
数据加载系统
包含:
- 全量加载
- 增量加载
- 批量加载
- 实时加载
"""
from typing import Dict, List
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
class LoadStrategy(Enum):
"""加载策略"""
FULL_LOAD = "全量加载"
INCREMENTAL_LOAD = "增量加载"
UPSERT = "更新插入"
APPEND = "追加"
@dataclass
class LoadConfig:
"""加载配置"""
target_name: str
table_name: str
strategy: LoadStrategy
batch_size: int = 1000
class DataLoader:
"""数据加载器"""
def __init__(self):
"""初始化加载器"""
self.load_history: List[Dict] = []
print("📤 数据加载器启动成功!")
def full_load(self, data: List[Dict], config: LoadConfig) -> bool:
"""全量加载"""
print(f"\n📤 全量加载: {config.target_name}.{config.table_name}")
print(f" 策略: 清空表后插入")
print(f" 数据量: {len(data)} 条记录")
# 模拟加载过程
print(" 步骤1: 清空目标表")
print(" 步骤2: 批量插入数据")
batch_size = config.batch_size
total_batches = (len(data) + batch_size - 1) // batch_size
for i in range(total_batches):
batch = data[i * batch_size:(i + 1) * batch_size]
print(f" 批次 {i+1}/{total_batches}: 插入 {len(batch)} 条记录")
print(f" ✅ 全量加载完成: {len(data)} 条记录")
self.load_history.append({
"strategy": "full_load",
"target": config.target_name,
"records": len(data),
"timestamp": datetime.now()
})
return True
def incremental_load(self, data: List[Dict], config: LoadConfig) -> bool:
"""增量加载"""
print(f"\n📤 增量加载: {config.target_name}.{config.table_name}")
print(f" 策略: 仅插入新记录")
print(f" 数据量: {len(data)} 条记录")
# 模拟增量加载
print(" 步骤1: 检查现有记录")
print(" 步骤2: 过滤新记录")
print(" 步骤3: 插入新记录")
new_records = len(data) # 简化处理
print(f" ✅ 增量加载完成: {new_records} 条新记录")
self.load_history.append({
"strategy": "incremental_load",
"target": config.target_name,
"records": new_records,
"timestamp": datetime.now()
})
return True
def upsert_load(self, data: List[Dict], config: LoadConfig) -> bool:
"""更新插入加载"""
print(f"\n📤 更新插入加载: {config.target_name}.{config.table_name}")
print(f" 策略: 存在则更新,不存在则插入")
print(f" 数据量: {len(data)} 条记录")
# 模拟UPSERT
updated = 0
inserted = 0
for record in data:
# 简化处理,实际需要查询数据库
if record.get('id') in [1, 2]: # 假设这些记录已存在
updated += 1
else:
inserted += 1
print(f" ✅ 更新插入完成: 更新 {updated} 条,插入 {inserted} 条")
return True
# 运行演示
if __name__ == "__main__":
loader = DataLoader()
# 示例数据
sample_data = [
{"id": 1, "name": "User 1", "email": "user1@example.com"},
{"id": 2, "name": "User 2", "email": "user2@example.com"},
{"id": 3, "name": "User 3", "email": "user3@example.com"}
]
# 全量加载
full_config = LoadConfig(
target_name="数据仓库",
table_name="users",
strategy=LoadStrategy.FULL_LOAD
)
loader.full_load(sample_data, full_config)
# 增量加载
incremental_config = LoadConfig(
target_name="数据仓库",
table_name="users",
strategy=LoadStrategy.INCREMENTAL_LOAD
)
loader.incremental_load(sample_data, incremental_config)
# 更新插入
upsert_config = LoadConfig(
target_name="数据仓库",
table_name="users",
strategy=LoadStrategy.UPSERT
)
loader.upsert_load(sample_data, upsert_config)

44.3 工作流调度

欢迎来到我们数据工程工厂的第三站——工作流调度中心!这座现代化的调度中心专门负责管理和调度所有的数据处理任务,就像工厂的生产调度系统,确保任务按照正确的顺序和时间执行。

⏰ Airflow框架使用

Airflow是业界领先的工作流调度工具:

# 示例7:Airflow工作流系统
"""
Airflow工作流系统
包含:
- DAG定义
- 任务定义
- 调度配置
- 监控管理
"""
from typing import Dict, List
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
class TaskStatus(Enum):
"""任务状态"""
PENDING = "pending"
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
SKIPPED = "skipped"
@dataclass
class Task:
"""任务定义"""
task_id: str
task_type: str
command: str
dependencies: List[str] = None
retries: int = 3
retry_delay: int = 300
@dataclass
class DAG:
"""DAG定义"""
dag_id: str
description: str
schedule_interval: str
start_date: datetime
tasks: List[Task] = None
default_args: Dict = None
class AirflowWorkflowManager:
"""Airflow工作流管理器"""
def __init__(self):
"""初始化工作流管理器"""
self.dags: Dict[str, DAG] = {}
self.task_executions: List[Dict] = []
print("⏰ Airflow工作流管理器启动成功!")
def create_dag(self, dag: DAG):
"""创建DAG"""
self.dags[dag.dag_id] = dag
print(f"\n✅ DAG创建成功: {dag.dag_id}")
print(f" 描述: {dag.description}")
print(f" 调度间隔: {dag.schedule_interval}")
print(f" 任务数: {len(dag.tasks) if dag.tasks else 0}")
def define_etl_dag(self) -> DAG:
"""定义ETL DAG"""
print("\n" + "="*60)
print("📋 定义ETL工作流")
print("="*60)
tasks = [
Task(
task_id="extract_users",
task_type="PythonOperator",
command="extract_user_data()",
dependencies=[]
),
Task(
task_id="extract_orders",
task_type="PythonOperator",
command="extract_order_data()",
dependencies=[]
),
Task(
task_id="transform_users",
task_type="PythonOperator",
command="transform_user_data()",
dependencies=["extract_users"]
),
Task(
task_id="transform_orders",
task_type="PythonOperator",
command="transform_order_data()",
dependencies=["extract_orders"]
),
Task(
task_id="load_to_warehouse",
task_type="PythonOperator",
command="load_to_data_warehouse()",
dependencies=["transform_users", "transform_orders"]
)
]
dag = DAG(
dag_id="etl_pipeline",
description="用户和订单数据ETL流程",
schedule_interval="0 2 * * *", # 每天凌晨2点
start_date=datetime(2025, 1, 1),
tasks=tasks,
default_args={
"retries": 3,
"retry_delay": timedelta(minutes=5),
"email_on_failure": True
}
)
return dag
def visualize_dag(self, dag_id: str):
"""可视化DAG"""
if dag_id not in self.dags:
print(f"❌ DAG {dag_id} 不存在")
return
dag = self.dags[dag_id]
print(f"\n📊 DAG可视化: {dag_id}")
print("\n任务依赖关系:")
# 构建依赖图
task_map = {task.task_id: task for task in dag.tasks}
# 找出没有依赖的任务(起始任务)
start_tasks = [task for task in dag.tasks if not task.dependencies]
def print_task_hierarchy(task: Task, level: int = 0):
indent = " " * level
print(f"{indent}├─ {task.task_id} ({task.task_type})")
# 找出依赖此任务的任务
dependent_tasks = [t for t in dag.tasks if task.task_id in (t.dependencies or [])]
for dep_task in dependent_tasks:
print_task_hierarchy(dep_task, level + 1)
for start_task in start_tasks:
print_task_hierarchy(start_task)
def execute_dag(self, dag_id: str, execution_date: datetime = None) -> Dict:
"""执行DAG"""
if dag_id not in self.dags:
print(f"❌ DAG {dag_id} 不存在")
return {}
dag = self.dags[dag_id]
execution_date = execution_date or datetime.now()
print(f"\n🚀 执行DAG: {dag_id}")
print(f" 执行时间: {execution_date}")
execution_result = {
"dag_id": dag_id,
"execution_date": execution_date,
"status": "running",
"tasks": {}
}
# 按依赖顺序执行任务
task_map = {task.task_id: task for task in dag.tasks}
executed = set()
def can_execute(task: Task) -> bool:
if not task.dependencies:
return True
return all(dep in executed for dep in task.dependencies)
while len(executed) < len(dag.tasks):
ready_tasks = [task for task in dag.tasks
if task.task_id not in executed and can_execute(task)]
if not ready_tasks:
print(" ⚠️ 检测到循环依赖或阻塞")
break
for task in ready_tasks:
print(f" ▶ 执行任务: {task.task_id}")
print(f" 命令: {task.command}")
# 模拟任务执行
import random
success = random.random() > 0.1 # 90%成功率
if success:
execution_result["tasks"][task.task_id] = {
"status": "success",
"duration": random.uniform(10, 60)
}
print(f" ✅ 任务完成")
else:
execution_result["tasks"][task.task_id] = {
"status": "failed",
"duration": random.uniform(5, 30)
}
print(f" ❌ 任务失败")
executed.add(task.task_id)
execution_result["status"] = "success" if all(
t["status"] == "success" for t in execution_result["tasks"].values()
) else "failed"
self.task_executions.append(execution_result)
print(f"\n 📊 执行结果: {execution_result['status']}")
print(f" 完成任务: {sum(1 for t in execution_result['tasks'].values() if t['status'] == 'success')}")
print(f" 失败任务: {sum(1 for t in execution_result['tasks'].values() if t['status'] == 'failed')}")
return execution_result
# Airflow DAG定义示例(Python代码)
airflow_dag_example = '''
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data_team',
'depends_on_past': False,
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'etl_pipeline',
default_args=default_args,
description='用户和订单数据ETL流程',
schedule_interval='0 2 * * *', # 每天凌晨2点
start_date=datetime(2025, 1, 1),
catchup=False,
tags=['etl', 'data-warehouse']
)
# 数据抽取任务
extract_users = PythonOperator(
task_id='extract_users',
python_callable=extract_user_data,
dag=dag
)
extract_orders = PythonOperator(
task_id='extract_orders',
python_callable=extract_order_data,
dag=dag
)
# 数据转换任务
transform_users = PythonOperator(
task_id='transform_users',
python_callable=transform_user_data,
dag=dag
)
transform_orders = PythonOperator(
task_id='transform_orders',
python_callable=transform_order_data,
dag=dag
)
# 数据加载任务
load_to_warehouse = PythonOperator(
task_id='load_to_warehouse',
python_callable=load_to_data_warehouse,
dag=dag
)
# 定义任务依赖
[extract_users, extract_orders] >> [transform_users, transform_orders] >> load_to_warehouse
'''
# 运行演示
if __name__ == "__main__":
manager = AirflowWorkflowManager()
# 定义ETL DAG
etl_dag = manager.define_etl_dag()
manager.create_dag(etl_dag)
# 可视化DAG
manager.visualize_dag("etl_pipeline")
# 执行DAG
manager.execute_dag("etl_pipeline")

任务依赖管理

任务依赖是工作流的核心:

# 示例8:任务依赖管理系统
"""
任务依赖管理
包含:
- 依赖关系定义
- 并行执行控制
- 优先级管理
- 资源分配
"""
from typing import Dict, List, Set
from dataclasses import dataclass
from enum import Enum
class TaskPriority(Enum):
"""任务优先级"""
LOW = 1
MEDIUM = 2
HIGH = 3
CRITICAL = 4
@dataclass
class TaskDependency:
"""任务依赖"""
task_id: str
depends_on: List[str]
priority: TaskPriority = TaskPriority.MEDIUM
max_parallel: int = 1
class DependencyManager:
"""依赖管理器"""
def __init__(self):
"""初始化依赖管理器"""
self.task_dependencies: Dict[str, TaskDependency] = {}
print("🔗 依赖管理器启动成功!")
def add_task_dependency(self, dependency: TaskDependency):
"""添加任务依赖"""
self.task_dependencies[dependency.task_id] = dependency
print(f"✅ 任务依赖已添加: {dependency.task_id}")
if dependency.depends_on:
print(f" 依赖: {', '.join(dependency.depends_on)}")
def validate_dependencies(self) -> bool:
"""验证依赖关系"""
print("\n🔍 验证依赖关系...")
# 检查循环依赖
def has_cycle(task_id: str, visited: Set[str], rec_stack: Set[str]) -> bool:
visited.add(task_id)
rec_stack.add(task_id)
if task_id in self.task_dependencies:
for dep in self.task_dependencies[task_id].depends_on:
if dep not in visited:
if has_cycle(dep, visited, rec_stack):
return True
elif dep in rec_stack:
return True
rec_stack.remove(task_id)
return False
visited = set()
for task_id in self.task_dependencies:
if task_id not in visited:
if has_cycle(task_id, visited, set()):
print(" ❌ 检测到循环依赖")
return False
print(" ✅ 依赖关系验证通过")
return True
def get_execution_order(self) -> List[List[str]]:
"""获取执行顺序(拓扑排序)"""
print("\n📋 计算执行顺序...")
# 构建依赖图
in_degree = {task_id: 0 for task_id in self.task_dependencies}
graph = {task_id: [] for task_id in self.task_dependencies}
for task_id, dep in self.task_dependencies.items():
for dep_task in dep.depends_on:
if dep_task in self.task_dependencies:
graph[dep_task].append(task_id)
in_degree[task_id] += 1
# 拓扑排序
execution_order = []
queue = [task_id for task_id, degree in in_degree.items() if degree == 0]
while queue:
# 按优先级排序
queue.sort(key=lambda x: self.task_dependencies[x].priority.value, reverse=True)
current_level = []
next_queue = []
for task_id in queue:
current_level.append(task_id)
for next_task in graph[task_id]:
in_degree[next_task] -= 1
if in_degree[next_task] == 0:
next_queue.append(next_task)
execution_order.append(current_level)
queue = next_queue
print("📊 执行顺序:")
for i, level in enumerate(execution_order, 1):
print(f" 阶段 {i}: {', '.join(level)}")
return execution_order
# 运行演示
if __name__ == "__main__":
manager = DependencyManager()
# 添加任务依赖
manager.add_task_dependency(TaskDependency(
task_id="extract_users",
depends_on=[],
priority=TaskPriority.HIGH
))
manager.add_task_dependency(TaskDependency(
task_id="extract_orders",
depends_on=[],
priority=TaskPriority.HIGH
))
manager.add_task_dependency(TaskDependency(
task_id="transform_users",
depends_on=["extract_users"],
priority=TaskPriority.MEDIUM
))
manager.add_task_dependency(TaskDependency(
task_id="transform_orders",
depends_on=["extract_orders"],
priority=TaskPriority.MEDIUM
))
manager.add_task_dependency(TaskDependency(
task_id="load_to_warehouse",
depends_on=["transform_users", "transform_orders"],
priority=TaskPriority.CRITICAL
))
# 验证依赖
manager.validate_dependencies()
# 获取执行顺序
manager.get_execution_order()

错误处理机制

错误处理确保工作流的稳定性:

# 示例9:错误处理系统
"""
错误处理系统
包含:
- 重试策略
- 告警通知
- 失败处理
- 数据回滚
"""
from typing import Dict, List, Callable, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
class RetryStrategy(Enum):
"""重试策略"""
FIXED = "固定间隔"
EXPONENTIAL = "指数退避"
LINEAR = "线性增长"
@dataclass
class ErrorHandler:
"""错误处理器"""
task_id: str
max_retries: int = 3
retry_strategy: RetryStrategy = RetryStrategy.EXPONENTIAL
retry_delay: int = 60
alert_on_failure: bool = True
rollback_on_failure: bool = False
class ErrorHandlingManager:
"""错误处理管理器"""
def __init__(self):
"""初始化错误处理管理器"""
self.error_handlers: Dict[str, ErrorHandler] = {}
self.failure_history: List[Dict] = []
print("🛡️ 错误处理管理器启动成功!")
def register_error_handler(self, handler: ErrorHandler):
"""注册错误处理器"""
self.error_handlers[handler.task_id] = handler
print(f"✅ 错误处理器已注册: {handler.task_id}")
def handle_task_failure(self, task_id: str, error: Exception,
attempt: int) -> Dict:
"""处理任务失败"""
print(f"\n❌ 任务失败: {task_id} (尝试 {attempt})")
print(f" 错误: {error}")
handler = self.error_handlers.get(task_id)
if not handler:
print(" ⚠️ 未找到错误处理器")
return {"action": "fail", "retry": False}
# 检查是否应该重试
if attempt < handler.max_retries:
# 计算重试延迟
if handler.retry_strategy == RetryStrategy.FIXED:
delay = handler.retry_delay
elif handler.retry_strategy == RetryStrategy.EXPONENTIAL:
delay = handler.retry_delay * (2 ** (attempt - 1))
else: # LINEAR
delay = handler.retry_delay * attempt
print(f" 🔄 将在 {delay} 秒后重试")
return {
"action": "retry",
"retry": True,
"delay": delay,
"next_attempt": attempt + 1
}
else:
# 达到最大重试次数
print(f" ⛔ 达到最大重试次数 ({handler.max_retries})")
# 发送告警
if handler.alert_on_failure:
self.send_alert(task_id, error)
# 执行回滚
if handler.rollback_on_failure:
self.rollback_task(task_id)
# 记录失败
self.failure_history.append({
"task_id": task_id,
"error": str(error),
"attempts": attempt,
"timestamp": datetime.now()
})
return {
"action": "fail",
"retry": False,
"alert_sent": handler.alert_on_failure,
"rollback_executed": handler.rollback_on_failure
}
def send_alert(self, task_id: str, error: Exception):
"""发送告警"""
print(f" 📧 发送告警: {task_id}")
print(f" 告警内容: 任务 {task_id} 执行失败")
print(f" 错误信息: {error}")
# 实际实现中会发送邮件、短信、钉钉等通知
def rollback_task(self, task_id: str):
"""回滚任务"""
print(f" ↶ 执行回滚: {task_id}")
print(f" 步骤1: 清理已处理的数据")
print(f" 步骤2: 恢复原始状态")
print(f" 步骤3: 记录回滚日志")
# 运行演示
if __name__ == "__main__":
manager = ErrorHandlingManager()
# 注册错误处理器
handler = ErrorHandler(
task_id="load_to_warehouse",
max_retries=3,
retry_strategy=RetryStrategy.EXPONENTIAL,
retry_delay=60,
alert_on_failure=True,
rollback_on_failure=True
)
manager.register_error_handler(handler)
# 模拟任务失败
try:
raise Exception("数据库连接超时")
except Exception as e:
result = manager.handle_task_failure("load_to_warehouse", e, attempt=1)
print(f"\n处理结果: {result}")

44.4 综合项目:数据仓库系统

在本章的最后,我们将综合运用所学的所有技术,构建一个完整的数据仓库系统。这个系统将整合数据架构设计、ETL流程、工作流调度等所有功能。

项目概述

项目名称:企业级数据仓库系统

项目目标

  • 实现维度建模设计
  • 提供增量数据处理能力
  • 构建数据血缘追踪系统
  • 支持完整的数据仓库流程

技术栈

  • 数据仓库(Snowflake/Redshift)
  • ETL工具(Airflow)
  • 元数据管理(自定义)
  • 数据质量(自定义)

项目架构设计

# 示例10:数据仓库系统完整实现
"""
数据仓库系统完整实现
包含:
- 维度建模设计
- 增量数据处理
- 数据血缘追踪
"""
# 维度建模设计
class DimensionalModeling:
"""维度建模"""
def __init__(self):
"""初始化维度建模"""
self.fact_tables = {}
self.dimension_tables = {}
print("📊 维度建模系统启动成功!")
def design_star_schema(self):
"""设计星型模型"""
print("\n" + "="*60)
print("⭐ 星型模型设计")
print("="*60)
# 事实表
fact_table = {
"name": "sales_fact",
"granularity": "订单级别",
"measures": [
{"name": "sales_amount", "type": "sum", "description": "销售金额"},
{"name": "quantity", "type": "sum", "description": "销售数量"},
{"name": "discount_amount", "type": "sum", "description": "折扣金额"}
],
"dimension_keys": [
"date_key",
"product_key",
"customer_key",
"store_key"
]
}
# 维度表
dimension_tables = [
{
"name": "dim_date",
"attributes": [
"date_key", "date", "year", "quarter", "month", "week", "day_of_week"
],
"hierarchies": [
{"name": "时间层次", "levels": ["年", "季度", "月", "日"]}
]
},
{
"name": "dim_product",
"attributes": [
"product_key", "product_id", "product_name", "category", "brand", "price"
],
"hierarchies": [
{"name": "产品层次", "levels": ["类别", "品牌", "产品"]}
]
},
{
"name": "dim_customer",
"attributes": [
"customer_key", "customer_id", "customer_name", "region", "segment"
]
},
{
"name": "dim_store",
"attributes": [
"store_key", "store_id", "store_name", "city", "state", "country"
],
"hierarchies": [
{"name": "地理层次", "levels": ["国家", "州", "城市", "门店"]}
]
}
]
print("📋 事实表:")
print(f" 名称: {fact_table['name']}")
print(f" 粒度: {fact_table['granularity']}")
print(f" 度量: {len(fact_table['measures'])} 个")
print(f" 维度键: {len(fact_table['dimension_keys'])} 个")
print("\n📋 维度表:")
for dim in dimension_tables:
print(f" {dim['name']}: {len(dim['attributes'])} 个属性")
return {
"fact_table": fact_table,
"dimension_tables": dimension_tables
}
def design_incremental_processing(self):
"""设计增量处理"""
print("\n" + "="*60)
print("🔄 增量处理设计")
print("="*60)
incremental_strategy = {
"识别方式": "基于时间戳",
"时间戳字段": "updated_at",
"处理逻辑": [
"1. 查询源表中 updated_at > last_process_time 的记录",
"2. 对于已存在的记录,执行更新操作",
"3. 对于新记录,执行插入操作",
"4. 更新 last_process_time"
],
"历史数据管理": {
"SCD类型": "SCD Type 2",
"说明": "保留历史版本,通过生效时间和失效时间管理"
}
}
print("📋 增量处理策略:")
for key, value in incremental_strategy.items():
if isinstance(value, list):
print(f" {key}:")
for item in value:
print(f" {item}")
elif isinstance(value, dict):
print(f" {key}:")
for k, v in value.items():
print(f" {k}: {v}")
else:
print(f" {key}: {value}")
return incremental_strategy
def design_data_lineage(self):
"""设计数据血缘"""
print("\n" + "="*60)
print("🔗 数据血缘设计")
print("="*60)
lineage = {
"sales_fact": {
"upstream": [
{
"source": "订单数据库.orders",
"type": "table",
"extraction": "增量抽取",
"transformation": ["数据清洗", "数据标准化"]
},
{
"source": "订单数据库.order_items",
"type": "table",
"extraction": "增量抽取",
"transformation": ["数据清洗", "关联订单表"]
}
],
"downstream": [
{
"target": "BI报表-销售分析",
"type": "report",
"usage": "销售趋势分析"
},
{
"target": "数据挖掘-客户分析",
"type": "analysis",
"usage": "客户行为分析"
}
],
"transformations": [
"关联订单和订单项",
"计算销售金额",
"关联维度表",
"数据质量检查"
]
}
}
print("📊 数据血缘关系:")
for asset, info in lineage.items():
print(f"\n{asset}:")
print(" 上游数据源:")
for upstream in info["upstream"]:
print(f" - {upstream['source']} ({upstream['type']})")
print(" 下游数据目标:")
for downstream in info["downstream"]:
print(f" - {downstream['target']} ({downstream['type']})")
print(" 转换过程:")
for transformation in info["transformations"]:
print(f" - {transformation}")
return lineage
# 完整的数据仓库系统
class DataWarehouseSystem:
"""数据仓库系统"""
def __init__(self):
"""初始化数据仓库系统"""
self.modeling = DimensionalModeling()
self.etl_pipeline = None
self.lineage_tracker = None
print("🏢 数据仓库系统启动成功!")
def build_data_warehouse(self):
"""构建数据仓库"""
print("\n" + "="*60)
print("🏗️ 构建数据仓库系统")
print("="*60)
# 1. 维度建模
print("\n步骤1: 维度建模设计")
schema = self.modeling.design_star_schema()
# 2. 增量处理设计
print("\n步骤2: 增量处理设计")
incremental = self.modeling.design_incremental_processing()
# 3. 数据血缘设计
print("\n步骤3: 数据血缘设计")
lineage = self.modeling.design_data_lineage()
# 4. ETL流程设计
print("\n步骤4: ETL流程设计")
print(" - 数据抽取: 从源系统抽取数据")
print(" - 数据转换: 清洗、标准化、计算")
print(" - 数据加载: 加载到数据仓库")
# 5. 工作流调度
print("\n步骤5: 工作流调度")
print(" - 使用Airflow调度ETL任务")
print(" - 配置任务依赖关系")
print(" - 设置错误处理和重试")
print("\n✅ 数据仓库系统构建完成!")
return {
"schema": schema,
"incremental": incremental,
"lineage": lineage
}
# 运行演示
if __name__ == "__main__":
system = DataWarehouseSystem()
system.build_data_warehouse()

💡 代码示例(可运行)

示例1:数据架构设计

# 运行示例1的代码
designer = DataArchitectureDesigner()
sources = designer.identify_data_sources()
flows = designer.design_data_flow()

运行结果:

📊 数据架构设计师启动成功!

============================================================
🔍 数据源识别
============================================================
...

示例2:ETL流程

# 运行示例4-6的代码
extractor = DataExtractor()
transformer = DataTransformer()
loader = DataLoader()

运行结果:

🔄 数据抽取器启动成功!
📥 全量抽取: 用户数据库.users
...

🎯 实践练习

基础练习

练习1:设计数据架构

为一个电商系统设计数据架构。

# 练习代码框架
# 要求:
# 1. 识别数据源
# 2. 设计数据流
# 3. 设计存储架构
# 4. 定义数据质量标准

练习2:实现ETL流程

实现一个简单的ETL流程,从数据库抽取数据,转换后加载到数据仓库。

# 练习代码框架
# 要求:
# 1. 实现数据抽取
# 2. 实现数据转换
# 3. 实现数据加载
# 4. 处理错误情况

中级练习

练习3:使用Airflow调度ETL

使用Airflow创建一个ETL工作流。

# 练习代码框架
# 要求:
# 1. 定义DAG
# 2. 定义任务和依赖
# 3. 配置调度时间
# 4. 实现错误处理

练习4:实现增量数据处理

实现增量数据处理逻辑,只处理新增和变更的数据。

# 练习代码框架
# 要求:
# 1. 识别增量数据
# 2. 处理数据变更
# 3. 合并到目标表
# 4. 管理历史数据

挑战练习

练习5:构建完整的数据仓库系统

综合运用本章所学知识,构建一个完整的数据仓库系统,包括:

  • 维度建模设计
  • ETL流程实现
  • 工作流调度
  • 数据血缘追踪
# 练习代码框架
# 要求:
# 1. 设计星型模型
# 2. 实现完整ETL流程
# 3. 配置Airflow工作流
# 4. 实现数据血缘追踪
# 5. 实现数据质量管理

🤔 本章思考题

1. 概念理解题

  1. 数据架构设计的关键要素是什么?

    • 请分析数据源、数据流、存储架构的设计原则
    • 讨论不同存储方案的适用场景
  2. ETL流程中,增量抽取和全量抽取的优缺点是什么?

    • 对比两种抽取方式的性能和维护成本
    • 讨论在不同场景下的选择策略
  3. 工作流调度中,如何避免任务之间的循环依赖?

    • 解释依赖关系验证的方法
    • 讨论任务执行顺序的优化策略

2. 应用分析题

  1. 如何设计一个高效的数据质量管理系统?

    • 分析数据质量维度的定义
    • 设计质量检测和修复流程
  2. 在数据仓库中,如何实现SCD(缓慢变化维度)?

    • 分析SCD Type 1、Type 2、Type 3的区别
    • 设计SCD Type 2的实现方案
  3. 如何建立完善的数据血缘追踪系统?

    • 设计血缘关系的存储模型
    • 实现血缘查询和影响分析

3. 编程实践题

  1. 实现一个通用的ETL框架

    • 支持多种数据源
    • 支持可配置的转换规则
    • 支持多种加载策略
  2. 构建一个数据质量检测工具

    • 实现质量规则引擎
    • 实现质量报告生成
    • 实现质量问题修复
  3. 开发一个数据血缘可视化系统

    • 实现血缘关系采集
    • 实现血缘关系可视化
    • 实现影响分析功能

📖 拓展阅读

在线资源

  1. Apache Airflow官方文档

  2. 数据仓库设计模式

  3. ETL最佳实践

  4. 数据治理框架

推荐书籍

  1. 《数据仓库工具箱》

    • 作者:Ralph Kimball
    • 深入讲解维度建模和数据仓库设计
  2. 《数据工程手册》

    • 作者:Andreas Kretz
    • 全面了解数据工程实践
  3. 《数据管道手册》

    • 学习数据管道设计和实现

开源项目

  1. Apache Airflow

  2. Great Expectations

  3. DataHub


📋 本章检查清单

在进入下一章之前,请确保你已经:

理论掌握 ✅

  • 理解数据架构设计的原则和方法
  • 掌握数据质量管理的维度和方法
  • 理解元数据管理的重要性
  • 掌握ETL流程的设计和实现
  • 理解工作流调度的原理和方法
  • 了解数据仓库的维度建模方法

实践能力 ✅

  • 能够设计数据架构和数据流
  • 能够实现ETL流程
  • 能够使用Airflow创建工作流
  • 能够实现增量数据处理
  • 能够设计维度模型
  • 能够实现数据血缘追踪

项目经验 ✅

  • 完成数据架构设计项目
  • 实现完整的ETL流程
  • 构建Airflow工作流
  • 完成数据仓库系统项目
  • 实现数据血缘追踪系统

下一章预告:第45章《Apache Spark大数据处理》将介绍Spark核心概念、流式数据处理、机器学习管道,以及如何构建实时推荐系统。