ConsumePulsarRecord

Description:

Consumes messages from Apache Pulsar. The complementary NiFi processor for sending messages is PublishPulsarRecord. Note that, at this time, the Processor assumes that all retrieved records have the same schema. If any pulled Pulsar message cannot be parsed or written with the configured Record Reader or Record Writer, the contents of that message are written to a separate FlowFile, which is transferred to the 'parse_failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual messages. A 'record.count' attribute is added to indicate how many messages the FlowFile contains. No two Pulsar messages with different schemas are placed into the same FlowFile.
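
The bundling rules in the description can be modelled in a few lines. This is a minimal Python sketch, not NiFi's implementation: `Message`, `FlowFile`, `bundle`, `parse_json` and the per-message `schema` label are hypothetical names, and collecting all messages of one schema into a single FlowFile is an assumption (the description only guarantees that schemas are never mixed in one FlowFile).

```python
import json
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


@dataclass
class Message:
    schema: str      # hypothetical: identifier of the schema the message was written with
    payload: bytes   # raw message content


@dataclass
class FlowFile:
    relationship: str  # "success" or "parse_failure"
    content: Any       # parsed records for success, raw bytes for parse_failure
    attributes: Dict[str, str] = field(default_factory=dict)


def bundle(messages: List[Message],
           parse: Callable[[bytes], Optional[dict]]) -> List[FlowFile]:
    """Group parseable messages by schema; give each unparseable message its own FlowFile."""
    by_schema: Dict[str, List[dict]] = {}
    failures: List[FlowFile] = []
    for msg in messages:
        record = parse(msg.payload)
        if record is None:
            # The raw content is routed to parse_failure on its own.
            failures.append(FlowFile("parse_failure", msg.payload))
            continue
        # A record is only ever appended to the bucket for its own schema.
        by_schema.setdefault(msg.schema, []).append(record)
    successes = [
        FlowFile("success", records, {"record.count": str(len(records))})
        for records in by_schema.values()
    ]
    return successes + failures


def parse_json(payload: bytes) -> Optional[dict]:
    """Stand-in for a Record Reader: valid JSON parses, anything else fails."""
    try:
        return json.loads(payload)
    except ValueError:
        return None


flowfiles = bundle(
    [Message("A", b'{"x": 1}'), Message("B", b'{"y": 2}'),
     Message("A", b'{"x": 3}'), Message("A", b"not json")],
    parse_json,
)
for ff in flowfiles:
    print(ff.relationship, ff.attributes, ff.content)
```

Here the two schema "A" records share one success FlowFile with record.count 2, schema "B" gets its own, and the unparseable payload is routed alone with no record.count.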

Tags:

Pulsar, Get, Record, csv, avro, json, Ingest, Ingress, Topic, PubSub, Consume

Properties:

In the table below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values and whether a property supports the NiFi Expression Language.

Display Name | API Name | Default Value | Allowable Values | Description

Record Reader (API Name: Record Reader)
Controller Service API: RecordReaderFactory
Implementations: Syslog5424Reader, CEFReader, ReaderLookup, CiscoEmblemSyslogMessageReader, CSVReader, GrokReader, SyslogReader, JsonTreeReader, JsonPathReader, XMLReader, AvroReader, JASN1Reader, ExcelReader, ParquetReader, EBCDICRecordReader, WindowsEventLogReader, IPFIXReader, ScriptedReader
The Record Reader to use for parsing the content of messages consumed from Pulsar.

Record Writer (API Name: Record Writer)
Controller Service API: RecordSetWriterFactory
Implementations: JsonRecordSetWriter, ParquetRecordSetWriter, CSVRecordSetWriter, ScriptedRecordSetWriter, XMLRecordSetWriter, FreeFormTextRecordSetWriter, AvroRecordSetWriter, RecordSetWriterLookup
The Record Writer to use to serialize the consumed records into the content of the outgoing FlowFiles.

Max Wait Time (API Name: Max Wait Time, Default Value: 2 seconds)
The maximum amount of time allowed for a Pulsar consumer to poll a subscription for data. Zero means there is no limit, and any value less than 1 second is treated as zero.
Supports Expression Language: true (will be evaluated using variable registry only)

Pulsar Client Service (API Name: PULSAR_CLIENT_SERVICE)
Controller Service API: PulsarClientService
Implementation: StandardPulsarClientService
Specifies the Pulsar Client Service used to create Pulsar connections.

Topic Names (API Name: TOPICS)
Specifies the topics this consumer will subscribe to. Multiple topics can be given as a comma-separated list, e.g. topicA, topicB, topicC.
Supports Expression Language: true (will be evaluated using variable registry only)

Topics Pattern (API Name: TOPICS_PATTERN)
Alternatively, specifies a pattern for the topics this consumer will subscribe to. The value is a regular expression that is compiled into a pattern internally. For example, "persistent://my-tenant/ns-abc/pattern-topic-.*" subscribes to any topic whose name starts with 'pattern-topic-', is in the 'ns-abc' namespace, and belongs to the 'my-tenant' tenant.

Subscription Name (API Name: SUBSCRIPTION_NAME)
Specifies the subscription name for this consumer.

Subscription Initial Position (API Name: SUBSCRIPTION_INITIAL_POSITION, Default Value: Latest)
  • Earliest: the earliest position, meaning consumption starts at the first message.
  • Latest: the latest position, meaning consumption starts at the last message.
Specifies the subscription initial position. By default, the subscription is created at the end of the topic.

Consumer Name (API Name: CONSUMER_NAME)
Sets the name of the consumer, which uniquely identifies this client on the Broker.

Async Enabled (API Name: ASYNC_ENABLED, Default Value: false)
  • true
  • false
Controls whether messages are consumed asynchronously. Messages consumed synchronously are acknowledged immediately, before the next message is processed, while asynchronously consumed messages are acknowledged after the Pulsar broker responds. Enabling asynchronous consumption introduces the possibility of duplicate data if the Processor is stopped before it has sent an acknowledgement back to the Broker. In that case, the Broker assumes that the unacknowledged message was not processed successfully and re-sends it when the Processor restarts.

Maximum Async Requests (API Name: MAX_ASYNC_REQUESTS, Default Value: 50)
The maximum number of outstanding asynchronous consumer requests for this processor. Each asynchronous call requires memory, so avoid setting this value too high.

Acknowledgment Timeout (API Name: ACK_TIMEOUT, Default Value: 30 sec)
Sets the timeout for unacknowledged messages. Messages that are not acknowledged within the configured timeout are replayed. This value must be greater than 10 seconds.

Consumer Priority Level (API Name: PRIORITY_LEVEL, Default Value: 5)
Sets the priority level of consumers on a shared subscription; the broker gives precedence to higher-priority consumers when dispatching messages. Priorities run in descending order as 0, 1, 2, and so on, so 0 is the maximum priority.

Consumer Receiver Queue Size (API Name: RECEIVER_QUEUE_SIZE, Default Value: 1000)
The consumer receiver queue controls how many messages the Consumer can accumulate before the application calls Consumer.receive(). A higher value can increase consumer throughput at the cost of higher memory utilization. Setting the queue size to zero:
  • decreases consumer throughput by disabling pre-fetching of messages;
  • disables support for batch messages: if the consumer receives a batch message, it closes its connection with the broker and cannot receive any further messages until the batch message in the pipeline is removed.

Subscription Type (API Name: SUBSCRIPTION_TYPE, Default Value: Shared)
  • Exclusive: only one consumer can subscribe to the same topic with the same subscription name.
  • Shared: multiple consumers can use the same subscription name, and messages are dispatched to them in a round-robin rotation.
  • Key_Shared: multiple consumers can use the same subscription name, but only one consumer receives the messages for a given message key.
  • Failover: multiple consumers can use the same subscription name, but only one consumer receives the messages. If that consumer disconnects, one of the other connected consumers starts receiving messages.
Selects the subscription type to use when subscribing to the topic.

Consumer Message Batch Size (API Name: CONSUMER_BATCH_SIZE, Default Value: 1000)
Sets the maximum number of messages consumed at a time and published to a single FlowFile. If set to a value greater than 1, messages within the FlowFile are separated by the Message Demarcator.
Supports Expression Language: true (will be evaluated using variable registry only)

Message Demarcator (API Name: MESSAGE_DEMARCATOR)
Specifies the string (interpreted as UTF-8) used to demarcate multiple messages consumed from Pulsar within a single FlowFile. If not specified, the FlowFile content consists of all consumed messages concatenated together. If specified, the contents of the individual Pulsar messages are separated by this delimiter. To enter a special character such as a new line, use CTRL+Enter or Shift+Enter, depending on your OS.
Supports Expression Language: true (will be evaluated using variable registry only)

Mapped FlowFile Attributes (API Name: MAPPED_FLOWFILE_ATTRIBUTES)
A comma-delimited list of FlowFile attributes to set from message metadata (currently the key and properties). The syntax for an individual mapping is <attribute name>[=<source property name or key>]. To map the message key to an attribute, use the reserved name __KEY__ (e.g. my-attribute=__KEY__). If the optional source name is omitted, it is assumed to be the same as the attribute name.

Replicate Subscription State (API Name: REPLICATE_SUBSCRIPTION_STATE, Default Value: false)
  • true
  • false
Controls whether subscription state is replicated across multiple geographical regions when the topic is geo-replicated. In case of failover, the consumer can then resume consuming from the failure point in a different cluster.
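
The Max Wait Time rule has a consequence worth spelling out: a sub-second value does not shorten the poll, it removes the limit entirely. A minimal Python sketch of that reading, assuming the configured value has already been converted to milliseconds; `poll_limit_ms` and `keep_polling` are hypothetical helpers, not NiFi methods.

```python
from typing import Optional


def poll_limit_ms(max_wait_ms: int) -> Optional[int]:
    """Return the effective poll limit in milliseconds, or None for 'no limit'."""
    if max_wait_ms < 1000:
        # Anything under 1 second is treated as zero, and zero means no limit.
        return None
    # Assumption: values of 1 second or more are used as configured.
    return max_wait_ms


def keep_polling(elapsed_ms: int, max_wait_ms: int) -> bool:
    """Decide whether the consumer may keep polling after elapsed_ms."""
    limit = poll_limit_ms(max_wait_ms)
    return limit is None or elapsed_ms < limit


print(keep_polling(1500, 2000))   # True: still within the 2-second default
print(keep_polling(2500, 2000))   # False: past the limit
print(keep_polling(60000, 500))   # True: 500 ms collapses to "no limit"
```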
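
Topic Names and Topics Pattern are two ways of selecting topics. This Python sketch models both under stated assumptions: whitespace around comma-separated names is trimmed, and the pattern must match the whole topic name (as Java's `Pattern.matches` would). `split_topic_names` and `matching_topics` are hypothetical helpers; the actual pattern subscription is performed by the Pulsar client.

```python
import re
from typing import Iterable, List


def split_topic_names(value: str) -> List[str]:
    """Split a Topic Names value such as 'topicA, topicB, topicC'."""
    return [name.strip() for name in value.split(",") if name.strip()]


def matching_topics(pattern: str, topics: Iterable[str]) -> List[str]:
    """Return the topics whose full name matches a Topics Pattern regex."""
    compiled = re.compile(pattern)
    return [topic for topic in topics if compiled.fullmatch(topic)]


known = [
    "persistent://my-tenant/ns-abc/pattern-topic-1",
    "persistent://my-tenant/ns-abc/other-topic",
    "persistent://my-tenant/ns-xyz/pattern-topic-2",
    "persistent://other-tenant/ns-abc/pattern-topic-3",
]
print(split_topic_names("topicA, topicB, topicC"))
print(matching_topics("persistent://my-tenant/ns-abc/pattern-topic-.*", known))
```

Only the first topic matches: the others are in the wrong namespace, the wrong tenant, or lack the 'pattern-topic-' prefix.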
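
To contrast the four Subscription Type options, here is a toy Python model of which consumer receives each message. It is a simplification under stated assumptions: for Exclusive and Failover the first consumer is taken as the active one, Shared uses strict round-robin, and Key_Shared hashes the key with `zlib.crc32`, whereas the Pulsar broker uses its own hash-range scheme. `dispatch` is a hypothetical helper, not a Pulsar API.

```python
import itertools
import zlib
from typing import List, Sequence, Tuple


def dispatch(sub_type: str, consumers: Sequence[str],
             keys: Sequence[str]) -> List[Tuple[str, str]]:
    """Return (message key, consumer) pairs for a toy subscription."""
    if not consumers:
        raise ValueError("at least one consumer is required")
    if sub_type in ("Exclusive", "Failover"):
        # Exclusive admits a single consumer; Failover keeps the rest on standby.
        # Either way one active consumer (assumed: the first) gets every message.
        return [(key, consumers[0]) for key in keys]
    if sub_type == "Shared":
        # Messages rotate across all consumers regardless of key.
        rotation = itertools.cycle(consumers)
        return [(key, next(rotation)) for key in keys]
    if sub_type == "Key_Shared":
        # The same key always maps to the same consumer.
        return [(key, consumers[zlib.crc32(key.encode("utf-8")) % len(consumers)])
                for key in keys]
    raise ValueError(f"unknown subscription type: {sub_type}")


keys = ["order-1", "order-2", "order-1", "order-3"]
for sub_type in ("Exclusive", "Shared", "Key_Shared", "Failover"):
    print(sub_type, dispatch(sub_type, ["c1", "c2"], keys))
```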
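
Consumer Message Batch Size and Message Demarcator together determine how message contents are laid out in FlowFiles. A minimal Python sketch, assuming messages arrive as a list of byte strings and that an empty or missing demarcator means plain concatenation; `build_contents` is a hypothetical helper.

```python
from typing import List, Optional


def build_contents(messages: List[bytes], batch_size: int,
                   demarcator: Optional[str]) -> List[bytes]:
    """Split messages into FlowFile contents of at most batch_size messages each."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    # The demarcator is interpreted as UTF-8; without one, messages are concatenated.
    separator = demarcator.encode("utf-8") if demarcator else b""
    return [
        separator.join(messages[start:start + batch_size])
        for start in range(0, len(messages), batch_size)
    ]


print(build_contents([b"a", b"b", b"c"], 2, "\n"))      # [b'a\nb', b'c']
print(build_contents([b"a", b"b", b"c"], 1000, None))   # [b'abc']
```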
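
The Mapped FlowFile Attributes syntax can be illustrated with a small Python parser. This is a sketch of the documented rules only; trimming whitespace around entries and skipping mappings whose source is absent are assumptions, and `parse_mappings` and `map_attributes` are hypothetical names.

```python
from typing import Dict, Mapping, Optional

KEY_SOURCE = "__KEY__"  # reserved name that refers to the message key


def parse_mappings(spec: str) -> Dict[str, str]:
    """Parse '<attribute name>[=<source>]' entries into {attribute: source}."""
    mappings: Dict[str, str] = {}
    for entry in spec.split(","):
        entry = entry.strip()
        if not entry:
            continue
        name, sep, source = entry.partition("=")
        name = name.strip()
        # Without '=<source>', the source defaults to the attribute name itself.
        mappings[name] = source.strip() if sep else name
    return mappings


def map_attributes(spec: str, key: Optional[str],
                   properties: Mapping[str, str]) -> Dict[str, str]:
    """Build FlowFile attributes from a message key and its properties."""
    attributes: Dict[str, str] = {}
    for name, source in parse_mappings(spec).items():
        value = key if source == KEY_SOURCE else properties.get(source)
        if value is not None:  # assumption: missing sources are skipped
            attributes[name] = value
    return attributes


print(map_attributes("my-attribute=__KEY__, region, tier=customer-tier",
                     "order-42", {"region": "eu", "customer-tier": "gold"}))
```

With this input, `my-attribute` takes the message key, `region` reads the property of the same name, and `tier` reads the `customer-tier` property.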

Relationships:

success: FlowFiles for which all content was consumed from Pulsar.
parse_failure: FlowFiles for which the content cannot be parsed.

Reads Attributes:

None specified.

Writes Attributes:

record.count: The number of records received.

State management:

This component does not store state.

Restricted:

This component is not restricted.

Input requirement:

This component does not allow an incoming relationship.

System Resource Considerations:

None specified.

See Also:

PublishPulsar, ConsumePulsar, PublishPulsarRecord