Kafka connectors

When using the Kafka connector, you can choose between an internal and an external Kafka service. Each connector type has mandatory fields that you must fill in before you can use the connector.

You can choose from the following Kafka connectors when creating a table in Streaming SQL Console:

Template: local-kafka
Automatically uses the Kafka service that is registered in Data Sources and runs on the same cluster as the SQL Stream Builder service. You can choose between the JSON, Avro, and CSV data formats.
Type: source/sink
The following fields are mandatory to use the connector:
  • scan.startup.mode: Startup mode for the Kafka consumer. The default value is group-offsets. The other valid values are earliest-offset, latest-offset, timestamp, and specific-offsets.
  • topic: The topic from which data is read as a source, or the topic to which data is written as a sink. No default value is specified. For sources, you can also provide a list of topics separated by semicolons. For sources, you can specify either topic or topic-pattern, but not both.
  • format: The format used to deserialize and serialize the value part of Kafka messages. No default value is specified. You can use either the format or the value.format option.
Template: kafka
Uses an external Kafka service as a connector. To connect to the external Kafka service, you need to specify the Kafka brokers that are used in your deployment.
Type: source/sink
The following fields are mandatory to use the connector:
  • properties.bootstrap.servers: A comma-separated list of Kafka brokers. No default value is specified.
  • topic: The topic from which data is read as a source, or the topic to which data is written as a sink. No default value is specified. For sources, you can also provide a list of topics separated by semicolons. For sources, you can specify either topic or topic-pattern, but not both.
  • format: The format used to deserialize and serialize the value part of Kafka messages. No default value is specified. You can use either the format or the value.format option.
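
As an illustration of how the mandatory fields fit together, the following is a minimal sketch of a CREATE TABLE statement for an external Kafka service. It assumes the template maps to the Apache Flink kafka connector identifier. The table name, columns, broker addresses, and topic name are hypothetical placeholders, and the statement that the template imports into the SQL Editor may contain additional properties.

```sql
-- Sketch only: table name, columns, brokers, and topic are hypothetical.
CREATE TABLE orders_source (
  order_id STRING,
  amount   DOUBLE,
  order_ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  -- mandatory: comma-separated list of Kafka brokers
  'properties.bootstrap.servers' = 'broker-1.example.com:9092,broker-2.example.com:9092',
  -- mandatory: for sources, several topics can be listed as 'orders;returns'
  'topic' = 'orders',
  -- mandatory: use either 'format' or 'value.format'
  'format' = 'json',
  -- optional here: read the topic from the beginning instead of group-offsets
  'scan.startup.mode' = 'earliest-offset'
);
```
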
Template: upsert-kafka
Connects to a Kafka service in upsert mode. When the table is used as a source, the connector produces a changelog stream, where each data record represents an update or delete event. The value in a data record is interpreted as an update of the last value for the same key, and a record with a null value is interpreted as a delete for that key. When the table is used as a sink, the connector can consume a changelog stream and write insert/update_after data as normal Kafka message values. Delete events are written as Kafka messages with null values. For more information about the upsert Kafka connector, see the Apache Flink documentation.
Type: source/sink
The following fields are mandatory to use the connector:
  • properties.bootstrap.servers: A comma-separated list of Kafka brokers. No default value is specified.
  • topic: The topic from which data is read as a source, or the topic to which data is written as a sink. No default value is specified. You can also add a topic list in case of sources. In this case, you need to separate the topics by semicolons. For sources, you can specify either topic or topic-pattern, but not both.
  • key.format: The format used to deserialize and serialize the key part of Kafka messages. No default value is specified. Compared to the regular Kafka connector, the key fields are specified by the PRIMARY KEY syntax.
  • value.format: The format used to deserialize and serialize the value part of Kafka messages. No default value is specified. You can use either the format or the value.format option.
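
The upsert mode described above can be sketched with the following CREATE TABLE statement. It uses the upsert-kafka connector identifier and the PRIMARY KEY ... NOT ENFORCED syntax from Apache Flink. The table name, columns, broker address, and topic name are hypothetical placeholders, and the statement imported by the template may differ.

```sql
-- Sketch only: table name, columns, broker, and topic are hypothetical.
CREATE TABLE order_totals (
  customer_id  STRING,
  total_amount DOUBLE,
  -- the key fields of the Kafka message are taken from the primary key
  PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'broker-1.example.com:9092',
  'topic' = 'order-totals',
  -- format of the key part of each Kafka message
  'key.format' = 'json',
  -- format of the value part of each Kafka message
  'value.format' = 'json'
);
```

With a table like this, a later record for the same customer_id replaces the earlier total, and a record with a null value removes that key from the result.
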

Configuring deserialization policy in DDL

For every supported Kafka connector type (local-kafka, kafka, or upsert-kafka), you can configure how the connector handles a message that fails to deserialize, which can otherwise result in a job submission error. You can choose from the following configurations:

Fail
In this case, an exception is thrown and the job submission fails.
Ignore
In this case, the failed message is ignored without being logged, and the job submission is successful.
Ignore and Log
In this case, the failed message is ignored and the failure is logged, and the job submission is successful.
Save to DLQ
In this case, the failed message is ignored, but you can store it in a dead-letter queue (DLQ) Kafka topic.
  1. Choose one of the Kafka template types from Templates.
  2. Select any type of data format.

    The predefined CREATE TABLE statement is imported to the SQL Editor.

  3. Fill out the Kafka template based on your requirements.
  4. Search for the deserialization.failure.policy property.
  5. Provide the value for the error handling from the following options:
    1. 'error'
    2. 'ignore'
    3. 'ignore_and_log'
    4. 'dlq'

      If you choose the dlq option, you must create a dedicated Kafka topic to store the failed messages, and provide the name of that topic in the deserialization.failure.dlq.topic property.

  6. Click Execute.
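
The result of these steps might look like the following sketch, which sets the dlq policy on a table backed by an external Kafka service. The deserialization.failure.policy and deserialization.failure.dlq.topic property names and the dlq value come from the steps above. The table name, columns, broker address, and both topic names are hypothetical placeholders, and the DLQ topic is assumed to exist already.

```sql
-- Sketch only: table name, columns, broker, and topic names are hypothetical.
CREATE TABLE orders_with_dlq (
  order_id STRING,
  amount   DOUBLE
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'broker-1.example.com:9092',
  'topic' = 'orders',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset',
  -- one of 'error', 'ignore', 'ignore_and_log', 'dlq'
  'deserialization.failure.policy' = 'dlq',
  -- required only with 'dlq': an existing topic that receives failed messages
  'deserialization.failure.dlq.topic' = 'orders-dlq'
);
```

For any other policy value, the deserialization.failure.dlq.topic line can be left out.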

For more information about how to configure the error handling using the Kafka wizard, see the Deserialization tab section.