Tuesday, January 8, 2019

Interactively run SQL queries on Apache Spark


Interactively running queries using Spark

Problem Statement: We needed to add a new feature to our existing application wherein users would be able to interactively query large amounts of data (around 400 GB per day, stored over a 30-day period, making it around 12 TB) via a web application. This data is stored on HDFS in the form of Hive tables using the Parquet data format.
The users wanted a seamless, interactive querying experience, with latencies ranging from sub-second to a few seconds.

We compared two technologies for this use case – Impala and Spark.

For the uninitiated, Impala is an MPP (Massively Parallel Processing) query execution engine written in C++ that provides high-performance, low-latency SQL queries on data stored in various Hadoop storage systems such as HDFS and HBase.
The diagram below explains the Impala architecture:



Image Source: https://data-flair.training/

One of the reasons Impala is so fast is that its daemons are always in a ready state, which reduces start-up time. Secondly, Impala does its processing in memory; even intermediate results are not materialized to disk. In addition, Impala uses better I/O scheduling and makes use of data locality for its processing. Further, Impala distributes the query among its daemons; the other nodes transmit partial results back to the coordinator, which constructs the final result set for the query.

However, Impala is memory intensive as well. For Impala versions < 2.0 (which we were using), if the memory required to process intermediate results on a node exceeds the amount available to Impala on that node, the query is cancelled.
Also, since each node acts as a coordinator, the metastore data is cached on every node. In cases where the metadata is large (as in ours), this wastes precious RAM.
Impala is well supported on the Cloudera distribution but is not supported on Hortonworks. The latest version of EMR currently does not support Impala either.

Due to the above constraints, we wanted to explore alternatives as well. Other MPP solutions available in the market include Presto, Amazon Redshift and Google BigQuery. However, due to time constraints, we decided to directly explore Spark SQL as an alternative.

Spark is reliable, extremely fast and comes with a host of tunable options, ranging from the number of executors and cores to memory allocation and caching.

We found Spark can also be used to answer queries in real time, unlike the conventional spark-submit approach: a Spark session is created beforehand and shared among jobs, so start-up time is eliminated and queries are answered interactively.

In this post, I will cover two of the options that we explored to use Spark to run interactive queries:

  • Spark with Apache Livy

Apache Livy is a service that enables interaction with a Spark cluster over a REST API. It enables submission of Spark jobs or snippets of Spark code, synchronous or asynchronous result retrieval, as well as Spark context management.
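To make the REST surface concrete, the two calls involved look roughly like this: one POST to create a session and one POST to run a statement in it. The endpoint paths follow the Livy REST API; the helper class, host and port below are made up for illustration.

```java
public class LivyRestSketch {
    // Hypothetical Livy server URL; replace with your own.
    static final String LIVY_SERVER_URL = "http://livy-host:8998";

    // Body for POST /sessions - creates an interactive Spark session.
    static String createSessionBody() {
        return "{\"kind\": \"spark\"}";
    }

    // Body for POST /sessions/{id}/statements - runs a code snippet in that session.
    static String runStatementBody(String code) {
        return "{\"code\": \"" + code.replace("\"", "\\\"") + "\"}";
    }

    public static void main(String[] args) {
        System.out.println("POST " + LIVY_SERVER_URL + "/sessions " + createSessionBody());
        System.out.println("POST " + LIVY_SERVER_URL + "/sessions/0/statements "
                + runStatementBody("spark.sql(\"select 1\").show()"));
    }
}
```

The programmatic LivyClient used in this post wraps the same REST calls, so this is only to show what goes over the wire.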

To submit jobs to Spark interactively, we created two jobs: one to create a Livy session, and another to query Spark using the Livy session id created earlier.

To create a Livy session, refer to the code snippet below:

LivyClient client = new LivyClientBuilder().setURI(new URI(LIVY_SERVER_URL)).setAll(props).build();
// The session id is not exposed via the public API, so read it reflectively.
// Note: the field name may vary across Livy client versions.
Field f = client.getClass().getDeclaredField("LivySessionId");
f.setAccessible(true);
Object sessionId = f.get(client);
URI uri = new URI(<hdfs path where application code has been uploaded>);
client.addJar(uri).get();

The code above creates a Livy session and adds your application jar to its classpath.

The next step is to create a class (or job) that implements the Livy Job interface (org.apache.livy.Job) and returns a serializable POJO.
In the call method, fetch the Spark session and use it to run your Spark SQL query.

Code snippet below:

public class MyApp implements Job<QueryResultSet> {

    final scala.Option<String> props = scala.Option.apply("<path to properties file>");

    @Override
    public QueryResultSet call(JobContext jc) throws Exception {
        SparkSession spark = jc.sparkSession();
        Dataset<Row> ds = spark.sql("<SQL query>");
        // Fetch the result and map it to a serializable POJO
        QueryResultSet result = new QueryResultSet();
        result.setSchema(ds.schema());
        result.setRows((Row[]) ds.collect());
        return result;
    }
}

Submit your job by getting an instance of the Livy client using the session id you created earlier:

LivyClient client = new LivyClientBuilder().setURI(new URI(LIVY_SERVER_URL + "/sessions/" + <session id>)).build();
JobHandle<QueryResultSet> jhDS = client.submit(new MyApp());
QueryResultSet result = jhDS.get();
client.stop(false);

  • Attach Custom Servlet to Spark

Another way to interactively deal with Spark is to leverage the Jetty container that ships with Spark.

The first step is to write a custom servlet (extending HttpServlet) and override its doGet or doPost method to accept your request (say, JSON), run it on Spark and return the response.

public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
    PrintWriter out = null;
    JSONObject response = new JSONObject();
    Dataset<Row> ds = null;
    try {
        out = resp.getWriter();
        // Read the raw request body
        BufferedReader reader = req.getReader();
        StringBuilder payload = new StringBuilder();
        while (reader.ready()) {
            payload.append((char) reader.read());
        }
        JSONObject json = (JSONObject) JSONValue.parse(payload.toString());

        // Extract the SQL query from the JSON payload
        String query = json.get(QUERY).toString();
        // Execute the SQL query
        ds = this.spark.sql(query);
        // Iterate over the result and add rows to a Java list
        List<Object> data = new ArrayList<Object>();
        List<Row> outList = ds.collectAsList();
        for (Row row : outList) {
            Object obj = JavaConversions.seqAsJavaList(row.toSeq());
            data.add(obj);
        }
        response.put(DATA, data);
        response.put(COUNT, data.size());
        resp.setStatus(HttpStatus.OK_200);
    } catch (Exception e) {
        resp.setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500);
        response.put(ERROR, e.toString());
        e.printStackTrace();
    } finally {
        if (out != null) {
            resp.setContentType(APPLICATION_JSON);
            out.write(response.toString());
            out.flush();
            IOUtils.closeQuietly(out);
        }
    }
}

Next, create a Spark session and, using org.spark_project.jetty.servlet.ServletContextHandler, attach your custom servlet to the Spark UI that gets spawned.

SparkSession spark = SparkSession.builder().enableHiveSupport()
        .config("hive.metastore.uris", hive_metastore_uri)
        .config("spark.sql.warehouse.dir", spark_warehouse_dir).getOrCreate();

// Attach the custom servlet to Spark's embedded Jetty server
ServletContextHandler handler = new ServletContextHandler();
HttpServlet servlet = new SparkQueryServlet(spark);
ServletHolder sh = new ServletHolder(servlet);
handler.setContextPath("<context path>");
handler.addServlet(sh, "<servlet path>");
spark.sparkContext().ui().get().attachHandler(handler);

try {
    // Keep the session alive until it is explicitly closed
    spark.streams().awaitAnyTermination();
} catch (StreamingQueryException e) {
    e.printStackTrace();
}

When the above code is triggered, it creates a long-lived Spark session and attaches your servlet to it. You can then call the servlet (say its path is http://<host>:<port>/handler/custom) and pass a JSON payload containing your SQL query. The servlet will extract the query, run it on the Spark cluster and return the response in JSON format, in an interactive manner.

Hope you found this article useful.


Monday, August 13, 2018

Nifi Tuning Tip - Using Backpressure and Max Retries

Apache Nifi supports buffering of queued data. This is very useful when your downstream processor is not working as expected (say, due to a connection timeout), in which case Nifi queues up the flow files for processing.

However, this can turn into a negative if the downstream system remains down for longer than anticipated. In such a case, flow files can build up, stressing the Nifi cluster and possibly even causing failure. Another scenario is a processor that translates input messages: a message in an incorrect format might cause the processor to retry it indefinitely, queuing up other messages and putting unnecessary stress on the cluster.


The best way to avoid this is to use a combination of backpressure thresholds and a cap on the number of retries. Backpressure means limiting the count or size of messages that can be queued up; once the threshold is reached, the connection does not allow any further flow files or messages to build up.
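The combined effect can be sketched outside Nifi with a bounded queue plus a per-message retry cap. This is plain illustrative Java; in Nifi, both are connection and processor settings, not code, and the threshold value and retry count below are arbitrary.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BackpressureSketch {
    static final int QUEUE_THRESHOLD = 100;  // analogous to Nifi's object threshold
    static final int MAX_RETRIES = 3;        // cap on redelivery attempts

    final BlockingQueue<String> queue = new ArrayBlockingQueue<>(QUEUE_THRESHOLD);

    // Producer side: offer() fails fast once the threshold is reached,
    // pushing the pressure back to the caller instead of growing unbounded.
    boolean enqueue(String flowFile) {
        return queue.offer(flowFile);
    }

    // Consumer side: give up after MAX_RETRIES instead of retrying forever.
    boolean process(String flowFile) {
        for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
            if (tryDeliver(flowFile)) {
                return true;
            }
        }
        return false; // would be routed to a failure/dead-letter path
    }

    boolean tryDeliver(String flowFile) {
        // Stand-in for the real downstream call (e.g. an HTTP request).
        return !flowFile.contains("bad");
    }
}
```

The key property is that neither side can consume unbounded resources: the producer is throttled by the queue bound, and the consumer cannot spin forever on a poison message.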




Thursday, March 29, 2018

Clearing hdfs space on CDH cluster

Problem Statement: On our dev CDH cluster we frequently faced issues with HDFS space filling up, which would halt HDFS and related services and interrupt our development work.

What we used to do: We would resort to dropping temporary tables, which would free up some space, but that gave us only a week's breathing room at most.

A more permanent solution: To find a more permanent fix, I ran the "hdfs dfsadmin -report" command on one of the datanodes. This command shows the DFS used and available capacity on all data nodes in the cluster. As expected, the nodes had run out of free space.

Then I ran the command "hdfs dfs -du -h /". This showed that some TBs of space were occupied by the /tmp folder.
On drilling down further, I found that multiple folders (including the logs folder) were hogging space, but the information they held was not needed.

So a simple hdfs dfs -rm -r /tmp/<folder name>, followed by cleaning the HDFS trash folder, did the trick. The freed space did not reflect on the CDH cluster immediately, though; it took a couple of hours before CDH started reporting HDFS as healthy again.

While this is too simple a thing to write a post on, sometimes simple things end up taking a lot of time too :)

Wednesday, January 17, 2018

Implementing mocking with JUNITs

JUnit tests are an easy and effective way to unit test your code. However, when writing a JUnit test you need to ensure that the scope of the test case remains within the class under test and does not go beyond it.
What this means is: say you have a class A which calls a method of class B from within it. Then your unit test should be limited to the methods of class A and should not exercise the methods of class B; otherwise the test becomes an integration test. This can be achieved using a mocking framework such as Mockito, and this is where JUnit tests become tricky as well.

I was trying to write JUnit tests using Mockito for a simple case as mentioned above. Class code below:

Class to be tested:

public class A {
    public String parseEvent(String line) {
        // some business logic
        ClassB bObj = new ClassB();
        String randomNum = bObj.getRandomNum().toString();
        return line + randomNum;
    }
}

JUnit test:

public class ATest {

    @Test
    public void testParseEvent() {
        String inputLine = <some data>;
        String expectedOutput = <some data>;

        ClassB bObject = mock(ClassB.class);
        when(bObject.getRandomNum()).thenReturn(2);

        A aObject = new A();
        String result = aObject.parseEvent(inputLine);
        Assert.assertArrayEquals(expectedOutput.getBytes(), result.getBytes());
    }
}

This looked alright to me. I was trying to mock class B's method so as to return a predetermined number, and then compare the expected and actual output.
However, despite trying the above and many other approaches, I could see that B's method was actually getting called instead of being mocked.

After a lot of googling, I found that the issue was not with JUnit or mocking but with the way class A was written. Since it instantiated its own object of class B, the mock object got lost and the actual instance got called. After refactoring class A, the test worked fine. Refactored code below:

public class A {

    private ClassB obj = null;

    public A(ClassB obj) {
        this.obj = obj;
    }

    public String parseEvent(String line) {
        // some business logic
        String randomNum = obj.getRandomNum().toString();
        return line + randomNum;
    }
}

After refactoring class A's code, in my JUnit test I passed the mocked object of class B when creating the object of class A. The mocking worked fine and the test returned the expected result.
Happy Ending :) 
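The fix generalizes beyond Mockito: once the collaborator is injected through the constructor, even a hand-written stub works. Here is a self-contained sketch of the refactored classes (names mirror the post; the wrapper class and stub are illustrative, no framework needed):

```java
public class InjectionSketch {
    // Collaborator whose behavior we want to control in tests.
    static class ClassB {
        Integer getRandomNum() {
            return (int) (Math.random() * 100);
        }
    }

    // The refactored class under test: ClassB is injected, not instantiated inside.
    static class A {
        private final ClassB obj;

        A(ClassB obj) {
            this.obj = obj;
        }

        String parseEvent(String line) {
            return line + obj.getRandomNum().toString();
        }
    }

    public static void main(String[] args) {
        // Hand-rolled stub standing in for mock(ClassB.class) + when(...).thenReturn(2)
        ClassB stub = new ClassB() {
            @Override
            Integer getRandomNum() {
                return 2;
            }
        };
        A a = new A(stub);
        System.out.println(a.parseEvent("event-")); // deterministic: "event-2"
    }
}
```

With the original design the `new ClassB()` inside parseEvent could never be intercepted; with injection, the test fully controls the collaborator.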

Thursday, December 29, 2016

Hive - Querying Hive table data via Mapreduce job using HCatalog

Problem: While querying a Hive DB via Pig is a pretty common use case, what we had to attempt was a comparatively less common scenario: querying a Hive database via MapReduce. For this we resorted to using HCatalog.

The solution below was tried using an embedded Derby DB for Hive, with the MapReduce job running in standalone mode from the console.

In the job, apart from setting the mapper and reducer, we tell it to use HCatInputFormat and pass the database and table name as input.
Using HCatOutputFormat, we specify the output database and table details.

package callhive;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;

public class OnTimeDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        System.out.println("Job started.............");
        Configuration conf = new Configuration();
        Job job = new Job(conf, "OnTimeCount");
        job.setJarByClass(OnTimeDriver.class);
        job.setMapperClass(OnTimeMapper.class);
        job.setReducerClass(OnTimeReducer.class);

        // Read input from the Hive table airline.ontimeperf via HCatalog
        HCatInputFormat.setInput(job, "airline", "ontimeperf");
        job.setInputFormatClass(HCatInputFormat.class);
        job.setMapOutputKeyClass(IntPair.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DefaultHCatRecord.class);
        job.setOutputFormatClass(HCatOutputFormat.class);

        // Write output to the Hive table airline.flight_count via HCatalog
        HCatOutputFormat.setOutput(job, OutputJobInfo.create("airline", "flight_count", null));
        HCatSchema s = HCatOutputFormat.getTableSchema(job.getConfiguration());
        HCatOutputFormat.setSchema(job, s);

        return (job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new OnTimeDriver(), args);
        System.exit(exitCode);
    }
}

The mapper code queries a table called ontimeperf, which contains the columns month (int), year (int) and dayofmonth (int). It creates a custom key storing (year, month), with dayofmonth as the value.

public class OnTimeMapper extends Mapper<WritableComparable, HCatRecord, IntPair, IntWritable> {

    @Override
    protected void map(WritableComparable key, HCatRecord value,
            Mapper<WritableComparable, HCatRecord, IntPair, IntWritable>.Context context)
            throws IOException, InterruptedException {
        try {
            // Look up column values by name using the table schema
            HCatSchema schema = HCatBaseInputFormat.getTableSchema(context.getConfiguration());
            Integer year = value.getInteger("year", schema);
            Integer month = value.getInteger("month", schema);
            Integer dayOfMonth = value.getInteger("dayofmonth", schema);
            System.out.println("Data read from input table " + year + "||" + month + "||" + dayOfMonth);
            context.write(new IntPair(year, month), new IntWritable(dayOfMonth));
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}
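The IntPair key class used above is not shown in the post. Here is a minimal sketch of its pairing and comparison logic; the real class would additionally implement Hadoop's WritableComparable (write/readFields plus a no-arg constructor) so the framework can serialize and sort it.

```java
// Minimal stand-in for the custom composite (year, month) key; illustrative only.
public class IntPair implements Comparable<IntPair> {
    private final int first;   // year
    private final int second;  // month

    public IntPair(int first, int second) {
        this.first = first;
        this.second = second;
    }

    public int getFirstInt() {
        return first;
    }

    public int getSecondInt() {
        return second;
    }

    @Override
    public int compareTo(IntPair o) {
        // Order by year, then by month, so reducer keys group correctly.
        int cmp = Integer.compare(first, o.first);
        return cmp != 0 ? cmp : Integer.compare(second, o.second);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof IntPair && first == ((IntPair) o).first
                && second == ((IntPair) o).second;
    }

    @Override
    public int hashCode() {
        return 31 * first + second;
    }
}
```

The getFirstInt/getSecondInt accessors match the calls the reducer below makes on its key.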

In the reducer, I iterate over the values for each (year, month) key; their count is the number of flight occurrences. This data is saved to another Hive table, flight_count, which has the columns year (int), month (int) and flightcount (int).

public class OnTimeReducer extends Reducer<IntPair, IntWritable, NullWritable, HCatRecord> {

    public void reduce(IntPair key, Iterable<IntWritable> value, Context context)
            throws IOException, InterruptedException {
        try {
            // Count records for this particular year-month
            int count = 0;
            for (IntWritable s : value) {
                count++;
            }

            // Define the output record schema
            List<HCatFieldSchema> columns = new ArrayList<HCatFieldSchema>(3);
            columns.add(new HCatFieldSchema("year", HCatFieldSchema.Type.INT, ""));
            columns.add(new HCatFieldSchema("month", HCatFieldSchema.Type.INT, ""));
            columns.add(new HCatFieldSchema("flightCount", HCatFieldSchema.Type.INT, ""));
            HCatSchema schema = new HCatSchema(columns);

            HCatRecord record = new DefaultHCatRecord(3);
            record.setInteger("year", schema, key.getFirstInt());
            record.set("month", schema, key.getSecondInt());
            record.set("flightCount", schema, count);
            context.write(NullWritable.get(), record);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

Troubleshooting tips: I faced the errors below when running the code.

  • Caused by: javax.jdo.JDOFatalUserException: There is no available StoreManager of type "rdbms". Make sure that you have put the relevant DataNucleus store plugin in your CLASSPATH and if defining a connection via JNDI or DataSource you also need to provide persistence property "datanucleus.storeManagerType.
Solved by setting the classpath below:
export HADOOP_CLASSPATH=myspace-1.0.1.jar:$HCAT_HOME/share/hcatalog/hive-hcatalog-core-2.1.0.jar:$HIVE_HOME/lib/hive-exec-2.1.0.jar:$HIVE_HOME/lib/datanucleus-core-4.1.6.jar:$HIVE_HOME/lib/datanucleus-api-jdo-4.2.1.jar:$HIVE_HOME/lib/javax.jdo-3.2.0-m3.jar:$HIVE_HOME/lib/datanucleus-rdbms-4.1.7.jar:$HIVE_HOME/lib/derby-10.10.2.0.jar:$HIVE_HOME/lib/antlr-runtime-3.4.jar

  • Required table missing : "VERSION" in Catalog "" Schema "". DataNucleus requires this table to perform its persistence operations. Either your MetaData is incorrect, or you need to enable "datanucleus.schema.autoCreateTables"
            Solved by adding the Hive conf folder to the classpath and modifying hive-site.xml to include the entry below:
<property>
   <name>javax.jdo.PersistenceManagerFactoryClass</name>
   <value>org.datanucleus.api.jdo.JDOPersistenceManagerFactory</value>
   <description>class implementing the jdo persistence</description>
</property>