Streaming Split Join Pattern
About This Task
Your objective is to perform three enrichments:
- Retrieve a driver's certification and wage plan from the drivers table.
- Retrieve the driver's hours and miles logged from the timesheet table.
- Query weather information for a specific time and location.
To do this, use the split join pattern: split the stream into three, perform the enrichments in parallel, and then rejoin the three streams.
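Here is a minimal Python sketch of the pattern outside SAM, for illustration only. The same event is handed to three enrichment functions that run in parallel, and their outputs are merged back onto the event. The branch functions and the values they return are hypothetical placeholders, not SAM or Phoenix APIs.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-ins for the three enrichment branches. Each returns only
# the fields it adds; the returned values are placeholders.
def enrich_driver(event):
    return {"driverCertification": "Y", "driverWagePlan": "placeholder-plan"}

def enrich_timesheet(event):
    return {"driverFatigueByHours": 45, "driverFatigueByMiles": 2500}

def enrich_weather(event):
    return {
        "Model_Feature_FoggyWeather": 0.0,
        "Model_Feature_RainyWeather": 1.0,
        "Model_Feature_WindyWeather": 0.0,
    }

BRANCHES = (enrich_driver, enrich_timesheet, enrich_weather)

def split_enrich_join(event, executor):
    # Split: submit the same event to every branch so the lookups run in parallel.
    futures = [executor.submit(branch, event) for branch in BRANCHES]
    # Join: wait for each branch and merge its fields onto a copy of the event.
    joined = dict(event)
    for future in futures:
        joined.update(future.result())
    return joined
```

For example, calling `split_enrich_join({"driverId": 11}, pool)` with `pool = ThreadPoolExecutor(max_workers=3)` returns the input field plus all seven enriched fields. In SAM, each branch is a separate processor, and the join processor rejoins the streams instead of a caller waiting on futures.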
Steps for Creating a Split Join Key
- Create a new split join key in the stream. This key provides a common field on which to join the three streams when you rejoin them.
To do this, drag the projection processor to the canvas and create a connection from the EventType rule processor to this projection processor.
When configuring the connection, select the Non Violation Events Rule, which tells SAM to send only non-violation events to this projection processor.
- Configure the projection processor to create the split join key, called splitJoinValue, using the custom UDF called "TIMESTAMP_LONG" that you uploaded earlier.
You also add a transformation that calculates the week from the event time; the timesheet enrichment downstream requires this value. Configure the processor with the following parameters:
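The computation this projection performs can be sketched in Python. This illustrates the logic under stated assumptions and is not the SAM parameter configuration: the `eventTime` field name and its string format are assumed, `timestamp_long` is a hypothetical analogue of the TIMESTAMP_LONG UDF (taken here to return epoch milliseconds), and the week is assumed to be the ISO-8601 week number.

```python
from datetime import datetime, timezone

# Assumed event-time format; adjust to match the actual stream schema.
EVENT_TIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f"

def timestamp_long(event_time):
    """Hypothetical analogue of the TIMESTAMP_LONG UDF: parse the event time
    (treated as UTC) and return it as epoch milliseconds."""
    dt = datetime.strptime(event_time, EVENT_TIME_FORMAT).replace(tzinfo=timezone.utc)
    return int(dt.timestamp()) * 1000 + dt.microsecond // 1000

def week_of(event_time):
    """Assumed week calculation: the ISO-8601 week number of the event time."""
    return datetime.strptime(event_time, EVENT_TIME_FORMAT).isocalendar()[1]

def split_projection(event):
    # Pass every incoming field through and add the two derived fields.
    projected = dict(event)
    projected["splitJoinValue"] = timestamp_long(event["eventTime"])
    projected["week"] = week_of(event["eventTime"])
    return projected
```

For example, an event whose `eventTime` is "2017-01-02 00:00:00.000" gets a splitJoinValue of 1483315200000 and a week of 1.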
Steps for Splitting the Stream into Three to Perform Enrichments in Parallel
- With the split join key created, you can split the stream into three to perform the enrichments in parallel.
For the first split, which enriches the driver's wage plan and certification status, drag the "ENRICH-PHOENIX" processor to the canvas and connect it from the Split projection processor.
- Configure the enrich processor with the following parameters:
- ENRICHMENT SQL: select certified, wage_plan from drivers where driverid=${driverId}
- ENRICHMENT OUTPUT FIELDS: driverCertification, driverWagePlan
- SECURE CLUSTER: false
- OUTPUT FIELDS: Click Select All.
- NEW OUTPUT FIELDS: Add new output fields for the two enriched values: driverCertification and driverWagePlan.
After this processor executes, the output schema will have two fields populated called driverCertification and driverWagePlan.
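The following Python sketch shows how these parameters fit together. The `${driverId}` placeholder is filled from the event, the query's columns map positionally onto the ENRICHMENT OUTPUT FIELDS names, and Select All keeps every input field. This is a hypothetical sketch, not the ENRICH-PHOENIX implementation: an in-memory `sqlite3` database stands in for Phoenix, placeholders are bound as query parameters (a choice made by this sketch), and a missing row is assumed to yield `None` values.

```python
import re
import sqlite3

PLACEHOLDER = re.compile(r"\$\{(\w+)\}")

def enrich(conn, event, sql_template, enrichment_fields):
    # Replace each ${field} with a ? marker and bind the event's value for it.
    names = PLACEHOLDER.findall(sql_template)
    sql = PLACEHOLDER.sub("?", sql_template)
    row = conn.execute(sql, [event[name] for name in names]).fetchone()
    if row is None:
        row = (None,) * len(enrichment_fields)
    enriched = dict(event)  # OUTPUT FIELDS: Select All keeps every input field
    # ENRICHMENT OUTPUT FIELDS: the i-th column is stored under the i-th name.
    enriched.update(zip(enrichment_fields, row))
    return enriched
```

For example, suppose a hypothetical `drivers` row is `(11, 'Y', 'miles')`. Calling `enrich(conn, {"driverId": 11}, "select certified, wage_plan from drivers where driverid=${driverId}", ["driverCertification", "driverWagePlan"])` returns `{"driverId": 11, "driverCertification": "Y", "driverWagePlan": "miles"}`.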
- Create the second stream, which enriches the driver's hours and miles logged in the last week, by dragging another "ENRICH-PHOENIX" processor to the canvas and connecting it from the Split projection processor.
- Configure the enrich processor with the following parameters:
- ENRICHMENT SQL: select hours_logged, miles_logged from timesheet where driverid=${driverId} and week=${week}
- ENRICHMENT OUTPUT FIELDS: driverFatigueByHours, driverFatigueByMiles
- SECURE CLUSTER: false
- OUTPUT FIELDS: Select the splitJoinValue field.
- NEW OUTPUT FIELDS: Add new output fields for the two enriched values: driverFatigueByHours and driverFatigueByMiles.
After this processor executes, the output schema will have two fields populated called driverFatigueByHours and driverFatigueByMiles.
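Two details distinguish this branch. First, the query depends on two event fields, which is why the week was computed upstream. Second, OUTPUT FIELDS keeps only splitJoinValue, presumably because the first branch already carries the full event and the join needs only the key to line the records up. The following Python sketch illustrates both points; the helper names and sample values are hypothetical.

```python
import re

def required_fields(sql_template):
    # Each ${name} placeholder names an event field the query needs.
    return re.findall(r"\$\{(\w+)\}", sql_template)

def branch_output(event, keep_fields, enrichment_fields, row):
    # OUTPUT FIELDS: copy only the selected input fields (here splitJoinValue).
    out = {name: event[name] for name in keep_fields}
    # NEW OUTPUT FIELDS: add the enriched values under their new names.
    out.update(zip(enrichment_fields, row))
    return out
```

For the timesheet query above, `required_fields` returns `["driverId", "week"]`.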
- Create the third stream, which performs the weather enrichment, by dragging the custom "ENRICH-WEATHER" processor that you uploaded to the canvas and connecting it from the Split projection processor.
- Configure the weather processor with the following parameters. (Currently, the weather processor is only a stub that generates random normalized weather information.)
- WEATHER WEB SERVICE URL: http://weather.com/api?lat=${latitude}&lng=${longitude}
- INPUT SCHEMA MAPPINGS: Leave defaults
- OUTPUT FIELDS: Select the splitJoinValue field and the three enriched model features.
After this processor executes, the output schema will have three fields populated called Model_Feature_FoggyWeather, Model_Feature_RainyWeather, and Model_Feature_WindyWeather.
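Because the weather processor is a stub, a few lines of Python can approximate its behavior. The URL template is filled from the event's latitude and longitude, and the three features are random values. The sketch assumes that "normalized" means a value in the range [0, 1) and that the stub makes no network call; the function names are hypothetical.

```python
import random

WEATHER_URL = "http://weather.com/api?lat=${latitude}&lng=${longitude}"

def weather_url(event):
    # Substitute the event's coordinates into the URL template.
    return (WEATHER_URL.replace("${latitude}", str(event["latitude"]))
                       .replace("${longitude}", str(event["longitude"])))

def enrich_weather_stub(event, rng=random):
    # Stub behavior: skip the web service and emit random values in [0, 1).
    # OUTPUT FIELDS: splitJoinValue plus the three model features.
    return {
        "splitJoinValue": event["splitJoinValue"],
        "Model_Feature_FoggyWeather": rng.random(),
        "Model_Feature_RainyWeather": rng.random(),
        "Model_Feature_WindyWeather": rng.random(),
    }
```

Passing a seeded `random.Random` instance as `rng` makes the stub's output repeatable, which can help when testing a flow.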
Steps for Rejoining the Three Enriched Streams
- Now that you have performed the enrichments in parallel, join the three streams: drag the join processor to the canvas and connect each of the three enrichment streams to it.
- Configure the join processor as follows, using splitJoinValue (not joinSplitValue) as the key that joins all three streams.
For the Output field, click SELECT ALL to select all the fields across the three streams.
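Because the three branches finish independently, a streaming join must buffer partial records until one record for the same key has arrived from each branch. The following Python sketch illustrates this, using splitJoinValue as the key and merging all fields as SELECT ALL does. The class and branch names are hypothetical, and the sketch does not reflect how SAM's join processor is implemented.

```python
from collections import defaultdict

class SplitJoin:
    """Buffers partial records by splitJoinValue until one has arrived from
    each expected branch, then emits the merged record."""

    def __init__(self, branches):
        self.branches = frozenset(branches)
        self.pending = defaultdict(dict)  # key -> {branch name: partial record}

    def accept(self, branch, record):
        if branch not in self.branches:
            raise ValueError(f"unknown branch: {branch}")
        key = record["splitJoinValue"]
        parts = self.pending[key]
        parts[branch] = record
        if set(parts) != self.branches:
            return None  # still waiting on at least one branch for this key
        del self.pending[key]
        merged = {}
        for part in parts.values():
            merged.update(part)  # SELECT ALL: union of every branch's fields
        return merged
```

A production join would also need a window or timeout to evict keys whose partial records never all arrive; this sketch omits that.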
- Now that you have joined the three enriched streams, normalize the data into the format that the model expects by dragging the "NORMALIZE-MODEL-FEATURES" custom processor, which you added earlier, to the canvas.
For the output fields, select all the fields and leave the mapping as defaults.
Result
Your flow looks similar to the following.