JMS Source connector
The JMS Source connector is a Stateless NiFi dataflow developed by Cloudera that is running in the Kafka Connect framework. Learn about the connector, its properties, and configuration.
The JMS Source Connector consumes messages from a JMS broker and transfers the message body
      to Kafka. The Kafka topic this connector transfers messages to is determined by the value of
      the topics parameter in the configuration. The JMS Source connector does not
      perform record processing, the messages are transferred to Kafka as they are consumed from the
      JMS broker.
TLS can be used to establish a secure connection between the connector and the JMS broker. The keystore and truststore files necessary for securing the connection must be present on the cluster node that the connector runs on.
Properties and configuration
Configuration is passed to the connector during creation. The properties of the connector can be categorized into three groups. These are as follows:
- Common connector properties
- These are the properties of the Kafka Connect framework that are accepted by all connectors. For a comprehensive list of these properties, see the Apache Kafka documentation.
- Stateless NiFi Source properties
- These are the properties that are specific to the Stateless NiFi Source connector. All Stateless NiFi Source connectors share and accept these properties. For a comprehensive list of these properties, see the Stateless NiFi Source property reference.
- Connector/dataflow-specific properties
- These properties are unique to this specific connector. Or to be more precise, unique
            to the dataflow running within the connector. These properties use the following
            prefix:
 For a comprehensive list of these properties, see the JMS Source properties reference.parameter.[***CONNECTOR NAME***] Parameters:
Notes and limitations
- Required properties must be assigned a valid value even if they are not used in the particular configuration. If a required property is not used, either leave its default value, or completely remove the property from the configuration.
- If a property that has a default value is completely removed from the configuration, the system uses the default value.
- Properties not marked as required must be completely removed from the configuration if not set.
Configuration example
{
 "connector.class": "org.apache.nifi.kafka.connect.StatelessNiFiSourceConnector",
 "meta.smm.predefined.flow.name": "JMS Source",
 "meta.smm.predefined.flow.version": "1.0.0",
 "key.converter": "org.apache.kafka.connect.storage.StringConverter",
 "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",,
 "tasks.max": "1"
 "nexus.url": "https://repository.cloudera.com/artifactory/repo",
 "extensions.directory": "/tmp/nifi-stateless-extensions",
 "working.directory": "/tmp/nifi-stateless-working",,
 "topics": "[***TOPIC NAME***]"
 "parameter.JMS Source Parameters:JMS Broker URI": "[***BROKER URI***]",
 "parameter.JMS Source Parameters:JMS Client Libraries": "[***PATH TO CLIENT LIBRARIES***]",
 "parameter.JMS Source Parameters:JMS Connection Factory Class Name": "org.apache.activemq.ActiveMQSslConnectionFactory",
 "parameter.JMS Source Parameters:JMS Destination Name": "[***QUEUE NAME***]",
 "parameter.JMS Source Parameters:JMS Destination Type": "QUEUE",
 "parameter.JMS Source Parameters:JMS User Name": "[***USERNAME***]",
 "parameter.JMS Source Parameters:JMS User Password": "[***PASSWORD***]",
 "parameter.JMS Source Parameters:Truststore Filename": "[***PATH TO TRUSTSTORE***]",
 "parameter.JMS Source Parameters:Truststore Password": "[***TRUSTSTORE PASSWORD***]",
 "parameter.JMS Source Parameters:Truststore Type": "[***TRUSTSTORE TYPE***]"
}
- topics
- The name of the Kafka topic that the connector sends messages to.
- JMS Broker URI
- The URI of the ActiveMQ Message Broker. For example,
                ssl://activemq-hostname:61617.
- JMS Client Libraries
- The path to the directory containing ActiveMQ client libraries.
- JMS Connection Factory Class Name
- The Java class name of the ActiveMQ ConnectionFactory implementation.
- JMS Destination Name
- The name of the destination (in this case a queue) on the ActiveMQ Message Broker that the messages are received from.
- JMS Destination Type
- The type of the destination. In this example, the type is
              QUEUE.
- JMS User Name
- The username used for authenticating to the ActiveMQ Message Broker.
- JMS User Password
- The password of the user.
- Truststore *
- These are the properties for accessing the truststore containing the ActiveMQ Message Broker’s certificate used for secure communication.
Stateless NiFi Source properties reference
Review the following reference for a comprehensive list of the connector properties that are specific to the Stateless NiFi Source connector.
In addition to the properties listed here, Stateless NiFi connectors also accept the properties of the Kafka Connect framework. For a comprehensive list of these properties, see the Apache Kafka documentation.
dataflow.timeout
- Description
- Specifies the maximum amount of time to wait for the dataflow to complete. If the dataflow does not complete before this timeout, the thread is interrupted and the dataflow is considered as a failure. The session is rolled back and the connector retriggers the flow. Defaults to 60 seconds if not specified.
- Default Value
- 60 seconds
- Accepted Values
- Required
- false
extensions.directory
- Description
- Specifies the directory that stores downloaded extensions. Extensions are the NAR (NiFi Archive) files containing the processors and controller services a flow might use. Since Stateless NiFi is only the NiFi engine, it does not contain any of the processors and controller services you might use in your flow. When deploying the connector with the custom flow, the system needs to download the specific extensions that your flow uses from Nexus (unless they are already present in this directory). These extensions are stored in this directory. Because the default directory might not be writable, and to aid in upgrade scenarios, Cloudera recommends that you always specify an extensions directory.
- Default Value
- /tmp/nifi-stateless-extensions
- Accepted Values
- Required
- true
flow.snapshot
- Description
- Specifies the dataflow to run. When using Streams Messaging Manager to deploy a connector, the value you set in this property must be a JSON object. URLs, file paths, or escaped JSON strings are not supported when using Streams Messaging Manager. Alternatively, if using the Kafka Connect REST API to deploy a connector, this can be a file containing the dataflow, a URL that points to a dataflow, or a string containing the entire dataflow as an escaped JSON. Cloudera however, does not recommend using the Kafka Connect REST API to interact with this connector or Kafka Connect.
- Default Value
- Accepted Values
- Required
- true
header.attribute.regex
- Description
- A Java regular expression that is evaluated against all flowfile attribute names. Any attribute name matching the regular expression is converted into a Kafka message header. The name of the attribute is used as the header key, the value of the attribute is used as the header value. If not specified, headers are not added to the Kafka record.
- Default Value
- Accepted Values
- Required
- false
header.name.regex
- Description
- 
            A Java regular expression that will be evaluated against all flowfile attribute names. For any attribute whose name matches the regular expression, the Kafka record will have a header whose name matches the attribute name and whose value matches the attribute value. If not specified, the Kafka record will have no headers added to it. 
- Default Value
- Accepted Values
- Required
- false
key.attribute
- Description
- Specifies the name of a flowfile attribute that should be used to specify the key of the Kafka record. If not specified, the Kafka record will not have a key associated with it. If specified, but the attribute does not exist on a particular flowfile, it will also have no key associated with it.
- Default Value
- Accepted Values
- Required
- false
krb5.file
- Description
- Specifies the krb5.conffile to use if the dataflow interacts with any services that are secured using Kerberos. Defaults to/etc/krb5.confif not specified.
- Default Value
- /etc/krb5.conf
- Accepted Values
- Required
- false
name
- Description
- The name of the connector. On the Streams Messaging Manager UI, the
            connector names are specified using the Enter Name field. The
            name that you enter in the Enter Name field is automatically set
            as the value of the nameproperty when the connector is deployed. Because of this, the name property is omitted from the configuration template provided in Streams Messaging Manager. If you manually add the name property to the configuration in Streams Messaging Manager, ensure that the value you set matches the connector name specified in the Enter Name field. Otherwise, the connector fails to deploy.
- Default Value
- Accepted Values
- Required
- True
nexus.url
- Description
- Specifies the Base URL of the Nexus instance to source extensions from. If configuring
            a Nexus instance that has multiple repositories, include the name of the repository in
            the URL. For example,
              https://nexus-private.myorganization.org/nexus/repository/my-repository/. If the property is not specified, the necessary extensions (the ones used by the flow) must be provided in the extensions directory before deploying the connector.
- Default Value
- Accepted Values
- Required
- true
output.port
- Description
- The name of the output port in the NiFi dataflow to pull data from. If the dataflow contains exactly one port, this property is optional and can be omitted. However, if the dataflow contains multiple ports (for example, a success and a failure port), this property must be specified. If any flowfile is sent to any port other than the specified port, it is considered as a failure. The session is rolled back and no data is collected.
- Default Value
- Accepted Values
- Required
- false
parameter.[***FLOW PARAMETER NAME***]
- Description
- Specifies a parameter to use in the dataflow. For example, assume that you have the
            following entry in your connector configuration "parameter.Directory": "/mydir".In a case like this, any parameter context in the dataflow that has a parameter namedDirectorygets the specified value (/mydir). If the dataflow has child process groups, and those child process groups have their own parameter contexts, the value is used for all parameter contexts that contain a parameter namedDirectory. Parameters can also be applied to specific Parameter Contexts only. This can be done by prefixing the parameter name (Directory) with the name of the parameter context followed by a colon. For example,parameter.My Context:Directoryonly applies the specified value for theDirectoryparameter in the parameter context named My Context.
- Default Value
- Accepted Values
- Required
- false
topic.name.attribute
- Description
- Specifies the name of a flowfile attribute to use for determining which Kafka topic a
            flowfile is sent to. Either the topicsortopic.name.attributeproperty must be specified. If both are specified,topic.name.attributetakes precedence. However, if a flowfile does not have the specified attribute name, then the connector falls back to using the topics property.
- Default Value
- Accepted Values
- Required
- false
topics
- Description
- The name of the topic to deliver data to. All flowfiles are delivered to the topic
            specified here. However, it is also possible to determine the topic individually for
            each flowfile. To do this, ensure that the dataflow specifies the topic name in an
            attribute, and then use topic.name.attributeto specify the name of the attribute instead of topic name. For example, if you wanted a separate Kafka topic for each data source, you can omit thetopicsproperty and instead specify the attribute (for example,datasource.hostname) corresponding to the topic using thetopic.name.attributeproperty.
- Default Value
- Accepted Values
- Required
- true
working.directory
- Description
- Specifies a directory on the Connect server that NiFi should use for unpacking
            extensions that it needs to perform the dataflow. The contents of
              extensions.directoryare unpacked here. Defaults to/tmp/nifi-stateless-workingif not specified.
- Default Value
- /tmp/nifi-stateless-working
- Accepted Values
- Required
- false
JMS Source properties reference
Review the following reference for a comprehensive list of the connector properties that are specific to the JMS Source connector.
parameter.[***CONNECTOR NAME***] Parameters:In addition to the properties listed here, this connector also accepts certain properties of the Kafka Connect framework as well as the properties of the NiFi Stateless Source connector. When creating a new connector using the Streams Messaging Manager UI, all valid properties are presented in the default configuration template. You can view the configuration template to get a full list of valid properties. In addition, for more information regarding the accepted properties not listed here, you can review the Apache Kafka documentation and the Stateless NiFi Source property reference.
JMS Broker URI
- Description
- The URI to use to connect to the JMS Message Broker. For example,
              ssl://myhost:61617
- Default Value
- Accepted Values
- Required
- true
JMS Client Libraries
- Description
- The path to the directory containing the JMS client libraries.
- Default Value
- Accepted Values
- Required
- true
JMS Connection Factory Class Name
- Description
- The fully qualified name of the JMS ConnectionFactory implementation class. For
            example, org.apache.activemq.ActiveMQSslConnectionFactory.
- Default Value
- Accepted Values
- Required
- true
JMS Destination Name
- Description
- The name of the JMS Destination.
- Default Value
- Accepted Values
- Required
- true
JMS Destination Type
- Description
- The type of the JMS Destination.
- Default Value
- QUEUE
- Accepted Values
- QUEUE, TOPIC
- Required
- true
JMS User Name
- Description
- The username used for authentication.If username/password authentication is not required by the JMS Message Broker, this property must be completely removed from the configuration. 
- Default Value
- Accepted Values
- Required
- false
JMS User Password
- Description
- The password used for authentication.If username/password authentication is not required by the JMS Message Broker, this property must be completely removed from the configuration. 
- Default Value
- Accepted Values
- Required
- false
Keystore Filename
- Description
- The fully-qualified filename of a keystore. This keystore is used for mutual TLS
            towards the JMS Message Broker.If the JMS Message Broker does not require client certificate authentication, this property must be completely removed from the configuration. 
- Default Value
- Accepted Values
- Required
- false
Keystore Key Password
- Description
- The password used to access the key stored in the keystore file configured in the
            Keystore Filename property.If the JMS Message Broker broker does not require client certificate authentication, this property must be completely removed from the configuration. 
- Default Value
- Accepted Values
- Required
- false
Keystore Password
- Description
- The password used to access the contents keystore configured in the Keystore Filename
              property.If the JMS Message Broker does not require client certificate authentication, this property must be completely removed from the configuration. 
- Default Value
- Accepted Values
- Required
- false
Keystore Type
- Description
- The type of the keystore configured in the Keystore Filename property.If the JMS Message Broker does not require client certificate authentication, this property must be completely removed from the configuration. 
- Default Value
- Accepted Values
- BCFKS, PKCS12, JKS
- Required
- false
Truststore Filename
- Description
- The fully-qualified filename of a truststore. This truststore is used to establish a secure connection with the JMS server using TLS.
- Default Value
- The location of the default truststore which is empty and can only be used for unsecure connections.
- Accepted Values
- Required
- true
Truststore Password
- Description
- The password used to access the contents of the truststore configured in the Truststore Filename property.
- Default Value
- password
- Accepted Values
- Required
- true
Truststore Type
- Description
- The type of the truststore configured in the Truststore Filename property.
- Default Value
- JKS
- Accepted Values
- BCFKS, PKCS12, JKS
- Required
- true
