Configuring Kafka tables

A user-defined Kafka table can be configured through its schema, event time, input transformations, and other Kafka-specific properties.

Schema tab

The schema is defined for a given Kafka source when the source is created. The data in the Kafka topic can be in either JSON or Avro format.

When specifying a schema, you can either paste it into the Schema Definition field or click the Detect Schema button to identify the schema used by the data. The Detect Schema functionality is only available for JSON data.

If the schema of the Kafka table where the output data is queried is not known at the time of adding the table, you can select the Dynamic Schema option. This is useful when you want to insert data into the table, and there is no guarantee that the input schema matches the output schema. If you select the Dynamic Schema option when adding a table, you can only use that table as a sink.

Event Time tab

You can specify Watermark Definitions when adding a Kafka table. Watermarks use an event time attribute and have a watermark strategy, and can be used for various time-based operations. The Event Time tab provides the following properties to configure the event time field and watermark for the Kafka stream:
  • Input Timestamp Column: name of the timestamp column in the Kafka topic from where the watermarks are mapped to the Event Time Column of the Kafka table
  • Event Time Column: default or custom name of the resulting timestamp column where the watermarks are going to be mapped in the created Kafka table
  • Watermark Seconds: number of seconds used in the watermark strategy. The watermark is defined as the current event timestamp minus this value.
You have the following options to configure the watermark strategy for the Kafka tables:
  • Using the default Kafka Timestamps setting
  • Using the default Kafka Timestamps setting, but providing a custom name for the Event Time Column
  • Not using the default Kafka Timestamps setting, and providing all of the Kafka timestamp information manually
  • Not using a watermark strategy for the Kafka table

Using the default Kafka Timestamp setting

By default, the Use Kafka Timestamps checkbox is selected when you create the Kafka table. In the Event Time Column, the new event time field is extracted from the Kafka message header with the 'eventTimestamp' predefined column name.

After saving your changes, you can view the generated DDL syntax for the table on the right side under the DDL tab. There you can review the watermark strategy that was generated for your table from the settings on the Watermark Definition tab.

The following DDL example shows the default setting of the Event Time Column and Watermark Seconds where the corresponding fields were not modified.
`eventTimestamp` TIMESTAMP(3) METADATA FROM 'timestamp',
WATERMARK FOR `eventTimestamp` AS `eventTimestamp` - INTERVAL '3' SECOND
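
As a rough illustration of the time-based operations that a watermark enables, the following sketch counts events per sensor in one-minute tumbling windows on the default event time column. The 'sensors' table and the 'sensor_name' column are borrowed from the key assignment example later in this document; the one-minute window size and the output column names are assumptions made only for this example.

```sql
-- Sketch only: assumes a Kafka table named sensors that has the default
-- eventTimestamp column with a watermark defined on it.
SELECT
  sensor_name,
  TUMBLE_END(eventTimestamp, INTERVAL '1' MINUTE) AS window_end, -- hypothetical alias
  COUNT(*) AS event_count                                        -- hypothetical alias
FROM sensors
GROUP BY
  sensor_name,
  TUMBLE(eventTimestamp, INTERVAL '1' MINUTE);
```

A window result is emitted only after the watermark passes the end of that window, so a larger Watermark Seconds value tolerates more out-of-order events at the cost of later results.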

Using the default Kafka Timestamp setting with custom Event Time Column name

To change the name of the timestamp field in the DDL while still taking the timestamp from the stream itself, provide a custom name for the Event Time Column. You can also set a custom value for the Watermark Seconds. In the example at the end of this section, 'ets' is the custom name for the Event Time Column, and '4' is the custom value for the Watermark Seconds.
The Event Time Column can only be modified if the following requirements are met for the timestamp field of the Input Timestamp Column:
  • The column type must be "long".
  • The format must be in epoch (in milliseconds since January 1, 1970).
The DDL syntax should reflect the changes made for the watermark strategy as shown in the following example:
`ets` TIMESTAMP(3) METADATA FROM 'timestamp',
WATERMARK FOR `ets` AS `ets` - INTERVAL '4' SECOND

Manually providing the Kafka timestamp information

When you want to manually configure the watermark strategy of the Kafka table, you can provide the timestamp column name from the Kafka source, and add a custom column name for the resulting Kafka table. Make sure that you provide the correct column name for the Input Timestamp Column that exactly matches the column name in the Kafka source data.

To manually provide information for the watermark strategy, unselect the Use Kafka Timestamps checkbox, and provide the following information in the column name fields:
  • Input Timestamp Column: name of the timestamp field in the Kafka source
  • Event Time Column: predefined 'eventTimestamp' name or custom column name of the timestamp field in the created Kafka table
As an example, suppose that the source Kafka topic has a timestamp column named 'ts', and you want to add a new timestamp column named 'event_time' to your Kafka table. You provide the original timestamp column name 'ts' in the Input Timestamp Column, and add the custom name 'event_time' to the Event Time Column.
As a result, the watermarks from the 'ts' column are mapped to the 'event_time' column of the created Kafka table. Because 'event_time' becomes the timestamp column name in the Kafka table, you must use the custom name ('event_time' in this example) when querying the Kafka stream. Configuring the timestamp columns is optional.
The Event Time Column can only be modified if the following requirements are met for the timestamp field of the Input Timestamp Column:
  • The column type must be "long".
  • The format must be in epoch (in milliseconds since January 1, 1970).
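
The 'ts' to 'event_time' mapping described above can be sketched in plain Flink SQL as follows. This is not the DDL that SSB generates, which may look different; it only illustrates the idea of deriving an event time column from an epoch-millisecond field and defining a watermark on it. The table name, topic name, broker address, and the 3-second delay are hypothetical, and the TO_TIMESTAMP_LTZ function assumes a Flink version that provides it.

```sql
-- Sketch only: every name and connection value below is a placeholder.
CREATE TABLE sensor_events (
  sensor_name STRING,
  ts BIGINT,                                  -- epoch milliseconds, "long" in the source data
  event_time AS TO_TIMESTAMP_LTZ(ts, 3),      -- event time column derived from ts
  WATERMARK FOR event_time AS event_time - INTERVAL '3' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'sensor_events',                              -- hypothetical topic
  'properties.bootstrap.servers' = 'localhost:9092',      -- hypothetical broker
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

Queries against such a table then use 'event_time', not 'ts', wherever an event time attribute is required.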

Not using watermark strategy for Kafka table

If you do not need a watermark strategy, unselect the Use Kafka Timestamps checkbox, and leave the column and seconds fields empty.

Input Transform tab

You can apply input transformations to your data when adding a Kafka table as a source to your queries. Input transformations can be used to clean or arrange the incoming data from the source using JavaScript functions.

For more information about Input Transform, see the Creating input transformations document.

Properties tab

You can specify certain properties to define your Kafka source in detail. You can also add custom properties in addition to the default ones. To create a property, give it a name and a value, then click Actions.

Assigning Kafka keys in streaming queries

Under the Sticky Partitioning strategy of Kafka, null-keyed events that are sent to a topic are randomly distributed in smaller batches within the partitions. Because the results of SQL Stream queries do not include a key by default, the Sticky Partitioning strategy is used when they are written to a Kafka table. In many cases, it is useful to have more fine-grained control over how events are distributed within the partitions. You can achieve this in SSB by configuring a custom key based on your specific workload.

If you use the Dynamic Kafka sink (created by selecting “Apache Kafka” in the “Add table” drop-down), you can specify keys in the output of the query by including the special “_eventKey” column name or alias.

For example:
SELECT sensor_name AS _eventKey -- sensor_name becomes the key in the output Kafka topic
FROM sensors
WHERE eventTimestamp > current_timestamp;

To configure keys in DDL-defined tables (those that are configured through “Flink DDL” in the “Add table” drop-down), see the official Flink Kafka SQL Connector documentation, specifically the key.format and key.fields options.
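
As a minimal sketch of how those two options fit together, the following DDL writes 'sensor_name' as the Kafka message key. The table name, topic name, broker address, and the 'reading' column are hypothetical; only the option names come from the Flink Kafka SQL Connector, and the connector documentation remains the authoritative reference for the values supported by your Flink version.

```sql
-- Sketch only: names and connection values are placeholders.
CREATE TABLE sensors_keyed (
  sensor_name STRING,
  reading DOUBLE                              -- hypothetical value column
) WITH (
  'connector' = 'kafka',
  'topic' = 'sensors_keyed',                              -- hypothetical topic
  'properties.bootstrap.servers' = 'localhost:9092',      -- hypothetical broker
  'key.format' = 'json',                      -- serialization format of the message key
  'key.fields' = 'sensor_name',               -- table columns that make up the key
  'value.format' = 'json',                    -- serialization format of the message value
  'value.fields-include' = 'ALL'              -- key columns are also kept in the value
);
```

With this definition, rows that share the same 'sensor_name' are written with the same key, so Kafka routes them to the same partition.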