Monday, August 13, 2018

NiFi Tuning Tip - Using Backpressure and Max Retries

Apache NiFi supports buffering of queued data. This is very useful when your downstream processor is not working as expected (say, due to a connection timeout): NiFi queues up the flow files until they can be processed.

However, this can also become a problem if the downstream system stays down for longer than anticipated. In that case, flow files can build up, stressing the NiFi cluster and potentially causing failures. Another scenario is a processor that translates an input message: a message in an incorrect format might cause the processor to retry it indefinitely, queuing up other messages and putting unnecessary load on the cluster.


The best way to avoid this is to use a combination of backpressure thresholds and a maximum limit on the number of retries. Backpressure means limiting the count or total size of messages that can be queued up. Once the threshold value is reached, the connection does not allow any further flow files or messages to build up.
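In NiFi itself, a retry cap is usually built by tracking a retry-count attribute on the flow file and routing it to a failure relationship once the cap is hit. The idea can be sketched in plain Java; `BoundedRetry` and its method are hypothetical names used for illustration, not NiFi APIs.

```java
import java.util.function.Supplier;

// Illustrative sketch of a bounded retry; names are hypothetical, not NiFi APIs.
public class BoundedRetry {

    /**
     * Attempts the given operation up to maxRetries times.
     * Returns the result on success, or null once retries are exhausted,
     * mirroring a flow file finally being routed to a failure relationship
     * instead of being retried forever.
     */
    public static <T> T attempt(Supplier<T> operation, int maxRetries) {
        for (int i = 0; i < maxRetries; i++) {
            try {
                return operation.get();
            } catch (RuntimeException e) {
                // transient failure: retry until the cap is hit
            }
        }
        return null; // give up instead of retrying indefinitely
    }
}
```

The key point is the hard upper bound: a malformed message fails a fixed number of times and is then taken out of the queue, rather than blocking everything behind it.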

Thursday, March 29, 2018

Clearing HDFS space on a CDH cluster

Problem Statement: On our dev CDH cluster we frequently faced issues with HDFS space filling up, which would halt HDFS and related services and interrupt our development work.

What we used to do: We would resort to dropping temporary tables, which freed up some space but bought us a week's breathing room at most.

A more permanent solution: To find a more permanent fix, I ran the "hdfs dfsadmin -report" command on one of the datanodes. This command reports DFS used and available capacity for every datanode in the cluster. As expected, the nodes had run out of free space.

Then I ran "hdfs dfs -du -h /", which showed that several TBs of space were occupied by the /tmp folder.
On drilling down further, I found that multiple folders (including a logs folder) were hogging space, and the information they held was no longer needed.

So a simple "hdfs dfs -rm -r /tmp/<folder name>", followed by emptying the HDFS trash folder, did the trick. However, the freed space did not reflect on the CDH cluster immediately; it took a couple of hours before CDH started reporting HDFS as healthy.

While this is too simple a thing to write a post about, sometimes the simple things end up taking a lot of time too :)

Wednesday, January 17, 2018

Implementing mocking with JUnit

JUnit tests are an easy and effective way to unit test your code. However, when writing a unit test you need to ensure that the scope of the test case stays within the class under test and does not go beyond it.
What this means is that if you have a class A which calls a method of class B, then your unit test should be limited to the methods of class A and should not exercise class B's methods; otherwise the test becomes an integration test. This isolation can be achieved using a mocking framework such as Mockito, and this is also where JUnit tests can become tricky.

I was trying to write a JUnit test using Mockito for the simple case described above. The code is below:

Class to be tested:

public class ClassA {
  public String parseEvent(String line) {
    // some business logic
    ClassB bObj = new ClassB();
    String randomNum = bObj.getRandomNum().toString();
    return line + randomNum;
  }
}

The JUnit test:

public class ATest {

   @Test
   public void testParseEvent() {
       String inputLine = <some data>;
       String expectedOutput = <some data>;

       ClassB bObject = mock(ClassB.class);
       when(bObject.getRandomNum()).thenReturn(2);

       ClassA aObject = new ClassA();
       String result = aObject.parseEvent(inputLine);
       Assert.assertArrayEquals(expectedOutput.getBytes(), result.getBytes());
   }

}

This looked alright to me: I was mocking class B's method so that it returned a predetermined number, and then comparing the expected and actual output.
However, despite trying this and several other approaches, I could see that B's real method was being called instead of the mock.

After a lot of googling I found that the issue was not with JUnit or the mocking, but with the way class A was written. Since it instantiated its own object of class B,
the mock object was never used and the actual instance got called. After refactoring class A, the test worked fine. The refactored code is below:

public class ClassA {

   private ClassB obj = null;

   public ClassA(ClassB obj) {
     this.obj = obj;
   }

  public String parseEvent(String line) {
    // some business logic
    String randomNum = obj.getRandomNum().toString();
    return line + randomNum;
  }
}

After refactoring class A, my JUnit test passed the mocked ClassB object in when constructing the ClassA object. The mocking worked fine and the test returned the expected result.
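Mockito aside, the same constructor injection can be verified with a plain hand-written stub. The sketch below mirrors the refactored classes with minimal illustrative implementations (the `getRandomNum` body is an assumption, not the original project code); the stub overrides it to return a fixed value, making the test deterministic without any framework.

```java
// Hand-rolled stub demo; class names mirror the refactored example and
// the method bodies are illustrative assumptions.
public class StubDemo {

    static class ClassB {
        public Integer getRandomNum() {
            return (int) (Math.random() * 100); // real, nondeterministic behavior
        }
    }

    static class ClassA {
        private final ClassB obj;

        public ClassA(ClassB obj) {
            this.obj = obj; // dependency injected, so a test can substitute a stub
        }

        public String parseEvent(String line) {
            return line + obj.getRandomNum().toString();
        }
    }

    public static void main(String[] args) {
        // The stub fixes getRandomNum to 2, just like thenReturn(2) in Mockito.
        ClassB stub = new ClassB() {
            @Override
            public Integer getRandomNum() {
                return 2;
            }
        };
        ClassA a = new ClassA(stub);
        System.out.println(a.parseEvent("event-")); // prints "event-2"
    }
}
```

This is exactly why the refactoring fixed the original test: once ClassA receives its ClassB through the constructor, any substitute (a Mockito mock or a hand-written stub) is actually the instance being called.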
Happy Ending :)