Using the Parquet File Format with Impala, Hive, Pig, and MapReduce
Parquet is automatically installed when you install any of the above components, and the necessary libraries are automatically placed in the classpath for all of them. Copies of the libraries are in /usr/lib/parquet or inside the parcels in /lib/parquet.
The Parquet file format incorporates several features that make it highly suited to data warehouse-style operations:
- Columnar storage layout. A query can examine and perform calculations on all values for a column while reading only a small fraction of the data from a data file or table.
- Flexible compression options. The data can be compressed with any of several codecs. Different data files can be compressed differently. The compression is transparent to applications that read the data files.
- Innovative encoding schemes. Sequences of identical, similar, or related data values can be represented in ways that save disk space and memory. The encoding schemes provide an extra level of space savings beyond the overall compression for each data file.
- Large file size. The layout of Parquet data files is optimized for queries that process large volumes of data, with individual files in the multi-megabyte or even gigabyte range.
Among components of the CDH distribution, Parquet support originated in Impala. Impala can create Parquet tables, insert data into them, convert data from other file formats to Parquet, and then perform SQL queries on the resulting data files. Parquet tables created by Impala can be accessed by Hive, and vice versa.
The CDH software stack lets you use the tool of your choice with the Parquet file format, for each phase of data processing. For example, you can read and write Parquet files using Pig and MapReduce jobs. You can convert, transform, and query Parquet tables through Impala and Hive. And you can interchange data files between all of those components.
Using Parquet Tables with Impala
The Cloudera Impala component can create tables that use Parquet data files; insert data into those tables, converting the data into Parquet format; and query Parquet data files produced by Impala or by other components. The only syntax required is the STORED AS PARQUET clause on the CREATE TABLE statement. After that, all SELECT, INSERT, and other statements recognize the Parquet format automatically. For example, a session in the impala-shell interpreter might look as follows:
[localhost:21000] > create table parquet_table (x int, y string) stored as parquet;
[localhost:21000] > insert into parquet_table select x, y from some_other_table;
Inserted 50000000 rows in 33.52s
[localhost:21000] > select y from parquet_table where x between 70 and 100;
Once you create a Parquet table this way in Impala, you can query it or insert into it through either Impala or Hive.
Remember that Parquet format is optimized for working with large data files. In Impala 2.0 and later, the default size of Parquet files written by Impala is 256 MB; in earlier releases, 1 GB. Avoid using the INSERT ... VALUES syntax, or partitioning the table at too granular a level, if that would produce a large number of small files that cannot take advantage of the Parquet optimizations for large data chunks.
Inserting data into a partitioned Impala table can be a memory-intensive operation, because each data file requires a memory buffer to hold the data before being written. Such inserts can also exceed HDFS limits on simultaneous open files, because each node could potentially write to a separate data file for each partition, all at the same time. Always make sure table and column statistics are in place for any table used as the source for an INSERT ... SELECT operation into a Parquet table. If capacity problems still occur, consider splitting up such insert operations into one INSERT statement per partition.
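The capacity concern above can be estimated with simple arithmetic. The following is a minimal sketch, not Impala's actual resource accounting. It assumes the worst case described above, in which every node writes one data file per partition at the same time, and it further assumes (hypothetically) that each open file buffers up to one full 256 MB Parquet file. The class name, method names, and cluster figures are illustrative only.

```java
// Rough worst-case estimate for a partitioned INSERT into a Parquet table.
// Assumption: every node writes one data file per partition simultaneously.
final class PartitionedInsertEstimate {

    // Total number of files open across the cluster in the worst case.
    static long worstCaseOpenFiles(int nodes, int partitions) {
        return (long) nodes * partitions;
    }

    // Buffer memory needed on a single node, one buffer per partition.
    static long worstCaseBufferBytesPerNode(int partitions, long bufferBytesPerFile) {
        return (long) partitions * bufferBytesPerFile;
    }

    public static void main(String[] args) {
        int nodes = 10;                       // hypothetical cluster size
        int partitions = 365;                 // hypothetical: one partition per day
        long buffer = 256L * 1024 * 1024;     // assumed buffer: one 256 MB file

        System.out.println("open files: " + worstCaseOpenFiles(nodes, partitions));
        System.out.println("buffer bytes per node: "
                + worstCaseBufferBytesPerNode(partitions, buffer));
    }
}
```

If either figure exceeds what the cluster can provide, issuing one INSERT statement per partition brings the partition count for each statement down to 1.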
Impala can query Parquet files that use the PLAIN, PLAIN_DICTIONARY, BIT_PACKED, and RLE encodings. Currently, Impala does not support RLE_DICTIONARY encoding. When creating files outside of Impala for use by Impala, make sure to use one of the supported encodings. In particular, do not define parquet.writer.version (especially not as PARQUET_2_0) in the configuration of Parquet MR jobs. Use the default version (or format). The default format, 1.0, includes some enhancements that are compatible with older versions. Data using the 2.0 format might not be consumable by Impala, due to use of the RLE_DICTIONARY encoding.
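One way to apply the rule above is to check a file's encodings before handing it to Impala. The sketch below is illustrative only: the class and method names are hypothetical, and it assumes you already have the encodings as a comma-separated string, such as the ENC: field printed by parquet-tools meta. The supported set is copied from the list above.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Checks a comma-separated encoding list against the encodings
// that the text above lists as readable by Impala.
final class ImpalaEncodingCheck {

    private static final Set<String> SUPPORTED = new HashSet<>(
            Arrays.asList("PLAIN", "PLAIN_DICTIONARY", "BIT_PACKED", "RLE"));

    // Returns true only if every listed encoding is in the supported set.
    static boolean allSupported(String encodings) {
        for (String enc : encodings.split(",")) {
            if (!SUPPORTED.contains(enc.trim())) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(allSupported("PLAIN_DICTIONARY,RLE,PLAIN")); // true
        System.out.println(allSupported("RLE_DICTIONARY,PLAIN"));       // false
    }
}
```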
If you use Sqoop to convert RDBMS data to Parquet, be careful with interpreting any resulting values from DATE, DATETIME, or TIMESTAMP columns. The underlying values are represented as the Parquet INT64 type, which is represented as BIGINT in the Impala table. The Parquet values represent the time in milliseconds, while Impala interprets BIGINT as the time in seconds. Therefore, if you have a BIGINT column in a Parquet table that was imported this way from Sqoop, divide the values by 1000 when interpreting as the TIMESTAMP type.
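The unit mismatch above comes down to one division. The following minimal sketch shows the arithmetic in Java; the class name, method name, and sample value are hypothetical, and in practice the same division would be applied in the query that reads the BIGINT column.

```java
import java.time.Instant;

// Converts a Sqoop-written epoch value (milliseconds) to the
// seconds-based value expected when a BIGINT is read as a timestamp.
final class SqoopTimestampExample {

    static long millisToSeconds(long epochMillis) {
        return epochMillis / 1000L;
    }

    public static void main(String[] args) {
        long fromParquet = 1_420_070_400_000L;   // hypothetical stored value
        long seconds = millisToSeconds(fromParquet);

        System.out.println(seconds);                        // 1420070400
        System.out.println(Instant.ofEpochSecond(seconds)); // 2015-01-01T00:00:00Z
    }
}
```

Interpreting the undivided value as seconds would place the same timestamp tens of thousands of years in the future, which is a quick way to recognize an unconverted column.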
For complete instructions and examples, see Using the Parquet File Format with Impala Tables.
Using Parquet Tables in Hive
To create a table that uses the Parquet format, use a command like the following, substituting your own table name, column names, and data types:
hive> CREATE TABLE parquet_table_name (x INT, y STRING) STORED AS PARQUET;
If the table will be populated with data files generated outside of Impala and Hive, it is often useful to create the table as an external table pointing to the location where the files will be created:
hive> create external table parquet_table_name (x INT, y STRING)
  ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
  STORED AS
    INPUTFORMAT "parquet.hive.DeprecatedParquetInputFormat"
    OUTPUTFORMAT "parquet.hive.DeprecatedParquetOutputFormat"
  LOCATION '/test-warehouse/tinytable';
To populate the table with an INSERT statement, and to read the table with a SELECT statement, see Using the Parquet File Format with Impala Tables.
Select the compression to use when writing data with the parquet.compression property, for example:
set parquet.compression=GZIP;
INSERT OVERWRITE TABLE tinytable SELECT * FROM texttable;
The valid options for compression are:
- UNCOMPRESSED
- GZIP
- SNAPPY
Using Parquet Files in Pig
Reading Parquet Files in Pig
Assuming the external table was created and populated with Impala or Hive as described above, the Pig instruction to read the data is:
grunt> A = LOAD '/test-warehouse/tinytable' USING parquet.pig.ParquetLoader AS (x: int, y: int);
Writing Parquet Files in Pig
Create and populate a Parquet file with the ParquetStorer class:
grunt> store A into '/test-warehouse/tinytable' USING parquet.pig.ParquetStorer;
There are three compression options: uncompressed, snappy, and gzip. The default is snappy. You can specify one of them once before the first store instruction in a Pig script:
SET parquet.compression gzip;
Using Parquet Files in MapReduce
MapReduce needs Thrift in its CLASSPATH and in libjars to access Parquet files. It also needs parquet-format in libjars. Perform the following setup before running MapReduce jobs that access Parquet data files:
if [ -e /opt/cloudera/parcels/CDH ] ; then
    CDH_BASE=/opt/cloudera/parcels/CDH
else
    CDH_BASE=/usr
fi
THRIFTJAR=`ls -l $CDH_BASE/lib/hive/lib/libthrift*jar | awk '{print $9}' | head -1`
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$THRIFTJAR
export LIBJARS=`echo "$CLASSPATH" | awk 'BEGIN { RS = ":" } { print }' | grep parquet-format | tail -1`
export LIBJARS=$LIBJARS,$THRIFTJAR

hadoop jar my-parquet-mr.jar -libjars $LIBJARS
Reading Parquet Files in MapReduce
Taking advantage of the Example helper classes in the Parquet JAR files, a simple map-only MapReduce job that reads Parquet files can use the ExampleInputFormat class and the Group value class. There is nothing special about the reduce phase when using Parquet files. The following example demonstrates how to read a Parquet file in a MapReduce job; the Parquet-specific portions are the parquet.* imports, the Group value type in the mapper, and the ExampleInputFormat input format.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import parquet.Log;
import parquet.example.data.Group;
import parquet.hadoop.example.ExampleInputFormat;

public class TestReadParquet extends Configured implements Tool {
    private static final Log LOG = Log.getLog(TestReadParquet.class);

    /*
     * Read a Parquet record
     */
    public static class MyMap extends Mapper<LongWritable, Group, NullWritable, Text> {

        @Override
        public void map(LongWritable key, Group value, Context context)
                throws IOException, InterruptedException {
            NullWritable outKey = NullWritable.get();
            String outputRecord = "";
            // Get the schema and field values of the record
            String inputRecord = value.toString();
            // Process the value, create an output record
            // ...
            context.write(outKey, new Text(outputRecord));
        }
    }

    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());

        job.setJarByClass(getClass());
        job.setJobName(getClass().getName());

        // The mapper emits NullWritable keys and Text values
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(MyMap.class);
        // Map-only job: no reduce phase
        job.setNumReduceTasks(0);

        job.setInputFormatClass(ExampleInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Report job failure through the exit status
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        try {
            int res = ToolRunner.run(new Configuration(), new TestReadParquet(), args);
            System.exit(res);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(255);
        }
    }
}
Writing Parquet Files in MapReduce
When writing Parquet files you will need to provide a schema. The schema can be specified in the run method of the job before submitting it, for example:
...
import parquet.Log;
import parquet.example.data.Group;
import parquet.hadoop.example.GroupWriteSupport;
import parquet.hadoop.example.ExampleInputFormat;
import parquet.hadoop.example.ExampleOutputFormat;
import parquet.hadoop.metadata.CompressionCodecName;
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.schema.MessageType;
import parquet.schema.MessageTypeParser;
import parquet.schema.Type;
...
public int run(String[] args) throws Exception {
...
    String writeSchema = "message example {\n" +
        "required int32 x;\n" +
        "required int32 y;\n" +
        "}";
    ExampleOutputFormat.setSchema(
        job,
        MessageTypeParser.parseMessageType(writeSchema));

    job.submit();
or it can be extracted from the input file(s) if they are in Parquet format:
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
...
public int run(String[] args) throws Exception {
...
    String inputFile = args[0];
    Path parquetFilePath = null;
    // Find a file in case a directory was passed
    RemoteIterator<LocatedFileStatus> it =
        FileSystem.get(getConf()).listFiles(new Path(inputFile), true);
    while(it.hasNext()) {
        FileStatus fs = it.next();
        if(fs.isFile()) {
            parquetFilePath = fs.getPath();
            break;
        }
    }
    if(parquetFilePath == null) {
        LOG.error("No file found for " + inputFile);
        return 1;
    }
    ParquetMetadata readFooter =
        ParquetFileReader.readFooter(getConf(), parquetFilePath);
    MessageType schema =
        readFooter.getFileMetaData().getSchema();
    GroupWriteSupport.setSchema(schema, getConf());

    job.submit();
Records can then be written in the mapper by composing a Group as value using the Example classes and no key:
protected void map(LongWritable key, Text value,
        Mapper<LongWritable, Text, Void, Group>.Context context)
        throws java.io.IOException, InterruptedException {
    int x;
    int y;
    // Extract the desired output values from the input text
    //
    Group group = factory.newGroup()
        .append("x", x)
        .append("y", y);
    context.write(null, group);
}
}
Compression can be set before submitting the job with:
ExampleOutputFormat.setCompression(job, codec);
The codec should be one of the following:
- CompressionCodecName.UNCOMPRESSED
- CompressionCodecName.SNAPPY
- CompressionCodecName.GZIP
Parquet File Interoperability
Impala has included Parquet support from the beginning, using its own high-performance code written in C++ to read and write the Parquet files. The Parquet JARs for use with Hive, Pig, and MapReduce are available with CDH 4.5 and higher. Using the Java-based Parquet implementation on a CDH release prior to CDH 4.5 is not supported.
A Parquet table created by Hive can typically be accessed by Impala 1.1.1 and higher with no changes, and vice versa. Prior to Impala 1.1.1, when Hive support for Parquet was not available, Impala wrote a dummy SerDes class name into each data file. These older Impala data files require a one-time ALTER TABLE statement to update the metadata for the SerDes class name before they can be used with Hive. See Apache Impala Incompatible Changes and Limitations for details.
A Parquet file written by Hive, Impala, Pig, or MapReduce can be read by any of the others. Different defaults for file and block sizes, compression and encoding settings, and so on might cause performance differences depending on which component writes or reads the data files. For example, Impala typically sets the HDFS block size to 256 MB and divides the data files into 256 MB chunks, so that each I/O request reads an entire data file.
In CDH 5.5 and higher, non-Impala components that write Parquet files include some extra padding to ensure that the Parquet row groups are aligned with HDFS data blocks. The maximum amount of padding is controlled by the parquet.writer.max-padding setting, specified as a number of bytes. By default, up to 8 megabytes of padding might be added to the end of each row group. This alignment helps to avoid remote reads during Impala queries. The setting does not apply to Parquet files written by Impala, because Impala always writes each Parquet file as a single HDFS data block.
There may be limitations in a particular release. The following are current limitations in CDH:
- Parquet has not been tested with HCatalog. Without HCatalog, Pig cannot correctly read dynamically partitioned tables; this is true for all file formats.
- Impala supports table columns using nested data types or complex data types such as map, struct, or array only in Impala 2.3 (corresponding to CDH 5.5) and higher. Impala 2.2 (corresponding to CDH 5.4) can query only the scalar columns of Parquet files containing such types. Lower releases of Impala cannot query any columns from Parquet data files that include such types.
- Cloudera supports some but not all of the object models from the upstream Parquet-MR project. Currently, the supported object models are:
  - parquet-avro (recommended for Cloudera users)
  - parquet-thrift
  - parquet-protobuf
  - parquet-pig
  - The Impala and Hive object models that are built into those components, not available in external libraries. (CDH does not include the parquet-hive module of the parquet-mr project, because recent versions of Hive have Parquet support built in.)
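Parquet File Structure
You can examine the internal structure and data of Parquet files with the parquet-tools command, which provides the following subcommands:
- cat: Print a file's contents to standard output. Use the -j option to output JSON.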
- head: Print the first few records of a file to standard output.
- schema: Print the Parquet schema for the file.
- meta: Print the file footer metadata, including key-value properties (like Avro schema), compression ratios, encodings, compression used, and row group information.
- dump: Print all data and metadata.
$ # Be careful doing this for a big file! Use parquet-tools head to be safe.
$ parquet-tools cat sample.parq
year = 1992
month = 1
day = 2
dayofweek = 4
dep_time = 748
crs_dep_time = 750
arr_time = 851
crs_arr_time = 846
carrier = US
flight_num = 53
actual_elapsed_time = 63
crs_elapsed_time = 56
arrdelay = 5
depdelay = -2
origin = CMH
dest = IND
distance = 182
cancelled = 0
diverted = 0

year = 1992
month = 1
day = 3
...
$ parquet-tools head -n 2 sample.parq
year = 1992
month = 1
day = 2
dayofweek = 4
dep_time = 748
crs_dep_time = 750
arr_time = 851
crs_arr_time = 846
carrier = US
flight_num = 53
actual_elapsed_time = 63
crs_elapsed_time = 56
arrdelay = 5
depdelay = -2
origin = CMH
dest = IND
distance = 182
cancelled = 0
diverted = 0

year = 1992
month = 1
day = 3
...
$ parquet-tools schema sample.parq
message schema {
  optional int32 year;
  optional int32 month;
  optional int32 day;
  optional int32 dayofweek;
  optional int32 dep_time;
  optional int32 crs_dep_time;
  optional int32 arr_time;
  optional int32 crs_arr_time;
  optional binary carrier;
  optional int32 flight_num;
...
$ parquet-tools meta sample.parq
creator: impala version 2.2.0-cdh5.4.3 (build 517bb0f71cd604a00369254ac6d88394df83e0f6)

file schema: schema
-------------------------------------------------------------------
year: OPTIONAL INT32 R:0 D:1
month: OPTIONAL INT32 R:0 D:1
day: OPTIONAL INT32 R:0 D:1
dayofweek: OPTIONAL INT32 R:0 D:1
dep_time: OPTIONAL INT32 R:0 D:1
crs_dep_time: OPTIONAL INT32 R:0 D:1
arr_time: OPTIONAL INT32 R:0 D:1
crs_arr_time: OPTIONAL INT32 R:0 D:1
carrier: OPTIONAL BINARY R:0 D:1
flight_num: OPTIONAL INT32 R:0 D:1
...

row group 1: RC:20636601 TS:265103674
-------------------------------------------------------------------
year: INT32 SNAPPY DO:4 FPO:35 SZ:10103/49723/4.92 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
month: INT32 SNAPPY DO:10147 FPO:10210 SZ:11380/35732/3.14 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
day: INT32 SNAPPY DO:21572 FPO:21714 SZ:3071658/9868452/3.21 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
dayofweek: INT32 SNAPPY DO:3093276 FPO:3093319 SZ:2274375/5941876/2.61 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
dep_time: INT32 SNAPPY DO:5367705 FPO:5373967 SZ:28281281/28573175/1.01 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
crs_dep_time: INT32 SNAPPY DO:33649039 FPO:33654262 SZ:10220839/11574964/1.13 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
arr_time: INT32 SNAPPY DO:43869935 FPO:43876489 SZ:28562410/28797767/1.01 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
crs_arr_time: INT32 SNAPPY DO:72432398 FPO:72438151 SZ:10908972/12164626/1.12 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
carrier: BINARY SNAPPY DO:83341427 FPO:83341558 SZ:114916/128611/1.12 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
flight_num: INT32 SNAPPY DO:83456393 FPO:83488603 SZ:10216514/11474301/1.12 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
...
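The per-column lines in the meta output above can be post-processed to compare how well each column compresses. The sketch below is a hypothetical helper, not part of parquet-tools. It assumes the SZ: field holds compressed bytes, then uncompressed bytes, then their ratio, which is consistent with the sample values (49723 / 10103 is approximately 4.92).

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Recomputes the compression ratio from the SZ: field of one column line
// in parquet-tools meta output. Assumed layout: SZ:compressed/uncompressed/ratio.
final class MetaSizeField {

    private static final Pattern SZ = Pattern.compile("SZ:(\\d+)/(\\d+)/");

    // Returns uncompressed bytes divided by compressed bytes.
    static double compressionRatio(String metaLine) {
        Matcher m = SZ.matcher(metaLine);
        if (!m.find()) {
            throw new IllegalArgumentException("no SZ: field in: " + metaLine);
        }
        long compressed = Long.parseLong(m.group(1));
        long uncompressed = Long.parseLong(m.group(2));
        return (double) uncompressed / compressed;
    }

    public static void main(String[] args) {
        String line = "year: INT32 SNAPPY DO:4 FPO:35 SZ:10103/49723/4.92 "
                + "VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN";
        System.out.println(String.format(Locale.ROOT, "%.2f", compressionRatio(line)));
    }
}
```

In the sample, year and month compress far better than dep_time and arr_time, whose ratios stay close to 1.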
Examples of Java Programs to Read and Write Parquet Files
You can find full examples of Java code at the Cloudera Parquet examples GitHub repository.
The TestReadWriteParquet.java example demonstrates the "identity" transform. It reads any Parquet data file and writes a new file with exactly the same content.
The TestReadParquet.java example reads a Parquet data file, and produces a new text file in CSV format with the same content.