This is the grooviest Groovy script :)
variable | type | description
---|---|---
session | org.apache.nifi.processor.ProcessSession | the session used to get, change, and transfer input files
context | org.apache.nifi.processor.ProcessContext | the processor context (rarely needed)
log | org.apache.nifi.logging.ComponentLog | the logger for this processor instance
REL_SUCCESS | org.apache.nifi.processor.Relationship | the success relationship
REL_FAILURE | org.apache.nifi.processor.Relationship | the failure relationship
CTL | java.util.HashMap&lt;String, ControllerService&gt; | Map populated with the controller services defined by `CTL.*` processor properties. A `CTL.`-prefixed property can be linked to a controller service, giving the script access to that service without additional code.
SQL | java.util.HashMap&lt;String, groovy.sql.Sql&gt; | Map populated with `groovy.sql.Sql` objects connected to the databases defined by `SQL.*` processor properties. A `SQL.`-prefixed property can be linked only to a DBCPService.
RecordReader | java.util.HashMap&lt;String, RecordReaderFactory&gt; | Map populated with the controller services defined by `RecordReader.*` processor properties. A `RecordReader.`-prefixed property must be linked to a RecordReaderFactory controller service instance.
RecordWriter | java.util.HashMap&lt;String, RecordSetWriterFactory&gt; | Map populated with the controller services defined by `RecordWriter.*` processor properties. A `RecordWriter.`-prefixed property must be linked to a RecordSetWriterFactory controller service instance.
Dynamic processor properties | org.apache.nifi.components.PropertyDescriptor | All processor properties whose names do not start with `CTL.`, `SQL.`, `RecordReader.`, or `RecordWriter.` are bound to script variables
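As a sketch of how the `RecordReader` and `RecordWriter` maps can be used together, the following passes records through unchanged. It assumes dynamic properties named `RecordReader.reader` and `RecordWriter.writer` have been defined and linked to RecordReaderFactory and RecordSetWriterFactory services; the property names are illustrative, not required by the processor.

```groovy
//assumption: properties `RecordReader.reader` and `RecordWriter.writer` are defined and linked
flowFile.write{ inStream, outStream ->
    def recordReader = RecordReader.reader.createRecordReader(flowFile.attributes, inStream, flowFile.size, log)
    def schema = RecordWriter.writer.getSchema(flowFile.attributes, recordReader.schema)
    def recordWriter = RecordWriter.writer.createWriter(log, schema, outStream, flowFile.attributes)

    recordWriter.beginRecordSet()
    def record
    while (record = recordReader.nextRecord()) {
        recordWriter.write(record) //copy each record to the output unchanged
    }
    recordWriter.finishRecordSet()
    recordWriter.close()
    recordReader.close()
}
REL_SUCCESS << flowFile
```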
Example: if you define a property `SQL.mydb` and link it to any DBCPService, then you can access it from code as `SQL.mydb.rows('select * from mytable')`.
The processor automatically takes a connection from the DBCP service before executing the script and tries to handle the transaction:
database transactions are automatically rolled back on script exception and committed on success.
Alternatively, you can manage the transaction manually.
NOTE: the script must not close the connection.
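A minimal sketch of manual transaction management, assuming a property `SQL.mydb` linked to a DBCPService as above (table and column names are hypothetical):

```groovy
//group several statements into one explicit transaction with groovy.sql.Sql
SQL.mydb.withTransaction {
    SQL.mydb.executeUpdate("insert into mytable(id, name) values(?, ?)", [1L, 'first'])
    SQL.mydb.executeUpdate("update mytable set name = ? where id = ?", ['renamed', 1L])
    //both statements commit together; an exception inside the closure rolls both back
}
```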
The `org.apache.nifi.processors.groovyx.flow.SessionFile` is the actual object returned by the session in the Extended Groovy processor.
This flow file is a container that references the session and the real flow file.
This allows a simplified syntax for working with file attributes and content:
set a new attribute value

```groovy
flowFile.ATTRIBUTE_NAME = ATTRIBUTE_VALUE
flowFile.'mime.type' = 'text/xml'
flowFile.putAttribute("ATTRIBUTE_NAME", ATTRIBUTE_VALUE)
//the same as: flowFile = session.putAttribute(flowFile, "ATTRIBUTE_NAME", ATTRIBUTE_VALUE)
```
remove an attribute

```groovy
flowFile.ATTRIBUTE_NAME = null
//equal to: flowFile = session.removeAttribute(flowFile, "ATTRIBUTE_NAME")
```
get an attribute value

```groovy
String a = flowFile.ATTRIBUTE_NAME
```
write content

```groovy
flowFile.write("UTF-8", "THE CharSequence to write into flow file replacing current content")
flowFile.write("UTF-8"){ writer ->
    //do something with java.io.Writer...
}
flowFile.write{ outStream ->
    //do something with output stream...
}
flowFile.write{ inStream, outStream ->
    //do something with input and output streams...
}
```
get content

```groovy
InputStream i = flowFile.read()
def json = new groovy.json.JsonSlurper().parse( flowFile.read() )
String text = flowFile.read().getText("UTF-8")
```
transfer the flow file to the success relationship

```groovy
REL_SUCCESS << flowFile
//or
flowFile.transfer(REL_SUCCESS)
//the same as: session.transfer(flowFile, REL_SUCCESS)
```
work with dbcp

```groovy
import groovy.sql.Sql

//define property named `SQL.db` connected to a DBCPConnectionPool controller service
//for this case it's an H2 database example

//read value from the database with prepared statement
//and assign into flowfile attribute `db.yesterday`
def daysAdd = -1
def row = SQL.db.firstRow("select dateadd('DAY', ${daysAdd}, sysdate) as DB_DATE from dual")
flowFile.'db.yesterday' = row.DB_DATE

//to work with BLOBs and CLOBs in the database
//use parameter casting with groovy.sql.Sql.BLOB(Stream) and groovy.sql.Sql.CLOB(Reader)

//write content of the flow file into database blob
flowFile.read{ rawIn ->
    def parms = [
        p_id   : flowFile.ID as Long, //get flow file attribute named `ID`
        p_data : Sql.BLOB( rawIn ),   //use input stream as BLOB sql parameter
    ]
    SQL.db.executeUpdate(parms, "update mytable set data = :p_data where id = :p_id")
}
```
In the Extended Groovy processor you can catch the `start`, `stop`, and `unscheduled` events by providing the corresponding static methods:
```groovy
import org.apache.nifi.processor.ProcessContext
import java.util.concurrent.atomic.AtomicLong

class Const {
    static Date startTime = null;
    static AtomicLong triggerCount = null;
}

static onStart(ProcessContext context) {
    Const.startTime = new Date()
    Const.triggerCount = new AtomicLong(0)
    println "onStart $context ${Const.startTime}"
}

static onStop(ProcessContext context) {
    def alive = (System.currentTimeMillis() - Const.startTime.getTime()) / 1000
    println "onStop $context executed ${ Const.triggerCount } times during ${ alive } seconds"
}

static onUnscheduled(ProcessContext context) {
    def alive = (System.currentTimeMillis() - Const.startTime.getTime()) / 1000
    println "onUnscheduled $context executed ${ Const.triggerCount } times during ${ alive } seconds"
}

flowFile.'trigger.count' = Const.triggerCount.incrementAndGet()
REL_SUCCESS << flowFile
```