代码部分
public class WordCountApp { public static class wcMapper extends Mapper<LongWritable,Text,Text,IntWritable>{ Text outkey = new Text(); IntWritable outvalue = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //把从文件中读取的一行数据由序列化类型转变成容易处理的String类型 String line = value.toString(); //把每一行数据按照符号分隔成一个单词数组 String[] words = line.split("\t"); //遍历数组,每个单词分别发送到reducer for (String word : words) { outkey.set(word); context.write(outkey,outvalue); } } } public static class wcReducer extends Reducer<Text,IntWritable,Text,IntWritable>{ IntWritable outvalue = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //求每个单词的和的初始值 int sum = 0; //遍历同一个单词的value值序列,进行求和 for (IntWritable value : values) { sum += value.get(); } outvalue.set(sum); //可以还是传递过来的key没有发生变化,和作为value。写入到文件 context.write(key,outvalue); } } public static void main(String[] args) throws Exception { //创建一个job(任务)实例 Job job = Job.getInstance(); //4个输出泛型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //3个类 job.setJarByClass(WordCountApp.class); job.setMapperClass(wcMapper.class); job.setReducerClass(wcReducer.class); //2个路径(原文件输入路径,结果的目标路径) FileInputFormat.setInputPaths(job,new Path(args[0])); //Path类型的输出路径,由于下面多次使用,避免代码重复,单独接收。 Path outputPath = new Path(args[1]); //创建文件系统,做hdfs的操作 FileSystem fs = FileSystem.get(new URI(outputPath.toString()),new Configuration()); //判断输出路径是否存在,如果存在直接删除 if(fs.exists(outputPath)){ fs.delete(outputPath,true); } //如果输出路径不存在或者存在已经删掉,重新添加输出内容。 FileOutputFormat.setOutputPath(job,outputPath); job.setNumReduceTasks(3); //1提交任务 job.waitForCompletion(true); } }
代码的规范
Mapper类
把原始数据读入并划分成最小的数据单元,以键值对的形式输出到reduce端。
Reducer类
把map端传送过来的数据,key相同的,处理value序列。
Driver类
一般来说是主方法,可以指定一些mapper和reducer的一些特有的情况
比如:输入输出文件 输入输出泛型 等
举例:wordCount,地位相当于java中的 helloWorld
需求:单词统计,给一个文件,文件中有很多个单词(多行,每行多个),
按照单词相同的,数量求和。
思路:
a.必须要有原文件(很多行数据,每一行很多单词按照某个字符分隔开)
b.每行都会执行一次mapper类
对每行的数据进行split分隔,形成单词的数组
遍历数组,每遍历一次就是一个单词,直接以(单词 -> 1)发送出去
注意:一定是执行完mapper之后,再执行reducer。
c.reducer接收数据 (单词,相同单词的value序列)
d.对序列中的数据求和
e.以 (单词 -> sum) 写入文件
Mapper里有四个泛型
KEYIN,VALUEIN, KEYOUT, VALUEOUT
注意:所有的泛型都必须是序列化类型,原理与java一致,但形式不一样。
LongWritable:输入数据的key类型 代表文本文档的每行的偏移量。
Text:输入数据的value类型 代表文本文档的每行的具体内容。
Text:输出数据的key类型 代表发送给reduce的单词。
IntWritable:输出数据的value类型,代表单词数量的1。
setUp:map之前执行一次,一般不用,
如果想对某些数据做初始化可以使用
cleanUp:map之后执行一次,一般不用,
在做完map端操作之后,需要整体对某些内容操作,可以使用
map:每行内容都需要执行一次map,绝大多数逻辑都需要重写。
run:真正执行代码,非专业人员不会轻易重写run。
代码中的一些小问题:
1.输入输出路径
默认按照hdfs-site.xml或conf.set()形式设置的文件系统进行查找。
如果想强制使用本地文件,一样要在输入输出路径前加file:///E://xxx
2.输入输出路径在代码中固定
这种形式很不好,由于代码需要打包执行,所以这种形式有局限性。
可以代码中利用main方法中的args参数,动态获取路径。
idea中可以使用 run->edit configuration->program arguments
输入路径可以写到目录级别,代表当前目录下所有的文件都是原文件
或者可以 xxx/*
3.输出路径需要手动删除
输出路径不能自动覆盖,所以每次执行必须手动删除再执行。
通过代码进行判断,如果这个路径存在,先删除,再新创建。
4.控制台没有执行过程的提示信息
在main->resources下添加log4j.properties文件,等级调到INFO。
5.setUp和cleanUp怎么使用,执行的顺序
setUp是map/reduce之前执行且只执行一次
cleanUp是map/reduce之后执行且只执行一次
reduce端代码一定是在map端代码执行完毕后再执行
MapReduce中默认输入输出格式为文本文档,如果是文本文档的情况下,
map端的输入类型一定是 LongWritable和Text
LongWritable是偏移量,代表的是每一行的开头的字符排序。
6.能不能在一个类中实现所有的MapReduce的逻辑
使用静态内部类实现
7.处理map端数据的时候,split分隔的内容比较特殊怎么办?
使用正则表达式
8.Driver能不能封装
@TODO
把Driver封装成一个方法
可以使用java中的反射机制
正则表达式:
处理字符串的工具,任何语言中都有的工具。
只不过在不同的语言之间存在一点差异
语法:
asd 具体的匹配项
[asd] 从[]中随意选择一个字符
[a-f] 从[]的范围中选择一个字符
[a-zA-Z0-9] 从[]的范围中选择一个字符
[^a-z] 除了[]范围内的任一字符都能匹配
\d [0-9]任意一个数字
\D [^0-9]除了数字以外的任意一个字符
\w [a-zA-Z0-9]字母+数字
\W [^a-zA-Z0-9]除了字母+数字
\s [\t\n\r...]任意一个空白字符
\S [^\t\n\r...]除了空白字符
? 0或1次
* 0到任意多次
+ 1到任意多次
{m} m次
{m,} m到多次
{m,n} m到n次
^ 以。。。开头
$ 以。。。结尾