InfluxDB Sink connector
The InfluxDB Sink connector is a Stateless NiFi dataflow developed by Cloudera that is running in the Kafka Connect Framework. Learn about the connector, its properties, and configuration.
The InfluxDB Sink Connector fetches messages from Kafka and loads them to InfluxDB. The topic
      this connector receives messages from is determined by the value of the
        topics parameter in the configuration. The messages the connector receives
      from Kafka can be in either Avro or JSON format.
If the connector input is in Avro format, then it can either read the schema from the Avro
      file it receives (provided that the schema is embedded) or it can fetch the schema from Schema
      Registry. If the connector’s input is in JSON format, then it can either infer the schema or
      fetch it from Schema Registry. The strategy that is used to retrieve the schema is determined
      by the Schema Access Strategy property.
The connector can authenticate to InfluxDB using username and password.
Properties and configuration
Configuration is passed to the connector in a JSON file during creation. The properties of the connector can be categorized into three groups. These are as follows:
- Common connector properties
- These are the properties of the Kafka Connect framework that are accepted by all connectors. For a comprehensive list of these properties, see the Apache Kafka documentation.
- Stateless NiFi Sink properties
- These are the properties that are specific to the Stateless NiFi Sink connector. All Stateless NiFi Sink connectors share and accept these properties. For a comprehensive list of these properties, see the Stateless NiFi Sink property reference.
- Connector/dataflow-specific properties
- These properties are unique to this specific connector. Or to be more precise, unique
            to the dataflow running within the connector. These properties use the following
            prefix:
 For a comprehensive list of these properties, see the InfluxDB Sink properties reference.parameter.[***CONNECTOR NAME***] Parameters:
Notes and limitations
- If a property that has a default value is completely removed from the configuration JSON, the system uses the default value.
- Properties not marked as required must be completely removed from the configuration JSON if not set.
- Schema Branchand- Schema Versioncan not be specified at the same time.
- The value of the Schema Access Strategyproperty is not independent of the value of theKafka Message Data Formatproperty. As a result, you must exercise caution when configuringSchema Access Strategy.- If the value of Kafka Message Data FormatisAVRO, the possible values forSchema Access StrategyareSchema RegistryorEmbedded Schema.
- If the value of Kafka Message Data FormatisJSON, the possible values forSchema Access StrategyareSchema RegistryorInfer Schema.
 
- If the value of 
- The Line Protocol Queryparameter contains a query expression which creates a string from input record fields. The query depends on the record schema. The result of the query must comply with the line protocol format.
Configuration example
In this example, the connector receives data in JSON format (which is the default setting), transforms the data to line protocol format, and inserts it to InfluxDB. Schema Registry is not used in the example. As a result, this example does not include Schema Registry key and truststore configurations.
{
 "connector.class": "org.apache.nifi.kafka.connect.StatelessNiFiSinkConnector",
 "meta.smm.predefined.flow.name": "InfluxDB Sink",
 "meta.smm.predefined.flow.version": "1.0.0",
 "key.converter": "org.apache.kafka.connect.storage.StringConverter",
 "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
 "tasks.max": "1",
 "extensions.directory": "/tmp/stateless-extensions",
 "working.directory": "/tmp/nifi-stateless-working",
 "nexus.url": "https://repository.cloudera.com/artifactory/repo/",
 "failure.ports": "Retry from PutInfluxDB",
 "topics": "[***TOPIC NAME***]",
 "parameter.InfluxDB Connection URL": "http://[**SERVER  HOSTNAME**]:[**PORT**]",
 "parameter.InfluxDB Database Name": "[***DATABASE NAME***]",
 "parameter.InfluxDB User Name": "[***USER NAME***]",
 "parameter.InfluxDB User Password": "[***USER PASSWORD***]",
 "parameter.Line Protocol Query": "select 'temp,device_id=' || device_id || ',device_state=' || device_state || ',part_id=' || part_id || ',code=' || code || ',result=' || result_code || ' temperature=' || temperature || ' ' || ts || '000000' as payload from FLOWFILE",
 "parameter.Schema Access Strategy":"Infer Schema"
}
- topics
- The name of the Kafka topic the connector fetches messages from.
- InfluxDB Connection URL
- The URL used to connect to InfluxDB.
- InfluxDB Database Name
- The name of the InfluxDB database.
- InfluxDB User Name
- The username used to connect to InfluxDB.
- InfluxDB User Password
- The password used to connect to InfluxDB.
- Line Protocol Query
- Specifies a query based on the Avro schema of the records and the measurement used.
- Schema Access Strategy
- Specifies the strategy used for determining the schema of the Kafka record. In this
              example, the property is set to Infer Schema. This means that the schema is determined (inferred) from the JSON data.
