ConsumeAMQP

Description:

Consumes AMQP Messages from an AMQP Broker using the AMQP 0.9.1 protocol. Each message that is received from the AMQP Broker will be emitted as its own FlowFile to the 'success' relationship.

Tags:

amqp, rabbit, get, message, receive, consume

Properties:

In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.

Display Name | API Name | Default Value | Allowable Values | Description
Queue | Queue | | | The name of the existing AMQP Queue from which messages will be consumed. Usually predefined by the AMQP administrator.
Auto-Acknowledge Messages | auto.acknowledge | false
  • true
  • false
If false (Non-Auto-Acknowledge), the processor acknowledges the messages after transferring the FlowFiles to 'success' and committing the NiFi session. Non-Auto-Acknowledge mode provides 'at-least-once' delivery semantics. If true (Auto-Acknowledge), the AMQP Broker auto-acknowledges messages as soon as it sends them to the AMQP Client. This generally provides better throughput, but messages can be lost if the AMQP Broker, NiFi, or the processor restarts or crashes. Auto-Acknowledge mode provides 'at-most-once' delivery semantics and is recommended only if losing messages is acceptable.
Batch Size | batch.size | 10 | | The maximum number of messages that should be processed in a single session. Once this many messages have been received (or once no more messages are readily available), the messages received will be transferred to the 'success' relationship and acknowledged to the AMQP Broker. Setting this value to a larger number could result in better performance, particularly for very small messages, but can also result in more messages being duplicated upon a sudden restart of NiFi.
Prefetch Count | prefetch.count | 0 | | The maximum number of unacknowledged messages for the consumer. Once the consumer has this many unacknowledged messages, the AMQP broker sends no new messages until the consumer acknowledges some of the messages already delivered to it. Allowed values: 0 to 65535. 0 means no limit.
Header Output Format | header.format | Comma-Separated String
  • Comma-Separated String: Put all headers as a string with the specified separator in the attribute 'amqp$headers'.
  • JSON String: Format all headers as a JSON string and output it in the attribute 'amqp$headers'. Keys with a null value are included as well.
  • FlowFile Attributes: Put each header as an attribute of the FlowFile, with the prefix specified in the Header Key Prefix property.
Defines how to output headers from the received message.
Header Key Prefix | header.key.prefix | consume.amqp | | Text to be prefixed to header keys as they are added to the FlowFile attributes. The processor appends '.' to the value of this property.

This Property is only considered if the [Header Output Format] Property has a value of "FlowFile Attributes".
Header Separator | header.separator | , | | The character used to separate the key-value pairs of the headers in the string. The value must be exactly one character.

This Property is only considered if the [Header Output Format] Property has a value of "Comma-Separated String".
Remove Curly Braces | remove.curly.braces | False
  • True
  • False
If true, curly braces in the header string are removed automatically.

This Property is only considered if the [Header Output Format] Property has a value of "Comma-Separated String".
Brokers | Brokers | | | A comma-separated list of known AMQP Brokers in the format <host>:<port> (e.g., localhost:5672). If this is set, Host Name and Port are ignored. Only include hosts from the same AMQP cluster.
Supports Expression Language: true (will be evaluated using Environment variables only)
Host Name | Host Name | localhost | | Network address of the AMQP broker (e.g., localhost). If Brokers is set, this property is ignored.
Supports Expression Language: true (will be evaluated using Environment variables only)
Port | Port | 5672 | | Numeric value identifying the port of the AMQP broker (e.g., 5671). If Brokers is set, this property is ignored.
Supports Expression Language: true (will be evaluated using Environment variables only)
Virtual Host | Virtual Host | | | Virtual Host name which segregates the AMQP system for enhanced security.
Supports Expression Language: true (will be evaluated using Environment variables only)
User Name | User Name | | | User Name used for authentication and authorization.
Supports Expression Language: true (will be evaluated using Environment variables only)
Password | Password | | | Password used for authentication and authorization.
Sensitive Property: true
AMQP Version | AMQP Version | 0.9.1
  • 0.9.1
AMQP Version. Currently only supports AMQP v0.9.1.
SSL Context Service | ssl-context-service
Controller Service API: SSLContextService
Implementations: StandardRestrictedSSLContextService, StandardSSLContextService
The SSL Context Service used to provide client certificate information for TLS/SSL connections.
Use Client Certificate Authentication | cert-authentication | false
  • true
  • false
Authenticate using the SSL certificate rather than user name/password.
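
The three Header Output Format choices in the table above can be pictured with a small stand-alone sketch. This is not the processor's code: the function names and sample headers are made up, and the exact string layout (key=value pairs, wrapped in curly braces unless Remove Curly Braces is true) is an assumption for illustration. Only the attribute name 'amqp$headers', the one-character separator, and the <prefix>.<key> naming are taken from the property descriptions.

```python
import json


def headers_as_string(headers, separator=",", remove_curly_braces=False):
    """Comma-Separated String style.

    Assumption: pairs are written as key=value, joined by the separator,
    and wrapped in curly braces unless remove_curly_braces is true.
    """
    if len(separator) != 1:
        raise ValueError("Header Separator must be exactly one character")
    body = separator.join(f"{key}={value}" for key, value in headers.items())
    return body if remove_curly_braces else "{" + body + "}"


def headers_as_json(headers):
    """JSON String style: keys with a null value are kept."""
    return json.dumps(headers)


def headers_as_attributes(headers, prefix="consume.amqp"):
    """FlowFile Attributes style: one attribute per header, named <prefix>.<key>."""
    return {f"{prefix}.{key}": str(value) for key, value in headers.items()}


sample = {"tenant": "acme", "retries": 3}  # hypothetical message headers

print({"amqp$headers": headers_as_string(sample)})
print({"amqp$headers": headers_as_json({**sample, "trace": None})})
print(headers_as_attributes(sample))
```

With the first two formats all headers end up in the single attribute 'amqp$headers'; with the third, each header becomes its own attribute, so the prefix keeps header names from colliding with other FlowFile attributes.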

Relationships:

Name | Description
success | All FlowFiles that are received from the AMQP queue are routed to this relationship.
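
How the Batch Size and Auto-Acknowledge Messages properties interact on the way to 'success' can be sketched with an in-memory stand-in for the broker. Everything here (FakeBroker, consume_batch, the commit callback) is hypothetical and only mirrors the documented order of operations: collect up to Batch Size messages, transfer and commit, then acknowledge when auto-acknowledge is false.

```python
from collections import deque


class FakeBroker:
    """Hypothetical in-memory stand-in for an AMQP queue (not a real client)."""

    def __init__(self, messages):
        self.ready = deque(messages)  # messages waiting for delivery
        self.unacked = []             # delivered but not yet acknowledged

    def get(self, auto_ack):
        if not self.ready:
            return None
        msg = self.ready.popleft()
        if not auto_ack:
            self.unacked.append(msg)  # kept by the broker until acknowledged
        return msg

    def ack(self, msgs):
        self.unacked = [m for m in self.unacked if m not in msgs]

    def requeue_unacked(self):
        """Models redelivery after the consumer connection is lost."""
        self.ready.extendleft(reversed(self.unacked))
        self.unacked = []


def consume_batch(broker, batch_size, auto_ack, commit):
    batch = []
    while len(batch) < batch_size:
        msg = broker.get(auto_ack)
        if msg is None:       # no more messages readily available
            break
        batch.append(msg)
    commit(batch)             # transfer to 'success' and commit the session
    if not auto_ack:
        broker.ack(batch)     # acknowledge only after the commit succeeded
    return batch


def crash(_batch):
    raise RuntimeError("simulated crash before the session commit")


for auto_ack in (False, True):
    broker = FakeBroker(["m1", "m2", "m3"])
    try:
        consume_batch(broker, batch_size=2, auto_ack=auto_ack, commit=crash)
    except RuntimeError:
        broker.requeue_unacked()
    print("auto_ack =", auto_ack, "still on the queue:", list(broker.ready))
```

In this toy model a crash with auto-acknowledge disabled leaves all three messages on the queue (at-least-once, with possible duplicates), while with auto-acknowledge enabled the two delivered messages are gone (at-most-once).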

Reads Attributes:

None specified.

Writes Attributes:

Name | Description
amqp$appId | The App ID field from the AMQP Message
amqp$contentEncoding | The Content Encoding reported by the AMQP Message
amqp$contentType | The Content Type reported by the AMQP Message
amqp$headers | The headers present on the AMQP Message. Added only if the processor is configured to output this attribute.
<Header Key Prefix>.<attribute> | Each message header is added as an attribute with this name if the processor is configured to output headers as attributes.
amqp$deliveryMode | The numeric indicator for the Message's Delivery Mode
amqp$priority | The Message priority
amqp$correlationId | The Message's Correlation ID
amqp$replyTo | The value of the Message's Reply-To field
amqp$expiration | The Message Expiration
amqp$messageId | The unique ID of the Message
amqp$timestamp | The timestamp of the Message, as the number of milliseconds since epoch
amqp$type | The type of message
amqp$userId | The ID of the user
amqp$clusterId | The ID of the AMQP Cluster
amqp$routingKey | The routingKey of the AMQP Message
amqp$exchange | The exchange from which the AMQP Message was received
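
As a small illustration of consuming these attributes downstream, the sketch below converts an 'amqp$timestamp' value into a UTC datetime. It relies only on the description above (milliseconds since epoch); the function name and the sample attribute map are made up for the example.

```python
from datetime import datetime, timezone


def amqp_timestamp_to_datetime(attributes):
    """Return the message timestamp as an aware UTC datetime, or None if absent."""
    raw = attributes.get("amqp$timestamp")
    if raw is None or raw == "":
        return None
    # The attribute holds milliseconds since epoch; datetime expects seconds.
    return datetime.fromtimestamp(int(raw) / 1000, tz=timezone.utc)


# Hypothetical FlowFile attributes as a plain dictionary.
attrs = {"amqp$timestamp": "1700000000000", "amqp$routingKey": "orders.created"}
print(amqp_timestamp_to_datetime(attrs).isoformat())
```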

State management:

This component does not store state.

Restricted:

This component is not restricted.

Input requirement:

This component does not allow an incoming relationship.

System Resource Considerations:

None specified.