Kafka data source integration

When integrating Kafka with SQL Stream Builder, you are able to use the Kafka specific syntaxes to customize your SQL queries based on your deployment and use case.

Timestamps

Timestamps are supported in Kafka as part of the messageformat. This is exposed to SQL Stream Builder through the virtual column eventTimestamp. The time in the message is the time when the message was created. SQL Stream Builder is also aware of the current_timestamp which is the current timestamp at query execution start time.

You can construct the queries in the following way:
SELECT sensor_name
FROM sensors
WHERE eventTimestamp > current_timestamp;
You can also use the timestamp in windowing queries in the following way:
SELECT SUM(CAST(amount AS numeric)) AS payment_volume,
CAST(TUMBLE_END(eventTimestamp, interval '1' hour) AS varchar) AS ts
FROM payments
GROUP BY TUMBLE(eventTimestamp, interval '1' hour);

For more information, see the SQL Syntax Guide.

Assigning keys for output

If using a Kafka as a sink, keys can be assigned using the special alias _eventKey. This alias specifies the column as the partition key using the Kafka driver.
SELECT sensor_name AS _eventKey --sensor_name becomes the key in the output kafka topic
FROM sensors
WHERE eventTimestamp > current_timestamp;

Performance & Scalability

You can achieve high performance and scalability with SQL Stream Builder, but the proper configuration and design of the source Kafka topic is critical. SQL Stream Builder can read a maximum of one thread per Kafka partition. You can achieve the highest performance configuration when setting the SQL Stream Builder threads higher or equal than the number of Kafka partitions. If the number of partitions is less than the SQL Stream Builder threads, then SQL Stream Builder has idle threads and messages show up in the logs indicating as such. For more information about Kafka partitioning, see the Kafka partitions documentation.

Kafka message header access

You can write custom headers to Kafka which are used for metadata, sometimes routing information, filtering, and so on. SQL Stream Builder has access to the header information using the input transforms functionality. For more information about Input Transforms, see the Input Transforms section.

The following attributes are supported in the headers:
message.topic
message.key
message.value
message.headers
message.offset
message.partition
For example, an input transformation could be expressed as the following:
var out = JSON.parse(record);
out['topic'] = message.topic;
out['partition'] = message.partition;
JSON.stringify(out);
For which you define a schema as follows:
{
  "name": "myschema",
  "type": "record",
  "namespace": "com.cloudera.test",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "topic",
      "type": "string"
    },
    {
      "name": "partition",
      "type": "string"
    }
  ]
}
The attribute message.headers is an array that can be iterated over:
var out = JSON.parse(record);
var header = JSON.parse(message.headers);
var interested_keys = ['DC'];               // should match schema definition

out['topic'] = message.topic;
out['partition'] = message.partition;

Object.keys(header).forEach(function(key) {
    if (interested_keys.indexOf(key) > -1){  // if match found for schema, set value
        out[key] = header[key];
    }
});

JSON.stringify(out);
For which you define a schema as follows:
{
  "name": "myschema",
  "type": "record",
  "namespace": "com.cloudera.test",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "topic",
      "type": "string"
    },
    {
      "name": "partition",
      "type": "string"
    },
    {
      "name": "DC",
      "type": "string"
    }
  ]
}