Using Parquet Data Files
Impala allows you to create, manage, and query Parquet tables. Parquet is a column-oriented binary file format designed to be highly efficient for large-scale analytic queries.
Parquet is suitable for queries that scan particular columns within a table, for example, queries against wide tables with many columns, or aggregation operations such as SUM() and AVG() that need to process most or all of the values from a column.
Each Parquet data file written by Impala contains the values for a set of rows (referred to as the row group). Within a data file, the values from each column are organized so that they are all adjacent, enabling good compression for the values from that column. Queries against a Parquet table can retrieve and analyze these values from any column quickly and with minimal I/O.
Creating Parquet Tables
To create a table in the Parquet format, use the STORED AS PARQUET clause in the CREATE TABLE statement. For example:
CREATE TABLE parquet_table_name (x INT, y STRING) STORED AS PARQUET;
Or, to clone the column names and data types of an existing table, use the LIKE clause with the STORED AS PARQUET clause. For example:
CREATE TABLE parquet_table_name LIKE other_table_name STORED AS PARQUET;
You can derive column definitions from a raw Parquet data file, even without an existing Impala table. For example, you can create an external table pointing to an HDFS directory, and base the column definitions on one of the files in that directory:
CREATE EXTERNAL TABLE ingest_existing_files LIKE PARQUET '/user/etl/destination/datafile1.dat'
  STORED AS PARQUET
  LOCATION '/user/etl/destination';
Or, you can refer to an existing data file and create a new empty table with suitable column definitions. Then you can use INSERT to create new data files or LOAD DATA to transfer existing data files into the new table.
CREATE TABLE columns_from_data_file LIKE PARQUET '/user/etl/destination/datafile1.dat'
  STORED AS PARQUET;
In this example, the new table is partitioned by year, month, and day. These partition key columns are not part of the data file, so you specify them in the PARTITIONED BY clause of the CREATE TABLE statement:
CREATE TABLE columns_from_data_file LIKE PARQUET '/user/etl/destination/datafile1.dat'
  PARTITIONED BY (year INT, month TINYINT, day TINYINT)
  STORED AS PARQUET;
When you populate the Parquet table with an INSERT ... SELECT statement from another table, and the Parquet table has a different number of columns or different column names than the source table, specify the names of the columns from the source table rather than * in the SELECT statement.
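As a minimal sketch of the explicit column list described above, assume a hypothetical source table other_table_name with columns id (BIGINT), full_name (STRING), and notes (STRING), and a Parquet table that keeps only two of them under different names. All table and column names here are illustrative assumptions.

```sql
-- Hypothetical Parquet table with fewer columns, and different column
-- names, than the source table.
CREATE TABLE parquet_subset (customer_id BIGINT, customer_name STRING)
  STORED AS PARQUET;

-- Name the source columns explicitly instead of using SELECT *.
-- Columns are matched by position, so the order of the select list
-- must follow the column order of the destination table.
INSERT INTO parquet_subset
  SELECT id, full_name FROM other_table_name;
```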
Data Type Considerations for Parquet Tables
The Parquet format defines a set of data types whose names differ from the names of the corresponding Impala data types. If you are preparing Parquet files using other Hadoop components such as Pig or MapReduce, you might need to work with the type names defined by Parquet. The following tables list the Parquet-defined types and the equivalent types in Impala.
Primitive types
| Parquet type | Impala type | 
|---|---|
| BINARY | STRING | 
| BOOLEAN | BOOLEAN | 
| DOUBLE | DOUBLE | 
| FLOAT | FLOAT | 
| INT32 | INT | 
| INT64 | BIGINT | 
| INT96 | TIMESTAMP | 
Logical types
Parquet uses type annotations to extend the types that it can store, by specifying how the primitive types should be interpreted.
| Parquet primitive type and annotation | Impala type | 
|---|---|
| BINARY annotated with the UTF8 OriginalType | STRING | 
| BINARY annotated with the STRING LogicalType | STRING | 
| BINARY annotated with the ENUM OriginalType | STRING | 
| BINARY annotated with the DECIMAL OriginalType | DECIMAL | 
| INT64 annotated with the TIMESTAMP_MILLIS OriginalType | TIMESTAMP or BIGINT (for backward compatibility) | 
| INT64 annotated with the TIMESTAMP_MICROS OriginalType | TIMESTAMP or BIGINT (for backward compatibility) | 
| INT64 annotated with the TIMESTAMP LogicalType | TIMESTAMP or BIGINT (for backward compatibility) | 
Complex types
Impala only supports queries against the complex types (ARRAY, MAP, and STRUCT) in Parquet tables.
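The following sketch shows what queries against complex type columns can look like. The contacts table and its columns are hypothetical, and the example assumes that Parquet data files containing these columns already exist (for instance, written by another component).

```sql
-- Hypothetical Parquet table with ARRAY and STRUCT columns.
CREATE TABLE contacts (
  id BIGINT,
  name STRING,
  phones ARRAY<STRING>,
  address STRUCT<city: STRING, zip: STRING>
) STORED AS PARQUET;

-- Fields of a STRUCT are referenced with dot notation.
SELECT id, address.city FROM contacts;

-- Elements of an ARRAY are unpacked by joining the table with its own
-- array column; ITEM is the pseudocolumn that holds each element.
SELECT c.id, p.item AS phone
FROM contacts c, c.phones p;
```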
Loading Data into Parquet Tables
Choose one of the following processes to load data into Parquet tables, depending on whether the original data is already in an Impala table or exists as raw data files outside Impala.
If you already have data in an Impala or Hive table, perhaps in a different file format or partitioning scheme:
- Transfer the data to a Parquet table using the Impala INSERT...SELECT statement. For example:
  INSERT OVERWRITE TABLE parquet_table_name SELECT * FROM other_table_name;
  You can convert, filter, repartition, and do other things to the data as part of this same INSERT statement. When inserting into partitioned tables, especially using the Parquet file format, you can include a hint in the INSERT statement to fine-tune the overall performance of the operation and its resource usage.
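As an illustration of an INSERT hint, the sketch below uses the SHUFFLE hint on a dynamic partition insert. The tables sales_parquet and sales_text are hypothetical, and whether SHUFFLE or NOSHUFFLE performs better depends on your data and cluster, so treat this as a starting point rather than a recommendation.

```sql
-- Hypothetical partitioned destination table.
CREATE TABLE sales_parquet (id BIGINT, amount DOUBLE)
  PARTITIONED BY (year INT, month TINYINT)
  STORED AS PARQUET;

-- The hint goes between the PARTITION clause and the SELECT keyword.
-- SHUFFLE asks Impala to redistribute rows by partition key before
-- writing, so each partition is written by fewer nodes.
-- Assumes sales_text has columns id, amount, year, and month.
INSERT INTO sales_parquet PARTITION (year, month) /* +SHUFFLE */
  SELECT id, amount, year, month FROM sales_text;
```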
Any INSERT statement for a Parquet table requires enough free space in the HDFS filesystem to write one block. Because Parquet data files use a large block size (256 MB by default), an INSERT might fail (even for a very small amount of data) if your HDFS is running low on space.
Avoid the INSERT...VALUES syntax for Parquet tables, because INSERT...VALUES produces a separate tiny data file for each INSERT...VALUES statement, and the strength of Parquet is in its handling of data (compressing, parallelizing, and so on) in large chunks.
If you have one or more Parquet data files produced outside of Impala, you can quickly make the data query-able through Impala by one of the following methods:
- The LOAD DATA statement moves a single data file or a directory full of data files into the data directory for an Impala table. It does no validation or conversion of the data. The original data files must be somewhere in HDFS, not the local filesystem.
- The CREATE TABLE statement with the LOCATION clause creates a table where the data continues to reside outside the Impala data directory. The original data files must be somewhere in HDFS, not the local filesystem. For extra safety, if the data is intended to be long-lived and reused by other applications, you can use the CREATE EXTERNAL TABLE syntax so that the data files are not deleted by an Impala DROP TABLE statement.
- If the Parquet table already exists, you can copy Parquet data files directly into it using the hadoop distcp -pb command, then use the REFRESH statement to make Impala recognize the newly added data. You must preserve the block size of the Parquet data files by using the hadoop distcp -pb command rather than a -put or -cp operation on the Parquet files.
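The three methods above might look like the following in practice. The HDFS paths and the table name parquet_external are hypothetical placeholders; parquet_table_name is assumed to be an existing Parquet table with a matching schema.

```sql
-- Method 1: move Parquet files that are already in HDFS into the data
-- directory of an existing Parquet table. No validation is performed.
LOAD DATA INPATH '/user/etl/staging/parquet_files'
  INTO TABLE parquet_table_name;

-- Method 2: leave the files in place and point an external table at
-- them. DROP TABLE on an external table does not delete the files.
CREATE EXTERNAL TABLE parquet_external (x INT, y STRING)
  STORED AS PARQUET
  LOCATION '/user/etl/shared/parquet_data';

-- Method 3: after copying files into the table directory outside of
-- Impala (for example with hadoop distcp -pb), make Impala notice them.
REFRESH parquet_table_name;
```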
Recent versions of Sqoop can produce Parquet output files using the --as-parquetfile option.
If the data exists outside Impala and is in some other format, combine both of the preceding techniques. First, use a LOAD DATA or CREATE EXTERNAL TABLE ... LOCATION statement to bring the data into an Impala table that uses the appropriate file format. Then, use an INSERT...SELECT statement to copy the data to the Parquet table, converting to Parquet format as part of the process.
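A sketch of this two-step conversion, assuming the raw data is comma-delimited text under a hypothetical HDFS directory. The table names, column definitions, and path are illustrative only.

```sql
-- Step 1: expose the raw text files through an external table that
-- uses the file format the data is actually in.
CREATE EXTERNAL TABLE events_text (
  event_id BIGINT,
  event_time TIMESTAMP,
  payload STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/user/etl/raw/events';

-- Step 2: create the Parquet table and copy the data into it.
-- The conversion to Parquet happens as part of the INSERT.
CREATE TABLE events_parquet (
  event_id BIGINT,
  event_time TIMESTAMP,
  payload STRING
)
STORED AS PARQUET;

INSERT INTO events_parquet SELECT * FROM events_text;
```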
Loading data into Parquet tables is a memory-intensive operation, because the incoming data is buffered until it reaches one data block in size, then that chunk of data is organized and compressed in memory before being written out. The memory consumption can be larger when inserting data into partitioned Parquet tables, because a separate data file is written for each combination of partition key column values, potentially requiring several large chunks to be manipulated in memory at once.
When inserting into a partitioned Parquet table, Impala redistributes the data among the nodes to reduce memory consumption. You might still need to temporarily increase the memory dedicated to Impala during the insert operation, or break up the load operation into several INSERT statements, or both.
Query Performance for Parquet Tables
Query performance for Parquet tables depends on the number of columns needed to process the SELECT list and WHERE clauses of the query, the way data is divided into large data files with block size equal to file size, the reduction in I/O by reading the data for each column in compressed format, which data files can be skipped (for partitioned tables), and the CPU overhead of decompressing the data for each column.
For example, the following query is efficient for a Parquet table because it processes only two columns (income and state):
SELECT AVG(income) FROM census_data WHERE state = 'CA';
If the table is partitioned by the STATE column, it is even more efficient because the query only has to read and decode 1 column from each data file, and it can read only the data files in the partition directory for the state 'CA', skipping the data files for all the other states, which will be physically located in other directories.
By contrast, the following query is relatively inefficient for a Parquet table because Impala has to read and decompress every column:
SELECT * FROM census_data;
Impala can optimize queries on Parquet tables, especially join queries, better when statistics are available for all the tables. Issue the COMPUTE STATS statement for each table after substantial amounts of data are loaded into or appended to it.
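For instance, statistics for the census_data table used in this section could be gathered and inspected as follows. This is a minimal sketch; the output depends entirely on your data.

```sql
-- Gather table and column statistics after a substantial load.
COMPUTE STATS census_data;

-- Inspect what the planner now knows: row counts and file sizes
-- per table or partition, then per-column statistics.
SHOW TABLE STATS census_data;
SHOW COLUMN STATS census_data;
```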
The runtime filtering feature works best with Parquet tables. The per-row filtering aspect only applies to Parquet tables.
Impala queries are optimized for files stored in Amazon S3. For Impala tables that use the Parquet file format, the PARQUET_OBJECT_STORE_SPLIT_SIZE query option determines how Impala divides the I/O work of reading the data files. By default, this value is 256 MB. Impala parallelizes S3 read operations on the files as if they were made up of 256 MB blocks to match the row group size produced by Impala.
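If the Parquet files in S3 were written by another tool with a different row group size, you might adjust the split size to match. The sketch below assumes files with 128 MB row groups and a hypothetical table s3_sales; the value is given in bytes, and the right setting for your data is something to verify by experiment.

```sql
-- Assumption: the data files were written with 128 MB row groups,
-- so ask Impala to split reads on 128 MB boundaries (value in bytes).
SET PARQUET_OBJECT_STORE_SPLIT_SIZE=134217728;

-- Hypothetical table whose data files reside in S3.
SELECT COUNT(*) FROM s3_sales WHERE sale_year = 2020;
```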
Parquet files written by Impala include embedded metadata specifying the minimum and maximum values for each column, within each row group and each data page within the row group. Impala-written Parquet files typically contain a single row group; a row group can contain many data pages. Impala uses this information (currently, only the metadata for each row group) when reading each Parquet data file during a query, to quickly determine whether each row group within the file potentially includes any rows that match the conditions in the WHERE clause.
For example, if the column X within a particular Parquet file has a minimum value of 1 and a maximum value of 100, then a query including the clause WHERE x > 200 can quickly determine that it is safe to skip that particular file, instead of scanning all the associated column values.
This optimization technique is especially effective for tables that use the SORT BY clause for the columns most frequently checked in WHERE clauses, because any INSERT operation on such tables produces Parquet data files with relatively narrow ranges of column values within each file.
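A sketch of how a SORT BY table might be set up to benefit from min/max skipping. The table census_sorted is hypothetical, and census_data is assumed to have name, state, and income columns.

```sql
-- Rows are sorted by income within each data file written by INSERT,
-- so each file covers a relatively narrow range of income values.
CREATE TABLE census_sorted (name STRING, state STRING, income DOUBLE)
  SORT BY (income)
  STORED AS PARQUET;

INSERT INTO census_sorted
  SELECT name, state, income FROM census_data;

-- Row groups whose maximum income is below the threshold can be
-- skipped using the embedded min/max metadata.
SELECT COUNT(*) FROM census_sorted WHERE income > 200000;
```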
      
To prevent Impala from writing the Parquet page index when creating Parquet files, set the PARQUET_WRITE_PAGE_INDEX query option to FALSE.
Partitioning for Parquet Tables
Partitioning is an important performance technique for Impala generally. This section explains some of the performance considerations for partitioned Parquet tables.
The Parquet file format is ideal for tables containing many columns, where most queries only refer to a small subset of the columns. The physical layout of Parquet data files lets Impala read only a small fraction of the data for many queries. The performance benefits of this approach are amplified when you use Parquet tables in combination with partitioning. Impala can skip the data files for certain partitions entirely, based on the comparisons in the WHERE clause that refer to the partition key columns. For example, queries on partitioned tables often analyze data for time intervals based on columns such as YEAR, MONTH, and/or DAY, or for geographic regions.
As Parquet data files use a large block size, when deciding how finely to partition the data, try to find a granularity where each partition contains 256 MB or more of data, rather than creating a large number of smaller files split among many partitions.
Inserting into a partitioned Parquet table can be a resource-intensive operation, because each Impala node could potentially be writing a separate data file to HDFS for each combination of different values for the partition key columns. The large number of simultaneous open files could exceed the HDFS transceivers limit. To avoid exceeding this limit, consider the following techniques:
- Load different subsets of data using separate INSERT statements with specific values for the PARTITION clause, such as PARTITION (year=2010).
- Increase the transceivers value for HDFS, sometimes spelled xcievers (sic). The property value in the hdfs-site.xml configuration file is dfs.datanode.max.transfer.threads. For example, if you were loading 12 years of data partitioned by year, month, and day, even a value of 4096 might not be high enough.
- Use the COMPUTE STATS statement to collect column statistics on the source table from which data is being copied, so that the Impala query can estimate the number of different values in the partition key columns and distribute the work accordingly.
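The first and third techniques can be combined as sketched below. The tables raw_events and events_by_day are hypothetical; raw_events is assumed to have event_id, payload, year, month, and day columns.

```sql
-- Hypothetical destination table partitioned by year, month, and day.
CREATE TABLE events_by_day (event_id BIGINT, payload STRING)
  PARTITIONED BY (year INT, month TINYINT, day TINYINT)
  STORED AS PARQUET;

-- Collect statistics on the source table so Impala can estimate the
-- number of distinct partition key values.
COMPUTE STATS raw_events;

-- Load one year per statement. The year is a constant, while month
-- and day are filled in from the trailing columns of the select list.
INSERT INTO events_by_day PARTITION (year=2010, month, day)
  SELECT event_id, payload, month, day FROM raw_events WHERE year = 2010;

INSERT INTO events_by_day PARTITION (year=2011, month, day)
  SELECT event_id, payload, month, day FROM raw_events WHERE year = 2011;
```

Each statement then opens files for at most one year of partitions at a time, rather than for every combination present in the source table.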
Enabling Compression for Parquet Tables
When Impala writes Parquet data files using the INSERT statement, the underlying compression is controlled by the COMPRESSION_CODEC query option. The allowed values for this query option are snappy (the default), gzip, zstd, lz4, and none, the compression codecs that Impala supports for Parquet.
- Snappy: By default, the underlying data files for a Parquet table are compressed with Snappy. The combination of fast compression and decompression makes it a good choice for many data sets.
- GZip: If you need more intensive compression (at the expense of more CPU cycles for uncompressing during queries), set the COMPRESSION_CODEC query option to gzip before inserting the data.
- Zstd: Zstd is a real-time compression algorithm offering a tradeoff between speed and ratio of compression. Compression levels from 1 up to 22 are supported. The lower the level, the faster the speed at the cost of compression ratio.
- Lz4: Lz4 is a lossless compression algorithm providing extremely fast and scalable compression and decompression.
- None: If your data compresses very poorly, or you want to avoid the CPU overhead of compression and decompression entirely, set the COMPRESSION_CODEC query option to none before inserting the data.
The actual compression ratios, and relative insert and query speeds, will vary depending on the characteristics of the actual data.
Because Parquet data files are typically large, each directory will have a different number of data files and the row groups will be arranged differently.
At the same time, the less aggressive the compression, the faster the data can be decompressed.
For example, using a table with a billion rows, switching from Snappy to GZip compression shrinks the data by an additional 40% or so, while switching from Snappy compression to no compression expands the data also by about 40%. A query that evaluates all the values for a particular column runs faster with no compression than with Snappy compression, and faster with Snappy compression than with GZip compression.
The data files using the various compression codecs are all compatible with each other for read operations. The metadata about the compression format is written into each data file, and can be decoded during queries regardless of the COMPRESSION_CODEC setting in effect at the time.
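One way to compare codecs on your own data is to write the same rows under different settings and look at the resulting file sizes. The sketch below assumes census_data is an unpartitioned table; the new table names are hypothetical, and the codec:level form for Zstd is an assumption to check against your Impala version.

```sql
-- Same data, GZip compression.
CREATE TABLE census_gzip LIKE census_data STORED AS PARQUET;
SET COMPRESSION_CODEC=gzip;
INSERT INTO census_gzip SELECT * FROM census_data;

-- Same data, Zstd at an explicit compression level (assumed syntax).
CREATE TABLE census_zstd LIKE census_data STORED AS PARQUET;
SET COMPRESSION_CODEC=zstd:12;
INSERT INTO census_zstd SELECT * FROM census_data;

-- Same data, no compression.
CREATE TABLE census_none LIKE census_data STORED AS PARQUET;
SET COMPRESSION_CODEC=none;
INSERT INTO census_none SELECT * FROM census_data;

-- Restore the default for later statements in this session.
SET COMPRESSION_CODEC=snappy;

-- Compare the on-disk sizes reported for each table.
SHOW TABLE STATS census_gzip;
SHOW TABLE STATS census_zstd;
SHOW TABLE STATS census_none;
```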
Exchanging Parquet Data Files with Other Cloudera Components
You can read and write Parquet data files from other Cloudera components, such as Hive.
Impala supports the scalar data types that you can encode in a Parquet data file, but not composite or nested types such as maps or arrays. Impala can query Parquet data files that include composite or nested types, as long as the query only refers to columns with scalar types.
If you copy Parquet data files between nodes, or even between different directories on the same node, make sure to preserve the block size by using the command hadoop distcp -pb. To verify that the block size was preserved, issue the command hdfs fsck -blocks HDFS_path_of_impala_table_dir and check that the average block size is at or near 256 MB (or whatever other size is defined by the PARQUET_FILE_SIZE query option). (The hadoop distcp operation typically leaves some directories behind, with names matching _distcp_logs_*, that you can delete from the destination directory afterward.)
Issue the command hadoop distcp for details about distcp command syntax.
Impala can query Parquet files that use the PLAIN, PLAIN_DICTIONARY, BIT_PACKED, and RLE encodings. Starting with this release, Impala supports decoding RLE_DICTIONARY encoded pages. This encoding is identical to the already-supported PLAIN_DICTIONARY encoding, but the PLAIN enum value is used for the dictionary pages and the RLE_DICTIONARY enum value is used for the data pages. When creating files outside of Impala for use by Impala, make sure to use one of the supported encodings.
In particular, for MapReduce jobs, do not define parquet.writer.version (especially as PARQUET_2_0) in the configuration of Parquet MR jobs.
Data written using version 2.0 of the Parquet writer might not be consumable by Impala, due to use of the RLE_DICTIONARY encoding.
Use the default version of the Parquet writer, and refrain from overriding it by setting the parquet.writer.version property or by using WriterVersion.PARQUET_2_0 in the Parquet API.
- Plain (PLAIN = 0)
- Dictionary encoding (PLAIN_DICTIONARY = 2 and RLE_DICTIONARY = 8)
- Run Length Encoding / Bit-Packing Hybrid (RLE = 3)
- Delta binary packed (DELTA_BINARY_PACKED = 5)
- Delta-length byte array (DELTA_LENGTH_BYTE_ARRAY = 6)
- Delta byte array (DELTA_BYTE_ARRAY = 7)
- Byte stream split (BYTE_STREAM_SPLIT = 9)
- Bit-packed (BIT_PACKED = 4)
For more information, see Apache Parquet encoding.
How Parquet Data Files Are Organized
Although Parquet is a column-oriented file format, Parquet keeps all the data for a row within the same data file, to ensure that the columns for a row are always available on the same node for processing. Parquet sets a large HDFS block size and a matching maximum data file size to ensure that I/O and network transfer requests apply to large batches of data.
Within that data file, the data for a set of rows is rearranged so that all the values from the first column are organized in one contiguous block, then all the values from the second column, and so on. Putting the values from the same column next to each other lets Impala use effective compression techniques on the values in that column.
When Impala retrieves or tests the data for a particular column, it opens all the data files, but only reads the portion of each file containing the values for that column. The column values are stored consecutively, minimizing the I/O required to process the values within a single column. If other columns are named in the SELECT list or WHERE clauses, the data for all columns in the same row is available within that same data file.
If an INSERT statement brings in less than one Parquet block's worth of data, the resulting data file is smaller than ideal. Thus, if you do split up an ETL job to use multiple INSERT statements, try to keep the volume of data for each INSERT statement to approximately 256 MB, or a multiple of 256 MB.
RLE and Dictionary Encoding for Parquet Data Files
Parquet uses some automatic compression techniques, such as run-length encoding (RLE) and dictionary encoding, based on analysis of the actual data values. Once the data values are encoded in a compact form, the encoded data can optionally be further compressed using a compression algorithm. Parquet data files created by Impala can use Snappy, GZip, Zstd, Lz4, or no compression; the Parquet spec also allows LZO compression, but currently Impala does not support LZO-compressed Parquet files.
RLE and dictionary encoding are compression techniques that Impala applies automatically to groups of Parquet data values, in addition to any Snappy or GZip compression applied to the entire data files. These automatic optimizations can save you time and planning that are normally needed for a traditional data warehouse. For example, dictionary encoding reduces the need to create numeric IDs as abbreviations for longer string values.
Run-length encoding condenses sequences of repeated data values. For example, if many consecutive rows all contain the same value for a country code, those repeating values can be represented by the value followed by a count of how many times it appears consecutively.
Dictionary encoding takes the different values present in a column, and represents each one in compact 2-byte form rather than the original value, which could be several bytes. (Additional compression is applied to the compacted values, for extra space savings.) This type of encoding applies when the number of different values for a column is less than 2**16 (65,536). It does not apply to columns of data type BOOLEAN, which are already very short. TIMESTAMP columns sometimes have a unique value for each row, in which case they can quickly exceed the 2**16 limit on distinct values. The 2**16 limit on different values within a column is reset for each data file, so if several different data files each contained 10,000 different city names, the city name column in each data file could still be condensed using dictionary encoding.
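To get a rough idea of whether a column is a candidate for dictionary encoding, you can check its number of distinct values against the 2**16 limit. The addresses table and city column below are hypothetical. Because the limit applies per data file, a table-wide count above the limit does not by itself rule out dictionary encoding within individual files.

```sql
-- Fast approximate count of distinct values.
SELECT NDV(city) AS approx_distinct_cities FROM addresses;

-- Exact count, which can be slower on large tables.
SELECT COUNT(DISTINCT city) AS distinct_cities FROM addresses;
```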
Compacting Data Files for Parquet Tables
If you reuse existing table structures or ETL processes for Parquet tables, you might encounter a "many small files" situation, which is suboptimal for query efficiency.
Here are techniques to help you produce large data files in Parquet INSERT operations, and to compact existing too-small data files:
- When inserting into a partitioned Parquet table, use statically partitioned INSERT statements where the partition key values are specified as constant values. Ideally, use a separate INSERT statement for each partition.
- You might set the NUM_NODES option to 1 briefly, during INSERT or CREATE TABLE AS SELECT statements. Normally, those statements produce one or more data files per data node. If the write operation involves small amounts of data, a Parquet table, and/or a partitioned table, the default behavior could produce many small files when intuitively you might expect only a single output file. SET NUM_NODES=1 turns off the distributed aspect of the write operation, making it more likely to produce only one or a few data files.
- Be prepared to reduce the number of partition key columns from what you are used to with traditional analytic database systems.
- Do not expect Impala-written Parquet files to fill up the entire Parquet block size. Impala estimates on the conservative side when figuring out how much data to write to each Parquet file. Typically, the uncompressed data in memory is substantially reduced on disk by the compression and encoding techniques in the Parquet file format. The final data file size varies depending on the compressibility of the data. Therefore, it is not an indication of a problem if 256 MB of text data is turned into 2 Parquet data files, each less than 256 MB.
- If you accidentally end up with a table with many small data files, consider using one or more of the preceding techniques and copying all the data into a new Parquet table, either through CREATE TABLE AS SELECT or INSERT ... SELECT statements. To avoid rewriting queries to change table names, you can adopt a convention of always running important queries against a view. Changing the view definition immediately switches any subsequent queries to use the new underlying tables.
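A sketch of the compaction and view-switching approach, using hypothetical names (sales_fragmented for the table with many small files, sales_compacted for its replacement, and sales_v for the view that queries use):

```sql
-- Queries are written against the view, not the underlying table.
CREATE VIEW sales_v AS SELECT * FROM sales_fragmented;

-- Rewrite the data on a single node so that fewer, larger files
-- are produced.
SET NUM_NODES=1;
CREATE TABLE sales_compacted STORED AS PARQUET
  AS SELECT * FROM sales_fragmented;

-- Restore the default (0 means use all nodes).
SET NUM_NODES=0;

-- Point the view at the compacted table. Subsequent queries against
-- sales_v read the new data files without any change to the queries.
ALTER VIEW sales_v AS SELECT * FROM sales_compacted;
```

Running the write on one node trades parallelism for file size, so it suits modest data volumes better than very large rewrites.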
Schema Evolution for Parquet Tables
Schema evolution refers to using the statement ALTER TABLE ... REPLACE COLUMNS to change the names, data types, or number of columns in a table. You can perform schema evolution for Parquet tables as follows:
- The Impala ALTER TABLE statement never changes any data files in the tables. From the Impala side, schema evolution involves interpreting the same data files in terms of a new table definition. Some types of schema changes make sense and are represented correctly. Other types of changes cannot be represented in a sensible way, and produce special result values or conversion errors during queries.
- The INSERT statement always creates data using the latest table definition. You might end up with data files with different numbers of columns or internal data representations if you do a sequence of INSERT and ALTER TABLE ... REPLACE COLUMNS statements.
- If you use ALTER TABLE ... REPLACE COLUMNS to define additional columns at the end, when the original data files are used in a query, these final columns are considered to be all NULL values.
- If you use ALTER TABLE ... REPLACE COLUMNS to define fewer columns than before, when the original data files are used in a query, the unused columns still present in the data file are ignored.
- Parquet represents the TINYINT, SMALLINT, and INT types the same internally, all stored in 32-bit integers.
  - That means it is easy to promote a TINYINT column to SMALLINT or INT, or a SMALLINT column to INT. The numbers are represented exactly the same in the data file, and the columns being promoted would not contain any out-of-range values.
  - If you change any of these column types to a smaller type, any values that are out-of-range for the new type are returned incorrectly, typically as negative numbers.
  - You cannot change a TINYINT, SMALLINT, or INT column to BIGINT, or the other way around. Although the ALTER TABLE succeeds, any attempt to query those columns results in conversion errors.
  - Any other type conversion for columns produces a conversion error during queries. For example, INT to STRING, FLOAT to DOUBLE, TIMESTAMP to STRING, DECIMAL(9,0) to DECIMAL(5,2), and so on.
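The following sketch walks through two of the safe changes listed above: promoting a SMALLINT column to INT and adding a column at the end. The table evolve_demo is hypothetical, and INSERT ... VALUES is used only to keep the demonstration tiny; it is not a recommended way to load Parquet tables.

```sql
-- Original definition, with one small data file written under it.
CREATE TABLE evolve_demo (id SMALLINT, label STRING) STORED AS PARQUET;
INSERT INTO evolve_demo VALUES (1, 'first'), (2, 'second');

-- Promote id from SMALLINT to INT and append a new column.
-- No data files are rewritten; only the table definition changes.
ALTER TABLE evolve_demo
  REPLACE COLUMNS (id INT, label STRING, added_later STRING);

-- The existing data file is read through the new definition:
-- id values are unchanged, and added_later is NULL for these rows.
SELECT id, label, added_later FROM evolve_demo;
```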
You might find that you have Parquet files where the columns do not line up in the same order as in your Impala table. For example, you might have a Parquet file that was part of a table with columns C1,C2,C3,C4, and now you want to reuse the same Parquet file in a table with columns C4,C2. By default, Impala expects the columns in the data file to appear in the same order as the columns defined for the table, making it impractical to do some kinds of file reuse or schema evolution.
The query option PARQUET_FALLBACK_SCHEMA_RESOLUTION=name lets Impala resolve columns by name, and therefore handle out-of-order or extra columns in the data file.
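A sketch of the C4,C2 scenario described above. The table name, the HDFS path, and the STRING column types are assumptions; use the types that the columns actually have in the data files.

```sql
-- Resolve columns by name instead of by position for this session.
SET PARQUET_FALLBACK_SCHEMA_RESOLUTION=name;

-- The data files contain columns C1, C2, C3, C4 in that order, but
-- this table exposes only C4 and C2, in a different order.
CREATE EXTERNAL TABLE reordered_columns (c4 STRING, c2 STRING)
  STORED AS PARQUET
  LOCATION '/user/etl/legacy/four_column_files';

-- With name-based resolution, each table column is matched to the
-- file column of the same name; C1 and C3 are ignored.
SELECT c4, c2 FROM reordered_columns;
```

Because this is a query option, it must be in effect for every session that queries the table, not just the one that created it.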
