SFTP Source connector
The SFTP Source connector is a Stateless NiFi dataflow developed by Cloudera that runs in the Kafka Connect framework. Learn about the connector, its properties, and configuration.
The SFTP Source connector obtains files from an SFTP server and transfers them to Kafka. The connector supports record processing. If record processing is enabled, each input file is split into records and the records are transferred to Kafka. If record processing is not enabled, the connector forwards files to Kafka as they are fetched from the SFTP server.
If record processing is enabled, the input file can contain records in JSON, CSV, or Grok format. If the input records are in JSON or CSV format, the connector can either infer the schema of the records based on the data or read the schema from Schema Registry. If the input records are in Grok format, the connector can either derive the schema using the field names from the value of the Grok Expression property or read the schema from Schema Registry.
In addition, if record processing is enabled, the connector writes the records to Kafka in Avro format. The record schema is embedded in these Avro messages if the schema is set to be either inferred or determined based on the Grok Expression property. If Schema Registry is used for getting the schema of the records, then the schema is not embedded in the Avro messages.
If Schema Registry is used, and it is on a Kerberized cluster, the krb5.file property must point to the krb5.conf file that provides access to the cluster on which Schema Registry is present. This means that the krb5.conf file must be on the same cluster node that the connector runs on. The Kerberos keytab used to access Schema Registry must also be on the same cluster node that the connector runs on. The connection to Schema Registry can be secured by TLS. The truststore file necessary for securing the connection must also be on the same cluster node that the connector runs on.
Properties and configuration
Configuration is passed to the connector in a JSON file during creation. The properties of the connector fall into three groups:
- Common connector properties
- These are the properties of the Kafka Connect framework that are accepted by all connectors. For a comprehensive list of these properties, see the Apache Kafka documentation.
- Stateless NiFi Source properties
- These are the properties that are specific to the Stateless NiFi Source connector. All Stateless NiFi Source connectors share and accept these properties. For a comprehensive list of these properties, see the Stateless NiFi Source property reference.
- Connector/dataflow-specific properties
- These properties are unique to this specific connector or, more precisely, to the dataflow running within the connector. These properties use the following prefix:
  parameter.[***CONNECTOR NAME***] Parameters:
  For a comprehensive list of these properties, see the SFTP Source properties reference.
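The prefix convention can be illustrated with a minimal Python sketch. The helper name `dataflow_property_key` is hypothetical and not part of any Cloudera or Kafka Connect API; it only shows how the full property key is assembled from the connector name and the parameter name, assuming the `parameter.[***CONNECTOR NAME***] Parameters:` pattern described above.

```python
def dataflow_property_key(connector_name: str, parameter: str) -> str:
    """Build the full configuration key for a dataflow-specific property.

    Hypothetical helper for illustration only. It follows the
    'parameter.<CONNECTOR NAME> Parameters:<parameter>' pattern.
    """
    return f"parameter.{connector_name} Parameters:{parameter}"


# Keys for two of the SFTP Source parameters.
port_key = dataflow_property_key("SFTP Source", "Port")
path_key = dataflow_property_key("SFTP Source", "Remote Path")
print(port_key)
print(path_key)
```

For the SFTP Source connector, the connector name is `SFTP Source`, so the `Port` parameter ends up under the key `parameter.SFTP Source Parameters:Port`.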
Notes and limitations
- Required properties must be assigned a valid value even if they are not used in the particular configuration. If a required property is not used, either leave its default value, or completely remove the property from the configuration JSON.
- If a property that has a default value is completely removed from the configuration JSON, the system uses the default value.
- Properties not marked as required must be completely removed from the configuration JSON if not set.
- The Schema Registry URL property is mandatory even if Schema Registry is not used. If Schema Registry is not used, use the default value, or completely remove the property from the configuration JSON.
- Schema Branch and Schema Version cannot be specified at the same time.
- The value of the Schema Access Strategy property depends on the value of the Input Data Format property. As a result, you must exercise caution when configuring Schema Access Strategy.
  - If the value of Input Data Format is JSON, the possible values for Schema Access Strategy are Schema Registry or Infer Schema.
  - If the value of Input Data Format is CSV, the possible values for Schema Access Strategy are Schema Registry or Infer Schema.
  - If the value of Input Data Format is GROK, the possible values for Schema Access Strategy are Schema Registry or Field Names From Grok Expression.
- If the Enable Record Processing property is set to false (record processing disabled), the entire input file is transferred to Kafka as one message.
- If the Enable Record Processing property is set to true (record processing enabled), the output is always in Avro format. However, the contents of the output message transferred to Kafka depend on the value of the Schema Access Strategy property.
  - If Schema Access Strategy is set to Schema Registry, the output Avro message does not contain the schema (the schema is not embedded in the output).
  - If Schema Access Strategy is set to either Infer Schema or Field Names From Grok Expression, the output Avro message contains the schema (the schema is embedded in the output).
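The dependency rules in the notes above can be checked before a configuration is submitted. The following Python sketch is one possible pre-flight check, not something the connector provides: the function names and the `params` dictionary (parameter names without the `parameter.SFTP Source Parameters:` prefix) are assumptions made for illustration. The allowed value combinations are taken directly from the list above.

```python
# Allowed Schema Access Strategy values per Input Data Format,
# as listed in the notes above.
ALLOWED_STRATEGIES = {
    "JSON": {"Schema Registry", "Infer Schema"},
    "CSV": {"Schema Registry", "Infer Schema"},
    "GROK": {"Schema Registry", "Field Names From Grok Expression"},
}

# Strategies for which the schema is embedded in the Avro output.
EMBEDDING_STRATEGIES = {"Infer Schema", "Field Names From Grok Expression"}


def check_schema_settings(params: dict) -> list:
    """Return a list of problems found in the given parameters.

    Hypothetical helper: 'params' maps unprefixed parameter names to
    values. A property that is not set is expected to be absent.
    """
    problems = []
    if "Schema Branch" in params and "Schema Version" in params:
        problems.append("Schema Branch and Schema Version cannot both be set")
    data_format = params.get("Input Data Format")
    strategy = params.get("Schema Access Strategy")
    if data_format is not None and strategy is not None:
        allowed = ALLOWED_STRATEGIES.get(data_format)
        if allowed is None:
            problems.append(f"unknown Input Data Format: {data_format}")
        elif strategy not in allowed:
            problems.append(f"{strategy} is not valid for {data_format} input")
    return problems


def schema_is_embedded(strategy: str) -> bool:
    """Tell whether the Avro output embeds the schema for this strategy."""
    return strategy in EMBEDDING_STRATEGIES
```

For example, calling `check_schema_settings` with Input Data Format set to CSV and Schema Access Strategy set to Field Names From Grok Expression reports one problem, because that strategy is only listed for GROK input.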
Configuration example
{
 "connector.class": "org.apache.nifi.kafka.connect.StatelessNiFiSourceConnector",
 "meta.smm.predefined.flow.name": "SFTP Source",
 "meta.smm.predefined.flow.version": "1.0.0",
 "key.converter": "org.apache.kafka.connect.storage.StringConverter",
 "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
 "tasks.max": "1",
 "nexus.url": "https://repository.cloudera.com/artifactory/repo",
 "extensions.directory": "/tmp/nifi-stateless-extensions",
 "working.directory": "/tmp/nifi-stateless-working",
 "topics": "[***KAFKA TOPIC NAME***]",
 "parameter.SFTP Source Parameters:Hostname": "[***SFTP SERVER HOSTNAME***]",
 "parameter.SFTP Source Parameters:Port": "22",
 "parameter.SFTP Source Parameters:Username": "[***SFTP USERNAME***]",
 "parameter.SFTP Source Parameters:Password": "[***SFTP PASSWORD***]",
 "parameter.SFTP Source Parameters:Remote Path": "[***FOLDER PATH***]",
 "parameter.SFTP Source Parameters:File Filter Regex": ".*",
 "parameter.SFTP Source Parameters:Enable Record Processing": "false"
}
- topics
- The name of the Kafka topic that the connector sends messages to.
- Hostname
- The hostname of the remote system where the SFTP server runs. For example, my.sftp-server.com.
- Port
- The port that the remote system is listening on for file transfers.
- Username
- The username for connecting to the SFTP server.
- Password
- The password used to authenticate the user to the SFTP server. In this example, the SFTP server requires Basic Authentication.
- Remote Path
- The path on the remote system that points to the directory from which to pull files. For example, /uploads.
- File Filter Regex
- The Java regular expression to use for filtering filenames. Only files whose names match the regular expression are fetched.
- Enable Record Processing
- Determines whether the contents of a file get parsed as records before sending them to Kafka. In this example, the property is set to false, meaning that the entire file gets forwarded to Kafka as one message.
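The example above disables record processing. As a rough sketch of how a record-processing variant could be derived from it, the following Python snippet copies a configuration and sets the three related parameters. The function name `enable_record_processing` is hypothetical, and the shortened `base` dictionary stands in for the full configuration shown above. The parameter names Input Data Format and Schema Access Strategy come from the notes in this document; other properties, such as Schema Registry URL, may also need attention, so check the SFTP Source properties reference before using a configuration like this.

```python
import json

# Prefix used by the dataflow-specific properties in the example above.
PREFIX = "parameter.SFTP Source Parameters:"


def enable_record_processing(config: dict, data_format: str,
                             schema_access_strategy: str) -> dict:
    """Return a copy of 'config' with record processing switched on.

    Hypothetical helper for illustration; the input is left unchanged.
    """
    updated = dict(config)
    updated[PREFIX + "Enable Record Processing"] = "true"
    updated[PREFIX + "Input Data Format"] = data_format
    updated[PREFIX + "Schema Access Strategy"] = schema_access_strategy
    return updated


# Shortened stand-in for the full example configuration.
base = {
    PREFIX + "File Filter Regex": ".*",
    PREFIX + "Enable Record Processing": "false",
}

csv_config = enable_record_processing(base, "CSV", "Infer Schema")
print(json.dumps(csv_config, indent=1))
```

With Infer Schema selected, the notes above imply that the resulting Avro messages carry an embedded schema.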
