MapReduce是一种编程模型,将任务分为两个阶段:Map和Reduce,用户只需编写
map()
和reduce()
两个函数就可以完成简单的分布式程序的设计。
MapReduce能够解决的问题有一个共同特点:任务可以被分解成多个子问题,且这些子问题相对独立,彼此之间不会有牵制,待并行完成这些子问题后,任务便被解决。
第一个MapReduce程序(WordCount)
代码
package edu.hut; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class WordCount { public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { Text k = new Text(); IntWritable v = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 默认读取一行的数据 String line = value.toString(); String[] words = line.split(" "); // 将行内的内容都写入缓冲区 for (String word : words) { k.set(word); context.write(k, v); } } } public static class WordCountReducer extends Reducer<Text, IntWritable, Text, LongWritable> { LongWritable v = new LongWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { long sum = 0; for (IntWritable count : values) { sum += count.get(); } v.set(sum); context.write(key, v); } } public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { // 在Windows中运行时需要加上这个 System.setProperty("HADOOP_USER_NAME","atguigu"); Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://hadoop102:9000"); Job job = Job.getInstance(conf); job.setJarByClass(WordCount.class); job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
pom文件
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>RELEASE</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.8.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.7.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.7.2</version> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin </artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass>edu.hut.WordCount</mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>