Data Transformations tab

When using the Add Kafka table wizard on the Streaming SQL Console, you can apply input transformation under the Transformations tab. Input transformations can be used to clean or arrange the incoming data from the source using javascript functions.

Input Transforms are a powerful way to clean, modify, and arrange data that is poorly organized, has changing format, and has data that is not needed or otherwise hard to use. With the Input Transform feature of SQL Stream Builder, you can create a javascript function to transform the data after it has been consumed from a Kafk topic, and before you run SQL queries on the data.

You can use Input Transforms in the following situations:
  • The source is not in your control, for example, data feed from a third-party provider
  • The format is hard to change, for example, a legacy feed, other teams of feeds within your organization
  • The messages are inconsistent
  • The data from the sources do not have uniform keys, or without keys (like nested arrays), but are still in a valid JSON format
  • The schema you want does not match the incoming topic
  • You can use the Input Transforms on Kafka tables that have the following characteristics:
    • Allows one transformation per source.
    • Takes record as a JSON-formatted string input variable. The input is always named record.
    • Emits the output of the last line to the calling JVM. It could be any variable name. In the following example, out and emit is used as a JSON-formatted string.
    A basic input transformation looks like this:
    var out = JSON.parse(record.value);     // record is input, parse JSON formatted string to object
                                                     // add more transformations if needed
    JSON.stringify(out);                             // emit JSON formatted string of object
    

Adding transformation to Kafka tables

When adding a Kafka table using the wizard, you can specify the input transformation on the Data Transformation tab. You have one of the following steps to apply an input transformation:

  1. Add your javascript transformation code to the Data Transformation window.

    Make sure the output of your transform matches the Schema definition detected or defined for the Kafka table.

  2. Click Install default template and schema.

    The Install Default template and schema option fills out the Data Transformation box with a template that you can use to create the Input Transform, and matches the schema with the format.

Kafka record metadata access

There are cases when it is required to access additional metadata from the Kafka record to implement the correct processing logic. SQL Stream Builder has access to this information using the Input Transforms functionality.

The following attributes are supported in the headers:
record.topic
record.key
record.value
record.headers
record.offset
record.partition
For example, an input transformation can be expressed as the following:
var out = JSON.parse(record);
out['topic'] = message.topic;
out['partition'] = message.partition;
JSON.stringify(out);
For which you define a schema manually, or use the Detect Schema feature:
{
  "name": "myschema",
  "type": "record",
  "namespace": "com.cloudera.test",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "topic",
      "type": "string"
    },
    {
      "name": "partition",
      "type": "string"
    }
  ]
}
The attribute record.headers is an array that can be iterated over:
var out = JSON.parse(record);
var header = JSON.parse(record.headers);
var interested_keys = ['DC'];               // should match schema definition

out['topic'] = record.topic;
out['partition'] = record.partition;

Object.keys(header).forEach(function(key) {
    if (interested_keys.indexOf(key) > -1){  // if match found for schema, set value
        out[key] = header[key];
    }
});

JSON.stringify(out);
For which you define a schema as follows:
{
  "name": "myschema",
  "type": "record",
  "namespace": "com.cloudera.test",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "topic",
      "type": "string"
    },
    {
      "name": "partition",
      "type": "string"
    },
    {
      "name": "DC",
      "type": "string"
    }
  ]
}