Using Impala with the Amazon S3 Filesystem

You can use Impala to query data residing on the Amazon S3 filesystem. This capability allows convenient access to a storage system that is remotely managed, accessible from anywhere, and integrated with various cloud-based services. Impala can query files in any supported file format from S3. The S3 storage location can be for an entire table, or individual partitions in a partitioned table.

The default Impala tables use data files stored on HDFS, which are ideal for bulk loads and queries using full-table scans. In contrast, queries against S3 data are less performant, making S3 suitable for holding "cold" data that is only queried occasionally, while more frequently accessed "hot" data resides in HDFS. In a partitioned table, you can set the LOCATION attribute for individual partitions to put some partitions on HDFS and others on S3, typically depending on the age of the data.

How Impala SQL Statements Work with S3

Impala SQL statements work with data on S3 as follows:

  • The CREATE TABLE or ALTER TABLE statement can specify that a table resides on the S3 filesystem by using an s3a:// prefix in the LOCATION property. ALTER TABLE can also set the LOCATION property for an individual partition, so that some data in a table resides on S3 and other data in the same table resides on HDFS.

  • Once a table or partition is designated as residing on S3, the SELECT statement transparently accesses the data files from the appropriate storage layer.

  • If the S3 table is an internal table, the DROP TABLE statement removes the corresponding data files from S3 when the table is dropped.

  • The TRUNCATE TABLE statement (CDH 5.5 or higher only) always removes the corresponding data files from S3 when the table is truncated.

  • The LOAD DATA statement can move data files residing in HDFS into an S3 table.

  • The INSERT statement, or the CREATE TABLE AS SELECT form of the CREATE TABLE statement, can copy data from an HDFS table or another S3 table into an S3 table. The S3_SKIP_INSERT_STAGING query option (CDH 5.8 or higher only) chooses whether or not to use a fast code path for these write operations to S3, with the tradeoff of potential inconsistency if a failure occurs during the statement.

For usage information about Impala SQL statements with S3 tables, see Creating Impala Databases, Tables, and Partitions for Data Stored on S3 and Using Impala DML Statements for S3 Data.

Specifying Impala Credentials to Access Data in S3

To allow Impala to access data in S3, specify values for the following configuration settings in your core-site.xml file:

<property>
  <name>fs.s3a.access.key</name>
  <value>your_access_key</value>
</property>
<property>
  <name>fs.s3a.secret.key</name>
  <value>your_secret_key</value>
</property>

As of CDH 5.8, these settings do not have corresponding controls in the Cloudera Manager user interface. Specify them in the HDFS Client Advanced Configuration Snippet (Safety Valve) for core-site.xml field. After specifying the credentials, restart both the Impala and Hive services. (Restarting Hive is required because Impala queries, CREATE TABLE statements, and so on go through the Hive metastore.)

Loading Data into S3 for Impala Queries

If your ETL pipeline involves moving data into S3 and then querying through Impala, you can either use Impala DML statements to create, move, or copy the data, or use the same data loading techniques as you would for non-Impala data.

Using Impala DML Statements for S3 Data

In CDH 5.8 / Impala 2.6 and higher, the Impala DML statements (INSERT, LOAD DATA, and CREATE TABLE AS SELECT) can write data into a table or partition that resides in the Amazon Simple Storage Service (S3). The syntax of the DML statements is the same as for any other tables, because the S3 location for tables and partitions is specified by an s3a:// prefix in the LOCATION attribute of CREATE TABLE or ALTER TABLE statements. If you bring data into S3 using the normal S3 transfer mechanisms instead of Impala DML statements, issue a REFRESH statement for the table before using Impala to query the S3 data.
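As a minimal sketch of these three DML forms, the statements below reuse the db_on_hdfs.mostly_on_hdfs and db_on_s3.partitioned_on_s3 tables from the examples later in this document. The HDFS staging path and the ctas_copy_on_s3 table name are hypothetical, chosen only for illustration.

```sql
-- INSERT ... SELECT: copy rows from an HDFS-backed partition
-- into a partition whose data files live on S3.
insert into db_on_s3.partitioned_on_s3 partition (year=2013)
  select x from db_on_hdfs.mostly_on_hdfs where year = 2013;

-- LOAD DATA: move data files that already sit in HDFS into an S3 partition.
-- The path is hypothetical; the files must already be in the table's
-- file format (text, in this example).
load data inpath '/user/impala/staging/year_2014'
  into table db_on_s3.partitioned_on_s3 partition (year=2014);

-- CREATE TABLE AS SELECT: the new table lands on S3 because the
-- db_on_s3 database was created with an s3a:// LOCATION.
create table db_on_s3.ctas_copy_on_s3
  stored as parquet
  as select x, year from db_on_hdfs.mostly_on_hdfs;
```

None of these statements mention S3 directly. The destination is determined entirely by the LOCATION attribute already recorded for the table, partition, or database.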

Because of differences between S3 and traditional filesystems, DML operations for S3 tables can take longer than for tables on HDFS. For example, both the LOAD DATA statement and the final stage of the INSERT and CREATE TABLE AS SELECT statements involve moving files from one directory to another. (In the case of INSERT and CREATE TABLE AS SELECT, the files are moved from a temporary staging directory to the final destination directory.) Because S3 does not support a "rename" operation for existing objects, in these cases Impala actually copies the data files from one location to another and then removes the original files. In CDH 5.8 / Impala 2.6, the S3_SKIP_INSERT_STAGING query option provides a way to speed up INSERT statements for S3 tables and partitions, with the tradeoff that a problem during statement execution could leave data in an inconsistent state. It does not apply to INSERT OVERWRITE or LOAD DATA statements. See S3_SKIP_INSERT_STAGING Query Option (CDH 5.8 or higher only) for details.
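The following sketch shows how the query option might be toggled within an impala-shell session. It assumes the db_on_s3.partitioned_on_s3 and db_on_hdfs.mostly_on_hdfs tables from the examples later in this document. The choice of which statement runs under which setting is illustrative only.

```sql
-- Fast path: INSERT writes files directly to the final S3 location,
-- skipping the staging directory and the copy-then-delete step.
set s3_skip_insert_staging=true;
insert into db_on_s3.partitioned_on_s3 partition (year=2015)
  values (1), (2), (3);

-- Safer path: stage the files first, then move them into place.
-- Slower on S3, but a failure partway through is less likely to
-- leave partial data in the table.
set s3_skip_insert_staging=false;
insert into db_on_s3.partitioned_on_s3 partition (year=2014)
  select x from db_on_hdfs.mostly_on_hdfs where year = 2014;
```

The option takes effect for the current session only, so a job that needs the slower, safer behavior can set it without affecting other users.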

Manually Loading Data into Impala Tables on S3

As an alternative, or on earlier Impala releases without DML support for S3, you can use the Amazon-provided methods to bring data files into S3 for querying through Impala. See the Amazon S3 web site for details.

Alternative file creation techniques (less compatible with the PURGE clause) include:

After you upload data files to a location already mapped to an Impala table or partition, or if you delete files in S3 from such a location, issue the REFRESH table_name statement to make Impala aware of the new set of data files.

Creating Impala Databases, Tables, and Partitions for Data Stored on S3

Impala reads data for a table or partition from S3 based on the LOCATION attribute for the table or partition. Specify the S3 details in the LOCATION clause of a CREATE TABLE or ALTER TABLE statement. The notation for the LOCATION clause is s3a://bucket_name/path/to/file. The filesystem prefix is always s3a:// because Impala does not support the s3:// or s3n:// prefixes.

For a partitioned table, either specify a separate LOCATION clause for each new partition, or specify a base LOCATION for the table and set up a directory structure in S3 to mirror the way Impala partitioned tables are structured in HDFS. Although, strictly speaking, S3 filenames do not have directory paths, Impala treats S3 filenames with / characters the same as HDFS pathnames that include directories.

You point a nonpartitioned table or an individual partition at S3 by specifying a single directory path in S3, which could be any arbitrary directory. Replicating the structure of an entire Impala partitioned table or database in S3 requires more care, with directories and subdirectories nested and named to match the equivalent directory tree in HDFS. If necessary, consider setting up an empty staging area in HDFS and recording the complete directory structure so that you can replicate it in S3.

For convenience when working with multiple tables with data files stored in S3, you can create a database with a LOCATION attribute pointing to an S3 path. Specify a URL of the form s3a://bucket/root/path/for/database for the LOCATION attribute of the database. Any tables created inside that database automatically create directories underneath the one specified by the database LOCATION attribute.

For example, the following session creates a partitioned table where only a single partition resides on S3. The partitions for years 2013 and 2014 are located on HDFS. The partition for year 2015 includes a LOCATION attribute with an s3a:// URL, and so refers to data residing on S3, under a specific path underneath the bucket impala-demo.

[localhost:21000] > create database db_on_hdfs;
[localhost:21000] > use db_on_hdfs;
[localhost:21000] > create table mostly_on_hdfs (x int) partitioned by (year int);
[localhost:21000] > alter table mostly_on_hdfs add partition (year=2013);
[localhost:21000] > alter table mostly_on_hdfs add partition (year=2014);
[localhost:21000] > alter table mostly_on_hdfs add partition (year=2015)
                  >   location 's3a://impala-demo/dir1/dir2/dir3/t1';

The following session creates a database and two partitioned tables residing entirely on S3, one partitioned by a single column and the other partitioned by multiple columns. Because a LOCATION attribute with an s3a:// URL is specified for the database, the tables inside that database are automatically created on S3 underneath the database directory. To see the names of the associated subdirectories, including the partition key values, we use an S3 client tool to examine how the directory structure is organized on S3. For example, Impala partition directories such as month=1 do not include leading zeroes, which sometimes appear in partition directories created through Hive.

[localhost:21000] > create database db_on_s3 location 's3a://impala-demo/dir1/dir2/dir3';
[localhost:21000] > use db_on_s3;

[localhost:21000] > create table partitioned_on_s3 (x int) partitioned by (year int);
[localhost:21000] > alter table partitioned_on_s3 add partition (year=2013);
[localhost:21000] > alter table partitioned_on_s3 add partition (year=2014);
[localhost:21000] > alter table partitioned_on_s3 add partition (year=2015);

[localhost:21000] > !aws s3 ls s3://impala-demo/dir1/dir2/dir3 --recursive;
2015-03-17 13:56:34          0 dir1/dir2/dir3/
2015-03-17 16:43:28          0 dir1/dir2/dir3/partitioned_on_s3/
2015-03-17 16:43:49          0 dir1/dir2/dir3/partitioned_on_s3/year=2013/
2015-03-17 16:43:53          0 dir1/dir2/dir3/partitioned_on_s3/year=2014/
2015-03-17 16:43:58          0 dir1/dir2/dir3/partitioned_on_s3/year=2015/

[localhost:21000] > create table partitioned_multiple_keys (x int)
                  >   partitioned by (year smallint, month tinyint, day tinyint);
[localhost:21000] > alter table partitioned_multiple_keys
                  >   add partition (year=2015,month=1,day=1);
[localhost:21000] > alter table partitioned_multiple_keys
                  >   add partition (year=2015,month=1,day=31);
[localhost:21000] > alter table partitioned_multiple_keys
                  >   add partition (year=2015,month=2,day=28);

[localhost:21000] > !aws s3 ls s3://impala-demo/dir1/dir2/dir3 --recursive;
2015-03-17 13:56:34          0 dir1/dir2/dir3/
2015-03-17 16:47:13          0 dir1/dir2/dir3/partitioned_multiple_keys/
2015-03-17 16:47:44          0 dir1/dir2/dir3/partitioned_multiple_keys/year=2015/month=1/day=1/
2015-03-17 16:47:50          0 dir1/dir2/dir3/partitioned_multiple_keys/year=2015/month=1/day=31/
2015-03-17 16:47:57          0 dir1/dir2/dir3/partitioned_multiple_keys/year=2015/month=2/day=28/
2015-03-17 16:43:28          0 dir1/dir2/dir3/partitioned_on_s3/
2015-03-17 16:43:49          0 dir1/dir2/dir3/partitioned_on_s3/year=2013/
2015-03-17 16:43:53          0 dir1/dir2/dir3/partitioned_on_s3/year=2014/
2015-03-17 16:43:58          0 dir1/dir2/dir3/partitioned_on_s3/year=2015/

The CREATE DATABASE and CREATE TABLE statements create the associated directory paths if they do not already exist. You can specify multiple levels of directories, and the CREATE statement creates all appropriate levels, similar to using mkdir -p.

Use the standard S3 file upload methods to actually put the data files into the right locations. You can also put the directory paths and data files in place before creating the associated Impala databases or tables, and Impala automatically uses the data from the appropriate location after the associated databases and tables are created.

You can switch whether an existing table or partition points to data in HDFS or S3. For example, if you have an Impala table or partition pointing to data files in HDFS or S3, and you later transfer those data files to the other filesystem, use an ALTER TABLE statement to adjust the LOCATION attribute of the corresponding table or partition to reflect that change. Because Impala does not have an ALTER DATABASE statement, this location-switching technique is not practical for entire databases that have a custom LOCATION attribute.
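A minimal sketch of this location switch, assuming the data files for the year=2013 partition of the earlier mostly_on_hdfs example have already been copied to S3 by some other means. The S3 path below is hypothetical.

```sql
-- Repoint one partition from HDFS to S3. Impala does not move any
-- files; it only records the new LOCATION for the partition.
alter table db_on_hdfs.mostly_on_hdfs partition (year=2013)
  set location 's3a://impala-demo/dir1/dir2/dir3/t1_2013';

-- Make sure Impala's cached file listing reflects what is actually
-- stored at the new location.
refresh db_on_hdfs.mostly_on_hdfs;

-- Confirm where each partition now points.
show partitions db_on_hdfs.mostly_on_hdfs;
```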

Internal and External Tables Located on S3

Just as with tables located on HDFS storage, you can designate S3-based tables as either internal (managed by Impala) or external, by using the syntax CREATE TABLE or CREATE EXTERNAL TABLE respectively. When you drop an internal table, the files associated with the table are removed, even if they are on S3 storage. When you drop an external table, the files associated with the table are left alone, and are still available for access by other tools or components. See Overview of Impala Tables for details.

If the data on S3 is intended to be long-lived and accessed by other tools in addition to Impala, create any associated S3 tables with the CREATE EXTERNAL TABLE syntax, so that the files are not deleted from S3 when the table is dropped.

If the data on S3 is only needed for querying by Impala and can be safely discarded once the Impala workflow is complete, create the associated S3 tables using the CREATE TABLE syntax, so that dropping the table also deletes the corresponding data files on S3.
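The difference between the two choices can be sketched as follows. The table names and S3 paths are hypothetical. The column layout is borrowed from the usa_cities example in this section.

```sql
-- Long-lived data shared with other tools: external table.
create external table usa_cities_shared
  (id smallint, city string, state string)
  location 's3a://impala-demo/shared/usa_cities';

-- Dropping it removes only the table definition; the S3 files remain.
drop table usa_cities_shared;

-- Disposable working data: internal (managed) table.
create table usa_cities_scratch
  (id smallint, city string, state string)
  location 's3a://impala-demo/scratch/usa_cities';

-- Dropping it also deletes the data files under that S3 path.
drop table usa_cities_scratch;
```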

For example, this session creates a table in S3 with the same column layout as a table in HDFS, then examines the S3 table and queries some data from it. The table in S3 works the same as a table in HDFS as far as the expected file format of the data, table and column statistics, and other table properties. The only indication that it is not an HDFS table is the s3a:// URL in the LOCATION property. Many data files can reside in the S3 directory, and their combined contents form the table data. Because the data in this example is uploaded after the table is created, a REFRESH statement prompts Impala to update its cached information about the data files.

[localhost:21000] > create table usa_cities_s3 like usa_cities location 's3a://impala-demo/usa_cities';
[localhost:21000] > desc usa_cities_s3;
+-------+----------+---------+
| name  | type     | comment |
+-------+----------+---------+
| id    | smallint |         |
| city  | string   |         |
| state | string   |         |
+-------+----------+---------+

-- Now from a web browser, upload the same data file(s) to S3 as in the HDFS table,
-- under the relevant bucket and path. If you already have the data in S3, you would
-- point the table LOCATION at an existing path.

[localhost:21000] > refresh usa_cities_s3;
[localhost:21000] > select count(*) from usa_cities_s3;
+----------+
| count(*) |
+----------+
| 289      |
+----------+
[localhost:21000] > select distinct state from usa_cities_s3 limit 5;
+----------------------+
| state                |
+----------------------+
| Louisiana            |
| Minnesota            |
| Georgia              |
| Alaska               |
| Ohio                 |
+----------------------+
[localhost:21000] > desc formatted usa_cities_s3;
+------------------------------+------------------------------+---------+
| name                         | type                         | comment |
+------------------------------+------------------------------+---------+
| # col_name                   | data_type                    | comment |
|                              | NULL                         | NULL    |
| id                           | smallint                     | NULL    |
| city                         | string                       | NULL    |
| state                        | string                       | NULL    |
|                              | NULL                         | NULL    |
| # Detailed Table Information | NULL                         | NULL    |
| Database:                    | s3_testing                   | NULL    |
| Owner:                       | jrussell                     | NULL    |
| CreateTime:                  | Mon Mar 16 11:36:25 PDT 2015 | NULL    |
| LastAccessTime:              | UNKNOWN                      | NULL    |
| Protect Mode:                | None                         | NULL    |
| Retention:                   | 0                            | NULL    |
| Location:                    | s3a://impala-demo/usa_cities | NULL    |
| Table Type:                  | MANAGED_TABLE                | NULL    |
...
+------------------------------+------------------------------+---------+

In this case, we have already uploaded a Parquet file with a million rows of data to the sample_data directory underneath the impala-demo bucket on S3. This session creates a table with matching column settings pointing to the corresponding location in S3, then queries the table. Because the data is already in place on S3 when the table is created, no REFRESH statement is required.

[localhost:21000] > create table sample_data_s3
                  > (id bigint, val int, zerofill string,
                  > name string, assertion boolean, city string, state string)
                  > stored as parquet location 's3a://impala-demo/sample_data';
[localhost:21000] > select count(*) from sample_data_s3;
+----------+
| count(*) |
+----------+
| 1000000  |
+----------+
[localhost:21000] > select count(*) howmany, assertion from sample_data_s3 group by assertion;
+---------+-----------+
| howmany | assertion |
+---------+-----------+
| 667149  | true      |
| 332851  | false     |
+---------+-----------+

Running and Tuning Impala Queries for Data Stored on S3

Once the appropriate LOCATION attributes are set up at the table or partition level, you query data stored in S3 exactly the same as data stored on HDFS or in HBase:

  • Queries against S3 data support all the same file formats as for HDFS data.
  • Tables can be unpartitioned or partitioned. For partitioned tables, either manually construct paths in S3 corresponding to the HDFS directories representing partition key values, or use ALTER TABLE ... ADD PARTITION to set up the appropriate paths in S3.
  • HDFS and HBase tables can be joined to S3 tables, or S3 tables can be joined with each other.
  • Authorization using the Sentry framework to control access to databases, tables, or columns works the same whether the data is in HDFS or in S3.
  • The catalogd daemon caches metadata for both HDFS and S3 tables. Use REFRESH and INVALIDATE METADATA for S3 tables in the same situations where you would issue those statements for HDFS tables.
  • Queries against S3 tables are subject to the same kinds of admission control and resource management as HDFS tables.
  • Metadata about S3 tables is stored in the same metastore database as for HDFS tables.
  • You can set up views referring to S3 tables, the same as for HDFS tables.
  • The COMPUTE STATS, SHOW TABLE STATS, and SHOW COLUMN STATS statements work for S3 tables also.
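Several of the points above can be sketched in one short session. It assumes that the usa_cities (HDFS), usa_cities_s3, and sample_data_s3 tables from the earlier examples are all in the current database. The view name s3_states is hypothetical.

```sql
-- Statistics statements work the same way for S3 tables.
compute stats usa_cities_s3;
show table stats usa_cities_s3;
show column stats usa_cities_s3;

-- Join an S3 table to an HDFS table.
select s.state, count(*) as howmany
  from sample_data_s3 s
  join usa_cities c on s.city = c.city and s.state = c.state
  group by s.state
  order by howmany desc
  limit 5;

-- Define a view on top of an S3 table.
create view s3_states as
  select distinct state from usa_cities_s3;

-- After files are added or removed outside Impala, refresh as usual.
refresh usa_cities_s3;
```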

Understanding and Tuning Impala Query Performance for S3 Data

Although Impala queries for data stored in S3 might be less performant than queries against the equivalent data stored in HDFS, you can still do some tuning. Here are techniques you can use to interpret explain plans and profiles for queries against S3 data, and tips to achieve the best performance possible for such queries.

All else being equal, performance is expected to be lower for queries running against data on S3 rather than HDFS. The actual mechanics of the SELECT statement are somewhat different when the data is in S3. Although the work is still distributed across the datanodes of the cluster, Impala might parallelize the work for a distributed query differently for data on HDFS and S3. S3 does not have the same block notion as HDFS, so Impala uses heuristics to determine how to split up large S3 files for processing in parallel. Because all hosts can access any S3 data file with equal efficiency, the distribution of work might be different than for HDFS data, where the data blocks are physically read using short-circuit local reads by hosts that contain the appropriate block replicas. Although the I/O to read the S3 data might be spread evenly across the hosts of the cluster, the fact that all data is initially retrieved across the network means that the overall query performance is likely to be lower for S3 data than for HDFS data.

In CDH 5.8 / Impala 2.6 and higher, Impala queries are optimized for files stored in Amazon S3. For Impala tables that use the file formats Parquet, RCFile, SequenceFile, Avro, and uncompressed text, the setting fs.s3a.block.size in the core-site.xml configuration file determines how Impala divides the I/O work of reading the data files. This configuration setting is specified in bytes. By default, this value is 33554432 (32 MB), meaning that Impala parallelizes S3 read operations on the files as if they were made up of 32 MB blocks. For example, if your S3 queries primarily access Parquet files written by MapReduce or Hive, increase fs.s3a.block.size to 134217728 (128 MB) to match the row group size of those files. If most S3 queries involve Parquet files written by Impala, increase fs.s3a.block.size to 268435456 (256 MB) to match the row group size produced by Impala.
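The byte values quoted above are simply megabytes multiplied by 1024 twice. As a convenience, they can be checked from impala-shell. The column aliases here are illustrative only.

```sql
-- Convert block sizes in MB to the byte values expected
-- by fs.s3a.block.size.
select 32  * 1024 * 1024 as default_block_bytes,   -- 33554432
       128 * 1024 * 1024 as hive_parquet_bytes,    -- 134217728
       256 * 1024 * 1024 as impala_parquet_bytes;  -- 268435456
```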

Because of differences between S3 and traditional filesystems, DML operations for S3 tables can take longer than for tables on HDFS. For example, both the LOAD DATA statement and the final stage of the INSERT and CREATE TABLE AS SELECT statements involve moving files from one directory to another. (In the case of INSERT and CREATE TABLE AS SELECT, the files are moved from a temporary staging directory to the final destination directory.) Because S3 does not support a "rename" operation for existing objects, in these cases Impala actually copies the data files from one location to another and then removes the original files. In CDH 5.8 / Impala 2.6, the S3_SKIP_INSERT_STAGING query option provides a way to speed up INSERT statements for S3 tables and partitions, with the tradeoff that a problem during statement execution could leave data in an inconsistent state. It does not apply to INSERT OVERWRITE or LOAD DATA statements. See S3_SKIP_INSERT_STAGING Query Option (CDH 5.8 or higher only) for details.

When optimizing aspects of complex queries, such as the join order, Impala treats tables on HDFS and S3 the same way. Therefore, follow all the same tuning recommendations for S3 tables as for HDFS ones, such as using the COMPUTE STATS statement to help Impala construct accurate estimates of row counts and cardinality. See Tuning Impala for Performance for details.

In query profile reports, the numbers for BytesReadLocal, BytesReadShortCircuit, BytesReadDataNodeCached, and BytesReadRemoteUnexpected are blank because those metrics come from HDFS. If you do see any indications that a query against an S3 table performed "remote read" operations, do not be alarmed. That is expected because, by definition, all the I/O for S3 tables involves remote reads.

Restrictions on Impala Support for S3

Impala requires that the default filesystem for the cluster be HDFS. You cannot use S3 as the only filesystem in the cluster.

Prior to CDH 5.8 / Impala 2.6, Impala could not perform DML operations (INSERT, LOAD DATA, or CREATE TABLE AS SELECT) where the destination is a table or partition located on an S3 filesystem. This restriction is lifted in CDH 5.8 / Impala 2.6 and higher.

Impala does not support the old s3:// block-based and s3n:// filesystem schemes, only s3a://.

Although S3 is often used to store JSON-formatted data, the current Impala support for S3 does not include directly querying JSON data. For Impala queries, use data files in one of the file formats listed in How Impala Works with Hadoop File Formats. If you have data in JSON format, you can prepare a flattened version of that data for querying by Impala as part of your ETL cycle.

You cannot use the ALTER TABLE ... SET CACHED statement for tables or partitions that are located in S3.

Best Practices for Using Impala with S3

The following guidelines represent best practices derived from testing and field experience with Impala on S3:

  • Any reference to an S3 location must be fully qualified. (This rule applies when S3 is not designated as the default filesystem.)

  • Set the safety valve fs.s3a.connection.maximum to 1500 for impalad.

  • Set the safety valve fs.s3a.block.size to 134217728 (128 MB in bytes) if most Parquet files queried by Impala were written by Hive or ParquetMR jobs. Set the block size to 268435456 (256 MB in bytes) if most Parquet files queried by Impala were written by Impala.

  • DROP TABLE ... PURGE is much faster than the default DROP TABLE. The same applies to ALTER TABLE ... DROP PARTITION PURGE versus the default DROP PARTITION operation. However, due to the eventually consistent nature of S3, the files for that table or partition could remain for some unbounded time when using PURGE. The default DROP TABLE/PARTITION is slow because Impala copies the files to the HDFS trash folder and waits until all the data is moved. DROP TABLE/PARTITION ... PURGE is a fast delete operation, and the Impala statement finishes quickly even though the change might not have propagated fully throughout S3.

  • INSERT statements are faster than INSERT OVERWRITE for S3. The query option S3_SKIP_INSERT_STAGING, which is set to true by default, skips the staging step for regular INSERT (but not INSERT OVERWRITE). This makes the operation much faster, but consistency is not guaranteed: if a node fails during execution, the table could end up with inconsistent data. Set this option to false if stronger consistency is required; doing so makes INSERT operations slower.

  • Too many files in a table can make metadata loading and updating slow on S3. If too many requests are made to S3, S3 has a back-off mechanism and responds more slowly than usual. You might have many small files because of:

    • Too many partitions due to over-granular partitioning. Prefer partitions with many megabytes of data, so that even a query against a single partition can be parallelized effectively.

    • Many small INSERT queries. Prefer bulk INSERTs so that more data is written to fewer files.
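Two of the practices above, sketched as statements. The example assumes the db_on_s3.partitioned_on_s3 and db_on_hdfs.mostly_on_hdfs tables from earlier in this document. The scratch_on_s3 table name is hypothetical.

```sql
-- Fast deletes: PURGE skips the copy to the HDFS trash folder.
drop table if exists db_on_s3.scratch_on_s3 purge;

alter table db_on_s3.partitioned_on_s3
  drop if exists partition (year=2013) purge;

-- Bulk load: one INSERT ... SELECT writes a small number of larger
-- files, rather than one tiny file per single-row INSERT.
insert into db_on_s3.partitioned_on_s3 partition (year=2014)
  select x from db_on_hdfs.mostly_on_hdfs where year = 2014;
```

Because PURGE bypasses the trash, the deleted files cannot be recovered from it afterward, so use it only for data that is genuinely disposable.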