Kafka data source integration
When integrating Kafka with SQL Stream Builder, you can use Kafka-specific syntax to customize your SQL queries based on your deployment and use case.
Timestamps
Timestamps are supported in Kafka as part of the message format. This is exposed to SQL Stream Builder through the virtual column eventTimestamp. The time in the message is the time when the message was created. SQL Stream Builder is also aware of current_timestamp, which is the timestamp at the start of query execution.
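For example, the following query returns only the events created after the query has started: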
SELECT sensor_name
FROM sensors
WHERE eventTimestamp > current_timestamp;
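You can also aggregate over event time windows. The following query sums the payment amounts for every one-hour window: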
SELECT SUM(CAST(amount AS numeric)) AS payment_volume,
       CAST(TUMBLE_END(eventTimestamp, interval '1' hour) AS varchar) AS ts
FROM payments
GROUP BY TUMBLE(eventTimestamp, interval '1' hour);
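Here TUMBLE assigns each row to a fixed, non-overlapping one-hour window based on eventTimestamp, and TUMBLE_END returns the end time of that window.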
For more information, see the SQL Syntax Guide.
Assigning keys for output
To assign a key to the output messages, alias the chosen column as _eventKey. This alias specifies the column as the partition key using the Kafka driver.
SELECT sensor_name AS _eventKey -- sensor_name becomes the key in the output Kafka topic
FROM sensors
WHERE eventTimestamp > current_timestamp;
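Because Kafka sends messages with the same key to the same partition, all records for a given sensor_name land in the same partition of the output topic and keep their relative order.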
Performance & Scalability
You can achieve high performance and scalability with SQL Stream Builder, but proper configuration and design of the source Kafka topic is critical. SQL Stream Builder can read with a maximum of one thread per Kafka partition. You achieve the highest performance configuration when the number of SQL Stream Builder threads is equal to or higher than the number of Kafka partitions. If the number of partitions is less than the number of SQL Stream Builder threads, SQL Stream Builder has idle threads, and messages indicating this show up in the logs. For more information about Kafka partitioning, see the Kafka partitions documentation.
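For example, a source topic with eight partitions can keep eight SQL Stream Builder threads fully busy, one reader per partition. A minimal sketch that creates such a topic with the standard Kafka CLI (the topic name, partition count, replication factor, and broker address are illustrative):
kafka-topics.sh --create \
  --bootstrap-server broker1:9092 \
  --topic sensors \
  --partitions 8 \
  --replication-factor 3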
Kafka message header access
You can write custom headers to Kafka messages to carry metadata, routing information, filtering hints, and so on. SQL Stream Builder can access this header information using the Input Transforms functionality. For more information, see the Input Transforms section. The following attributes are available inside an input transform:
message.topic
message.key
message.value
message.headers
message.offset
message.partition
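For example, the following input transform copies the source topic and partition into each output record: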
var out = JSON.parse(record);           // parse the incoming message payload
out['topic'] = message.topic;           // add the source topic name
out['partition'] = message.partition;   // add the source partition number
JSON.stringify(out);                    // the value of the last expression is the transform output
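The fields added by the transform must also be declared in the result schema, for example: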
{
  "name": "myschema",
  "type": "record",
  "namespace": "com.cloudera.test",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "topic",
      "type": "string"
    },
    {
      "name": "partition",
      "type": "string"
    }
  ]
}
message.headers is an array that can be iterated over:
var out = JSON.parse(record);
var header = JSON.parse(message.headers);
var interested_keys = ['DC']; // should match schema definition
out['topic'] = message.topic;
out['partition'] = message.partition;
Object.keys(header).forEach(function(key) {
  if (interested_keys.indexOf(key) > -1) { // if match found for schema, set value
    out[key] = header[key];
  }
});
JSON.stringify(out);
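The matching schema declares the DC header field in addition to the topic and partition fields: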
{
  "name": "myschema",
  "type": "record",
  "namespace": "com.cloudera.test",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "topic",
      "type": "string"
    },
    {
      "name": "partition",
      "type": "string"
    },
    {
      "name": "DC",
      "type": "string"
    }
  ]
}
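For reference, a header such as DC is set by the producer when the message is written to Kafka. A minimal sketch using the kafkajs client (the choice of client library, broker address, topic name, and header value are assumptions for illustration, not part of SQL Stream Builder):
const { Kafka } = require('kafkajs');

// hypothetical client and broker; adjust to your deployment
const kafka = new Kafka({ clientId: 'test-producer', brokers: ['broker1:9092'] });
const producer = kafka.producer();

async function send() {
  await producer.connect();
  await producer.send({
    topic: 'test-topic',
    messages: [{
      key: 'sensor-1',                   // becomes the partition key
      value: JSON.stringify({ id: 1 }),  // message payload
      headers: { DC: 'dc-east' }         // custom header read by the transform
    }]
  });
  await producer.disconnect();
}

send().catch(console.error);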