This post walks through the entire query flow of a simple SQL statement.
# Service startup
An Impala deployment runs three services: impalad, catalogd and statestored. As we saw in the previous post on the SQL query flow, the SQL submitted by a client goes to impalad.
When impalad starts, it opens several ports for clients.
std::shared_ptr<ImpalaServer> impala_server(new ImpalaServer(&exec_env));
Status status = impala_server->Start(FLAGS_beeswax_port, FLAGS_hs2_port,
    FLAGS_hs2_http_port, FLAGS_external_fe_port);
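As a quick illustration of what those client ports are for, here is a minimal sketch of submitting SQL to the hs2 endpoint; it assumes an unsecured cluster, the default hs2 port 21050, and the Hive JDBC driver on the classpath.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ImpalaHs2Example {
  public static void main(String[] args) throws Exception {
    // Explicitly register the Hive JDBC driver (assumed to be on the classpath).
    Class.forName("org.apache.hive.jdbc.HiveDriver");
    // 21050 is impalad's default hs2_port; "auth=noSasl" assumes no Kerberos/LDAP.
    String url = "jdbc:hive2://impalad-host:21050/default;auth=noSasl";
    try (Connection conn = DriverManager.getConnection(url);
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery("select * from test")) {
      while (rs.next()) System.out.println(rs.getString(1));
    }
  }
}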
# Query flow
The walkthrough below uses the hs2 interface as the example. Only the key code is pasted; the elided parts are briefly summarized in comments.
First, the server receives the client's query request and starts executing it.
void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
    const TExecuteStatementReq& request) {
  // log
  ExecuteStatementCommon(return_val, request);
}
void ImpalaServer::ExecutePlannedStatement(
    TExecuteStatementResp& return_val,
    const TExecutePlannedStatementReq& request) {
  // log and check
  ExecuteStatementCommon(return_val, request.statementReq, &request.plan);
}
void ImpalaServer::ExecuteStatementCommon(TExecuteStatementResp& return_val,
    const TExecuteStatementReq& request, const TExecRequest* external_exec_request) {
  // log and init context/session....
  status = Execute(&query_ctx, session, &query_handle, external_exec_request);
  // handle result
}
All of the interface protocols eventually end up in impala-server. Here RunFrontendPlanner calls into the fe (frontend) to generate the execution plan, and then execution starts; Exec is the C++ part.
Status ImpalaServer::Execute(TQueryCtx* query_ctx, shared_ptr<SessionState> session_state,
    QueryHandle* query_handle,
    const TExecRequest* external_exec_request) {
  // log and metrics
  Status status = ExecuteInternal(*query_ctx, external_exec_request, session_state,
      &registered_query, query_handle);
  // handle result
  return status;
}
Status ImpalaServer::ExecuteInternal(const TQueryCtx& query_ctx,
    const TExecRequest* external_exec_request, shared_ptr<SessionState> session_state,
    bool* registered_query, QueryHandle* query_handle) {
  // check and init request
  RETURN_IF_ERROR(query_handle->query_driver()->RunFrontendPlanner(query_ctx));
  // handle result
  RETURN_IF_ERROR((*query_handle)->Exec());
}
# Generating the query plan
As shown below, the server executes the request by calling into Java through JNI.
Status QueryDriver::RunFrontendPlanner(const TQueryCtx& query_ctx) {
  // check
  RETURN_IF_ERROR(client_request_state_->UpdateQueryStatus(
      ExecEnv::GetInstance()->frontend()->GetExecRequest(
          query_ctx, exec_request_.get())));
  return Status::OK();
}
This is a thin wrapper around the call into the Java interface.
Status Frontend::GetExecRequest(
    const TQueryCtx& query_ctx, TExecRequest* result) {
  return JniUtil::CallJniMethod(fe_, create_exec_request_id_, query_ctx, result);
}
From here on we are in the Java part.
public byte[] createExecRequest(byte[] thriftQueryContext)
    throws ImpalaException {
  // check and init planCtx
  TExecRequest result = frontend_.createExecRequest(planCtx);
  // log and handle result
}
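The byte[] crossing the JNI boundary is a thrift-serialized TQueryCtx. As a minimal sketch (an assumption of how the deserialization step looks; the real code lives in Impala's JNI glue), turning it back into a Java object is just standard thrift deserialization:
import org.apache.impala.thrift.TQueryCtx;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.protocol.TBinaryProtocol;

// Sketch only: deserialize the TQueryCtx bytes handed over from the C++ side via JNI.
TQueryCtx deserializeQueryCtx(byte[] thriftQueryContext) throws Exception {
  TQueryCtx queryCtx = new TQueryCtx();
  new TDeserializer(new TBinaryProtocol.Factory()).deserialize(queryCtx, thriftQueryContext);
  return queryCtx;
}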
Now the Planner gets to work, going through the parse, analyze, rewrite and plan phases.
/**
 * This method essentially submits the request to the executor groups for execution.
 */
public TExecRequest createExecRequest(PlanCtx planCtx)
    throws ImpalaException {
  // timing
  TExecRequest result = getTExecRequest(planCtx, timeline);
  // handle result
}
private TExecRequest getTExecRequest(PlanCtx planCtx, EventSequence timeline)
    throws ImpalaException {
  // look up executor groups
  for (int i = 0; i < num_executor_group_sets; i++) {
    req = doCreateExecRequest(planCtx, timeline);
  }
  // return req
}
private TExecRequest doCreateExecRequest(PlanCtx planCtx,
    EventSequence timeline) throws ImpalaException {
  TQueryCtx queryCtx = planCtx.getQueryContext();
  // Parse the SQL statement; the details are covered in a later post.
  StatementBase stmt = Parser.parse(
      queryCtx.client_request.stmt, queryCtx.client_request.query_options);
  // ...
  // Analyzed in detail below: this analyzes the whole SQL statement and also rewrites it.
  AnalysisResult analysisResult = analysisCtx.analyzeAndAuthorize(stmt, stmtTableCache,
      authzChecker_.get(), planCtx.compilationState_.disableAuthorization());
  // ...
  // First create an exec request, i.e. build a request object from the analysis result.
  TExecRequest result = createBaseExecRequest(queryCtx, analysisResult);
  try {
    // DDL
    if (analysisResult.isCatalogOp()) {
      result.stmt_type = TStmtType.DDL;
      // Issue the DDL operation to catalogd.
      createCatalogOpRequest(analysisResult, result);
      // ...
    }
    // ...
    // Plan phase: build the execution plan, analyzed further below.
    TQueryExecRequest queryExecRequest =
        getPlannedExecRequest(planCtx, analysisResult, timeline);
    // ...
    // explain
    if (analysisResult.isExplainStmt()) {
      createExplainRequest(planCtx.getExplainString(), result);
      return result;
    }
    // ...
    return result;
  } catch (Exception e) {
    // handle exception
  }
}
# parse
Impala does not implement its own parser by hand; it is built with CUP and JFlex. A later post will cover this part.
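For orientation, Parser.parse (seen above in doCreateExecRequest) drives a JFlex-generated scanner and a CUP-generated parser. Roughly, and treating the exact class names and signatures as assumptions rather than Impala's actual code, the sequence looks like this:
import java.io.StringReader;

// Sketch only: SqlScanner (generated by JFlex) tokenizes the statement text and
// SqlParser (generated by CUP) builds the AST from the token stream.
StatementBase parseSql(String sql) throws Exception {
  SqlScanner scanner = new SqlScanner(new StringReader(sql));
  SqlParser parser = new SqlParser(scanner);
  // CUP's parse() returns a java_cup.runtime.Symbol whose value is the root AST node.
  return (StatementBase) parser.parse().value;
}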
# analyze
This phase analyzes the whole SQL statement and lays the groundwork for generating the execution plan.
public AnalysisResult analyzeAndAuthorize(StatementBase stmt,
    StmtTableCache stmtTableCache, AuthorizationChecker authzChecker,
    boolean disableAuthorization) throws ImpalaException {
  // ...
  // analyze
  analyze(stmtTableCache, authzCtx);
  // ...
  return analysisResult_;
}
private void analyze(StmtTableCache stmtTableCache, AuthorizationContext authzCtx)
    throws AnalysisException {
  // ...
  // Create an Analyzer for the current statement.
  analysisResult_.analyzer_ = createAnalyzer(stmtTableCache, authzCtx);
  // Analyze the statement. Our query is "select * from test", so the stmt is a
  // SelectStmt; its analyze method is shown below.
  analysisResult_.stmt_.analyze(analysisResult_.analyzer_);
  // Next comes the rewrite; after rewriting, the statement has to be re-analyzed.
  ExprRewriter rewriter = analysisResult_.analyzer_.getExprRewriter();
  // ...
  analysisResult_.stmt_.rewriteExprs(rewriter);
  // ...
  // re-analyze
  reAnalyze(stmtTableCache, authzCtx, origResultTypes, origColLabels,
      /*collectPrivileges*/ false);
  // ...
}
Now we reach the concrete statement class. SelectStmt is the concrete statement; it extends QueryStmt, which extends StatementBase, the base class of all statements; StatementBase in turn derives from StmtNode, and StmtNode from ParseNode.
The class hierarchy is: SelectStmt → QueryStmt → StatementBase → StmtNode → ParseNode.
We start with the topmost class, SelectStmt.
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
  // analyzer_ is initially null
  if (isAnalyzed()) return;
  // Go into the parent class QueryStmt's method, which analyzes the limit and with
  // clauses common to all query statements.
  super.analyze(analyzer);
  // Start analyzing the select statement.
  new SelectAnalyzer(analyzer).analyze();
  // Optimize plain count(*) queries on Iceberg tables.
  this.optimizePlainCountStarQuery();
}
private class SelectAnalyzer {
  private void analyze() throws AnalysisException {
    // Analyze the from clause.
    fromClause_.analyze(analyzer_);
    // Register all structured SlotRefs, excluding plain columns.
    registerStructSlotRefPathsWithAnalyzer();
    // Analyze the select clause and collect all Exprs.
    analyzeSelectClause();
    // Verify validity.
    verifyResultExprs();
    registerViewColumnPrivileges();
    // Analyze the where clause.
    analyzeWhereClause();
    // sort
    createSortInfo(analyzer_);
    // ...
    // Create analytic function info.
    createAnalyticInfo();
    // Create sort-related info.
    if (evaluateOrderBy_) createSortTupleInfo(analyzer_);
    // ...
  }
}
Next is the abstract class for all query statements, QueryStmt.
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
  // analyzer_ is initially null
  if (isAnalyzed()) return;
  // Go into StatementBase's method.
  super.analyze(analyzer);
  // Analyze the limit and with clauses; they wrap around every query statement and are
  // shared by all of them.
  analyzeLimit(analyzer);
  if (hasWithClause()) withClause_.analyze(analyzer);
}
private void analyzeLimit(Analyzer analyzer) throws AnalysisException {
  // OFFSET requires an ORDER BY; otherwise the result could differ on every run and
  // the offset would be meaningless.
  if (limitElement_.getOffsetExpr() != null && !hasOrderByClause()) {
    throw new AnalysisException("OFFSET requires an ORDER BY clause: " +
        limitElement_.toSql().trim());
  }
  // Analyze the limit; we will not go any deeper here.
  limitElement_.analyze(analyzer);
}
Then comes the abstract class for all SQL statements, StatementBase.
/**
 * The base class does no real analysis; it just sets the analyzer.
 */
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
  // analyzer_ is initially null
  if (isAnalyzed()) return;
  // explain statement
  if (isExplain_) analyzer.setIsExplain();
  // set the analyzer
  analyzer_ = analyzer;
}
SelectStmt analyzes the from clause first. FromClause extends StmtNode and implements Iterable<TableRef>; it is essentially a list of TableRefs, so analyzing the from clause boils down to analyzing the TableRefs.
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
  // ...
  // Iterate over all tableRefs; for "select * from test" there is only one.
  for (int i = 0; i < tableRefs_.size(); ++i) {
    TableRef tblRef = tableRefs_.get(i);
    // Resolve the TableRef; here the result is a BaseTableRef.
    tblRef = analyzer.resolveTableRef(tblRef);
    tableRefs_.set(i, Preconditions.checkNotNull(tblRef));
    tblRef.setLeftTblRef(leftTblRef);
    // This registers joins and so on.
    tblRef.analyze(analyzer);
    // ...
  }
  // ...
}
Resolving the TableRef.
public TableRef resolveTableRef(TableRef tableRef)
    throws AnalysisException {
  // ...
  // A local view here should be a query defined in a with clause.
  // getPath returns the rawPath identifying the table: for "db.tb" the size is 2 with
  // elements db and tb; without a db prefix the size is 1.
  if (tableRef.getPath().size() == 1) {
    String viewAlias = tableRef.getPath().get(0).toLowerCase();
    Analyzer analyzer = this;
    do {
      FeView localView = analyzer.localViews_.get(viewAlias);
      if (localView != null) return new InlineViewRef(localView, tableRef);
      analyzer = (analyzer.ancestors_.isEmpty() ? null : analyzer.ancestors_.get(0));
    } while (analyzer != null);
  }
  // Resolve the table name; this calls the catalog to fetch the table.
  List<String> rawPath = tableRef.getPath();
  // ...
  resolvedPath = resolvePathWithMasking(rawPath, PathType.TABLE_REF);
  // ...
  // Convert to the target table: a BaseTableRef or a CollectionTableRef.
  Preconditions.checkNotNull(resolvedPath);
  if (resolvedPath.destTable() != null) {
    FeTable table = resolvedPath.destTable();
    if (table instanceof FeView) return new InlineViewRef((FeView) table, tableRef);
    // ...
    return new BaseTableRef(tableRef, resolvedPath);
  } else {
    return new CollectionTableRef(tableRef, resolvedPath, false);
  }
}
Now we analyze the TableRef resolved by the method above.
public void analyze(Analyzer analyzer) throws AnalysisException {
  if (isAnalyzed_) return;
  // Register privileges.
  analyzer.registerAuthAndAuditEvent(resolvedPath_.getRootTable(), priv_,
      requireGrantOption_);
  // Register the TableRef.
  desc_ = analyzer.registerTableRef(this);
  isAnalyzed_ = true;
  // Check table capabilities.
  analyzer.checkTableCapability(getTable(), Analyzer.OperationType.ANY);
  // Start the analysis.
  analyzeTableSample(analyzer);
  analyzeTimeTravel(analyzer);
  analyzeHints(analyzer);
  analyzeJoin(analyzer);
  analyzeSkipHeaderLineCount();
}
# rewrite
Let's take a quick look at SelectStmt's rewrite.
public void rewriteExprs(ExprRewriter rewriter) throws AnalysisException {
  Preconditions.checkState(isAnalyzed());
  // Rewrite the selectList.
  selectList_.rewriteExprs(rewriter, analyzer_);
  // Rewrite the tableRefs.
  for (TableRef ref: fromClause_.getTableRefs()) ref.rewriteExprs(rewriter, analyzer_);
  List<Subquery> subqueryExprs = new ArrayList<>();
  // Rewrite the where clause.
  if (whereClause_ != null) {
    whereClause_ = rewriter.rewrite(whereClause_, analyzer_);
    whereClause_.collect(Subquery.class, subqueryExprs);
  }
  // Rewrite the having clause.
  if (havingClause_ != null) {
    havingClause_ = rewriteCheckOrdinalResult(rewriter, havingClause_);
    havingClause_.collect(Subquery.class, subqueryExprs);
  }
  // Rewrite subqueries.
  for (Subquery s : subqueryExprs) s.getStatement().rewriteExprs(rewriter);
  // Rewrite group by.
  if (groupingExprs_ != null) {
    for (int i = 0; i < groupingExprs_.size(); ++i) {
      groupingExprs_.set(i, rewriteCheckOrdinalResult(
          rewriter, groupingExprs_.get(i)));
    }
  }
  // Rewrite order by.
  if (orderByElements_ != null) {
    for (OrderByElement orderByElem: orderByElements_) {
      orderByElem.setExpr(rewriteCheckOrdinalResult(rewriter, orderByElem.getExpr()));
    }
  }
}
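To make the rewrite rules themselves concrete: the ExprRewriter applies a list of rules to each expression and repeats until nothing changes anymore. Impala ships rules such as BetweenToCompoundRule, which turns a BETWEEN 1 AND 2 into a >= 1 AND a <= 2. Below is a self-contained toy model of that driving loop, working on plain strings for illustration only; it is not Impala code.
import java.util.List;
import java.util.function.UnaryOperator;

public class FixedPointRewriterDemo {
  // Apply every rule repeatedly until no rule changes the expression (a fixed point).
  static String rewrite(String expr, List<UnaryOperator<String>> rules) {
    boolean changed = true;
    while (changed) {
      changed = false;
      for (UnaryOperator<String> rule : rules) {
        String out = rule.apply(expr);
        if (!out.equals(expr)) { expr = out; changed = true; }
      }
    }
    return expr;
  }

  public static void main(String[] args) {
    // Toy stand-in for BetweenToCompoundRule.
    UnaryOperator<String> betweenToCompound =
        e -> e.replace("a BETWEEN 1 AND 2", "a >= 1 AND a <= 2");
    System.out.println(rewrite("c1 = 1 AND a BETWEEN 1 AND 2", List.of(betweenToCompound)));
  }
}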
# plan
Now we reach the phase that actually generates the query plan.
private TQueryExecRequest getPlannedExecRequest(PlanCtx planCtx,
    AnalysisResult analysisResult, EventSequence timeline)
    throws ImpalaException {
  // ...
  // Initialize a planner.
  Planner planner = new Planner(analysisResult, queryCtx, timeline);
  // Core logic.
  TQueryExecRequest queryExecRequest = createExecRequest(planner, planCtx);
  // ...
  return queryExecRequest;
}
private TQueryExecRequest createExecRequest(
    Planner planner, PlanCtx planCtx) throws ImpalaException {
  // Generate the plans.
  List<PlanFragment> planRoots = planner.createPlans();
  // Estimate resource consumption.
  Planner.computeResourceReqs(planRoots, queryCtx, result,
      planner.getPlannerCtx(), planner.getAnalysisResult().isQueryStmt());
  // ...
  return result;
}
Plan fragment generation begins; depending on configuration, a single-node plan, a distributed plan, or parallel plans are produced.
public List<PlanFragment> createPlans() throws ImpalaException {
  // Generate the distributed plan fragments.
  List<PlanFragment> distrPlan = createPlanFragments();
  Preconditions.checkNotNull(distrPlan);
  // Parallel plans are disabled; they are enabled when mt_dop > 0 and num_nodes != 1.
  if (!useParallelPlan(ctx_)) {
    return Collections.singletonList(distrPlan.get(0));
  }
  // Parallel plans are enabled.
  ParallelPlanner planner = new ParallelPlanner(ctx_);
  List<PlanFragment> parallelPlans = planner.createPlans(distrPlan.get(0));
  ctx_.getTimeline().markEvent("Parallel plans created");
  return parallelPlans;
}
private List<PlanFragment> createPlanFragments() throws ImpalaException {
  // First build the single-node plan, then turn it into a distributed plan.
  SingleNodePlanner singleNodePlanner = new SingleNodePlanner(ctx_);
  DistributedPlanner distributedPlanner = new DistributedPlanner(ctx_);
  PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();
  // ...
  if (ctx_.isSingleNodeExec()) {
    // Single-node plan.
    fragments = Lists.newArrayList(new PlanFragment(
        ctx_.getNextFragmentId(), singleNodePlan, DataPartition.UNPARTITIONED));
  } else {
    // Distributed plan.
    fragments = distributedPlanner.createPlanFragments(singleNodePlan);
  }
  // ...
  return fragments;
}
Next, the single-node plan.
public PlanNode createSingleNodePlan() throws ImpalaException {
  // ...
  PlanNode singleNodePlan = createQueryPlan(queryStmt, analyzer,
      ctx_.getQueryOptions().isDisable_outermost_topn());
  // ...
  return singleNodePlan;
}
private PlanNode createQueryPlan(QueryStmt stmt, Analyzer analyzer, boolean disableTopN)
    throws ImpalaException {
  // Empty result set.
  if (analyzer.hasEmptyResultSet()) return createEmptyNode(stmt, analyzer);
  PlanNode root;
  if (stmt instanceof SelectStmt) {
    SelectStmt selectStmt = (SelectStmt) stmt;
    root = createSelectPlan(selectStmt, analyzer);
    // ...
  } else {
    Preconditions.checkState(stmt instanceof UnionStmt);
    root = createUnionPlan((UnionStmt) stmt, analyzer);
  }
  // ...
  return root;
}
private PlanNode createSelectPlan(SelectStmt selectStmt, Analyzer analyzer)
    throws ImpalaException {
  // No from clause: generate a UnionNode.
  if (selectStmt.getTableRefs().isEmpty()) {
    return createConstantSelectPlan(selectStmt, analyzer);
  }
  // Roughly speaking, mark the slots that are needed.
  selectStmt.materializeRequiredSlots(analyzer);
  List<TupleId> rowTuples = new ArrayList<>();
  // Child node tuples.
  for (TableRef tblRef: selectStmt.getTableRefs()) {
    rowTuples.addAll(tblRef.getMaterializedTupleIds());
  }
  // ...
  // Compute parent and subplan refs.
  List<TableRef> parentRefs = new ArrayList<>();
  List<SubplanRef> subplanRefs = new ArrayList<>();
  computeParentAndSubplanRefs(
      selectStmt.getTableRefs(), analyzer.isStraightJoin(), parentRefs, subplanRefs);
  // Build a PlanNode from the parent and subplan refs.
  MultiAggregateInfo multiAggInfo = selectStmt.getMultiAggInfo();
  PlanNode root = createTableRefsPlan(parentRefs, subplanRefs, multiAggInfo, analyzer);
  // ...
  return root;
}
private PlanNode createTableRefsPlan(List<TableRef> parentRefs,
    List<SubplanRef> subplanRefs, MultiAggregateInfo aggInfo, Analyzer analyzer)
    throws ImpalaException {
  List<Pair<TableRef, PlanNode>> parentRefPlans = new ArrayList<>();
  // Not entirely sure what this step does, so skipping it for now; it has no real
  // effect on the current statement.
  List<CollectionTableRef> unnestCollectionRefs =
      extractZippingUnnestTableRefs(parentRefs);
  reduceUnnestCollectionRefs(parentRefs, unnestCollectionRefs);
  // Build the PlanNodes.
  for (TableRef ref: parentRefs) {
    // Root node.
    PlanNode root = createTableRefNode(ref, aggInfo, analyzer, unnestCollectionRefs);
    Preconditions.checkNotNull(root);
    // Child nodes.
    root = createSubplan(root, subplanRefs, true, analyzer);
    parentRefPlans.add(new Pair<TableRef, PlanNode>(ref, root));
  }
  // ...
  PlanNode root = null;
  if (!analyzer.isStraightJoin()) {
    // Optimize the join order.
    root = createCheapestJoinPlan(analyzer, parentRefPlans, subplanRefs);
    // ...
  }
  // ...
  return root;
}
private PlanNode createTableRefNode(TableRef tblRef, MultiAggregateInfo aggInfo,
    Analyzer analyzer, List<CollectionTableRef> collectionRefsToZip)
    throws ImpalaException {
  PlanNode result = null;
  if (tblRef instanceof BaseTableRef) {
    // "select * from test" ends up here.
    result = createScanNode(tblRef, aggInfo, analyzer);
  }
  // Other cases build their corresponding nodes.
  return result;
}
private PlanNode createScanNode(TableRef tblRef, MultiAggregateInfo aggInfo,
    Analyzer analyzer) throws ImpalaException {
  // ...
  FeTable table = tblRef.getTable();
  if (table instanceof FeFsTable) {
    if (table instanceof FeIcebergTable) {
      IcebergScanPlanner icebergPlanner = new IcebergScanPlanner(analyzer, ctx_, tblRef,
          conjuncts, aggInfo);
      // Build an Iceberg scan plan.
      return icebergPlanner.createIcebergScanPlan();
    }
    // Build an Hdfs scan plan; we will not go any deeper here.
    return createHdfsScanPlan(tblRef, aggInfo, conjuncts, analyzer);
  }
  // ...
}
At this point a single-node plan (singleNodePlan) has been produced; for our query it is just an HdfsScanNode. Next comes the distributed plan.
public List<PlanFragment> createPlanFragments(
    PlanNode singleNodePlan) throws ImpalaException {
  // ...
  createPlanFragments(singleNodePlan, isPartitioned, fragments);
  return fragments;
}
public PlanFragment createPlanFragments(
    PlanNode root, boolean isPartitioned, List<PlanFragment> fragments)
    throws ImpalaException {
  // Recursively create fragments; for "select * from test" there are no children.
  for (PlanNode child: root.getChildren()) {
    boolean childIsPartitioned = child.allowPartitioned();
    if (root instanceof SubplanNode && child == root.getChild(1)) continue;
    childFragments.add(createPlanFragments(child, childIsPartitioned, fragments));
  }
  // ...
  PlanFragment result = null;
  if (root instanceof ScanNode) {
    // Create a scan fragment.
    result = createScanFragment(root);
    fragments.add(result);
  }
  // Other fragments such as HashJoin, Aggregate, etc.
  // Merge: for a select statement isPartitioned is false here. A fragment has a
  // DataPartition field whose type is one of UNPARTITIONED, RANDOM, HASH_PARTITIONED,
  // RANGE_PARTITIONED or KUDU; isPartitioned() returns true for anything other than
  // UNPARTITIONED, and here the type is RANDOM. In that case the results must be merged.
  if (!isPartitioned && result.isPartitioned()) {
    result = createMergeFragment(result);
    fragments.add(result);
  }
  return result;
}
private PlanFragment createScanFragment(PlanNode node) {
  // This creates a RANDOM-partitioned fragment.
  return new PlanFragment(ctx_.getNextFragmentId(), node, DataPartition.RANDOM);
}
public PlanFragment(PlanFragmentId id, PlanNode root, DataPartition partition) {
  fragmentId_ = id;
  planRoot_ = root;
  dataPartition_ = partition;
  outputPartition_ = DataPartition.UNPARTITIONED;
  // Set this fragment on the plan tree.
  setFragmentInPlanTree(planRoot_);
}
public void setFragmentInPlanTree(PlanNode node) {
  if (node == null) return;
  node.setFragment(this);
  // Stop at an ExchangeNode and do not descend into its children; for "select * from
  // test" there is no ExchangeNode and no children, so the method runs once.
  if (node instanceof ExchangeNode) return;
  for (PlanNode child : node.getChildren()) setFragmentInPlanTree(child);
}
private PlanFragment createMergeFragment(PlanFragment inputFragment)
    throws ImpalaException {
  // Create an ExchangeNode.
  ExchangeNode mergePlan =
      new ExchangeNode(ctx_.getNextNodeId(), inputFragment.getPlanRoot());
  mergePlan.init(ctx_.getRootAnalyzer());
  // Create another, UNPARTITIONED plan fragment.
  PlanFragment fragment = new PlanFragment(ctx_.getNextFragmentId(), mergePlan,
      DataPartition.UNPARTITIONED);
  // Connect the two fragments.
  inputFragment.setDestination(mergePlan);
  return fragment;
}
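Putting this together for select * from test: the HdfsScanNode lands in a RANDOM-partitioned fragment produced by createScanFragment, and because isPartitioned is false at the root of a select statement while that fragment is partitioned, createMergeFragment wraps it with an UNPARTITIONED fragment whose ExchangeNode pulls the scan results back to a single node. The distributed plan therefore ends up with two fragments connected by an exchange.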
# Executing the plan
Once the execution plan has been generated, execution begins.
Status ClientRequestState::Exec() {
  // profile
  switch (exec_request_->stmt_type) {
    case TStmtType::QUERY:
    case TStmtType::DML:
      RETURN_IF_ERROR(
          ExecQueryOrDmlRequest(exec_request_->query_exec_request, true /*async*/));
      break;
    // other cases
  }
  // Switch the state to RUNNING.
  UpdateNonErrorExecState(ExecState::RUNNING);
  return Status::OK();
}
Status ClientRequestState::ExecQueryOrDmlRequest(
    const TQueryExecRequest& query_exec_request, bool isAsync) {
  // profile
  if (isAsync) {
    // Asynchronous execution: switch the state to PENDING.
    UpdateNonErrorExecState(ExecState::PENDING);
    RETURN_IF_ERROR(Thread::Create("query-exec-state", "async-exec-thread",
        &ClientRequestState::FinishExecQueryOrDmlRequest, this, &async_exec_thread_,
        true));
  } else {
    // Synchronous execution.
    FinishExecQueryOrDmlRequest();
    return query_status_;
  }
  return Status::OK();
}
void ClientRequestState::FinishExecQueryOrDmlRequest() {
  // Admission control.
  Status admit_status = admission_control_client_->SubmitForAdmission(
      {query_id_pb, ExecEnv::GetInstance()->backend_id(),
       exec_request_->query_exec_request, exec_request_->query_options,
       summary_profile_, blacklisted_executor_addresses_},
      query_events_, &schedule_);
  // ...
  // Initialize the Coordinator and execute.
  coord_.reset(new Coordinator(this, *exec_request_, *schedule_.get(), query_events_));
  Status exec_status = coord_->Exec();
  // ...
}
# Admission control
Below is the process of submitting an admission request locally. Admission control estimates and checks system configuration, resources and so on, and only once the requirements are met is the query submitted.
Status LocalAdmissionControlClient::SubmitForAdmission(
    const AdmissionController::AdmissionRequest& request,
    RuntimeProfile::EventSequence* query_events,
    std::unique_ptr<QuerySchedulePB>* schedule_result) {
  // ...
  bool queued;
  // Submit for admission.
  Status status = ExecEnv::GetInstance()->admission_controller()->SubmitForAdmission(
      request, &admit_outcome_, schedule_result, queued);
  if (queued) {
    // ...
    // Wait in the queue.
    status = ExecEnv::GetInstance()->admission_controller()->WaitOnQueued(
        request.query_id, schedule_result);
  }
  return status;
}
Status AdmissionController::SubmitForAdmission(const AdmissionRequest& request,
    Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admit_outcome,
    unique_ptr<QuerySchedulePB>* schedule_result, bool& queued,
    std::string* request_pool) {
  // ...
  {
    // Take the lock.
    lock_guard<mutex> lock(admission_ctrl_lock_);
    // Admit or reject.
    bool must_reject =
        !FindGroupToAdmitOrReject(membership_snapshot, queue_node->pool_cfg,
            /* admit_from_queue=*/false, stats, queue_node, unused_bool);
    // Reject.
    if (must_reject) {
      // ...
      return Status::Expected(rejected_msg);
    }
    if (queue_node->admitted_schedule.get() != nullptr) {
      // ...
      // Admit the query.
      AdmitQuery(queue_node, false);
      // ...
      return Status::OK();
    }
    // ...
    // Enqueue.
    stats->Queue();
    queue->Enqueue(queue_node);
    // ...
  }
  // ...
  return Status::OK();
}
bool AdmissionController::FindGroupToAdmitOrReject(
    ClusterMembershipMgr::SnapshotPtr membership_snapshot, const TPoolConfig& pool_config,
    bool admit_from_queue, PoolStats* pool_stats, QueueNode* queue_node,
    bool& coordinator_resource_limited) {
  // Check the resource pool state: max/min memory settings, request counts,
  // whether the pool is full, etc.
  if (RejectForCluster(pool_name, pool_config, admit_from_queue, &rejection_reason)) {
    DCHECK(!rejection_reason.empty());
    queue_node->not_admitted_reason = rejection_reason;
    return false;
  }
  // Compute the schedule states.
  Status ret = ComputeGroupScheduleStates(membership_snapshot, queue_node);
  if (!ret.ok()) {
    DCHECK(queue_node->not_admitted_reason.empty());
    queue_node->not_admitted_reason = Substitute(REASON_SCHEDULER_ERROR, ret.GetDetail());
    return false;
  }
  if (queue_node->group_states.empty()) {
    DCHECK(!queue_node->not_admitted_reason.empty());
    return true;
  }
  // Iterate over all groups and check whether the remaining resources are sufficient.
  for (GroupScheduleState& group_state : queue_node->group_states) {
    const ExecutorGroup& executor_group = group_state.executor_group;
    ScheduleState* state = group_state.state.get();
    // Update the memory requirements.
    state->UpdateMemoryRequirements(pool_config);
    // ...
    // If any group rejects the schedule, execution is rejected; this covers remaining
    // slots, remaining memory and so on.
    if (RejectForSchedule(*state, pool_config, &rejection_reason)) {
      DCHECK(!rejection_reason.empty());
      queue_node->not_admitted_reason = rejection_reason;
      return false;
    }
    // Check the resource pool's resources.
    if (CanAdmitRequest(*state, pool_config, admit_from_queue,
        &queue_node->not_admitted_reason, &queue_node->not_admitted_details,
        coordinator_resource_limited)) {
      // Admit the request.
      queue_node->admitted_schedule = std::move(group_state.state);
      return true;
    } else {
      VLOG_RPC << "Cannot admit query " << queue_node->admission_request.query_id
               << " to group " << group_name << ": " << queue_node->not_admitted_reason
               << " Details:" << queue_node->not_admitted_details;
    }
  }
  return true;
}
# Execution
Once admission succeeds, the query is submitted to the Executors for execution.
Status Coordinator::Exec() {
  // ...
  // Initialize state.
  InitFragmentStats();
  InitBackendStates();
  exec_summary_.Init(exec_params_);
  if (filter_mode_ != TRuntimeFilterMode::OFF) {
    InitFilterRoutingTable();
  }
  // Start execution.
  RETURN_IF_ERROR(StartBackendExec());
  // Finish backend startup.
  RETURN_IF_ERROR(FinishBackendStartup());
  // ...
  return Status::OK();
}
Status Coordinator::StartBackendExec() {
  int num_backends = backend_states_.size();
  // Multiple Executors cooperate; in ExecAsync each Executor notifies the barrier when
  // it is done.
  backend_exec_complete_barrier_.reset(new CountingBarrier(num_backends));
  // ...
  for (BackendState* backend_state: backend_states_) {
    // ...
    // Asynchronous execution.
    backend_state->ExecAsync(debug_options, *filter_routing_table_, query_ctx_slice,
        &exec_rpcs_status_barrier_);
  }
  // Wait for all the Exec RPCs to complete.
  Status exec_rpc_status = exec_rpcs_status_barrier_.Wait();
  if (!exec_rpc_status.ok()) {
    // An Executor failed; cancel execution.
    CancelBackends(/*fire_and_forget=*/ true);
    // Wait for the outstanding Exec RPCs.
    WaitOnExecRpcs();
    // ...
    return UpdateExecState(exec_rpc_status, nullptr, FLAGS_hostname);
  }
  // Wait for the outstanding Exec RPCs.
  WaitOnExecRpcs();
  // ...
  return Status::OK();
}
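The CountingBarrier used here is essentially a countdown latch: every backend's completion callback decrements it, and the coordinator blocks until it reaches zero. Below is a self-contained Java analogue of that pattern, illustrative only and not Impala code.
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BackendBarrierDemo {
  public static void main(String[] args) throws InterruptedException {
    int numBackends = 3;
    // One count per backend, analogous to CountingBarrier(num_backends).
    CountDownLatch execRpcsDone = new CountDownLatch(numBackends);
    ExecutorService rpcPool = Executors.newFixedThreadPool(numBackends);
    for (int i = 0; i < numBackends; i++) {
      final int backendId = i;
      rpcPool.submit(() -> {
        // ... issue the Exec RPC to backendId and handle the response ...
        System.out.println("backend " + backendId + " responded");
        execRpcsDone.countDown();  // analogous to the completion callback notifying the barrier
      });
    }
    execRpcsDone.await();          // analogous to exec_rpcs_status_barrier_.Wait()
    rpcPool.shutdown();
  }
}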
Next, the Coordinator submits the query request to each Executor.
void Coordinator::BackendState::ExecAsync(const DebugOptions& debug_options,
    const FilterRoutingTable& filter_routing_table,
    const kudu::Slice& serialized_query_ctx,
    TypedCountingBarrier<Status>* exec_status_barrier) {
  {
    lock_guard<mutex> l(lock_);
    // ...
    // Execute the query asynchronously.
    proxy->ExecQueryFInstancesAsync(request, &exec_response_, &exec_rpc_controller_,
        std::bind(&Coordinator::BackendState::ExecCompleteCb, this, exec_status_barrier,
            MonotonicMillis()));
    return;
  }
done:
  // Wake up the waiting Coordinator.
  exec_done_cv_.NotifyAll();
}
The proxy invokes the rpc interface and sends the request to the Executor.
void ControlServiceProxy::ExecQueryFInstancesAsync(
    const ExecQueryFInstancesRequestPB& req,
    ExecQueryFInstancesResponsePB* resp,
    ::kudu::rpc::RpcController* controller,
    const ::kudu::rpc::ResponseCallback& callback) {
  static const std::string kRpcName = "ExecQueryFInstances";
  // Asynchronous rpc call to the ExecQueryFInstances interface.
  AsyncRequest(kRpcName, req, resp, controller, callback);
}
Now we are at the Executor's interface.
void ControlService::ExecQueryFInstances(const ExecQueryFInstancesRequestPB* request,
    ExecQueryFInstancesResponsePB* response, RpcContext* rpc_context) {
  // ...
  // Execute.
  Status resp_status = ExecEnv::GetInstance()->query_exec_mgr()->StartQuery(
      request, query_ctx, fragment_info);
  // ...
  // Return the result.
  RespondAndReleaseRpc(resp_status, response, rpc_context);
}
A thread is created to execute the query fragments.
Status QueryExecMgr::StartQuery(const ExecQueryFInstancesRequestPB* request,
    const TQueryCtx& query_ctx, const TExecPlanFragmentInfo& fragment_info) {
  // ...
  // Create a thread for execution.
  unique_ptr<Thread> t;
  status = Thread::Create("query-exec-mgr",
      Substitute("query-state-$0", PrintId(query_id)),
      &QueryExecMgr::ExecuteQueryHelper, this, qs, &t, true);
  // ...
  return Status::OK();
}
void QueryExecMgr::ExecuteQueryHelper(QueryState* qs) {
  // Execute the fragments; we will not go any deeper here either.
  if (LIKELY(qs->StartFInstances())) qs->MonitorFInstances();
  // ...
}
# Summary
The whole flow corresponds to the diagram in the SQL query flow post.
First, the client submits a query SQL to one of the impalads. Through JNI, the Query Planner interface is invoked to generate the query plan, going through the parse, analyze, rewrite and plan phases; the plan phase produces the single-node plan, the distributed plan and, optionally, parallel plans. With the plan in hand, the Query Coordinator calls the Query Executors over rpc to actually run each plan fragment. The execution results are then returned to the Query Coordinator, which finally aggregates them and returns them to the client.