Performance Best Practices
Use the performance guidelines and best practices during planning, experimentation, and performance tuning for an Impala-enabled cluster.
Choose the appropriate file format for the data
Typically, for large volumes of data (multiple gigabytes per table or partition), the Parquet file format performs best because of its combination of columnar storage layout, large I/O request size, and compression and encoding.
Avoid data ingestion processes that produce many small files
When producing data files outside of Impala, prefer either text format or Avro, where you can build up the
files row by row. Once the data is in Impala, you can convert it to the more efficient Parquet format and
split into multiple data files using a single
INSERT ... SELECT statement. Or, if you have
the infrastructure to produce multi-megabyte Parquet files as part of your data preparation process, do
that and skip the conversion step inside Impala.
Use INSERT ... SELECT to copy significant
volumes of data from table to table within Impala. Avoid
INSERT ... VALUES for any substantial volume of data or
performance-critical tables, because each such statement produces a
separate tiny data file.
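As a minimal sketch of the conversion step described above, assume a hypothetical text-format staging table named sales_staging. The table and column names are illustrative only and do not come from any real schema.

```sql
-- Hypothetical staging table for data files produced outside Impala.
CREATE TABLE sales_staging (
  id BIGINT,
  amount DECIMAL(10,2),
  sold_at TIMESTAMP
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

-- Parquet table with the same column definitions.
CREATE TABLE sales_parquet LIKE sales_staging STORED AS PARQUET;

-- A single statement converts the data to Parquet; Impala decides
-- how to split the output into data files.
INSERT INTO sales_parquet SELECT * FROM sales_staging;
```

Loading the same rows through many separate INSERT ... VALUES statements would instead leave one tiny file per statement.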
For example, if you have thousands of partitions in a Parquet table, each with less than
256 MB of data, consider partitioning in a less granular way, such as by
year / month rather than year / month / day. If an inefficient data ingestion process produces thousands of
data files in the same table or partition, consider compacting the data by performing an
INSERT ... SELECT to copy all the data to a different table; this process reorganizes the data into a smaller
number of larger files.
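The following sketch shows one way such a compaction could look, assuming a hypothetical source table logs_by_day partitioned by year, month, and day. All table and column names are assumptions made for illustration.

```sql
-- Hypothetical target table partitioned less granularly (year / month).
-- The former partition key "day" becomes an ordinary column.
CREATE TABLE logs_by_month (
  url STRING,
  bytes_sent BIGINT,
  day TINYINT
)
PARTITIONED BY (year SMALLINT, month TINYINT)
STORED AS PARQUET;

-- Dynamic partition insert: the partition key columns come last
-- in the select list, in the same order as in the PARTITION clause.
INSERT INTO logs_by_month PARTITION (year, month)
SELECT url, bytes_sent, day, year, month
FROM logs_by_day;
```

After verifying the copy, queries can be pointed at the new table and the original one dropped.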
Choose partitioning granularity based on actual data volume
Partitioning is a technique that physically divides the data based on values of one or more columns, such as by year, month, day, region, city, section of a web site, and so on. When you issue queries that request a specific value or range of values for the partition key columns, Impala can avoid reading the irrelevant data, potentially yielding a huge savings in disk I/O.
When deciding which column(s) to use for partitioning, choose the right level of granularity. For example, should you partition by year, month, and day, or only by year and month? Choose a partitioning strategy that puts at least 256 MB of data in each partition, to take advantage of HDFS bulk I/O and Impala distributed queries.
Over-partitioning can also cause query planning to take longer than necessary, as Impala prunes the unnecessary partitions. Ideally, keep the number of partitions in the table under 30 thousand.
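To check how a table currently stands against these guidelines, you can list its partitions with their file counts and sizes. This sketch assumes a hypothetical partitioned table named logs_by_day.

```sql
-- One row per partition, including the number of files and the
-- total size of each partition.
SHOW PARTITIONS logs_by_day;

-- Similar per-partition breakdown, plus row counts when statistics
-- have been gathered.
SHOW TABLE STATS logs_by_day;
```

Many rows with sizes far below 256 MB suggest that the partitioning scheme is too granular for the data volume.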
When preparing data files to go in a partition directory, create several large files rather than many small
ones. If you receive data in the form of many small files and have no control over the input format,
consider using the
INSERT ... SELECT syntax to copy data from one table or partition to
another, which compacts the files into a relatively small number (based on the number of nodes in the cluster).
If you need to reduce the overall number of partitions and increase the amount of data in each partition, first look for partition key columns that are rarely referenced or are referenced in non-critical queries (not subject to an SLA). For example, your web site log data might be partitioned by year, month, day, and hour, but if most queries roll up the results by day, perhaps you only need to partition by year, month, and day.
If you need to reduce the granularity even more, consider creating
“buckets”, computed values corresponding to different sets of
partition key values. For example, you can use the
TRUNC() function with a TIMESTAMP
column to group date and time values based on intervals such as week or quarter.
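As an illustration of computing such a bucket value, the query below truncates a timestamp to the start of its quarter. The table web_events and the column event_time are hypothetical. The unit string 'Q' is one of the units accepted by TRUNC(); see the date and time function reference for the others.

```sql
-- Group events by calendar quarter instead of by individual day.
SELECT TRUNC(event_time, 'Q') AS quarter_start,
       COUNT(*) AS events
FROM web_events
GROUP BY TRUNC(event_time, 'Q');
```

A value derived this way could be stored in a coarser partition key column when the data is copied into a new table.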
Use smallest appropriate integer types for partition key columns
Although it is tempting to use strings for partition key columns, since those values are turned into HDFS
directory names anyway, you can minimize memory usage by using numeric values for common partition key
fields such as YEAR, MONTH, and
DAY. Use the smallest
integer type that holds the appropriate range of values, typically
TINYINT for MONTH and DAY, and SMALLINT for YEAR. Use the
EXTRACT() function to pull out individual date and time fields from a
TIMESTAMP value, and
CAST() the return value to the appropriate integer type.
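A minimal sketch of this pattern follows, assuming a hypothetical unpartitioned source table raw_events with a TIMESTAMP column event_time. The table names, column names, and the choice of SMALLINT and TINYINT are illustrative.

```sql
-- Partition key columns use small integer types rather than STRING.
CREATE TABLE events_by_day (
  event_id BIGINT,
  event_time TIMESTAMP
)
PARTITIONED BY (year SMALLINT, month TINYINT, day TINYINT)
STORED AS PARQUET;

-- Derive each partition key from the timestamp and cast it to the
-- type declared for the corresponding partition column.
INSERT INTO events_by_day PARTITION (year, month, day)
SELECT event_id,
       event_time,
       CAST(EXTRACT(YEAR FROM event_time) AS SMALLINT),
       CAST(EXTRACT(MONTH FROM event_time) AS TINYINT),
       CAST(EXTRACT(DAY FROM event_time) AS TINYINT)
FROM raw_events;
```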
Choose an appropriate Parquet block size
By default, the Impala
INSERT ... SELECT statement creates Parquet files with a 256 MB
block size. (This default was changed in Impala 2.0. Formerly, the limit was 1 GB, but Impala made
conservative estimates about compression, resulting in files that were smaller than 1 GB.)
Each Parquet file written by Impala is a single block, allowing the whole file to be processed as a unit by a single host.
As you copy Parquet files into HDFS or between HDFS filesystems, use
hdfs dfs -pb to preserve the original block size.
If there are only one or a few data blocks in your Parquet table, or in a partition that is the only one
accessed by a query, then you might experience a slowdown for a different reason: not enough data to take
advantage of Impala's parallel distributed queries. Each data block is processed by a single core on one of
the DataNodes. In a 100-node cluster of 16-core machines, you could potentially process thousands of data
files simultaneously. You want to find a sweet spot between “many tiny files” and “single giant
file” that balances bulk I/O and parallel processing. You can set the
PARQUET_FILE_SIZE query option before doing an
INSERT ... SELECT statement to reduce the size of each
generated Parquet file. (Specify the file size as an absolute number of bytes, or in Impala
2.0 and later, in units ending with
m for megabytes or
g for gigabytes.) Run benchmarks with different file sizes to find the right balance point for your
particular data volume.
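For instance, a benchmark run with smaller files might look like the sketch below. The value 128m is an arbitrary starting point rather than a recommendation, and the table names are hypothetical.

```sql
-- Applies to subsequent INSERT statements in this session.
SET PARQUET_FILE_SIZE=128m;

-- Rewrite the data so that each generated Parquet file is capped
-- at roughly the size set above.
INSERT OVERWRITE TABLE sales_parquet_128m
SELECT * FROM sales_parquet;
```

Repeating the run with other sizes and comparing query times against each copy gives a basis for choosing the final setting.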
Gather statistics for all tables used in performance-critical or high-volume join queries
Gather the statistics with the COMPUTE STATS statement.
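In Impala, statistics are gathered with the COMPUTE STATS statement. A short sketch follows, using a hypothetical table name.

```sql
-- Gather table and column statistics in one pass.
COMPUTE STATS sales_parquet;

-- Confirm that row counts and column statistics are now populated.
SHOW TABLE STATS sales_parquet;
SHOW COLUMN STATS sales_parquet;
```

Run the statement for every table involved in the join, and again after loading a substantial amount of new data.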
Minimize the overhead of transmitting results back to the client
Use techniques such as:
- Aggregation. If you need to know how many rows match a condition, the total of the matching values from some column, the lowest or highest matching value, and so on, call aggregate functions such as COUNT(), SUM(), and MAX() in the query rather than sending the result set to an application and doing those computations there. Remember that the size of an unaggregated result set could be huge, requiring substantial time to transmit across the network.
- Filtering. Use all applicable tests in the WHERE clause of a query to eliminate rows that are not relevant, rather than producing a big result set and filtering it using application logic.
- LIMIT clause. If you only need to see a few sample values from a result set, or the top or bottom values from a query using ORDER BY, include the LIMIT clause to reduce the size of the result set rather than asking for the full result set and then throwing most of the rows away.
- Avoid overhead from pretty-printing the result set and displaying it on the screen. When you retrieve the results through impala-shell, use impala-shell options such as -B and --output_delimiter to produce results without special formatting, and redirect output to a file rather than printing to the screen. Consider using INSERT ... SELECT to write the results directly to new files in HDFS.
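The first three techniques can be sketched as follows, assuming a hypothetical table logs_by_day with partition key columns year and month and a numeric column bytes_sent.

```sql
-- Aggregation and filtering: Impala returns a single row instead of
-- every matching row.
SELECT COUNT(*) AS matching_rows,
       SUM(bytes_sent) AS total_bytes,
       MAX(bytes_sent) AS largest_response
FROM logs_by_day
WHERE year = 2024 AND month = 1;

-- LIMIT with ORDER BY: only the ten largest responses cross the network.
SELECT url, bytes_sent
FROM logs_by_day
WHERE year = 2024 AND month = 1
ORDER BY bytes_sent DESC
LIMIT 10;
```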
Verify that your queries are planned in an efficient logical manner
Examine the EXPLAIN plan for a query before actually running it.
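A brief sketch of requesting a plan without running the query is shown below. The tables logs_by_day and url_categories are hypothetical, and the EXPLAIN_LEVEL setting is optional.

```sql
-- Optional: ask for a more detailed plan than the default.
SET EXPLAIN_LEVEL=2;

-- Prints the plan only; the query itself is not executed.
EXPLAIN
SELECT c.category, SUM(l.bytes_sent) AS total_bytes
FROM logs_by_day l
JOIN url_categories c ON l.url = c.url
WHERE l.year = 2024 AND l.month = 1
GROUP BY c.category;
```

In the output, check that partition pruning applies to the filtered table and that the join order and join strategy match your expectations.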
Verify performance characteristics of queries
Verify that the low-level aspects of I/O, memory usage, network bandwidth, CPU utilization, and so on are within expected ranges by examining the query profile for a query after running it.
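In impala-shell, one way to obtain this information is to run the query and then issue the SUMMARY and PROFILE commands in the same session. The query below is a placeholder against a hypothetical table.

```sql
-- Run the query to be analyzed.
SELECT COUNT(*) FROM logs_by_day WHERE year = 2024 AND month = 1;

-- impala-shell command: condensed per-operator timings and row counts
-- for the most recent query.
SUMMARY;

-- impala-shell command: the full query profile, including I/O, memory,
-- and network details for each plan fragment.
PROFILE;
```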
In the context of Impala, a hotspot is defined as “an Impala daemon that for a single query or a workload is spending a far greater amount of time processing data relative to its neighbours”.
Before discussing the options to tackle this issue, some background is first required to understand how this problem can occur.
By default, the scheduling of scan-based plan fragments is deterministic. This means that for multiple queries needing to read the same block of data, the same node will be picked to host the scan. The default scheduling logic does not take into account node workload from prior queries. The complexity of materializing a tuple depends on a few factors, namely decoding and decompression. If the tuples are densely packed into data pages due to good encoding/compression ratios, more work is required to reconstruct the data. Each compression codec offers different performance tradeoffs and should be considered before writing the data. Due to the deterministic nature of the scheduler, single nodes can become bottlenecks for highly concurrent queries that use the same tables.
If, for example, a Parquet based dataset is tiny, e.g. a small dimension table, such that it fits into a single HDFS block (Impala by default will create 256 MB blocks when Parquet is used, each containing a single row group) then there are a number of options that can be considered to resolve the potential scheduling hotspots when querying this data:
- The scheduler’s deterministic behaviour can be changed using the
following query options:
- HDFS caching can be used to cache block replicas. This will cause the Impala scheduler to randomly pick a node that is hosting a cached block replica for the scan. Note, although HDFS caching has benefits, it serves only to help with the reading of raw block data and not cached tuple data, but with the right number of cached replicas (by default, HDFS only caches one replica), even load distribution can be achieved for smaller datasets.
- Do not compress the table data. The uncompressed table data spans more nodes and eliminates skew caused by compression.
- Reduce the Parquet file size via the PARQUET_FILE_SIZE query option when writing the table data. Using this approach, the data will span more nodes. However, it is not recommended to drop the size below 32 MB.
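Some of the options above can be sketched in Impala SQL as follows. The table dim_region, the copy dim_region_spread, the cache pool dim_pool, and the replication factor of 3 are all hypothetical, and the cache pool is assumed to have been created beforehand on the HDFS side. The options are alternatives to benchmark, not steps to apply together.

```sql
-- Option: cache the table in HDFS with more than one cached replica,
-- so that scans can be spread over several hosts.
ALTER TABLE dim_region SET CACHED IN 'dim_pool' WITH REPLICATION = 3;

-- Option: rewrite the table without compression.
SET COMPRESSION_CODEC=NONE;
INSERT OVERWRITE TABLE dim_region_spread SELECT * FROM dim_region;

-- Option: rewrite the table with smaller Parquet files
-- (the text above advises against going below 32 MB).
SET PARQUET_FILE_SIZE=32m;
INSERT OVERWRITE TABLE dim_region_spread SELECT * FROM dim_region;
```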