SFTP Source connector
The SFTP Source connector is a Stateless NiFi dataflow developed by Cloudera that is running in the Kafka Connect framework. Learn about the connector, its properties, and configuration.
The SFTP Source connector obtains files from an SFTP server and transfers them to Kafka. The SFTP Source connector supports record processing. In case record processing is enabled, input file is split into records and the records are transferred to Kafka. If record processing is not enabled, the connector forwards files to Kafka as they are fetched from the SFTP server.
If record processing is enabled, the input file can contain records in either JSON, CSV or
Grok format. If the input records are in JSON or CSV format, the connector can either infer
the schema of the records based on the data or read the schema from Schema Registry. If the
input records are in Grok format, the connector can either derive the schema using the field
names from the value of the Grok Expression
property or read the schema from
Schema Registry.
In addition, if record processing is enabled, the connector writes the records to Kafka in
Avro format. The record schema gets embedded in these Avro messages if the schema is set to be
either inferred or determined based on the Grok Expression
property. If
Schema Registry is used for getting the schema of the records, then the schema does not get
embedded in the Avro messages.
If Schema Registry is used, and it is on a Kerberized cluster, the krb5.file
property must point to the krb5.conf
file that provides access to the cluster
on which Schema Registry is present.
This
means that the krb5.conf
file must be on the same cluster node that the
connector runs on. The Kerberos keytab used to access Schema Registry must also be on the same
cluster node that the connector runs on. The connection to Schema Registry can be secured by
TLS. The truststore file necessary for securing the connection must also be on the same
cluster node that the connector runs on.
Properties and configuration
Configuration is passed to the connector in a JSON file during creation. The properties of the connector can be categorized into three groups. These are as follows:
- Common connector properties
- These are the properties of the Kafka Connect framework that are accepted by all connectors. For a comprehensive list of these properties, see the Apache Kafka documentation.
- Stateless NiFi Source properties
- These are the properties that are specific to the Stateless NiFi Source connector. All Stateless NiFi Source connectors share and accept these properties. For a comprehensive list of these properties, see the Stateless NiFi Source property reference.
- Connector/dataflow-specific properties
- These properties are unique to this specific connector. Or to be more precise, unique
to the dataflow running within the connector. These properties use the following
prefix:
For a comprehensive list of these properties, see the SFTP Source properties reference.parameter.[***CONNECTOR NAME***] Parameters:
Notes and limitations
- Required properties must be assigned a valid value even if they are not used in the particular configuration. If a required property is not used, either leave its default value, or completely remove the property from the configuration JSON.
- If a property that has a default value is completely removed from the configuration JSON, the system uses the default value.
- Properties not marked as required must be completely removed from the configuration JSON if not set.
- The
Schema Registry URL
property is mandatory even if Schema Registry is not used. If Schema Registry is not used, use the default value, or completely remove the property from the configuration JSON. Schema Branch
andSchema Version
can not be specified at the same time.- The value of the
Schema Access Strategy
property is not independent of the value of theInput Data Format
property. As a result, you must exercise caution when configuringSchema Access Strategy
.- If the value of
Input Data Format
isJSON
, the possible values forSchema Access Strategy
areSchema Registry
orInfer Schema
. - If the value of
Input Data Format
isCSV
, the possible values forSchema Access Strategy
areSchema Registry
orInfer Schema
. - If the value of
Input Data Format
isGROK
, the possible values forSchema Access Strategy
areSchema Registry
orField Names From Grok Expression
.
- If the value of
- If the
Enable Record Processing
property is set tofalse
(record processing disabled), the entire input file is transferred to Kafka as one message. - If the
Enable Record Processing
property is set totrue
(record processing enabled), the output is always in Avro format. However, the contents of the output message transferred to Kafka depends on the value of theSchema Access Strategy
property.- If
Schema Access Strategy
is set toSchema Registry
, the output Avro message does not contain the schema (the schema is not embedded in the output). - If
Schema Access Strategy
is set to eitherInfer Schema
orField Names From Grok Expression
, the output Avro message contains the schema (the schema is embedded in the output).
- If
Configuration example
{
"connector.class": "org.apache.nifi.kafka.connect.StatelessNiFiSourceConnector",
"meta.smm.predefined.flow.name": "SFTP Source",
"meta.smm.predefined.flow.version": "1.0.0",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"tasks.max": "1",
"nexus.url": "https://repository.cloudera.com/artifactory/repo",
"extensions.directory": "/tmp/nifi-stateless-extensions",
"working.directory": "/tmp/nifi-stateless-working",
"topics": "[*** KAFKA TOPIC NAME***]",
"parameter.SFTP Source Parameters:Hostname": "[***SFTP Server Hostname***]",
"parameter.SFTP Source Parameters:Port": "22",
"parameter.SFTP Source Parameters:Username": "[***SFTP USERNAME***]",
"parameter.SFTP Source Parameters:Password": "[***SFTP PASSWORD***]",
"parameter.SFTP Source Parameters:Remote Path": "[***FOLDER PATH***]",
"parameter.SFTP Source Parameters:File Filter Regex": ".*",
"parameter.SFTP Source Parameters:Enable Record Processing": "false"
}
topics
- The name of the Kafka topic that the connector sends messages to.
Hostname
- The hostname of the remote system where the SFTP server runs. For example,
my.sftp-server.com
. Port
- The port that the remote system is listening on for file transfers.
Username
- The username for connecting to the SFTP server.
Password
- The password used to authenticate the user towards the SFTP server. In this example, the SFTP server requires Basic Authentication.
Remote Path
- The path on the remote system that points to the directory from which to pull files.
For example,
/uploads
. File Filter Regex
- The Java regular expression to use for filtering filenames. Only files whose names match the regular expression are fetched.
Enable Record Processing
- Determines whether the contents of a file get parsed as records before sending them
to Kafka. In this example, the property is set to
false
, meaning that the entire file gets forwarded to Kafka as one message.