datafusion
DataFrame
DataFrame 是最终结果数据集的一个视图,并不持有数据,也没有执行。当调用一些 action 方法后才会开始真正执行查询。
下面以 collect()
方法为例,过一下整个执行的流程:
pub async fn collect(self) -> Result<Vec<RecordBatch>> {
let task_ctx = Arc::new(self.task_ctx());
let plan = self.create_physical_plan().await?;
collect(plan, task_ctx).await
}
collect
首先会创建 physical plan,然后调用 collect
util 去执行这个 plan,内部就是调了一下 execute_stream
然后消费 stream 聚合数据:
/// Execute the [ExecutionPlan] and return a single stream of results
pub fn execute_stream(
plan: Arc<dyn ExecutionPlan>,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
match plan.output_partitioning().partition_count() {
0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
1 => plan.execute(0, context),
_ => {
// merge into a single partition
let plan = CoalescePartitionsExec::new(plan.clone());
// CoalescePartitionsExec must produce a single partition
assert_eq!(1, plan.output_partitioning().partition_count());
plan.execute(0, context)
}
}
}
如果输入是多个 partitions,则 CoalescePartitionExec
会并发在各个 partition 执行查询。因此如果要在多个数据集上同时进行查询,可以考虑把它们拆分到多个 partitions,不过 datafusion 本身的优化器也会在原始数据上进行 repartition 优化,把一个 partition 内的数据拆分到多个 partitions。
ExecutionPlan
这里主要介绍 MemoryExec
和 FilterExec
。
MemoryExec
在 MemTable
上执行查询时, MemoryExec
会作为整个查询计划的叶子节点:
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
Poll::Ready(if self.index < self.data.len() {
self.index += 1;
let batch = &self.data[self.index - 1];
// return just the columns requested
let batch = match self.projection.as_ref() {
Some(columns) => batch.project(columns)?,
None => batch.clone(),
};
Some(Ok(batch))
} else {
None
})
}
project 可能会被下推到这一层,整个逻辑也很简单,把 projection 选中的列对应 Array 选出来,然后再构造一个只包含这些列的 batch。
FilterExec
FilterExec
对输入侧的数据进行过滤:
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let poll;
loop {
match self.input.poll_next_unpin(cx) {
Poll::Ready(value) => match value {
Some(Ok(batch)) => {
let timer = self.baseline_metrics.elapsed_compute().timer();
let filtered_batch = batch_filter(&batch, &self.predicate)?;
// skip entirely filtered batches
if filtered_batch.num_rows() == 0 {
continue;
}
timer.done();
poll = Poll::Ready(Some(Ok(filtered_batch)));
break;
}
_ => {
poll = Poll::Ready(value);
break;
}
},
Poll::Pending => {
poll = Poll::Pending;
break;
}
}
}
self.baseline_metrics.record_poll(poll)
}
过滤的逻辑大致就是对 batch 里的数据做一下迭代,生成一个 bitmap,然后用这个 bitmap 来生成一个 FilterPredicate
,FilterPredicate
会有4种策略:
- All:选中所有数据
- None:滤除所有数据
- Slice:生成若干个区间,选中这些区间覆盖的数据
- Index:生成若干个下标,选中这些下标上的数据
当选中率大于 0.8 时会使用 slice 策略,否则使用 index 策略
过滤之后会生成一个新的 batch,这里会把选中的数据复制一份。