map-side join:

核心思想:将小表进行分布式缓存,在map-task阶段读取缓存文件数据存储到内存数据结构中,以供reduce阶段连接查找。

适用场景:有一个或者多个小表(文件)

优点:将小表缓存,可以高效查询;由于在map阶段进行连接,所以将会大大减小map到reduce端的数据传输,从而减少不必要的shuffle耗时,提高整个mr的执行效率

缺点:如果业务全是大表不适合

示例

三张表

//login表
//注释行不为表的内容,每列之间制表符分隔
//uid sexid logindate

1	1	2017-04-17 08:16:20
2	2	2017-04-15 06:18:20
3	1	2017-04-16 05:16:24
4	2	2017-04-14 03:18:20
5	1	2017-04-13 02:16:25
6	2	2017-04-13 01:15:20
7	1	2017-04-12 08:16:34
8	2	2017-04-11 09:16:20
9	0	2017-04-10 05:16:50
//sex表
0	不知道
12
//user表
//user uname
1	小红
2	小行
3	小通
4	小闪
5	小镇
6	小振
7	小秀
8	小微
9	小懂
10	小明
11	小刚
12	小举
13	小黑
14	小白
15	小鹏
16	小习

预期结果

//目标输出:
loginuid	sex		uname	logindate
1	男	小红	2017-04-17 08:16:20
2      女	小行	2017-04-15 06:18:20
3      男	小通	2017-04-16 05:16:24
4      女	小闪	2017-04-14 03:18:20
5      男	小镇	2017-04-13 02:16:25
6      女	小振	2017-04-13 01:15:20
7      男	小秀	2017-04-12 08:16:34
9     不知道	小微	2017-04-10 05:16:50
8     女	小懂	2017-04-11 09:16:20

代码示例

//代码示例
public class MapSideJoin {
    public static class MyMapper extends Mapper<Object, Text,Text, NullWritable>{
        public Text k = new Text();

        //读取缓存文件
        //定义存储sex缓存的数据结构
        Map<String, String> sexMap = new ConcurrentHashMap<String, String>();
        Map<String, String> userMap = new ConcurrentHashMap<String, String>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Path path[] = DistributedCache.getLocalCacheFiles(context.getConfiguration());
            for (Path p : path) {
                String filename = p.getName();
                BufferedReader buf = null;
                if(filename.equals("sex")){
                    buf = new BufferedReader(new FileReader(new File(p.toString())));
                    while (buf.ready()){
                        String line = buf.readLine();
                        String sexs[] = line.split("\t");
                        sexMap.put(sexs[0], sexs[1]);
                    }
                }else if(filename.equals("user")){
                    buf = new BufferedReader(new FileReader(new File(p.toString())));
                    while (buf.ready()) {
                        String line = buf.readLine();
                        String users [] = line.split("\t");
                        userMap.put(users[0], users[1]);
                    }
                }
                if(buf != null){
                    buf.close();
                }
            }
        }

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String row = value.toString();
            String [] fields = row.split("\t");
            String uid = fields[0];
            String sex_id = fields[1];
            String uname = "";
            String sexlab = "";
            sexlab = sexMap.getOrDefault(sex_id,"不知道");
            uname = userMap.getOrDefault(uid,"");
            this.k.set(uid+"\t"+sexlab+"\t"+uname+"\t"+fields[2]);
            context.write(k,NullWritable.get());
        }

    }
    /** * 驱动方法 * @param args */
    public static void main(String[] args) {
        try {
        	//0.我的用户显示有乱码问题,这里临时设置为root用户
            System.setProperty("HADOOP_USER_NAME", "root");
            //1、获取配置对象并进行属性设置
            Configuration conf = new Configuration();
            //对conf设置
            //2、获取job
            Job job = Job.getInstance(conf, "MapSideClass");
            //3、对job设置运行主类
            job.setJarByClass(MapSideJoin.class);

            //4、对job的map端属性设置
            job.setMapperClass(MyMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);


            //设置缓存 (缓存文件读取不了)
            //job.setCacheFiles();
            //sex表的位置
            job.addCacheFile(new URI("hdfs://hadoop01:9000/sex"));
            //user表的位置
            job.addCacheFile(new URI("hdfs://hadoop01:9000/user"));

            //6、设置job的输入路径和输出路径
            FileInputFormat.addInputPath(job, new Path(args[0])));
            FileOutputFormat.setOutputPath(job, new Path(args[1])));

            //7、提交作业
            int success = job.waitForCompletion(true) ? 0 : 1;
            //8、退出
            System.exit(success);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
    }
}

然后打jar包,发到集群,在集群上运行.
注:因为涉及到使用缓存文件,只能在集群提交集群运行,第一个参数为login表