This post walks through Impala's entire query flow, using a simple SQL statement as the example.

# Service startup

Impala runs three services: impalad, catalogd, and statestored. As we saw in the earlier post on the SQL query flow, the SQL a client submits goes to an impalad.
When impalad starts, it opens several ports for client connections.

be/src/service/impalad-main.cc
std::shared_ptr<ImpalaServer> impala_server(new ImpalaServer(&exec_env));
  Status status = impala_server->Start(FLAGS_beeswax_port, FLAGS_hs2_port,
      FLAGS_hs2_http_port, FLAGS_external_fe_port);
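
For reference, a client can talk to these ports directly. Below is a minimal sketch (not Impala source) that connects to the hs2_port (21050 by default) with the Hive JDBC driver and runs the query used throughout this post; the hostname, database name, and no-auth setting are assumptions for an unsecured test cluster:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ImpalaHs2Demo {
  public static void main(String[] args) throws Exception {
    Class.forName("org.apache.hive.jdbc.HiveDriver");
    try (Connection conn = DriverManager.getConnection(
             "jdbc:hive2://localhost:21050/default;auth=noSasl");
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery("select * from test")) {
      // Each row comes back through the regular JDBC ResultSet API.
      while (rs.next()) System.out.println(rs.getString(1));
    }
  }
}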

# Query flow

The HS2 interface is used as the example here. Only the key code is quoted; elided parts are summarized briefly in comments.

First, the server receives the client's query request and starts executing it.

be/src/service/impala-hs2-server.cc
void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
    const TExecuteStatementReq& request) {
  // log
  ExecuteStatementCommon(return_val, request);
}
void ImpalaServer::ExecutePlannedStatement(
      TExecuteStatementResp& return_val,
      const TExecutePlannedStatementReq& request) {
  // log and check
  ExecuteStatementCommon(return_val, request.statementReq, &request.plan);
}
void ImpalaServer::ExecuteStatementCommon(TExecuteStatementResp& return_val,
    const TExecuteStatementReq& request, const TExecRequest* external_exec_request) {
  // log and init context/session....
  status = Execute(&query_ctx, session, &query_handle, external_exec_request);
  // handle result
}

All of the interface protocols eventually funnel into impala-server. Here RunFrontendPlanner calls into the frontend (fe) to generate the execution plan, and then execution begins; Exec is the C++ part.

be/src/service/impala-server.cc
Status ImpalaServer::Execute(TQueryCtx* query_ctx, shared_ptr<SessionState> session_state,
    QueryHandle* query_handle,
    const TExecRequest* external_exec_request) {
  // log and metrics
  Status status = ExecuteInternal(*query_ctx, external_exec_request, session_state,
      &registered_query, query_handle);
  // handle result
  return status;
}
Status ImpalaServer::ExecuteInternal(const TQueryCtx& query_ctx,
    const TExecRequest* external_exec_request, shared_ptr<SessionState> session_state,
    bool* registered_query, QueryHandle* query_handle) {
  // check and init request
  RETURN_IF_ERROR(query_handle->query_driver()->RunFrontendPlanner(query_ctx));
  // handle result
  RETURN_IF_ERROR((*query_handle)->Exec());
}

# Generating the query plan

As we will see below, the server executes the request by calling into Java over JNI.

be/src/runtime/query-driver.cc
Status QueryDriver::RunFrontendPlanner(const TQueryCtx& query_ctx) {
  // check
  RETURN_IF_ERROR(client_request_state_->UpdateQueryStatus(
      ExecEnv::GetInstance()->frontend()->GetExecRequest(
          query_ctx, exec_request_.get())));
  return Status::OK();
}

This is a thin wrapper around the call into the Java interface.

be/src/service/frontend.cc
Status Frontend::GetExecRequest(
    const TQueryCtx& query_ctx, TExecRequest* result) {
  return JniUtil::CallJniMethod(fe_, create_exec_request_id_, query_ctx, result);
}

From here on we are in the Java code.

fe/src/main/java/org/apache/impala/service/JniFrontend.java
public byte[] createExecRequest(byte[] thriftQueryContext)
    throws ImpalaException {
  // check and init planCtx
  TExecRequest result = frontend_.createExecRequest(planCtx);
  // log and handle result
}
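
The byte[] parameter and return value crossing the JNI boundary are thrift-serialized structures: the C++ side serializes a TQueryCtx, the Java side deserializes it, and the resulting TExecRequest travels back the same way. A rough sketch of that round trip, assuming the binary thrift protocol purely for illustration rather than Impala's actual JniUtil helpers:

import org.apache.impala.thrift.TExecRequest;
import org.apache.impala.thrift.TQueryCtx;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;

public class JniThriftSketch {
  public static byte[] roundTrip(byte[] thriftQueryContext) throws Exception {
    // Decode the query context handed over from the C++ caller.
    TQueryCtx queryCtx = new TQueryCtx();
    new TDeserializer(new TBinaryProtocol.Factory()).deserialize(queryCtx, thriftQueryContext);
    // ... planning would fill in a real TExecRequest here ...
    TExecRequest result = new TExecRequest();
    // Encode the result so the C++ side can decode it back into a TExecRequest.
    return new TSerializer(new TBinaryProtocol.Factory()).serialize(result);
  }
}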

Now the planner goes to work, passing through the parse, analyze, rewrite, and plan phases in turn.

fe/src/main/java/org/apache/impala/service/Frontend.java
/**
 * This method essentially submits the request against the executor groups.
 */
public TExecRequest createExecRequest(PlanCtx planCtx)
    throws ImpalaException {
  // timing
  TExecRequest result = getTExecRequest(planCtx, timeline);
  // handle result
}
private TExecRequest getTExecRequest(PlanCtx planCtx, EventSequence timeline)
    throws ImpalaException {
  // lookup for executor groups
  for (int i = 0; i < num_executor_group_sets; i++) {
    req = doCreateExecRequest(planCtx, timeline);
  }
  // return req
}
private TExecRequest doCreateExecRequest(PlanCtx planCtx,
    EventSequence timeline) throws ImpalaException {
  TQueryCtx queryCtx = planCtx.getQueryContext();
  // Parse the SQL statement; the concrete implementation is covered in a later post
  StatementBase stmt = Parser.parse(
      queryCtx.client_request.stmt, queryCtx.client_request.query_options);
  // ...
  // Covered in detail below: this analyzes the whole SQL statement and also rewrites it
  AnalysisResult analysisResult = analysisCtx.analyzeAndAuthorize(stmt, stmtTableCache,
      authzChecker_.get(), planCtx.compilationState_.disableAuthorization());
  
  // ...
  // First create an exec request, i.e. a request object built from the analysis result
  TExecRequest result = createBaseExecRequest(queryCtx, analysisResult);
  try {
    // DDL
    if (analysisResult.isCatalogOp()) {
      result.stmt_type = TStmtType.DDL;
      // Issue the DDL operation to catalogd
      createCatalogOpRequest(analysisResult, result);
      // ...
    }
    // ...
    // plan phase: build the execution plan; analyzed in detail later
    TQueryExecRequest queryExecRequest =
        getPlannedExecRequest(planCtx, analysisResult, timeline);
    // ...
    // explain
    if (analysisResult.isExplainStmt()) {
      createExplainRequest(planCtx.getExplainString(), result);
      return result;
    }
    // ...
    return result;
  } catch (Exception e) {
    // handle exception
  }
}

# parse

Impala does not implement its own parser by hand; the parser is generated with CUP and JFlex. That part will be covered in a later post.
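
For orientation, the Parser.parse entry point shown in doCreateExecRequest above can be driven directly. A small sketch (it assumes the Impala frontend jar and its dependencies are on the classpath; for the plain select used in this post the result is a SelectStmt):

import org.apache.impala.analysis.Parser;
import org.apache.impala.analysis.SelectStmt;
import org.apache.impala.analysis.StatementBase;
import org.apache.impala.thrift.TQueryOptions;

public class ParseSketch {
  public static void main(String[] args) throws Exception {
    // Same entry point as doCreateExecRequest(): the SQL text plus query options.
    StatementBase stmt = Parser.parse("select * from test", new TQueryOptions());
    // A plain select parses into a SelectStmt, the class analyzed in the next section.
    System.out.println(stmt instanceof SelectStmt);  // true
    System.out.println(stmt.toSql());
  }
}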

# analyze

This phase analyzes the whole SQL statement and lays the groundwork for generating the execution plan.

fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
public AnalysisResult analyzeAndAuthorize(StatementBase stmt,
    StmtTableCache stmtTableCache, AuthorizationChecker authzChecker,
    boolean disableAuthorization) throws ImpalaException {
  // ...
  // Analyze
  analyze(stmtTableCache, authzCtx);
  // ...
  return analysisResult_;
}
private void analyze(StmtTableCache stmtTableCache, AuthorizationContext authzCtx)
    throws AnalysisException {
  // ...
  // Create an Analyzer for the current statement
  analysisResult_.analyzer_ = createAnalyzer(stmtTableCache, authzCtx);
  // Analyze the statement. Our query is select * from test, so stmt is a SelectStmt; its analyze method is examined below
  analysisResult_.stmt_.analyze(analysisResult_.analyzer_);
  
  // Next comes the rewrite; after rewriting, the statement has to be re-analyzed
  ExprRewriter rewriter = analysisResult_.analyzer_.getExprRewriter();
  // ...
  analysisResult_.stmt_.rewriteExprs(rewriter);
  // ...
  // Re-analyze
  reAnalyze(stmtTableCache, authzCtx, origResultTypes, origColLabels,
      /*collectPrivileges*/ false);
  // ...
}

Now we reach the concrete statement classes. SelectStmt is the concrete statement here: it extends QueryStmt, which extends StatementBase (the parent class of all statements), which in turn extends StmtNode, which extends ParseNode.
The class hierarchy: SelectStmt → QueryStmt → StatementBase → StmtNode → ParseNode.

First, the most derived class, SelectStmt.

fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
  // analyzer_ is initially null
  if (isAnalyzed()) return;
  // Go into the parent QueryStmt method, which analyzes the limit and with clauses common to all query statements
  super.analyze(analyzer);
  // Start analyzing the select statement
  new SelectAnalyzer(analyzer).analyze();
  // Optimize plain count(*) queries on Iceberg tables
  this.optimizePlainCountStarQuery();
}
private class SelectAnalyzer {
  private void analyze() throws AnalysisException {
    // Analyze the from clause
    fromClause_.analyze(analyzer_);
    // Register all struct-typed SlotRef paths; plain columns are not included
    registerStructSlotRefPathsWithAnalyzer();
    // Analyze the select list and collect all of its Exprs
    analyzeSelectClause();
    // Validate the result exprs
    verifyResultExprs();
    registerViewColumnPrivileges();
    // Analyze the where clause
    analyzeWhereClause();
    // sort
    createSortInfo(analyzer_);
    // ...
    // Build analytic (window) function info
    createAnalyticInfo();
    // Build sort-related info
    if (evaluateOrderBy_) createSortTupleInfo(analyzer_);
    // ...
  }
}

Next, the abstract class shared by all query statements.

fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
  // analyzer_ is initially null
  if (isAnalyzed()) return;
  // Go into the StatementBase method
  super.analyze(analyzer);
  // Analyze the limit and with clauses; these two wrap around every query statement and are common to all of them
  analyzeLimit(analyzer);
  if (hasWithClause()) withClause_.analyze(analyzer);
}
private void analyzeLimit(Analyzer analyzer) throws AnalysisException {
  // An OFFSET requires an ORDER BY clause; without one the row order, and thus the skipped rows, would differ from run to run, making the offset meaningless
  if (limitElement_.getOffsetExpr() != null && !hasOrderByClause()) {
    throw new AnalysisException("OFFSET requires an ORDER BY clause: " +
        limitElement_.toSql().trim());
  }
  // Analyze the limit element; we will not drill further here
  limitElement_.analyze(analyzer);
}

Next comes the abstract base class for all SQL statements.

fe/src/main/java/org/apache/impala/analysis/StatementBase.java
/**
 * The base class does no real analysis; it just stores the analyzer.
 */
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
  // analyzer_ is initially null
  if (isAnalyzed()) return;
  // explain statement
  if (isExplain_) analyzer.setIsExplain();
  // Set the analyzer
  analyzer_ = analyzer;
}

SelectStmt first analyzes the from clause. FromClause extends StmtNode and implements Iterable<TableRef>; it is essentially a list of TableRefs, so analyzing the from clause boils down to analyzing each TableRef.

fe/src/main/java/org/apache/impala/analysis/FromClause.java
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
  // ...
  // Iterate over all tableRefs; for select * from test there is only one
  for (int i = 0; i < tableRefs_.size(); ++i) {
    TableRef tblRef = tableRefs_.get(i);
    // Resolve the TableRef; here the result is a BaseTableRef
    tblRef = analyzer.resolveTableRef(tblRef);
    tableRefs_.set(i, Preconditions.checkNotNull(tblRef));
    tblRef.setLeftTblRef(leftTblRef);
    // Registers the join clause, among other things
    tblRef.analyze(analyzer);
    // ...
  }
  // ...
}

Resolving the TableRef.

fe/src/main/java/org/apache/impala/analysis/Analyzer.java
public TableRef resolveTableRef(TableRef tableRef)
    throws AnalysisException {
  // ...
  // A local view here should be a query defined in a WITH clause
  // Get the rawPath identifying the table: for db.tb the size is 2, with elements db and tb; without a db prefix the size is 1
  if (tableRef.getPath().size() == 1) {
    String viewAlias = tableRef.getPath().get(0).toLowerCase();
    Analyzer analyzer = this;
    do {
      FeView localView = analyzer.localViews_.get(viewAlias);
      if (localView != null) return new InlineViewRef(localView, tableRef);
      analyzer = (analyzer.ancestors_.isEmpty() ? null : analyzer.ancestors_.get(0));
    } while (analyzer != null);
  }
  // Resolve the table name; this calls into the catalog to fetch the table
  List<String> rawPath = tableRef.getPath();
  // ...
  resolvedPath = resolvePathWithMasking(rawPath, PathType.TABLE_REF);
  // ...
  // Convert to the target table ref: a BaseTableRef or a CollectionTableRef
  Preconditions.checkNotNull(resolvedPath);
  if (resolvedPath.destTable() != null) {
    FeTable table = resolvedPath.destTable();
    if (table instanceof FeView) return new InlineViewRef((FeView) table, tableRef);
    // ...
    return new BaseTableRef(tableRef, resolvedPath);
  } else {
    return new CollectionTableRef(tableRef, resolvedPath, false);
  }
}

The TableRef resolved by the method above is now analyzed.

fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java
public void analyze(Analyzer analyzer) throws AnalysisException {
  if (isAnalyzed_) return;
  // Register privilege and audit events
  analyzer.registerAuthAndAuditEvent(resolvedPath_.getRootTable(), priv_,
      requireGrantOption_);
  // Register the TableRef
  desc_ = analyzer.registerTableRef(this);
  isAnalyzed_ = true;
  // Table capability check
  analyzer.checkTableCapability(getTable(), Analyzer.OperationType.ANY);
  // Start the detailed analysis
  analyzeTableSample(analyzer);
  analyzeTimeTravel(analyzer);
  analyzeHints(analyzer);
  analyzeJoin(analyzer);
  analyzeSkipHeaderLineCount();
}

# rewrite

Let's take a quick look at SelectStmt's rewriteExprs. The rewriter applies expression rewrite rules (constant folding and the like) to each clause; because rewriting creates new, un-analyzed expression nodes, the statement is re-analyzed afterwards, which is the reAnalyze call shown earlier.

fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
public void rewriteExprs(ExprRewriter rewriter) throws AnalysisException {
  Preconditions.checkState(isAnalyzed());
  // Rewrite the select list
  selectList_.rewriteExprs(rewriter, analyzer_);
  // Rewrite the table refs
  for (TableRef ref: fromClause_.getTableRefs()) ref.rewriteExprs(rewriter, analyzer_);
  List<Subquery> subqueryExprs = new ArrayList<>();
  // Rewrite the where clause
  if (whereClause_ != null) {
    whereClause_ = rewriter.rewrite(whereClause_, analyzer_);
    whereClause_.collect(Subquery.class, subqueryExprs);
  }
  // Rewrite the having clause
  if (havingClause_ != null) {
    havingClause_ = rewriteCheckOrdinalResult(rewriter, havingClause_);
    havingClause_.collect(Subquery.class, subqueryExprs);
  }
  // Rewrite subqueries
  for (Subquery s : subqueryExprs) s.getStatement().rewriteExprs(rewriter);
  // Rewrite the group by exprs
  if (groupingExprs_ != null) {
    for (int i = 0; i < groupingExprs_.size(); ++i) {
      groupingExprs_.set(i, rewriteCheckOrdinalResult(
          rewriter, groupingExprs_.get(i)));
    }
  }
  // Rewrite the order by exprs
  if (orderByElements_ != null) {
    for (OrderByElement orderByElem: orderByElements_) {
      orderByElem.setExpr(rewriteCheckOrdinalResult(rewriter, orderByElem.getExpr()));
    }
  }
}
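
The rewriter above simply drives a list of ExprRewriteRule implementations (the built-in rules such as FoldConstantsRule live in org.apache.impala.rewrite) over each expression tree until nothing changes. Below is a no-op rule, only to show the shape I assume that interface has; it is not part of Impala:

import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.Expr;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.rewrite.ExprRewriteRule;

public class NoOpRule implements ExprRewriteRule {
  @Override
  public Expr apply(Expr expr, Analyzer analyzer) throws AnalysisException {
    // ExprRewriter keeps applying its rules until no rule changes the tree;
    // returning the input unchanged means "nothing to rewrite here".
    return expr;
  }
}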

# plan

Next comes the phase where the query plan is actually generated.

fe/src/main/java/org/apache/impala/service/Frontend.java
private TQueryExecRequest getPlannedExecRequest(PlanCtx planCtx,
    AnalysisResult analysisResult, EventSequence timeline)
    throws ImpalaException {
  // ...
  // Initialize a planner
  Planner planner = new Planner(analysisResult, queryCtx, timeline);
  // Core logic
  TQueryExecRequest queryExecRequest = createExecRequest(planner, planCtx);
  
  // ...
  return queryExecRequest;
}
private TQueryExecRequest createExecRequest(
    Planner planner, PlanCtx planCtx) throws ImpalaException {
  // Generate the plans
  List<PlanFragment> planRoots = planner.createPlans();
  // Estimate resource requirements
  Planner.computeResourceReqs(planRoots, queryCtx, result,
      planner.getPlannerCtx(), planner.getAnalysisResult().isQueryStmt());
  // ...
  return result;
}

Now the plan fragments get generated. Depending on the configuration this produces a single-node plan, a distributed plan, or parallel plans.

fe/src/main/java/org/apache/impala/planner/Planner.java
public List<PlanFragment> createPlans() throws ImpalaException {
  // Generate the distributed plan fragments
  List<PlanFragment> distrPlan = createPlanFragments();
  Preconditions.checkNotNull(distrPlan);
  // Parallel plans not enabled; enabling them requires mt_dop > 0 and num_nodes != 1
  if (!useParallelPlan(ctx_)) {
    return Collections.singletonList(distrPlan.get(0));
  }
  // Parallel plans are enabled
  ParallelPlanner planner = new ParallelPlanner(ctx_);
  List<PlanFragment> parallelPlans = planner.createPlans(distrPlan.get(0));
  ctx_.getTimeline().markEvent("Parallel plans created");
  return parallelPlans;
}
private List<PlanFragment> createPlanFragments() throws ImpalaException {
  // First build the single-node plan, then derive the distributed plan from it
  SingleNodePlanner singleNodePlanner = new SingleNodePlanner(ctx_);
  DistributedPlanner distributedPlanner = new DistributedPlanner(ctx_);
  PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();
  
  // ...
  if (ctx_.isSingleNodeExec()) {
    // Single-node plan
    fragments = Lists.newArrayList(new PlanFragment(
        ctx_.getNextFragmentId(), singleNodePlan, DataPartition.UNPARTITIONED));
  } else {
    // Distributed plan
    fragments = distributedPlanner.createPlanFragments(singleNodePlan);
  }
  // ...
  return fragments;
}

Below is the single-node plan.

fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
public PlanNode createSingleNodePlan() throws ImpalaException {
  // ...
  PlanNode singleNodePlan = createQueryPlan(queryStmt, analyzer,
      ctx_.getQueryOptions().isDisable_outermost_topn());
  // ...
  return singleNodePlan;
}
private PlanNode createQueryPlan(QueryStmt stmt, Analyzer analyzer, boolean disableTopN)
    throws ImpalaException {
  // Empty result set
  if (analyzer.hasEmptyResultSet()) return createEmptyNode(stmt, analyzer);
  PlanNode root;
  if (stmt instanceof SelectStmt) {
    SelectStmt selectStmt = (SelectStmt) stmt;
    root = createSelectPlan(selectStmt, analyzer);
    // ...
  } else {
    Preconditions.checkState(stmt instanceof UnionStmt);
    root = createUnionPlan((UnionStmt) stmt, analyzer);
  }
  // ...
  return root;
}
private PlanNode createSelectPlan(SelectStmt selectStmt, Analyzer analyzer)
    throws ImpalaException {
  // No from clause: build a constant select plan (a UnionNode)
  if (selectStmt.getTableRefs().isEmpty()) {
    return createConstantSelectPlan(selectStmt, analyzer);
  }
  // Roughly speaking, mark the slots that are actually needed
  selectStmt.materializeRequiredSlots(analyzer);
  List<TupleId> rowTuples = new ArrayList<>();
  // Tuples materialized by the child table refs
  for (TableRef tblRef: selectStmt.getTableRefs()) {
    rowTuples.addAll(tblRef.getMaterializedTupleIds());
  }
  // ...
  // Compute parent refs and subplan refs
  List<TableRef> parentRefs = new ArrayList<>();
  List<SubplanRef> subplanRefs = new ArrayList<>();
  computeParentAndSubplanRefs(
      selectStmt.getTableRefs(), analyzer.isStraightJoin(), parentRefs, subplanRefs);
  // Build a PlanNode from the parent and subplan refs
  MultiAggregateInfo multiAggInfo = selectStmt.getMultiAggInfo();
  PlanNode root = createTableRefsPlan(parentRefs, subplanRefs, multiAggInfo, analyzer);
  // ...
  return root;
}
private PlanNode createTableRefsPlan(List<TableRef> parentRefs,
    List<SubplanRef> subplanRefs, MultiAggregateInfo aggInfo, Analyzer analyzer)
    throws ImpalaException {
  List<Pair<TableRef, PlanNode>> parentRefPlans = new ArrayList<>();
  // Not entirely clear what this step does; it has no practical effect for the current statement, so we skip it
  List<CollectionTableRef> unnestCollectionRefs =
      extractZippingUnnestTableRefs(parentRefs);
  reduceUnnestCollectionRefs(parentRefs, unnestCollectionRefs);
  // Build a PlanNode per table ref
  for (TableRef ref: parentRefs) {
    // Root node for this table ref
    PlanNode root = createTableRefNode(ref, aggInfo, analyzer, unnestCollectionRefs);
    Preconditions.checkNotNull(root);
    // Child (subplan) nodes
    root = createSubplan(root, subplanRefs, true, analyzer);
    parentRefPlans.add(new Pair<TableRef, PlanNode>(ref, root));
  }
  // ...
  PlanNode root = null;
  if (!analyzer.isStraightJoin()) {
    // Optimize the join order
    root = createCheapestJoinPlan(analyzer, parentRefPlans, subplanRefs);
    // ...
  }
  // ...
  return root;
}
private PlanNode createTableRefNode(TableRef tblRef, MultiAggregateInfo aggInfo,
    Analyzer analyzer, List<CollectionTableRef> collectionRefsToZip)
    throws ImpalaException {
  PlanNode result = null;
  if (tblRef instanceof BaseTableRef) {
    // select * from test takes this branch
    result = createScanNode(tblRef, aggInfo, analyzer);
  } 
  // Other cases construct the corresponding node types
  return result;
}
private PlanNode createScanNode(TableRef tblRef, MultiAggregateInfo aggInfo,
    Analyzer analyzer) throws ImpalaException {
  // ...
  FeTable table = tblRef.getTable();
  if (table instanceof FeFsTable) {
    if (table instanceof FeIcebergTable) {
      IcebergScanPlanner icebergPlanner = new IcebergScanPlanner(analyzer, ctx_, tblRef,
          conjuncts, aggInfo);
      // Build an Iceberg scan plan
      return icebergPlanner.createIcebergScanPlan();
    }
    // Build an HDFS scan plan; we stop drilling down here
    return createHdfsScanPlan(tblRef, aggInfo, conjuncts, analyzer);
  } 
  // ...
}

At this point we have a single-node plan, singleNodePlan, which for our query is simply an HdfsScanNode. Next comes the distributed plan.

fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
public List<PlanFragment> createPlanFragments(
    PlanNode singleNodePlan) throws ImpalaException {
  // ...
  createPlanFragments(singleNodePlan, isPartitioned, fragments);
  return fragments;
}
public PlanFragment createPlanFragments(
    PlanNode root, boolean isPartitioned, List<PlanFragment> fragments)
    throws ImpalaException {
  // Recursively create fragments for the children; for select * from test there are none
  for (PlanNode child: root.getChildren()) {
    boolean childIsPartitioned = child.allowPartitioned();
    if (root instanceof SubplanNode && child == root.getChild(1)) continue;
    childFragments.add(createPlanFragments(child, childIsPartitioned, fragments));
  }
  // ...
  PlanFragment result = null;
  if (root instanceof ScanNode) {
    // Create the scan fragment
    result = createScanFragment(root);
    fragments.add(result);
  }
  // Other fragment types (HashJoin, Aggregate, etc.) are handled here as well
  // Merge step. For our select statement isPartitioned is false. A fragment carries a DataPartition whose type is one of UNPARTITIONED, RANDOM, HASH_PARTITIONED, RANGE_PARTITIONED or KUDU; isPartitioned() returns true for anything other than UNPARTITIONED, and here the type is RANDOM
  // In that case the results have to be merged into a single fragment
  if (!isPartitioned && result.isPartitioned()) {
    result = createMergeFragment(result);
    fragments.add(result);
  }
  return result;
}
private PlanFragment createScanFragment(PlanNode node) {
  // This creates a RANDOM-partitioned fragment
  return new PlanFragment(ctx_.getNextFragmentId(), node, DataPartition.RANDOM);
}
fe/src/main/java/org/apache/impala/planner/PlanFragment.java
public PlanFragment(PlanFragmentId id, PlanNode root, DataPartition partition) {
  fragmentId_ = id;
  planRoot_ = root;
  dataPartition_ = partition;
  outputPartition_ = DataPartition.UNPARTITIONED;
  // Set this fragment on every node in the plan tree
  setFragmentInPlanTree(planRoot_);
}
public void setFragmentInPlanTree(PlanNode node) {
  if (node == null) return;
  node.setFragment(this);
  // Stop at an ExchangeNode and do not descend into its children; for select * from test there is no ExchangeNode and no children, so the method runs only once
  if (node instanceof ExchangeNode) return;
  for (PlanNode child : node.getChildren()) setFragmentInPlanTree(child);
}
fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
private PlanFragment createMergeFragment(PlanFragment inputFragment)
      throws ImpalaException {
  // Create an ExchangeNode on top of the input fragment's root
  ExchangeNode mergePlan =
      new ExchangeNode(ctx_.getNextNodeId(), inputFragment.getPlanRoot());
  mergePlan.init(ctx_.getRootAnalyzer());
  // Then create an UNPARTITIONED plan fragment around it
  PlanFragment fragment = new PlanFragment(ctx_.getNextFragmentId(), mergePlan,
      DataPartition.UNPARTITIONED);
  // Wire the two fragments together
  inputFragment.setDestination(mergePlan);
  return fragment;
}

# Executing the plan

Once the execution plan has been generated, it is time for the executors to run it.

be/src/service/client-request-state.cc
Status ClientRequestState::Exec() {
  // profile
  switch (exec_request_->stmt_type) {
    case TStmtType::QUERY:
    case TStmtType::DML:
      RETURN_IF_ERROR(
          ExecQueryOrDmlRequest(exec_request_->query_exec_request, true /*async*/));
      break;
    // other case
  }
  // Transition the state to RUNNING
  UpdateNonErrorExecState(ExecState::RUNNING);
  return Status::OK();
}
Status ClientRequestState::ExecQueryOrDmlRequest(
    const TQueryExecRequest& query_exec_request, bool isAsync) {
  // profile
  if (isAsync) {
    // Asynchronous execution: transition the state to PENDING first
    UpdateNonErrorExecState(ExecState::PENDING);
    RETURN_IF_ERROR(Thread::Create("query-exec-state", "async-exec-thread",
        &ClientRequestState::FinishExecQueryOrDmlRequest, this, &async_exec_thread_,
        true));
  } else {
    // Synchronous execution
    FinishExecQueryOrDmlRequest();
    return query_status_;
  }
  return Status::OK();
}
void ClientRequestState::FinishExecQueryOrDmlRequest() {
  // Admission control
  Status admit_status = admission_control_client_->SubmitForAdmission(
      {query_id_pb, ExecEnv::GetInstance()->backend_id(),
          exec_request_->query_exec_request, exec_request_->query_options,
          summary_profile_, blacklisted_executor_addresses_},
      query_events_, &schedule_);
  
  // ...
  // Initialize the Coordinator and start execution
  coord_.reset(new Coordinator(this, *exec_request_, *schedule_.get(), query_events_));
  Status exec_status = coord_->Exec();
  // ...
}

# Admission

Below is the process of submitting the admission request locally. Admission control estimates and checks the system configuration, resources, and so on; the query is only submitted once the requirements are met.

be/src/scheduling/local-admission-control-client.cc
Status LocalAdmissionControlClient::SubmitForAdmission(
    const AdmissionController::AdmissionRequest& request,
    RuntimeProfile::EventSequence* query_events,
    std::unique_ptr<QuerySchedulePB>* schedule_result) {
  // ...
  bool queued;
  // Submit for admission
  Status status = ExecEnv::GetInstance()->admission_controller()->SubmitForAdmission(
      request, &admit_outcome_, schedule_result, queued);
  if (queued) {
    // ...
    // Queued: wait in the admission queue
    status = ExecEnv::GetInstance()->admission_controller()->WaitOnQueued(
        request.query_id, schedule_result);
  }
  return status;
}
be/src/scheduling/admission-controller.cc
Status AdmissionController::SubmitForAdmission(const AdmissionRequest& request,
    Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admit_outcome,
    unique_ptr<QuerySchedulePB>* schedule_result, bool& queued,
    std::string* request_pool) {
  
  // ...
  {
    // Take the admission lock
    lock_guard<mutex> lock(admission_ctrl_lock_);
    // Admit or reject
    bool must_reject =
        !FindGroupToAdmitOrReject(membership_snapshot, queue_node->pool_cfg,
            /* admit_from_queue=*/false, stats, queue_node, unused_bool);
    // Rejected
    if (must_reject) {
      // ...
      return Status::Expected(rejected_msg);
    }
    if (queue_node->admitted_schedule.get() != nullptr) {
      // ...
      // Admit the query
      AdmitQuery(queue_node, false);
      
      // ...
      return Status::OK();
    }
    // ...
    // Enqueue
    stats->Queue();
    queue->Enqueue(queue_node);
    // ...
  }
  // ...
  return Status::OK();
}
bool AdmissionController::FindGroupToAdmitOrReject(
    ClusterMembershipMgr::SnapshotPtr membership_snapshot, const TPoolConfig& pool_config,
    bool admit_from_queue, PoolStats* pool_stats, QueueNode* queue_node,
    bool& coordinator_resource_limited) {
  // Check the pool state: max/min memory configuration, number of requests, whether the pool is full, etc.
  if (RejectForCluster(pool_name, pool_config, admit_from_queue, &rejection_reason)) {
    DCHECK(!rejection_reason.empty());
    queue_node->not_admitted_reason = rejection_reason;
    return false;
  }
  // Compute the schedule states
  Status ret = ComputeGroupScheduleStates(membership_snapshot, queue_node);
  if (!ret.ok()) {
    DCHECK(queue_node->not_admitted_reason.empty());
    queue_node->not_admitted_reason = Substitute(REASON_SCHEDULER_ERROR, ret.GetDetail());
    return false;
  }
  if (queue_node->group_states.empty()) {
    DCHECK(!queue_node->not_admitted_reason.empty());
    return true;
  }
  // Iterate over all executor groups and check whether the remaining resources are sufficient
  for (GroupScheduleState& group_state : queue_node->group_states) {
    const ExecutorGroup& executor_group = group_state.executor_group;
    ScheduleState* state = group_state.state.get();
    // Update the memory requirements
    state->UpdateMemoryRequirements(pool_config);
    // ...
    // If any group rejects the schedule (not enough slots, memory, etc.), the request is rejected
    if (RejectForSchedule(*state, pool_config, &rejection_reason)) {
      DCHECK(!rejection_reason.empty());
      queue_node->not_admitted_reason = rejection_reason;
      return false;
    }
    // Check the pool's resources
    if (CanAdmitRequest(*state, pool_config, admit_from_queue,
            &queue_node->not_admitted_reason, &queue_node->not_admitted_details,
            coordinator_resource_limited)) {
      // Admit: record the chosen schedule
      queue_node->admitted_schedule = std::move(group_state.state);
      return true;
    } else {
      VLOG_RPC << "Cannot admit query " << queue_node->admission_request.query_id
               << " to group " << group_name << ": " << queue_node->not_admitted_reason
               << " Details:" << queue_node->not_admitted_details;
    }
  }
  return true;
}

# Execution

Once admission succeeds, the query is submitted to the executors to run.

be/src/runtime/coordinator.cc
Status Coordinator::Exec() {
  // ...
  // Initialize state
  InitFragmentStats();
  InitBackendStates();
  exec_summary_.Init(exec_params_);
  if (filter_mode_ != TRuntimeFilterMode::OFF) {
    InitFilterRoutingTable();
  }
  // Start execution on the backends
  RETURN_IF_ERROR(StartBackendExec());
  // Finish backend startup
  RETURN_IF_ERROR(FinishBackendStartup());
  // ...
  return Status::OK();
}
Status Coordinator::StartBackendExec() {
  int num_backends = backend_states_.size();
  // Coordinates multiple executors: each one notifies this barrier when its ExecAsync completes
  backend_exec_complete_barrier_.reset(new CountingBarrier(num_backends));
  // ...
  for (BackendState* backend_state: backend_states_) {
    // ...
    // Execute asynchronously
    backend_state->ExecAsync(debug_options, *filter_routing_table_, query_ctx_slice,
        &exec_rpcs_status_barrier_);
  }
  // Wait for all executors' exec RPCs to complete
  Status exec_rpc_status = exec_rpcs_status_barrier_.Wait();
  if (!exec_rpc_status.ok()) {
    // Some executor failed to start; cancel the query on the backends
    CancelBackends(/*fire_and_forget=*/ true);
    // Wait for the outstanding exec RPCs to finish
    WaitOnExecRpcs();
    // ...
    return UpdateExecState(exec_rpc_status, nullptr, FLAGS_hostname);
  }
  // Wait for the outstanding exec RPCs to finish
  WaitOnExecRpcs();
  
  // ...
  return Status::OK();
}
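
The CountingBarrier used here works like a countdown latch: each backend's ExecAsync completion callback decrements it, and StartBackendExec blocks until every backend has replied. Below is a Java analogue of that pattern, purely illustrative and not Impala code:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecBarrierDemo {
  public static void main(String[] args) throws InterruptedException {
    int numBackends = 3;
    CountDownLatch execRpcBarrier = new CountDownLatch(numBackends);
    ExecutorService pool = Executors.newFixedThreadPool(numBackends);
    for (int i = 0; i < numBackends; i++) {
      final int backendId = i;
      pool.submit(() -> {
        // Stand-in for ExecQueryFInstancesAsync(): do the work, then notify the barrier.
        System.out.println("backend " + backendId + " started its fragment instances");
        execRpcBarrier.countDown();
      });
    }
    // Stand-in for exec_rpcs_status_barrier_.Wait(): block until every backend replied.
    execRpcBarrier.await();
    System.out.println("all exec RPCs acknowledged; the query is now running");
    pool.shutdown();
  }
}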

Next, the Coordinator submits the query request to each executor.

be/src/runtime/coordinator-backend-state.cc
void Coordinator::BackendState::ExecAsync(const DebugOptions& debug_options,
    const FilterRoutingTable& filter_routing_table,
    const kudu::Slice& serialized_query_ctx,
    TypedCountingBarrier<Status>* exec_status_barrier) {
  {
    lock_guard<mutex> l(lock_);
    // ...
    // Issue the async RPC that executes the query fragment instances
    proxy->ExecQueryFInstancesAsync(request, &exec_response_, &exec_rpc_controller_,
        std::bind(&Coordinator::BackendState::ExecCompleteCb, this, exec_status_barrier,
            MonotonicMillis()));
    return;
  }
done:
  // Wake up the waiting Coordinator
  exec_done_cv_.NotifyAll();
}

The proxy invokes the RPC interface, sending the request to the executor.

be/generated-sources/gen-cpp/control_service.proxy.cc
void ControlServiceProxy::ExecQueryFInstancesAsync(
    const ExecQueryFInstancesRequestPB& req,
    ExecQueryFInstancesResponsePB* resp,
    ::kudu::rpc::RpcController* controller,
    const ::kudu::rpc::ResponseCallback& callback) {
  static const std::string kRpcName = "ExecQueryFInstances";
  // Async RPC call to the ExecQueryFInstances interface
  AsyncRequest(kRpcName, req, resp, controller, callback);
}

Here we arrive at the executor-side interface.

be/src/service/control-service.cc
void ControlService::ExecQueryFInstances(const ExecQueryFInstancesRequestPB* request,
    ExecQueryFInstancesResponsePB* response, RpcContext* rpc_context) {
  // ...
  // Execute
  Status resp_status = ExecEnv::GetInstance()->query_exec_mgr()->StartQuery(
      request, query_ctx, fragment_info);
  // ...
  // Send back the result
  RespondAndReleaseRpc(resp_status, response, rpc_context);
}

A thread is created to execute the query fragments.

be/src/runtime/query-exec-mgr.cc
Status QueryExecMgr::StartQuery(const ExecQueryFInstancesRequestPB* request,
    const TQueryCtx& query_ctx, const TExecPlanFragmentInfo& fragment_info) {
  // ...
  // Create a thread to run the query
  unique_ptr<Thread> t;
  status = Thread::Create("query-exec-mgr",
      Substitute("query-state-$0", PrintId(query_id)),
          &QueryExecMgr::ExecuteQueryHelper, this, qs, &t, true);
  // ...
  return Status::OK();
}
void QueryExecMgr::ExecuteQueryHelper(QueryState* qs) {
  // Start the fragment instances; we will not go any deeper here
  if (LIKELY(qs->StartFInstances())) qs->MonitorFInstances();
  // ...
}

# Summary

The whole flow corresponds to the diagram in the earlier SQL query flow post.
First, the client submits a query SQL to one of the impalad nodes. The impalad then calls the Query Planner interface over JNI to generate the query plan, going through the parse, analyze, rewrite, and plan phases; the plan phase produces the single-node plan, the distributed plan, and optionally the parallel plans. With the plan in hand, the Query Coordinator calls the Query Executors over RPC to actually run each plan fragment. The execution results are returned to the Query Coordinator, which aggregates them and returns the final result to the client.