Planner changes to improve cardinality estimation

The query planner has been changed to improve cardinality estimation, a key building block of workload-aware autoscaling.

In earlier releases, Impala generated the plan first and then searched the complete plan for runtime filters; those filters did not affect cardinality estimates. In this release, selective runtime filters are taken into account: they lower the cardinality estimates of the scan nodes they target and of specific join nodes above those scans. This adjustment is made after runtime filters are generated and before resource requirements are computed.
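As a rough illustration of the idea, and not Impala's actual implementation, the sketch below assumes a simple foreign-key/primary-key model: a runtime filter built from a dimension table whose predicates keep `build_rows` out of `build_table_rows` rows is assumed to let the same fraction of fact-table rows through. The function name and the model are assumptions. Applied to the item filter from the example later in this section (19 of 18.00K item rows kept, 2.88M store_sales rows), it yields about 3.04K, which matches the filtered estimate printed for the store_sales scan. How the planner combines several filters on one scan is not modeled here.

```python
def reduced_scan_cardinality(scan_rows: float, build_rows: float,
                             build_table_rows: float) -> float:
    """Hypothetical FK/PK model: scale the scan estimate by the fraction of
    build-side (dimension) rows that survive that side's predicates."""
    if build_table_rows <= 0:
        return scan_rows  # no statistics: leave the estimate unchanged
    selectivity = min(1.0, build_rows / build_table_rows)
    return scan_rows * selectivity


# Figures taken from the TPC-DS Q3 plan: store_sales scan (2.88M rows),
# item scan keeping 19 of 18.00K rows after i_manufact_id = 436.
estimate = reduced_scan_cardinality(2.88e6, 19, 18.00e3)
print(f"{estimate:.0f}")  # 3040
```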

Although the cardinality reduction is computed in all execution modes, it currently changes the plan only when the COMPUTE_PROCESSING_COST query option is set to true. The reason is that a lower scan cardinality estimate pays off most when multiple executor group sets are configured: it can lower the processing cost, reduce scan fragment parallelism, and make it more likely that the query is assigned to a smaller executor group set. Other execution modes, such as MT_DOP>0 or the legacy scanner parallelism mode, see no change in their execution plans.
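The last step of that chain, choosing an executor group set, can be pictured with the hypothetical sketch below. It is not Impala's code: the group-set names and sizes, the `ExecutorGroupSet` and `pick_group_set` names, and the `cpu_ask` value (a core count standing in for whatever the planner derives from processing cost) are all assumptions. In this sketch, candidate sets are tried from smallest to largest, so a lower CPU estimate lets a query stop at a smaller set.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ExecutorGroupSet:
    """Hypothetical description of one executor group set."""
    name: str
    num_executors: int
    cores_per_executor: int

    @property
    def total_cores(self) -> int:
        return self.num_executors * self.cores_per_executor


def pick_group_set(cpu_ask: int,
                   group_sets: list[ExecutorGroupSet]) -> ExecutorGroupSet:
    """Return the smallest group set whose total cores cover cpu_ask,
    falling back to the largest set if none is big enough."""
    ordered = sorted(group_sets, key=lambda g: g.total_cores)
    for group in ordered:
        if cpu_ask <= group.total_cores:
            return group
    return ordered[-1]


sets = [
    ExecutorGroupSet("small", num_executors=3, cores_per_executor=8),
    ExecutorGroupSet("large", num_executors=10, cores_per_executor=16),
]
# A smaller CPU ask (for example after scan cardinality is reduced)
# fits the small set; a larger one spills over to the large set.
print(pick_group_set(18, sets).name)  # small
print(pick_group_set(96, sets).name)  # large
```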

Before you begin

  • You must obtain the entitlement to use this feature.

Example

The following example runs TPC-DS query 3 at scale factor 1 on 3 executor nodes, with the COMPUTE_PROCESSING_COST query option set to true.

Query:
   use tpcds_partitioned_parquet_snap;
   -- Required for the cardinality reduction to affect the plan.
   set COMPUTE_PROCESSING_COST=true;
   set PROCESSING_COST_MIN_THREADS=2;
   set MAX_FRAGMENT_INSTANCES_PER_NODE=16;

   -- TPCDS-Q3
   select dt.d_year,
          item.i_brand_id brand_id,
          item.i_brand brand,
          sum(ss_ext_sales_price) sum_agg
   from date_dim dt,
        store_sales,
        item
   where dt.d_date_sk = store_sales.ss_sold_date_sk
     and store_sales.ss_item_sk = item.i_item_sk
     and item.i_manufact_id = 436
     and dt.d_moy = 12
   group by dt.d_year,
            item.i_brand,
            item.i_brand_id
   order by dt.d_year,
            sum_agg desc,
            brand_id
   limit 100;

Plan:
Max Per-Host Resource Reservation: Memory=33.25MB Threads=14
Per-Host Resource Estimates: Memory=169MB
Analyzed query: SELECT dt.d_year, item.i_brand_id brand_id, item.i_brand brand,
sum(ss_ext_sales_price) sum_agg FROM tpcds_partitioned_parquet_snap.date_dim dt,
tpcds_partitioned_parquet_snap.store_sales, tpcds_partitioned_parquet_snap.item
WHERE dt.d_date_sk = store_sales.ss_sold_date_sk AND store_sales.ss_item_sk =
item.i_item_sk AND item.i_manufact_id = CAST(436 AS INT) AND dt.d_moy = CAST(12
AS INT) GROUP BY dt.d_year, item.i_brand, item.i_brand_id ORDER BY dt.d_year
ASC, sum_agg DESC, brand_id ASC LIMIT CAST(100 AS TINYINT)

F05:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
|  Per-Instance Resources: mem-estimate=4.03MB mem-reservation=4.00MB thread-reservation=1
|  max-parallelism=1 segment-costs=[406] cpu-comparison-result=18 [max(1 (self) vs 18 (sum children))]
PLAN-ROOT SINK
|  output exprs: dt.d_year, item.i_brand_id, item.i_brand, sum(ss_ext_sales_price)
|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 cost=400
|
12:MERGING-EXCHANGE [UNPARTITIONED]
|  order by: dt.d_year ASC, sum(ss_ext_sales_price) DESC, item.i_brand_id ASC
|  limit: 100
|  mem-estimate=34.63KB mem-reservation=0B thread-reservation=0
|  tuple-ids=4 row-size=52B cardinality=100 cost=6
|  in pipelines: 06(GETNEXT)
|
F04:PLAN FRAGMENT [HASH(dt.d_year,item.i_brand,item.i_brand_id)] hosts=3 instances=6 (adjusted from 48)
Per-Instance Resources: mem-estimate=10.38MB mem-reservation=1.94MB thread-reservation=1
max-parallelism=6 segment-costs=[12331, 300, 6] cpu-comparison-result=18 [max(6 (self) vs 18 (sum children))]
06:TOP-N [LIMIT=100]
|  order by: dt.d_year ASC, sum(ss_ext_sales_price) DESC, item.i_brand_id ASC
|  mem-estimate=5.10KB mem-reservation=0B thread-reservation=0
|  tuple-ids=4 row-size=52B cardinality=100 cost=300
|  in pipelines: 06(GETNEXT), 11(OPEN)
|
11:AGGREGATE [FINALIZE]
|  output: sum:merge(ss_ext_sales_price)
|  group by: dt.d_year, item.i_brand, item.i_brand_id
|  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
|  tuple-ids=3 row-size=52B cardinality=3.04K cost=12164
|  in pipelines: 11(GETNEXT), 01(OPEN)
|
10:EXCHANGE [HASH(dt.d_year,item.i_brand,item.i_brand_id)]
|  mem-estimate=388.85KB mem-reservation=0B thread-reservation=0
|  tuple-ids=3 row-size=52B cardinality=3.04K cost=167
|  in pipelines: 01(GETNEXT)
|
F03:PLAN FRAGMENT [HASH(store_sales.ss_sold_date_sk)] hosts=3 instances=6 (adjusted from 48)
Per-Instance Resources: mem-estimate=12.17MB mem-reservation=2.00MB thread-reservation=1
max-parallelism=6 segment-costs=[15408, 167] cpu-comparison-result=18 [max(18 (self) vs 11 (sum children))]
05:AGGREGATE [STREAMING]
|  output: sum(ss_ext_sales_price)
|  group by: dt.d_year, item.i_brand, item.i_brand_id
|  mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB thread-reservation=0
|  tuple-ids=3 row-size=52B cardinality=3.04K cost=12164
|  in pipelines: 01(GETNEXT)
|
04:HASH JOIN [INNER JOIN, PARTITIONED]
|  hash-table-id=00
|  hash predicates: store_sales.ss_sold_date_sk = dt.d_date_sk
|  fk/pk conjuncts: store_sales.ss_sold_date_sk = dt.d_date_sk
|  mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB thread-reservation=0
|  tuple-ids=1,2,0 row-size=72B cardinality=3.04K cost=3041
|  in pipelines: 01(GETNEXT), 00(OPEN)
|
|--F06:PLAN FRAGMENT [HASH(store_sales.ss_sold_date_sk)] hosts=3 instances=6 (adjusted from 48)
|  |  Per-Instance Resources: mem-estimate=3.02MB mem-reservation=2.94MB thread-reservation=1 runtime-filters-memory=1.00MB
|  |  max-parallelism=6 segment-costs=[6183]
|  JOIN BUILD
|  |  join-table-id=00 plan-id=01 cohort-id=01
|  |  build expressions: dt.d_date_sk
|  |  runtime filters: RF000[bloom] <- dt.d_date_sk, RF001[min_max] <- dt.d_date_sk
|  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0 cost=6087
|  |
|  09:EXCHANGE [HASH(dt.d_date_sk)]
|  |  mem-estimate=87.33KB mem-reservation=0B thread-reservation=0
|  |  tuple-ids=0 row-size=12B cardinality=6.09K cost=96
|  |  in pipelines: 00(GETNEXT)
|  |
|  F02:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
|  Per-Instance Resources: mem-estimate=16.38MB mem-reservation=512.00KB thread-reservation=1
|  max-parallelism=1 segment-costs=[124002]
|  00:SCAN HDFS [tpcds_partitioned_parquet_snap.date_dim dt, RANDOM]
|     HDFS partitions=1/1 files=1 size=2.15MB
|     predicates: dt.d_moy = CAST(12 AS INT)
|     stored statistics:
|       table: rows=73.05K size=2.15MB
|       columns: all
|     extrapolated-rows=disabled max-scan-range-rows=73.05K
|     parquet statistics predicates: dt.d_moy = CAST(12 AS INT)
|     parquet dictionary predicates: dt.d_moy = CAST(12 AS INT)
|     mem-estimate=16.00MB mem-reservation=512.00KB thread-reservation=0
|     tuple-ids=0 row-size=12B cardinality=6.09K cost=123906
|     in pipelines: 00(GETNEXT)
|
08:EXCHANGE [HASH(store_sales.ss_sold_date_sk)]
|  mem-estimate=877.96KB mem-reservation=0B thread-reservation=0
|  tuple-ids=1,2 row-size=60B cardinality=3.04K cost=203
|  in pipelines: 01(GETNEXT)
|
F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=12 (adjusted from 48)
Per-Host Shared Resources: mem-estimate=2.00MB mem-reservation=2.00MB thread-reservation=0 runtime-filters-memory=2.00MB
Per-Instance Resources: mem-estimate=17.60MB mem-reservation=1.00MB thread-reservation=1
max-parallelism=12 segment-costs=[91203296]
03:HASH JOIN [INNER JOIN, BROADCAST]
|  hash-table-id=01
|  hash predicates: store_sales.ss_item_sk = item.i_item_sk
|  fk/pk conjuncts: store_sales.ss_item_sk = item.i_item_sk
|  mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB thread-reservation=0
|  tuple-ids=1,2 row-size=60B cardinality=3.04K cost=3045
|  in pipelines: 01(GETNEXT), 02(OPEN)
|
|--F07:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
|  |  Per-Instance Resources: mem-estimate=8.77MB mem-reservation=8.75MB thread-reservation=1 runtime-filters-memory=1.00MB
|  |  max-parallelism=3 segment-costs=[22]
|  JOIN BUILD
|  |  join-table-id=01 plan-id=02 cohort-id=01
|  |  build expressions: item.i_item_sk
|  |  runtime filters: RF002[bloom] <- item.i_item_sk
|  |  mem-estimate=7.75MB mem-reservation=7.75MB spill-buffer=64.00KB thread-reservation=0 cost=19
|  |
|  07:EXCHANGE [BROADCAST]
|  |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
|  |  tuple-ids=2 row-size=44B cardinality=19 cost=3
|  |  in pipelines: 02(GETNEXT)
|  |
|  F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
|  Per-Instance Resources: mem-estimate=16.19MB mem-reservation=256.00KB thread-reservation=1
|  max-parallelism=1 segment-costs=[68778]
|  02:SCAN HDFS [tpcds_partitioned_parquet_snap.item, RANDOM]
|     HDFS partitions=1/1 files=1 size=1.73MB
|     predicates: item.i_manufact_id = CAST(436 AS INT)
|     stored statistics:
|       table: rows=18.00K size=1.73MB
|       columns: all
|     extrapolated-rows=disabled max-scan-range-rows=18.00K
|     parquet statistics predicates: item.i_manufact_id = CAST(436 AS INT)
|     parquet dictionary predicates: item.i_manufact_id = CAST(436 AS INT)
|     mem-estimate=16.00MB mem-reservation=256.00KB thread-reservation=0
|     tuple-ids=2 row-size=44B cardinality=19 cost=68777
|     in pipelines: 02(GETNEXT)
|
01:SCAN HDFS [tpcds_partitioned_parquet_snap.store_sales, RANDOM]
   HDFS partitions=1824/1824 files=1824 size=199.36MB
   runtime filters: RF001[min_max] -> store_sales.ss_sold_date_sk, RF000[bloom] -> store_sales.ss_sold_date_sk, RF002[bloom] -> store_sales.ss_item_sk
   stored statistics:
     table: rows=2.88M size=199.36MB
     partitions: 1824/1824 rows=2.88M
     columns: all
   extrapolated-rows=disabled max-scan-range-rows=130.09K
   mem-estimate=16.00MB mem-reservation=1.00MB thread-reservation=0
   tuple-ids=1 row-size=16B cardinality=3.04K(filtered from 2.88M) cost=91200048
   in pipelines: 01(GETNEXT)
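In this plan, the effect of the change is visible on the store_sales scan (node 01), which reports cardinality=3.04K(filtered from 2.88M); the joins, exchanges, and aggregations above it also carry estimates of 3.04K. The "(adjusted from 48)" notes are consistent with 3 hosts × MAX_FRAGMENT_INSTANCES_PER_NODE=16. To spot such reductions in larger plans, a small helper like the one below can pull the reduced and original estimates out of EXPLAIN text. It is a hypothetical sketch: the function names are illustrative, the regular expression only covers the `cardinality=X(filtered from Y)` form shown here, and the K/M/B suffix handling is an assumption (only K and M appear in this plan).

```python
import re

# Matches e.g. "cardinality=3.04K(filtered from 2.88M)" in EXPLAIN text.
FILTERED = re.compile(
    r"cardinality=([\d.]+[KMB]?)\(filtered from ([\d.]+[KMB]?)\)")
SUFFIX = {"": 1.0, "K": 1e3, "M": 1e6, "B": 1e9}


def parse_count(text: str) -> float:
    """Convert an abbreviated count such as '3.04K' or '2.88M' to a float."""
    unit = text[-1] if text[-1] in "KMB" else ""
    number = text[:-1] if unit else text
    return float(number) * SUFFIX[unit]


def filtered_scans(plan: str) -> list[tuple[float, float]]:
    """Return (reduced, original) cardinality pairs for every filtered node."""
    return [(parse_count(a), parse_count(b)) for a, b in FILTERED.findall(plan)]


line = ("tuple-ids=1 row-size=16B "
        "cardinality=3.04K(filtered from 2.88M) cost=91200048")
reduced, original = filtered_scans(line)[0]
print(f"{original / reduced:.0f}x smaller")  # 947x smaller
```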