datafusion

DataFrame

DataFrame 是最终结果数据集的一个视图,并不持有数据,也没有执行。当调用一些 action 方法后才会开始真正执行查询。

下面以 collect() 方法为例,过一下整个执行的流程:

    pub async fn collect(self) -> Result<Vec<RecordBatch>> {
        let task_ctx = Arc::new(self.task_ctx());
        let plan = self.create_physical_plan().await?;
        collect(plan, task_ctx).await
    }

collect 首先会创建 physical plan,然后调用 collect util 去执行这个 plan,内部就是调了一下 execute_stream 然后消费 stream 聚合数据:

/// Execute the [ExecutionPlan] and return a single stream of results
pub fn execute_stream(
    plan: Arc<dyn ExecutionPlan>,
    context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
    match plan.output_partitioning().partition_count() {
        0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
        1 => plan.execute(0, context),
        _ => {
            // merge into a single partition
            let plan = CoalescePartitionsExec::new(plan.clone());
            // CoalescePartitionsExec must produce a single partition
            assert_eq!(1, plan.output_partitioning().partition_count());
            plan.execute(0, context)
        }
    }
}

如果输入是多个 partitions,则 CoalescePartitionExec 会并发在各个 partition 执行查询。因此如果要在多个数据集上同时进行查询,可以考虑把它们拆分到多个 partitions,不过 datafusion 本身的优化器也会在原始数据上进行 repartition 优化,把一个 partition 内的数据拆分到多个 partitions。

ExecutionPlan

这里主要介绍 MemoryExecFilterExec

MemoryExec

MemTable 上执行查询时, MemoryExec 会作为整个查询计划的叶子节点:

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        _: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        Poll::Ready(if self.index < self.data.len() {
            self.index += 1;
            let batch = &self.data[self.index - 1];

            // return just the columns requested
            let batch = match self.projection.as_ref() {
                Some(columns) => batch.project(columns)?,
                None => batch.clone(),
            };

            Some(Ok(batch))
        } else {
            None
        })
    }

project 可能会被下推到这一层,整个逻辑也很简单,把 projection 选中的列对应 Array 选出来,然后再构造一个只包含这些列的 batch。

FilterExec

FilterExec 对输入侧的数据进行过滤:

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let poll;
        loop {
            match self.input.poll_next_unpin(cx) {
                Poll::Ready(value) => match value {
                    Some(Ok(batch)) => {
                        let timer = self.baseline_metrics.elapsed_compute().timer();
                        let filtered_batch = batch_filter(&batch, &self.predicate)?;
                        // skip entirely filtered batches
                        if filtered_batch.num_rows() == 0 {
                            continue;
                        }
                        timer.done();
                        poll = Poll::Ready(Some(Ok(filtered_batch)));
                        break;
                    }
                    _ => {
                        poll = Poll::Ready(value);
                        break;
                    }
                },
                Poll::Pending => {
                    poll = Poll::Pending;
                    break;
                }
            }
        }
        self.baseline_metrics.record_poll(poll)
    }

过滤的逻辑大致就是对 batch 里的数据做一下迭代,生成一个 bitmap,然后用这个 bitmap 来生成一个 FilterPredicateFilterPredicate 会有4种策略:

  • All:选中所有数据
  • None:滤除所有数据
  • Slice:生成若干个区间,选中这些区间覆盖的数据
  • Index:生成若干个下标,选中这些下标上的数据
    当选中率大于 0.8 时会使用 slice 策略,否则使用 index 策略

过滤之后会生成一个新的 batch,这里会把选中的数据复制一份。