Using script to integrate custom code
Learn how to use scripting to integrate custom code into MiNiFi C++ agent. You can do it either by using the ExecuteScript processor or custom Python processors.
- ExecuteScript processor
The ExecuteScript processor runs an external stateless script on each processor run, allowing simpler integration.
- Python processors
Python processors are loaded from external files, and they keep running a function, while retaining the interpreter state, which makes it ideal for tasks that need to keep state between runs, or need some initialization logic to be run only once.
To use scripting, you need to install the required Python version on all target systems.
nifi-minifi-cpp-...-extra-extensions-centos-bXX.tar.gz. In this file, you can find the
minifi-script-extensions.sofile. Copy the
minifi-script-extensions.sofile to the
extensions/directory, so that the MiNiFi C++ agent can load it on startup. On Linux, there is an additional workaround required to make scripting work. You need to patch the MiNiFi binary to link to Python:
patchelf --add-needed libpython3.6m.so MINIFI_HOME/bin/minifi
For Windows, the requirements are Python 3.10 and the 64 bit version of the agent. The Python extension is already part of the normal MiNiFi C++ MSI installer, but not enabled by default. You need to enable it during installation if you want to use it.
Using the ExecuteScript processor
When using the ExecuteScript processor, you need to place a Python script on
the agents' filesystems (see the asset push feature of EFM), and point the ExecuteScript
processor to use that script. On each execution, the Python script is evaluated, and its
onTrigger function is called to receive any incoming flow files, and
produce the output.
#!/usr/bin/env python import codecs import time class ReadCallback: def process(self, input_stream): self.content = codecs.getreader('utf-8')(input_stream).read() return len(self.content) class WriteReverseStringCallback: def __init__(self, content): self.content = content def process(self, output_stream): reversed_content = self.content[::-1] output_stream.write(reversed_content.encode('utf-8')) return len(reversed_content) def onTrigger(context, session): flow_file = session.get() if flow_file is not None: read_callback = ReadCallback() session.read(flow_file, read_callback) session.write(flow_file, WriteReverseStringCallback(read_callback.content)) flow_file.addAttribute('python_timestamp', str(int(time.time()))) session.transfer(flow_file, REL_SUCCESS)
Using a Python processor
The workflow of a Python processor is as follows: at startup, the MiNiFi C++ agent
reads the Python script directory specified in the
minifi.properties file as
the value of the
nifi.python.processor.dir property (by default, this is
MINIFI_HOME/minifi-python). The agent scans the directory for compatible scripts, and
automatically registers them for use.
These Python files are evaluated at startup, their
function is invoked before starting a flow, and their
onTrigger function is
invoked regularly as the processor is scheduled.
#!/usr/bin/env python def describe(processor): processor.setDescription("Adds an attribute to your flow files") def onInitialize(processor): processor.setSupportsDynamicProperties() def onTrigger(context, session): flow_file = session.get() if flow_file is not None: flow_file.addAttribute("Python attribute", "attributevalue") session.transfer(flow_file, REL_SUCCESS)
To add or update processors, place them in the Python script directory, and restart the agent.