Runtime filtering is a wide-ranging optimization feature available in Impala. When a query against a partitioned table, or the evaluation of a join condition, needs only a fraction of the data in a table, Impala determines the appropriate conditions while the query is running and broadcasts that information to all the impalad nodes that are reading the table. Those nodes can then avoid unnecessary I/O when reading partition data, and avoid unnecessary network transmission by sending only the subset of rows that match the join keys across the network.
Runtime filtering is primarily used:
- To optimize queries against large partitioned tables (under the name dynamic partition pruning)
- To optimize joins of large tables
- plan fragment: Impala decomposes each query into smaller units of work that are distributed across the cluster. Wherever possible, a data block is read, filtered, and aggregated by plan fragments executing on the same host. For some operations, such as joins and combining intermediate results into a final result set, data is transmitted across the network from one Impala daemon to another.
- SCAN and HASH JOIN plan nodes: In the Impala query plan, a scan node performs the I/O to read from the underlying data files. Although this is an expensive operation from the traditional database perspective, Hadoop clusters and Impala are optimized to do this kind of I/O in a highly parallel fashion. The major potential cost savings come from using the columnar Parquet format (where Impala can avoid reading data for unneeded columns) and partitioned tables (where Impala can avoid reading data for unneeded partitions).
Most Impala joins use the hash join mechanism. (It is only fairly recently that Impala started using the nested-loop join technique, for certain kinds of non-equijoin queries.) In a hash join, when evaluating join conditions from two tables, Impala constructs a hash table in memory with all the different column values from the table on one side of the join. Then, for each row from the table on the other side of the join, Impala tests whether the relevant column values are in this hash table or not.
A hash join node constructs such an in-memory hash table, then performs the comparisons to identify which rows match the relevant join conditions and should be included in the result set (or at least sent on to the subsequent intermediate stage of query processing). Because some of the input for a hash join might be transmitted across the network from another host, it is especially important from a performance perspective to prune out ahead of time any data that is known to be irrelevant.
The more distinct values are in the columns used as join keys, the larger the in-memory hash table and thus the more memory required to process the query.
- broadcast join vs shuffle join: In a broadcast join, the table from one side of the join (typically the smaller table) is sent in its entirety to all the hosts involved in the query. Then each host can compare its portion of the data from the other (larger) table against the full set of possible join keys. In a shuffle join, there is no obvious “smaller” table, and so the contents of both tables are divided up, and corresponding portions of the data are transmitted to each host involved in the query. A shuffle join is sometimes referred to in Impala as a partitioned join.
- build and probe phases: When Impala processes a join query, the build phase is where the rows containing the join key columns, typically for the smaller table, are transmitted across the network and built into an in-memory hash table data structure on one or more destination nodes. The probe phase is where data is read locally (typically from the larger table) and the join key columns are compared to the values in the in-memory hash table. The corresponding input sources (tables, subqueries, and so on) for these phases are referred to as the build side and the probe side.
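The terms above can be made concrete with a small Python sketch. It is only an illustration of the general technique, not Impala's implementation: the function names, the row layout of (key, value) tuples, and the use of Python's built-in hash() to assign rows to hosts are all assumptions made for this example.

```python
from collections import defaultdict

def hash_join(build_rows, probe_rows):
    """Join two lists of (key, value) rows using a build phase and a probe phase."""
    # Build phase: load the (typically smaller) side into an in-memory hash table.
    table = defaultdict(list)
    for key, value in build_rows:
        table[key].append(value)
    # Probe phase: look up each row from the other side in the hash table.
    return [(key, b, p) for key, p in probe_rows for b in table.get(key, [])]

def broadcast_join(build_rows, probe_partitions):
    """Every host receives the whole build side and probes only its local rows."""
    out = []
    for local_probe_rows in probe_partitions:  # one list of rows per host
        out.extend(hash_join(build_rows, local_probe_rows))
    return out

def shuffle_join(build_rows, probe_rows, num_hosts):
    """Both sides are divided by join key so that matching keys meet on one host."""
    build_parts = [[] for _ in range(num_hosts)]
    probe_parts = [[] for _ in range(num_hosts)]
    for row in build_rows:
        build_parts[hash(row[0]) % num_hosts].append(row)
    for row in probe_rows:
        probe_parts[hash(row[0]) % num_hosts].append(row)
    out = []
    for build_part, probe_part in zip(build_parts, probe_parts):
        out.extend(hash_join(build_part, probe_part))
    return out

# Made-up sample data: a small "dimension" side and a larger "fact" side.
dim = [(1, "red"), (2, "blue")]
fact = [(1, "a"), (2, "b"), (3, "c"), (1, "d")]

single_host = sorted(hash_join(dim, fact))
via_broadcast = sorted(broadcast_join(dim, [fact[:2], fact[2:]]))
via_shuffle = sorted(shuffle_join(dim, fact, num_hosts=3))
```

Both distribution strategies produce the same joined rows as the single-host join; they differ only in which data has to move between hosts.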
The filter that is transmitted between plan fragments is essentially a list of values for join key columns. When this list of values is transmitted in time to a scan node, Impala can filter out non-matching values immediately after reading them, rather than transmitting the raw data to another host to compare against the in-memory hash table on that host.
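As a rough illustration of that idea, the following Python sketch treats the filter as a plain set of join key values and applies it inside the scan. The names build_filter and scan, and the tuple row layout, are hypothetical and chosen only for this example.

```python
def build_filter(build_rows, key_index=0):
    """Collect the distinct join key values seen on the build side."""
    return {row[key_index] for row in build_rows}

def scan(rows, runtime_filter=None, key_index=0):
    """Yield scanned rows, dropping those whose key is not in the filter."""
    for row in rows:
        if runtime_filter is not None and row[key_index] not in runtime_filter:
            continue  # pruned at the scan, so never sent to another host
        yield row

# Made-up sample data.
small = [(10, "x"), (20, "y")]
large = [(10, "a"), (30, "b"), (20, "c"), (40, "d")]

rf = build_filter(small)
unfiltered = list(scan(large))      # all four rows would be transmitted
filtered = list(scan(large, rf))    # only rows with keys 10 and 20 remain
```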
- Bloom filter: For HDFS-based tables, the Bloom filter uses a probability-based algorithm to determine all possible matching values. The probability-based aspect means that the filter might include some non-matching values, but if so, that does not cause any inaccuracy in the final results.
- Min-max filter: The filter is a data structure representing a minimum and maximum value. These filters are passed to Kudu to reduce the number of rows returned to Impala when scanning the probe side of the join. This filter currently only applies to Kudu tables.
- Broadcast filter: A broadcast filter reflects the complete list of relevant values and can be immediately evaluated by a scan node.
Broadcast filters are also classified as local or global. With a local broadcast filter, the information in the filter is used by a subsequent query fragment that is running on the same host that produced the filter. A non-local broadcast filter must be transmitted across the network to a query fragment that is running on a different host. Impala designates 3 hosts to each produce non-local broadcast filters, to guard against the possibility of a single slow host taking too long. Depending on the setting of the RUNTIME_FILTER_MODE query option (LOCAL or GLOBAL), Impala either uses a conservative optimization strategy where filters are only consumed on the same host that produced them, or a more aggressive strategy where filters are eligible to be transmitted across the network. The default for runtime filtering is the GLOBAL setting.
- Partitioned filter: A partitioned filter reflects only the values processed by one host in the cluster; all the partitioned filters must be combined into one (by the coordinator node) before the scan nodes can use the results to accurately filter the data as it is read from storage.
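The following Python sketch illustrates the filter kinds described above in a simplified form. It is not Impala's or Kudu's actual data structure: the class names, the bit-array size, the number of hash functions, and the use of SHA-256 are assumptions made for the example. It shows why a Bloom filter can report false positives but never false negatives, how per-host partitioned filters could be combined into one, and how a min-max filter prunes by range.

```python
import hashlib

class BloomFilter:
    """Toy Bloom filter backed by a Python int used as a bit array."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0

    def _positions(self, value):
        # Derive num_hashes bit positions from seeded SHA-256 digests.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{value!r}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, value):
        for pos in self._positions(value):
            self.bits |= 1 << pos

    def might_contain(self, value):
        # True for every added value; occasionally True for others too.
        return all((self.bits >> pos) & 1 for pos in self._positions(value))

    def merge(self, other):
        """Union two filters of the same shape, as a coordinator might."""
        assert (self.num_bits, self.num_hashes) == (other.num_bits, other.num_hashes)
        merged = BloomFilter(self.num_bits, self.num_hashes)
        merged.bits = self.bits | other.bits
        return merged

class MinMaxFilter:
    """Tracks only the smallest and largest join key value seen."""

    def __init__(self):
        self.lo = None
        self.hi = None

    def add(self, value):
        self.lo = value if self.lo is None else min(self.lo, value)
        self.hi = value if self.hi is None else max(self.hi, value)

    def might_contain(self, value):
        return self.lo is not None and self.lo <= value <= self.hi

# Two hosts each build a partitioned filter over the keys they processed.
host_a, host_b = BloomFilter(), BloomFilter()
for key in (1, 2):
    host_a.add(key)
host_b.add(3)
combined = host_a.merge(host_b)

mm = MinMaxFilter()
for key in (5, 9, 7):
    mm.add(key)
```

In this sketch a single partitioned filter on its own would wrongly reject keys that only another host saw, which is why the combined filter is the one that is safe to apply at the scan.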
File Format Considerations for Runtime Filtering
Parquet tables get the most benefit from the runtime filtering optimizations. Runtime filtering can speed up join queries against partitioned or unpartitioned Parquet tables, and single-table queries against partitioned Parquet tables.
For other file formats (text, Avro, RCFile, and SequenceFile), runtime filtering speeds up queries against partitioned tables only. Because partitioned tables can use a mixture of formats, Impala produces the filters in all cases, even if they are not ultimately used to optimize the query.
Wait Intervals for Runtime Filters
Because it takes time to produce runtime filters, especially for partitioned filters that must be combined by the coordinator node, there is a time interval above which it is more efficient for the scan nodes to go ahead and construct their intermediate result sets, even if that intermediate data is larger than optimal. If it only takes a few seconds to produce the filters, it is worth the extra time if pruning the unnecessary data can save minutes in the overall query time. You can specify the maximum wait time in milliseconds using the RUNTIME_FILTER_WAIT_TIME_MS query option.
By default, each scan node waits for up to 1 second (1000 milliseconds) for filters to arrive. If all filters have not arrived within the specified interval, the scan node proceeds, using whatever filters did arrive to help avoid reading unnecessary data. If a filter arrives after the scan node begins reading data, the scan node applies that filter to the data that is read after the filter arrives, but not to the data that was already read.
If the cluster is relatively busy and your workload contains many resource-intensive or long-running queries, consider increasing the wait time so that complicated queries do not miss opportunities for optimization. If the cluster is lightly loaded and your workload contains many small queries taking only a few seconds, consider decreasing the wait time to avoid the 1 second delay for each query.
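The wait behavior described in this section can be modeled with a small simulation. The sketch below is a deliberately simplified model with a single filter and a simulated clock; the function name, the ms_per_batch parameter, and the representation of rows as bare join key values are assumptions made for the example and do not correspond to Impala internals.

```python
def scan_with_wait(batches, filter_values, filter_arrival_ms,
                   wait_time_ms=1000, ms_per_batch=400):
    """Simulate one scan node and return the rows it passes downstream.

    The scan starts when the filter arrives or when the wait limit expires,
    whichever comes first. A filter that arrives late is applied only to
    batches read after its arrival.
    """
    clock_ms = min(filter_arrival_ms, wait_time_ms)
    passed = []
    for batch in batches:
        have_filter = filter_arrival_ms <= clock_ms
        for row in batch:
            if have_filter and row not in filter_values:
                continue  # pruned by the runtime filter
            passed.append(row)
        clock_ms += ms_per_batch  # assumed fixed cost to read one batch
    return passed

batches = [[1, 2], [3, 4], [5, 6]]
matching_keys = {1, 3, 5}

on_time = scan_with_wait(batches, matching_keys, filter_arrival_ms=200)
late = scan_with_wait(batches, matching_keys, filter_arrival_ms=1500)
longer_wait = scan_with_wait(batches, matching_keys, filter_arrival_ms=1500,
                             wait_time_ms=2000)
```

In this model the late filter prunes only the final batch, while raising the wait limit above the arrival time lets the filter apply to every batch, at the cost of starting the scan later.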
Query Options for Runtime Filtering
The following query options control runtime filtering.
- RUNTIME_FILTER_MODE: This query option controls how extensively the filters are transmitted between hosts. By default, it is set to the highest level (GLOBAL).
The other query options are tuning knobs that you typically only adjust after doing performance testing, and that you might want to change only for the duration of a single expensive query.
Runtime Filtering and Query Plans
In the same way the query plan displayed by the EXPLAIN statement includes information about predicates used by each plan fragment, it also includes annotations showing whether a plan fragment produces or consumes a runtime filter.
- A plan fragment that produces a filter includes an annotation such as:
  runtime filters: filter_id <- table.column
- A plan fragment that consumes a filter includes an annotation such as:
  runtime filters: filter_id -> table.column
Setting the query option EXPLAIN_LEVEL=2 adds annotations showing the type of the filter:
- filter_id[bloom] (for HDFS-based tables)
- filter_id[min_max] (for Kudu tables)
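When reviewing many plans, it can help to pull these annotations out programmatically. The Python sketch below parses a single annotation in the shape shown above. It is a hypothetical helper, not part of Impala or impala-shell, and it assumes one filter per line, a simple table.column target, and made-up sample IDs such as RF000; real plan output may list several filters on one line or use more complex expressions.

```python
import re

# Matches "runtime filters: <id>[<type>] <- table.column" (type is optional).
ANNOTATION = re.compile(
    r"runtime filters:\s*(?P<id>\w+)(?:\[(?P<kind>\w+)\])?"
    r"\s*(?P<arrow><-|->)\s*(?P<target>[\w.]+)"
)

def parse_runtime_filter(line):
    """Return a dict describing the first annotation in line, or None."""
    match = ANNOTATION.search(line)
    if match is None:
        return None
    # "<-" marks the fragment that produces the filter, "->" one that consumes it.
    role = "produces" if match.group("arrow") == "<-" else "consumes"
    return {
        "id": match.group("id"),
        "kind": match.group("kind"),  # None unless the type annotation is present
        "role": role,
        "target": match.group("target"),
    }

# Made-up sample lines in the documented annotation format.
producer = parse_runtime_filter("runtime filters: RF000 <- t2.id")
consumer = parse_runtime_filter("runtime filters: RF000[bloom] -> t1.id")
no_filter = parse_runtime_filter("predicates: t1.x > 0")
```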
The query profile (displayed by the PROFILE command in impala-shell) contains both the EXPLAIN plan and more detailed information about the internal workings of the query. The profile output includes the Filter routing table section with information about each filter based on its ID.
Tuning and Troubleshooting Queries that Use Runtime Filtering
These tuning and troubleshooting procedures apply to queries that are resource-intensive enough, long-running enough, and frequent enough that you can devote special attention to optimizing them individually.
Use the EXPLAIN statement and examine the runtime filters: lines to determine whether runtime filters are being applied to the WHERE predicates and join clauses that you expect. For example, runtime filtering does not apply to queries that use the nested loop join mechanism due to non-equijoin operators.
Make sure statistics are up-to-date for all tables involved in the queries. Use the COMPUTE STATS statement after loading data into non-partitioned tables, and COMPUTE INCREMENTAL STATS after adding new partitions to partitioned tables.
If join queries involving large tables use unique columns as the join
keys, for example joining a primary key column with a foreign key
column, the overhead of producing and transmitting the filter might
outweigh the performance benefit because not much data could be pruned
during the early stages of the query. For such queries, consider setting
the query option