MapReduce前提:配置文件
Hadoop的配置通过配置文件来完成,配置文件的目录在/hadoopxx/etc/hadoop/目录下有各种有关hadoop生态系统组件的配置,在代码层面,可以通过Configuration类的实例来获取配置的信息以及代表相关的配置。配置文件的信息以键,值的方式来实现。
例如:configuration-1.xml,位置(/etc/hadoop/)
<?xml version="1.0"?>
<configuration>
<property>
<name>name</name>
<value>liudong</value>
<final>true</final>
<description>author name</description>
</property>
<property>
<name>age</name>
<value>25</value>
<description>author age</description>
</property>
</configuration>
获取Configuration配置信息的代码:
package com.dong.hello;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.web.resources.ConcatSourcesParam;
public class getConfiguration {
public static void main(String[] args) {
Configuration conf =new Configuration();
conf.addResource("configuration-1.xml");
String name = conf.get("name");
String age = conf.get("age");
System.out.println(name + " : " + age);
}
}
在安装haoop的主机上执行jar文件:
[ld@localhost ~]$ hadoop jar tetConfiguration.jar
liudong : 25
[ld@localhost ~]$
如果有多个Configuration文件,可以通过conf.addResource("xx")来进行添加,如果配置文件中有相同的属性,则后者会覆盖前者属性的值,好处是,用于覆盖系统默认的配置文件而不用修改原来的文件。但是如果属性的final状态时true,则不允许被后面的相同属性覆盖。
也可以在配置文件中使用系统属性或者其他属性进行定义,注意系统属性的优先级高于资源文件中定义的属性。
简化配置的辅助类:GenericOptionsParser,Tool,ToolRunner.
简单的方式是实现Tool接口,通过ToolRunner来运行应用程序,ToolRunner内部调用GenericOptionsParser.
package com.dong.hello;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class testGenericOptionsParser extends Configured implements Tool{
static {
Configuration.addDefaultResource("hdfs-default.xml");
Configuration.addDefaultResource("hdfs-site.xml");
Configuration.addDefaultResource("yarn-default.xml");
Configuration.addDefaultResource("mapred-default.xml");
Configuration.addDefaultResource("mapred-site.xml");
}
@Override
public int run(String[] arg0) throws Exception {
Configuration conf = getConf();
for(Entry<String,String> entry : conf) {
System.out.printf("%s=%s\n",entry.getKey(),entry.getValue());
}
return 0;
}
public static void main(String[] args) throws Exception {
//testGenericOptionsParser没有调用自身的run(),而是通过ToolRunner的静态run()方法,该方法
//负责在调用自身的run()方法之前,为Tool建立一个Configuration对象,同时Tool使用GenericOptionsParser
//来获取在命令行中指定所有标准的选项。
int exitCode = ToolRunner.run(new testGenericOptionsParser(), args);
System.exit(exitCode);
}
}
MapReduce任务
Mapreduce任务分为两个处理阶段:map阶段和reduce阶段,每个阶段都是以键值对作为输入和输出。
map阶段的输入是原始数据,map函数是数据的准备阶段,可以使reduce函数能够继续对它处理。
Java Mapreduce
需要实现三部分,map函数,reduce函数,执行作业的代码。
⑴map阶段:
package com.dong.mapred;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Mapper.Context;
/**
*
* mapreduce处理过程:
* 先是mapper,然后是reduce,输出最终的结果,有可能中间还有一些container.
*
* @author liuD
*
*/
public class MaxTemperatureMapper
extends MapReduceBase
implements Mapper<LongWritable,Text,Text,IntWritable>{
private static final int MISSING = 9999;
/**
* Mapper函数解析:
* map类是一个泛型类型有四个形参,分别制定map函数的输入键,输入值,输出键,输出值的类型。
* Hadoop自身提供了一套可以优化网络序列化传输的基本类型,而不是直接使用Java内嵌的类型。
* LongWritable 相当于 Java的 long类型
* Text 相当于 Java String类型
* IntWritable 相当于 Java Integer类型
*
*/
@Override
public void map(LongWritable arg0, Text arg1, OutputCollector<Text, IntWritable> arg2, Reporter arg3)
throws IOException {
//实现代码
}
}
⑵reduce阶段:
package com.dong.mapred;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
*
* reduce函数分析:
* reduce函数有四个形参类型,用于指定输入和输出类型,
* reduce函数的输入类型必须 匹配map函数的输出类型:即Text 类型和IntWritable 类型。
* Context实例用于输出内容的写入,
* @author liuD
*/
public class MaxTempratureReducer
extends Reducer<Text,IntWritable,Text,IntWritable>{
@Override
public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
//reudce处理
}
}
⑶最终执行代码阶段
package com.dong.mapred;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapreduce.Job;
import junit.framework.Test;
public class MaxTemperature {
/**
* 函数分析:
* JobConf 对象为作业对象,控制整个作业的运行
* 在hadoop集群上运行这个作业的时候,需要把代码打包成jar,但是不用知道jar文件的名称,在Job对象的setJarByClass()方法中传递一个类即可,
* hadoop利用这个类查找包含它的jar文件。
* job.setJarByClass(MaxTemperature.class)
*
* 然后是指定输入和输出数据的路径。
* 通过FileInputFormat类的静态方法addInputPath()
* 通过FileOutputFormat类中的静态方法setOutputPath()来指定输出路径。即reduce函数输出文件的写入目录。在运行作业前该目录不应该存在的,否则hadoop报错并拒绝运行。
*
* setMapperClass(),setReduceClass()方法指定要用的map类型和reduce类型。
*
* setOutputKeyClass(),setOutputValueClass()方法控制reduce函数的输出类型,并且必须和Reduce类产生的相匹配。
* map函数的输出类型默认情况下和reduce函数是相同的。因此,mapper产生出和reduce相同的类型时,不需要单独设置,但是,如果不同
* 则必须通过setMapOutputKeyClass()和setMapOutputValueClass()方法来设置map函数和输出类型。
*
* 运行作业:让作业一个一个的运行,使用JobClient.runJob(conf1),如果作业失败,抛出异常
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
if(args.length != 2) {
System.out.println("Usager: MaxTemperature <input path> <output path>");
System.exit(-1);
}
JobConf job = new JobConf();
job.setJarByClass(MaxTemperature.class);
job.setJobName("Max Temperature");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass((Class<? extends Reducer>) MaxTempratureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
JobClient.runJob(job);
}
}
⑷在Haoop主机上运行:
export HADOOP_CLASSPATH=hadoop-examples.jar
hadoop xxxx input/sample.txt output
使用MRUnit进行测试
⑴下载MRUnit的jar包,并导入到相应的工程中
http://www.java2s.com/Code/Jar/m/Downloadmrunit100hadoop2jar.htm
⑵进行相应的测试
测试mapper:(以《Hadoop权威指南》中的天气数据为例。
package com.dong.hello;
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mrunit.MapDriver;
import org.apache.hadoop.mrunit.ReduceDriver;
import org.junit.jupiter.api.Test;
import com.dong.mapred.MaxTemperatureMapper;
import com.dong.mapred.MaxTempratureReducer;
public class TestMRUnit {
//MRUnit是一个测试库,用于将已知的输入传递给mapper或者检查reducer的输出是否符合预期。
//测试输出是否和预期的输出是否一致
@Test
public void processValidRecored() throws IOException {
Text value = new Text("xxx");
//测试mapper,MRUnit的MapDriver调用runTest()方法执行测试,需要配资mapper,
//输入key和值,期望输出key和期望输出的值。
new MapDriver<LongWritable,Text,Text,IntWritable>()
.withMapper( new MaxTemperatureMapper())
.withInput(new LongWritable(0),value)
.withOutput(new Text("1950"),new IntWritable(-11))
.runTest();
}
}
测试reducer:
package com.dong.hello;
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mrunit.MapDriver;
import org.apache.hadoop.mrunit.ReduceDriver;
import org.junit.jupiter.api.Test;
import com.dong.mapred.MaxTemperatureMapper;
import com.dong.mapred.MaxTempratureReducer;
public class TestMRUnit {
//MRUnit是一个测试库,用于将已知的输入传递给mapper或者检查reducer的输出是否符合预期。
//测试reduce
@Test
public void returnMaximumIntegerInValues() throws IOException {
new ReduceDriver<Text,LongWritable,Text,IntWritable>()
.withReducer((Reducer<Text, LongWritable, Text, IntWritable>) new MaxTempratureReducer())
.withInputKey(new Text("1950"))
.withOutput(new Text("1950"),new IntWritable(10))
.runTest();
}
}
MapReduce的工作机制:
1.客户端通过Job对象的submit()方法,也可以是waitForCompletion()方法,来进行提交作业
2.作业的提交:Job的submit()方法创建一个内部的JobSummiter实例,调用其submitJobInternal()方法,
2.1JobSummiter实现的作业提交过程:⑴向资源管理器请求一个新应用ID,用于MapReduce作业ID,⑵检查作业的输出说明和计算作业的输入分片,如果没有指定输出目录或输出目录已经存在,或者分片无法计算的话,则不提交作业,⑶将运行作业所需要的资源(包括JAR文件,配置文件和计算所得的输入分片)复制到一个以作业ID命名的目录下的共享文件系统中⑷通过调用资源管理器的submitApplication()方法提交作业。
3.作业初始化:资源管理器发现调用其submitApplication()消息后,边将请求传递给YARN调度器,调度器分配一个容器,然后资源管理器在节点管理器的管理下载容器中启动application master的进程。
application master是一个Java应用程序,主类是MRAppMaster,对作业的初始化是通过创建多个薄记对象以保持对作业进度的跟踪来完成,接受分片,并对分片创建map任务对象以及由mapreduce.job,reduces属性确定多个reduce任务对象。
application master必须决定如何运行构成MapReduce作业的各个任务,如果作业很小,就选择和自己在同一个JVM上运行。uber任务运行:当application master 判断在新的容器中分配和运行任务的开销大于并行运行他们的开销时,这种任务称为uber任务。
大作业小作业:默认情况,小作业即少于1个mapper且只有1个reducer且输入大小小于一个HDFS块的作业。启用Uber任务,通过将mapreduce.job.ubertask.enable设置为true.
4.任务分配:如果作业不适合作为uber任务运行,那么application master 就会为该作业中的所有map任务和reduce任务向资源管理器请求容器。首先是Map任务发出请求,map任务的请求高于reduce任务的请求,直到5%的map任务已经完成时,reduce任务的请求才会发出。reduce任务能够在集群中的任意位置运行,但map任务的请求有着数据本地化局限。也可以为请求任务指定内存需求和CPU数,在默认情况下,每个map任务和reduce任务都分配到1024MB的内存和一个虚拟的内核。这些值可以在每个作业的基础上进行配置,通过4个属性:mapreduce.map.memory.mb,mapreduce.reduce.memory.mb,mapreduce.map.cpu.vcores,mapreduce.reduce.cpu.vcoresp.memory.mb.
5.任务的执行:当为任务分配了一个特定节点上的容器,application master就通过与节点管理器通信来启动容器,该任务有主类为YarnChild的一个Java应用程序执行。运行前,首先将任务需要的资源本地化,包括作业的配置,JAR文件和所有来自分布式缓存的文件,最后,运行map任务或reduce任务。
6.任务和状态的更新:MapReduce作业运行的时间不确定。每个作业和他的每个任务都有一个状态,包括:作业或任务的状态,map和reduce的进度,作业计数器,状态消息或描述。任务在运行时,其进度保持追踪。
7.作业的完成:当application master收到作业最后一个任务已完成的通知,便把作业的状态设置为成功,然后Job轮询状态时,便知道任务已成功完成于是Job打印一条消息告知用户,然后从waitForCompletion()方法返回。最后,作业完成时,application master 和任务容器清理其工作状态,Outputcommitter的commitJob()方法被调用,作业信息有作业历史服务器存档,便于日后用户需要时可以查询。
内容部分参考《Hadoop权威指南》