The ExecuteScript Processor provides the ability to use a scripting language to leverage the NiFi API and perform tasks such as those covered in the recipes below.
Notes:
The Processor expects a user-defined script that is evaluated each time the processor is triggered. The following variables are available to the script:
Variable Name | Description | Variable Class |
---|---|---|
session | This is a reference to the ProcessSession assigned to the processor. The session allows you to perform operations on FlowFiles such as create(), putAttribute(), and transfer(), as well as read() and write() | ProcessSession |
context | This is a reference to the ProcessContext for the processor. It can be used to retrieve processor properties, relationships, Controller Services, and the State Manager. | ProcessContext |
log | This is a reference to the ComponentLog for the processor. Use it to log messages to NiFi, such as log.info('Hello world!') | ComponentLog |
REL_SUCCESS | This is a reference to the "success" relationship defined for the processor. It could also be inherited by referencing the static member of the parent class (ExecuteScript), but some engines such as Lua do not allow for referencing static members, so this is a convenience variable. It also saves having to use the fully-qualified name for the relationship. | Relationship |
REL_FAILURE | This is a reference to the "failure" relationship defined for the processor. As with REL_SUCCESS, it could also be inherited by referencing the static member of the parent class (ExecuteScript), but some engines such as Lua do not allow for referencing static members, so this is a convenience variable. It also saves having to use the fully-qualified name for the relationship. | Relationship |
Dynamic Properties | Any dynamic (user-defined) properties defined in ExecuteScript are passed to the script engine as variables, each set to the PropertyValue object corresponding to the dynamic property. This lets you get the String value of the property, but also evaluate the property against NiFi Expression Language, cast the value to an appropriate data type (e.g., Boolean), and so on. Because the dynamic property name becomes the variable name for the script, you must be aware of the variable naming rules for the chosen script engine. For example, Groovy does not allow periods (.) in variable names, so an error will occur if "my.property" were a dynamic property name. Interaction with these variables is done via the NiFi Java API. The 'Dynamic Properties' section below will discuss the relevant API calls as they are introduced. | PropertyValue |
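Because a dynamic property name becomes a script variable, it can help to check candidate names before adding them. The following is a minimal plain-Python sketch, assuming Python 3 naming rules (identifier syntax plus the reserved-keyword list); the helper name usable_as_python_variable is hypothetical. Jython follows Python 2 rules, which are stricter (ASCII-only identifiers), and Groovy, JavaScript, and JRuby each have their own rules, so treat this only as an illustration of the idea.

```python
import keyword

def usable_as_python_variable(property_name):
    """
    Hypothetical helper: check whether a dynamic property name could be bound
    as a variable under Python 3 naming rules (valid identifier syntax and not
    a reserved keyword).
    """
    return property_name.isidentifier() and not keyword.iskeyword(property_name)

# Candidate dynamic property names, including ones that would break a script.
names = ["myProperty", "my.property", "my_property", "2fast", "class"]
usable = {name: usable_as_python_variable(name) for name in names}
```

Here "my.property" fails for the same reason the text gives for Groovy: the period is not legal in a variable name.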
Get an incoming FlowFile from the session
Use Case: You have incoming connection(s) to ExecuteScript and want to retrieve one FlowFile from the queue(s) for processing.
Approach: Use the get() method from the session object. This method returns the next highest-priority FlowFile to process. If there is no FlowFile to process, the method returns null. NOTE: It is possible for null to be returned even if there is a steady flow of FlowFiles into the processor. This can happen if there are multiple concurrent tasks for the processor and the other task(s) have already retrieved the FlowFiles. If the script requires a FlowFile to continue processing, it should return immediately when session.get() returns null.
Groovy
flowFile = session.get()
if(!flowFile) return
Jython
flowFile = session.get()
if (flowFile != None):
    # All processing code starts at this indent
    pass
# implicit return at the end
Javascript
var flowFile = session.get();
if (flowFile != null) {
    // All processing code goes here
}
JRuby
flowFile = session.get()
if flowFile != nil
  # All processing code goes here
end
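To see why a steady flow of FlowFiles can still yield null, the following plain-Python sketch simulates two concurrent tasks polling one queue that holds a single FlowFile. IncomingQueue and on_trigger are hypothetical names used only for illustration; this is not ExecuteScript code, and Python's None plays the role of null.

```python
import threading
from collections import deque

class IncomingQueue:
    """Hypothetical stand-in for an incoming connection queue (not the NiFi API)."""
    def __init__(self, items):
        self._items = deque(items)
        self._lock = threading.Lock()

    def get(self):
        """Return the next item, or None if another task already took it."""
        with self._lock:
            return self._items.popleft() if self._items else None

def on_trigger(queue, processed):
    """Hypothetical task body mirroring the 'return immediately on null' pattern."""
    flow_file = queue.get()
    if flow_file is None:
        return  # another task got the FlowFile first; nothing to do this time
    processed.append(flow_file)

queue = IncomingQueue(["ff-1"])
processed = []
# Two "concurrent tasks" triggered while only one FlowFile is queued.
tasks = [threading.Thread(target=on_trigger, args=(queue, processed)) for _ in range(2)]
for t in tasks:
    t.start()
for t in tasks:
    t.join()
```

Exactly one task processes the FlowFile; the other sees None and returns, which is the situation the null check in each example guards against.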
Get multiple incoming FlowFiles from the session
Use Case: You have incoming connection(s) to ExecuteScript and want to retrieve multiple FlowFiles from the queue(s) for processing.
Approach: Use the get(maxResults) method from the session object. This method returns up to maxResults FlowFiles from the work queue. If no FlowFiles are available, an empty list is returned (the method does not return null). NOTE: If multiple incoming queues are present, the behavior is unspecified in terms of whether all queues or only a single queue will be polled in a single call. Having said that, the observed behavior (for both NiFi 1.1.0+ and before) is described here.
Examples:
Groovy
flowFileList = session.get(100)
if(!flowFileList.isEmpty()) {
    flowFileList.each { flowFile ->
        // Process each FlowFile here
    }
}
Jython
flowFileList = session.get(100)
if not flowFileList.isEmpty():
    for flowFile in flowFileList:
        # Process each FlowFile here
        pass
Javascript
var flowFileList = session.get(100);
if (!flowFileList.isEmpty()) {
    for each (var flowFile in flowFileList) {
        // Process each FlowFile here
    }
}
JRuby
flowFileList = session.get(100)
if !(flowFileList.isEmpty())
  flowFileList.each { |flowFile|
    # Process each FlowFile here
  }
end
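The key contract of get(maxResults) is "up to maxResults, and an empty list rather than null". The plain-Python sketch below models only that contract; IncomingQueue and get_batch are hypothetical names, not the NiFi API, and the sketch says nothing about how multiple incoming queues are polled.

```python
from collections import deque

class IncomingQueue:
    """Hypothetical stand-in for a processor's incoming queue (not the NiFi API)."""
    def __init__(self, items):
        self._items = deque(items)

    def get_batch(self, max_results):
        # Modeled on session.get(maxResults): return up to max_results items,
        # and an empty list (never None) when nothing is queued.
        batch = []
        while self._items and len(batch) < max_results:
            batch.append(self._items.popleft())
        return batch

queue = IncomingQueue(["ff-%d" % i for i in range(1, 6)])
first = queue.get_batch(3)   # three of the five queued items
second = queue.get_batch(3)  # only two remain
third = queue.get_batch(3)   # queue drained: empty list, not None
```

Because the result is never null, the isEmpty() check in the examples above is the right guard; a null check would never fire.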
Create a new FlowFile
Use Case: You want to generate a new FlowFile to send to the next processor.
Approach: Use the create() method from the session object. This method returns a new FlowFile object, on which you can perform further processing.
Examples:
Groovy
flowFile = session.create()
// Additional processing here
Jython
flowFile = session.create()
# Additional processing here
Javascript
var flowFile = session.create();
// Additional processing here
JRuby
flowFile = session.create()
# Additional processing here
Create a new FlowFile from a parent FlowFile
Use Case: You want to generate new FlowFile(s) based on an incoming FlowFile.
Approach: Use the create(parentFlowFile) method from the session object. This method takes a parent FlowFile reference and returns a new child FlowFile object. The newly created FlowFile will inherit all of the parent's attributes except for the UUID. This method will automatically generate a Provenance FORK event or a Provenance JOIN event, depending on whether or not other FlowFiles are generated from the same parent before the ProcessSession is committed.
Examples:
Groovy
flowFile = session.get()
if(!flowFile) return
newFlowFile = session.create(flowFile)
// Additional processing here
Jython
flowFile = session.get()
if (flowFile != None):
    newFlowFile = session.create(flowFile)
    # Additional processing here
Javascript
var flowFile = session.get();
if (flowFile != null) {
    var newFlowFile = session.create(flowFile);
    // Additional processing here
}
JRuby
flowFile = session.get()
if flowFile != nil
  newFlowFile = session.create(flowFile)
  # Additional processing here
end
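The attribute-inheritance rule for create(parentFlowFile) can be pictured with a simplified data model. This plain-Python sketch assumes a FlowFile is just a UUID plus a map of String attributes; FlowFile and create_child are hypothetical stand-ins, and the sketch does not model content or the Provenance FORK/JOIN events.

```python
import uuid
from dataclasses import dataclass, field

@dataclass(frozen=True)
class FlowFile:
    """Hypothetical, simplified FlowFile: a UUID plus String attributes."""
    uuid: str
    attributes: dict = field(default_factory=dict)

def create_child(parent):
    """Hypothetical helper: copy every parent attribute except the UUID."""
    attrs = {k: v for k, v in parent.attributes.items() if k != "uuid"}
    new_id = str(uuid.uuid4())
    attrs["uuid"] = new_id  # the child gets its own identity
    return FlowFile(uuid=new_id, attributes=attrs)

parent_id = str(uuid.uuid4())
parent = FlowFile(uuid=parent_id, attributes={"uuid": parent_id, "filename": "data.csv"})
child = create_child(parent)
```

The child carries the parent's "filename" but a fresh UUID, which is what lets NiFi track parent and child separately in provenance.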
Add an attribute to a FlowFile
Use Case: You have a FlowFile to which you'd like to add a custom attribute.
Approach: Use the putAttribute(flowFile, attributeKey, attributeValue) method from the session object. This method updates the given FlowFile's attributes with the given key/value pair. NOTE: The "uuid" attribute is fixed for a FlowFile and cannot be modified; if the key is named "uuid", it will be ignored.
This is also a good point to mention that FlowFile objects are immutable: if you update a FlowFile's attributes (or otherwise alter it) via the API, you get back a new reference to the new version of the FlowFile. This is very important when it comes to transferring FlowFiles to relationships. You must keep a reference to the latest version of a FlowFile, and you must transfer or remove the latest version of every FlowFile retrieved from or created by the session; otherwise you will get an error when executing. Most often, the variable used to store a FlowFile reference is overwritten with the latest version returned from a method that alters the FlowFile (intermediate FlowFile references are automatically discarded). The examples below use this technique of reusing a FlowFile reference when adding attributes. Note that the current reference to the FlowFile is passed into the putAttribute() method. The resulting FlowFile has an attribute named 'myAttr' with a value of 'myValue'. Also note that the method takes a String for the value; if you have an Object, you will have to serialize it to a String. Finally, if you are adding multiple attributes, it is better to create a Map and use putAllAttributes() instead (see the next recipe for details).
Examples:
Groovy
flowFile = session.get()
if(!flowFile) return
flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
Jython
flowFile = session.get()
if (flowFile != None):
    flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
# implicit return at the end
Javascript
var flowFile = session.get();
if (flowFile != null) {
    flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue');
}
JRuby
flowFile = session.get()
if flowFile != nil
  flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
end
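The "keep the latest reference" rule is easiest to appreciate by watching a stale reference fail. The following plain-Python sketch is a deliberately simplified session that tracks the latest version of each FlowFile and rejects older ones. MockSession, FlowFile, StaleFlowFileError, put_attribute, and transfer are all hypothetical names; NiFi's real session raises its own exception type with its own message.

```python
import itertools

class StaleFlowFileError(Exception):
    """Hypothetical error for using an out-of-date FlowFile reference."""

class FlowFile:
    """Hypothetical, simplified FlowFile version: an id, attributes, a version number."""
    _versions = itertools.count(1)

    def __init__(self, ff_id, attributes):
        self.id = ff_id
        self.attributes = dict(attributes)  # each version owns its own copy
        self.version = next(FlowFile._versions)

class MockSession:
    """Hypothetical session that remembers the latest version of each FlowFile."""
    def __init__(self):
        self._latest = {}
        self.transferred = {}

    def create(self):
        ff = FlowFile("ff-%d" % (len(self._latest) + 1), {})
        self._latest[ff.id] = ff
        return ff

    def put_attribute(self, flow_file, key, value):
        self._check_current(flow_file)
        attrs = dict(flow_file.attributes)
        if key != "uuid":  # 'uuid' is fixed and silently ignored
            attrs[key] = str(value)
        new_version = FlowFile(flow_file.id, attrs)
        self._latest[flow_file.id] = new_version
        return new_version  # callers must keep this new reference

    def transfer(self, flow_file, relationship):
        self._check_current(flow_file)
        self.transferred[flow_file.id] = relationship

    def _check_current(self, flow_file):
        if self._latest.get(flow_file.id) is not flow_file:
            raise StaleFlowFileError("%s is not the latest version" % flow_file.id)

session = MockSession()
flow_file = session.create()
stale = flow_file
flow_file = session.put_attribute(flow_file, "myAttr", "myValue")  # reuse the variable
session.transfer(flow_file, "success")

try:
    session.transfer(stale, "success")  # the old reference is rejected
    stale_rejected = False
except StaleFlowFileError:
    stale_rejected = True
```

Overwriting the flow_file variable, as every example in this recipe does, is what keeps the script on the latest version.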
Add multiple attributes to a FlowFile
Use Case: You have a FlowFile to which you'd like to add custom attributes.
Approach: Use the putAllAttributes(flowFile, attributeMap) method from the session object. This method updates the given FlowFile's attributes with the key/value pairs from the given Map. NOTE: The "uuid" attribute is fixed for a FlowFile and cannot be modified; if the key is named "uuid", it will be ignored.
The technique here is to create a Map (aka dictionary in Jython, hash in JRuby) of the attribute key/value pairs you'd like to update, then pass it to putAllAttributes(). This is much more efficient than calling putAttribute() for each key/value pair, because the latter causes the framework to create a temporary version of the FlowFile for each attribute added (see the previous recipe for a discussion of FlowFile immutability). The examples show a map of two entries, myAttr1 and myAttr2, set to '1' and to the language-specific conversion of the number 2 to a String (the method requires String values for both key and value). Note that the snippets below do not call session.transfer(), so they do not work as-is; see the "Transfer a FlowFile to a relationship" recipe for that.
Examples:
Groovy
attrMap = ['myAttr1': '1', 'myAttr2': Integer.toString(2)]
flowFile = session.get()
if(!flowFile) return
flowFile = session.putAllAttributes(flowFile, attrMap)
Jython
attrMap = {'myAttr1':'1', 'myAttr2':str(2)}
flowFile = session.get()
if (flowFile != None):
    flowFile = session.putAllAttributes(flowFile, attrMap)
# implicit return at the end
Javascript
var number2 = 2;
var attrMap = {'myAttr1': '1', 'myAttr2': number2.toString()};
var flowFile = session.get();
if (flowFile != null) {
    flowFile = session.putAllAttributes(flowFile, attrMap);
}
JRuby
attrMap = {'myAttr1' => '1', 'myAttr2' => 2.to_s}
flowFile = session.get()
if flowFile != nil
  flowFile = session.putAllAttributes(flowFile, attrMap)
end
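The efficiency argument comes down to counting intermediate versions. This plain-Python sketch treats each attribute map returned by a call as one new FlowFile version; put_attribute and put_all_attributes are hypothetical helpers that only imitate the described behavior (including ignoring "uuid"), not NiFi's implementation.

```python
def put_attribute(attributes, key, value):
    """Hypothetical: every call yields a NEW attribute map, i.e. one new version."""
    updated = dict(attributes)
    if key != "uuid":  # 'uuid' cannot be modified; the copy is still a new version here
        updated[key] = value
    return updated

def put_all_attributes(attributes, attr_map):
    """Hypothetical: apply every pair at once, producing a single new version."""
    updated = dict(attributes)
    updated.update({k: v for k, v in attr_map.items() if k != "uuid"})
    return updated

original = {"uuid": "1234", "filename": "a.txt"}
attr_map = {"myAttr1": "1", "myAttr2": str(2), "uuid": "ignored"}

# One call per pair: one intermediate version per attribute.
versions = []
current = original
for key, value in attr_map.items():
    current = put_attribute(current, key, value)
    versions.append(current)

# One call for the whole map: a single new version with the same end result.
batched = put_all_attributes(original, attr_map)
```

Both paths end with identical attributes, but the per-pair path produced three versions where the map-based call produced one.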
Get an attribute from a FlowFile
Use Case: You have a FlowFile from which you'd like to inspect an attribute.
Approach: Use the getAttribute(attributeKey) method from the FlowFile object. This method returns the String value for the given attributeKey, or null if the attributeKey is not found. The examples show the retrieval of the value for the "filename" attribute.
Examples:
Groovy
flowFile = session.get()
if(!flowFile) return
myAttr = flowFile.getAttribute('filename')
Jython
flowFile = session.get()
if (flowFile != None):
    myAttr = flowFile.getAttribute('filename')
# implicit return at the end
Javascript
var flowFile = session.get();
if (flowFile != null) {
    var myAttr = flowFile.getAttribute('filename');
}
JRuby
flowFile = session.get()
if flowFile != nil
  myAttr = flowFile.getAttribute('filename')
end
Get all attributes from a FlowFile
Use Case: You have a FlowFile from which you'd like to retrieve its attributes.
Approach: Use the getAttributes() method from the FlowFile object. This method returns a Map with String keys and String values, representing the key/value pairs of attributes for the FlowFile. The examples show an iteration over the Map of all attributes for a FlowFile.
Examples:
Groovy
flowFile = session.get()
if(!flowFile) return
flowFile.getAttributes().each { key,value ->
    // Do something with the key/value pair
}
Jython
flowFile = session.get()
if (flowFile != None):
    for key,value in flowFile.getAttributes().iteritems():
        # Do something with key and/or value
        pass
# implicit return at the end
Javascript
var flowFile = session.get();
if (flowFile != null) {
    var attrs = flowFile.getAttributes();
    for each (var attrKey in attrs.keySet()) {
        // Do something with attrKey (the key) and/or attrs[attrKey] (the value)
    }
}
JRuby
flowFile = session.get()
if flowFile != nil
  flowFile.getAttributes().each { |key,value|
    # Do something with key and/or value
  }
end
Transfer a FlowFile to a relationship
Use Case: After processing a FlowFile (new or incoming), you want to transfer the FlowFile to a relationship ("success" or "failure"). In this simple case, assume there is a variable called "errorOccurred" that indicates the relationship to which the FlowFile should be transferred. Additional error handling techniques will be discussed in part 2 of this series.
Approach: Use the transfer(flowFile, relationship) method from the session object. From the documentation: this method transfers the given FlowFile to the appropriate destination processor work queue(s) based on the given relationship. If the relationship leads to more than one destination the state of the FlowFile is replicated such that each destination receives an exact copy of the FlowFile though each will have its own unique identity.
NOTE: ExecuteScript will perform a session.commit() at the end of each execution to ensure the operations have been committed. You do not need to (and should not) perform a session.commit() within the script.
Examples:
Groovy
flowFile = session.get()
if(!flowFile) return
// Processing occurs here
if(errorOccurred) {
    session.transfer(flowFile, REL_FAILURE)
}
else {
    session.transfer(flowFile, REL_SUCCESS)
}
Jython
flowFile = session.get()
if (flowFile != None):
    # All processing code starts at this indent
    if errorOccurred:
        session.transfer(flowFile, REL_FAILURE)
    else:
        session.transfer(flowFile, REL_SUCCESS)
# implicit return at the end
Javascript
var flowFile = session.get();
if (flowFile != null) {
    // All processing code goes here
    if (errorOccurred) {
        session.transfer(flowFile, REL_FAILURE);
    } else {
        session.transfer(flowFile, REL_SUCCESS);
    }
}
JRuby
flowFile = session.get()
if flowFile != nil
  # All processing code goes here
  if errorOccurred
    session.transfer(flowFile, REL_FAILURE)
  else
    session.transfer(flowFile, REL_SUCCESS)
  end
end
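The documented fan-out behavior of transfer() (one relationship, several destinations, each receiving an exact copy with its own identity) can be modeled with plain lists as queues. In this Python sketch, make_flow_file, transfer, connections, and error_occurred are hypothetical names; giving the original to the first destination and clones to the rest is a simplification chosen for the sketch, not a claim about NiFi's internals.

```python
import copy
import uuid

def make_flow_file(content, attributes):
    """Hypothetical, simplified FlowFile: content bytes plus attributes with a uuid."""
    return {"content": content, "attributes": dict(attributes, uuid=str(uuid.uuid4()))}

def transfer(flow_file, relationship, connections):
    """
    Hypothetical model of transfer(): every destination queue fed by the
    relationship receives an identical copy, but each copy has its own UUID.
    """
    for index, queue in enumerate(connections.get(relationship, [])):
        if index == 0:
            queue.append(flow_file)  # sketch choice: first destination gets the original
        else:
            clone = copy.deepcopy(flow_file)
            clone["attributes"]["uuid"] = str(uuid.uuid4())  # distinct identity
            queue.append(clone)

queue_a, queue_b, queue_fail = [], [], []
connections = {"success": [queue_a, queue_b], "failure": [queue_fail]}

error_occurred = False  # stands in for the recipe's errorOccurred flag
ff = make_flow_file(b"payload", {"filename": "data.txt"})
transfer(ff, "failure" if error_occurred else "success", connections)
```

Both "success" destinations end up with the same content and filename, while their UUIDs differ.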
Send a message to the log at a specified logging level
Use Case: You want to report some event that has occurred during processing to the logging framework.
Approach: Use the log variable with the warn(), trace(), debug(), info(), or error() methods. These methods can take a single String, or a String followed by an array of Objects, or a String followed by an array of Objects followed by a Throwable. The first one is used for simple messages. The second is used when you have some dynamic objects/values that you want to log. To refer to these in the message string use "{}" in the message. These are evaluated against the Object array in order of appearance, so if the message reads "Found these things: {} {} {}" and the Object array is ['Hello',1,true], then the logged message will be "Found these things: Hello 1 true". The third form of these logging methods also takes a Throwable parameter, and is useful when an exception is caught and you want to log it.
Examples:
Groovy
log.info('Found these things: {} {} {}', ['Hello',1,true] as Object[])
Jython
from java.lang import Object
from jarray import array

objArray = ['Hello',1,True]
javaArray = array(objArray, Object)
log.info('Found these things: {} {} {}', javaArray)
Javascript
var ObjectArrayType = Java.type("java.lang.Object[]");
var objArray = new ObjectArrayType(3);
objArray[0] = 'Hello';
objArray[1] = 1;
objArray[2] = true;
log.info('Found these things: {} {} {}', objArray)
JRuby
log.info('Found these things: {} {} {}', ['Hello',1,true].to_java)
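The "{}" substitution rule described above can be approximated in a few lines. This plain-Python sketch is an illustration of SLF4J-style formatting, not NiFi's formatter: format_message and java_str are hypothetical helpers, escaped placeholders are not handled, and values are rendered roughly the way Java's String.valueOf would render simple types (so True becomes "true").

```python
def java_str(value):
    """Hypothetical helper: render simple values roughly like Java's String.valueOf."""
    if isinstance(value, bool):
        return "true" if value else "false"
    if value is None:
        return "null"
    return str(value)

def format_message(pattern, args):
    """
    Replace each '{}' in pattern with the next argument, in order of appearance.
    Placeholders without a matching argument are left as a literal '{}'.
    """
    parts = pattern.split("{}")
    out = [parts[0]]
    for i, part in enumerate(parts[1:]):
        out.append(java_str(args[i]) if i < len(args) else "{}")
        out.append(part)
    return "".join(out)

message = format_message("Found these things: {} {} {}", ["Hello", 1, True])
```

With the recipe's inputs, message is "Found these things: Hello 1 true", matching the example in the text.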
Read the contents of an incoming FlowFile using a callback
Use Case: You have incoming connection(s) to ExecuteScript and want to retrieve the contents of a FlowFile from the queue(s) for processing.
Approach: Use the read(flowFile, inputStreamCallback) method from the session object. An InputStreamCallback object is needed to pass into the read() method. Note that because InputStreamCallback is an object, the contents are only visible to that object by default. If you need to use the data outside the read() method, use a more globally-scoped variable. The examples store the full contents of the incoming FlowFile in a String (using Apache Commons' IOUtils class). NOTE: For large FlowFiles, this is not the best technique; instead, read in only as much data as you need and process it as appropriate. For something like SplitText, you could read in a line at a time and process it within the InputStreamCallback, or use the session.read(flowFile) method, which returns an InputStream reference you can use outside of a callback.
Examples:
Groovy
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.InputStreamCallback
import java.nio.charset.StandardCharsets

flowFile = session.get()
if(!flowFile) return
def text = ''
// Cast a closure with an inputStream parameter to InputStreamCallback
session.read(flowFile, {inputStream ->
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    // Do something with text here
} as InputStreamCallback)
Jython
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import InputStreamCallback

# Define a subclass of InputStreamCallback for use in session.read()
class PyInputStreamCallback(InputStreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        # Do something with text here
# end class

flowFile = session.get()
if (flowFile != None):
    session.read(flowFile, PyInputStreamCallback())
# implicit return at the end
Javascript
var InputStreamCallback = Java.type("org.apache.nifi.processor.io.InputStreamCallback");
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");

var flowFile = session.get();
if (flowFile != null) {
    // Create a new InputStreamCallback, passing in a function to define the interface method
    session.read(flowFile, new InputStreamCallback(function(inputStream) {
        var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
        // Do something with text here
    }));
}
JRuby
java_import org.apache.commons.io.IOUtils
java_import java.nio.charset.StandardCharsets
java_import org.apache.nifi.processor.io.InputStreamCallback

# Define a class implementing InputStreamCallback for use in session.read()
class JRubyInputStreamCallback
  include InputStreamCallback

  def process(inputStream)
    text = IOUtils.toString(inputStream, StandardCharsets::UTF_8)
    # Do something with text here
  end
end

jrubyInputStreamCallback = JRubyInputStreamCallback.new
flowFile = session.get()
if flowFile != nil
  session.read(flowFile, jrubyInputStreamCallback)
end
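For large FlowFiles, the text suggests reading a line at a time instead of loading everything into a String. The following plain-CPython sketch shows that pattern on a binary stream, with io.BytesIO standing in for the FlowFile's InputStream; count_matching_lines and the "error" marker are hypothetical. Inside a Jython callback you would wrap the Java stream with java.io.BufferedReader and java.io.InputStreamReader instead, since Python's io wrappers do not accept Java streams.

```python
import io

def count_matching_lines(input_stream, needle, encoding="utf-8"):
    """
    Hypothetical callback body: read one line at a time from a binary stream
    (instead of loading all content into memory) and count lines containing needle.
    """
    reader = io.TextIOWrapper(input_stream, encoding=encoding)
    try:
        return sum(1 for line in reader if needle in line)
    finally:
        # Detach so the underlying stream stays open; in NiFi the framework owns it.
        reader.detach()

stream = io.BytesIO(b"ok\nerror: disk full\nok\nerror: timeout\n")
matches = count_matching_lines(stream, "error")
```

Only one line is held in memory at a time, which is the property that matters for very large content.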
Write content to an outgoing FlowFile using a callback
Use Case: You want to generate content for an outgoing FlowFile.
Approach: Use the write(flowFile, outputStreamCallback) method from the session object. An OutputStreamCallback object is needed to pass into the write() method. Note that because OutputStreamCallback is an object, the contents are only visible to that object by default. If you need to use the data outside the write() method, use a more globally-scoped variable. The examples will write a sample String to a FlowFile.
Examples:
Groovy
import org.apache.nifi.processor.io.OutputStreamCallback
import java.nio.charset.StandardCharsets

flowFile = session.get()
if(!flowFile) return
def text = 'Hello world!'
// Cast a closure with an outputStream parameter to OutputStreamCallback
flowFile = session.write(flowFile, {outputStream ->
    outputStream.write(text.getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)
Jython
from org.apache.nifi.processor.io import OutputStreamCallback

# Define a subclass of OutputStreamCallback for use in session.write()
class PyOutputStreamCallback(OutputStreamCallback):
    def __init__(self):
        pass
    def process(self, outputStream):
        outputStream.write(bytearray('Hello World!'.encode('utf-8')))
# end class

flowFile = session.get()
if (flowFile != None):
    flowFile = session.write(flowFile, PyOutputStreamCallback())
# implicit return at the end
Javascript
var OutputStreamCallback = Java.type("org.apache.nifi.processor.io.OutputStreamCallback");
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");

var flowFile = session.get();
if (flowFile != null) {
    // Create a new OutputStreamCallback, passing in a function to define the interface method
    flowFile = session.write(flowFile, new OutputStreamCallback(function(outputStream) {
        outputStream.write("Hello World!".getBytes(StandardCharsets.UTF_8));
    }));
}
JRuby
java_import java.nio.charset.StandardCharsets
java_import org.apache.nifi.processor.io.OutputStreamCallback

# Define a class implementing OutputStreamCallback for use in session.write()
class JRubyOutputStreamCallback
  include OutputStreamCallback

  def process(outputStream)
    outputStream.write("Hello World!".to_java.getBytes(StandardCharsets::UTF_8))
  end
end

jrubyOutputStreamCallback = JRubyOutputStreamCallback.new
flowFile = session.get()
if flowFile != nil
  flowFile = session.write(flowFile, jrubyOutputStreamCallback)
end
Overwrite an incoming FlowFile with updated content using a callback
Use Case: You want to reuse the incoming FlowFile but want to modify its content for the outgoing FlowFile.
Approach: Use the write(flowFile, streamCallback) method from the session object. A StreamCallback object is needed to pass into the write() method. StreamCallback provides both an InputStream (from the incoming FlowFile) and an OutputStream (for the next version of that FlowFile), so you can use the InputStream to get the current contents of the FlowFile, modify them, and write them back out to the FlowFile. This overwrites the contents of the FlowFile, so to append you would have to append to the read-in contents yourself, or use a different approach (session.append() rather than session.write()). Note that because StreamCallback is an object, the contents are only visible to that object by default. If you need to use the data outside the write() method, use a more globally-scoped variable. The examples reverse the contents of the incoming FlowFile (assumed to be a String) and write the reversed string to a new version of the FlowFile.
Examples:
Groovy
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.StandardCharsets

flowFile = session.get()
if(!flowFile) return
def text = ''
// Cast a closure with an inputStream and outputStream parameter to StreamCallback
flowFile = session.write(flowFile, {inputStream, outputStream ->
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    outputStream.write(text.reverse().getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)
Jython
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        # Reverse the content that was read in and write it back out
        outputStream.write(bytearray(text[::-1].encode('utf-8')))
# end class

flowFile = session.get()
if (flowFile != None):
    flowFile = session.write(flowFile, PyStreamCallback())
# implicit return at the end
Javascript
var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback");
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");

var flowFile = session.get();
if (flowFile != null) {
    // Create a new StreamCallback, passing in a function to define the interface method
    flowFile = session.write(flowFile, new StreamCallback(function(inputStream, outputStream) {
        var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
        outputStream.write(text.split("").reverse().join("").getBytes(StandardCharsets.UTF_8));
    }));
}
JRuby
java_import org.apache.commons.io.IOUtils
java_import java.nio.charset.StandardCharsets
java_import org.apache.nifi.processor.io.StreamCallback

# Define a class implementing StreamCallback for use in session.write()
class JRubyStreamCallback
  include StreamCallback

  def process(inputStream, outputStream)
    text = IOUtils.toString(inputStream, StandardCharsets::UTF_8)
    outputStream.write(text.reverse.to_java.getBytes(StandardCharsets::UTF_8))
  end
end

jrubyStreamCallback = JRubyStreamCallback.new
flowFile = session.get()
if flowFile != nil
  flowFile = session.write(flowFile, jrubyStreamCallback)
end
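The read-modify-write shape of a StreamCallback can be exercised outside NiFi with in-memory streams. In this plain-Python sketch, io.BytesIO objects stand in for the FlowFile's InputStream and the next version's OutputStream; reverse_stream_callback and overwrite are hypothetical names, and reversing by code point (not by grapheme) is an assumption that matches the simple string reversal in the examples.

```python
import io

def reverse_stream_callback(input_stream, output_stream, encoding="utf-8"):
    """
    Hypothetical callback in the spirit of StreamCallback.process(in, out):
    read the current content, reverse it, and write it as the new content.
    """
    text = input_stream.read().decode(encoding)
    output_stream.write(text[::-1].encode(encoding))

def overwrite(content, callback):
    """Hypothetical helper: run a stream callback and return the new content."""
    source = io.BytesIO(content)  # stands in for the incoming FlowFile's content
    sink = io.BytesIO()           # stands in for the next version's content
    callback(source, sink)
    return sink.getvalue()

new_content = overwrite(b"Hello world!", reverse_stream_callback)
```

Decoding before reversing (rather than reversing raw bytes) keeps multi-byte UTF-8 characters intact.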
Handle errors during script processing
Use Case: An error occurs in the script (either by data validation or a thrown exception), and you want the script to handle it gracefully.
Approach: For exceptions, use the exception-handling mechanism for the scripting language (often they are try/catch block(s)). For data validation, you can use a similar approach, but define a boolean variable like "valid" and an if/else clause rather than a try/catch clause. ExecuteScript defines "success" and "failure" relationships; often your processing will transfer "good" FlowFiles to success and "bad" FlowFiles to failure (logging an error in the latter case).
Examples:
Groovy
flowFile = session.get()
if(!flowFile) return
try {
    // Something that might throw an exception here

    // Last operation is transfer to success (failures handled in the catch block)
    session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
    log.error('Something went wrong', e)
    session.transfer(flowFile, REL_FAILURE)
}
Jython
flowFile = session.get()
if (flowFile != None):
    try:
        # Something that might throw an exception here

        # Last operation is transfer to success (failures handled in the except block)
        session.transfer(flowFile, REL_SUCCESS)
    except Exception as e:
        log.error('Something went wrong: ' + str(e))
        session.transfer(flowFile, REL_FAILURE)
# implicit return at the end
Javascript
var flowFile = session.get();
if (flowFile != null) {
    try {
        // Something that might throw an exception here

        // Last operation is transfer to success (failures handled in the catch block)
        session.transfer(flowFile, REL_SUCCESS);
    } catch(e) {
        log.error('Something went wrong', e);
        session.transfer(flowFile, REL_FAILURE);
    }
}
JRuby
flowFile = session.get()
if flowFile != nil
  begin
    # Something that might raise an exception here

    # Last operation is transfer to success (failures handled in the rescue block)
    session.transfer(flowFile, REL_SUCCESS)
  rescue Exception => e
    log.error('Something went wrong', e)
    session.transfer(flowFile, REL_FAILURE)
  end
end
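The recipe combines two mechanisms: exception handling for things that blow up, and a "valid" flag for data that parses but is wrong. This plain-Python sketch shows both in one routing decision. JSON content with a required "id" field is an assumed example format, and route is a hypothetical function; in an ExecuteScript script you would map the returned labels to REL_SUCCESS and REL_FAILURE and log the reason in the failure case.

```python
import json

def route(content):
    """
    Hypothetical routing function: return ("success", record) for good input,
    or ("failure", reason) when parsing raises or validation fails.
    """
    try:
        record = json.loads(content)
    except ValueError as e:  # json.JSONDecodeError is a subclass of ValueError
        return "failure", "invalid JSON: %s" % e
    # Data validation uses a boolean flag and if/else rather than try/except.
    valid = isinstance(record, dict) and "id" in record
    if not valid:
        return "failure", "missing 'id' field"
    return "success", record

good = route('{"id": 1}')
unparseable = route("not json")
incomplete = route('{"name": "x"}')
```

Keeping the success transfer as the last step, and routing every other outcome to failure, mirrors the structure of the examples above.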