PublishPulsar

Description:

Sends the contents of a FlowFile as a message to Apache Pulsar using the Pulsar Producer API. Each FlowFile may be sent as a single message, or its content may be split into multiple messages using a user-specified delimiter, such as a new line. The complementary NiFi processor for fetching messages is ConsumePulsar.
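
The delimiter behavior can be pictured with the short Python sketch below. The function `split_messages` is hypothetical and is not the processor's code. It assumes the delimiter is matched as its UTF-8 byte sequence, as the Message Demarcator property states. As an unverified simplification, it also keeps empty segments, such as the one produced by a trailing delimiter.

```python
from typing import List, Optional


def split_messages(content: bytes, demarcator: Optional[str]) -> List[bytes]:
    """Split FlowFile content into Pulsar message payloads (hypothetical sketch).

    With no demarcator, the whole content becomes a single message. Otherwise the
    content is split on the UTF-8 encoding of the demarcator. Keeping empty
    segments is an assumption of this sketch, not documented processor behavior.
    """
    if not demarcator:
        return [content]
    return content.split(demarcator.encode("utf-8"))


if __name__ == "__main__":
    payload = "first\nsecond\nthird".encode("utf-8")
    print(split_messages(payload, None))   # one message holding the full content
    print(split_messages(payload, "\n"))   # three messages, one per line
```

In this model, the msg.count written on success would simply be the length of the returned list.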

Tags:

Apache, Pulsar, Put, Send, Message, PubSub

Properties:

In the list below, the names of required properties appear in bold; any other properties (not in bold) are optional. Each entry also indicates any default value and whether the property supports the NiFi Expression Language.

Pulsar Client Service
  API Name: PULSAR_CLIENT_SERVICE
  Allowable Values: Controller Service API: PulsarClientService; Implementation: StandardPulsarClientService
  Description: Specifies the Pulsar Client Service used to create Pulsar connections.

Topic Name
  API Name: TOPIC
  Description: The name of the Pulsar topic.
  Supports Expression Language: true (evaluated using FlowFile attributes and variable registry)

Async Enabled
  API Name: ASYNC_ENABLED
  Default Value: false
  Allowable Values: true, false
  Description: Controls whether messages are sent asynchronously. When messages are sent synchronously, each one is acknowledged before the next message is processed; when they are sent asynchronously, they are acknowledged whenever the Pulsar broker responds. Enabling async increases throughput at the risk of sending duplicate data to the Pulsar broker.

Maximum Async Requests
  API Name: MAX_ASYNC_REQUESTS
  Default Value: 50
  Description: The maximum number of outstanding asynchronous publish requests for this processor. Each asynchronous call requires memory, so avoid setting this value too high.

Batching Enabled
  API Name: BATCHING_ENABLED
  Default Value: false
  Allowable Values: true, false
  Description: Controls whether automatic batching of messages is enabled for the producer. When batching is enabled, multiple calls to Producer.sendAsync can result in a single batch being sent to the broker, which improves throughput, especially when publishing small messages. If compression is enabled, messages are compressed at the batch level, which gives a much better compression ratio for similar headers or contents. When batching is enabled, the default batch delay is 10 ms and the default batch size is 1000 messages.

Batching Max Messages
  API Name: BATCHING_MAX_MESSAGES
  Default Value: 1000
  Description: Sets the maximum number of messages permitted in a batch within the Pulsar client. If set to a value greater than 1, messages are queued until this threshold is reached or the batch interval has elapsed, whichever happens first.
  Supports Expression Language: true (evaluated using variable registry only)

Batch Interval
  API Name: BATCH_INTERVAL
  Default Value: 10 ms
  Description: Sets the time period within which sent messages are batched, if batching is enabled. If set to a non-zero value, messages are queued until this interval has elapsed or the Batching Max Messages threshold has been reached, whichever occurs first.
  Supports Expression Language: true (evaluated using variable registry only)

Block if Message Queue Full
  API Name: BLOCK_IF_QUEUE_FULL
  Default Value: false
  Allowable Values: true, false
  Description: Sets whether the processor blocks when the outgoing message queue is full. If set to false, send operations fail immediately with ProducerQueueIsFullError when there is no space left in the pending queue.

Compression Type
  API Name: COMPRESSION_TYPE
  Default Value: None
  Allowable Values:
    • None: No compression
    • LZ4: Compress with the LZ4 algorithm
    • ZLIB: Compress with the ZLib algorithm
  Description: Sets the compression type for the producer.

Message Routing Mode
  API Name: MESSAGE_ROUTING_MODE
  Default Value: Round Robin Partition
  Allowable Values:
    • Custom Partition: Route messages to a custom partition
    • Round Robin Partition: Route messages to all partitions in a round-robin manner
    • Single Partition: Route messages to a single partition
  Description: Sets the message routing mode for the producer. This applies only if the destination topic is partitioned.

Message Demarcator
  API Name: MESSAGE_DEMARCATOR
  Description: Specifies the string (interpreted as UTF-8) used to demarcate multiple messages within a single FlowFile. If not specified, the entire content of the FlowFile is used as a single message. If specified, the contents of the FlowFile are split on this delimiter and each section is sent as a separate Pulsar message. To enter a special character such as a new line, use CTRL+Enter or Shift+Enter, depending on your OS.
  Supports Expression Language: true (evaluated using FlowFile attributes and variable registry)

Max Pending Messages
  API Name: PENDING_MAX_MESSAGES
  Default Value: 1000
  Description: Sets the maximum size of the queue holding messages that are pending an acknowledgment from the broker.
  Supports Expression Language: true (evaluated using variable registry only)

Mapped Message Properties
  API Name: MAPPED_MESSAGE_PROPERTIES
  Description: A comma-delimited list of message properties to set based on FlowFile attributes. The syntax for an individual property entry is <property name>[=<source attribute name>]. If the optional source attribute name is omitted, it is assumed to be the same as the property name.

Message Key
  API Name: MESSAGE_KEY
  Description: The key of the outgoing message.
  Supports Expression Language: true (evaluated using FlowFile attributes and variable registry)
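
The difference between synchronous sending and asynchronous sending capped by Maximum Async Requests can be sketched as follows. This is a Python model, not the processor's Java implementation. `send` is a hypothetical callable that blocks until the broker acknowledges a message. A thread pool stands in for asynchronous sends, and a semaphore limits how many requests are outstanding at once.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, List


def publish_all(messages: List[bytes],
                send: Callable[[bytes], str],
                async_enabled: bool,
                max_async_requests: int = 50) -> List[str]:
    """Publish messages synchronously, or asynchronously with a cap on in-flight requests.

    `send` is a hypothetical stand-in for a broker call that blocks until the
    broker acknowledges the message and returns an acknowledgment value.
    """
    if not async_enabled:
        # Synchronous: wait for each acknowledgment before sending the next message.
        return [send(m) for m in messages]

    in_flight = threading.BoundedSemaphore(max_async_requests)
    futures: List[Future] = []
    with ThreadPoolExecutor(max_workers=max_async_requests) as pool:
        for m in messages:
            in_flight.acquire()  # pauses once max_async_requests are outstanding
            fut = pool.submit(send, m)
            fut.add_done_callback(lambda _f: in_flight.release())  # slot frees on ack
            futures.append(fut)
        # Acknowledgments arrive as the broker responds; collect them in send order.
        return [f.result() for f in futures]
```

Because the semaphore is acquired before each submission, the loop stalls once the limit is reached. A larger limit allows more throughput, but more requests (and their memory) are held at once.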
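
Batching Max Messages and Batch Interval share one rule: send the batch when either threshold is reached, whichever comes first. The small accumulator below models that rule. It is an illustrative sketch rather than the Pulsar client's batching code. Time is passed in explicitly to keep it deterministic. Measuring the interval from the first queued message is an assumption, since the real client flushes on its own timer.

```python
from typing import List, Optional


class BatchAccumulator:
    """Illustrative model of "max messages OR interval, whichever first".

    Not the Pulsar client's implementation. Times are in seconds and supplied by
    the caller; timing the interval from the first queued message is an assumption.
    """

    def __init__(self, max_messages: int = 1000, interval_s: float = 0.010) -> None:
        self.max_messages = max_messages
        self.interval_s = interval_s
        self.pending: List[bytes] = []
        self.batch_started: Optional[float] = None

    def add(self, message: bytes, now: float) -> Optional[List[bytes]]:
        """Queue a message; return the batch if the size threshold is reached."""
        if not self.pending:
            self.batch_started = now  # start timing with the first message of a batch
        self.pending.append(message)
        if len(self.pending) >= self.max_messages:
            return self._flush()
        return None

    def poll(self, now: float) -> Optional[List[bytes]]:
        """Return the pending batch if the batch interval has elapsed."""
        if self.pending and now - self.batch_started >= self.interval_s:
            return self._flush()
        return None

    def _flush(self) -> List[bytes]:
        batch, self.pending, self.batch_started = self.pending, [], None
        return batch
```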
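
Max Pending Messages and Block if Message Queue Full interact roughly as in the following sketch. It uses Python's `queue.Queue` as a stand-in for the producer's pending queue. The class `PendingQueue` is hypothetical. Python's `queue.Full` plays the role of ProducerQueueIsFullError, and `acknowledge` models the broker acknowledging the oldest pending message.

```python
import queue
from typing import Optional


class PendingQueue:
    """Hypothetical model of a producer's queue of messages awaiting acknowledgment.

    max_pending mirrors Max Pending Messages; block_if_full mirrors
    Block if Message Queue Full. This is not Pulsar client code.
    """

    def __init__(self, max_pending: int = 1000, block_if_full: bool = False) -> None:
        self._q: "queue.Queue[bytes]" = queue.Queue(maxsize=max_pending)
        self._block_if_full = block_if_full

    def enqueue(self, message: bytes, timeout: Optional[float] = None) -> None:
        if self._block_if_full:
            # Wait for free space; with a timeout, queue.Full is raised if none frees up.
            self._q.put(message, block=True, timeout=timeout)
        else:
            # Fail immediately, analogous to ProducerQueueIsFullError.
            self._q.put_nowait(message)

    def acknowledge(self) -> bytes:
        """Remove the oldest pending message, as if the broker had acknowledged it."""
        return self._q.get_nowait()
```

The `timeout` parameter exists only so the blocking case can be exercised without waiting forever; it is not a processor setting.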
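
A parser for the Mapped Message Properties syntax might look like the sketch below. The function name `map_message_properties` is hypothetical. Two behaviors are assumptions, because the property description does not say how those cases are handled: whitespace around entries is trimmed, and entries whose source attribute is missing are skipped.

```python
from typing import Dict


def map_message_properties(spec: str, attributes: Dict[str, str]) -> Dict[str, str]:
    """Build message properties from FlowFile attributes (hypothetical sketch).

    `spec` follows the documented syntax: a comma-delimited list of
    <property name>[=<source attribute name>] entries. Trimming whitespace and
    skipping absent attributes are assumptions of this sketch.
    """
    properties: Dict[str, str] = {}
    for entry in spec.split(","):
        entry = entry.strip()
        if not entry:
            continue
        name, sep, source = entry.partition("=")
        name = name.strip()
        # An omitted source attribute defaults to the property's own name.
        source = source.strip() if sep else name
        if source in attributes:
            properties[name] = attributes[source]
    return properties
```

For example, the value `filename, origin=source.system` would copy the `filename` attribute under its own name and the `source.system` attribute under the property name `origin`.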

Relationships:

success: FlowFiles for which all content was sent to Pulsar.
failure: Any FlowFile that cannot be sent to Pulsar is routed to this relationship.

Reads Attributes:

None specified.

Writes Attributes:

msg.count: The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to FlowFiles that are routed to success.
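
Taken together, the success and failure relationships and the msg.count attribute suggest the flow sketched below. `publish_flowfile` and its `send` callable are hypothetical. The sketch assumes that any send error routes the whole FlowFile to failure. It does not model what happens to messages that were already sent before the error.

```python
from typing import Callable, Dict, List, Tuple


def publish_flowfile(messages: List[bytes],
                     send: Callable[[bytes], None],
                     attributes: Dict[str, str]) -> Tuple[str, Dict[str, str]]:
    """Return (relationship, attributes) for one FlowFile (hypothetical sketch).

    `send` is assumed to raise an exception when a message cannot be sent.
    """
    sent = 0
    try:
        for message in messages:
            send(message)
            sent += 1
    except Exception:
        # Failure path: msg.count is not written.
        return "failure", dict(attributes)
    # Success path: every message was sent, so record how many.
    updated = dict(attributes)
    updated["msg.count"] = str(sent)
    return "success", updated
```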

State management:

This component does not store state.

Restricted:

This component is not restricted.

Input requirement:

This component requires an incoming relationship.

System Resource Considerations:

None specified.

See Also:

ConsumePulsar, ConsumePulsarRecord, PublishPulsarRecord