ConsumePulsar

Description:

Consumes messages from Apache Pulsar. The complementary NiFi processor for sending messages is PublishPulsar.

Tags:

Pulsar, Get, Ingest, Ingress, Topic, PubSub, Consume

Properties:

In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.

Display Name | API Name | Default Value | Allowable Values | Description
Pulsar Client Service | PULSAR_CLIENT_SERVICE | | Controller Service API: PulsarClientService; Implementation: StandardPulsarClientService | Specifies the Pulsar Client Service used to create Pulsar connections.
Topic Names | TOPICS | | | Specifies the topics this consumer subscribes to. Multiple topics can be given as a comma-separated list, e.g. topicA, topicB, topicC. Supports Expression Language: true (will be evaluated using variable registry only)
Topics Pattern | TOPICS_PATTERN | | | Alternatively, specifies a pattern for the topics this consumer subscribes to. The value is a regular expression that is compiled into a pattern internally. E.g. "persistent://my-tenant/ns-abc/pattern-topic-.*" subscribes to any topic whose name starts with 'pattern-topic-', is in the 'ns-abc' namespace, and belongs to the 'my-tenant' tenant.
Subscription Name | SUBSCRIPTION_NAME | | | Specifies the subscription name for this consumer.
Subscription Initial Position | SUBSCRIPTION_INITIAL_POSITION | Latest | Earliest: consumption starts at the first message. Latest: consumption starts at the last message. | Specifies the initial position of the subscription. By default the subscription is created at the end of the topic.
Consumer Name | CONSUMER_NAME | | | Sets the name of the consumer, which uniquely identifies this client on the broker.
Async Enabled | ASYNC_ENABLED | false | true, false | Controls whether messages are consumed asynchronously. Messages consumed synchronously are acknowledged immediately, before the next message is processed, while messages consumed asynchronously are acknowledged after the Pulsar broker responds. Enabling asynchronous consumption introduces the possibility of duplicate data if the Processor is stopped before it has time to send an acknowledgement back to the broker. In that case, the broker assumes the unacknowledged message was not successfully processed and re-sends it when the Processor restarts.
Maximum Async Requests | MAX_ASYNC_REQUESTS | 50 | | The maximum number of outstanding asynchronous consumer requests for this processor. Each asynchronous call requires memory, so avoid setting this value too high.
Acknowledgment Timeout | ACK_TIMEOUT | 30 sec | | Sets the timeout for unacknowledged messages. Messages that are not acknowledged within the configured timeout are replayed. This value needs to be greater than 10 seconds.
Consumer Priority Level | PRIORITY_LEVEL | 5 | | Sets the priority level for consumers on a shared subscription. The broker gives preference to higher-priority consumers when dispatching messages, with priorities in descending order (e.g. 0 = max priority, then 1, 2, ...).
Consumer Receiver Queue Size | RECEIVER_QUEUE_SIZE | 1000 | | The consumer receiver queue controls how many messages can be accumulated by the Consumer before the application calls Consumer.receive(). A higher value can increase consumer throughput at the expense of higher memory utilization. Setting the queue size to zero (1) decreases consumer throughput by disabling pre-fetching of messages, and (2) does not support batch messages: if the consumer receives a batch message, it closes its connection with the broker and cannot receive any further messages until the batch message in the pipeline is removed.
Subscription Type | SUBSCRIPTION_TYPE | Shared | Exclusive: only 1 consumer can be on the same topic with the same subscription name. Shared: multiple consumers can use the same subscription name, and messages are distributed among the connected consumers. Key_Shared: multiple consumers can use the same subscription name, but only 1 consumer receives the messages for a given message key. Failover: multiple consumers can use the same subscription name, but only 1 consumer receives the messages; if that consumer disconnects, one of the other connected consumers starts receiving messages. | Selects the subscription type to use when subscribing to the topic.
Consumer Message Batch Size | CONSUMER_BATCH_SIZE | 1000 | | Sets the maximum number of messages consumed at a time and published to a single FlowFile. If set to a value greater than 1, messages within the FlowFile are separated by the Message Demarcator. Supports Expression Language: true (will be evaluated using variable registry only)
Message Demarcator | MESSAGE_DEMARCATOR | | | Specifies the string (interpreted as UTF-8) used to demarcate multiple messages consumed from Pulsar within a single FlowFile. If not specified, the content of the FlowFile consists of all of the messages consumed from Pulsar concatenated together. If specified, the contents of the individual Pulsar messages are separated by this delimiter. To enter a special character such as 'new line', use CTRL+Enter or Shift+Enter, depending on your OS. Supports Expression Language: true (will be evaluated using variable registry only)
Mapped FlowFile Attributes | MAPPED_FLOWFILE_ATTRIBUTES | | | A comma-delimited list of FlowFile attributes to set based on message metadata (currently key and properties). The syntax for an individual mapping is <attribute name>[=<source property name or key>]. To map the message key to an attribute, use the reserved name __KEY__ (e.g. my-attribute=__KEY__). If the optional source name is omitted, it is assumed to be the same as the attribute name.
Replicate Subscription State | REPLICATE_SUBSCRIPTION_STATE | false | true, false | Controls whether subscription state is replicated across multiple geographical regions when the topic is geo-replicated. In case of failover, the consumer can resume consuming from the failure point in a different cluster.
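
To make the Topics Pattern description more concrete, the sketch below applies the example pattern from that property to a handful of topic names. It is only an illustration: the candidate topic names are hypothetical, Python's re module stands in for the pattern compilation that happens inside the Pulsar client, and the use of a full match (rather than a partial one) is an assumption.

```python
import re

# Pattern copied from the Topics Pattern description.
TOPICS_PATTERN = re.compile(r"persistent://my-tenant/ns-abc/pattern-topic-.*")


def matching_topics(topic_names):
    """Return the topic names that the pattern matches in full."""
    return [name for name in topic_names if TOPICS_PATTERN.fullmatch(name)]


# Hypothetical topic names, used only for illustration.
candidates = [
    "persistent://my-tenant/ns-abc/pattern-topic-1",
    "persistent://my-tenant/ns-abc/pattern-topic-orders",
    "persistent://my-tenant/ns-abc/other-topic",
    "persistent://my-tenant/ns-xyz/pattern-topic-1",
]

matched = matching_topics(candidates)
print(matched)
```

Only the first two candidates are selected: the third has a different topic name prefix and the fourth sits in a different namespace.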
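
The mapping syntax of Mapped FlowFile Attributes, <attribute name>[=<source property name or key>], can be sketched as a small parser. This is not the processor's implementation; the function names are hypothetical, and skipping attributes whose source value is missing is an assumption made for the example.

```python
def parse_attribute_mappings(spec):
    """Parse a comma-delimited list of '<attribute>[=<source>]' entries.

    Returns a dict of FlowFile attribute name -> source name. When the
    source is omitted, it defaults to the attribute name itself.
    """
    mappings = {}
    for entry in spec.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate stray commas and blank entries
        attribute, sep, source = entry.partition("=")
        attribute = attribute.strip()
        mappings[attribute] = source.strip() if sep else attribute
    return mappings


def resolve_attributes(mappings, message_key, message_properties):
    """Look up each mapped source in the message key or properties."""
    attributes = {}
    for attribute, source in mappings.items():
        if source == "__KEY__":  # reserved name for the message key
            value = message_key
        else:
            value = message_properties.get(source)
        if value is not None:  # assumption: missing sources are skipped
            attributes[attribute] = value
    return attributes


mappings = parse_attribute_mappings("my-attribute=__KEY__, region, trace=trace-id")
attrs = resolve_attributes(
    mappings, "order-42", {"region": "eu", "trace-id": "abc123"}
)
print(attrs)
```

Here my-attribute receives the message key, region is read from the property of the same name, and trace is read from the trace-id property.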

Relationships:

Name | Description
success | FlowFiles for which all content was consumed from Pulsar.

Reads Attributes:

None specified.

Writes Attributes:

Name | Description
message.count | The number of messages received from Pulsar
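
As a rough illustration of how Consumer Message Batch Size, Message Demarcator, and the message.count attribute relate, the sketch below joins a batch of message payloads into one FlowFile body. The function name and sample payloads are hypothetical, and the code only mirrors the behaviour described in the property table rather than the processor's actual code.

```python
def build_flowfile(messages, demarcator=None, batch_size=1000):
    """Combine up to batch_size message payloads into one FlowFile body.

    Returns (content, attributes). The demarcator is encoded as UTF-8;
    without one, the payloads are simply concatenated.
    """
    batch = messages[:batch_size]
    separator = demarcator.encode("utf-8") if demarcator else b""
    content = separator.join(batch)
    # FlowFile attribute values are strings, so the count is stringified.
    attributes = {"message.count": str(len(batch))}
    return content, attributes


# Hypothetical message payloads, used only for illustration.
payloads = [b"alpha", b"beta", b"gamma"]

content, attributes = build_flowfile(payloads, demarcator="\n")
concatenated, _ = build_flowfile(payloads)
limited, limited_attributes = build_flowfile(payloads, demarcator="|", batch_size=2)
```

With a newline demarcator the three payloads end up on separate lines and message.count is "3"; with no demarcator they run together; with a batch size of 2 only the first two payloads are written.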

State management:

This component does not store state.

Restricted:

This component is not restricted.

Input requirement:

This component does not allow an incoming relationship.

System Resource Considerations:

None specified.

See Also:

PublishPulsar, ConsumePulsarRecord, PublishPulsarRecord