Transformations tab

When using the Add Kafka table wizard on the Streaming SQL Console, you can apply input transformations under the Transformations tab.

You can apply input transformations to your data when adding a Kafka table as a source for your queries. Input transformations use JavaScript functions to clean or rearrange the incoming data from the source.

For more information about Input Transform, see the Creating input transformations section.
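
As a rough illustration of cleaning incoming data, the following sketch trims string fields and drops fields whose value is null. It assumes that the message payload is a JSON string available as record.value. The function name cleanRecord and the sample record are hypothetical and exist only so that the logic can be run outside the console; in the console, the function body would be the transformation itself, ending with the JSON.stringify expression.

```javascript
// Hypothetical helper: wraps the transformation logic in a function so that
// it can be exercised locally with a mock record.
function cleanRecord(record) {
    var input = JSON.parse(record.value);    // assumption: payload is a JSON string
    var out = {};
    Object.keys(input).forEach(function (key) {
        var value = input[key];
        if (value === null) {
            return;                          // drop fields with null values
        }
        // trim surrounding whitespace from string values, keep others as-is
        out[key] = (typeof value === 'string') ? value.trim() : value;
    });
    return JSON.stringify(out);
}

// Mock record, for illustration only.
var sample = { value: '{"id": 1, "name": "  alice  ", "note": null}' };
var cleaned = cleanRecord(sample);
console.log(cleaned);
```

Any field removed this way must also be absent from, or optional in, the schema defined for the table.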

Kafka record metadata access

Some processing logic requires additional metadata from the Kafka record. SQL Stream Builder makes this information available through the Input Transforms functionality.

The following attributes are available on the record:
record.topic
record.key
record.value
record.headers
record.offset
record.partition
For example, an input transformation can be expressed as follows:
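var out = JSON.parse(record.value);    // parse the message payload
out['topic'] = record.topic;           // add the source topic
out['partition'] = record.partition;   // add the source partition
JSON.stringify(out);                   // emit the enriched record as a JSON string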
For this transformation, you can define the schema manually or use the Detect Schema feature:
{
  "name": "myschema",
  "type": "record",
  "namespace": "com.cloudera.test",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "topic",
      "type": "string"
    },
    {
      "name": "partition",
      "type": "string"
    }
  ]
}
The attribute record.headers is an array that can be iterated over:
var out = JSON.parse(record.value);
var header = JSON.parse(record.headers);
var interested_keys = ['DC'];               // should match schema definition

out['topic'] = record.topic;
out['partition'] = record.partition;

Object.keys(header).forEach(function(key) {
    if (interested_keys.indexOf(key) > -1){  // if match found for schema, set value
        out[key] = header[key];
    }
});

JSON.stringify(out);
For this transformation, define the schema as follows:
{
  "name": "myschema",
  "type": "record",
  "namespace": "com.cloudera.test",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "topic",
      "type": "string"
    },
    {
      "name": "partition",
      "type": "string"
    },
    {
      "name": "DC",
      "type": "string"
    }
  ]
}
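
To check the header logic before pasting it into the console, the transformation can be wrapped in a function and run against a mock record. The following is a minimal sketch, not part of the product: the function name transformWithHeaders and all values in the mock record are hypothetical. It assumes that the payload is a JSON string in record.value, that record.headers is a JSON string encoding a key/value map (as the JSON.parse call in the example above implies), and that the partition is delivered as a string to match the schema.

```javascript
// Hypothetical wrapper around the header transformation, for local testing.
function transformWithHeaders(record, interestedKeys) {
    var out = JSON.parse(record.value);       // assumption: payload is a JSON string
    var header = JSON.parse(record.headers);  // assumption: headers are a JSON map

    out['topic'] = record.topic;
    out['partition'] = record.partition;

    // copy only the headers that have a matching field in the schema
    Object.keys(header).forEach(function (key) {
        if (interestedKeys.indexOf(key) > -1) {
            out[key] = header[key];
        }
    });
    return JSON.stringify(out);
}

// Mock record with made-up values.
var mockRecord = {
    topic: 'orders',
    partition: '0',
    value: '{"id": 42}',
    headers: '{"DC": "us-east", "trace": "abc"}'
};
var result = transformWithHeaders(mockRecord, ['DC']);
console.log(result);
```

Headers that are not listed in interestedKeys, such as trace in the mock record, are left out of the output, so the result only contains fields that the schema declares.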