I have grown very fond of IDEA. The Eclipse I used on my Mac kept throwing errors for no apparent reason, so I switched to IDEA. A new tool comes with a learning curve, though, and my current project ran into quite a few problems. They are listed below:
Background
In Hadoop development I often load cleaned data into HBase during the MR stage. The workflow is: compile, build the jar, upload it to the server, and run hadoop jar *.jar. That is four manual steps after every cleaning run; the cycle is sketched below. I am naturally inclined to look for shortcuts, so I spent the last few days working out how to simplify this process.
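Concretely, the manual cycle looks roughly like the following. The jar name, host, and paths here are placeholders for illustration, not values from the original project:

# 1-2. compile and export the job jar from the IDE (or a build tool)
# 3. upload it to a Hadoop client node (host and path are placeholders)
scp myjob.jar user@hadoop-client:/opt/jobs/
# 4. run it on the cluster
hadoop jar /opt/jobs/myjob.jar demo.HBaseImport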
Approach
1. In earlier projects I automated packaging and uploading with Ant, but that was with Eclipse on Windows. With IDEA on the Mac it failed repeatedly, always with the following error:
Exception in thread "main" java.lang.SecurityException: Invalid signature file digest for Manifest main attributes
    at sun.security.util.SignatureFileVerifier.processImpl(SignatureFileVerifier.java:287)
    at sun.security.util.SignatureFileVerifier.process(SignatureFileVerifier.java:240)
    at java.util.jar.JarVerifier.processEntry(JarVerifier.java:317)
    at java.util.jar.JarVerifier.update(JarVerifier.java:228)
    at java.util.jar.JarFile.initializeVerifier(JarFile.java:348)
    at java.util.jar.JarFile.getInputStream(JarFile.java:415)
    at org.apache.hadoop.util.RunJar.unJar(RunJar.java:101)
    at org.apache.hadoop.util.RunJar.unJar(RunJar.java:81)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:209)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
I searched for a lot of material online, but every suggested solution failed. The cause is that one of the jars referenced by the project has a broken signature. (Exporting the jar from Eclipse never triggered this.)
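For reference, the workaround most commonly suggested online is to strip the signature files out of the assembled jar before running it. The jar name below is a placeholder, and as noted above this did not resolve the problem in my setup:

# delete the leftover signature entries from the jar (jar name is a placeholder)
zip -d myjob.jar 'META-INF/*.SF' 'META-INF/*.DSA' 'META-INF/*.RSA'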
2. Exporting the jar from IDEA works as follows:
File -> Project Structure
In the dialog that appears (screenshot omitted), choose the Manifest File and the Main Class.
Then Build -> Build Artifacts compiles and packages the project; the jar can be found under the project's out directory.
However, a jar built this way fails at runtime, because the HBase lib jars are not included. One crude fix is to copy the HBase lib jars into the lib directory of every Hadoop node; that approach has obvious drawbacks, so I did not take it. How to solve the problem, then? I packaged with Maven instead, uploaded the jar, ran it, and it worked.
Final solution:
Since Maven packages the project correctly, how do we automate the upload and execution? I use Maven to build the jar and Ant to upload and run it.
Content of the Maven pom file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>groupId</groupId>
    <artifactId>antdemo</artifactId>
    <version>1</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>0.98.15-hadoop2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>0.98.15-hadoop2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>0.98.15-hadoop2</version>
        </dependency>
        <dependency>
            <groupId>com.jcraft</groupId>
            <artifactId>jsch</artifactId>
            <version>0.1.51</version>
        </dependency>
        <dependency>
            <groupId>org.apache.ant</groupId>
            <artifactId>ant-jsch</artifactId>
            <version>1.9.5</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <finalName>HBaseImport</finalName>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
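With the assembly plugin bound to the package phase, a single Maven command produces the fat jar. The exact output name below assumes HBaseImport is the assembly finalName as in the pom above; adjust it if your configuration differs:

# build the fat jar; it ends up under target/, e.g. target/HBaseImport-jar-with-dependencies.jar
mvn clean package
# after uploading it to a Hadoop client node it can be run with:
hadoop jar HBaseImport-jar-with-dependencies.jar demo.HBaseImport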
Content of the Ant build.xml:
This build file lets Ant compile the project, run the unit tests automatically, package, and deploy. The default operation (just typing the command ant) compiles the sources and deploys them for execution; a clean target reports "clean finished". The most critical part is:
the step that uploads the jar and runs it remotely; it is worth studying carefully. A sketch of such a target follows.
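Given the jsch and ant-jsch dependencies in the pom, the upload-and-run step presumably relies on Ant's optional scp and sshexec tasks. The following is only a minimal sketch under that assumption; the target name, host, user, password, and paths are placeholders, not values from the original build.xml:

<!-- upload the fat jar and run it on the Hadoop client node.
     scp/sshexec are Ant optional tasks; they need jsch.jar on Ant's classpath
     (e.g. in ~/.ant/lib). Host, credentials, and paths below are placeholders. -->
<target name="deploy-and-run">
    <scp file="target/HBaseImport-jar-with-dependencies.jar"
         todir="hadoop@192.168.122.211:/opt/jobs"
         password="******"
         trust="true"/>
    <sshexec host="192.168.122.211"
             username="hadoop"
             password="******"
             trust="true"
             command="hadoop jar /opt/jobs/HBaseImport-jar-with-dependencies.jar demo.HBaseImport"/>
</target>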
Code for the MR job that loads the cleaned data into HBase:
package demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Created by zzy on 15/11/17.
 *
 * Writing to HBase from an MR job.
 */
public class HBaseImport {

    static class BatchImportMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // build the row key from the timestamp (field 0) and the id (field 1),
            // then emit "<rowkey>\t<original line>" for the reducer
            String line = value.toString();
            String[] splited = line.split("\t");
            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
            String time = sdf.format(new Date(Long.parseLong(splited[0].trim())));
            String rowkey = splited[1] + "_" + time;
            Text v2s = new Text();
            v2s.set(rowkey + "\t" + line);
            context.write(key, v2s);
        }
    }

    static class BatchImportReducer extends TableReducer<LongWritable, Text, NullWritable> {
        private byte[] family = "cf".getBytes();

        @Override
        protected void reduce(LongWritable key, Iterable<Text> v2s, Context context)
                throws IOException, InterruptedException {
            for (Text v2 : v2s) {
                String[] splited = v2.toString().split("\t");
                String rowKey = splited[0];
                Put put = new Put(rowKey.getBytes());
                put.add(family, "raw".getBytes(), v2.toString().getBytes());
                put.add(family, "reportTime".getBytes(), splited[1].getBytes());
                put.add(family, "msisdn".getBytes(), splited[2].getBytes());
                put.add(family, "apmac".getBytes(), splited[3].getBytes());
                put.add(family, "acmac".getBytes(), splited[4].getBytes());
                // without this write the Put never reaches HBase
                context.write(NullWritable.get(), put);
            }
        }
    }

    private static final String tableName = "logs";

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.122.213:2181");
        conf.set("hbase.rootdir", "hdfs://192.168.122.211:9000/hbase");
        conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);

        Job job = Job.getInstance(conf, HBaseImport.class.getSimpleName());
        // ship the HBase dependency jars with the job
        TableMapReduceUtil.addDependencyJars(job);
        job.setJarByClass(HBaseImport.class);
        job.setMapperClass(BatchImportMapper.class);
        job.setReducerClass(BatchImportReducer.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        FileInputFormat.setInputPaths(job, "hdfs://192.168.122.211:9000/user/hbase");
        job.waitForCompletion(true);
    }
}
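For the job to succeed, the target table must already exist with the column family the reducer writes to. Going only by what the code above shows (table logs, family cf), it can be created and checked from the HBase shell like this:

# in the hbase shell, before running the job
create 'logs', 'cf'
# after the job finishes, spot-check a few rows
scan 'logs', {LIMIT => 5}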
If this article has been helpful to you, I hope you will lend it your support.