Creating Iceberg tables
Apache Iceberg is an open, high-performance table format for organizing datasets that can contain petabytes of data. Iceberg can be used to add tables to computing engines, such as Apache Hive and Apache Flink, from which the data can be queried using SQL.
As Iceberg is integrated as a connector to Flink, you can use the table format the same way for Cloudera SQL Stream Builder. Both the V1 and V2 version specifics are supported by the Flink connector.For more information, about the version changes, see the Apache Iceberg documentation.
| Feature | SSB |
|---|---|
| Create catalog | Supported |
| Create database | Supported |
| Create table | Supported |
| Alter table | Supported |
| Drop table | Supported |
| Select | Supported |
| Insert into | Supported |
| Insert overwrite | Supported |
| Metadata tables | Supported |
| Rewrite files action | Supported |
| Upsert | Technical preview1 |
| Equality delete | Technical preview2 |
Enabling checkpointing for streaming writes
When writing data to an Iceberg table in a streaming fashion using INSERT INTO or INSERT OVERWRITE, you must enable checkpointing in your Flink job. The Flink Iceberg sink commits the data only when checkpoints are completed. Without checkpointing, your data will not be written to the Iceberg table.
You can enable checkpointing in the Streaming SQL Console by setting the execution.checkpointing.interval property at the beginning of your script:
-- Set the checkpointing interval to 10 seconds
SET 'execution.checkpointing.interval' = '10s';
-- Insert data from a source table into your Iceberg table
INSERT INTO your_iceberg_table
SELECT * FROM your_source_table;
If you are using the programmatic Table API with Python, you can set the property on the execution environment:
env.get_config().set("execution.checkpointing.interval", "10 s")
Using Hive for Iceberg integration
When using the Hive service located on your cluster, you can add it as a catalog on Streaming SQL Console. Before creating the Iceberg table, ensure that you have added Hive as a catalog using the steps described in documentation.
CREATE
TABLE statement as the example shows
below:CREATE TABLE ‘ssb’.’ssb_default’.’iceberg_hive’ (
‘column_int’ INT,
‘column_str’ STRING,
) WITH (
‘connector’ = ‘iceberg’,
‘catalog-database’ = ‘test_db’,
‘catalog-type’ = ‘hive’,
‘catalog-name’ = ‘iceberg_hive_catalog’,
‘catalog-table’ = ‘iceberg_hive_table’,
‘ssb-hive-catalog’ = ‘ssb_hive_catalog’
‘engine.hive.enabled’ = ‘true’
)
| Property | Example | Description |
|---|---|---|
catalog-database |
test_db |
The Iceberg database name in the backend catalog, uses the current Flink database name by default. It will be created automatically if it does not exist when writing records into the Flink table |
catalog-type |
hive |
Type of the catalog |
catalog-name |
iceberg_hive_catalog |
User-specified catalog name. It is required as the connector does not have any default value. |
catalog-table |
iceberg_hive_table |
Name of the Iceberg table in the backend catalog. |
ssb-hive-catalog |
ssb_hive_catalog |
The name of the Hive catalog that you have registered in SSB. The configuration can be used when creating Iceberg catalogs as well. |
