Interactively running queries using Spark
Problem Statement: We needed to add a new feature to our existing application that would let users interactively query large amounts of data (around 400 GB per day, stored over a 30-day period, so around 12 TB) via a web application. This data is stored on HDFS as Hive tables in the Parquet format. The users wanted an interactive, seamless querying experience, with sub-second to a-few-seconds latency.
We compared two technologies for this use case – Impala and Spark.
For the uninitiated, Impala is an MPP (Massively Parallel Processing) query execution engine written in C++ that provides high-performance, low-latency SQL queries on data stored in Hadoop storage systems such as HDFS and HBase.
The diagram below explains the Impala architecture:
Image Source: https://data-flair.training/
One of the reasons Impala is so fast is that its daemons are always in a ready state, which cuts start-up time. Secondly, Impala does its processing in memory; even intermediate results are not materialised to disk. In addition, Impala uses better I/O scheduling and makes use of data locality for its processing. Further, Impala distributes a query among its daemons; the other nodes transmit partial results back to the coordinator, which constructs the final result set for the query.
However, Impala is memory intensive as well. For Impala versions < 2.0 (which we are using), if the memory required to process intermediate results on a node exceeds the amount available to Impala on that node, the query is cancelled.
Also, since each node acts as a coordinator, the metastore data is cached on every node. In cases where the metadata is huge (like ours), this wastes precious RAM.
Impala is well supported on the Cloudera distribution but has no support on Hortonworks. The latest version of EMR currently does not support Impala either.
Due to the above constraints, we wanted to explore alternatives as well. Other MPP solutions available in the market include Presto, Amazon Redshift and Google BigQuery, but due to time constraints we decided to directly explore Spark SQL as an alternative.
Spark is reliable, extremely fast and comes with a host of customisable optimisations, ranging from executors to memory allocation, number of cores and caching.
We found Spark can also be used to answer queries in real time, unlike the conventional spark-submit approach. In this approach, a Spark session is created beforehand and shared among jobs, so the start-up time is eliminated and queries are answered interactively.
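This shared-session idea can be sketched independently of Spark: create the expensive object once and hand the same instance to every incoming query. Below is a minimal, thread-safe lazy holder for the pattern; it is generic here so it compiles without Spark on the classpath, but in our case the factory would call SparkSession.builder()...getOrCreate().

```java
import java.util.function.Supplier;

// Lazy, thread-safe shared holder. For Spark, the factory would be
// () -> SparkSession.builder().enableHiveSupport().getOrCreate(),
// built once at startup so interactive queries skip session start-up cost.
final class SharedSession<T> {
    private final Supplier<T> factory;
    private volatile T instance;

    SharedSession(Supplier<T> factory) {
        this.factory = factory;
    }

    T get() {
        T local = instance;
        if (local == null) {             // first check, without locking
            synchronized (this) {
                local = instance;
                if (local == null) {     // second check, under the lock
                    instance = local = factory.get();
                }
            }
        }
        return local;
    }
}
```

Every job then calls get() on the same holder and receives the already-warm session, which is exactly why query latency drops from minutes (job start-up) to seconds.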
In this post, I will cover two of the options that we explored to run interactive queries on Spark:
- Spark with Apache Livy
Apache Livy is a service that enables interaction with a Spark cluster over a REST API. It enables submission of Spark jobs or snippets of Spark code, synchronous or asynchronous result retrieval, as well as Spark context management.
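Under the hood, the LivyClient used in this post talks to that REST API; sessions are created with a POST to Livy's /sessions endpoint carrying a JSON body. As a rough illustration, here is what that raw request looks like. The class and helper names are our own, and the jar path is a placeholder:

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class LivyRestExample {

    // Builds the JSON body for Livy's POST /sessions endpoint.
    // "kind" selects the interpreter (e.g. spark, pyspark, sparkr);
    // "jars" lists application jars to put on the session classpath.
    static String buildSessionRequest(String kind, String jarPath) {
        return "{\"kind\": \"" + kind + "\", \"jars\": [\"" + jarPath + "\"]}";
    }

    // Posts the body to a Livy server; requires a running Livy instance,
    // so it is not exercised here.
    static int createSession(String livyUrl, String body) throws Exception {
        HttpURLConnection conn =
                (HttpURLConnection) new URL(livyUrl + "/sessions").openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream os = conn.getOutputStream()) {
            os.write(body.getBytes(StandardCharsets.UTF_8));
        }
        return conn.getResponseCode(); // 201 Created on success
    }
}
```

In practice the LivyClientBuilder shown next hides all of this plumbing; the raw form is only useful to understand what the client is doing.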
To submit jobs to Spark interactively, we created two jobs – one to create a Livy session, and another to query Spark using the Livy session id created earlier.
To create a Livy session, refer to the code snippet below:

LivyClient client = new LivyClientBuilder()
        .setURI(new URI(LIVY_SERVER_URL))
        .setAll(props)
        .build();

// Retrieve the session id via reflection (may throw NoSuchFieldException)
Field f = client.getClass().getDeclaredField("LivySessionId");
f.setAccessible(true);
Object sessionId = f.get(client);

// Add the application jar, uploaded to HDFS, to the session
URI uri = new URI(<hdfs path where application code has been uploaded>);
client.addJar(uri).get();
The above code creates a Livy session and adds your application jar to its classpath.
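The props passed to setAll above are ordinary Spark configuration entries that the session will be created with. A sketch of the kind of Properties we mean; the keys are real Spark conf keys, but the values below are placeholders to tune for your cluster:

```java
import java.util.Properties;

public class LivySessionProps {
    // Resource settings for the shared session. Values are illustrative;
    // size them for your data volume and cluster capacity.
    static Properties defaults() {
        Properties props = new Properties();
        props.setProperty("spark.executor.memory", "4g");
        props.setProperty("spark.executor.cores", "4");
        props.setProperty("spark.executor.instances", "10");
        props.setProperty("spark.driver.memory", "2g");
        return props;
    }
}
```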
The next step is to create a class (or job) that implements the Livy Job interface (org.apache.livy.Job) and returns a serializable POJO. In the call method, fetch the Spark session and use it to fire your Spark SQL query. Code snippet below:
public class MyApp implements Job<QueryResultSet> {

    final scala.Option<String> props = scala.Option.apply("<path to properties file>");

    @Override
    public QueryResultSet call(JobContext jc) throws Exception {
        SparkSession spark = jc.sparkSession();
        Dataset<Row> ds = spark.sql("<SQL query>");
        // Fetch the result and map it to the POJO
        QueryResultSet result = new QueryResultSet();
        result.setSchema(ds.schema());
        result.setRows((Row[]) ds.collect());
        return result;
    }
}
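QueryResultSet itself is not shown in this post; it only needs to be a serializable container for the schema and rows so Livy can ship it back to the caller. A minimal sketch follows. Note it is an assumption about shape: here the schema travels as its JSON string (e.g. ds.schema().json()) and the rows as plain Java lists, so the class compiles without Spark on the classpath, whereas the version used above holds StructType and Row[] directly.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the result POJO returned by the Livy job.
public class QueryResultSet implements Serializable {
    private static final long serialVersionUID = 1L;

    private String schemaJson;                        // e.g. ds.schema().json()
    private List<List<Object>> rows = new ArrayList<>();

    public String getSchemaJson() { return schemaJson; }
    public void setSchemaJson(String schemaJson) { this.schemaJson = schemaJson; }

    public List<List<Object>> getRows() { return rows; }
    public void setRows(List<List<Object>> rows) { this.rows = rows; }
}
```

Whatever shape you choose, every field must be serializable, since the object crosses the wire from the Livy session back to your application.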
Submit your job by getting an instance of the Livy client from the session id you created earlier:

LivyClient client = new LivyClientBuilder()
        .setURI(new URI(LIVY_SERVER_URL + "/sessions/" + <session id>))
        .build();
JobHandle<QueryResultSet> jhDS = client.submit(new MyApp());
QueryResultSet result = jhDS.get();
client.stop(false);
- Attach Custom Servlet to Spark
Another way to interact with Spark is to leverage Spark's embedded Jetty container – the server behind the Spark UI.
The first step is to write a custom servlet (extending HttpServlet) and override its doGet or doPost method to accept a request (say JSON), run it on Spark and return the response.
public void doPost(HttpServletRequest req, HttpServletResponse resp)
        throws ServletException, IOException {
    PrintWriter out = null;
    JSONObject response = new JSONObject();
    Dataset<Row> ds = null;
    try {
        out = resp.getWriter();
        // Read the request body
        BufferedReader reader = req.getReader();
        StringBuilder payload = new StringBuilder();
        while (reader.ready()) {
            payload.append((char) reader.read());
        }
        JSONObject json = (JSONObject) JSONValue.parse(payload.toString());
        // Extract the SQL query from the JSON
        String query = json.get(QUERY).toString();
        // Execute the SQL query
        ds = this.spark.sql(query);
        List<Object> data = new ArrayList<Object>();
        List<Row> outList = ds.collectAsList();
        // Iterate over the result and add each row to a Java list
        for (Row row : outList) {
            Object obj = JavaConversions.seqAsJavaList(row.toSeq());
            data.add(obj);
        }
        response.put(DATA, data);
        response.put(COUNT, data.size());
        resp.setStatus(HttpStatus.OK_200);
    } catch (Exception e) {
        resp.setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500);
        response.put(ERROR, e.toString());
        e.printStackTrace();
    } finally {
        if (out != null) {
            resp.setContentType(APPLICATION_JSON);
            out.write(response.toString());
            out.flush();
            IOUtils.closeQuietly(out);
        }
    }
}
Next, create a Spark session and, using org.spark_project.jetty.servlet.ServletContextHandler, attach your custom servlet to the Spark UI that gets spawned.
SparkSession spark = SparkSession.builder()
        .enableHiveSupport()
        .config("hive.metastore.uris", hive_metastore_uri)
        .config("spark.sql.warehouse.dir", spark_warehouse_dir)
        .getOrCreate();

// Attaching the handler to the Spark UI's Jetty server
ServletContextHandler handler = new ServletContextHandler();
HttpServlet servlet = new SparkQueryServlet(spark);
ServletHolder sh = new ServletHolder(servlet);
handler.setContextPath("<context path>");
handler.addServlet(sh, "<servlet path>");
spark.sparkContext().ui().get().attachHandler(handler);

try {
    // The session created will stay alive unless explicitly closed
    spark.streams().awaitAnyTermination();
} catch (StreamingQueryException e) {
    e.printStackTrace();
}
When the above code is triggered, it creates an indefinitely running Spark session with your servlet attached. You can then call the servlet (say its path is http://<host>:<port>/handler/custom) and pass a JSON payload containing your SQL query. The servlet extracts the query, runs it on the Spark cluster and returns the response as JSON, in an interactive manner.
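For instance, a caller might build and send the request like this. The class and helper names are illustrative, and only the query field is what the servlet above actually reads:

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class SparkServletClient {

    // The servlet reads the "query" field from the posted JSON.
    static String buildPayload(String sql) {
        return "{\"query\": \"" + sql.replace("\"", "\\\"") + "\"}";
    }

    // Needs the Spark driver with the attached servlet running,
    // so it is not exercised here.
    static String postQuery(String servletUrl, String sql) throws Exception {
        HttpURLConnection conn =
                (HttpURLConnection) new URL(servletUrl).openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream os = conn.getOutputStream()) {
            os.write(buildPayload(sql).getBytes(StandardCharsets.UTF_8));
        }
        try (Scanner sc = new Scanner(conn.getInputStream(),
                StandardCharsets.UTF_8)) {
            return sc.useDelimiter("\\A").next();  // JSON response body
        }
    }
}
```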
Hope you found this article useful.