Configuring Kafka tables
A user-defined Kafka table can be configured with a schema, event time settings, input transformations, and other Kafka-specific properties.
The schema is defined for a given Kafka source when the source is created. The data contained in the Kafka topic can be in either JSON or Avro format.
When specifying a schema, you can either paste it into the Schema Definition field or click the Detect schema button to infer the schema from the data in the topic. The Detect Schema functionality is only available for JSON data.
Event Time tab
- Input Timestamp Column: name of the timestamp column in the Kafka table from which the event time column is mapped
- Event Time Column: name of the new timestamp column to which the watermarks are mapped
- Watermark Seconds: number of seconds used in the watermark strategy. The watermark is defined as the current event timestamp minus this value.
If you select the Use Kafka Timestamps checkbox, the new event time field is extracted from the Kafka message header.
After saving your changes, you can view the generated DDL syntax for the table on the right side under the DDL tab, and review the watermark strategy that was set on the Watermark Definition tab.
The following DDL example shows the default setting of the Event Time Column and Watermark Seconds where the corresponding fields were not modified.
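A minimal sketch of such a generated statement is shown below. The table name, columns, and broker address are hypothetical, and the exact DDL produced by SSB may differ; the point is that the event time column is taken from the Kafka message timestamp and the watermark is expressed as an interval subtracted from it:

```sql
CREATE TABLE `sensors` (
  `sensor_id` INT,
  `sensor_name` STRING,
  -- event time mapped from the Kafka message timestamp metadata
  `eventTimestamp` TIMESTAMP(3) METADATA FROM 'timestamp',
  -- watermark = event timestamp minus the configured Watermark Seconds
  WATERMARK FOR `eventTimestamp` AS `eventTimestamp` - INTERVAL '3' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'sensors',
  'properties.bootstrap.servers' = '<broker>:9092',
  'format' = 'json'
);
```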
You can also map the event time from a timestamp field of the stream itself. In this case, you must provide a custom name for the Event Time Column and a value for Watermark Seconds, and the following requirements apply:
- The timestamp column type must be "long".
- The timestamp format must be in epoch (in milliseconds since January 1, 1970).
The DDL syntax should reflect the changes made for the watermark strategy as shown in the following example:
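As a hedged sketch of what such a statement might look like (column and table names are made up, and the generated output may differ), the epoch value is converted to a timestamp in a computed column, which then carries the watermark:

```sql
CREATE TABLE `sensors` (
  `sensor_id` INT,
  -- input timestamp column: epoch milliseconds, type long (BIGINT)
  `ts` BIGINT,
  -- custom Event Time Column derived from the epoch value
  `event_time` AS TO_TIMESTAMP_LTZ(`ts`, 3),
  -- Watermark Seconds set to 5
  WATERMARK FOR `event_time` AS `event_time` - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'sensors',
  'properties.bootstrap.servers' = '<broker>:9092',
  'format' = 'json'
);
```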
If you do not need any watermark strategy, clear the Use Kafka Timestamps checkbox and leave the fields empty.
Input Transform tab
For more information about Input Transform, see the Creating input transformations document.
You can specify certain properties to define your Kafka source in detail, and you can add custom properties in addition to the default ones. To create a property, enter a name and a value for it, then click Actions.
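Custom properties of this kind typically surface as connector options in the generated DDL. A sketch, assuming the standard Flink Kafka connector option names (the table, topic, and group id values are illustrative):

```sql
CREATE TABLE `sensors` (...) WITH (
  'connector' = 'kafka',
  'topic' = 'sensors',
  'properties.bootstrap.servers' = '<broker>:9092',
  'format' = 'json',
  -- custom properties added alongside the defaults
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'ssb-sensors'
);
```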
Assigning Kafka keys in streaming queries
Based on the Sticky Partitioning strategy of Kafka, when events with null keys are sent to a topic, they are randomly distributed in smaller batches within the partitions. Because the results of SQL Stream Builder queries do not include a key by default, the Sticky Partitioning strategy is used when they are written to a Kafka table. In many cases, it is useful to have more fine-grained control over how events are distributed within the partitions. You can achieve this in SSB by configuring a custom key based on your specific workload.
If you use the Dynamic Kafka sink (created by selecting "Apache Kafka" in the "Add table" drop-down), you can specify keys in the output of the query by including the special "_eventKey" column name or alias.
SELECT sensor_name AS _eventKey -- sensor_name becomes the key in the output Kafka topic
FROM sensors
WHERE eventTimestamp > current_timestamp;
To configure keys in DDL-defined tables (those that are configured via “Flink DDL” in the “Add table” drop-down), refer to the official Flink Kafka SQL Connector documentation for more information (specifically the key.format and key.fields options).
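For orientation, a minimal sketch of a DDL-defined Kafka sink that writes a column as the message key follows. The table, topic, and broker address are hypothetical; `key.format`, `key.fields`, and `value.format` are the Flink Kafka SQL connector options referenced above:

```sql
CREATE TABLE `keyed_output` (
  `sensor_name` STRING,
  `reading` DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'keyed-output',
  'properties.bootstrap.servers' = '<broker>:9092',
  -- serialize sensor_name as the Kafka message key
  'key.format' = 'raw',
  'key.fields' = 'sensor_name',
  -- serialize the remaining columns as the message value
  'value.format' = 'json'
);
```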