Getting Started with Streaming Analytics

Chapter 3. Creating a Dataflow Application

Data Producer Application Generates Events

The following are samples of the raw truck event streams generated by the sensors:

Geo Sensor Stream

Speed Sensor Stream

The data-producing application (a data simulator) publishes CSV events with the schema name in the Kafka event header (leveraging the Kafka Header feature in Kafka 1.0). The following diagram illustrates this architecture:
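Complementing the diagram, the following is a minimal sketch of the producer side, using the kafka-python client. Only the topic name (raw-all_truck_events_csv) and the schema.name header key come from this flow; the broker address, schema name value, and CSV field layout are illustrative assumptions.

    from kafka import KafkaProducer

    # Minimal sketch of the data simulator's publish step (kafka-python client).
    # The broker address is a placeholder; substitute your cluster's brokers.
    producer = KafkaProducer(bootstrap_servers="localhost:9092")

    # Hypothetical CSV-encoded geo event; the real simulator defines the fields.
    event = "2018-02-15 10:00:00,truck_geo_event,42,17,Jane Doe,38.44,-90.92".encode("utf-8")

    # Publish with the schema name carried in a Kafka 1.0 message header so
    # that ConsumeKafkaRecord can later copy it into the schema.name attribute.
    producer.send(
        "raw-all_truck_events_csv",
        value=event,
        headers=[("schema.name", b"truck_events_csv")],  # header value assumed
    )
    producer.flush()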

Use NiFi's Kafka 1.0 ConsumeKafkaRecord processor to do the following:

  1. Grab the schema name from the Kafka header and store it in a flow attribute called schema.name.

  2. Use the schema name to look up the schema in the Hortonworks (HWX) Schema Registry.

  3. Use the schema from HWX SR to convert the CSV events into structured records (ProcessRecords).

  4. Route events by event source (speed or geo) using SQL, as sketched below.
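To make the routing step concrete: in NiFi, routing records by a field value is typically expressed as SQL evaluated against the incoming records, one statement per output relationship (as in the QueryRecord processor). The statements below are a hypothetical sketch; the actual queries and event-source values are defined in the flow's template.

    # Hypothetical QueryRecord-style routing rules: each output relationship
    # gets one SQL statement. Field name and values are assumed for illustration.
    ROUTING_SQL = {
        "geo":   "SELECT * FROM FLOWFILE WHERE eventSource = 'truck_geo_event'",
        "speed": "SELECT * FROM FLOWFILE WHERE eventSource = 'truck_speed_event'",
    }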

The sections below walk through how to set up this dataflow.

NiFi: Create a Dataflow Application

To make setup easier, import the NiFi template for this flow by downloading it from this GitHub location. After importing, select the "Use Case 1" process group. The following instructions refer to that flow.

NiFi Controller Services

Click the Flow Configuration Settings icon and select the Controller Services tab.

Hortonworks Schema Registry Controller Service

  1. Click the Flow Configuration Settings icon and select the Controller Services tab.

    You will see the HWX Schema Registry controller service.

  2. Edit the properties to configure the Schema Registry URL based on your environment.

    You can find this value in the Streaming Analytics Manager service in Ambari, under the configuration property registry.url. The URL should look similar to the following: http://$REGISTRY_SERVER:7788/api/v1. (A quick way to sanity-check this value is sketched after this list.)

  3. Enable the HWX Schema Registry and the other controller services. For more information on the RecordReader and RecordWriter controller services, see the Schema Registry User Guide. You should now have five controller services enabled.
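As referenced in step 2, one quick way to sanity-check the registry.url value is to call the Schema Registry REST API directly. The sketch below uses Python's requests library; the /schemaregistry/schemas listing endpoint and the response shape are assumptions based on the Hortonworks Schema Registry API, so verify them against your registry version.

    import requests

    # Substitute the registry.url value you found in Ambari.
    REGISTRY_URL = "http://REGISTRY_SERVER:7788/api/v1"

    # List registered schemas; an HTTP 200 confirms the controller service is
    # pointed at a live Schema Registry instance.
    resp = requests.get(REGISTRY_URL + "/schemaregistry/schemas", timeout=10)
    resp.raise_for_status()

    # Response shape assumed: {"entities": [{"schemaMetadata": {"name": ...}}]}
    for entity in resp.json().get("entities", []):
        print(entity.get("schemaMetadata", {}).get("name"))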

NiFi Ingests the Raw Sensor Events

In the Use Case 1 process group, go into the "Acquire Events" process group. The first step in the NiFi flow is to ingest the CSV events from the Kafka topic called raw-all_truck_events_csv. We will use the new Kafka 1.0 ConsumeKafkaRecord_1_0 processor for this.

Configure the ‘Kafka Brokers’ value in ConsumeKafkaRecord_1_0 based on your cluster.

The Kafka 1.0 ConsumeKafkaRecord_1_0 processor will do the following:

  1. Grab the schema name from the Kafka header and store it in a flow attribute called schema.name (see the property 'Headers to Add as Attributes').

  2. Use the schema name to look up the schema in the HWX Schema Registry.

  3. Use the schema from HWX SR to convert the CSV events into structured records (ProcessRecords).

  4. Route events by event source (speed or geo) using SQL. A standalone consumer sketch mirroring these steps follows this list.
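When debugging the raw topic outside of NiFi, the same steps can be mirrored in a few lines. This sketch uses the kafka-python client; the schema.name header key and topic name come from the flow, while the broker address, CSV delimiter, and routing shortcut are assumptions.

    from kafka import KafkaConsumer

    # Debugging sketch: read the raw topic directly and mirror what
    # ConsumeKafkaRecord_1_0 does. The broker address is a placeholder.
    consumer = KafkaConsumer(
        "raw-all_truck_events_csv",
        bootstrap_servers="localhost:9092",
        auto_offset_reset="earliest",
    )

    for msg in consumer:
        # Step 1: pull the schema name out of the Kafka message header.
        headers = {key: value for key, value in (msg.headers or [])}
        schema_name = headers.get("schema.name", b"").decode("utf-8")

        # Steps 2-3 would look this name up in the Schema Registry and parse
        # the CSV into a typed record; here we just split the raw line.
        fields = msg.value.decode("utf-8").split(",")

        # Step 4: route by event source (keying off the schema name here is a
        # simplification of the flow's SQL-based routing).
        source = "geo" if "geo" in schema_name else "speed"
        print(source, schema_name, fields[:3])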

Publish Enriched Events to Kafka for Consumption by Analytics Applications

After NiFi has done the routing, transforms, and enrichment, it publishes the enriched events into Kafka topics. Each of these topics has a schema registered in the Schema Registry; the flow stores the schema identifier in the FlowFile attributes (via UpdateAttribute processors) and uses the PublishKafkaRecord processor to push the events into Kafka.

The PublishKafkaRecord_1_0 processor pushes the events into Kafka serialized as Avro objects, with the HWX schema encoded into the payload so that SAM can process them.

Note: For the PublishKafkaRecord processor, make sure you change the Kafka Brokers property value to match your cluster settings.
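"HWX schema encoded into the payload" means the Schema Registry serializer prepends a small binary header (a one-byte protocol id followed by schema-identifier bytes) to each Avro-encoded record, which is how SAM resolves the schema on read. The sketch below peeks at that header; the exact byte layouts vary by protocol version and are assumptions to verify against your Schema Registry release.

    import struct

    def peek_hwx_header(payload: bytes):
        """Inspect the assumed HWX Schema Registry serdes header on an
        Avro-serialized Kafka message. Layouts are assumptions based on
        common registry protocol versions; verify against your release."""
        protocol_id = payload[0]
        if protocol_id == 1:
            # Assumed layout: 8-byte schema metadata id + 4-byte version.
            metadata_id, version = struct.unpack(">qi", payload[1:13])
            return protocol_id, metadata_id, version, payload[13:]
        if protocol_id == 3:
            # Assumed layout: 4-byte schema version id.
            (version_id,) = struct.unpack(">i", payload[1:5])
            return protocol_id, version_id, None, payload[5:]
        raise ValueError("unrecognized protocol id %d" % protocol_id)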

Start the NiFi Flow

Start the process group called "Use Case 1".