Blog der SHI GmbH

What is NiFi – Part 3

Testing your Needs – The Apache NiFi Test Framework

In this last part of our NiFi blog series, we will have a deeper look into NiFi’s test framework. To demonstrate the key concepts, we will write some JUnit tests for our custom LogCustomMessage processor that we implemented in the second part.

 

First things First – Integration of NiFi’s Test Framework

To make use of the NiFi test framework, you need the following Maven dependency to be able to run any custom processor in a JUnit style fashion:

<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-mock</artifactId>
    <version>[the version you are using]</version>
    <scope>test</scope>
</dependency>

Once you have included the above dependency, you gain access to an object called org.apache.nifi.util.TestRunner which enables you the full power to run any processor inside an ordinary JUnit test.

The TestRunner class in an object that mocks the NiFi runtime environment gaining full access to all important objects like ProcessContext and ProcessSession. If you are unfamiliar with these objects, please have a look at the first part of this series.

 

Writing a custom JUnit test

Let’s start writing our own JUnit test for the LogCustomMessage processor.
To obtain a full working TestRunner object we just need to call NiFi’s TestRunner factory to retrieve a new instance:

private TestRunner runner;
@Before
public void initBefore() {
    this.runner = TestRunners.newTestRunner(new LogCustomMessage());
}

As you can see, we pass our custom processor as an argument into the factory method newTestRunner(Processor processor). With this instruction we connect a custom processor with a TestRunner instance quite easily.

This is mainly done inside a method that is annotated with a @Before annotation, instructing JUnit to create a fresh TestRunner instance for each test method in our test class. This guarantees a clean TestRunner instance before each test method is running, preventing JUnit tests from failing in case test methods influence each other.

Before we start with our test method we have to introduce an “Inner Class” that helps to verify a log message is written in the correct way. For convenience, we don’t want SLF4J writing a custom log message into a log file, instead we create a custom appender, which saves the written log message inside a string object. This way we can easily compare results without the need to read in a log file!

Important note: NiFi is using SLF4J to hide/facade the underlying logging framework, thus giving us the ability to substitute any logging framework in the future. The logging framework used in the actual NiFi version is Logback.

In Listing 1 you can see the “Inner Class” with a custom append method to save log messages in a string object:

class TestAppender<E> extends AppenderBase<E> {
public String logMsg = "";

    @Override
    protected void append(E eventObject) {
        logMsg = eventObject.toString();
    }
}

Listing 1: A custom TestAppender to store a log message in a string object.

Now we have all we need to start with our first test method. Let’s test our custom processor with a debug log level and a custom log message:

@Test
public void testDebugLoggerWithMsg() {
// Let’s configure our processor with debug log level and a log message:
this.runner.setProperty(LogCustomMessage.LOG_LEVEL, "debug");
    this.runner.setProperty(LogCustomMessage.LOG_MSG, "Nifi rocks");
    this.runner.setProperty(LogCustomMessage.LOG_NAME, "test.debug");
       
    // Use a TestAppender to verify the logging:
LogCustomMessageTest<E>.TestAppender<ILoggingEvent> appender = new TestAppender<ILoggingEvent>();
    appender.start();

    // Add the TestAppender to the Logger object we configured in our processor:
    ((Logger)LoggerFactory.getLogger("test.debug")).addAppender(appender);

   
// Provide an empty flowFile for the test case
    this.runner.enqueue(new byte[0]);
// Run the LogCustomMessage processor:
    this.runner.run();
    // Let’s compare the results...

    assertEquals("[DEBUG] Nifi rocks", appender.logMsg);
    // Make sure there is no flowFile inside the queue anymore
    this.runner.assertQueueEmpty();
    // Make sure the flowFile is routed to the success relationship
    this.runner.assertTransferCount(LogCustomMessage.REL_SUCCESS, 1);
}

Listing 2: A JUnit test method using NiFi’s TestRunner object.

TestRunner provides the ability to set all available properties a custom processor has.
In our case we set the log level to “debug”, the log message is “NiFi rocks” and we call our SLF4J logger object test.debug.

Before we can run the custom processor, we need to instantiate our TestAppender first and after that calling appender.start() to instruct SLF4J to recognize our custom TestAppender.

The next step is to bind our custom TestAppender to the Logger object our custom processor is using by calling the method addAppender(Appender<ILoggingEvent> newAppender). By this instruction we make sure our processor will use our custom TestAppender and not any other default appender that will write into a log file. This comes very handy when we want to assert the results after the processor finished its work!

Now let’s continue with the most interesting part – Running the Junit Test!

Before running the custom processor, we must enqueue a flowFile into the NiFi queue we are currently using. In case of our custom processor, we neither consider the content of a flowFile nor its attributes. So, we just pass an empty byte array instructing NiFi’s test framework to create a flowFile with empty content and default attributes.
Of course you can add as many flowFiles with any content as you need in order to test your own custom processor successfully!

After we have prepared the flowFile we can run the processor by just calling TestRunner.run(). This will run our custom processor with all the settings we provided at the beginning of our test method.

After that we are able to assert the result by calling JUnit’s standard method assertEquals(String expected, String actual). If you have configured the test method correctly, our custom TestAppender should have a filled logMsg object with a value of “[DEBUG] NiFi rocks”.

Usually you want to test more things than the results a custom processor is creating inside its onTrigger(…) method. It is usually very interesting to see if a processor behaves correctly by transferring an incoming flowFile to its desired relationship!
You can easily do that by calling TestRunner.assertTransferCount(Relationship rel, int count). By this you can verify how many flowFiles are routed to a certain relationship. In our case we expect that one flowFile is routed to the success relationship!

You can test a lot more things than it is shown above. If you want to see NiFi’s test framework in more detail I recommend to have a look at the JavaDocs of the nifi-mock API.

For the sake of completeness, here is a list of the most important test methods:

Method

Description

assertAllFlowFilesContainAttribute
(Relationship relationship,String attributeName) 

Asserts that all FlowFiles that were transferred to the given relationship contain the given attribute.

assertAllFlowFilesTransferred
(String relationship,int count) 

Asserts that all FlowFiles that were transferred were transferred to the given relationship and that the number of FlowFiles transferred is equal to count.

assertQueueEmpty() 

Asserts that there are no FlowFiles left on the input queue.

assertTransferCount
(Relationship relationship,int count) 

Asserts that the number of FlowFiles transferred to the given relationship is equal to the given count.

assertValid() 

Asserts that the currently configured set of properties/annotation data is valid.

enqueue
(byte[] data,Map<String,String> attributes) 

Copies the content from the given byte array into memory and creates a FlowFile from this content with the given attributes and adds this FlowFile to the processor’s input queue.

getContentAsByteArray(MockFlowFile flowFile) 

Copies the contents of the given MockFlowFile into a byte array and returns that byte array.

setProperty(PropertyDescriptor descriptor, String value) 

Updates the value of the property with the given PropertyDescriptor to the specified value IF and ONLY IF the value is valid according to the descriptor’s validator.

setThreadCount(int threadCount) 

Updates the number of threads that will be used to run the Processor when calling the run() or run(int) methods.

What comes next?

Sadly this is the last part of our three-part series about Apache NiFi.
I hope you got some ideas what NiFi is in more detail and how you can use it in future projects.
We are pleased to support your projects especially when you use NiFi, so feel free to contact us via Email or over the contact form on this web page!

What is NiFi – Part 1

What is NiFi – Part 2

Do you need support for Apache NiFi?
You want to be a Data Hero?

 

Johannes Brucher

Johannes Brucher