Build your draft flow
Start building your draft flow by adding components to the Canvas and configuring them.
-
Add an InvokeHTTP processor to the canvas.
-
Drag a Processor from the
Components sidebar to the canvas.
-
In the text box, filter for InvokeHTTP.
- Change the Processor Name to Get Recent Wikipedia Changes.
- Click Add.
After configuration, this processor calls the Wikipedia API to retrieve the latest changes.
-
Drag a Processor from the
Components sidebar to the canvas.
-
Configure the Get Recent Wikipedia Changes
processor.
Properties
- HTTP URL
- provide
https://en.wikipedia.org/w/api.php?action=query&list=recentchanges&format=json&rcprop=user%7Ccomment%7Cparsedcomment%7Ctimestamp%7Ctitle%7Csizes%7Ctags
RelationshipsSelect the following relationships:
- Original - Terminate
-
Failure - Terminate, Retry
-
Retry - Terminate
-
No Retry - Terminate
-
Click Apply.
-
Add a ConvertRecord processor to the canvas.
- Drag a Processor from the Components sidebar to the canvas.
- In the text box, filter for ConvertRecord.
- Change the Processor Name to Convert JSON to AVRO.
- Click Add.
This processor converts the JSON response to AVRO format. It uses RecordReaders and RecordWriters to accomplish this. It infers the JSON schema starting from the recent changes field.
-
Configure the Convert JSON to AVRO processor.
Properties
- Record Reader
- Select the JSON_Reader_Recent_Changes controller service you have created from the drop-down list.
- Record Writer
- Select the AvroWriter_Recent_Changes controller service you have created from the drop-down list.
RelationshipsSelect the following relationships:
failure - Terminate, Retry
- Click Apply.
-
Connect the Get Recent Wikipedia Changes and
Convert JSON to AVRO processors by hovering over the
lower-right corner of the Get Recent Wikipedia Changes
processor, clicking the arrow that appears and dragging it to the
Convert JSON to AVRO processor.
-
In the configuration popup, select the Response
relationship and click Add.
-
Add a QueryRecord processor.
- Drag a Processor from the Components sidebar to the canvas.
- In the text box, filter for QueryRecord.
- Name it Filter Edits.
- Click Add.
This processor filters out anything but actual page edits. To achieve this, it's running a query that selects all FlowFiles (events) of the type
edit
. -
Configure the Filter Edits processor.
Properties
- Record Reader
- Select the AvroReader_Recent_Changes controller service you have created from the drop-down list.
- Record Writer
- Select the AvroWriter_Recent_Changes controller service you have created from the drop-down list.
RelationshipsSelect the following relationship:
- failure - Terminate
-
original - Terminate
-
For the Filter Edits processor you also need to add a
user-defined property. Click Add Property.
- Provide Filtered edits as Name
-
Provide
Select * from FLOWFILE where type='edit'
as Value. - Click Apply.
- Connect the Convert JSON to AVRO and Filter Edits processors by hovering over the lower-right corner of the Convert JSON to AVRO processor, clicking the arrow that appears and dragging it to the Filter Edits processor.
- In the configuration pane, select the success and failure relationships and click Add.
-
Add a second QueryRecord processor.
- Drag a Processor from the Components sidebar to the canvas.
- In the text box, filter for QueryRecord.
- Name it Route on Content Size.
- Click Add.
This processor uses two SQL statements to separate edit events that resulted in a longer article from edit events that resulted in a shorter article.
-
Configure the Route on Content Size processor.
Properties
- Record Reader
- Select the AvroReader_Recent_Changes controller service you have created from the drop-down list.
- Record Writer
- Select the AvroWriter_Recent_Changes controller service you have created from the drop-down list.
RelationshipsSelect the following relationships:
-
failure - Terminate, Retry
-
original - Terminate
-
For the Route on Content Size processor you also need to
add two user-defined properties.
- Click Add Property.
- Provide Added content as Name
-
Provide
Select * from FLOWFILE where newlen>=oldlen
as Value. - Click Add.
- Click Add Property, to create the second property.
- Provide Removed content as Name.
-
Provide
Select * from FLOWFILE where newlen<oldlen
as Value. - Click Add.
- Click Apply.
-
Connect the Filter Edits and Route on Content
Size processors by hovering over the lower-right corner of the
Filter Edits processor, clicking the arrow that
appears and drawing it to Route on Content Size.
In the Create Connection pop up select the Filtered edits relation and click Add.
-
Add two MergeRecord processors.
- Drag a Processor from the Components sidebar to the canvas.
- In the text box, filter for MergeRecord.
- Name it Merge Edit Events.
- Click Add.
- Repeat the above steps to add another identical processor.
These processors are configured to merge at least 100 records into one flowfile to avoid writing lots of small files. The MaxBinAge property is set to 2 minutes which makes the processors merge records after two minutes even if less than 100 records have arrived.
-
Configure the two Merge Edit Events processors.
Properties
- Record Reader
- Select the AvroReader_Recent_Changes controller service you have created from the drop-down list.
- Record Writer
- Select the AvroWriter_Recent_Changes controller service you have created from the drop-down list.
- Max Bin Age
- Set to two minutes by providing a value of
2 min
.
RelationshipsSelect the following relationships:
-
failure - Terminate
-
original - Terminate
-
Connect the Route on Content Size processor to both of
the Merge Edit Events processors.
- For the first Merge Edit Events processor, select Added content from Relationships and click Add.
- For the second Merge Edit Events processor, select Removed content from Relationships and click Add.
-
Add two PutFile processors to the canvas.
- Drag a Processor from the Components sidebar to the canvas.
- In the text box, filter for PutFile.
- Name it Write "Added Content" Events To File.
- Click Add.
- Repeat the above steps to add another identical processor, naming this second PutFile processor Write "Removed Content" Events To File.
These processors write the filtered, routed edit events to two different locations on the local disk. In Cloudera DataFlow, you would typically not write to local disk but replace these processors with processors that resemble your destination system (Kafka, Database, Object Store etc.)
-
Configure the Write "Added Content" Events To File
processor.
Properties:
- Directory
- Provide
/tmp/larger_edits
- Maximum File Count
- Provide 500
RelationshipsSelect the following relationships:
-
SUCCESS - Terminate
-
failure - Terminate
- Click Apply.
-
Configure the Write "Removed Content" Events To File
processor.
Properties:
- Directory
- Provide
/tmp/smaller_edits
- Maximum File Count
- Provide 500
RelationshipsSelect the following relationships:
-
SUCCESS - Terminate
-
failure - Terminate
- Click Apply.
-
Connect the Merge Edit Events processor with the
Added content connection to the Write
"Added Content" Events To File processor.
In the Create Connection pop up select merged and click Add.
-
Connect the Merge Edit Events processor with the
Removed content connection to the Write
"Removed Content" Events To File processor.
In the Create Connection pop up select merged and click Add.