Tuesday, January 8, 2019

Interactively run SQL queries on Apache Spark


Interactively running queries using Spark

Problem Statment: We needed to add a new feature to our (existing) application where in the user would be able to interactively query large amounts of data (around 400 GB per data stored over a 30 day period making it around 12 TB) via a web application. This data is stored on HDFS in the form of Hive tables using Parquet data format.
The user wanted an interactive, sub second to seconds and a seamless querying experience.

We compared two technologies for this use case – Impala and Spark.

For the uninitiated, Impala is a MPP (Massively Parallel Processing) query execution engine written in C++ and provides high-performance, low-latency SQL queries on data stored in various hadoop formats such as HDFS, Hbase etc.
Below diagram explains Impala architecture:



Image Source: https://data-flair.training/

One of the the reasons that make Impala so fast is that its daemons are always in a Ready state thus reducing the start up time. Secondly, Impala does its processing in memory. Even intermediate results are not materialised in disks. In addition, Impala uses better I/O scheduling and also makes use of data locality for its processing. Further, Impala distributes the query among other Impala daemons. The other nodes transmit partial results back to the coordinator, which constructs the final result set for a query.

However, Impala is memory intensive as well. For Impala versions < 2.0 (which we are using), if the memory required to process intermediate results on a node exceeds the amount available to Impala on that node, the query is cancelled.
Also since each of the nodes acts as a coordinator node, the metastore data is cached in each of the nodes. In cases where metadata size can be huge (like in ours), it would result in wastage of precious RAM space.
Impala is well supported on Cloudera distribution but has no support on Hortonworks. Also the latest version of EMR currently does not support Impala either.

Due to above constraints, we wanted to explore alternatives as well. Other MPP solutions available in the market include Presto, Amazon Redshift and Google Big Query. However due to time constraints we decided to jump and directly explore Spark SQL as an alternative.

Spark is reliable, extremely fast and comes with a host of optimisations that can be customized ranging from executors to memory allocation to number of cores and caching.

We found Spark can also be used to query in real time unlike the conventional approach of spark submit. In this approach, spark session is created beforehand and shared among jobs so that the start time is reduced and queries are responded to in an interactive manner.

In this post, I will cover two of the options that we explored to use Spark to run interactive queries:

  • Spark with Apache Livy

Apache Livy is a service that enables interaction with Spark cluster over a REST API. It enables submission of Spark jobs or snippets of Spark code, synchronous or asynchronous result retrieval, as well as Spark Context management.

To submit jobs to Spark interactively, we created two jobs – one to create a Livy session, and another to query spark using livy session id created earlier.

To create a Livy session refer below code snippet:

client = new LivyClientBuilder().setURI(new URI(LIVY_SERVER_URL)).setAll(props).build();
Field f = client.getClass().getDeclaredField("LivySessionId"); // NoSuchFieldException
f.setAccessible(true);
Object sessionId = f.get(client);
URI uri = new URI(<hdfs path where application code has been uploaded>);
client.addJar(uri).get();

Above code creates a livy session id and adds your application jar to its path.

Next step is to create a class (or job) that implements Livy Job interface (org.apache.livy.Job) and returns a serialiable POJO.
In the call method, fetch the spark session and use it to fire your spark sql query.

Code snippet below:

public class MyApp implements Job<QueryResultSet> {

final scala.Option<String> props = scala.Option.apply("<path to properties file>");

@Override
public QueryResultSet call(JobContext jc) throws Exception {

SparkSession spark = jc.sparkSession();
Dataset<Row> ds = this.spark.sql("<SQL query>");
QueryResultSet result = new QueryResultSet();
// Fetching result and mapping it to POJO
result.setSchema(ds.schema());
result.setRows((Row[]) ds.collect());
return result;
}
}

Submit your job by getting an instance of Livy client from session id you created earlier:

LivyClient client = new LivyClientBuilder().setURI(new URI(LIVY_SERVER_URL + "/sessions/" + <session id>)).build();
JobHandle<QueryResultSet> jhDS = client.submit(new MyJob());
QueryResultSet result = jhDS.get();
client.stop(false);

  • Attach Custom Servlet to Spark

Another way to interactively deal with Spark is to leverage Spark’s “official” container support for Jetty.

The first step to do this is to write a custom servlet (extending HttpServlet) and override its doGet or doPost methods to accept your request (say JSON), run it on Spark and return back the response.

public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
PrintWriter out = null;
JSONObject response = new JSONObject();
Long startTime = null;
Dataset<Row> ds = null;
try {
out = resp.getWriter();
BufferedReader reader = req.getReader();
StringBuilder payload = new StringBuilder();
while (reader.ready()) {
payload.append((char) reader.read());
}
JSONObject json = (JSONObject) JSONValue.parse(payload.toString());

// Extracting sql query from JSON
String query = json.get(QUERY).toString();
// Executing SQL query or Scala script
ds = this.spark.sql(query);
List<Object> data = new ArrayList<Object>();
List<Row> outList = ds.collectAsList();
// Iterating result and adding to a Java List
for (Row row : outList) {
Object obj = JavaConversions.seqAsJavaList(row.toSeq());
data.add(obj);
}
response.put(DATA, data);
response.put(COUNT, data.size());
resp.setStatus(HttpStatus.OK_200);
} catch (Exception e) {
resp.setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500);
response.put(ERROR, e.toString());
e.printStackTrace();
} finally {
if (out != null) {
resp.setContentType(APPLICATION_JSON);
out.write(response.toString());
out.flush();
IOUtils.closeQuietly(out);
}
}
}

Next, create a Spark session and using org.spark_project.jetty.servlet.ServletContextHandler attach your custom servlet to spark UI that gets spawned.

SparkSession spark = SparkSession.builder().enableHiveSupport()
.config("hive.metastore.uris", hive_metastore_uri)
.config("spark.sql.warehouse.dir", spark_warehouse_dir).getOrCreate();

// attaching handler
ServletContextHandler handler = new ServletContextHandler();
HttpServlet servlet = new SparkQueryServlet(spark);
ServletHolder sh = new ServletHolder(servlet);
handler.setContextPath("<context path>");
handler.addServlet(sh, "<servlet path>");
spark.sparkContext().ui().get().attachHandler(handler);

try {
// The session created will stay alive unless explicitly closed
spark.streams().awaitAnyTermination();
} catch (StreamingQueryException e) {
e.printStackTrace();
}

When the above code is triggered it will create an indefinite spark session and attach your servlet to it. Next you can call the servlet (say its path is http://<host>:<port>/handler/custom), pass a JSON containing your sql query. The servlet will extract the query, run it on spark cluster and give you back the response in JSON format in an interactive manner.

Hope you found this article of use.

References:



1 comment:

  1. The main motive of the AWS big data consultant is to spread the knowledge so that they can give more big data engineers to the world.

    ReplyDelete