博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Hadoop_27_MapReduce_运营商原始日志增强(自定义OutputFormat)
阅读量:6329 次
发布时间:2019-06-22

本文共 7593 字,大约阅读时间需要 25 分钟。

1.需求:

  现有一些原始日志需要做增强解析处理,流程:

  1、 从原始日志文件中读取数据(日志文件:)

  2、 根据日志中的一个URL字段到外部知识库中获取信息增强到原始日志

  3、 如果成功增强,则输出到增强结果目录;如果增强失败,则抽取原始数据中URL字段输出到待爬清单目录

2.需求分析:

  程序的关键点是要在一个mapreduce程序中根据数据的不同输出两类结果到不同目录,这类灵活的输出需求可

以通过自定义outputformat来实现

3.需求实现: 

技术实现要点:

  1、 在mapreduce中访问外部资源(知识数据库)

  2、 自定义outputformat,改写其中的recordwriter,改写具体输出数据的方法write()


代码实现:

1.数据库获取数据的工具类:

  1.首先启动数据库服务(192.168.232.201):service mysql start

  2.使用远程客户端连接数据库工具Navicat操作数据库:导入创建url_rule表语句和导入该表数据

  3.创建表语句及其数据下载:

  

package cn.bigdata.hdfs.LogEnhance;import java.sql.Connection;import java.sql.DriverManager;import java.sql.ResultSet;import java.sql.Statement;import java.util.Map;public class DBLoader {    public static void dbLoader(Map
ruleMap) throws Exception { Connection conn = null; Statement st = null; ResultSet res = null; try { Class.forName("com.mysql.jdbc.Driver"); conn = DriverManager.getConnection("jdbc:mysql://192.168.232.201:3306/mysql", "root", "root"); st = conn.createStatement(); res = st.executeQuery("select url,content from url_rule"); while (res.next()) { ruleMap.put(res.getString(1), res.getString(2)); } } finally { try{ if(res!=null){ res.close(); } if(st!=null){ st.close(); } if(conn!=null){ conn.close(); } }catch(Exception e){ e.printStackTrace(); } } }}
View Code

 2.自定义一个outputformat继承自FileOutputFormat,实现getRecordWriter方法:

package cn.bigdata.hdfs.LogEnhance;import java.io.IOException;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/** * maptask或者reducetask在最终输出时,先调用OutputFormat的getRecordWriter方法拿到一个RecordWriter * 然后再调用RecordWriter的write(k,v)方法将数据写出 */public class LogEnhanceOutputFormat extends FileOutputFormat
{ @Override public RecordWriter
getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { ///拿到一个文件系统操作的客户端实例对象 FileSystem fs = FileSystem.get(context.getConfiguration()); Path enhancePath = new Path("F:/temp/en/log.dat"); Path tocrawlPath = new Path("F:/temp/crw/url.dat"); //流式创建文件 FSDataOutputStream enhancedOs = fs.create(enhancePath); FSDataOutputStream tocrawlOs = fs.create(tocrawlPath); return new EnhanceRecordWriter(enhancedOs, tocrawlOs); } /** * 构造一个自己的recordwriter */ static class EnhanceRecordWriter extends RecordWriter
{ FSDataOutputStream enhancedOs = null; FSDataOutputStream tocrawlOs = null; public EnhanceRecordWriter(FSDataOutputStream enhancedOs, FSDataOutputStream tocrawlOs) { super(); this.enhancedOs = enhancedOs; this.tocrawlOs = tocrawlOs; } //实现抽象方法write @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { String result = key.toString(); // 如果要写出的数据是待爬的url,则写入待爬清单文件 /logenhance/tocrawl/url.dat if (result.contains("tocrawl")) { tocrawlOs.write(result.getBytes()); } else { // 如果要写出的数据是增强日志,则写入增强日志文件 /logenhance/enhancedlog/log.dat enhancedOs.write(result.getBytes()); } } //实现抽象方法close @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { if (tocrawlOs != null) { tocrawlOs.close(); } if (enhancedOs != null) { enhancedOs.close(); } } }}

 3.开发mapreduce处理流程:

  这个程序是对每个小时不断产生的用户上网记录日志进行增强(将日志中的url所指向的网页内容分析结果信息追加到每一行

原始日志后面

  maptask在初始化时会先调用setup方法一次 利用这个机制,将外部的知识库加载到maptask执行的机器内存中

package cn.bigdata.hdfs.LogEnhance;import java.io.IOException;import java.util.HashMap;import java.util.Map;import org.apache.commons.lang.StringUtils;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Counter;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class LogEnhance {    static class LogEnhanceMapper extends Mapper
{ Map
ruleMap = new HashMap
(); Text k = new Text(); NullWritable v = NullWritable.get(); // 从数据库中加载规则信息倒ruleMap中 @Override protected void setup(Context context) throws IOException, InterruptedException { try { DBLoader.dbLoader(ruleMap); } catch (Exception e) { e.printStackTrace(); } } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 获取一个计数器用来记录不合法的日志行数, 组名, 计数器名称 Counter counter = context.getCounter("malformed", "malformedline"); String line = value.toString(); String[] fields = StringUtils.split(line, "\t"); try { String url = fields[26]; String content_tag = ruleMap.get(url); // 判断内容标签是否为空,如果为空,则只输出url到待爬清单;如果有值,则输出到增强日志 if (content_tag == null) { k.set(url + "\t" + "tocrawl" + "\n"); context.write(k, v); } else { k.set(line + "\t" + content_tag + "\n"); context.write(k, v); } } catch (Exception exception) { counter.increment(1); } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(LogEnhance.class); job.setMapperClass(LogEnhanceMapper.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 要控制不同的内容写往不同的目标路径,可以采用自定义outputformat的方法 job.setOutputFormatClass(LogEnhanceOutputFormat.class); //本地原始日志文件存放目录 FileInputFormat.setInputPaths(job, new Path("F:/webloginput/")); // 尽管我们用的是自定义outputformat,但是它是继承制fileoutputformat // 在fileoutputformat中,必须输出一个_success文件,所以在此还需要设置输出path FileOutputFormat.setOutputPath(job, new Path("F:/weblogoutput/")); // 不需要reducer job.setNumReduceTasks(0); job.waitForCompletion(true); System.exit(0); }}
View Code

 总结:

  1.其中在mapreduce程序中用到了计数器;获取一个计数器用来记录不合法的日志行数, 组名, 计数器名称

  2.拷贝mysql的驱动包到工程的lib目录下,这里使用本地运行模式;

  3.maptask或者reducetask在最终输出时,先调用OutputFormat的getRecordWriter方法拿到一个RecordWriter,然后再调用

RecordWriter的write(k,v)方法将数据写出

  4.在setup方法中完成知识库的加载,写入到Map中

  5.Map端在Context,write时,如果后面没有Reduce,将没有整个的shuffe过程,将直接调用outPutFormat进行输出,进来什

么顺序,则出去什么顺序(总之:没有reduce就没有shuffle)

  

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

转载于:https://www.cnblogs.com/yaboya/p/9258070.html

你可能感兴趣的文章
LayUI之layer关闭刷新父界面
查看>>
动态代理
查看>>
分布式集群系统下的高可用session解决方案
查看>>
服务器部署raid5图解
查看>>
印度海得拉巴发生连环爆炸
查看>>
警惕:移动应用App背后的安全危机!
查看>>
python 异常
查看>>
django模板高级进阶
查看>>
消息handler message 线程通信 空消息
查看>>
禁止自动横屏下的视频播放强制旋转
查看>>
JavaScript实现链表
查看>>
103. Binary Tree Zigzag Level Order Traversal
查看>>
JavaScript函数式编程,真香之组合(一)
查看>>
使用Envoy 作Sidecar Proxy的微服务模式-3.分布式追踪
查看>>
深入了解以太坊
查看>>
SpringBoot 实战 (二) | 第一个 SpringBoot 工程详解
查看>>
Go goroutine理解
查看>>
IDE 插件新版本发布,开发效率 “biu” 起来了
查看>>
理解环境变量 JAVA_TOOL_OPTIONS
查看>>
看大牛是如何使用和理解线程池
查看>>