We’ve decided to start a new series for getting familiar with the different Apache Nifi services and processors, and we’re calling it “Getting Familiar”. We’ll try to post fairly often about different processors, using the controller services, and configuring certain things in Nifi. The first one in the series will be about the ExecuteScript processor.
Getting Familiar: ExecuteScript Processor
When the Nifi team came out with the ExecuteScript processor, I knew it was a big win. It allows for much more flexibility in the flow, since you can create a custom processor on the fly without having to write a full-fledged custom Java processor. When you find out that your data needs to be massaged a little more, being able to do it in your preferred scripting language is pretty nice.
In order to take full advantage of the processor framework Nifi has built, there are a few variables they have exposed to the script:
- session: exposes the current ProcessSession, allowing you to take action on a flow file: get the current file, create a new one, put an attribute, or transfer the flow file to a relationship.
- context: exposes the current ProcessContext, allowing access to the processor properties and relationships. It also provides access to the StateManager, which can be used for simple key-value pairs.
- log: exposes the current ProcessorLog, allowing you to log things to the normal Nifi log
- REL_SUCCESS: the success relationship for the processor
- REL_FAILURE: the failure relationship for the processor
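To make the variables above a bit more concrete, here is a minimal sketch of how they tend to be used together. The function name `tag_and_route` and the attribute name `touched.by` are made up for illustration. The function takes the exposed variables as parameters so it can also be tried outside of Nifi; inside ExecuteScript you would just call it with the globals Nifi gives you.

```python
def tag_and_route(session, log, rel_success, rel_failure):
    """Fetch one flowfile, add an attribute to it, and route it."""
    flowFile = session.get()
    if flowFile is None:
        return  # nothing waiting in the incoming queue

    try:
        # putAttribute returns an updated flowfile reference, so keep it.
        # "touched.by" is a hypothetical attribute name.
        flowFile = session.putAttribute(flowFile, "touched.by", "ExecuteScript")
        session.transfer(flowFile, rel_success)
    except Exception as e:
        log.error("Could not update flowfile: " + str(e))
        session.transfer(flowFile, rel_failure)


# Inside ExecuteScript the variables already exist, so the call would be:
# tag_and_route(session, log, REL_SUCCESS, REL_FAILURE)
```

Note that the flowfile is always transferred somewhere before the script ends; a flowfile that was fetched from the session but never transferred or removed will cause the session to fail.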
When adding properties, which Nifi calls DynamicProperties, be aware that you have to follow the scripting language’s naming rules for variables: for Python the name has to start with an underscore or a letter, for Groovy you can’t use periods in the name, etc.
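As a quick sanity check for the Python case, something like the following could be used to test a property name before adding it to the processor. This is only a sketch: the helper name `is_safe_python_property_name` is hypothetical and not part of Nifi, and it only covers plain ASCII names.

```python
import keyword
import re

# A plain Python variable name: a letter or underscore first,
# then letters, digits, or underscores.
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def is_safe_python_property_name(name):
    """Return True if `name` can be used as a plain Python variable name."""
    return bool(_IDENTIFIER.match(name)) and not keyword.iskeyword(name)


print(is_safe_python_property_name("output_dir"))  # a usable name
print(is_safe_python_property_name("output.dir"))  # periods are not allowed
```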
For our first use case I’ll show a very basic example, expanding on our simple XML example from Getting Started with Apache Nifi.
We’ll replace the EvaluateXPath processor with an ExecuteScript processor running our Python code to get the elements we want from the XML. So we want to:
- get the pubDate - xpath - channel/item/pubDate
- get the link - xpath - channel/item/link
- get the title - xpath - channel/item/title
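To show what those three lookups amount to in Python, here is a small sketch using the standard library’s ElementTree. The sample feed is made up purely for illustration, and `extract_item_fields` is a hypothetical helper name. Each value comes from the element of the same name under channel/item, and only the first item is read.

```python
import xml.etree.ElementTree as ET

# Made-up sample feed, only here to illustrate the three paths.
SAMPLE_RSS = """<rss version="2.0">
  <channel>
    <title>Example feed</title>
    <item>
      <title>First post</title>
      <link>http://example.com/first</link>
      <pubDate>Mon, 01 Jan 2018 00:00:00 GMT</pubDate>
    </item>
  </channel>
</rss>"""


def extract_item_fields(xml_text):
    """Return pubDate, title, and link of the first item in the feed."""
    root = ET.fromstring(xml_text)  # root is the <rss> element
    return {
        "pubDate": root.findtext("channel/item/pubDate"),
        "title": root.findtext("channel/item/title"),
        "link": root.findtext("channel/item/link"),
    }


fields = extract_item_fields(SAMPLE_RSS)
for name in sorted(fields):
    print(name + ": " + fields[name])
```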
First we create a class that implements InputStreamCallback to read the flowfile. Its main method is process, which takes the InputStream and works on it, reading it in with IOUtils.
We make use of this class, Callback, by passing it to the session.read function. This function is very similar to the one in Java: read expects a flowFile as the first parameter and an InputStreamCallback as the second. Once we are done going through the file, we get the variable we set, loop through it, and set those attributes on the flowfile. We could use putAllAttributes there and pass all of the attributes at once as a map, but that would require more formatting in this case.
This Python code can be copied and pasted directly into the ExecuteScript Script Body property, and after pointing the Module Directory property to your Python dir, e.g. /usr/lib/python, it should work.
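A rough sketch of the pattern just described could look like the following. It is an illustration under a few assumptions and not the exact script from this post: the helper names `read_xml` and `extract_to_attributes` are made up, the XML is parsed with ElementTree, and the `except ImportError` branch only exists so the sketch can be exercised outside of Nifi, where the Java classes are not available. It has not been checked against a specific Nifi version.

```python
import xml.etree.ElementTree as ET

try:
    # These imports only resolve inside Nifi's Jython engine.
    from org.apache.commons.io import IOUtils
    from java.nio.charset import StandardCharsets
    from org.apache.nifi.processor.io import InputStreamCallback

    def read_xml(input_stream):
        text = IOUtils.toString(input_stream, StandardCharsets.UTF_8)
        return text.encode("utf-8")  # hand ElementTree bytes
except ImportError:
    # Assumption: stand-ins so the sketch also runs in plain Python.
    InputStreamCallback = object

    def read_xml(input_stream):
        return input_stream.read()


class Callback(InputStreamCallback):
    """Reads the flowfile content and remembers the values we want."""

    def __init__(self):
        self.values = {}

    def process(self, inputStream):
        root = ET.fromstring(read_xml(inputStream))
        for name in ("pubDate", "title", "link"):
            self.values[name] = root.findtext("channel/item/" + name)


def extract_to_attributes(session, rel_success):
    flowFile = session.get()
    if flowFile is None:
        return
    callback = Callback()
    session.read(flowFile, callback)  # Nifi calls callback.process(...)
    for key, value in callback.values.items():
        if value is not None:
            # putAttribute returns an updated flowfile reference.
            flowFile = session.putAttribute(flowFile, key, value)
    session.transfer(flowFile, rel_success)


# Inside ExecuteScript the call would be:
# extract_to_attributes(session, REL_SUCCESS)
```

Keeping the parsed values on the callback object is what lets the code after session.read see them, since read itself does not return anything from the callback.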
The final flow is basically the same as the Getting Started flow, just with the ExecuteScript replacing the EvaluateXPath. After running once, if you have the PutFile stopped, you can inspect the flowFile and verify it has the attributes as expected!
And the final flow:
Summary and Resources
This example showed the basics of using the Nifi ExecuteScript processor with Python: how to access the flowFile, how to deal with the session, and how to log.
If you would like more examples, let us know! We are here to help! There are some other great resources out there too: BatchIQ’s GitHub examples for ExecuteScript, and funnifi’s ExecuteScript examples. BatchIQ’s GitHub examples are pretty helpful for using all the available components Nifi exposes to you, so take a look there for more information, or ask in the comments below.