Develop and test a flow in Apache NiFi

This section walks you through designing your function by developing a data flow using NiFi on your local development workstation. Once the NiFi flow has been developed and tested, you will deploy it as a DataFlow function in serverless mode using AWS Lambda.

1. Install Apache NiFi

Download and install Apache NiFi 1.16.X on your local development workstation.

  1. Follow the instructions to download and install the latest version of Apache NiFi.
  2. Log into NiFi with the generated credentials.

    You can see the NiFi flow designer canvas.

2. Import the Quickstart NiFi Flow

Download and add the example data flow to Apache NiFi. This example flow implements the requirements of the use case described in this quickstart.

  1. Download the Quickstart NiFi Flow to your local machine.
  2. Drag a Process Group (4th icon from the left) onto the canvas.
  3. Click the browse link, select the Quickstart NiFi Flow you have downloaded, and click Add.
  4. Explore the flow that you have uploaded to the canvas.
    1. Double-click the DataFlowFunctionsQuickstart process group.

      The Mock Data - S3 Trigger Event processor will generate a test Lambda S3 trigger event. You can use this processor to test the function locally before deploying it on AWS Lambda.

    2. Double-click the Quickstart Function -TelemetryS3toS3Processing process group that contains the flow for processing the telemetry data.
      Note the details of this flow:
      • S3Trigger is an input port that the AWS Lambda handler sends the S3 trigger event to. All functions need to start with the input port for the corresponding trigger that will be used.

      • DFFunctionS3TriggerParser parses the json trigger event to extract the bucket, key and region and store in flow attributes so it can be easily used by the downstream processors.

      • Fetch Telemetry File fetches the new telemetry file that landed in S3 and stores it as a flowfile.

      • Route Telemetry Events routes sensor events in the file to different paths based on the eventSource value in the event. Events with eventSource of ‘truck_geo_event” will be routed to one path and events with value ‘truck_speed_event’ be routed to another path.

      • For the events sent to the truck_geo_event path, the event will be enriched using custom groovy code that looks up the geo address based on the lat and long values in the event. The enriched events are then converted to parquet format using the supplied schema and the file is stored in a new s3 folder.

      • For the events sent to the truck_speed_event path, the events are filtered for any events with speed > 40 and then converted into parquet format and stored in a different s3 folder.

3. Configure and start the Quickstart NiFi Flow

Apache NiFi uses the concept of parameter context to store properties of the flow that need to change when deploying to different environments. The parameter context for the Quickstart function is called DataFlowFunctionsQuickstart.

  1. Configure the parameter context.
    1. Right-click the canvas and select Parameters.
    2. Update the following four parameter values based on your environment.
      • aws_access_key_id - AWS access key id to fetch and write objects to S3 bucket

      • aws_access_key_password - AWS secret access key secret to fetch and write objects to S3

      • s3_bucket - S3 bucket you created where telemetry data will be processed

      • s3_region - the region where you created the bucket

  2. Start the NiFi flow.
    1. Start Controller Services.
      Controller Services are shared services that can be used by processors and other services to utilize for configuration or task execution
    2. Within the process group called Quickstart Function -TelemetryS3toS3Processing, select the canvas to bring mouse focus to it, right-click the canvas, select Configure > Controller Services.

      There are five controller services defined for this function that are responsible for parsing the incoming JSON telemetry data, writing the data in Parquet format and doing the geo address lookup.

    3. Click the bolt icon next to each service to enable it.
    4. Close the configuration dialog.
    5. Right-click the canvas and select Start to start the process group called Quickstart Function -TelemetryS3toS3Processing.
      You should see that each processor have a green play button which indicates that all the processors are started and ready to receive data.

4. Test the Quickstart NiFi Flow

With the flow configured and started, you can test it locally with a sample trigger event before deploying it as a serverless function on AWS Lambda.

  1. Configure the test S3 trigger event.
    1. From within the process group Quickstart Function -TelemetryS3toS3Processing, right-click the canvas and select Leave Group to go to the parent processor group.
    2. Right-click the Mock Data - S3 Trigger Event processor group and select Configure.
    3. Click the Properties tab and edit the Custom Text property value.

      This property value represents a mock trigger event that Lambda would create when a new file called sampleTelemetryRawData.txt is added to the bucket folder.

    4. Update the bucket.name JSON field with the name of the bucket you created earlier.
    5. Update the awsRegion JSON field to match the region in which you earlier created the bucket.
    6. Click Ok and then Apply to save the change.
  2. Run the NiFi flow with the test trigger event.
    1. Right-click the ‘Mock Data - S3 Trigger Event’ processor group and select ‘Run Once’ .

      This will create a mock trigger event flow file.

    2. If you double-click the process group Quickstart Function -TelemetryS3toS3Processing, you can see the flow file processed by the processors.
      The In metric should show 1 across all processors.
  3. Validate the output of the test.
    • The output of the test should be a parquet file that contains telemetry data that is filtered and enriched based on the above requirements stored in the following s3 folders: <<your_bucket>>/processed/truck-geo-events and <<your_bucket>>/processed/truck-speed-events.
    • If the Parquet file is in each of these folders, the local test has completed successfully and the function works as expected and now can run on AWS Lambda.

5. Download the Quickstart NiFi Flow

Download the NiFi flow so that you can upload it into the DataFlow Catalog and run it in serverless mode.

  1. Right-click the ‘Quickstart Function -TelemetryS3toS3Processing’ process group.
  2. Select Download Flow Definition and Without external services.