Running your Flink application

After preparing your environment, create a Kudu table and connect Flink to Kudu by providing the Kudu master addresses. You also need to choose a source for your Flink application in Data Hub. Once data arrives at the source, Flink applies the computations defined in your application, and the results are written to your Kudu sink.

  • You have a CDP Public Cloud environment.
  • You have a CDP username (it can be your own CDP user or a CDP machine user) and a password set to access Data Hub clusters.

    The predefined resource role of this user is at least EnvironmentUser. This resource role provides the ability to view Data Hub clusters and set the FreeIPA password for the environment.

  • Your user is synchronized to the CDP Public Cloud environment.
  • You have a Streaming Analytics cluster.
  • You have a Real-time Data Mart cluster in the same Data Hub environment as the Streaming Analytics cluster.
  • Your CDP user has the Ranger permissions required to access Kudu.
  • You have obtained the Kudu Master hosts:
    1. Go to Management Console > Environments.
    2. Search for your environment in the list of available environments.
    3. Select the Data Hub cluster within your environment from the list of available clusters.
    4. Select Kudu Master from the list of Services.
    5. Click Masters.
    6. Copy the host information from the list of Live Masters.
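
The copied hosts are later passed to the connector as a single comma-separated string (the code examples in the steps below call it KUDU_MASTERS). The following is a minimal sketch of how that string might be assembled. The class name KuduMasterAddresses, the example host names, and the fallback to port 7051 (the default Kudu master RPC port) are assumptions for illustration only; use the ports shown in the Live Masters list if they differ.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper, not part of the Flink Kudu connector. */
final class KuduMasterAddresses {

    // Assumed default Kudu master RPC port; verify against your Live Masters list.
    static final int DEFAULT_RPC_PORT = 7051;

    private KuduMasterAddresses() {}

    /**
     * Joins hosts into "host1:port,host2:port".
     * Hosts that already carry a port are kept as-is (IPv6 literals are not handled).
     */
    static String join(List<String> hosts) {
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("At least one Kudu master host is required");
        }
        return hosts.stream()
                .map(String::trim)
                .map(h -> h.contains(":") ? h : h + ":" + DEFAULT_RPC_PORT)
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        // Placeholder host names; replace them with the hosts copied from Live Masters.
        String kuduMasters = join(List.of(
                "master1.example.com",
                "master2.example.com:7051",
                "master3.example.com"));
        System.out.println(kuduMasters);
    }
}
```

Keeping the address string in one place, or passing it in as a job argument, avoids hard-coding cluster-specific host names in the application logic.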
  1. Create your Kudu tables using one of the following methods:
    • Use Impala from the Real-time Data Mart cluster to create a table in Kudu.

      For more information about how to create a Kudu table from Impala, see the official documentation.

    • Use the KuduCatalog to create a table in Kudu.
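      // Describe the table: an INT32 key column "first" and a STRING column "second".
      KuduTableInfo tableInfo = KuduTableInfo
          .forTable("ExampleTable")
          .createTableIfNotExists(
              // Supplies the column schemas
              () ->
                  Lists.newArrayList(
                      new ColumnSchema
                          .ColumnSchemaBuilder("first", Type.INT32)
                          .key(true)
                          .build(),
                      new ColumnSchema
                          .ColumnSchemaBuilder("second", Type.STRING)
                          .build()
                  ),
              // Supplies the table options: one replica, two hash partitions on the key column
              () -> new CreateTableOptions()
                  .setNumReplicas(1)
                  .addHashPartitions(Lists.newArrayList("first"), 2));

      // "catalog" is a KuduCatalog instance that points to your Kudu masters.
      // Passing false means the call fails if the table already exists.
      catalog.createTable(tableInfo, false);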
  2. Choose and add a source to your Flink application.
  3. Add Kudu as a sink to your Flink application.
    The following code example shows how to build your application logic with a Kudu sink:
    
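    // KUDU_MASTERS is the comma-separated list of Kudu master hosts copied from the Live Masters list.
    KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(KUDU_MASTERS).build();
    
    // Upsert each Row into the existing table; Row fields map by position to the listed columns.
    KuduSink<Row> sink = new KuduSink<>(
        writerConfig,
        KuduTableInfo.forTable("AlreadyExistingTable"),
        new RowOperationMapper<>(
                new String[]{"col1", "col2", "col3"},
                AbstractSingleOperationMapper.KuduOperation.UPSERT)
    );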
  4. Start generating data to your source connector.
  5. Deploy your Flink streaming application.
  6. Open the Kudu UI to see the generated table.
You have the following options to monitor and manage your Flink applications: