Predicate Pushdown in Parquet

In the most generic query execution model, filtering happens very late in the process. Moving filtering to an earlier phase of query execution provides significant performance gains, because non-matches are eliminated earlier and the cost of processing them in later stages is saved. This group of optimizations is collectively known as predicate pushdown.

Predicate Pushdown in parquet-mr (Hive, Spark)

When filtering query results, a consumer of the parquet-mr API (for example, Hive or Spark) can fetch all records from the API and then evaluate each record against the predicates of the filtering condition. However, this requires assembling all records in memory, even the non-matching ones. With predicate pushdown, these conditions are passed to the parquet-mr library instead, which evaluates the predicates at a lower level and discards non-matching records without assembling them first.

For example, when evaluating the record {title: "The Starry Night", width: 92, height: 74} against the condition height > 80, it is not necessary to assemble the whole record, because it can be discarded based solely on its height attribute. The condition height > width does not match the record either, but predicate pushdown cannot be used in this case, because evaluating the predicate requires multiple fields of the same record.
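
For illustration, assuming a hypothetical Parquet-backed table named paintings with title, width, and height columns that is queried through Hive or Spark, the first filter below can be pushed down to parquet-mr, while the second cannot:

-- can be pushed down: a single column is compared to a constant
SELECT title FROM paintings WHERE height > 80;

-- cannot be pushed down: two columns of the same record are compared,
-- so the record must be assembled before the predicate can be evaluated
SELECT title FROM paintings WHERE height > width;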

Additionally, predicate pushdown also allows discarding whole row groups that cannot contain any matches based on their min/max statistics. For example, if the statistics of a row group include {min: 62, max: 78} for the height column and the filtering condition is height > 80, then none of the records in that row group can match, thus the whole row group can be discarded.

Predicate Pushdown in Impala

Predicate pushdown in Impala works slightly differently than in parquet-mr. Impala has a lower-level in-memory data representation, so record assembly is not expensive enough to justify pushing predicates to a pre-assembly phase. However, moving predicates from the last step of query execution to an earlier phase still makes sense.

One example is queries that involve both joins and filters. A condition that references only a single table can be pushed down to be evaluated before the join (even if it references multiple columns of that table). For example, without predicate pushdown, the query SELECT * FROM museum m JOIN painting p ON p.museumid = m.id WHERE p.width > 120 AND p.height > 150 joins the two tables and only then filters out the non-matching rows. With predicate pushdown, non-matching rows of the painting table are filtered out first, and the museum table is joined only to the matching paintings.
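
Conceptually, pushing the predicate below the join makes the query behave as if it had been written with the filter applied first (an illustrative rewrite, not the actual plan produced by the optimizer):

SELECT *
FROM museum m
JOIN (SELECT * FROM painting
      WHERE width > 120 AND height > 150) p
  ON p.museumid = m.id;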

In addition, discarding whole row groups based on min/max statistics also works in Impala, similar to parquet-mr.

In Impala, the query plan shows at which point of query execution predicates are evaluated, so the effect of predicate pushdown can be seen directly in the plan.

In the following example, the predicate's presence inside SCAN HDFS shows that predicate pushdown is taking place:

Query: explain select * from alltypes where id < 100

+------------------------------------------------------------------------------------+
| Explain String                                                                     |
+------------------------------------------------------------------------------------+
| Max Per-Host Resource Reservation: Memory=88.00KB Threads=3                        |
| Per-Host Resource Estimates: Memory=64MB                                           |
|                                                                                    |
| PLAN-ROOT SINK                                                                     |
| |                                                                                  |
| 01:EXCHANGE [UNPARTITIONED]                                                        |
| |                                                                                  |
| 00:SCAN HDFS [functional_parquet.alltypes]                                         |
|    partitions=24/24 files=24 size=198.37KB                                         |
|    predicates: id < 100                                                            |
+------------------------------------------------------------------------------------+

If the predicate is not inside SCAN HDFS in the query plan, predicate pushdown does not take place. The following query plan shows that filtering the rows returned by a subquery with a limit clause prevents predicate pushdown:

Query: explain select * from (select * from alltypes limit 10) v where id < 100

+------------------------------------------------------------------------------------+
| Explain String                                                                     |
+------------------------------------------------------------------------------------+
| Max Per-Host Resource Reservation: Memory=88.00KB Threads=2                        |
| Per-Host Resource Estimates: Memory=16MB                                           |
| Codegen disabled by planner                                                        |
|                                                                                    |
| PLAN-ROOT SINK                                                                     |
| |                                                                                  |
| 01:SELECT                                                                          |
| |  predicates: functional_parquet.alltypes.id < 100                                |
| |                                                                                  |
| 00:SCAN HDFS [functional_parquet.alltypes]                                         |
|    partitions=24/24 files=24 size=198.37KB                                         |
|    limit: 10                                                                       |
+------------------------------------------------------------------------------------+

Predicate Pushdown Configuration

Hive

In Hive, the hive.optimize.ppd setting controls whether predicate pushdown optimizations are enabled at all, while the hive.optimize.ppd.storage setting controls whether predicates are pushed to the storage layer (the parquet-mr library). Both settings need to be set to true for discarding individual records and for discarding whole row groups in Parquet. Both settings are true (enabled) by default.
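
For example, both settings can be enabled for the current session with SET statements (a minimal sketch; they can also be configured globally in hive-site.xml):

SET hive.optimize.ppd=true;
SET hive.optimize.ppd.storage=true;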

Spark

In Spark, the spark.sql.parquet.filterPushdown setting controls pushing down predicates to Parquet for discarding individual records. In addition, the spark.hadoop.parquet.filter.stats.enabled setting must also be enabled for discarding whole row groups. Both settings are true (enabled) by default.
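
For example, the SQL setting can be toggled from within a Spark SQL session, while the spark.hadoop.* property is an application-level setting that is normally supplied when the application is started (a minimal sketch):

SET spark.sql.parquet.filterPushdown=true;
-- parquet.filter.stats.enabled is read from the Hadoop configuration, so it is
-- typically passed at submit time, e.g.:
--   --conf spark.hadoop.parquet.filter.stats.enabled=true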

Impala

In Impala, predicate pushdown for discarding individual records cannot be disabled. There are, however, several query options that toggle specific optimizations (all of these optimizations are enabled by default; see the example after the list):
  • parquet_read_statistics

    If true, whole row groups can be discarded if there is a single-column filter that is not satisfied by any value in the related column chunk according to min/max/null count stats.

  • parquet_dictionary_filtering

    If true, whole row groups can be discarded if there is a single-column filter that is not satisfied by any value in the related column chunk’s dictionary (only applied if all pages are dictionary encoded).

  • disable_row_runtime_filtering

    If false (the default), individual rows can be discarded on the probe side of a join if they do not match the bloom filter created from the build side of the join. The bloom filter is created using all the = relations in the join, so this optimization is only applied if there is at least one = relation. Unlike the previous optimizations, where skipping row groups reduces IO/CPU usage, this optimization is mainly used to reduce network usage.
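
For example, the query options can be adjusted per session with SET statements in impala-shell (a minimal sketch, shown with their default values):

SET PARQUET_READ_STATISTICS=true;
SET PARQUET_DICTIONARY_FILTERING=true;
SET DISABLE_ROW_RUNTIME_FILTERING=false;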

Predicate Pushdown Support by Data Type

Discarding individual records is supported for all data types. Discarding whole row groups depends on statistics support, which is limited for some data types.

Due to a historical issue, the min and max statistics fields for decimal, string, and int96 timestamp types do not follow the correct sort order; they are therefore unsuitable for comparisons and are automatically ignored.

To mitigate this issue, new min_value and max_value fields were introduced in CDH 5.13 that correctly handle strings and decimals as well. Int96 timestamps are still not supported. You can read more about these new statistics fields in the blog post Faster Performance for Selective Queries.