map-side join:
核心思想:将小表进行分布式缓存,在map-task阶段读取缓存文件数据存储到内存数据结构中,以供reduce阶段连接查找。
适用场景:有一个或者多个小表(文件)
优点:将小表缓存,可以高效查询;由于在map阶段进行连接,所以将会大大减小map到reduce端的数据传输,从而减少不必要的shuffle耗时,提高整个mr的执行效率
缺点:如果业务全是大表不适合
示例
三张表
//login表
//注释行不为表的内容,每列之间制表符分隔
//uid sexid logindate
1 1 2017-04-17 08:16:20
2 2 2017-04-15 06:18:20
3 1 2017-04-16 05:16:24
4 2 2017-04-14 03:18:20
5 1 2017-04-13 02:16:25
6 2 2017-04-13 01:15:20
7 1 2017-04-12 08:16:34
8 2 2017-04-11 09:16:20
9 0 2017-04-10 05:16:50
//sex表
0 不知道
1 男
2 女
//user表
//user uname
1 小红
2 小行
3 小通
4 小闪
5 小镇
6 小振
7 小秀
8 小微
9 小懂
10 小明
11 小刚
12 小举
13 小黑
14 小白
15 小鹏
16 小习
预期结果
//目标输出:
loginuid sex uname logindate
1 男 小红 2017-04-17 08:16:20
2 女 小行 2017-04-15 06:18:20
3 男 小通 2017-04-16 05:16:24
4 女 小闪 2017-04-14 03:18:20
5 男 小镇 2017-04-13 02:16:25
6 女 小振 2017-04-13 01:15:20
7 男 小秀 2017-04-12 08:16:34
9 不知道 小微 2017-04-10 05:16:50
8 女 小懂 2017-04-11 09:16:20
代码示例
//代码示例
public class MapSideJoin {
public static class MyMapper extends Mapper<Object, Text,Text, NullWritable>{
public Text k = new Text();
//读取缓存文件
//定义存储sex缓存的数据结构
Map<String, String> sexMap = new ConcurrentHashMap<String, String>();
Map<String, String> userMap = new ConcurrentHashMap<String, String>();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
Path path[] = DistributedCache.getLocalCacheFiles(context.getConfiguration());
for (Path p : path) {
String filename = p.getName();
BufferedReader buf = null;
if(filename.equals("sex")){
buf = new BufferedReader(new FileReader(new File(p.toString())));
while (buf.ready()){
String line = buf.readLine();
String sexs[] = line.split("\t");
sexMap.put(sexs[0], sexs[1]);
}
}else if(filename.equals("user")){
buf = new BufferedReader(new FileReader(new File(p.toString())));
while (buf.ready()) {
String line = buf.readLine();
String users [] = line.split("\t");
userMap.put(users[0], users[1]);
}
}
if(buf != null){
buf.close();
}
}
}
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String row = value.toString();
String [] fields = row.split("\t");
String uid = fields[0];
String sex_id = fields[1];
String uname = "";
String sexlab = "";
sexlab = sexMap.getOrDefault(sex_id,"不知道");
uname = userMap.getOrDefault(uid,"");
this.k.set(uid+"\t"+sexlab+"\t"+uname+"\t"+fields[2]);
context.write(k,NullWritable.get());
}
}
/** * 驱动方法 * @param args */
public static void main(String[] args) {
try {
//0.我的用户显示有乱码问题,这里临时设置为root用户
System.setProperty("HADOOP_USER_NAME", "root");
//1、获取配置对象并进行属性设置
Configuration conf = new Configuration();
//对conf设置
//2、获取job
Job job = Job.getInstance(conf, "MapSideClass");
//3、对job设置运行主类
job.setJarByClass(MapSideJoin.class);
//4、对job的map端属性设置
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
//设置缓存 (缓存文件读取不了)
//job.setCacheFiles();
//sex表的位置
job.addCacheFile(new URI("hdfs://hadoop01:9000/sex"));
//user表的位置
job.addCacheFile(new URI("hdfs://hadoop01:9000/user"));
//6、设置job的输入路径和输出路径
FileInputFormat.addInputPath(job, new Path(args[0])));
FileOutputFormat.setOutputPath(job, new Path(args[1])));
//7、提交作业
int success = job.waitForCompletion(true) ? 0 : 1;
//8、退出
System.exit(success);
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (URISyntaxException e) {
e.printStackTrace();
}
}
}
然后打jar包,发到集群,在集群上运行.
注:因为涉及到使用缓存文件,只能在集群提交集群运行,第一个参数为login表