Lab 2 图查询处理完整教程¶
概览¶
本教程将带你完整理解 miniGU 图数据库的查询处理流程,从查询语句到最终执行结果的全链路实现。
Lab 2 分为三个递进的实验:
- Lab 2-1:Expand/Project 执行器,实现图遍历和投影计算
- Lab 2-2:Filter 逻辑计划,在逻辑计划中添加 Filter 节点
- Lab 2-3:谓词下推优化,实现查询优化器
学习目标:
- 理解查询处理的完整流程
- 掌握逻辑计划与物理计划的区别
- 实现核心执行器算子
- 学习查询优化技术
1. 查询处理全流程¶
让我们通过一个完整的例子理解查询处理流程:
完整流程:
词法分析 (Lexer)¶
- 输入: "MATCH (n:Person) WHERE n.age > 18 RETURN n.name"
- 输出: [MATCH, LPAREN, IDENT("n"), COLON, ...]
语法分析 (Parser)¶
- 输入: Token 流
- 输出: 抽象语法树 (AST)
语义绑定 (Binder)¶
- 输入: AST
- 输出: 绑定后的语法树 (Bound AST)
- 变量解析: n → Variable(0)
- 类型推导: n.age → Int64
- Schema 构建: [name: String]
逻辑计划生成 (Logical Planner)¶
- 输入: Bound AST
- 输出: 逻辑计划树
查询优化 (Optimizer)¶
- 输入: 逻辑计划
- 输出: 优化后的物理计划
- 优化技术:
- 谓词下推
- 投影下推
- Join 重排序
执行器构建 (Executor Builder)¶
- 输入: 物理计划
- 输出: 执行器树
执行引擎 (Execution Engine)¶
- 输入: 执行器树
- 输出: 结果集 (DataChunk 流)
执行过程 (Pull-based): - 1. ProjectExecutor.next() → 调用子执行器 - 2. FilterExecutor.next() → 调用子执行器 - 3. NodeScanExecutor.next() → 从存储引擎读取数据 → 返回 DataChunk - 4. 应用过滤条件 → FilterExecutor 过滤 → 返回过滤后的 DataChunk - 5. 计算投影表达式 → ProjectExecutor 投影 → 返回最终结果
结果返回¶
- 输出: 格式化的结果
2. 核心概念详解¶
2.1 火山模型 (Volcano Model)¶
miniGU 采用经典的火山模型 (也称迭代器模型):
trait Executor: Iterator<Item = Result<DataChunk>> {
fn next(&mut self) -> Option<Result<DataChunk>>;
}
特点:
- 拉取式 (Pull-based): 父节点主动向子节点请求数据
- 流式处理: 数据以批次 (DataChunk) 流动
- 内存友好: 不需要一次性加载所有数据
- 易于组合: 执行器可以任意组合成树
2.2 DataChunk: 列式批处理¶
DataChunk 是 miniGU 的数据传输单元,基于 Apache Arrow:
pub struct DataChunk {
columns: Vec<ArrayRef>, // 列数组 (Arrow Array)
filter: Option<BooleanArray>, // 过滤位图 (延迟应用)
}
列式存储优势:
- 缓存友好: 连续内存访问
- 向量化计算: 便于使用 SIMD 加速
- 压缩高效: 同类型数据压缩率高
- 零拷贝: 共享内存,避免复制
2.3 表达式求值¶
表达式求值器负责计算各种表达式:
表达式类型:
- 列引用:
n.name→ 从 chunk 中提取列 - 字面量:
42,"hello"→ 常量 - 二元运算:
a + b,a > b→ 向量化计算 - 函数调用:
length(s),abs(x)→ 内置函数
3. Lab 2-1: Expand 与 Project 执行器实现¶
3.1 任务目标¶
实现两个核心执行器:
- Expand: 图遍历,从顶点扩展到邻居
- Project: 投影计算,计算 RETURN 表达式
3.2 Expand 执行器¶
功能: 实现 MATCH (a)-[r]->(b) 中的边遍历
输入: DataChunk 包含起始顶点 ID
输出: DataChunk 包含 [原始列 + 边列 + 邻居列]
实现位置:
- 文件:
execution/src/executor/expand.rs - 函数:
ExpandBuilder::into_executor - 行数: 91-123
核心逻辑:
// 1. compact chunk
chunk = chunk.compact();
// 2. 提取顶点 ID
let vertex_ids = chunk.columns().get(input_column_index)?
.as_primitive::<VertexIdType>().values();
// 3. 扩展每个顶点
for &vid in vertex_ids.iter() {
let (targets, edges) = source.expand_from_vertex(
vid, edge_labels, target_labels
)?;
// 累积结果...
}
// 4. 构建 ListArray 并追加列
chunk = chunk.append_columns(vec![edge_list, target_list]);
yield Ok(chunk);
3.3 Project 执行器¶
功能: 实现 RETURN n.name, n.age + 1 中的表达式计算
输入: DataChunk
输出: DataChunk (计算后的列)
实现位置:
- 文件: execution/src/executor/project.rs
- 函数: ProjectBuilder::into_executor
- 行数: 56-94
核心逻辑:
// 1. 计算新列
let mut new_columns = Vec::new();
for evaluator in evaluators.iter() {
let data_value = evaluator.evaluate(&chunk)?;
new_columns.push(data_value.into_array());
}
// 2. 创建新 chunk
let mut new_chunk = DataChunk::new(new_columns);
// 3. 保留 filter
if let Some(filter) = chunk.filter() {
new_chunk = new_chunk.with_filter(filter.clone());
}
yield Ok(new_chunk);
3.4 测试验证¶
cargo test -p minigu-execution test_expand_executor
cargo test -p minigu-execution test_project_executor
详细文档: Lab 2-1 详细说明
4. Lab 2-2: Filter 逻辑计划实现¶
4.1 任务目标¶
在 MATCH 语句包含 WHERE 子句时,生成 Filter 逻辑计划节点。
输入:
输出 (逻辑计划):
4.2 实现位置¶
文件: planner/src/logical_planner/query.rs
函数: plan_match_statement
行数: 93-127
4.3 实现要点¶
// 1. 检查是否有 WHERE 条件
if let Some(predicate) = binding.pattern.predicate {
// 2. 创建 Filter 节点包装当前 plan
let filter = Filter::new(plan, predicate);
// 3. 更新 plan
plan = PlanNode::LogicalFilter(Arc::new(filter));
}
4.4 测试验证¶
详细文档: Lab 2-2 详细说明
5. Lab 2-3: 谓词下推优化¶
5.1 任务目标¶
实现谓词下推优化,将 Filter 条件推到数据源附近。
优化前:
优化后:
性能提升: 从 O(n) 全表扫描优化为 O(1) 索引查找
5.2 实现位置¶
文件: planner/src/optimizer/mod.rs
函数: create_physical_plan_impl
分支: PlanNode::LogicalFilter
行数: 149-206
5.3 实现步骤¶
// 1. 检查子节点是否为 NodeScan
if let PlanNode::PhysicalNodeScan(node_scan) = &child {
// 2. 分析 predicate 是否为 ID 等值条件
if let Some(id) = try_extract_id_predicate(&filter.predicate, &node_scan.var) {
// 3. 创建优化节点
let scan_by_id = NodeScanById::new(
node_scan.var.clone(),
node_scan.labels.clone(),
id,
);
return Ok(PlanNode::PhysicalNodeScanById(Arc::new(scan_by_id)));
}
}
// 4. 默认: 创建 PhysicalFilter (不优化)
let filter = Filter::new(child, filter.predicate.clone());
Ok(PlanNode::PhysicalFilter(Arc::new(filter)))
5.4 辅助函数¶
需要实现 try_extract_id_predicate 函数:
fn try_extract_id_predicate(
predicate: &BoundExpression,
var: &str,
) -> Option<VertexId> {
// 检查是否为 n.id = 1 或 1 = n.id
if let BoundExpression::BinaryOp { op, left, right } = predicate {
if *op == BinaryOperator::Eq {
// 检查 n.id = 1
if matches_property(left, var, "id") {
return extract_literal(right);
}
// 检查 1 = n.id
if matches_property(right, var, "id") {
return extract_literal(left);
}
}
}
None
}
5.5 测试验证¶
cargo test -p minigu-planner test_predicate_pushdown
# 手动验证
minigu> EXPLAIN PHYSICAL MATCH (n) WHERE n.id = 1 RETURN n;
# 应该看到 PhysicalNodeScanById
详细文档: Lab 2-3 详细说明
6. 完整示例: 端到端查询¶
让我们通过一个完整的例子, 串联所有知识点:
6.1 查询语句¶
MATCH (a:Person WHERE a.id = 1)-[r:KNOWS]->(b:Person)
WHERE b.age > 18
RETURN a.name, b.name, r.since
6.2 逻辑计划 (Lab 2-2)¶
LogicalProject(a.name, b.name, r.since)
↓
LogicalFilter(b.age > 18)
↓
LogicalExpand(r:KNOWS, b:Person)
↓
LogicalFilter(a.id = 1)
↓
LogicalMatch(a:Person)
6.3 物理计划 (Lab 2-3 优化后)¶
PhysicalProject(a.name, b.name, r.since)
↓
PhysicalFilter(b.age > 18)
↓
PhysicalExpand(r:KNOWS, b:Person)
↓
PhysicalNodeScanById(a, id=1) ← 优化: 直接定位
6.4 执行过程 (Lab 2-1)¶
1. NodeScanById 执行:
→ 从存储引擎查找 id=1 的顶点
→ 返回 DataChunk: [a: {id:1, name:"Alice"}]
2. Expand 执行:
→ 调用 source.expand_from_vertex(1, [KNOWS], [Person])
→ 返回 DataChunk: [a, r, b]
[1, 101, 2] // Alice KNOWS Bob
[1, 102, 3] // Alice KNOWS Carol
3. Filter 执行:
→ 计算 b.age > 18
→ 应用过滤位图
→ 返回 DataChunk (过滤后)
4. Project 执行:
→ 计算 a.name, b.name, r.since
→ 返回最终结果:
["Alice", "Bob", "2020-01-01"]
["Alice", "Carol", "2021-06-15"]
6.5 性能分析¶
优化前: - NodeScan: 扫描 10000 个顶点 - Filter: 过滤出 1 个顶点 - Expand: 扩展 1 个顶点的邻居 - 总扫描: 10000 个顶点
优化后: - NodeScanById: 直接定位 1 个顶点 - Expand: 扩展 1 个顶点的邻居 - 总扫描: 1 个顶点
7. 调试技巧¶
查看计划树:
添加日志进行调试:
// 在代码中添加调试输出
println!("Plan: {:#?}", plan);
println!("Chunk: rows={}, cols={}", chunk.num_rows(), chunk.num_columns());
单元测试:
# 运行特定测试
cargo test -p minigu-planner test_plan_match_with_filter -- --nocapture
# 显示输出
cargo test -- --nocapture --test-threads=1
8. 常见问题FAQ¶
Q1: 为什么使用 Arc?¶
Arc<T> 提供共享所有权,允许多个执行器共享同一个子节点,避免深拷贝。
Q2: gen move 是什么?¶
Rust 的 generator 语法,用于实现协程式迭代器。yield 暂停执行并返回值。
Q3: 为什么 Filter 使用位图而不是直接删除行?¶
延迟求值 (lazy evaluation)。位图避免数据复制,只在必要时 (compact) 才真正删除。
Q4: 如何处理 NULL 值?¶
Arrow 数组内置 NULL 支持,通过 validity bitmap 标记。
Q5: 如何优化内存使用?¶
- 使用 Arrow 的零拷贝机制
- 及时 compact 释放过滤行
- 控制 DataChunk 批次大小
- 使用流式处理避免全量加载