I came across a question on the NiFi dev mailing list and thought it would make a good example of solving a real-world problem, building on our previous ExecuteScript post. As a side note, Elasticsearch stores documents as JSON and the PutElasticsearch processors expect the flowfile content to be JSON. That means you could also use the EvaluateJsonPath processor to extract the field you want into an attribute.

The user’s question was:

Hi,
We have dynamic indexes in elastic search like signal-YYYY.MM.DD => (which
resolved as signal-2017.03.31)
I was try hard to find the way to specify the dynamic index in nifi
processor.
I already have a field in my data which i want to use as dynamic index. for
example "index_name". I just wanted to use that data field in the index
parameter of nifi processor.
It might be simple, but i tried hard with different options like
'${index_name}' , "${index_name}" , '${INDEX_NAME}' ,  $INDEX_NAME ,
$index_name

Can anyone just refer me how to use data field in the nifi's elasticsearch
processor property of index name?

As I understand the user’s question, his file contains a key-value pair, index_name, whose value he wants to extract and use as the index for the PutElasticsearch processor. I replied with what I thought was the best approach: use the ExecuteScript processor to extract the field and set it as a flowfile attribute.

So our goal can be stated as…

Given a flowfile with a key-value pair, find that key and set the value as an attribute to be used later in the flow.

I’m going to make a few assumptions here since we don’t have the exact use case. The main one is that the file is JSON. Since we are working with Elasticsearch, the flowfile here is pretty simple: a single JSON object.

Creating the flow

Let’s start by creating our flow. We are going to pick up a local JSON file, grab a field from it, and set that field as an attribute. We’ll then insert the document into Elasticsearch using the PutElasticsearch processor. Pretty simple flow.

This is the configured flow. I’ve copied the Groovy script below into the ExecuteScript processor’s ‘Script Body’ property. We are going to auto-terminate failures since this is just a quick example. I’m connecting to the basic Elasticsearch Docker image running locally, and the flow reads its input from a local file, /tmp/nifi.rocks/example.json.

The file contents are:

{
    "data": "some-data",
    "index_name": "test-index.2017.04.01",
    "an_int": 1,
    "more_data": "yet more data"
}
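Before wiring anything into NiFi, you can check the parsing step on its own. This is a plain Groovy sketch with no NiFi classes, so it should run with the groovy command or in groovyConsole. The helper name extractField is hypothetical, and the string is the example file above. Like the post, it assumes the file holds a single JSON object.

```groovy
import groovy.json.JsonSlurper

// Hypothetical helper: parse a JSON document and return one top-level field as a String.
// Returns null when the document is not a JSON object or the key is absent.
String extractField(String json, String key) {
    def parsed = new JsonSlurper().parseText(json)
    // Only a JSON object (parsed into a Map) has named fields
    if (!(parsed instanceof Map)) {
        return null
    }
    // toString() turns non-string values such as numbers into text
    return parsed[key]?.toString()
}

// The example file contents from above
def example = '''{
    "data": "some-data",
    "index_name": "test-index.2017.04.01",
    "an_int": 1,
    "more_data": "yet more data"
}'''

println extractField(example, 'index_name')   // prints test-index.2017.04.01
```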

The ExecuteScript Processor - Groovy

This processor ends up being pretty simple.

  1. Read in the flowfile
  2. Find the field
  3. Set the flowfile attribute
  4. Transfer to success

Groovy code for the ExecuteScript processor:

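import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
import groovy.json.JsonSlurper
import org.apache.nifi.processor.io.InputStreamCallback

// Take the next flowfile from the incoming queue; stop if there isn't one
def flowFile = session.get()
if (!flowFile) return
def jsonSlurper = new JsonSlurper()
def indexName = ''

// Read the content as UTF-8, parse it as JSON, and pull out index_name
session.read(flowFile, { inputStream ->
    def row = jsonSlurper.parseText(IOUtils.toString(inputStream, StandardCharsets.UTF_8))
    // toString() handles non-string values; a missing key yields null
    indexName = row.index_name?.toString()
} as InputStreamCallback)

// Without an index name there is nothing to index into, so route to failure
if (!indexName) {
    session.transfer(flowFile, REL_FAILURE)
    return
}

// Expose the value as a flowfile attribute so PutElasticsearch can use ${index_name}
flowFile = session.putAttribute(flowFile, 'index_name', indexName)
session.transfer(flowFile, REL_SUCCESS)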
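Even when index_name is present, Elasticsearch will reject the request if the value breaks its index-naming rules. Uppercase letters are one example. That failure would show up at PutElasticsearch rather than at ExecuteScript. Below is a sketch of a check you could run first. The helper isValidIndexName is hypothetical, and it encodes only a subset of the restrictions listed in the Elasticsearch create-index documentation. Those rules have shifted between versions, so verify them against the docs for your cluster.

```groovy
import java.nio.charset.StandardCharsets

// Hypothetical helper encoding a subset of Elasticsearch's index-name rules.
// The rules differ slightly between Elasticsearch versions; check the docs for yours.
boolean isValidIndexName(String name) {
    // Null or empty names are rejected
    if (!name) return false
    // Index names must be lowercase
    if (name != name.toLowerCase(Locale.ROOT)) return false
    // "." and ".." are not allowed as names
    if (name == '.' || name == '..') return false
    // Names cannot start with -, _ or +
    if (['-', '_', '+'].any { name.startsWith(it) }) return false
    // Names cannot contain \ / * ? " < > | , # or a space
    def forbidden = ['\\', '/', '*', '?', '"', '<', '>', '|', ',', '#', ' ']
    if (name.toList().any { it in forbidden }) return false
    // Names are limited to 255 bytes
    return name.getBytes(StandardCharsets.UTF_8).length <= 255
}

println isValidIndexName('test-index.2017.04.01')   // prints true
println isValidIndexName('Test-Index')              // prints false
```

Inside ExecuteScript you could paste the helper into the script body, call it on indexName before putAttribute, and transfer to REL_FAILURE when it returns false. That keeps bad values from ever reaching PutElasticsearch.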

Now if we run the edited flow, with the PutElasticsearch processor’s Index property set to ${index_name}, we should be able to query the _cat/indices endpoint of our local Elasticsearch and see the new index! Again, this curl command is specific to the default Elasticsearch Docker image running in development mode. The default password for the elastic user is ‘changeme’.

curl -u elastic http://127.0.0.1:9200/_cat/indices

The output is:

yellow open .monitoring-es-2-2017.04.02 XK092VdTQJCjl9Gjrd396w 1 1 4072 98 1.7mb 1.7mb
yellow open .monitoring-data-2          NGf5b0nyR1WV13cjs0oNZA 1 1    2  0 4.1kb 4.1kb
yellow open test-index.2017.04.01       wnVIqCZhTPOK1F-5A1YVUg 5 1    1  0 5.5kb 5.5kb
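To check for the index from a script rather than by eye, note that in the default _cat/indices layout the index name is the third whitespace-separated column, after health and status. Below is a sketch with a hypothetical helper, indexNames, run against the output above. It assumes the default columns, no header row (the ?v option adds one), and that every line has its health and status columns filled in.

```groovy
// Hypothetical helper: collect index names from default _cat/indices output.
// Assumes the default column order (health, status, index, uuid, ...) and no header row.
List<String> indexNames(String catOutput) {
    // Drop blank lines
    def lines = catOutput.readLines().findAll { it.trim() }
    // Split each line on runs of whitespace
    def columns = lines.collect { it.trim().split(/\s+/) }
    // Keep lines that have an index column and take the third field
    return columns.findAll { it.length > 2 }.collect { it[2] }
}

def output = '''yellow open .monitoring-es-2-2017.04.02 XK092VdTQJCjl9Gjrd396w 1 1 4072 98 1.7mb 1.7mb
yellow open .monitoring-data-2          NGf5b0nyR1WV13cjs0oNZA 1 1    2  0 4.1kb 4.1kb
yellow open test-index.2017.04.01       wnVIqCZhTPOK1F-5A1YVUg 5 1    1  0 5.5kb 5.5kb'''

println('test-index.2017.04.01' in indexNames(output))   // prints true
```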

Success! The test-index.2017.04.01 index was created from the value in our file. If you see any interesting problems on the mailing list or have your own issues you’d like tackled, let us know via email or in the comments below!