Using Text Data Files with Impala Tables

Cloudera Impala supports using text files as the storage format for input and output. Text files are a convenient format to use for interchange with other applications or scripts that produce or read delimited text files, such as CSV or TSV with commas or tabs for delimiters.

Text files are also very flexible in their column definitions. For example, a text file could have more fields than the Impala table, and those extra fields are ignored during queries; or it could have fewer fields than the Impala table, and those missing fields are treated as NULL values in queries. You could have fields that were treated as numbers or timestamps in a table, then use ALTER TABLE ... REPLACE COLUMNS to switch them to strings, or the reverse.
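
As a rough illustration of that flexibility, the following sketch switches the columns of a text table to strings and back again. The table name flexible_text and its columns are hypothetical. Only the table metadata changes; the underlying data files are left as they are.

```sql
-- Hypothetical table; the names are illustrative only.
CREATE TABLE flexible_text (id INT, event_time TIMESTAMP, amount INT)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

-- Read every field of the same data files as a string.
ALTER TABLE flexible_text REPLACE COLUMNS (id STRING, event_time STRING, amount STRING);

-- Switch back to the typed column definitions.
ALTER TABLE flexible_text REPLACE COLUMNS (id INT, event_time TIMESTAMP, amount INT);
```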

Text Format Support in Impala

  • File Type: Text

  • Format: Unstructured

  • Compression Codecs: LZO, gzip, bzip2, Snappy

  • Impala Can CREATE? Yes. For CREATE TABLE with no STORED AS clause, the default file format is uncompressed text, with values separated by ASCII 0x01 characters (typically represented as Ctrl-A).

  • Impala Can INSERT? Yes: CREATE TABLE, INSERT, LOAD DATA, and query. If LZO compression is used, you must create the table and load data in Hive. If other kinds of compression are used, you must load data through LOAD DATA, Hive, or manually in HDFS.

Query Performance for Impala Text Tables

Data stored in text format is relatively bulky, and not as efficient to query as binary formats such as Parquet. You typically use text tables with Impala if that is the format in which you receive the data and you do not have control over that process, or if you are a relatively new Hadoop user and not familiar with techniques to generate files in other formats. (Because the default format for CREATE TABLE is text, you might create your first Impala tables as text without giving performance much thought.) Either way, look for opportunities to use more efficient file formats for the tables used in your most performance-critical queries.

For frequently queried data, you might load the original text data files into one Impala table, then use an INSERT statement to transfer the data to another table that uses the Parquet file format; the data is converted automatically as it is stored in the destination table.
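
A minimal sketch of that conversion follows. The table names raw_events_text and events_parquet and the column list are assumptions made for illustration.

```sql
-- Hypothetical staging table that holds the original text data files.
CREATE TABLE raw_events_text (id INT, s STRING, t TIMESTAMP)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

-- Destination table with the same columns, stored as Parquet.
CREATE TABLE events_parquet LIKE raw_events_text STORED AS PARQUET;

-- The rows are converted to Parquet as they are written to the destination.
INSERT INTO events_parquet SELECT * FROM raw_events_text;
```

Queries that matter for performance can then run against events_parquet, while raw_events_text stays available as the landing area for new text files.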

For more compact data, consider using LZO compression for the text files. Among the compression codecs that Impala supports for text data, LZO is the only one whose data files are "splittable", which lets different nodes work on different parts of the same file in parallel. See Using LZO-Compressed Text Files for details.

In Impala 2.0 and later, you can also use text data compressed in the gzip, bzip2, or Snappy formats. Because these compressed formats are not "splittable" in the way that LZO is, there is less opportunity for Impala to parallelize queries on them. Therefore, use these types of compressed data only for convenience if that is the format in which you receive the data. Prefer to use LZO compression for text data if you have the choice, or convert the data to Parquet using an INSERT ... SELECT statement to copy the original data into a Parquet table.

Creating Text Tables

To create a table using text data files:

If the exact format of the text data files (such as the delimiter character) is not significant, use the CREATE TABLE statement with no extra clauses at the end to create a text-format table. For example:

create table my_table(id int, s string, n int, t timestamp, b boolean);

The data files created by any INSERT statements use the Ctrl-A character (hex 01) as the separator between column values.

A common use case is to import existing text files into an Impala table. The syntax is more verbose; the significant part is the FIELDS TERMINATED BY clause, which must be preceded by the ROW FORMAT DELIMITED clause. The statement can end with a STORED AS TEXTFILE clause, but that clause is optional because text format tables are the default. For example:

create table csv(id int, s string, n int, t timestamp, b boolean)
  row format delimited
  fields terminated by ',';

create table tsv(id int, s string, n int, t timestamp, b boolean)
  row format delimited
  fields terminated by '\t';

create table pipe_separated(id int, s string, n int, t timestamp, b boolean)
  row format delimited
  fields terminated by '|'
  stored as textfile;

You can create tables with specific separator characters to import text files in familiar formats such as CSV, TSV, or pipe-separated. You can also use these tables to produce output data files, by copying data into them through the INSERT ... SELECT syntax and then extracting the data files from the Impala data directory.
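
For instance, assuming the my_table and pipe_separated tables from the examples above both exist, a sketch of producing pipe-separated output files might look like this:

```sql
-- Both tables have the same column definitions, so SELECT * lines up.
INSERT INTO pipe_separated SELECT * FROM my_table;

-- The Location field in the output shows the HDFS directory
-- from which the generated data files can be extracted.
DESCRIBE FORMATTED pipe_separated;
```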

In Impala 1.3.1 and higher, you can specify a delimiter character '\0' to use the ASCII 0 (nul) character for text tables:

create table nul_separated(id int, s string, n int, t timestamp, b boolean)
  row format delimited
  fields terminated by '\0'
  stored as textfile;

Issue a DESCRIBE FORMATTED table_name statement to see the details of how each table is represented internally in Impala.

Data Files for Text Tables

When Impala queries a table with data in text format, it consults all the data files in the data directory for that table, with some exceptions:

  • Impala ignores any hidden files, that is, files whose names start with a dot or an underscore.

  • Impala queries ignore files with extensions commonly used for temporary work files by Hadoop tools. Any files with extensions .tmp or .copying are not considered part of the Impala table. The suffix matching is case-insensitive, so for example Impala ignores both .copying and .COPYING suffixes.

  • Impala uses suffixes to recognize when text data files are compressed text. For Impala to recognize the compressed text files, they must have the appropriate file extension corresponding to the compression codec, either .gz, .bz2, or .snappy. The extensions can be in uppercase or lowercase.

  • Otherwise, the file names are not significant. When you put files into an HDFS directory through ETL jobs, or point Impala to an existing HDFS directory with the CREATE EXTERNAL TABLE statement, or move data files under external control with the LOAD DATA statement, Impala preserves the original filenames.

Data files produced through Impala INSERT statements are given unique names to avoid filename conflicts.

An INSERT ... SELECT statement produces one data file from each node that processes the SELECT part of the statement. An INSERT ... VALUES statement produces a separate data file for each statement; because Impala is more efficient querying a small number of huge files than a large number of tiny files, the INSERT ... VALUES syntax is not recommended for loading a substantial volume of data. If you find yourself with a table that is inefficient due to too many small data files, reorganize the data into a few large files by doing INSERT ... SELECT to transfer the data to a new table.
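
The reorganization described above can be sketched as follows. The table names many_small_files and compacted are hypothetical, and the two single-row inserts stand in for a much longer history of small loads.

```sql
-- Each INSERT ... VALUES statement adds a separate small data file.
CREATE TABLE many_small_files (id INT, s STRING);
INSERT INTO many_small_files VALUES (1, 'a');
INSERT INTO many_small_files VALUES (2, 'b');

-- Copy the rows into a new table with the same definition.
-- INSERT ... SELECT writes one data file per node that processes the SELECT.
CREATE TABLE compacted LIKE many_small_files;
INSERT INTO compacted SELECT * FROM many_small_files;
```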

Special values within text data files:

  • Impala recognizes the literal strings inf for infinity and nan for "Not a Number", for FLOAT and DOUBLE columns.

  • Impala recognizes the literal string \N to represent NULL. When using Sqoop, specify the options --null-non-string and --null-string to ensure all NULL values are represented correctly in the Sqoop output files. By default, Sqoop writes NULL values using the string null, which causes a conversion error when such rows are evaluated by Impala. (A workaround for existing tables and data files is to change the table properties through ALTER TABLE name SET TBLPROPERTIES("serialization.null.format"="null").)
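
As a sketch of the workaround mentioned in the last item, assuming a hypothetical table sqoop_import whose data files were written by Sqoop with its default NULL representation:

```sql
-- Hypothetical table over Sqoop output that uses the string null for NULL values.
CREATE TABLE sqoop_import (id INT, name STRING)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

-- Tell Impala to treat the literal string null, rather than \N, as NULL.
ALTER TABLE sqoop_import SET TBLPROPERTIES ("serialization.null.format"="null");

-- Rows with null in the name field should now satisfy IS NULL.
SELECT COUNT(*) FROM sqoop_import WHERE name IS NULL;
```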

Loading Data into Impala Text Tables

To load an existing text file into an Impala text table, use the LOAD DATA statement and specify the path of the file in HDFS. That file is moved into the appropriate Impala data directory.

To load multiple existing text files into an Impala text table, use the LOAD DATA statement and specify the HDFS path of the directory containing the files. All non-hidden files are moved into the appropriate Impala data directory.
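
Both forms can be sketched as follows. The table name load_demo and the HDFS paths are placeholders, not paths that exist on any particular cluster.

```sql
-- Hypothetical destination table for comma-separated files.
CREATE TABLE load_demo (id INT, s STRING)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

-- Move a single file into the table's data directory.
LOAD DATA INPATH '/user/etl/staging/one_file.csv' INTO TABLE load_demo;

-- Move every non-hidden file in a directory into the table's data directory.
LOAD DATA INPATH '/user/etl/staging/batch_dir' INTO TABLE load_demo;
```

Because the files are moved rather than copied, they no longer appear under the staging path afterward.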

To convert data to text from any other file format supported by Impala, use a SQL statement such as:

-- Text table with default delimiter, the hex 01 character.
CREATE TABLE text_table AS SELECT * FROM other_file_format_table;

-- Text table with user-specified delimiter. Currently, you cannot specify
-- the delimiter as part of CREATE TABLE LIKE or CREATE TABLE AS SELECT.
-- But you can change an existing text table to have a different delimiter.
CREATE TABLE csv LIKE other_file_format_table;
ALTER TABLE csv SET SERDEPROPERTIES ('serialization.format'=',', 'field.delim'=',');
INSERT INTO csv SELECT * FROM other_file_format_table;

This can be a useful technique to see how Impala represents special values within a text-format data file. Use the DESCRIBE FORMATTED statement to see the HDFS directory where the data files are stored, then use Linux commands such as hdfs dfs -ls hdfs_directory and hdfs dfs -cat hdfs_file to display the contents of an Impala-created text file.

To create a few rows in a text table for test purposes, you can use the INSERT ... VALUES syntax:

INSERT INTO text_table VALUES ('string_literal',100,hex('hello world'));

When you create a text file for use with an Impala text table, specify \N to represent a NULL value. For the differences between NULL and empty strings, see NULL.

If a text file has fewer fields than the columns in the corresponding Impala table, the columns that have no matching field are set to NULL when the data in that file is read by an Impala query.

If a text file has more fields than the columns in the corresponding Impala table, the extra fields are ignored when the data in that file is read by an Impala query.

You can also use manual HDFS operations such as hdfs dfs -put or hdfs dfs -cp to put data files in the data directory for an Impala table. When you copy or move new data files into the HDFS directory for the Impala table, issue a REFRESH table_name statement in impala-shell before issuing the next query against that table, to make Impala recognize the newly added files.
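
A short sketch of that sequence, using a hypothetical table named manual_files; the file copy itself happens outside SQL, so it appears only as a comment:

```sql
-- Hypothetical table whose data directory receives files through manual HDFS commands.
CREATE TABLE manual_files (id INT, s STRING)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

-- (Outside impala-shell: copy new data files into the table's HDFS directory,
--  for example with hdfs dfs -put.)

-- Make Impala recognize the newly added files before the next query.
REFRESH manual_files;
SELECT COUNT(*) FROM manual_files;
```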

Using LZO-Compressed Text Files

Cloudera Impala supports using text data files that employ LZO compression. Cloudera recommends compressing text data files when practical. Impala queries are usually I/O-bound; reducing the amount of data read from disk typically speeds up a query, despite the extra CPU work to uncompress the data in memory.

LZO-compressed files are preferable to text files compressed by other codecs, because LZO-compressed files are "splittable", meaning that different portions of a file can be uncompressed and processed independently by different nodes.

Impala does not currently support writing LZO-compressed text files.

Because Impala can query LZO-compressed files but currently cannot write them, you use Hive to do the initial CREATE TABLE and load the data, then switch back to Impala to run queries. For instructions on setting up LZO compression for Hive CREATE TABLE and INSERT statements, see the LZO page on the Hive wiki. Once you have created an LZO text table, you can also manually add LZO-compressed text files to it, produced by the lzop command or a similar method.

Preparing to Use LZO-Compressed Text Files

Before using LZO-compressed tables in Impala, do the following one-time setup for each machine in the cluster. Install the necessary packages from the Cloudera public repository, from a private repository that you establish, or by using packages. You must do these steps manually, whether or not the cluster is managed by the Cloudera Manager product.

  1. Prepare your systems to work with LZO using Cloudera repositories:

    On systems managed by Cloudera Manager using parcels:

    See the setup instructions for the LZO parcel in the Cloudera Manager documentation for Cloudera Manager 5.

    On systems managed by Cloudera Manager using packages, or not managed by Cloudera Manager:

    Download and install the appropriate file to each machine on which you intend to use LZO with Impala. These files all come from the Cloudera GPL extras download site. Install the:

  2. Configure Impala to use LZO:

    Use one of the following sets of commands to refresh your package management system's repository information, install the base LZO support for Hadoop, and install the LZO support for Impala.

    For RHEL/CentOS systems:

    $ sudo yum update
    $ sudo yum install hadoop-lzo
    $ sudo yum install impala-lzo

    For SUSE systems:

    $ sudo zypper update
    $ sudo zypper install hadoop-lzo
    $ sudo zypper install impala-lzo

    For Debian/Ubuntu systems:

    $ sudo apt-get update
    $ sudo apt-get install hadoop-lzo
    $ sudo apt-get install impala-lzo
  3. For core-site.xml on the client and server (that is, in the configuration directories for both Impala and Hadoop), append com.hadoop.compression.lzo.LzopCodec to the comma-separated list of codecs. For example:
    <property>
      <name>io.compression.codecs</name>
      <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,
            org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DeflateCodec,
            org.apache.hadoop.io.compress.SnappyCodec,com.hadoop.compression.lzo.LzopCodec</value>
    </property>
  4. Restart the MapReduce and Impala services.

Creating LZO Compressed Text Tables

A table containing LZO-compressed text files must be created in Hive with the following storage clause:

STORED AS
    INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'

Also, certain Hive settings need to be in effect. For example:

hive> SET mapreduce.output.fileoutputformat.compress=true;
hive> SET hive.exec.compress.output=true;
hive> SET mapreduce.output.fileoutputformat.compress.codec=com.hadoop.compression.lzo.LzopCodec;
hive> CREATE TABLE lzo_t (s string) STORED AS
  > INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
  > OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
hive> INSERT INTO TABLE lzo_t SELECT col1 FROM uncompressed_text_table;

Once you have created LZO-compressed text tables, you can convert data stored in other tables (regardless of file format) by using the INSERT ... SELECT statement in Hive.

Files in an LZO-compressed table must use the .lzo extension. Examine the files in the HDFS data directory after doing the INSERT in Hive, to make sure the files have the right extension. If the required settings are not in place, you end up with regular uncompressed files, and Impala cannot access the table because it finds data files with the wrong (uncompressed) format.

After loading data into an LZO-compressed text table, index the files so that they can be split. You index the files by running a Java class, com.hadoop.compression.lzo.DistributedLzoIndexer, through the Linux command line. This Java class is included in the hadoop-lzo package.

Run the indexer using a command like the following:

$ hadoop jar /usr/lib/hadoop/lib/hadoop-lzo-version-gplextras.jar \
  com.hadoop.compression.lzo.DistributedLzoIndexer /hdfs_location_of_table/

Each index file has the same name as the data file it indexes, with an added .index extension. If the data files are not indexed, Impala queries still work, but the queries read the data from remote DataNodes, which is very inefficient.

Once the LZO-compressed tables are created, and data is loaded and indexed, you can query them through Impala. As always, the first time you start impala-shell after creating a table in Hive, issue an INVALIDATE METADATA statement so that Impala recognizes the new table. (In Impala 1.2 and higher, you only have to run INVALIDATE METADATA on one node, rather than on all the Impala nodes.)
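
For example, assuming the lzo_t table from the Hive session above has been loaded and indexed, the first impala-shell session afterward might begin like this:

```sql
-- Pick up tables that were created through Hive.
INVALIDATE METADATA;

-- The LZO-compressed table can now be queried like any other text table.
SELECT COUNT(*) FROM lzo_t;
```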

Using gzip, bzip2, or Snappy-Compressed Text Files

In Impala 2.0 and later, Impala supports using text data files that employ gzip, bzip2, or Snappy compression. These compression types are primarily for convenience within an existing ETL pipeline rather than maximum performance. Although it requires less I/O to read compressed text than the equivalent uncompressed text, files compressed by these codecs are not "splittable" and therefore cannot take full advantage of the Impala parallel query capability.

As each bzip2- or Snappy-compressed text file is processed, the node doing the work reads the entire file into memory and then decompresses it. Therefore, the node must have enough memory to hold both the compressed and uncompressed data from the text file. The memory required to hold the uncompressed data is difficult to estimate in advance, potentially causing problems on systems with low memory limits or with resource management enabled. In Impala 2.1 and higher, this memory overhead is reduced for gzip-compressed text files. The gzipped data is decompressed as it is read, rather than all at once.

To create a table to hold gzip, bzip2, or Snappy-compressed text, create a text table with no special compression options. Specify the delimiter and escape character if required, using the ROW FORMAT clause.

Because Impala can query compressed text files but currently cannot write them, produce the compressed text files outside Impala and use the LOAD DATA statement or manual HDFS commands to move them to the appropriate Impala data directory. (Or, you can use CREATE EXTERNAL TABLE and point the LOCATION attribute at a directory containing existing compressed text files.)
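
The external table alternative can be sketched as follows. The table name compressed_logs, its columns, and the HDFS path are assumptions; the directory is presumed to already contain comma-separated files with .gz, .bz2, or .snappy extensions.

```sql
-- Hypothetical external table over a directory of existing compressed text files.
-- No compression clause is needed; the codec is recognized from each file extension.
CREATE EXTERNAL TABLE compressed_logs (a STRING, b STRING, c STRING)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  LOCATION '/user/etl/compressed_logs';

SELECT COUNT(*) FROM compressed_logs;
```

Because the table is external, dropping it later leaves the compressed files in place.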

For Impala to recognize the compressed text files, they must have the appropriate file extension corresponding to the compression codec, either .gz, .bz2, or .snappy. The extensions can be in uppercase or lowercase.

The following example shows how you can create a regular text table and put different kinds of compressed and uncompressed files into it. Impala automatically recognizes and decompresses each file based on its file extension:

create table csv_compressed (a string, b string, c string)
  row format delimited fields terminated by ",";

insert into csv_compressed values
  ('one - uncompressed', 'two - uncompressed', 'three - uncompressed'),
  ('abc - uncompressed', 'xyz - uncompressed', '123 - uncompressed');
...make equivalent .gz, .bz2, and .snappy files and load them into the same table directory...

select * from csv_compressed;
+--------------------+--------------------+----------------------+
| a                  | b                  | c                    |
+--------------------+--------------------+----------------------+
| one - snappy       | two - snappy       | three - snappy       |
| one - uncompressed | two - uncompressed | three - uncompressed |
| abc - uncompressed | xyz - uncompressed | 123 - uncompressed   |
| one - bz2          | two - bz2          | three - bz2          |
| abc - bz2          | xyz - bz2          | 123 - bz2            |
| one - gzip         | two - gzip         | three - gzip         |
| abc - gzip         | xyz - gzip         | 123 - gzip           |
+--------------------+--------------------+----------------------+

$ hdfs dfs -ls 'hdfs://127.0.0.1:8020/user/hive/warehouse/file_formats.db/csv_compressed/';
...truncated for readability...
75 hdfs://127.0.0.1:8020/user/hive/warehouse/file_formats.db/csv_compressed/csv_compressed.snappy
79 hdfs://127.0.0.1:8020/user/hive/warehouse/file_formats.db/csv_compressed/csv_compressed_bz2.csv.bz2
80 hdfs://127.0.0.1:8020/user/hive/warehouse/file_formats.db/csv_compressed/csv_compressed_gzip.csv.gz
116 hdfs://127.0.0.1:8020/user/hive/warehouse/file_formats.db/csv_compressed/dd414df64d67d49b_data.0.