MapReduce is a programming model that divides a job into two phases, Map and Reduce; to write a simple distributed program, the user only needs to implement the two functions map() and reduce().
The problems MapReduce can solve share one common trait: the job can be decomposed into multiple subproblems that are relatively independent and do not constrain one another, so once these subproblems are completed in parallel, the job is done.
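
Conceptually the two functions have the signatures below (as in the original MapReduce model). In WordCount, map turns (line offset, line text) into (word, 1) pairs, and reduce turns (word, list of 1s) into (word, total count):

      map:    (K1, V1)       -> list(K2, V2)
      reduce: (K2, list(V2)) -> list(K3, V3)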

The first MapReduce program (WordCount)

  • Code

      package edu.hut;
    
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.Reducer;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      import java.io.IOException;
    
      public class WordCount {
    
          public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
              // Reuse a single Text/IntWritable per mapper instead of allocating one per record
              Text k = new Text();
              IntWritable v = new IntWritable(1);
    
              @Override
              protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                  // By default (TextInputFormat), each map() call receives one line of input
                  String line = value.toString();
                  String[] words = line.split(" ");
                  // Emit a (word, 1) pair for every word in the line
                  for (String word : words) {
                      k.set(word);
                      context.write(k, v);
                  }
              }
          }
    
          public static class WordCountReducer extends Reducer<Text, IntWritable, Text, LongWritable> {
    
              // Reused output value holding the final count for each word
              LongWritable v = new LongWritable();
    
              @Override
              protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                  // Sum every count emitted for this word by the mappers
                  long sum = 0;
                  for (IntWritable count : values) {
                      sum += count.get();
                  }
                  v.set(sum);
                  context.write(key, v);
              }
          }
    
          public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
    
              // Required when running from Windows so the job accesses HDFS as this user
              System.setProperty("HADOOP_USER_NAME", "atguigu");
              Configuration conf = new Configuration();
              // "fs.default.name" is deprecated; "fs.defaultFS" is the current key
              conf.set("fs.defaultFS", "hdfs://hadoop102:9000");
              Job job = Job.getInstance(conf);
    
              job.setJarByClass(WordCount.class);
    
              job.setMapperClass(WordCountMapper.class);
              job.setReducerClass(WordCountReducer.class);
    
              job.setMapOutputKeyClass(Text.class);
              job.setMapOutputValueClass(IntWritable.class);
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(LongWritable.class);
    
              FileInputFormat.setInputPaths(job, new Path(args[0]));
              FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
              boolean result = job.waitForCompletion(true);
              System.exit(result ? 0 : 1);
          }
      }
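
    After packaging (see the POM below), the job can be submitted with hadoop jar. A minimal sketch, assuming the assembled jar is named wordcount-jar-with-dependencies.jar and /input already exists on HDFS (both names are placeholders):

      hadoop jar wordcount-jar-with-dependencies.jar edu.hut.WordCount /input /output

    Note that the output directory must not exist before the run; otherwise FileOutputFormat fails the job with a FileAlreadyExistsException.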
  • POM file

      <dependencies>
          <dependency>
              <groupId>junit</groupId>
              <artifactId>junit</artifactId>
              <version>RELEASE</version>
          </dependency>
          <dependency>
              <groupId>org.apache.logging.log4j</groupId>
              <artifactId>log4j-core</artifactId>
              <version>2.8.2</version>
          </dependency>
          <dependency>
              <groupId>org.apache.hadoop</groupId>
              <artifactId>hadoop-common</artifactId>
              <version>2.7.2</version>
          </dependency>
          <dependency>
              <groupId>org.apache.hadoop</groupId>
              <artifactId>hadoop-client</artifactId>
              <version>2.7.2</version>
          </dependency>
          <dependency>
              <groupId>org.apache.hadoop</groupId>
              <artifactId>hadoop-hdfs</artifactId>
              <version>2.7.2</version>
          </dependency>
      </dependencies>
    
      <build>
          <plugins>
              <plugin>
                  <artifactId>maven-compiler-plugin</artifactId>
                  <version>2.3.2</version>
                  <configuration>
                      <source>1.8</source>
                      <target>1.8</target>
                  </configuration>
              </plugin>
              <plugin>
                  <artifactId>maven-assembly-plugin</artifactId>
                  <configuration>
                      <descriptorRefs>
                          <descriptorRef>jar-with-dependencies</descriptorRef>
                      </descriptorRefs>
                      <archive>
                          <manifest>
                              <mainClass>edu.hut.WordCount</mainClass>
                          </manifest>
                      </archive>
                  </configuration>
                  <executions>
                      <execution>
                          <id>make-assembly</id>
                          <phase>package</phase>
                          <goals>
                              <goal>single</goal>
                          </goals>
                      </execution>
                  </executions>
              </plugin>
          </plugins>
      </build>
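
    With this configuration, mvn clean package should produce two jars under target/: the plain application jar and a *-jar-with-dependencies.jar assembled by the assembly plugin, with edu.hut.WordCount recorded as the main class in its manifest.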