When using the Kafka connector, you can choose between an internal and an external
Kafka service. Each connector type has mandatory fields that you must fill in with the
correct information.
You can choose from the following Kafka connectors when creating a table in Streaming SQL
Console:
- Template: local-kafka
- Uses the Kafka service that is installed on your cluster.
- Type: source/sink
- The following fields are mandatory to use the connector:
  - scan.startup.mode: Startup mode for the Kafka consumer. The default value is group-offsets.
    You can also choose earliest-offset, latest-offset, timestamp, or specific-offsets as the
    startup mode.
  - topic: The topic from which data is read as a source, or the topic to which data is
    written as a sink. No default value is specified. For sources, you can also provide a
    topic list by separating the topics with semicolons. For sources, specify either topic or
    topic-pattern, not both.
  - format: The format used to deserialize and serialize the value part of Kafka messages. No
    default value is specified. You can use either the format or the value.format option.
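
As an illustration, the mandatory local-kafka fields could be filled in roughly as in the
sketch below. The table name, columns, and topic are hypothetical, and the connector value is
an assumption based on the template name; the imported template supplies the actual
identifier for your cluster.

```sql
-- Sketch only: table, columns, and topic are hypothetical placeholders.
CREATE TABLE orders_local (
  order_id STRING,
  amount   DOUBLE
) WITH (
  -- Assumption: identifier taken from the template name; keep the value the imported template provides.
  'connector' = 'local-kafka',
  -- One of: group-offsets (default), earliest-offset, latest-offset, timestamp, specific-offsets.
  'scan.startup.mode' = 'earliest-offset',
  'topic' = 'orders',
  'format' = 'json'
);
```
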
- Template: kafka
- Uses an external Kafka service as a connector. To connect to the external Kafka service,
you need to specify the Kafka brokers that are used in your deployment.
- Type: source/sink
- The following fields are mandatory to use the connector:
  - properties.bootstrap.servers: A comma-separated list of Kafka brokers. No default value
    is specified.
  - topic: The topic from which data is read as a source, or the topic to which data is
    written as a sink. No default value is specified. For sources, you can also provide a
    topic list by separating the topics with semicolons. For sources, specify either topic or
    topic-pattern, not both.
  - format: The format used to deserialize and serialize the value part of Kafka messages. No
    default value is specified. You can use either the format or the value.format option.
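
A minimal sketch of a source table that sets the mandatory kafka fields might look like the
following. The table name, columns, topics, and broker addresses are hypothetical; only the
option keys come from the list above. Depending on the Flink version and the startup mode,
a source may need further options such as properties.group.id.

```sql
-- Sketch only: table, columns, topics, and broker hosts are hypothetical placeholders.
CREATE TABLE orders_source (
  order_id STRING,
  amount   DOUBLE,
  order_ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  -- Brokers are separated by commas.
  'properties.bootstrap.servers' = 'broker-1.example.com:9092,broker-2.example.com:9092',
  -- Topic lists are allowed for sources only and are separated by semicolons.
  'topic' = 'orders;returns',
  'format' = 'json'
);
```
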
- Template: upsert-kafka
- Uses the upsert Kafka service as a connector. For more information about the upsert Kafka
connector, see the Apache Flink documentation.
- Type: source/sink
- The following fields are mandatory to use the connector:
  - properties.bootstrap.servers: A comma-separated list of Kafka brokers. No default value
    is specified.
  - topic: The topic from which data is read as a source, or the topic to which data is
    written as a sink. No default value is specified. For sources, you can also provide a
    topic list by separating the topics with semicolons. For sources, specify either topic or
    topic-pattern, not both.
  - key.format: The format used to deserialize and serialize the key part of Kafka messages.
    No default value is specified. Unlike the regular Kafka connector, the key fields are
    specified by the PRIMARY KEY syntax.
  - value.format: The format used to deserialize and serialize the value part of Kafka
    messages. No default value is specified. You can use either the format or the
    value.format option.
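
The mandatory upsert-kafka fields can be sketched as follows. The table name, columns, topic,
and broker address are hypothetical. The key column is declared with PRIMARY KEY, which Flink
SQL requires to be marked NOT ENFORCED.

```sql
-- Sketch only: table, columns, topic, and broker host are hypothetical placeholders.
CREATE TABLE user_totals (
  user_id STRING,
  total   DOUBLE,
  -- The key fields come from the PRIMARY KEY declaration.
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'broker-1.example.com:9092',
  'topic' = 'user-totals',
  'key.format' = 'json',
  'value.format' = 'json'
);
```
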
Using the Kafka connectors
You can access and import the templates of the Kafka connectors from Streaming SQL
Console:
- Navigate to the Streaming SQL Console.
  - Go to your cluster in Cloudera Manager.
  - Click on SQL Stream Builder from the list of Services.
  - Click on the SQLStreamBuilder Console.
    The Streaming SQL Console opens in a new window.
- Select Console from the main menu.
- Click Templates under the SQL window.
- Select one of the Kafka templates you want to use.
  The template is imported to the SQL window.
- Fill in the mandatory fields of the template.
- Click Execute.