Partitioning
Use partitioning to speed up queries that retrieve data based on values from one or more columns.
Partitioning is a technique for physically dividing the data during loading based on values from one or more columns. For example, with a school_records table partitioned on a year column, there is a separate data directory for each different year value, and all the data for that year is stored in a data file in that directory. A query that includes a WHERE condition such as YEAR=1966, YEAR IN (1989,1999), or YEAR BETWEEN 1984 AND 1989 can examine only the data files from the appropriate directory or directories, greatly reducing the amount of data to read and test.
Parquet is a popular format for partitioned Impala tables because it is well suited to handle huge data volumes.
You can add, drop, set the expected file format, or set the HDFS location of the data files for individual partitions within an Impala table.
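As a minimal sketch of the school_records example, the statements below create a Parquet table partitioned on year and run a query that filters on the partition key. The non-partition columns (student_id, last_name) and all data types are assumptions for illustration; the original example names only the table and the year column.

```sql
-- Hypothetical layout: student_id and last_name are illustrative columns.
CREATE TABLE school_records (
  student_id BIGINT,
  last_name  STRING
)
PARTITIONED BY (year SMALLINT)   -- partition column is declared here, not in the main column list
STORED AS PARQUET;

-- The condition on the partition key lets Impala read only the
-- directories for the years 1984 through 1989.
SELECT COUNT(*) FROM school_records WHERE year BETWEEN 1984 AND 1989;
```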
When to Use Partitioned Tables
Partitioning is typically appropriate for:
- Tables that are very large, where reading the entire data set takes an impractical amount of time.
- Tables that are always or almost always queried with conditions on the partitioning columns. In our example of a table partitioned by year, SELECT COUNT(*) FROM school_records WHERE year = 1985 is efficient, only examining a small fraction of the data; but SELECT COUNT(*) FROM school_records has to process a separate data file for each year, resulting in more overall work than in an unpartitioned table. You would probably not partition this way if you frequently queried the table based on last name, student ID, and so on without testing the year.
- Columns that have reasonable cardinality (number of different values). If a column only has a small number of values, for example Male or Female, you do not gain much efficiency by eliminating only about 50% of the data to read for each query. If a column has only a few rows matching each value, the number of directories to process can become a limiting factor, and the data file in each directory could be too small to take advantage of the Hadoop mechanism for transmitting data in multi-megabyte blocks. For example, you might partition census data by year, store sales data by year and month, and web traffic data by year, month, and day. (Some users with high volumes of incoming data might even partition down to the individual hour and minute.)
- Data that already passes through an extract, transform, and load (ETL) pipeline. The values of the partitioning columns are stripped from the original data files and represented by directory names, so loading data into a partitioned table involves some sort of transformation or preprocessing.
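One way to check whether a query pattern suits a partitioning scheme is to compare query plans. The sketch below assumes a school_records table partitioned by year already exists. The plan text varies by Impala version, so no output is shown.

```sql
-- Filter on the partition key: the scan should cover only the matching partition.
EXPLAIN SELECT COUNT(*) FROM school_records WHERE year = 1985;

-- No filter on the partition key: every partition has to be scanned.
EXPLAIN SELECT COUNT(*) FROM school_records;
```

In the HDFS scan node of each plan, look at how many partitions are selected out of the total. If most of your important queries select all partitions, the partition key is probably not a good fit.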
SQL Statements for Partitioned Tables
In terms of Impala SQL syntax, partitioning affects these statements:
- CREATE TABLE: You specify a PARTITIONED BY clause when creating the table to identify names and data types of the partitioning columns. These columns are not included in the main list of columns for the table.
- CREATE TABLE AS SELECT: Use the PARTITIONED BY clause to create a partitioned table, copy data into it, and create new partitions based on the values in the inserted data.
- ALTER TABLE: Add or drop partitions, to work with different portions of a huge data set. You can designate the HDFS directory that holds the data files for a specific partition. With data partitioned by date values, you might age out data that is no longer relevant.
  If you are creating a partition for the first time and specifying its location, for maximum efficiency, use a single ALTER TABLE statement including both the ADD PARTITION and LOCATION clauses, rather than separate statements with ADD PARTITION and SET LOCATION clauses.
- INSERT: When you insert data into a partitioned table, you identify the partitioning columns. One or more values from each inserted row are not stored in data files, but instead determine the directory where that row value is stored. You can also specify which partition to load a set of data into, with INSERT OVERWRITE statements; you can replace the contents of a specific partition but you cannot append data to a specific partition.
  By default, if an INSERT statement creates any new subdirectories underneath a partitioned table, those subdirectories are assigned default HDFS permissions for the impala user. To make each subdirectory have the same permissions as its parent directory in HDFS, specify the --insert_inherit_permissions startup option for the Impala Daemon.
- Although the syntax of the SELECT statement is the same whether or not the table is partitioned, the way queries interact with partitioned tables can have a dramatic impact on performance and scalability. The mechanism that lets queries skip certain partitions during a query is known as partition pruning.
- SHOW PARTITIONS: Displays information about each partition in a table.
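The statements in this list can be sketched together as follows. The table names (sales, staging_sales, sales_copy), the column names, and the HDFS path are all hypothetical and chosen only for illustration.

```sql
-- CREATE TABLE: partition columns go in PARTITIONED BY, not the main column list.
CREATE TABLE sales (item STRING, amount DECIMAL(10,2))
PARTITIONED BY (year INT, month INT);

-- ALTER TABLE: create the partition and set its location in one statement.
ALTER TABLE sales ADD PARTITION (year=2014, month=1)
  LOCATION '/user/impala/data/sales/2014/01';   -- hypothetical path

-- INSERT OVERWRITE: replace the contents of one specific partition.
-- staging_sales is an assumed table with matching item and amount columns.
INSERT OVERWRITE sales PARTITION (year=2014, month=1)
  SELECT item, amount FROM staging_sales;

-- CREATE TABLE AS SELECT: partition columns are named without types
-- and come last in the SELECT list.
CREATE TABLE sales_copy PARTITIONED BY (year, month) AS
  SELECT item, amount, year, month FROM sales;

-- SHOW PARTITIONS: list the partitions of the table.
SHOW PARTITIONS sales;

-- ALTER TABLE: age out a partition that is no longer relevant.
ALTER TABLE sales DROP PARTITION (year=2014, month=1);
```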
Static and Dynamic Partitioning Clauses
Specifying all the partition columns in a SQL statement is called static partitioning, because the statement affects a single predictable partition. For example, you use static partitioning with an ALTER TABLE statement that affects only one partition, or with an INSERT statement that inserts all values into the same partition:
INSERT INTO t1 PARTITION(x=10, y='a') SELECT c1 FROM some_other_table;
When you specify some partition key columns in an INSERT statement, but leave out the values, Impala determines which partition to insert into. This technique is called dynamic partitioning:
INSERT INTO t1 PARTITION(x, y='b') SELECT c1, c2 FROM some_other_table;
-- Create new partition if necessary based on variable year, month, and day; insert a single value.
INSERT INTO weather PARTITION (year, month, day) SELECT 'cloudy',2014,4,21;
-- Create new partition if necessary for specified year and month but variable day; insert a single value.
INSERT INTO weather PARTITION (year=2014, month=04, day) SELECT 'sunny',22;
The more key columns you specify in the PARTITION clause, the fewer columns you need in the SELECT list.
The trailing columns in the SELECT list are substituted in order for the partition key columns with no specified value.
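The weather statements above imply a table with one regular column and three partition key columns. A minimal sketch of such a table follows; the column name conditions and the INT types are assumptions, because the original examples do not show the table definition.

```sql
-- Assumed definition: one STRING column plus three partition key columns.
CREATE TABLE weather (conditions STRING)
PARTITIONED BY (year INT, month INT, day INT);

-- Fully static: every key has a value, so the SELECT list supplies only the regular column.
INSERT INTO weather PARTITION (year=2014, month=4, day=23) SELECT 'rainy';

-- Mixed: year is static; the two trailing SELECT values fill month and day, in that order.
INSERT INTO weather PARTITION (year=2014, month, day) SELECT 'windy', 4, 24;

-- Confirm which partitions now exist.
SHOW PARTITIONS weather;
```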
Refreshing a Single Partition
The REFRESH statement is typically used with partitioned tables when new data files are loaded into a partition by some non-Impala mechanism, such as a Hive or Spark job. The REFRESH statement makes Impala aware of the new data files so that they can be used in Impala queries. Because partitioned tables typically contain a high volume of data, the REFRESH operation for a full partitioned table can take significant time.
You can include a PARTITION (partition_spec) clause in the REFRESH statement so that only a single partition is refreshed. For example, REFRESH big_table PARTITION (year=2017, month=9, day=30). The partition spec must include all the partition key columns.
Partition Key Columns
The columns you choose as the partition keys should be ones that are frequently used to filter query results in important, large-scale queries. Popular examples are some combination of year, month, and day when the data has associated time values, and geographic region when the data is associated with some place.
- For time-based data, split out the separate parts into their own columns, because Impala cannot partition based on a TIMESTAMP column.
- The data type of the partition columns does not have a significant effect on the storage required, because the values from those columns are not stored in the data files; instead, they are represented as strings inside HDFS directory names.
- You can enable the OPTIMIZE_PARTITION_KEY_SCANS query option to speed up queries that only refer to partition key columns, such as SELECT MAX(year). This setting is not enabled by default because the query behavior is slightly different if the table contains partition directories without actual data inside.
- All the partition key columns must be scalar types.
- When Impala queries data stored in HDFS, it is most efficient to use multi-megabyte files to take advantage of the HDFS block size. For Parquet tables, the block size (and ideal size of the data files) is 256 MB. Therefore, avoid specifying too many partition key columns, which could result in individual partitions containing only small amounts of data. For example, if you receive 1 GB of data per day, you might partition by year, month, and day; while if you receive 5 GB of data per minute, you might partition by year, month, day, hour, and minute. If you have data with a geographic component, you might partition based on postal code if you have many megabytes of data for each postal code, but if not, you might partition by some larger region such as city, state, or country.
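As a sketch of splitting time-based data into separate partition key columns, the statements below derive year, month, and day from a TIMESTAMP column during a dynamic-partition insert. The tables raw_events and events_by_day and their columns are hypothetical.

```sql
-- Hypothetical unpartitioned source table with a TIMESTAMP column.
CREATE TABLE raw_events (event_time TIMESTAMP, message STRING);

-- Partitioned copy: the TIMESTAMP stays a regular column,
-- and its parts become the partition keys.
CREATE TABLE events_by_day (event_time TIMESTAMP, message STRING)
PARTITIONED BY (year INT, month INT, day INT)
STORED AS PARQUET;

-- The trailing SELECT expressions fill year, month, and day in order.
INSERT INTO events_by_day PARTITION (year, month, day)
SELECT event_time, message,
       year(event_time), month(event_time), day(event_time)
FROM raw_events;
```

Whether day-level granularity is appropriate depends on the data volume per day, following the file-size guidance above.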
If you frequently run aggregate functions such as MIN(), MAX(), and COUNT(DISTINCT) on partition key columns, consider enabling the OPTIMIZE_PARTITION_KEY_SCANS query option, which optimizes such queries.
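A short sketch of using the option for a session, for example in impala-shell, assuming a school_records table partitioned by year:

```sql
-- Enable the optimization for the current session.
SET OPTIMIZE_PARTITION_KEY_SCANS=1;

-- Queries that refer only to partition key columns can benefit.
SELECT MIN(year), MAX(year) FROM school_records;
SELECT COUNT(DISTINCT year) FROM school_records;

-- Restore the default if later queries rely on the standard behavior
-- for partition directories that contain no data.
SET OPTIMIZE_PARTITION_KEY_SCANS=0;
```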
Setting Different File Formats for Partitions in a Table
Partitioned tables have the flexibility to use different file formats for different partitions. For example, if you originally received data in text format, then received new data in RCFile format, and eventually began receiving data in Parquet format, all that data could reside in the same table for queries. You just need to ensure that the table is structured so that the data files that use different file formats reside in separate partitions.
For example, here is how you might switch from text to Parquet data as you receive data for different years:
[localhost:21000] > CREATE TABLE census (name STRING) PARTITIONED BY (year SMALLINT);
[localhost:21000] > ALTER TABLE census ADD PARTITION (year=2012); -- Text format;
[localhost:21000] > ALTER TABLE census ADD PARTITION (year=2013); -- Text format switches to Parquet before data loaded;
[localhost:21000] > ALTER TABLE census PARTITION (year=2013) SET FILEFORMAT PARQUET;
[localhost:21000] > INSERT INTO census PARTITION (year=2012) VALUES ('Smith'),('Jones'),('Lee'),('Singh');
[localhost:21000] > INSERT INTO census PARTITION (year=2013) VALUES ('Flores'),('Bogomolov'),('Cooper'),('Appiah');
At this point, the HDFS directory for year=2012 contains a text-format data file, while the HDFS directory for year=2013 contains a Parquet data file. As always, when loading non-trivial data, you would use INSERT ... SELECT or LOAD DATA to import data in large batches, rather than INSERT ... VALUES which produces small files that are inefficient for real-world queries.
For other file types that Impala cannot create natively, you can switch into Hive and issue the ALTER TABLE ... SET FILEFORMAT statements and INSERT or LOAD DATA statements there. After switching back to Impala, issue a REFRESH table_name statement so that Impala recognizes any partitions or new data added through Hive.
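The Hive round trip might look like the following sketch, continuing with the census table. RCFile is used here only as an example of a format written through Hive, and census_staging is a hypothetical source table. Details such as SerDe handling can differ between Hive versions, so treat this as an outline rather than a tested recipe.

```sql
-- Run in Hive (for example, through beeline):
ALTER TABLE census ADD PARTITION (year=2014);
ALTER TABLE census PARTITION (year=2014) SET FILEFORMAT RCFILE;
-- census_staging is an assumed table with a name column.
INSERT INTO TABLE census PARTITION (year=2014)
  SELECT name FROM census_staging;

-- Run in Impala after switching back:
REFRESH census;
SELECT COUNT(*) FROM census WHERE year = 2014;
```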
Dropping Partitions
What happens to the data files when a partition is dropped depends on whether the partitioned table is designated as internal or external. For an internal (managed) table, the data files are deleted. For example, if data in the partitioned table is a copy of raw data files stored elsewhere, you might save disk space by dropping older partitions that are no longer required for reporting, knowing that the original data is still available if needed later. For an external table, the data files are left alone. For example, dropping a partition without deleting the associated files lets Impala consider a smaller set of partitions, improving query efficiency and reducing overhead for DDL operations on the table; if the data is needed again later, you can add the partition again.
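A minimal sketch of both cases follows. The census table is assumed to be internal; census_ext and the HDFS path are hypothetical names for an external table over the same kind of data.

```sql
-- Check whether a table is internal or external: look for the
-- "Table Type" field in the output.
DESCRIBE FORMATTED census;

-- Internal (managed) table: the data files for the partition are deleted.
ALTER TABLE census DROP IF EXISTS PARTITION (year=2012);

-- External table: the partition is removed from the metadata,
-- but the data files are left in place.
ALTER TABLE census_ext DROP IF EXISTS PARTITION (year=2012);

-- If the data is needed again later, add the partition back,
-- pointing at the directory that still holds the files.
ALTER TABLE census_ext ADD IF NOT EXISTS PARTITION (year=2012)
  LOCATION '/user/impala/data/census/year=2012';   -- hypothetical path
```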
Using Partitioning with Kudu Tables
Kudu tables use a more fine-grained partitioning scheme than tables containing HDFS data files. You specify a PARTITION BY clause with the CREATE TABLE statement to identify how to divide the values from the partition key columns.
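As an illustration, a Kudu table might combine hash and range partitioning as in the sketch below. The table name, columns, partition count, and range boundaries are all assumptions chosen for the example.

```sql
-- Hypothetical Kudu table. Primary key columns are listed first.
CREATE TABLE kudu_events (
  id      BIGINT,
  year    INT,
  message STRING,
  PRIMARY KEY (id, year)
)
PARTITION BY HASH (id) PARTITIONS 4,   -- spread rows across 4 hash buckets
  RANGE (year) (                       -- and divide each bucket by year range
    PARTITION VALUES < 2015,
    PARTITION 2015 <= VALUES < 2017,
    PARTITION 2017 <= VALUES
  )
STORED AS KUDU;
```

Unlike the PARTITIONED BY clause for HDFS-backed tables, the partitioning columns here are part of the main column list and must be part of the primary key.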
Keeping Statistics Up to Date for Partitioned Tables
Because the COMPUTE STATS statement can be resource-intensive to run on a partitioned table as new partitions are added, Impala includes a variation of this statement that allows computing statistics on a per-partition basis such that stats can be incrementally updated when new partitions are added.
The COMPUTE INCREMENTAL STATS variation computes statistics only for partitions that were added or changed since the last COMPUTE INCREMENTAL STATS statement, rather than the entire table. It is typically used for tables where a full COMPUTE STATS operation takes too long to be practical each time a partition is added or dropped.
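A sketch of the incremental workflow, reusing the census table from the file format example; the year=2014 partition is hypothetical:

```sql
-- Initial run: computes stats for every partition.
COMPUTE INCREMENTAL STATS census;

-- Later, a new partition arrives.
ALTER TABLE census ADD PARTITION (year=2014);
-- ... load data into the new partition here ...

-- Process only the partitions that do not yet have incremental stats.
COMPUTE INCREMENTAL STATS census;

-- Alternatively, target a single partition explicitly.
COMPUTE INCREMENTAL STATS census PARTITION (year=2014);

-- Inspect the per-partition results.
SHOW TABLE STATS census;
```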