Hadoop 2.5.1 Study Notes 7: Using Counters
Published: 2019-06-26



Tags could not be crawled for some of the fields, so we need to evaluate how much the missing tags affect the overall data analysis.

The code is straightforward, much simpler than the previous example; it only needed small modifications to that code.
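At a high level there are two steps. PullMongoDB first exports every (pkg, tags) pair from MongoDB into a plain text file on HDFS. TagCounter then runs a map-only job over the message log, looks up each record's package in that file (shipped to the mappers via the distributed cache), and increments one of three counters: Null (no entry for the package), Zero (an entry with an empty tag list), or Hit (usable tags). The counter totals give the coverage estimate directly, so the job emits no regular output.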

----------------------------------------------------------------------------

package com.dew.counter;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.MongoClient;
import com.mongodb.ServerAddress;

// Step 1: dump the pkg -> tags mapping from MongoDB into a text file on HDFS.
// args[0] = host:port:db:collection, args[1] = HDFS path for the mapping file.
public class PullMongoDB extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (null == args || args.length < 4) {
            return 0;
        }

        List<ServerAddress> list = new ArrayList<ServerAddress>();
        String[] array = args[0].split(":");
        list.add(new ServerAddress(array[0], Integer.parseInt(array[1])));
        MongoClient mongoClient = new MongoClient(list);
        DB database = mongoClient.getDB(array[2]);
        DBCollection collection = database.getCollection(array[3]);

        // select only documents that carry both fields, and project just those fields
        BasicDBObject query = new BasicDBObject();
        query.put("pkg", new BasicDBObject("$exists", true));
        query.put("tags", new BasicDBObject("$exists", true));
        BasicDBObject fields = new BasicDBObject();
        fields.put("pkg", 1);
        fields.put("tags", 1);

        // open the HDFS output file
        Configuration conf = new Configuration();
        FileSystem hdfs = FileSystem.get(conf);
        FSDataOutputStream outHandler = hdfs.create(new Path(args[1]));

        // write one "pkg\ttag1 tag2 ..." line per document
        DBCursor cursor = collection.find(query, fields);
        while (cursor.hasNext()) {
            BasicDBObject record = (BasicDBObject) cursor.next();
            String pkg = record.getString("pkg");
            @SuppressWarnings("unchecked")
            ArrayList<String> als = (ArrayList<String>) record.get("tags");
            String tags = "";
            for (String s : als) {
                tags += " " + s.trim();
            }
            tags = tags.trim();
            String finalString = pkg + "\t" + tags
                    + System.getProperty("line.separator");
            outHandler.write(finalString.getBytes("UTF-8"));
        }

        // release the handles
        outHandler.close();
        cursor.close();
        mongoClient.close();
        return 0;
    }
}
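Each line of the mapping file written above is a package name and its space-joined tags, separated by a tab. A hypothetical sample (package names and tags invented for illustration):

com.example.game.puzzle	puzzle casual board
com.example.reader	news reading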
package com.dew.counter;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.Hashtable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Step 2: a map-only job that joins the log against the pkg -> tags mapping
// and tallies the join outcome with counters. args[1] = mapping file on HDFS,
// args[2] = log input path, args[3] = (unused) output path.
public class TagCounter extends Configured implements Tool {

    // counter group (left empty) and counter names
    private static String PREFIX = "";
    private static String NULL = "Null"; // package not in the mapping
    private static String ZERO = "Zero"; // package mapped to an empty tag list
    private static String HIT = "Hit";   // package has usable tags

    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

        private Hashtable<String, String> joinData = new Hashtable<String, String>();

        // load the mapping file (distributed cache) into memory
        private void readFile(String file) {
            BufferedReader joinReader = null;
            String line = null;
            try {
                joinReader = new BufferedReader(new FileReader(file));
                while ((line = joinReader.readLine()) != null) {
                    String[] array = line.split("\t");
                    if (null == array || array.length < 2)
                        continue;
                    String pkg = array[0];
                    if (null == pkg || pkg.length() <= 0)
                        continue;
                    String tagStr = array[1];
                    if (null == tagStr)
                        continue;
                    tagStr = tagStr.trim();
                    if (tagStr.length() <= 0)
                        continue;
                    joinData.put(pkg, tagStr);
                    System.out.println("[map,setup] " + pkg + " | " + tagStr);
                }
            } catch (Exception e) {
                System.out.println("--------------------------------------------\n"
                        + e.toString());
            } finally {
                if (null != joinReader) {
                    try {
                        joinReader.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            URI[] cacheFiles = context.getCacheFiles();
            if (null != cacheFiles && cacheFiles.length > 0) {
                readFile(cacheFiles[0].getPath().toString());
            }
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            if (null == value)
                return;
            String content = value.toString();
            if (null == content || content.trim().length() == 0)
                return;

            // the log line must have at least 29 tab-separated fields
            String[] strArray = content.split("\t");
            if (null == strArray || strArray.length < 29)
                return;
            String sender = strArray[12].trim();
            String receiver = strArray[14].trim();
            String pkg = strArray[28].trim();
            if (sender.length() == 0 || receiver.length() == 0 || pkg.length() == 0)
                return;

            // bump the counter that describes this record's join outcome
            String tags = this.joinData.get(pkg);
            if (null == tags) {
                Counter c = context.getCounter(PREFIX, NULL);
                c.increment(1);
            } else if (tags.trim().length() == 0) {
                Counter c = context.getCounter(PREFIX, ZERO);
                c.increment(1);
            } else {
                Counter c = context.getCounter(PREFIX, HIT);
                c.increment(1);
            }
            // no regular output; the counters are the whole point
            // context.write(new Text(sender), new Text(tags));
            // context.write(new Text(receiver), new Text(tags));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "ComputeProfileHDFSPlusMongoDB");

        // ship the mapping file to every mapper via the distributed cache
        job.addCacheFile(new Path(args[1]).toUri());

        FileInputFormat.setInputPaths(job, new Path(args[2]));
        FileOutputFormat.setOutputPath(job, new Path(args[3]));

        job.setJobName("TagCounter");
        job.setJarByClass(TagCounter.class);
        job.setMapperClass(MapClass.class);
        job.setNumReduceTasks(0); // map-only
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(NullOutputFormat.class);

        int exitCode = job.waitForCompletion(true) ? 0 : 1;

        // clean up the mapping file and the (empty) output directory
        try {
            FileSystem fs = FileSystem.get(conf);
            fs.delete(new Path(args[1]), true);
            fs.delete(new Path(args[3]), true);
        } catch (Exception e) {
            // best-effort cleanup
        }
        return exitCode;
    }

    public static void main(String[] a) throws Exception {
        // run the export first, then the counting job, with the same arguments
        int res = ToolRunner.run(new Configuration(), new PullMongoDB(), a);
        res = ToolRunner.run(new Configuration(), new TagCounter(), a);
        System.exit(res);
    }
}
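Since this post is about counters, note that the driver can also read them programmatically after waitForCompletion() instead of eyeballing the job output. Below is a minimal sketch, not part of the original code: the helper name and the coverage metric are mine, and it assumes it is called on the finished Job inside TagCounter.run(). The group name passed to findCounter is PREFIX, which this post leaves as the empty string.

static void reportCoverage(org.apache.hadoop.mapreduce.Job job) throws Exception {
    org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
    long miss = counters.findCounter("", "Null").getValue(); // pkg absent from the mapping
    long zero = counters.findCounter("", "Zero").getValue(); // pkg mapped to an empty tag list
    long hit  = counters.findCounter("", "Hit").getValue();  // pkg has usable tags
    double total = miss + zero + hit;
    // fraction of records whose package carries at least one tag
    System.out.println("tag coverage: " + (total == 0 ? 0.0 : hit / total));
}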

 

That is all; the logic is simple and there is not much more to say.

The result screenshot is below:

[Screenshot: the job's counter output, showing the Null, Zero, and Hit counts]

Reposted from: https://my.oschina.net/qiangzigege/blog/345638
