Data Transformations tab
When using the Add Kafka table wizard on the Streaming SQL Console, you can apply an input transformation under the Transformations tab. Input transformations can be used to clean or arrange the incoming data from the source using JavaScript functions.
Input Transforms are a powerful way to clean, modify, and arrange data that is poorly organized, changes format, or contains fields that are not needed or are otherwise hard to use. With the Input Transform feature of SQL Stream Builder, you can create a JavaScript function to transform the data after it has been consumed from a Kafka topic, and before you run SQL queries on the data.
Input Transforms are useful in the following situations:
- The source is not in your control, for example, a data feed from a third-party provider
- The format is hard to change, for example, a legacy feed or feeds from other teams within your organization
- The messages are inconsistent
- The data from the sources does not have uniform keys, or has no keys (like nested arrays), but is still in a valid JSON format
- The schema you want does not match the incoming topic
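As an illustration of the non-uniform keys and nested array cases, the following is a minimal sketch rather than a definitive implementation. The field names (id, ID, position, lat, lon) and the sample message are hypothetical. The logic is wrapped in a function only so that it can be run outside SQL Stream Builder, and ES5 syntax is used on the assumption that the embedded JavaScript engine may not support newer syntax.

```javascript
// Sketch only: normalizes a hypothetical feed where the identifier arrives
// as either "id" or "ID", and the position arrives as a keyless array.
function normalizeRecord(record) {
  var parsed = JSON.parse(record.value);
  var out = {};

  // Non-uniform keys: accept either spelling, fall back to null.
  if (parsed.id !== undefined) {
    out.id = parsed.id;
  } else if (parsed.ID !== undefined) {
    out.id = parsed.ID;
  } else {
    out.id = null;
  }

  // Nested array without keys: [latitude, longitude] becomes named fields.
  var pos = Array.isArray(parsed.position) ? parsed.position : [];
  out.lat = pos.length > 0 ? pos[0] : null;
  out.lon = pos.length > 1 ? pos[1] : null;

  // Fields that are not copied (for example "debug") are dropped.
  return JSON.stringify(out);
}

// Hypothetical sample message.
var sample = { value: '{"ID": 7, "position": [47.5, 19.0], "debug": "x"}' };
var normalized = normalizeRecord(sample);
```

Only the fields copied into out reach the table, so unneeded data is removed as a side effect of building the output explicitly.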
Input Transforms on Kafka tables have the following characteristics:
- One transformation is allowed per source.
- The transformation takes record as a JSON-formatted string input variable. The input is always named record.
- The output of the last line is emitted to the calling JVM. The output can use any variable name. In the following example, out is used and emitted as a JSON-formatted string.
A basic input transformation looks like this:

    var out = JSON.parse(record.value); // record is input, parse JSON formatted string to object
    // add more transformations if needed
    JSON.stringify(out); // emit JSON formatted string of object
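To try a transformation outside SQL Stream Builder, the "output of the last line" behavior can be approximated with eval, which returns the value of the last expression statement. This is a rough sketch of a local harness, not how SQL Stream Builder itself runs the script. The runTransform helper and the sample message are hypothetical.

```javascript
// Hypothetical local harness: evaluates a transformation script with a
// variable named `record` in scope and returns the value of its last line.
function runTransform(script, record) {
  // Direct eval can see the local `record` parameter and evaluates to
  // the value of the last expression statement in the script.
  return eval(script);
}

// The script text is what would go into the transformation window.
var script = [
  "var out = JSON.parse(record.value);",
  "out.name = String(out.name).trim(); // example clean-up step",
  "JSON.stringify(out);"
].join("\n");

var emitted = runTransform(script, { value: '{"id": 1, "name": "  sensor-a "}' });
console.log(emitted);
```

Because the harness only imitates the last-line behavior, it is useful for checking the shape of the emitted JSON, not for reproducing the runtime environment.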
Adding transformation to Kafka tables
When adding a Kafka table using the wizard, you can specify the input transformation on the Data Transformation tab. You can apply an input transformation in one of the following ways:
- Add your JavaScript transformation code to the Data Transformation window.
Make sure the output of your transform matches the Schema definition detected or defined for the Kafka table.
- Click Install default template and schema.
The Install default template and schema option fills out the Data Transformation box with a template that you can use to create the Input Transform, and matches the schema with the format.
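One way to keep the output of a transformation aligned with the table schema is to build the output from a list of expected fields and coerce each value to its declared type. The sketch below assumes a hypothetical schema with an int field id and a string field name. The schemaFields list and the coerceToSchema function are illustrative names, not part of SQL Stream Builder.

```javascript
// Hypothetical field list mirroring the table schema (names and types assumed).
var schemaFields = [
  { name: "id", type: "int" },
  { name: "name", type: "string" }
];

// Wrapped in a function only so the sketch can be run outside SSB.
function coerceToSchema(record) {
  var parsed = JSON.parse(record.value);
  var out = {};
  schemaFields.forEach(function (field) {
    var raw = parsed[field.name];
    if (field.type === "int") {
      var n = parseInt(raw, 10);
      out[field.name] = isNaN(n) ? null : n; // unparsable numbers become null
    } else {
      // Treat everything else as a string; missing values become "".
      out[field.name] = raw === undefined || raw === null ? "" : String(raw);
    }
  });
  // Fields that are not in schemaFields are not copied to the output.
  return JSON.stringify(out);
}

var coerced = coerceToSchema({ value: '{"id": "42", "name": 123, "extra": true}' });
```

Driving the output from the field list means that a new or unexpected key in the incoming message cannot silently change the shape of the emitted record.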
Kafka record metadata access
There are cases when it is required to access additional metadata from the Kafka record to implement the correct processing logic. SQL Stream Builder has access to this information using the Input Transforms functionality.
The following attributes are available on the record:
- record.topic
- record.key
- record.value
- record.headers
- record.offset
- record.partition
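For example, the following transformation adds the topic and partition to the output:

    var out = JSON.parse(record.value); // parse the message payload
    out['topic'] = record.topic; // the input variable is always named record
    out['partition'] = record.partition;
    JSON.stringify(out);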
The schema then includes the added topic and partition fields:

    {
      "name": "myschema",
      "type": "record",
      "namespace": "com.cloudera.test",
      "fields": [
        { "name": "id", "type": "int" },
        { "name": "topic", "type": "string" },
        { "name": "partition", "type": "string" }
      ]
    }
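The schema above declares topic and partition as strings. The following sketch converts the metadata values explicitly and also combines topic, partition, and offset into one field. It assumes that record.partition and record.offset may arrive as numbers, which the source does not state. The source_position field is hypothetical and would need a matching entry in the schema.

```javascript
// Wrapped in a function only so the sketch can be run outside SSB.
function addMetadata(record) {
  var out = JSON.parse(record.value);

  // The schema declares both fields as strings, so convert explicitly
  // (assumption: the metadata values may be numeric).
  out.topic = String(record.topic);
  out.partition = String(record.partition);

  // Hypothetical extra field identifying where the message came from.
  out.source_position = record.topic + "-" + record.partition + "-" + record.offset;

  return JSON.stringify(out);
}

// Hypothetical sample record with metadata attributes.
var withMetadata = addMetadata({
  value: '{"id": 5}',
  topic: "orders",
  partition: 2,
  offset: 1043
});
```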
record.headers is an array that can be iterated over:

    var out = JSON.parse(record.value);
    var header = JSON.parse(record.headers);
    var interested_keys = ['DC']; // should match schema definition
    out['topic'] = record.topic;
    out['partition'] = record.partition;
    Object.keys(header).forEach(function(key) {
        if (interested_keys.indexOf(key) > -1) { // if match found for schema, set value
            out[key] = header[key];
        }
    });
    JSON.stringify(out);
The matching schema adds the DC field:

    {
      "name": "myschema",
      "type": "record",
      "namespace": "com.cloudera.test",
      "fields": [
        { "name": "id", "type": "int" },
        { "name": "topic", "type": "string" },
        { "name": "partition", "type": "string" },
        { "name": "DC", "type": "string" }
      ]
    }
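The exact shape of the parsed headers is not spelled out here, so a defensive variant can accept either a key-to-value object or an array of entries. The sketch below is written under that assumption. The array entry shape ({ key, value }), the headersToMap helper, and the empty-string default are all hypothetical choices, not documented behavior.

```javascript
// Hypothetical helper: turns parsed headers into a plain key-to-value map.
function headersToMap(rawHeaders) {
  var parsed = typeof rawHeaders === "string" ? JSON.parse(rawHeaders) : rawHeaders;
  var map = {};
  if (Array.isArray(parsed)) {
    // Assumed entry shape: { key: "DC", value: "east" }
    parsed.forEach(function (entry) {
      map[entry.key] = entry.value;
    });
  } else if (parsed && typeof parsed === "object") {
    Object.keys(parsed).forEach(function (key) {
      map[key] = parsed[key];
    });
  }
  return map;
}

// Wrapped in a function only so the sketch can be run outside SSB.
function addHeaders(record) {
  var out = JSON.parse(record.value);
  var headers = headersToMap(record.headers);
  var interestedKeys = ["DC"]; // should match the schema definition
  interestedKeys.forEach(function (key) {
    // Always emit the field so the output keeps the same shape;
    // a missing header becomes an empty string (an arbitrary default).
    out[key] = Object.prototype.hasOwnProperty.call(headers, key) ? headers[key] : "";
  });
  return JSON.stringify(out);
}

var fromObject = addHeaders({ value: '{"id": 1}', headers: '{"DC": "east", "trace": "abc"}' });
var fromArray = addHeaders({ value: '{"id": 1}', headers: '[{"key": "DC", "value": "west"}]' });
```

Emitting every key in interestedKeys, even when the header is absent, keeps the output consistent with a schema in which DC is a required string field.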