Adding Catalogs

To use Cloudera Schema Registry, Kudu, Hive, Confluent Schema Registry, or other services with Flink DDL, you need to add them as catalogs using the Streaming SQL Console in SQL Stream Builder (SSB).

  • Make sure that you have the required service on your cluster.
  • Make sure that you have the right permissions set in Ranger for SSB and the services.
  1. Navigate to the Streaming SQL Console.
    1. Go to your cluster in Cloudera Manager.
    2. Select SQL Stream Builder from the list of services.
    3. Click SQLStreamBuilder Console.
      The Streaming SQL Console opens in a new window.
  2. Open a project from the Projects page of Streaming SQL Console.
    • Select an already existing project from the list by clicking the Open button or the Switch button.
    • Create a new project by clicking the New Project button.
    • Import a project by clicking the Import button.
    You are redirected to the Explorer view of the project.
  3. Open Data Sources from the Explorer view.
  4. Click the menu icon next to Catalogs.
  5. Select New Catalog.
    The Add Catalog window appears.
  6. Select the Catalog Type from the following options, and complete the steps for the chosen type:
    Schema Registry
    1. Add a Name to your catalog.
    2. Select Schema Registry from the Catalog Type drop-down.
    3. Select the Kafka cluster you registered as Data Source.
    4. Enable TLS, if needed for the communication.
      1. If you enabled TLS, provide the Schema Registry truststore location and password in the SR TrustStore and SR TrustStore Password fields.
    5. Add the Schema Registry URL.
      1. Go to your cluster in Cloudera Manager.
      2. Select Schema Registry from the list of services.
      3. Click on Instances.
      4. Copy the Hostname of Schema Registry.
      5. Add the default port of Schema Registry (7788) and the API path (/api/v1) after the hostname.
        Example:
        http://docs-test-1.vpc.cloudera.com:7788/api/v1
    6. Optional: Specify a Schema key suffix and a Schema value suffix.

      By default, the Schema Registry catalog looks for a topic in Kafka with the same name as the schema stored in Cloudera Schema Registry. It uses the schema to deserialize the payload of the messages in that topic and ignores the message key. If you have different schemas for deserializing the message key and payload, you can define suffixes to differentiate them. For example, if you configure the catalog's schema key suffix as -key and the schema value suffix as -value, and you have schemas named example-key and example-value stored in Cloudera Schema Registry, the SSB catalog automatically creates a table called example and uses those schemas to deserialize the key and the payload of the messages, respectively.

    7. Optional: Add a Prefix for key fields in schema.
      You can set a prefix for key field names to avoid table creation failure: the column names in the Flink schema are created based on the field names of the Avro schema, and table creation fails if the key and value schemas contain overlapping fields. The provided prefix is assigned to the key fields of the Flink table as shown in the following example:
      CREATE TABLE … (k_a INT, k_b INT, b INT, c INT) WITH (..., 'key.fields'='k_a;k_b')
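      As a fuller sketch, the following shows the kind of table definition that results. The table name, field names, broker address, and the plain avro formats are hypothetical stand-ins, while the key-handling options are the standard Flink Kafka connector options:
      CREATE TABLE example (
        k_a INT,    -- key field a, renamed with the k_ prefix
        k_b INT,    -- key field b, renamed with the k_ prefix
        b INT,      -- value field
        c INT       -- value field
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'example',
        'properties.bootstrap.servers' = 'kafka-host:9092',  -- placeholder broker address
        'key.format' = 'avro',
        'value.format' = 'avro',
        'key.fields' = 'k_a;k_b',
        'key.fields-prefix' = 'k_',
        'value.fields-include' = 'EXCEPT_KEY'
      );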
    Kudu
    1. Select Kudu from the Catalog Type drop-down.
    2. Add the host URL of Kudu Masters.
      1. Go to your cluster in Cloudera Manager.
      2. Select Kudu from the list of services.
      3. Click on Instances.
      4. Copy the Hostname of the Master Default Group.
      5. Add the default port of Kudu (7051) after the hostname.
        Example:
        docs-test-1.vpc.cloudera.com:7051
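      Once the catalog is created, the existing Kudu tables are available to Flink SQL. A minimal usage sketch, assuming the catalog was registered under the hypothetical name kudu_catalog:
      USE CATALOG kudu_catalog;  -- switch to the added Kudu catalog
      SHOW TABLES;               -- lists the imported Kudu tables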
    Hive
    1. Select Hive from the Catalog Type drop-down.
    2. Provide a Name to the Hive catalog created in SSB.
    3. Provide the name of the Default Database in Hive.
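      After the catalog is created, the existing Hive tables can be queried with fully qualified names. A minimal sketch, where the catalog, database, and table names are hypothetical:
      SELECT * FROM hive_catalog.default_db.orders;  -- catalog.database.table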
    Confluent Schema Registry
    1. Select Confluent Schema Registry from the Catalog Type drop-down.
    2. Add a Name to your catalog.
    3. Select the Kafka cluster you registered as Data Source.
    4. Add the Confluent Cloud Schema Registry URL from your Confluent connection configuration.
    5. Choose an Authentication method from the following:
      BASIC
      If the basic authentication method is selected, you must provide the username and password.
      CUSTOM
      Custom authentication methods include bearer authentication, SSL authentication, and so on. If the custom authentication method is selected, you must provide the authentication property keys in the Custom Properties field as they are set for the Confluent Cloud Schema Registry. For example, if the Confluent Cloud Schema Registry is configured with bearer authentication, you need to provide bearer.auth.credentials.source and bearer.auth.token. If SSL authentication is configured, schema.registry.ssl.keystore.location, schema.registry.ssl.keystore.password, and so on must be provided.
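      For example, a bearer authentication setup might look like the following in the Custom Properties field, where the token value is a placeholder:
      bearer.auth.credentials.source=STATIC_TOKEN
      bearer.auth.token=<your-token>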
    Custom
    1. Select Custom from the Catalog Type drop-down.
    2. Provide a Property Key.
    3. Provide a Property Value.

      If needed, you can specify more custom properties by using the plus icon.
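      The property keys and values typically correspond to the WITH clause of a Flink CREATE CATALOG statement. For example, a Property Key of type with a Property Value of generic_in_memory (a built-in Flink catalog type) would be equivalent to the following DDL, where the catalog name is hypothetical:
      CREATE CATALOG my_custom_catalog WITH (
        'type' = 'generic_in_memory'
      );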

  7. Click on Add Filter.
    1. Provide a Database and Table filter if you want to select specific tables to use from the catalog.
  8. Click on Validate.
  9. If the validation is successful, click Create.
You are ready to use the added catalog in SSB with Flink DDL. The existing schemas in Schema Registry and Confluent Schema Registry, and the tables in Kudu and Hive, are automatically imported into SSB.
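For example, once a catalog is added under the hypothetical name registry_catalog, its imported tables can be used directly in SQL jobs:
  SHOW CATALOGS;                 -- the newly added catalog is listed
  USE CATALOG registry_catalog;  -- switch to the added catalog
  SELECT * FROM example;         -- query an imported table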
After registering a catalog, you can edit, duplicate and delete it from Streaming SQL Console:
  1. Open Data Sources from the Explorer view.
  2. Click the menu icon next to Catalogs.
  3. Select Manage.

    The Catalogs tab opens where the registered catalogs are listed. You have the following options to manage the catalog sources:

    • Click on one of the existing catalogs to edit its configurations.
    • Click the delete icon to remove the catalog.
    • Click the duplicate icon to duplicate the catalog with its configurations.