PulsarClientService
Implementation: StandardPulsarClientService
Specifies the Pulsar Client Service that can be used to create Pulsar connections. |
Topic Names | TOPICS | | | Specify the topics this consumer will subscribe to. You can specify multiple topics in a comma-separated list, e.g. topicA, topicB, topicC. Supports Expression Language: true (will be evaluated using variable registry only) |
Topics Pattern | TOPICS_PATTERN | | | Alternatively, you can specify a pattern for the topics that this consumer will subscribe to. It accepts a regular expression, which is compiled into a pattern internally. E.g. "persistent://my-tenant/ns-abc/pattern-topic-.*" would subscribe to any topic whose name starts with 'pattern-topic-', is in the 'ns-abc' namespace, and belongs to the 'my-tenant' tenant. |
Subscription Name | SUBSCRIPTION_NAME | | | Specify the subscription name for this consumer. |
Subscription Initial Position | SUBSCRIPTION_INITIAL_POSITION | Latest | - Earliest
- Latest
| Specify subscription initial position. By default the subscription will be created at the end of the topic. |
Consumer Name | CONSUMER_NAME | | | Set the name of the consumer, which uniquely identifies this client on the broker. |
Async Enabled | ASYNC_ENABLED | false | | Control whether the messages will be consumed asynchronously or not. Messages consumed synchronously will be acknowledged immediately before processing the next message, while asynchronous messages will be acknowledged after the Pulsar broker responds.
Enabling asynchronous message consumption introduces the possibility of duplicate data consumption in the case where the Processor is stopped before it has time to send an acknowledgement back to the Broker. In this scenario, the Broker would assume that the un-acknowledged message was not successfully processed and re-send it when the Processor restarted. |
Maximum Async Requests | MAX_ASYNC_REQUESTS | 50 | | The maximum number of outstanding asynchronous consumer requests for this processor. Each asynchronous call requires memory, so avoid setting this value too high. |
Acknowledgment Timeout | ACK_TIMEOUT | 30 sec | | Set the timeout for unacked messages. Messages that are not acknowledged within the configured timeout will be replayed. This value needs to be greater than 10 seconds. |
Consumer Priority Level | PRIORITY_LEVEL | 5 | | Set the priority level for consumers on a shared subscription; the broker gives higher-priority consumers preference when dispatching messages. Priorities are in descending order, with 0 as the highest (e.g. 0 = max priority, then 1, 2, ...). |
Consumer Receiver Queue Size | RECEIVER_QUEUE_SIZE | 1000 | | The consumer receive queue controls how many messages can be accumulated by the Consumer before the application calls Consumer.receive(). Using a higher value could potentially increase consumer throughput at the expense of higher memory utilization.
Setting the consumer queue size to zero:
- Decreases the throughput of the consumer by disabling pre-fetching of messages.
- Doesn't support batch messages: if the consumer receives a batch message, it closes its connection with the broker and will not be able to receive any further messages until the batch message in the pipeline is removed. |
Subscription Type | SUBSCRIPTION_TYPE | Shared | - Exclusive
- Shared
- Key_Shared
- Failover
| Select the subscription type to be used when subscribing to the topic. |
Consumer Message Batch Size | CONSUMER_BATCH_SIZE | 1000 | | Set the maximum number of messages consumed at a time and published to a single FlowFile. If set to a value greater than 1, messages within the FlowFile will be separated by the Message Demarcator. Supports Expression Language: true (will be evaluated using variable registry only) |
Message Demarcator | MESSAGE_DEMARCATOR |
| | Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages consumed from Pulsar within a single FlowFile. If not specified, the content of the FlowFile will consist of all of the messages consumed from Pulsar concatenated together. If specified, the contents of the individual Pulsar messages will be separated by this delimiter. To enter a special character such as 'new line', use CTRL+Enter or Shift+Enter, depending on your OS. Supports Expression Language: true (will be evaluated using variable registry only) |
Mapped FlowFile Attributes | MAPPED_FLOWFILE_ATTRIBUTES | | | A comma-delimited list of FlowFile attributes to set based on message metadata (currently key and properties). The syntax for an individual mapping is <attribute name>[=<source property name or key>]. To map the message key to an attribute, use the reserved name __KEY__ (e.g. my-attribute=__KEY__). If the optional source name is omitted, it is assumed to be the same as the attribute name. |
Replicate Subscription State | REPLICATE_SUBSCRIPTION_STATE | false | | Control whether to replicate subscription state across multiple geographical regions in case the topic is geo-replicated. In case of failover, the consumer can restart consuming from the failure point in a different cluster. |
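
To illustrate the Topics Pattern property above, the following sketch checks a few candidate topic names against the example expression. It uses Python's `re` module as a stand-in for the Java regular-expression engine, and the candidate topic names and the `matching_topics` helper are made up for illustration. For a simple expression like this one the two engines should agree, but this is not how the processor itself performs the match.

```python
import re

# Example expression taken from the Topics Pattern description.
TOPICS_PATTERN = re.compile(r"persistent://my-tenant/ns-abc/pattern-topic-.*")

# Hypothetical topic names, invented for this illustration.
candidates = [
    "persistent://my-tenant/ns-abc/pattern-topic-1",
    "persistent://my-tenant/ns-abc/pattern-topic-orders",
    "persistent://my-tenant/ns-abc/other-topic",
    "persistent://my-tenant/ns-xyz/pattern-topic-1",
]


def matching_topics(pattern, topics):
    """Return the topics whose full name matches the pattern."""
    return [t for t in topics if pattern.fullmatch(t)]


matched = matching_topics(TOPICS_PATTERN, candidates)
for topic in matched:
    print(topic)
```

Only the first two candidates match: the third has a different topic prefix and the fourth is in a different namespace.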
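
The mapping syntax of the Mapped FlowFile Attributes property above can be sketched as a small parser. This is a minimal illustration of the documented syntax, not the processor's actual code: the function names `parse_mappings` and `map_attributes`, the sample key and properties, and the choice to skip properties that are missing from the message are all assumptions.

```python
KEY_TOKEN = "__KEY__"  # reserved name for the message key, per the description


def parse_mappings(spec):
    """Parse '<attribute name>[=<source name>]' entries from a comma-delimited list."""
    mappings = {}
    for entry in spec.split(","):
        entry = entry.strip()
        if not entry:
            continue
        attribute, _, source = entry.partition("=")
        attribute = attribute.strip()
        # When the optional source is omitted, it defaults to the attribute name.
        mappings[attribute] = source.strip() or attribute
    return mappings


def map_attributes(mappings, message_key, message_properties):
    """Build FlowFile attributes from a message key and its properties."""
    attributes = {}
    for attribute, source in mappings.items():
        if source == KEY_TOKEN:
            value = message_key
        else:
            value = message_properties.get(source)
        if value is not None:  # assumption: missing values are simply skipped
            attributes[attribute] = value
    return attributes


mappings = parse_mappings("my-attribute=__KEY__, trace-id, region=source-region")
attrs = map_attributes(
    mappings,
    message_key="order-42",
    message_properties={"trace-id": "abc123", "source-region": "eu-west"},
)
print(attrs)
```

Here `my-attribute` receives the message key, `trace-id` is copied from the property of the same name, and `region` is copied from the `source-region` property.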
Relationships:
Name | Description |
---|---|
success | FlowFiles for which all content was consumed from Pulsar. |
Reads Attributes:
None specified.
Writes Attributes:
Name | Description |
---|---|
message.count | The number of messages received from Pulsar |
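
As a rough illustration of how Consumer Message Batch Size, Message Demarcator, and the `message.count` attribute relate, the sketch below combines a batch of message payloads into a single content blob. The `build_flowfile` function and the sample payloads are hypothetical; the sketch only mirrors the behavior described in the property table and is not the processor's implementation.

```python
def build_flowfile(messages, demarcator=None):
    """Combine message payloads (bytes) into one content blob plus attributes."""
    # The demarcator is interpreted as UTF-8; without one, payloads are
    # simply concatenated.
    separator = demarcator.encode("utf-8") if demarcator else b""
    content = separator.join(messages)
    # FlowFile attribute values are strings.
    attributes = {"message.count": str(len(messages))}
    return content, attributes


batch = [b"alpha", b"beta", b"gamma"]

content, attributes = build_flowfile(batch, demarcator="\n")
print(content)
print(attributes)

# Without a demarcator the payloads run together.
joined, _ = build_flowfile(batch)
print(joined)
```

With a newline demarcator, each message ends up on its own line of the FlowFile content, which is usually easier for downstream processors to split again.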
State management:
This component does not store state.
Restricted:
This component is not restricted.
Input requirement:
This component does not allow an incoming relationship.
System Resource Considerations:
None specified.
See Also:
PublishPulsar, ConsumePulsarRecord, PublishPulsarRecord