Kafka connectors

When using the Kafka connector, you can choose between an internal and an external Kafka service. Each connector type has mandatory fields that you must fill in with the correct information.

You can choose from the following Kafka connectors when creating a table in Streaming SQL Console:

Template: local-kafka
Uses the Kafka service that is installed on your cluster.
Type: source/sink
The following fields are mandatory to use the connector:
  • scan.startup.mode: Startup mode for the Kafka consumer. The default value is group-offsets. The other available startup modes are earliest-offset, latest-offset, timestamp, and specific-offsets.
  • topic: The topic from which data is read as a source, or the topic to which data is written as a sink. No default value is specified. For sources, you can also add a list of topics separated by semicolons. For sources, you can specify either topic or topic-pattern, but not both.
  • format: The format used to deserialize and serialize the value part of Kafka messages. No default value is specified. You can use either the format or the value.format option.
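
As an illustration of how these three fields fit together, the following minimal Python sketch checks the startup mode against the values listed above and renders the options as the body of a SQL WITH clause. The helper names, the topic name orders, and the json format are assumptions made for this example and are not part of the template. The connector option is left out because the template is expected to supply it.

```python
# Startup modes listed in this section for scan.startup.mode.
STARTUP_MODES = {
    "group-offsets",  # default value
    "earliest-offset",
    "latest-offset",
    "timestamp",
    "specific-offsets",
}


def local_kafka_options(topic, fmt, startup_mode="group-offsets"):
    """Return the mandatory local-kafka fields as a dict (hypothetical helper)."""
    if startup_mode not in STARTUP_MODES:
        raise ValueError(f"unsupported scan.startup.mode: {startup_mode}")
    if not topic or not fmt:
        # topic and format have no default value, so they must be provided.
        raise ValueError("topic and format must be set")
    return {
        "scan.startup.mode": startup_mode,
        "topic": topic,
        "format": fmt,
    }


def render_with_clause(options):
    """Render the options as the body of a SQL WITH (...) clause."""
    lines = [f"  '{key}' = '{value}'" for key, value in options.items()]
    return "WITH (\n" + ",\n".join(lines) + "\n)"


if __name__ == "__main__":
    # "orders" and "json" are placeholder values for illustration.
    print(render_with_clause(local_kafka_options("orders", "json")))
```

The rendered text can be compared with the WITH clause of the imported template to confirm that no mandatory field was left empty.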
Template: kafka
Uses an external Kafka service as a connector. To connect to the external Kafka service, you need to specify the Kafka brokers that are used in your deployment.
Type: source/sink
The following fields are mandatory to use the connector:
  • properties.bootstrap.servers: A comma-separated list of Kafka brokers. No default value is specified.
  • topic: The topic from which data is read as a source, or the topic to which data is written as a sink. No default value is specified. For sources, you can also add a list of topics separated by semicolons. For sources, you can specify either topic or topic-pattern, but not both.
  • format: The format used to deserialize and serialize the value part of Kafka messages. No default value is specified. You can use either the format or the value.format option.
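
The rules above for a source table (comma-separated brokers, semicolon-separated topics, and the choice between topic and topic-pattern) can be sketched as a small check. This is only an illustrative Python sketch: the function name and the broker and topic values are hypothetical, and Streaming SQL Console performs its own validation when the statement is executed.

```python
def check_kafka_source_options(options):
    """Check the mandatory kafka template fields for a source table.

    Returns (brokers, topics). topics is empty when topic-pattern is used.
    Hypothetical helper, not part of Streaming SQL Console.
    """
    # Brokers are given as a comma-separated list.
    raw_brokers = options.get("properties.bootstrap.servers", "")
    brokers = [b.strip() for b in raw_brokers.split(",") if b.strip()]
    if not brokers:
        raise ValueError("properties.bootstrap.servers is mandatory")

    # The value format is given by either format or value.format.
    if not (options.get("format") or options.get("value.format")):
        raise ValueError("set format or value.format")

    # For sources, exactly one of topic and topic-pattern must be set.
    has_topic = bool(options.get("topic"))
    has_pattern = bool(options.get("topic-pattern"))
    if has_topic == has_pattern:
        raise ValueError("set exactly one of topic or topic-pattern")

    # A source can list several topics separated by semicolons.
    topics = [t.strip() for t in options.get("topic", "").split(";") if t.strip()]
    return brokers, topics


if __name__ == "__main__":
    # Placeholder broker addresses and topic names.
    print(check_kafka_source_options({
        "properties.bootstrap.servers": "broker1:9092,broker2:9092",
        "topic": "orders;payments",
        "format": "json",
    }))
```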
Template: upsert-kafka
Uses the upsert Kafka connector. For more information about the upsert Kafka connector, see the Apache Flink documentation.
Type: source/sink
The following fields are mandatory to use the connector:
  • properties.bootstrap.servers: A comma-separated list of Kafka brokers. No default value is specified.
  • topic: The topic from which data is read as a source, or the topic to which data is written as a sink. No default value is specified. For sources, you can also add a list of topics separated by semicolons. For sources, you can specify either topic or topic-pattern, but not both.
  • key.format: The format used to deserialize and serialize the key part of Kafka messages. No default value is specified. Unlike with the regular Kafka connector, the key fields are specified by the PRIMARY KEY syntax.
  • value.format: The format used to deserialize and serialize the value part of Kafka messages. No default value is specified. You can use either the format or the value.format option.
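
To show how the PRIMARY KEY syntax relates to key.format and value.format, the following Python sketch assembles a CREATE TABLE statement for an upsert-kafka table. It is an illustration under assumptions, not the template itself: the helper name, the table, column, topic, and broker values are hypothetical, and the 'connector' = 'upsert-kafka' identifier and the NOT ENFORCED keyword follow the Apache Flink documentation.

```python
def upsert_kafka_ddl(table, columns, key_columns, brokers, topic,
                     key_format="json", value_format="json"):
    """Build a CREATE TABLE statement for an upsert-kafka table.

    columns maps column names to SQL types; key_columns lists the columns
    that form the PRIMARY KEY. Hypothetical helper for illustration.
    """
    unknown = [c for c in key_columns if c not in columns]
    if not key_columns or unknown:
        raise ValueError("key_columns must name at least one existing column")

    col_lines = [f"  `{name}` {sql_type}" for name, sql_type in columns.items()]
    keys = ", ".join(f"`{c}`" for c in key_columns)
    # The key fields come from the PRIMARY KEY constraint.
    col_lines.append(f"  PRIMARY KEY ({keys}) NOT ENFORCED")

    options = {
        "connector": "upsert-kafka",
        "properties.bootstrap.servers": ",".join(brokers),
        "topic": topic,
        "key.format": key_format,
        "value.format": value_format,
    }
    opt_lines = [f"  '{k}' = '{v}'" for k, v in options.items()]

    return (
        f"CREATE TABLE `{table}` (\n"
        + ",\n".join(col_lines)
        + "\n) WITH (\n"
        + ",\n".join(opt_lines)
        + "\n)"
    )


if __name__ == "__main__":
    # Placeholder table, columns, broker, and topic.
    print(upsert_kafka_ddl(
        "user_totals",
        {"user_id": "STRING", "total": "BIGINT"},
        ["user_id"],
        ["broker1:9092"],
        "user-totals",
    ))
```

In this sketch the key columns are not repeated in the WITH clause, because the connector derives them from the PRIMARY KEY definition.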

Using the Kafka connectors

You can access and import the templates of the Kafka connectors from Streaming SQL Console:

  1. Navigate to the Streaming SQL Console.
    1. Navigate to Management Console > Environments, and select the environment where you have created your cluster.
    2. Select the Streaming Analytics cluster from the list of Data Hub clusters.
    3. Select Streaming SQL Console from the list of services.

      The Streaming SQL Console opens in a new window.

  2. Select Console from the main menu.
  3. Click Templates under the SQL window.
  4. Select the Kafka template you want to use.

    The template is imported to the SQL window.

  5. Fill in the mandatory fields of the template.
  6. Click Execute.