This section walks you through designing your function by developing a data flow using
NiFi on your local development workstation. Once the NiFi flow has been developed and tested,
you will deploy it as a Cloudera DataFlow function in serverless mode using AWS
Lambda.
1. Install Apache NiFi
Download and install Apache NiFi 1.16.X on your local development
workstation.
Follow the instructions to download and install the latest
version of Apache NiFi.
Log into NiFi with the generated credentials.
You can see the NiFi flow designer canvas.
2. Import the Telemetry Tutorial NiFi Flow
Download and add the example data flow to Apache NiFi. This example flow implements
the requirements of the use case described in this quickstart.
Drag a Process Group (fourth icon from the left) onto the canvas.
Click the browse link, select the Telemetry Tutorial NiFi Flow you have downloaded,
and click Add.
Explore the flow that you have uploaded to the canvas.
Double-click the DataFlowFunctionsQuickstart process group.
The Mock Data - S3 Trigger Event processor will generate a test Lambda S3 trigger
event. You can use this processor to test the function locally before deploying it
on AWS Lambda.
Double-click the Quickstart Function -TelemetryS3toS3Processing process group
that contains the flow for processing the telemetry data.
Note the details of this flow:
S3Trigger is an input port that the AWS Lambda handler sends the S3
trigger event to. All functions need to start with the input port for the
corresponding trigger that will be used.
DFFunctionS3TriggerParser parses the JSON trigger event to extract
the bucket, key, and region and stores them in flow attributes so that
downstream processors can use them easily.
Fetch Telemetry File fetches the new telemetry file that landed in
S3 and stores it as a flowfile.
Route Telemetry Events routes sensor events in the file to
different paths based on the eventSource value in the event. Events with an
eventSource of 'truck_geo_event' are routed to one path, and events with the
value 'truck_speed_event' are routed to another path.
For the events sent to the truck_geo_event path, each event is
enriched using custom Groovy code that looks up the geo address based on the
lat and long values in the event. The enriched events are then converted to
Parquet format using the supplied schema, and the file is stored in a new S3
folder.
For the events sent to the truck_speed_event path, the events are
filtered to keep those with speed > 40, then converted into Parquet format
and stored in a different S3 folder.
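The routing and filtering logic described above can be sketched outside NiFi as follows. This is only an illustration, not the flow's actual implementation: it assumes that each telemetry event has already been parsed into a dictionary, that the speed value lives in a field named speed (a hypothetical name), and that the filter keeps events with speed > 40. Only the eventSource field and its two values come from the flow description.

```python
from typing import Dict, List

# Threshold taken from the flow description (speed > 40).
SPEED_THRESHOLD = 40


def route_events(events: List[dict]) -> Dict[str, List[dict]]:
    """Group events by eventSource, ignoring unknown sources."""
    routes: Dict[str, List[dict]] = {
        "truck_geo_event": [],
        "truck_speed_event": [],
    }
    for event in events:
        source = event.get("eventSource")
        if source in routes:
            routes[source].append(event)
    return routes


def filter_speeding(events: List[dict]) -> List[dict]:
    """Keep events whose speed exceeds the threshold.

    The field name "speed" is an assumption made for this sketch.
    """
    return [e for e in events if e.get("speed", 0) > SPEED_THRESHOLD]
```

In the NiFi flow, the geo path additionally performs the address lookup, and both paths write Parquet files to S3; those steps are left out of this sketch.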
3. Configure and start the Telemetry Tutorial NiFi Flow
Apache NiFi uses the concept of parameter context to store properties of the flow
that need to change when deploying to different environments. The parameter context for the
function in this tutorial is called DataFlowFunctionsQuickstart.
Configure the parameter context.
Right-click the canvas and select Parameters.
Update the following four parameter values based on your environment.
aws_access_key_id - AWS access key ID used to fetch and write objects in the
S3 bucket
aws_access_key_password - AWS secret access key used to fetch and
write objects in S3
s3_bucket - S3 bucket you created where telemetry data will be
processed
s3_region - the region where you created the bucket
Start the NiFi flow.
Start Controller Services.
Controller Services are shared services that processors and other
services can use for configuration or task execution.
Within the process group called Quickstart Function -TelemetryS3toS3Processing,
select the canvas to bring mouse focus to it, right-click the canvas, select Configure > Controller Services.
There are five controller services defined for this function that are responsible
for parsing the incoming JSON telemetry data, writing the data in Parquet format
and doing the geo address lookup.
Click the bolt icon next to each service to enable it.
Close the configuration dialog.
Right-click the canvas and select Start to start the
process group called Quickstart Function -TelemetryS3toS3Processing.
You should see that each processor has a green play button, which indicates that
all the processors are started and ready to receive data.
4. Test the Telemetry Tutorial NiFi Flow
With the flow configured and started, you can test it locally with a sample trigger
event before deploying it as a serverless function on AWS Lambda.
Configure the test S3 trigger event.
From within the process group Quickstart Function -TelemetryS3toS3Processing,
right-click the canvas and select Leave Group to go to the
parent process group.
Right-click the Mock Data - S3 Trigger Event processor and select
Configure.
Click the Properties tab and edit the Custom
Text property value.
This property value represents a mock trigger event that Lambda would create when
a new file called sampleTelemetryRawData.txt is added to the bucket folder.
Update the bucket.name JSON field with the name of the bucket you created
earlier.
Update the awsRegion JSON field to match the region in which you created
the bucket.
Click OK and then Apply to save the
change.
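To make the structure of the trigger event concrete, the following sketch shows a trimmed-down S3 event and how the bucket, key, and region can be read from it. The bucket name and region are placeholder values, the object key omits any folder prefix, and the event carries only the fields used here; the Custom Text value in the processor contains a fuller event. The names of the returned dictionary keys are illustrative and are not necessarily the attribute names that DFFunctionS3TriggerParser writes.

```python
import json
from urllib.parse import unquote_plus

# Trimmed-down mock S3 trigger event. Bucket name and region are
# placeholders; replace them with the values for your environment.
MOCK_EVENT = """
{
  "Records": [
    {
      "awsRegion": "us-east-1",
      "s3": {
        "bucket": {"name": "my-telemetry-bucket"},
        "object": {"key": "sampleTelemetryRawData.txt"}
      }
    }
  ]
}
"""


def parse_s3_trigger(event_json: str) -> dict:
    """Extract bucket, key, and region from the first record of an S3 event."""
    record = json.loads(event_json)["Records"][0]
    return {
        "bucket": record["s3"]["bucket"]["name"],
        # Object keys in S3 event notifications are URL-encoded.
        "key": unquote_plus(record["s3"]["object"]["key"]),
        "region": record["awsRegion"],
    }


if __name__ == "__main__":
    print(parse_s3_trigger(MOCK_EVENT))
```

Running a check like this against the edited Custom Text value is a quick way to confirm that the JSON is still valid after you change bucket.name and awsRegion.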
Run the NiFi flow with the test trigger event.
Right-click the Mock Data - S3 Trigger Event processor and select Run
Once.
This creates a mock trigger event flow file.
If you double-click the process group Quickstart Function
-TelemetryS3toS3Processing, you can see the flow file processed by the
processors.
The In metric should show '1' across all processors.
Validate the output of the test.
The output of the test should be Parquet files that contain telemetry data
filtered and enriched according to the requirements above, stored in the following S3
folders: <<your_bucket>>/processed/truck-geo-events and
<<your_bucket>>/processed/truck-speed-events.
If a Parquet file is in each of these folders, the local test has completed
successfully: the function works as expected and can now run on AWS Lambda.
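If you prefer to script this check rather than browse the bucket, a minimal sketch could look like the following. It assumes you already have a list of the object keys in your bucket (for example from an S3 listing); the helper name check_output is hypothetical, and only the two folder paths come from this tutorial.

```python
from typing import Dict, Iterable

# Output folders named in this tutorial, relative to the bucket root.
EXPECTED_PREFIXES = (
    "processed/truck-geo-events",
    "processed/truck-speed-events",
)


def check_output(keys: Iterable[str]) -> Dict[str, bool]:
    """Report, per expected folder, whether it contains at least one object.

    Keys ending in "/" are treated as folder placeholders and ignored.
    """
    key_list = list(keys)
    return {
        prefix: any(
            k.startswith(prefix + "/") and not k.endswith("/")
            for k in key_list
        )
        for prefix in EXPECTED_PREFIXES
    }
```

The test has passed when both values in the returned dictionary are True. The sketch only checks that objects exist under each folder; it does not verify that they are valid Parquet files.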
5. Download the Telemetry Tutorial NiFi Flow
Download the Apache NiFi flow so that you can upload it into the Cloudera DataFlow Catalog and run it in serverless mode.
Right-click the Quickstart Function -TelemetryS3toS3Processing process group.
Select Download Flow Definition and then Without
external services.