S3 Sink connector

The S3 Sink Connector is a Stateless NiFi dataflow developed by Cloudera that runs in the Kafka Connect framework. Learn about the connector, its properties, and its configuration.

The S3 Sink connector fetches messages from Kafka and uploads them to AWS S3. The topic this connector receives messages from is determined by the value of the topics property in the configuration. The messages can contain unstructured (character or binary) data or they can be in Avro or JSON format.

If the input is unstructured data, record processing is disabled. In this case, multiple messages can be concatenated into a single output file on S3 using a demarcator (for example, a newline for text messages). Merging is optional; large binary data can be forwarded to S3 at a 1:1 ratio, where one Kafka message corresponds to one S3 object.
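
For example, in this mode the dataflow parameters that control the input format and the demarcator look like the following fragment (shown in context in the first configuration example later in this topic):

 "parameter.S3 Sink Parameters:Kafka Message Data Format": "Raw",
 "parameter.S3 Sink Parameters:Output File Demarcator": "\n"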

If the input is either Avro or JSON, record processing is enabled. In this case, the schema of the records can be a predefined schema retrieved from Schema Registry (for both Avro and JSON data), embedded in the Avro data, or inferred from the JSON data. The output can be in Avro, JSON, CSV, or Parquet format. Multiple records are typically merged into a single output file before uploading to S3.
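
For example, when JSON messages are converted to Avro output files using an inferred schema, the relevant dataflow parameters look like the following fragment (shown in context in the second configuration example later in this topic):

 "parameter.S3 Sink Parameters:Kafka Message Data Format": "JSON",
 "parameter.S3 Sink Parameters:Schema Access Strategy": "Infer Schema",
 "parameter.S3 Sink Parameters:Output File Data Format": "Avro"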

The connector can authenticate to AWS using an Access Key.

Properties and configuration

Configuration is passed to the connector in a JSON file during creation. The properties of the connector can be categorized into three groups. These are as follows:

Common connector properties
These are the properties of the Kafka Connect framework that are accepted by all connectors. For a comprehensive list of these properties, see the Apache Kafka documentation.
Stateless NiFi Sink properties
These are the properties that are specific to the Stateless NiFi Sink connector. All Stateless NiFi Sink connectors share and accept these properties. For a comprehensive list of these properties, see the Stateless NiFi Sink property reference.
Connector/dataflow-specific properties
These properties are unique to this specific connector, or more precisely, to the dataflow running within the connector. These properties use the following prefix (for an example, see the fragment after this list):
parameter.[***CONNECTOR NAME***] Parameters:
For a comprehensive list of these properties, see the S3 Sink properties reference.
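
For example, the S3 Bucket parameter of this connector appears in the configuration JSON as follows:

 "parameter.S3 Sink Parameters:S3 Bucket": "[***S3 BUCKET***]"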

Notes and limitations

  • Required properties must be assigned a valid value even if they are not used in the particular configuration. If a required property is not used, either leave its default value, or completely remove the property from the configuration JSON.
  • If a property that has a default value is completely removed from the configuration JSON, the system uses the default value.
  • Properties not marked as required must be completely removed from the configuration JSON if not set.
  • Schema Branch and Schema Version cannot be specified at the same time.
  • It is not recommended to configure the following combination of settings:
    • Kafka Message Data Format = JSON
    • Schema Access Strategy = Infer Schema
    • Maximum Number of Entries > 1
    • Output File Data Format = Avro

    If the input records do not strictly conform to a single schema (for example, one record has a missing field while the other records do not), schema inference determines a schema for each record individually, while batching assigns only one schema to the entire batch. Because of this, there might be records in the batch that do not match the schema assigned to the batch. This can be avoided by disabling batching, that is, by setting Maximum Number of Entries to 1. The hypothetical records after this list illustrate the issue.
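
For example, assume that the connector receives the following two hypothetical JSON records in a single batch:

 {"id": 1, "name": "first"}
 {"id": 2}

Schema inference derives a schema containing both id and name for the first record, but only id for the second. When the batch is written to a single Avro output file, only one of these schemas can be applied, so one of the records does not match the schema of the file.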

Configuration example for fetching unstructured data

In this example, the connector fetches unstructured messages containing single line character data from Kafka. The connector concatenates the messages using newline characters as the demarcator. The files that the connector creates and uploads to S3 are at most 10 MB in size. The files are named according to the following pattern: messages_[***UUID***]. The connector authenticates to AWS using an Access Key.

{
 "connector.class": "org.apache.nifi.kafka.connect.StatelessNiFiSinkConnector",
 "meta.smm.predefined.flow.name": "S3 Sink",
 "meta.smm.predefined.flow.version": "1.0.0",
 "key.converter": "org.apache.kafka.connect.storage.StringConverter",
 "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
 "nexus.url": "https://repository.cloudera.com/artifactory/repo",
 "extensions.directory": "/tmp/nifi-stateless-extensions",
 "working.directory": "/tmp/nifi-stateless-working",
 "failure.ports": "PutS3Object Failure",
 "topics": "[***KAFKA TOPIC NAME***]",
 "parameter.S3 Sink Parameters:AWS Access Key ID": "[***ACCESS KEY ID***]",
 "parameter.S3 Sink Parameters:AWS Secret Access Key": "[***SECRET ACCESS KEY***]",
 "parameter.S3 Sink Parameters:S3 Region": "[***S3 REGION***]",
 "parameter.S3 Sink Parameters:S3 Bucket": "[***S3 Bucket***]",
 "parameter.S3 Sink Parameters:Maximum File Size": "10 MB",
 "parameter.S3 Sink Parameters:Kafka Message Data Format": "Raw",
 "parameter.S3 Sink Parameters:Output File Demarcator": "\n",
 "parameter.ADLS Sink Parameters:Output Filename Pattern": "messages_${filename.uuid}"
}
The following list collects the properties from the configuration example that must be customized for this use case.
topics
The name of the Kafka topic the connector fetches messages from.
AWS Access Key ID
The Access Key ID to use for authentication to AWS.
AWS Secret Access Key
The Secret Access Key to use for authentication to AWS.
S3 Region
The AWS Region of the S3 Bucket to upload data to.
S3 Bucket
The S3 Bucket to upload data to.
Maximum File Size
The maximum size of the output data file. In this example, the maximum size is 10 MB.
Kafka Message Data Format
The format of the messages the connector receives from Kafka. In this example, this property is set to Raw. This means that the connector expects unstructured text or binary data.
Output File Demarcator
Specifies the character sequence for demarcating (delimiting) message boundaries when multiple Kafka messages are ingested into an output file as raw messages. In this example, the property is set to \n (newline). This means that the newline character is used to separate the single line character data of the Kafka messages.
Output Filename Pattern
Specifies the structure of the name of the output file. This property accepts string literals (fixed text) as well as various expressions. In this example, the property is set to messages_${filename.uuid}.

messages_ is a string literal; ${filename.uuid} is an expression that inserts a generated UUID into the filename. As a result, the files are named according to the following pattern: messages_[***UUID***].
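
How you create the connector from this configuration depends on your environment. In Cloudera deployments, the connector is typically created through the SMM UI using this configuration. If you create the connector directly through the Kafka Connect REST API instead, the same flat configuration is wrapped in a name and config envelope. The following is a minimal sketch of that envelope, with a hypothetical connector name and only two of the properties from the example above repeated for brevity:

{
 "name": "s3-sink-unstructured",
 "config": {
  "connector.class": "org.apache.nifi.kafka.connect.StatelessNiFiSinkConnector",
  "topics": "[***KAFKA TOPIC NAME***]"
 }
}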

Configuration example for fetching JSON data

In this example, the connector fetches messages in JSON format from Kafka. The connector parses the JSON records and converts them to Avro format using the schema inferred from the JSON data. The schema is also embedded in the output file. The connector merges multiple records (all messages that are available for a single execution of the connector) into an Avro file, generates a name for the file using the following pattern data_[***TIMESTAMP***]_[***SEQUENCE***].avro, and uploads the file to S3. The connector authenticates to AWS using an Access Key.

{
 "connector.class": "org.apache.nifi.kafka.connect.StatelessNiFiSinkConnector",
 "meta.smm.predefined.flow.name": "S3 Sink",
 "meta.smm.predefined.flow.version": "1.0.0",
 "key.converter": "org.apache.kafka.connect.storage.StringConverter",
 "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
 "nexus.url": "https://repository.cloudera.com/artifactory/repo",
 "extensions.directory": "/tmp/nifi-stateless-extensions",
 "working.directory": "/tmp/nifi-stateless-working",
 "failure.ports": "PutS3Object Failure",
 "topics": "[***KAFKA TOPIC NAME***]",
 "parameter.S3 Sink Parameters:AWS Access Key ID": "[***ACCESS KEY ID***]",
 "parameter.S3 Sink Parameters:AWS Secret Access Key": "[***SECRET ACCESS KEY***]",
 "parameter.S3 Sink Parameters:S3 Region": "[***S3 REGION***]",
 "parameter.S3 Sink Parameters:S3 Bucket": "[***S3 BUCKET***]"
 "parameter.ADLS Sink Parameters:Kafka Message Data Format": "JSON",
 "parameter.ADLS Sink Parameters:Schema Access Strategy": "Infer Schema",
 "parameter.ADLS Sink Parameters:Schema Write Strategy": "Embed Avro Schema",
 "parameter.ADLS Sink Parameters:Output File Data Format": "Avro",
 "parameter.ADLS Sink Parameters:Output Filename Pattern": "data_${filename.timestamp}_${filename.sequence}.avro",
 "parameter.ADLS Sink Parameters:Output Filename Timestamp Format": "yyyyMMdd_HHmmss_SSS"
}
The following list collects the properties from the configuration example that must be customized for this use case.
topics
The name of the Kafka topic the connector fetches messages from.
AWS Access Key ID
The Access Key ID to use for authentication to AWS.
AWS Secret Access Key
The Secret Access Key to use for authentication to AWS.
S3 Region
The AWS Region of the S3 Bucket to upload data to.
S3 Bucket
The S3 Bucket to upload data to.
Kafka Message Data Format
The format of the messages the connector receives from Kafka. In this example, this property is set to JSON. This means that the connector expects JSON data.
Schema Access Strategy
Specifies the strategy used for determining the schema of the Kafka record. In this example, this property is set to Infer Schema, meaning that the schema is determined (inferred) from the JSON data.
Schema Write Strategy
Specifies whether the record schema is written to the output data file. In this example, this property is set to Embed Avro Schema, meaning that the schema is embedded in the output Avro file.
Output File Data Format
Specifies the format of the records written to the output file. In this example, this property is set to Avro, meaning that the output file format is Avro.
Output Filename Pattern
Specifies the structure of the name of the output file. This property accepts string literals (fixed text) as well as various expressions. In this example, the property is set to data_${filename.timestamp}_${filename.sequence}.avro.

data_, the underscore ( _ ) between the two expressions, and .avro are string literals. ${filename.timestamp} and ${filename.sequence} are expressions. ${filename.timestamp} inserts the current timestamp into the filename. ${filename.sequence} inserts an incrementing sequence value into the filename. As a result, the files are named according to the following pattern: data_[***TIMESTAMP***]_[***SEQUENCE***].avro.

Output Filename Timestamp Format
Specifies the timestamp format used in the ${filename.timestamp} expression. The expression is used when generating the output filename.
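
For example, with the yyyyMMdd_HHmmss_SSS format used in this configuration, a hypothetical output file created at 13:45:01.123 on January 2, 2024 with a sequence value of 1 would be named data_20240102_134501_123_1.avro.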