HDFS Stateless Sink connector
The HDFS Stateless Sink connector is a Stateless NiFi dataflow developed by Cloudera that runs in the Kafka Connect framework. Learn about the connector, its properties, and configuration.
The HDFS Stateless Sink connector fetches messages from Kafka and stores them in HDFS. The topic this connector receives messages from is determined by the value of the topics parameter in the configuration. The messages can contain unstructured data (character or binary) or they can be in Avro or JSON format.
If the input is unstructured data, record processing is disabled. Multiple messages can be concatenated into a single output file on HDFS using a demarcator (for example, a newline for text messages). Merging is optional: large binary data can be forwarded to HDFS at a 1:1 ratio, where one Kafka message equals a single HDFS file. However, this 1:1 usage is encouraged only for large files, to avoid small file problems in HDFS.
If the input is either Avro or JSON, record processing is enabled. In this case, the schema of the records can be a predefined schema retrieved from Schema Registry (for both Avro and JSON data), embedded in the Avro data, or inferred from the JSON data. The output can be Avro, JSON, CSV, or Parquet. Multiple records are typically merged into a single output file to store in HDFS.
The connector supports connecting to Kerberos-enabled HDFS and Schema Registry services.
Properties and configuration
Configuration is passed to the connector in a JSON file during creation. The properties of the connector can be categorized into three groups. These are as follows:
- Common connector properties
- These are the properties of the Kafka Connect framework that are accepted by all connectors. For a comprehensive list of these properties, see the Apache Kafka documentation.
- Stateless NiFi Sink properties
- These are the properties that are specific to the Stateless NiFi Sink connector. All Stateless NiFi Sink connectors share and accept these properties. For a comprehensive list of these properties, see the Stateless NiFi Sink property reference.
- Connector/dataflow-specific properties
- These properties are unique to this specific connector or, more precisely, to the dataflow running within the connector. These properties use the following prefix:
parameter.[***CONNECTOR NAME***] Parameters:
For a comprehensive list of these properties, see the HDFS Stateless Sink properties reference.
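The prefix convention can be illustrated with a short Python sketch that separates dataflow-specific parameters from all other properties in a configuration. This is only an illustration of the naming scheme described above. The function name split_properties and the sample topic name are hypothetical, and the sketch does not distinguish common Kafka Connect properties from Stateless NiFi Sink properties.

```python
PREFIX = "parameter."


def split_properties(config):
    """Separate dataflow-specific parameters from all other connector properties."""
    dataflow, other = {}, {}
    for key, value in config.items():
        if key.startswith(PREFIX) and ":" in key:
            # "parameter.<context>:<name>" -> (<context>, <name>)
            context, name = key[len(PREFIX):].split(":", 1)
            dataflow[(context, name)] = value
        else:
            other[key] = value
    return dataflow, other


# Hypothetical sample configuration using keys from this page.
config = {
    "tasks.max": "1",
    "topics": "my-topic",
    "parameter.HDFS Sink Parameters:Maximum File Size": "10 MB",
}

dataflow, other = split_properties(config)
print(dataflow)
print(other)
```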
Notes and limitations
- Required properties must be assigned a valid value even if they are not used in the particular configuration. If a required property is not used, either leave its default value, or completely remove the property from the configuration JSON.
- If a property that has a default value is completely removed from the configuration JSON, the system uses the default value.
- Properties not marked as required must be completely removed from the configuration JSON if not set.
- Schema Branch and Schema Version cannot be specified at the same time.
- If HDFS is running on a different cluster than the connector, the HDFS configuration resource files must be copied to the Kafka Connect node that the connector runs on.
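Some of these rules can be checked before a configuration is submitted. The following Python sketch is one possible pre-flight check, not part of the connector. It assumes that Schema Branch and Schema Version use the same parameter.HDFS Sink Parameters: prefix as the other dataflow parameters, and that an "unset" optional property shows up as an empty string. The function name check_config and the optional_keys argument are hypothetical; the caller decides which properties are optional based on the properties reference.

```python
PREFIX = "parameter.HDFS Sink Parameters:"


def check_config(config, optional_keys=()):
    """Return a list of problems found in a connector configuration dict."""
    problems = []

    # Schema Branch and Schema Version are mutually exclusive.
    branch = config.get(PREFIX + "Schema Branch")
    version = config.get(PREFIX + "Schema Version")
    if branch is not None and version is not None:
        problems.append("Schema Branch and Schema Version are both set")

    # Optional properties that are not set should be removed, not left empty.
    for key in optional_keys:
        if key in config and config[key] == "":
            problems.append(f"optional property present but not set: {key}")

    return problems


conflicting = {
    PREFIX + "Schema Branch": "dev",
    PREFIX + "Schema Version": "2",
}
print(check_config(conflicting))
```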
Configuration example for unstructured data
In this example, the connector fetches unstructured messages from Kafka. The connector concatenates the messages using newline characters as the demarcator. The files that the connector creates and stores in HDFS are at most 10 MB in size. The files are named according to the following pattern: messages_[***UUID***].
{
"connector.class": "org.apache.nifi.kafka.connect.StatelessNiFiSinkConnector",
"meta.smm.predefined.flow.name": "HDFS Sink",
"meta.smm.predefined.flow.version": "1.0.0",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"tasks.max": "1",
"nexus.url": "https://repository.cloudera.com/artifactory/repo/",
"working.directory": "/tmp/nifi-stateless-working",
"failure.ports": "",
"topics": "[***TOPIC NAME***]",
"parameter.HDFS Sink Parameters:Hadoop Configuration Resources": "/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml",
"parameter.HDFS Sink Parameters:Kafka Message Data Format": "Raw",
"parameter.HDFS Sink Parameters:Output Directory Pattern": "[***OUTPUT DIRECTORY PATH***]",
"parameter.HDFS Sink Parameters:Output Filename Pattern": "messages_${filename.uuid}",
"parameter.HDFS Sink Parameters:Maximum File Size": "10 MB",
"parameter.HDFS Sink Parameters:Output File Demarcator": "\n"
}
- topics
- The name of the Kafka topic the connector fetches messages from.
- Hadoop Configuration Resources
- A comma delimited list of Hadoop configuration files (core-site.xml, hdfs-site.xml, and so on). If HDFS is running on a different cluster than the connector, ensure that you copy the configuration files to the Kafka Connect node that the connector runs on.
- Output Directory Pattern
- Specifies the path to the output directory in HDFS.
- Maximum File Size
- The maximum size of the output data file. In this example, the maximum size is 10 MB.
- Kafka Message Data Format
- The format of the messages the connector receives from Kafka. In this example, this property is set to Raw. This means that the connector expects unstructured text or binary data and that record processing is disabled.
- Output File Demarcator
- Specifies the character sequence for demarcating (delimiting) message boundaries when multiple Kafka messages are ingested into an output file as raw messages. In this example, the property is set to \n (newline). This means that the newline character is used to separate the single-line character data of the Kafka messages.
- Output Filename Pattern
- Specifies the structure of the name of the output file. This property accepts string literals (fixed text) as well as various expressions. In this example, the property is set to messages_${filename.uuid}. The word messages and the underscore ( _ ) are string literals. ${filename.uuid} is an expression, which inserts a Universally Unique Identifier (UUID). As a result, the files are named according to the following pattern: messages_[***UUID***].
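To make the effect of the demarcator, the size limit, and the filename pattern concrete, the following Python sketch mimics the behavior described above. It is a simplified model, not the connector's actual implementation. It assumes that 10 MB means 10 * 1024 * 1024 bytes and that a single message larger than the limit is written to its own file. The function names merge_messages and output_filename are hypothetical.

```python
import uuid


def merge_messages(messages, demarcator=b"\n", max_bytes=10 * 1024 * 1024):
    """Group raw messages into file payloads, joined by the demarcator.

    A payload is closed before it would grow beyond max_bytes. A single
    message larger than max_bytes still becomes its own payload (assumption).
    """
    files, parts, size = [], [], 0
    for msg in messages:
        extra = len(msg) + (len(demarcator) if parts else 0)
        if parts and size + extra > max_bytes:
            files.append(demarcator.join(parts))
            parts, size = [], 0
            extra = len(msg)
        parts.append(msg)
        size += extra
    if parts:
        files.append(demarcator.join(parts))
    return files


def output_filename():
    """Model the pattern messages_${filename.uuid} with a random UUID."""
    return f"messages_{uuid.uuid4()}"


# A tiny limit makes the split visible with short messages.
payloads = merge_messages([b"a", b"b", b"c"], max_bytes=3)
print(payloads)
print(output_filename())
```

With a 3-byte limit, the first two messages fit into one payload together with the demarcator, and the third message starts a new payload.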
Configuration example for Avro input and JSON output
In this example, the connector fetches messages in Avro format from Kafka. The connector parses the Avro records and converts them to JSON format using the schema definition stored in Schema Registry. The Avro data does not contain any schema reference, so the schema must be specified in the connector parameters. The connector merges multiple records (all messages that are available for a single execution of the connector) into a JSON file, generates a name for the file using the following pattern: messages_[***UUID***].json, and stores the file in HDFS.
{
"connector.class": "org.apache.nifi.kafka.connect.StatelessNiFiSinkConnector",
"meta.smm.predefined.flow.name": "HDFS Sink",
"meta.smm.predefined.flow.version": "1.0.0",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"tasks.max": "1",
"nexus.url": "https://repository.cloudera.com/artifactory/repo/",
"working.directory": "/tmp/nifi-stateless-working",
"failure.ports": "",
"topics": "[***TOPIC NAME***]",
"parameter.HDFS Sink Parameters:Hadoop Configuration Resources": "/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml",
"parameter.HDFS Sink Parameters:Kafka Message Data Format": "Avro",
"parameter.HDFS Sink Parameters:Output Directory Pattern": "[***OUTPUT DIRECTORY NAME***]",
"parameter.HDFS Sink Parameters:Output File Data Format": "JSON",
"parameter.HDFS Sink Parameters:Output Filename Pattern": "messages_${filename.uuid}.json",
"parameter.HDFS Sink Parameters:Schema Access Strategy": "Schema Registry",
"parameter.HDFS Sink Parameters:Schema Name": "[***SCHEMA NAME***]",
"parameter.HDFS Sink Parameters:Schema Registry URL": "[***SCHEMA REGISTRY SERVER URL***]"
}
- topics
- The name of the Kafka topic the connector fetches messages from.
- Hadoop Configuration Resources
- A comma delimited list of Hadoop configuration files (core-site.xml, hdfs-site.xml, and so on). If HDFS is running on a different cluster than the connector, ensure that you copy the configuration files to the Kafka Connect node that the connector runs on.
- Output Directory Pattern
- Specifies the path to the output directory in HDFS.
- Kafka Message Data Format
- The format of the messages the connector receives from Kafka. In this example, this property is set to Avro. This means that the connector expects Avro data and that record processing is enabled.
- Output File Data Format
- Specifies the format of the records written to the output file. In this example,
              this property is set to JSON, meaning that the output file format is JSON.
- Output Filename Pattern
- Specifies the structure of the name of the output file. This property accepts string literals (fixed text) as well as various expressions. In this example, the property is set to messages_${filename.uuid}.json. The word messages, the underscore ( _ ), and .json are string literals. ${filename.uuid} is an expression, which inserts a Universally Unique Identifier (UUID). As a result, the files are named according to the following pattern: messages_[***UUID***].json.
- Schema Access Strategy
- Specifies the strategy used for determining the schema of the Kafka record. In this example, this property is set to Schema Registry, meaning that the schema definition used to parse input data is taken from the Schema Registry.
- Schema Name
- Specifies the schema name to look up in Schema Registry.
- Schema Registry URL
- The URL of the Schema Registry server.
Configuration example for JSON input and Parquet output
In this example, the connector fetches messages in JSON format from Kafka. The connector parses the JSON records and converts them to Parquet format using the schema inferred from the JSON data. The connector merges multiple records (all messages that are available for a single execution of the connector) into a Parquet file, generates a name for the file using the following pattern: messages_[***TIMESTAMP***]_[***SEQUENCE***].parquet.gz, and stores the file in HDFS. The output directory path contains a fixed name and a timestamp appended to it: /outputDir/[***TIMESTAMP***]. The connector authenticates to HDFS using Kerberos.
{
"connector.class": "org.apache.nifi.kafka.connect.StatelessNiFiSinkConnector",
"meta.smm.predefined.flow.name": "HDFS Sink",
"meta.smm.predefined.flow.version": "1.0.0",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"tasks.max": "1",
"nexus.url": "https://repository.cloudera.com/artifactory/repo/",
"working.directory": "/tmp/nifi-stateless-working",
"failure.ports": "",
"topics": "[***TOPIC NAME***]",
"parameter.HDFS Sink Parameters:Hadoop Configuration Resources": "/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml",
"parameter.HDFS Sink Parameters:Kafka Message Data Format": "JSON",
"parameter.HDFS Sink Parameters:Output Directory Pattern": "/outputDir/${directory.timestamp}",
"parameter.HDFS Sink Parameters:Output Directory Timestamp Format": "yyMMdd",
"parameter.HDFS Sink Parameters:Output File Data Format": "Parquet",
"parameter.HDFS Sink Parameters:Output Filename Pattern": "messages_${filename.timestamp}_${filename.sequence}.parquet.gz",
"parameter.HDFS Sink Parameters:Output Filename Timestamp Format": "HHmmss",
"parameter.HDFS Sink Parameters:Schema Access Strategy": "Infer Schema",
"parameter.HDFS Sink Parameters:Kerberos Keytab for HDFS": "[***KEYTAB LOCATION***]",
"parameter.HDFS Sink Parameters:Kerberos Principal for HDFS": "[***PRINCIPAL NAME***]",
"parameter.HDFS Sink Parameters:Compression Codec for Parquet": "GZIP"
}
- topics
- The name of the Kafka topic the connector fetches messages from.
- Hadoop Configuration Resources
- A comma delimited list of Hadoop configuration files (core-site.xml, hdfs-site.xml, and so on). If HDFS is running on a different cluster than the connector, ensure that you copy the configuration files to the Kafka Connect node that the connector runs on.
- Output Directory Pattern
- Specifies the path to the output directory in HDFS. In this example, the path is set to /outputDir/${directory.timestamp}. As a result, the directory path in this example resolves to /outputDir/[***TIMESTAMP***]. The timestamp format is configured in the Output Directory Timestamp Format property.
- Output Directory Timestamp Format
- Specifies the timestamp format used in the ${directory.timestamp} expression, which can be used in the Output Directory Pattern property's value. Time-based directory partitioning can be implemented using this expression. For example, if Output Directory Pattern is set to /outputDir/${directory.timestamp} and the timestamp format specified is yyMMdd, the output files are generated in the /outputDir/220101/ directory if the date is January 1, 2022.
- Kafka Message Data Format
- The format of the messages the connector receives from Kafka. In this example, this
              property is set to JSON. This means that the connector expects JSON data and that record processing is enabled.
- Output File Data Format
- Specifies the format of the records written to the output file. In this example,
              this property is set to Parquet, meaning that the output file format is Parquet.
- Output Filename Pattern
- Specifies the structure of the name of the output file. This property accepts string literals (fixed text) as well as various expressions. In this example, the property is set to messages_${filename.timestamp}_${filename.sequence}.parquet.gz. The word messages, the underscores ( _ ), and .parquet.gz are string literals. ${filename.timestamp} and ${filename.sequence} are expressions. ${filename.timestamp} inserts the current timestamp in the filename. ${filename.sequence} inserts an incrementing sequence value in the filename. As a result, the files are named according to the following pattern: messages_[***TIMESTAMP***]_[***SEQUENCE***].parquet.gz.
- Output Filename Timestamp Format
- Specifies the timestamp format used in the ${filename.timestamp} expression. The expression is used when generating the output filename. In this example, the format is HHmmss.
- Schema Access Strategy
- Specifies the strategy used for determining the schema of the Kafka record. In this
              example, this property is set to Infer Schema, meaning that the schema is determined (inferred) from the JSON data.
- Kerberos Keytab for HDFS
- Specifies the keytab used to authenticate to HDFS with Kerberos.
- Kerberos Principal for HDFS
- Specifies the principal used to authenticate to HDFS with Kerberos.
- Compression Codec for Parquet
- Specifies the codec used to compress files and store them in compressed format in HDFS. In this example, the GZIP codec is used.
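The way the directory and filename patterns in this example resolve can be sketched in Python. This is an illustration only, not the connector's implementation. It assumes that the timestamp formats follow Java-style pattern letters (yy, MM, dd, HH, mm, ss) and that the sequence value is rendered as a plain integer. The helper names format_timestamp and resolve, and the sample time, are hypothetical.

```python
from datetime import datetime

# Assumed mapping from Java-style two-letter pattern tokens to strftime codes.
JAVA_TO_STRFTIME = {
    "yy": "%y", "MM": "%m", "dd": "%d",
    "HH": "%H", "mm": "%M", "ss": "%S",
}


def format_timestamp(moment, pattern):
    """Format a datetime using a pattern built from two-letter tokens only."""
    tokens = [pattern[i:i + 2] for i in range(0, len(pattern), 2)]
    return "".join(moment.strftime(JAVA_TO_STRFTIME[token]) for token in tokens)


def resolve(template, values):
    """Replace each ${name} expression in the template with its value."""
    for name, value in values.items():
        template = template.replace("${" + name + "}", str(value))
    return template


# January 1, 2022, 13:05:09 (sample time chosen for illustration).
now = datetime(2022, 1, 1, 13, 5, 9)

directory = resolve(
    "/outputDir/${directory.timestamp}",
    {"directory.timestamp": format_timestamp(now, "yyMMdd")},
)
filename = resolve(
    "messages_${filename.timestamp}_${filename.sequence}.parquet.gz",
    {
        "filename.timestamp": format_timestamp(now, "HHmmss"),
        "filename.sequence": 1,  # assumed rendering of the sequence value
    },
)

print(directory)  # /outputDir/220101
print(filename)   # messages_130509_1.parquet.gz
```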
