Problem: While querying hive db via Pig is a pretty common use case, what we had to attempt was a comparatively less common scenario - to query hive database via Mapreduce. For this we resorted to using HCatalog.
Below solution has been tried using embedded derby DB for hive, and running mapreduce job in standalone mode from console. In addition, below blog link was a lot of help:
In job apart from setting mapper and reducer, we tell it to use HCatInputFormat and pass database, table name as input.
Using HCatOutputFormat we specify output database and table details.
package callhive;
import org.apache.hadoop.conf. Configuration;
import org.apache.hadoop.conf. Configured;
import org.apache.hadoop.io. IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce. Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util. ToolRunner;
import org.apache.hive.hcatalog.data. DefaultHCatRecord;
import org.apache.hive.hcatalog.data. schema.HCatSchema;
import org.apache.hive.hcatalog. mapreduce.HCatInputFormat;
import org.apache.hive.hcatalog. mapreduce.HCatOutputFormat;
import org.apache.hive.hcatalog. mapreduce.OutputJobInfo;
public class OnTimeDriver extends Configured implements Tool{
public int run( String[] args ) throws Exception{
System.out.println("Job started.............");
Configuration conf = new Configuration();
Job job = new Job(conf, "OnTimeCount");
job.setJarByClass( OnTimeDriver.class);
job.setMapperClass( OnTimeMapper.class);
job.setReducerClass( OnTimeReducer.class);
HCatInputFormat.setInput(job, "airline", "ontimeperf");
//FileOutputFormat. setOutputPath(job, new Path(args[0]));
job.setInputFormatClass( HCatInputFormat.class);
job.setMapOutputKeyClass( IntPair.class);
job.setMapOutputValueClass( IntWritable.class);
job.setOutputKeyClass(Text. class);
job.setOutputValueClass( DefaultHCatRecord.class);
job.setOutputFormatClass( HCatOutputFormat.class);
System.out.println("Reached here ..Setting HCATOUTPUT");
HCatOutputFormat.setOutput( job, OutputJobInfo.create("airline" , "flight_count", null));
HCatSchema s = HCatOutputFormat. getTableSchema(job. getConfiguration());
HCatOutputFormat.setSchema( job, s);
return (job.waitForCompletion(true)? 0:1);
}
public static void main(String[] args) throws Exception{
int exitCode = ToolRunner.run(new OnTimeDriver(), args);
System.exit(exitCode);
}
}
Mapper code queries a table called ontimeperf which contains columns month(int), year(int) and dayofmonth(int). It creates a custom key to store month and year with dayofmonth as value.
public class OnTimeMapper extends Mapper<WritableComparable, HCatRecord,IntPair, IntWritable> {
@Override
protected void map(WritableComparable key, HCatRecord value,
Mapper<WritableComparable, HCatRecord, IntPair, IntWritable>.Context context)
throws IOException, InterruptedException {
System.out.println(" Mapper invoked....");
System.out.println(" Mapper ...."+value.toString());
try {
HCatSchema schema = HCatBaseInputFormat. getTableSchema(context. getConfiguration());
Integer year = new Integer(value.getInteger(" year", schema));
Integer month = new Integer(value.getInteger(" month", schema));
Integer DayofMonth = value.getInteger("dayofmonth", schema);
System.out.println("Data read from input table "+year+"||"+month+"||"+ DayofMonth);
context.write(new IntPair(year, month), new IntWritable(DayofMonth));
System.out.println("Mapper done with");
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
In Reducer, I iterated over set of year, month pairs and their count was no of flight occurrences. Saved this data to another hive table (flight_count having columns year-int, month-int and flightcount-int
public class OnTimeReducer extends Reducer<IntPair, IntWritable, NullWritable, HCatRecord> {
public void reduce(IntPair key, Iterable<IntWritable> value, Context context)
throws IOException, InterruptedException {
System.out.println(" Reducer called ....");
try {
int count = 0; // records counter for particular year-month
for (IntWritable s : value) {
count++;
}
// define output record schema
List columns = new ArrayList(3);
columns.add(new HCatFieldSchema("year", HCatFieldSchema.Type.INT, ""));
columns.add(new HCatFieldSchema("month", HCatFieldSchema.Type.INT, ""));
columns.add(new HCatFieldSchema("flightCount", HCatFieldSchema.Type.INT, ""));
HCatSchema schema = new HCatSchema(columns);
HCatRecord record = new DefaultHCatRecord(3);
record.setInteger("year", schema, key.getFirstInt());
record.set("month", schema, key.getSecondInt());
record.set("flightCount", schema, count);
System.out.println("record is " + record.toString());
context.write(NullWritable. get(), record);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
Troubleshooting tips: Faced below keys errors when running the code
- Caused by: javax.jdo.
JDOFatalUserException: There is no available StoreManager of type "rdbms". Make sure that you have put the relevant DataNucleus store plugin in your CLASSPATH and if defining a connection via JNDI or DataSource you also need to provide persistence property "datanucleus.storeManagerType.
export HADOOP_CLASSPATH=myspace-1.0.
- Required table missing : "VERSION" in Catalog "" Schema "". DataNucleus requires this table to perform its persistence operations. Either your MetaData is incorrect, or you need to enable "datanucleus.schema.
autoCreateTables"
Solved by adding hive conf folder to classpath and modifying hive-site.xml to have below entry:
<property>
<name>javax.jdo. PersistenceManagerFactoryClass </name>
<value>org.datanucleus.api. jdo. JDOPersistenceManagerFactory</ value>
<description>class implementing the jdo persistence</description>
</property>
No comments:
Post a Comment