Kafka connectors
When using the Kafka connector, you can choose between an internal and an external Kafka service. Each connector type has mandatory fields that you must fill in with the correct information.
You can choose from the following Kafka connectors when creating a table in Streaming SQL Console:
- Template: local-kafka
- Automatically uses the Kafka service that is registered in Data Sources and runs on the same cluster as the SQL Stream Builder service. You can choose between the JSON, Avro, and CSV data formats.
- Template: kafka
- Uses an external Kafka service as a connector. To connect to the external Kafka service, you need to specify the Kafka brokers that are used in your deployment.
- Template: upsert-kafka
- Connects to a Kafka service in upsert mode. When the table is used as a source, the connector produces a changelog stream, where each data record represents an update or delete event. The value in a data record is interpreted as an update of the last value for the same key, and a record with a null value is interpreted as a delete for that key. When the table is used as a sink, the connector can consume a changelog stream: it writes insert/update_after data as normal Kafka message values, and writes delete data as Kafka messages with null values. For more information about the upsert Kafka connector, see the Apache Flink documentation.
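The difference between the kafka and upsert-kafka connectors can be illustrated with a minimal sketch in Flink SQL. The table names, column names, topic names, broker addresses, and consumer group below are all hypothetical placeholders, and the templates imported in Streaming SQL Console may list additional or different properties, so treat this as an illustration rather than the exact template content.

```sql
-- Hypothetical append-only source table that reads from an external Kafka service.
CREATE TABLE orders (
  order_id    STRING,
  customer_id STRING,
  amount      DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',                                            -- assumed topic name
  'properties.bootstrap.servers' = 'broker1:9092,broker2:9092',  -- assumed broker addresses
  'properties.group.id' = 'ssb-orders-reader',                   -- assumed consumer group
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- Hypothetical upsert table. The primary key becomes the Kafka message key,
-- so each new record for a customer_id replaces the previous value for that key.
CREATE TABLE customer_totals (
  customer_id  STRING,
  total_amount DOUBLE,
  PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'customer-totals',                                   -- assumed topic name
  'properties.bootstrap.servers' = 'broker1:9092,broker2:9092',  -- assumed broker addresses
  'key.format' = 'json',
  'value.format' = 'json'
);

-- The aggregation emits an updating result (a changelog), which the upsert sink
-- writes as one Kafka message per update, keyed by customer_id.
INSERT INTO customer_totals
SELECT customer_id, SUM(amount) AS total_amount
FROM orders
GROUP BY customer_id;
```

Note that the upsert table needs a primary key and separate key and value formats, while the plain kafka table uses a single format setting and no key definition.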
Configuring deserialization policy in DDL
For every supported Kafka connector type (local-kafka, kafka, or upsert-kafka), you can configure how the connector handles a message that fails to deserialize, which can otherwise result in a job submission error. You can choose from the following configurations:
- Fail
- In this case an exception is thrown, and the job submission fails
- Ignore
- In this case the error message is ignored without any log, and the job submission is successful
- Ignore and Log
- In this case the error message is ignored but logged, and the job submission is successful
- Save to DLQ
- In this case the error message is ignored, but you can store it in a dead-letter queue (DLQ) Kafka topic
To set the deserialization policy in the DDL, complete the following steps:
- Choose one of the Kafka template types from Templates.
- Select any type of data format. The predefined CREATE TABLE statement is imported to the SQL Editor.
- Fill out the Kafka template based on your requirements.
- Search for the deserialization.failure.policy property.
- Provide the value for the error handling from the following options:
  - 'error' (Fail)
  - 'ignore' (Ignore)
  - 'ignore_and_log' (Ignore and Log)
  - 'dlq' (Save to DLQ)
  If you choose the dlq option, you need to create a dedicated Kafka topic where the error messages are stored. After selecting this option, you also need to provide the name of the created DLQ topic.
- Click Execute.
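As a rough illustration of the result of these steps, a filled-out kafka template might look like the sketch below. Only the deserialization.failure.policy property and its values come from this page; the table name, columns, topic, broker address, and consumer group are hypothetical placeholders, and the remaining properties are standard Flink Kafka connector options that may differ from what the imported template contains.

```sql
-- Hypothetical table; replace names, topic, and brokers with your own values.
CREATE TABLE sensor_readings (
  sensor_id    STRING,
  temperature  DOUBLE,
  reading_time TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'sensor-readings',                       -- assumed topic name
  'properties.bootstrap.servers' = 'broker1:9092',   -- assumed broker address
  'properties.group.id' = 'ssb-sensor-reader',       -- assumed consumer group
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',
  -- Skip messages that fail to deserialize, but write them to the log.
  -- Other accepted values: 'error', 'ignore', 'dlq'.
  'deserialization.failure.policy' = 'ignore_and_log'
);
```

If you set the policy to 'dlq' instead, also fill in the DLQ topic property from the imported template with the name of the Kafka topic you created for failed messages. That property is deliberately left out of this sketch because its exact name is not given here.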