Data Transformations tab
When you add a Kafka table as a source for your queries using the Add Kafka table wizard on the Streaming SQL Console, you can apply input transformations under the Transformations tab. Input transformations can be used to clean or rearrange incoming data from the source using JavaScript functions.
Input Transforms are a powerful way to clean, modify, and arrange data that is poorly organized, changes format, or contains fields that are not needed or otherwise hard to use. With the Input Transform feature of SQL Stream Builder, you can create a JavaScript function that transforms the data after it has been consumed from a Kafka topic and before you run SQL queries on the data. Input Transforms are useful in the following situations (a sketch of such a cleanup follows the list):
- The source is not in your control, for example, a data feed from a third-party provider
- The format is hard to change, for example, a legacy feed or feeds owned by other teams within your organization
- The messages are inconsistent
- The data from the source does not have uniform keys, or has no keys at all (for example, nested arrays), but is still valid JSON
- The schema you want does not match the incoming topic
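As an illustration, the following is a minimal sketch of a transform that cleans up such input. The sensorId, sensor_id, and readings field names are hypothetical placeholders for whatever shape your source actually produces:

var out = JSON.parse(record.value);

// Normalize an inconsistently named key (hypothetical field names)
if (out.sensorId !== undefined && out.sensor_id === undefined) {
  out.sensor_id = out.sensorId;
  delete out.sensorId;
}

// Reduce a nested array to a single field that a flat schema can hold (hypothetical)
if (out.readings instanceof Array && out.readings.length > 0) {
  out.latest_reading = out.readings[out.readings.length - 1];
  delete out.readings;
}

JSON.stringify(out); // emit the cleaned record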
The Input Transform feature has the following characteristics:
- Allows one transformation per source.
- Takes record as a JSON-formatted string input variable. The input is always named record.
- Emits the output of the last line to the calling JVM. Any variable name can be used; in the following example, out is used and emitted as a JSON-formatted string.
A basic input transformation looks like this:

var out = JSON.parse(record.value); // record is the input; parse the JSON-formatted string into an object
// add more transformations if needed
JSON.stringify(out);                // emit a JSON-formatted string of the object
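Building on this skeleton, a transform can add, rename, or drop fields between the parse and the stringify. The following is a sketch only; the temp and temp_f field names are hypothetical:

var out = JSON.parse(record.value); // e.g. {"id": 1, "temp": 22.5}
out.temp_f = out.temp * 9 / 5 + 32; // add a derived field (hypothetical)
delete out.temp;                    // drop a field that is not needed
JSON.stringify(out);                // emits {"id":1,"temp_f":72.5}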
Kafka record metadata access
In some cases you need to access additional metadata from the Kafka record to implement the correct processing logic. SQL Stream Builder exposes this information through the Input Transforms functionality. The following attributes of the Kafka record are available in a transform:
- record.topic
- record.key
- record.value
- record.headers
- record.offset
- record.partition
For example, the following transform adds the topic and partition of the record to the output:

var out = JSON.parse(record.value);  // parse the record payload
out['topic'] = record.topic;         // attach Kafka metadata to the output
out['partition'] = record.partition;
JSON.stringify(out);

The schema used for the table must include the added fields, for example:
{
  "name": "myschema",
  "type": "record",
  "namespace": "com.cloudera.test",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "topic",
      "type": "string"
    },
    {
      "name": "partition",
      "type": "string"
    }
  ]
}
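A record emitted by this transform might then look like the following; the values are hypothetical, and partition is shown as a string to match the schema above:

{"id": 42, "topic": "my-topic", "partition": "0"}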
The record.headers attribute is an array that can be iterated over:

var out = JSON.parse(record.value);   // parse the record payload
var header = JSON.parse(record.headers);
var interested_keys = ['DC']; // should match schema definition
out['topic'] = record.topic;
out['partition'] = record.partition;
Object.keys(header).forEach(function(key) {
if (interested_keys.indexOf(key) > -1){ // if match found for schema, set value
out[key] = header[key];
}
});
JSON.stringify(out);

To capture the header value, include the matching field in the schema as well:
{
  "name": "myschema",
  "type": "record",
  "namespace": "com.cloudera.test",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "topic",
      "type": "string"
    },
    {
      "name": "partition",
      "type": "string"
    },
    {
      "name": "DC",
      "type": "string"
    }
  ]
}
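Assuming the incoming record carried a header named DC (the key listed in interested_keys above), an emitted record might look like this; all values are hypothetical:

{"id": 42, "topic": "my-topic", "partition": "0", "DC": "dc-west"}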