Impala fe query plan

Query plan

I Basics

Impala (BE -> FE -> BE). This is only a high-level view; the details are expanded later and are not repeated here.

BE -> Coordinator (FE -> BE) -> BE Executor -> Coordinator

1 PlanNode

Common PlanNode types:

ScanNode
ExchangeNode
HashJoinNode
AggregationNode
...

2 PlanFragment

A single PlanFragment can be executed independently, even though PlanFragments depend on one another. Impala also uses the fragment as its unit of parallelism.
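To make the relationship between plan nodes and fragments concrete, here is a minimal sketch. The classes `SketchNode`, `SketchFragment`, and `FragmentSketch` are hypothetical stand-ins invented for illustration, not Impala's real `PlanNode` and `PlanFragment` classes. It models two scan fragments feeding a join fragment through ExchangeNodes and counts the independently executable fragments.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a plan node (not Impala's PlanNode).
class SketchNode {
    final String name;
    final List<SketchNode> children = new ArrayList<>();

    SketchNode(String name, SketchNode... kids) {
        this.name = name;
        for (SketchNode k : kids) children.add(k);
    }
}

// Hypothetical, simplified stand-in for a plan fragment (not Impala's PlanFragment).
class SketchFragment {
    final SketchNode root;
    final List<SketchFragment> children = new ArrayList<>();

    SketchFragment(SketchNode root) { this.root = root; }
}

class FragmentSketch {
    // Counts this fragment plus every fragment below it.
    static int countFragments(SketchFragment f) {
        int n = 1;
        for (SketchFragment c : f.children) n += countFragments(c);
        return n;
    }

    // Two scan fragments feed one join fragment through two ExchangeNodes.
    static SketchFragment example() {
        SketchFragment left = new SketchFragment(new SketchNode("ScanNode(t1)"));
        SketchFragment right = new SketchFragment(new SketchNode("ScanNode(t2)"));
        SketchNode join = new SketchNode("HashJoinNode",
            new SketchNode("ExchangeNode"), new SketchNode("ExchangeNode"));
        SketchFragment top = new SketchFragment(join);
        top.children.add(left);
        top.children.add(right);
        return top;
    }

    public static void main(String[] args) {
        System.out.println(countFragments(example())); // prints 3
    }
}
```

In this toy layout each ExchangeNode in the join fragment corresponds to exactly one child fragment, which is the same invariant the tree verification step checks later.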

II Call chain

The Impala FE receives the request from the BE:
public byte[] createExecRequest(byte[] thriftQueryContext)
    TExecRequest result = frontend_.createExecRequest(planCtx);
       TExecRequest result = getTExecRequest(planCtx, timeline);
            TExecRequest req = doCreateExecRequest(planCtx, timeline);
                TExecRequest result = createBaseExecRequest(queryCtx, analysisResult);
                TQueryExecRequest queryExecRequest = getPlannedExecRequest(planCtx, analysisResult, timeline);
                    TQueryExecRequest queryExecRequest = createExecRequest(planner, planCtx);
                    List<PlanFragment> planRoots = planner.createPlans(); (this is where the query plan is generated)
                result.setQuery_exec_request(queryExecRequest);

public List<PlanFragment> createPlans() throws ImpalaException {
createPlans
List<PlanFragment> distrPlan = createPlanFragments();
    createPlanFragments
    SingleNodePlanner singleNodePlanner = new SingleNodePlanner(ctx_);
    DistributedPlanner distributedPlanner = new DistributedPlanner(ctx_);
    PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();
    createSingleNodePlan
        createQueryPlan
            createSelectPlan
            createCheapestJoinPlan
                createJoinPlan (obtains the optimal single-node plan; CBO)

    Generate the distributed fragments from singleNodePlan
    fragments = distributedPlanner.createPlanFragments(singleNodePlan);
    createPlanFragments (createPlanFragments is recursive)
        createHashJoinFragment/createScanFragment (ExchangeNodes are built here for the connected nodes)

    rootFragment.verifyTreeCtx(ctx_);
    (some of the key logic here:
    1. ExchangeNode.size == ChildrenNode.size
    2. collectPlanNodes collects the current PlanNode and all of its child nodes)

ParallelPlanner planner = new ParallelPlanner(ctx_);
List<PlanFragment> parallelPlans = planner.createPlans(distrPlan.get(0));
    createPlans
    createBuildPlans(root, null);
        createBuildPlans
            1. collectJoins()
            2. iterate over the joins and call createBuildPlan for each (creates the join build from the ExchangeNode)
            3. iterate over the children and call createBuildPlans(child) (recurses into the child fragments)

III Key calls

createJoinPlan

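Creates the non-parallel (single-node) PlanNode tree.

computeParentAndSubplanRefs collects every table together with its related query information, such as joins.

parentRefPlans stores the information for all tables, which is kept in remainingRefs. createJoinPlan iterates over all tables, sorts the candidates, and picks the largest table as the root.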
  // Sort candidates in descending order of their Long value, to limit the memory
  // consumption of the materialized hash tables required for the join sequence
  Collections.sort(candidates,
                   new Comparator<Pair<TableRef, Long>>() {
                     @Override
                     public int compare(Pair<TableRef, Long> a, Pair<TableRef, Long> b) {
                       long diff = b.second - a.second;
                       return (diff < 0 ? -1 : (diff > 0 ? 1 : 0));
                     }
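                   });

For the current table, iterate over every table related to it and compute the cost of joining them, for example the cardinality of the HashJoin. The candidate with the smallest cardinality becomes the next join in the order. Once the current table is done, move on to the next one and remove the tables that have already been processed from remainingRefs. The loop ends when remainingRefs is empty.

while (!remainingRefs.isEmpty()) {
   minEntry = getMinCardinality();
   remainingRefs.remove(minEntry);
}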
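The greedy loop described above can be sketched in runnable form. This is only an illustration under stated assumptions: `GreedyJoinOrderSketch` and `estimateJoinCardinality` are hypothetical names, tables are reduced to a name and a row count, and the cardinality estimate (the minimum of the two inputs) is a placeholder rather than Impala's real estimator.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class GreedyJoinOrderSketch {
    // Placeholder estimate (an assumption for illustration only):
    // the join output is no larger than the smaller input.
    static long estimateJoinCardinality(long planCard, long tableCard) {
        return Math.min(planCard, tableCard);
    }

    // Returns table names in the order they are joined.
    static List<String> order(Map<String, Long> tableCards) {
        Map<String, Long> remainingRefs = new LinkedHashMap<>(tableCards);
        List<String> joinOrder = new ArrayList<>();
        if (remainingRefs.isEmpty()) return joinOrder;

        // Pick the largest table as the root of the join tree.
        String root = null;
        for (Map.Entry<String, Long> e : remainingRefs.entrySet()) {
            if (root == null || e.getValue() > remainingRefs.get(root)) root = e.getKey();
        }
        long planCard = remainingRefs.remove(root);
        joinOrder.add(root);

        // Repeatedly join the table that yields the smallest estimated cardinality.
        while (!remainingRefs.isEmpty()) {
            String minEntry = null;
            long minCard = Long.MAX_VALUE;
            for (Map.Entry<String, Long> e : remainingRefs.entrySet()) {
                long card = estimateJoinCardinality(planCard, e.getValue());
                if (minEntry == null || card < minCard) {
                    minEntry = e.getKey();
                    minCard = card;
                }
            }
            remainingRefs.remove(minEntry);
            joinOrder.add(minEntry);
            planCard = minCard;
        }
        return joinOrder;
    }

    public static void main(String[] args) {
        Map<String, Long> cards = new LinkedHashMap<>();
        cards.put("orders", 1000L);
        cards.put("nation", 10L);
        cards.put("customer", 100L);
        System.out.println(order(cards)); // prints [orders, nation, customer]
    }
}
```

The loop terminates because every iteration removes exactly one entry from `remainingRefs`, which is why the condition can be written on the map being empty instead of `while (true)`.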

createHashJoinFragment

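Creates the hash join fragment. Two join strategies are considered:

1. broadcast join
2. shuffle join

Execution flow
1. Get the cardinality of the left subtree: lhsTree getCardinality
2. Get the cardinality of the right subtree: rhsTree getCardinality
3. Choose the join algorithm and build the ExchangeNode.
4. Of lhsTree and rhsTree, pick the one with the larger cardinality as the left subtree, i.e. the root.
5. Partitioned hash join (checks whether the tables are already partitioned, in order to optimize the query plan)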
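The choice between the two strategies can be illustrated with a small sketch. The cost model below is an assumption made for illustration and is not taken from the text above: broadcasting is taken to cost the right-hand side's size times the number of nodes, and shuffling is taken to cost the combined size of both sides. `JoinStrategySketch`, `choose`, and all parameters are hypothetical names.

```java
class JoinStrategySketch {
    enum Strategy { BROADCAST, SHUFFLE }

    // Assumed cost model, for illustration only:
    //   broadcast: every node receives a full copy of the right-hand side
    //   shuffle:   both sides are repartitioned and sent over the network once
    static Strategy choose(long lhsBytes, long rhsBytes, int numNodes) {
        long broadcastCost = rhsBytes * numNodes;
        long shuffleCost = lhsBytes + rhsBytes;
        return broadcastCost <= shuffleCost ? Strategy.BROADCAST : Strategy.SHUFFLE;
    }

    public static void main(String[] args) {
        // Small right-hand side: copying it everywhere is cheap.
        System.out.println(choose(1_000_000L, 1_000L, 10));   // prints BROADCAST
        // Both sides large: repartitioning is cheaper than 10 full copies.
        System.out.println(choose(1_000_000L, 900_000L, 10)); // prints SHUFFLE
    }
}
```

Under this assumed model, placing the larger input on the left keeps the side that may be copied to every node as small as possible.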

verifyTreeCtx

Checks the integrity of the PlanNode tree, mainly that the number of ExchangeNodes matches the number of child nodes.

1. collectPlanNodes
  public List<PlanNode> collectPlanNodes() {
    List<PlanNode> nodes = new ArrayList<>();
    collectPlanNodesHelper(planRoot_, Predicates.alwaysTrue(), nodes);
    return nodes;
  }
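The recursive collection and the count check can be sketched together. This is a simplified illustration, not Impala's implementation: `TreeNodeSketch` and `CollectSketch` are hypothetical classes, the node kind is a plain string, and `java.util.function.Predicate` stands in for the Guava `Predicates` used in the snippet above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, simplified plan node: only a kind and a list of children.
class TreeNodeSketch {
    final String kind;
    final List<TreeNodeSketch> children = new ArrayList<>();

    TreeNodeSketch(String kind, TreeNodeSketch... kids) {
        this.kind = kind;
        for (TreeNodeSketch k : kids) children.add(k);
    }
}

class CollectSketch {
    // Pre-order walk that appends every node accepted by the predicate.
    static void collectHelper(TreeNodeSketch root, Predicate<TreeNodeSketch> pred,
                              List<TreeNodeSketch> out) {
        if (root == null) return;
        if (pred.test(root)) out.add(root);
        for (TreeNodeSketch child : root.children) collectHelper(child, pred, out);
    }

    static List<TreeNodeSketch> collect(TreeNodeSketch root, Predicate<TreeNodeSketch> pred) {
        List<TreeNodeSketch> out = new ArrayList<>();
        collectHelper(root, pred, out);
        return out;
    }

    // The check from the text: one ExchangeNode per child.
    static boolean exchangesMatchChildren(TreeNodeSketch root, int numChildren) {
        return collect(root, n -> n.kind.equals("ExchangeNode")).size() == numChildren;
    }

    static TreeNodeSketch example() {
        return new TreeNodeSketch("HashJoinNode",
            new TreeNodeSketch("ExchangeNode"), new TreeNodeSketch("ExchangeNode"));
    }

    public static void main(String[] args) {
        TreeNodeSketch root = example();
        System.out.println(collect(root, n -> true).size());   // prints 3
        System.out.println(exchangesMatchChildren(root, 2));   // prints true
    }
}
```

Passing a predicate that always returns true reproduces the "collect everything" behaviour of `collectPlanNodes`, while a narrower predicate gives the ExchangeNode count needed for the check.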

createBuildPlans

Creates the join builders from the ExchangeNodes.

  // Collects the joins in this fragment, creates a build plan for each of them,
  // then recurses into the child fragments that belong to the same plan.
  private void createBuildPlans(PlanFragment fragment, CohortId buildCohortId) {
    List<JoinNode> joins = new ArrayList<>();
    collectJoins(fragment.getPlanRoot(), joins);
    if (!joins.isEmpty()) {
      if (buildCohortId == null) buildCohortId = cohortIdGenerator_.getNextId();
      for (JoinNode join: joins) createBuildPlan(join, buildCohortId);
    }

    for (PlanFragment child: fragment.getChildren()) {
      // We already recursed on the join build fragment in createBuildPlan().
      if (child.getSink() instanceof JoinBuildSink) continue;
      // Propagate the plan and cohort IDs to children that are part of the same plan.
      child.setPlanId(fragment.getPlanId());
      child.setCohortId(fragment.getCohortId());
      createBuildPlans(child, buildCohortId);
    }
  }

The next post covers the Impala BE query plan.

Original article: https://cloud.tencent.com/developer/article/2126266
