Quick Flow: Consume Kafka to S3 bucket

Learn about the Quick Flow that consumes data from Kafka topics and writes it to S3. Review the flow details, download the template, and understand the template parameters.

About this Quick Flow

This flow definition consumes from one or more Kafka topics, merges Kafka events into a single file, and writes the data from each topic to its own path in S3. If you consume from more than one topic, all topics must use the same schema.
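Each topic gets its own path because the target location is built from the pattern s3a://#{S3 Bucket}/#{S3 Path}/${Kafka.topic}. The Python sketch below only illustrates that routing and is not part of the template. The function names and the topic names "orders" and "payments" are hypothetical. The bucket and path values come from the parameter examples.

```python
from collections import defaultdict


def s3_target_path(bucket: str, path: str, topic: str) -> str:
    """Mirror the pattern s3a://#{S3 Bucket}/#{S3 Path}/${Kafka.topic}."""
    # S3 Path is expected without leading characters, so reject a leading slash.
    if path.startswith("/"):
        raise ValueError("S3 Path must not start with '/'")
    return f"s3a://{bucket}/{path}/{topic}"


def group_by_target(events, bucket: str, path: str) -> dict:
    """Group (topic, payload) pairs by the per-topic S3 prefix they are written to."""
    targets = defaultdict(list)
    for topic, payload in events:
        targets[s3_target_path(bucket, path, topic)].append(payload)
    return dict(targets)


events = [("orders", b"{}"), ("payments", b"{}"), ("orders", b"{}")]
groups = group_by_target(events, "cdp-bucket", "demo-dl/ingest")
# Two topics result in two separate S3 prefixes.
```

Because every topic lands under its own prefix, this routing works with any number of source topics. The schema requirement exists because all topics are parsed with the same schema.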

You can specify the topics to read from, the size or flow file count thresholds for merging Kafka events into a single file, and the target S3 bucket and path.
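The merge thresholds interact roughly as follows. A merged file is closed once either maximum (entry count or size) is reached. A partially filled batch is written only after both minimums are met. The Python sketch below is a simplified model based on that assumption, not the merge processor the flow actually runs. `parse_size`, `MergeBin`, and `merge_stream` are hypothetical names. The sketch also assumes that size units use 1024-based multiples.

```python
import re
from dataclasses import dataclass, field

# Assumption: 1024-based multiples for size units.
_UNITS = {"B": 1, "KB": 1024, "MB": 1024 ** 2, "GB": 1024 ** 3}


def parse_size(text: str) -> int:
    """Parse size strings such as "5 MB", "500 KB", or "0B" into bytes."""
    m = re.fullmatch(r"\s*(\d+(?:\.\d+)?)\s*(B|KB|MB|GB)\s*", text, flags=re.IGNORECASE)
    if m is None:
        raise ValueError(f"unrecognized size: {text!r}")
    return int(float(m.group(1)) * _UNITS[m.group(2).upper()])


@dataclass
class MergeBin:
    max_size: int                 # Merge Max Size (no default is documented)
    min_entries: int = 1          # Merge Min Number of Entries, default 1
    max_entries: int = 1000       # Merge Max Number of Entries, default 1000
    min_size: int = 0             # Merge Min Size, default 0B
    items: list = field(default_factory=list)
    size: int = 0

    def add(self, payload: bytes) -> None:
        self.items.append(payload)
        self.size += len(payload)

    def is_full(self) -> bool:
        # Closed as soon as either maximum is reached.
        return len(self.items) >= self.max_entries or self.size >= self.max_size

    def meets_minimums(self) -> bool:
        # A partial bin is only eligible once both minimums are satisfied.
        return len(self.items) >= self.min_entries and self.size >= self.min_size


def merge_stream(payloads, **limits):
    """Yield merged files (lists of event payloads) from a stream of events."""
    current = MergeBin(**limits)
    for payload in payloads:
        current.add(payload)
        if current.is_full():
            yield current.items
            current = MergeBin(**limits)
    # End of input: write the remainder only if it meets the minimums.
    # A real merge processor may also rely on timeouts such as a maximum bin age.
    if current.items and current.meets_minimums():
        yield current.items
```

For example, with "Merge Max Number of Entries" set to 10, 25 small events produce merged files of 10, 10, and 5 events.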

If a processing error occurs, it is logged in the NiFi application log by the LogProcessingFailures processor.

Quick Flow definition details and download

Quick Flow definition details
Flow Name: Kafka_JSON-Merge-to-S3_Parquet
Flow Artifact: Download the file Kafka_JSON-Merge-to-S3_Parquet
Source: Kafka Topic
Source Format: JSON
Destination: S3
Destination Format: Parquet

Quick Flow parameters

Each parameter is listed with its name, a description, whether it is sensitive, and an example value or notes where available.

CDP Env Truststore File
Description: Provide the truststore file that you generated for the environment where your Kafka cluster is running.
Sensitive: No

CDP Env Trustore Password
Description: Provide the password for the truststore that you created for the environment where your Kafka cluster is running.
Sensitive: Yes

CDP Schema Registry URL
Description: Specify the Schema Registry URL you want to connect to, in the format https://hostname:7790/api/v1.
Sensitive: No
Example: https://dataflow-streams-master0.dataflow.xcu2-8y8x.dev.cldr.work:7790/api/v1

CDPEnvironment
Description: DataFlow uses this parameter to auto-populate the flow deployment with the Hadoop configuration files required to interact with S3.
Sensitive: No

Kafka Broker Endpoint
Description: Specify the Kafka bootstrap servers string as a comma-separated list of host:port entries.
Sensitive: No
Example: messaging-cluster-1-broker0.gvettica.xcu2-8y8x.dev.cldr.work:9093,messaging-cluster-1-broker1.gvettica.xcu2-8y8x.dev.cldr.work:9093,messaging-cluster-1-broker2.gvettica.xcu2-8y8x.dev.cldr.work:9093

Kafka Consumer Group Id
Description: Specify the name of the consumer group used for the source topic you are consuming from.
Sensitive: No

Kafka Service Account
Description: Specify the CDP machine user name that you want to use to authenticate to Kafka. Ensure that this user has the appropriate access rights to the Kafka topics.
Sensitive: No

Kafka Service Account Password
Description: Specify the password of the CDP machine user you are using to authenticate to Kafka.
Sensitive: Yes

Kafka Source Topic
Description: Specify a topic or a list of topics that you want to read from. If you specify more than one topic, all topics must use the same schema.
Sensitive: No

Merge Max Number of Entries
Description: Specify the maximum number of flow files that are merged into one file before writing to S3.
Sensitive: No
Default value: 1000

Merge Max Size
Description: Specify the maximum combined size of the flow files that are merged into one file before writing to S3. Size can be specified in B, KB, or MB. For example, use "5 MB" to define a limit of 5 megabytes.
Sensitive: No

Merge Min Number of Entries
Description: Specify the minimum number of flow files that must be merged together before writing to S3.
Sensitive: No
Default value: 1

Merge Min Size
Description: Specify the minimum combined size of the flow files that must be reached before they are merged and written to S3. Size can be specified in B, KB, or MB. For example, use "500 KB" to define a limit of 500 kilobytes.
Sensitive: No
Default value: 0B

S3 Bucket
Description: Specify the name of the S3 bucket you want to write to. The full path is constructed as s3a://#{S3 Bucket}/#{S3 Path}/${Kafka.topic}
Sensitive: No
Notes: Make sure to use the CDP data lake bucket.
Example: cdp-bucket

S3 Path
Description: Specify the path within the bucket that you want to write to, without any leading characters (for example, no leading /). The full path is constructed as s3a://#{S3 Bucket}/#{S3 Path}/${Kafka.topic}
Sensitive: No
Notes: Make sure that the path points to a location within the CDP data lake directory of your bucket.
Example: demo-dl/ingest

S3 Service Account
Description: Specify the CDP machine user you want to use to write data to S3. Make sure that the user has the appropriate rights to write to the specified S3 path.
Sensitive: No
Example: srv_nifi-s3-ingest

S3 Service Account Password
Description: Specify the password of the CDP machine user you are using to write to S3.
Sensitive: Yes

Schema Name
Description: Specify the name of the schema to look up in Schema Registry for the source Kafka topic.
Sensitive: No
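Several parameters have a fixed shape that you can check before deploying the flow. The Python sketch below tests values against the formats documented above. "Kafka Broker Endpoint" should be a comma-separated list of host:port entries. "CDP Schema Registry URL" should follow https://hostname:port/api/v1. "S3 Path" should have no leading slash. The helper names are hypothetical, and DataFlow does not run these checks. They are only a pre-deployment sanity check under these assumptions.

```python
from typing import List, Tuple
from urllib.parse import urlparse


def parse_bootstrap_servers(value: str) -> List[Tuple[str, int]]:
    """Split a Kafka Broker Endpoint value into (host, port) pairs."""
    servers = []
    for entry in value.split(","):
        host, sep, port = entry.strip().rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        servers.append((host, int(port)))
    return servers


def check_schema_registry_url(url: str) -> str:
    """Require the documented https://hostname:port/api/v1 shape."""
    parts = urlparse(url)
    if parts.scheme != "https" or not parts.hostname or parts.port is None:
        raise ValueError(f"expected https://hostname:port/api/v1, got {url!r}")
    if parts.path.rstrip("/") != "/api/v1":
        raise ValueError(f"expected the path /api/v1, got {parts.path!r}")
    return url


def check_s3_path(path: str) -> str:
    """S3 Path must be non-empty and must not start with a slash."""
    if not path or path.startswith("/"):
        raise ValueError(f"S3 Path must not be empty or start with '/': {path!r}")
    return path


brokers = parse_bootstrap_servers(
    "messaging-cluster-1-broker0.gvettica.xcu2-8y8x.dev.cldr.work:9093,"
    "messaging-cluster-1-broker1.gvettica.xcu2-8y8x.dev.cldr.work:9093"
)
registry = check_schema_registry_url(
    "https://dataflow-streams-master0.dataflow.xcu2-8y8x.dev.cldr.work:7790/api/v1"
)
path = check_s3_path("demo-dl/ingest")
```

Each helper raises ValueError on a malformed value, so a failure points at the specific parameter to fix.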