1.需求:
现有一些原始日志需要做增强解析处理,流程:
1、 从原始日志文件中读取数据(日志文件:)
2、 根据日志中的一个URL字段到外部知识库中获取信息增强到原始日志
3、 如果成功增强,则输出到增强结果目录;如果增强失败,则抽取原始数据中URL字段输出到待爬清单目录
2.需求分析:
程序的关键点是要在一个mapreduce程序中根据数据的不同输出两类结果到不同目录,这类灵活的输出需求可
以通过自定义outputformat来实现
3.需求实现:
技术实现要点:
1、 在mapreduce中访问外部资源(知识数据库)
2、 自定义outputformat,改写其中的recordwriter,改写具体输出数据的方法write()
代码实现:
1.数据库获取数据的工具类:
1.首先启动数据库服务(192.168.232.201):service mysql start
2.使用远程客户端连接数据库工具Navicat操作数据库:导入创建url_rule表语句和导入该表数据
3.创建表语句及其数据下载:
package cn.bigdata.hdfs.LogEnhance;import java.sql.Connection;import java.sql.DriverManager;import java.sql.ResultSet;import java.sql.Statement;import java.util.Map;public class DBLoader { public static void dbLoader(MapruleMap) throws Exception { Connection conn = null; Statement st = null; ResultSet res = null; try { Class.forName("com.mysql.jdbc.Driver"); conn = DriverManager.getConnection("jdbc:mysql://192.168.232.201:3306/mysql", "root", "root"); st = conn.createStatement(); res = st.executeQuery("select url,content from url_rule"); while (res.next()) { ruleMap.put(res.getString(1), res.getString(2)); } } finally { try{ if(res!=null){ res.close(); } if(st!=null){ st.close(); } if(conn!=null){ conn.close(); } }catch(Exception e){ e.printStackTrace(); } } }}
2.自定义一个outputformat继承自FileOutputFormat,实现getRecordWriter方法:
package cn.bigdata.hdfs.LogEnhance;import java.io.IOException;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/** * maptask或者reducetask在最终输出时,先调用OutputFormat的getRecordWriter方法拿到一个RecordWriter * 然后再调用RecordWriter的write(k,v)方法将数据写出 */public class LogEnhanceOutputFormat extends FileOutputFormat{ @Override public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { ///拿到一个文件系统操作的客户端实例对象 FileSystem fs = FileSystem.get(context.getConfiguration()); Path enhancePath = new Path("F:/temp/en/log.dat"); Path tocrawlPath = new Path("F:/temp/crw/url.dat"); //流式创建文件 FSDataOutputStream enhancedOs = fs.create(enhancePath); FSDataOutputStream tocrawlOs = fs.create(tocrawlPath); return new EnhanceRecordWriter(enhancedOs, tocrawlOs); } /** * 构造一个自己的recordwriter */ static class EnhanceRecordWriter extends RecordWriter { FSDataOutputStream enhancedOs = null; FSDataOutputStream tocrawlOs = null; public EnhanceRecordWriter(FSDataOutputStream enhancedOs, FSDataOutputStream tocrawlOs) { super(); this.enhancedOs = enhancedOs; this.tocrawlOs = tocrawlOs; } //实现抽象方法write @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { String result = key.toString(); // 如果要写出的数据是待爬的url,则写入待爬清单文件 /logenhance/tocrawl/url.dat if (result.contains("tocrawl")) { tocrawlOs.write(result.getBytes()); } else { // 如果要写出的数据是增强日志,则写入增强日志文件 /logenhance/enhancedlog/log.dat enhancedOs.write(result.getBytes()); } } //实现抽象方法close @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { if (tocrawlOs != null) { tocrawlOs.close(); } if (enhancedOs != null) { enhancedOs.close(); } } }}
3.开发mapreduce处理流程:
这个程序是对每个小时不断产生的用户上网记录日志进行增强(将日志中的url所指向的网页内容分析结果信息追加到每一行
原始日志后面
maptask在初始化时会先调用setup方法一次 利用这个机制,将外部的知识库加载到maptask执行的机器内存中
package cn.bigdata.hdfs.LogEnhance;import java.io.IOException;import java.util.HashMap;import java.util.Map;import org.apache.commons.lang.StringUtils;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Counter;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class LogEnhance { static class LogEnhanceMapper extends Mapper{ Map ruleMap = new HashMap (); Text k = new Text(); NullWritable v = NullWritable.get(); // 从数据库中加载规则信息倒ruleMap中 @Override protected void setup(Context context) throws IOException, InterruptedException { try { DBLoader.dbLoader(ruleMap); } catch (Exception e) { e.printStackTrace(); } } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 获取一个计数器用来记录不合法的日志行数, 组名, 计数器名称 Counter counter = context.getCounter("malformed", "malformedline"); String line = value.toString(); String[] fields = StringUtils.split(line, "\t"); try { String url = fields[26]; String content_tag = ruleMap.get(url); // 判断内容标签是否为空,如果为空,则只输出url到待爬清单;如果有值,则输出到增强日志 if (content_tag == null) { k.set(url + "\t" + "tocrawl" + "\n"); context.write(k, v); } else { k.set(line + "\t" + content_tag + "\n"); context.write(k, v); } } catch (Exception exception) { counter.increment(1); } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(LogEnhance.class); job.setMapperClass(LogEnhanceMapper.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 要控制不同的内容写往不同的目标路径,可以采用自定义outputformat的方法 job.setOutputFormatClass(LogEnhanceOutputFormat.class); //本地原始日志文件存放目录 FileInputFormat.setInputPaths(job, new Path("F:/webloginput/")); // 尽管我们用的是自定义outputformat,但是它是继承制fileoutputformat // 在fileoutputformat中,必须输出一个_success文件,所以在此还需要设置输出path FileOutputFormat.setOutputPath(job, new Path("F:/weblogoutput/")); // 不需要reducer job.setNumReduceTasks(0); job.waitForCompletion(true); System.exit(0); }}
总结:
1.其中在mapreduce程序中用到了计数器;获取一个计数器用来记录不合法的日志行数, 组名, 计数器名称
2.拷贝mysql的驱动包到工程的lib目录下,这里使用本地运行模式;
3.maptask或者reducetask在最终输出时,先调用OutputFormat的getRecordWriter方法拿到一个RecordWriter,然后再调用
RecordWriter的write(k,v)方法将数据写出
4.在setup方法中完成知识库的加载,写入到Map中
5.Map端在Context,write时,如果后面没有Reduce,将没有整个的shuffe过程,将直接调用outPutFormat进行输出,进来什
么顺序,则出去什么顺序(总之:没有reduce就没有shuffle)