前言

对于HDFS分布式文件系统来说,其Namenode会定期将文件系统的命名空间(文件目录树、文件/目录元信息)保存到fsimage文件中,以防止Namenode掉电或者进程崩溃。但如果Namenode实时地将内存中的元数据同步到fsimage文件中,将会非常消耗资源且造成Namenode运行缓慢。所以Namenode会先将命名空间的修改保存在editlog文件中,然后定期合并fsimage和editlog文件

FSImage类

FSImage类主要实现了以下功能。

  • 1.保在命名空间——将当前时刻Namenode内存中的命名空间保存到fsimage文件中。
  • 2.加载fsimage文件—将磁盘上fsimage文件中保存的命名空间加载到Namenode内存中,这个操作是保存命名空间操作的逆操作。
  • 3.加载editlog文件—Namenode加载了fsimage文件后,内存中只包含了命名空间在保存fsimage文件时的信息,Namenode还需要加载后续对命名空间的修改操作,即editlog文件中记录的内容。所以FSImage类还提供了加载 editlog文件到Namenode内存中的功能。

1.保存命名空间

FSImage类最重要的功能之一就是将当前时刻Namenode的命名空间保存到fsimage文件中
FSImage类中这个功能的实现,保存namespace操作的调用顺序,如图所示:

图片说明

FSNameSystem会调用FSImage.saveNamespace()方法触发命名空间的保存操作,saveNamespace()会调用saveFSImagelnAlIDirs()方法执行具体的保存逻辑。我们从入口方法saveFSImagelnAIDirs()开始介绍。
(1)saveFSImagelnAllDirs()
Namenode 可以定义多个存储路径来保存fsimage文件,对于每一个存储路径,saveFSImagelnAllDirs()方法都会启动一个线程负责在这个路径上保存fsimage文件。同时,为了防止保存过程中出现错误,命名空间信息首先会被保存在一个fsimage.ckpt文件中,当保存操作全部完成之后,才会将fsimage.ckpt 重命名为fsimage文件。之后saveFSImagelnAlIDirs()方*清理Namenode元数据存储文件夹中过期的editlog文件和fsimage文件。
saveFSImagelnAllDirs()方法的代码如下:

private synchronized void saveFSImageInA1lpirs(FSNamesystem source,NameNodeFile nnf,long txid,Canceler canceler)throws IoException{
//构造保存命名空间操作的上下文
SaveNamespaceContext ctx=new SaveNamespaceContext(
source,txid,canceler);List<Thread>saveThreads =new Arraylist<Thread>();
//在每一个保存路径上启动一个线程,该线程使用FSImageSaver 类保存fsimage文件for(Iterator<storageDirectory>it
=storage.dirIterator(NameNodeDirType.IMAGE);it.hasNext();){
StorageDirectory sd=it.next();FSImageSaver saver=new FSImageSaver(ctx,sd,nnf);Thread saveThread=new Thread(saver,saver.tostring());saveThreads.add(saveThread);saveThread.start();
//等待所有线程执行完毕
waitForThreads(saveThreads);saveThreads.clear();storage.reportErrorsOnDirectories(ctx.getErrorSDs());
//保存文件失败则抛出异常
if(storage.getNumStorageDirs(NameNodeDirType.IMAGE)==0){
throw new IOException(
"Failed to save in any storage directories while saving namespace.");
//将fsimage.ckpt改名为fsimage renameCheckpoint(txid,NameNodeFile.IMAGE_NEW,nnf,false);
//我们已经完成了fsimage的保存,那么可以将存储上的一部分edit1og和fsimage删除purgeoldstorage(nnf);
]finally{
//通知所有等待的线程
ctx.markComplete();ctx=nul1;prog.endPhase(Phase.SAVING_CHECKPOINT);

通过分析saveFSImagelnAIDirs()方法可知,命名空间具体的保存操作是由FSImageSaver这个类来承担的,FSImageSaver是FSlmage中的内部类,也是一个线程类,它的run()方法调用了saveFSImage()方法来保存fsimage文件。我们再看一下saveFSlmage()方法的实现。
saveFSImage()方法 使用一个FSImageFormat.Saver 对象来完成保存操作,FSImageFormat.Saver 类会以fsimage文件定义的格式保存Namenode的命名空间信息,需要注意命名空间信息会先写入fsimage.ckpt文件中。saveFSImage()方法还会生成fsimage文件的md5校验文件,以确保fsimage文件的正确性。saveFSlmage()方法的代码如下:

void saveFSImage(SaveNamespaceContext context,StorageDirectory sd)
throws IOException{
1ong txid=context.getTxId();//获取当前命名空间中记录的最新事务的txid
//fsimage文件
File newFile=NNStorage.getstorageFile(sd,NameNodeFile.IMAGE_NEW,txid);File dstFile=NNStorage.getStorageFile(sd,NameNodeFile.IMAGE,txid);
//FSImageFormatProtobuf.Saver类负责保存
fsimage FSImageFormatProtobuf.Saver saver=new FSImageFormatProtobuf.Saver(context);
FSImageCompression compression =FSImageCompression.createCompression(conf);//压缩类
saver.save(newFile,compression);//调用Saver类保存fsimage文件MD5FileUtils.saveMD5File(dstFile,saver.getSavedDigest());//保存MD5校验值storage.setMostRecentCheckpointInfo(txid,Time.now());

saveFSlmage()方法构造了一个FSImageFormatProtobuf.Saver对象来保存命名空间,FSlmageFormatProtobuf是一个工具类,它提供了以protobuf 格式读取和写入fsimage文件的方法。HDFS2.4版本的实现中,FSImage使用FSImageFormat 类作为读取和写入fsimage文件的标准格式类,而HDFS2.6版本则使用了FSImageFormatProtobuf替代了FSImageFormat类。
FSImageFormatProtobuf 除了有Saver 内部类用于保存命名空间到fsimage文件外,还提供了一个Loader内部类用于解析和加载fsimage文件。这里我们先学习FSImageFormatProtobuf.Saver类的实现。

(2)FSImageFormatProtobuf.Saver
HDFS2.6版本使用了protobuf作为fsimage文件序列化的工具,fsimage文件的格式也被重新定义了。它包括了4个部分的信息。

  • MAGIC:fsimage的文件头,是“HDFSIMG1”这个字符串的二进制形式,MAGIC头标识了当前fsimage文件是使用protobuf格式序列化的。FSImage类在读取fsimage文件时,会先判断fsimage文件是否包含了MAGIC头,如果包含了则使用protobuf格式反序列化fsimage文件。
  • SECTIONS:fsimage 文件会将同一类型的Namenode 元信息保存在一个section中,例如将文件系统元信愿保存在NameSystemSection中,将文件系统目录树中的所有INode信息保存在INodeSection中,将快照信息保存在SnapshotSection中等。
  • FileSummary:FileSummary记录了fsimage文件的元信息,以及fsimage文件保存的所有section的信息。FileSummary中的ondiskVersion字段记录了fsimage文件的版本号(2.6版本中这个字段的值为1),layout Version字段记录了当前HDFS的文件系统布局版本号,codec字段记录了fsimage文件的压缩编码,sections字段则记录了fsimage文件中各个section字段的元信息,每个fsimage文件中记录的section在FileSummary中都有一个与之对应的section字段。FileSummary的section字段记录了对应的fsimage中section的名称、在fsimage文件中的长度,以及这个section在fsimage中的起始位置。FSImage类在读取fsimage文件时,会先从fsimage中读取出FileSummary部分,然后利用FileSummary记录的元信息指导fsimage文件的反序列化操作。
  • EileSummaryLength:FileSummaryLength 记录了FileSummary 在fsimage文件中所占的长度,FSImage类在读取fsimage文件时,会首先读取FileSummaryLength获取FileSummary部分的长度,然后根据这个长度从fsimage中反序列化出FileSummary。

FSImageFormatProtobuf.Saver类就是以protobuf格式将Namenode的命名空间保存至 fsimage文件的工具类,这个类的入口方法是save()方法。save()方法打开 fsimage文件的输出流并且获得文件通道,然后调用savelntemal()方法将命名空间保存到 fsimage文件中savelntemal()方法首先构造底层fsimage文件的输出流,构造 fsimage文件的描述类 FileSummary,然后在 FileSummary 中记录 o ndisk Version layout Version codec 等信息。接下来 savelntemal()方法依次向fsimage文件中写入命名空间信息、inode信息、快照信息、安全 信息、缓存信息、StringTable信息等。注意上述信息都是以section为单位写入的,每个section 的格式定义请参考fsimage.proto文件。savelntemal()方法以section为单位写入元数据信息时, 还会在FileSummary中记录这个section的长度,以及section在 fsimage文件中的起始位置等 信息。当完成了所有section的写入后,FileSummary对象也就构造完毕了,savelntemal()最后会将FileSummary对象写入fsimage文件中,savelntemal()方法的代码如下:

private void saveInternal(FileOutputStream fout,FSImageCompression compression,String filepath)throws IOException{
//构造输出流,一边写入数据,一边写入校验值
MessageDigest digester=MD5Hash.getDigester();
underlyingoutputStream=new DigestoutputStream(new BufferedoutputStream(fout),digester);
underlyingoutputStream.write(FSImageUtil.MAGIC_HEADER);
fileChanne1=fout.getchannel();
//FileSummary为fsimage文件的描述部分,也是protobuf定义的FileSummary.Builderb=FileSummary.newBuilder()
.setOndiskVersion(FSImageUtil.FILE_VERSION)
.setLayoutVersion(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
codec=compression.getImagecodec();//获取压缩格式,并装饰输出流
if(codec!=nul1){
b.setCodec(codec.getClass().getCanonicalName());
sectionoutputStream=codec.createOutputStream(underlyingoutputStream);
}else{
sectionOutputStream=underlyingoutputStream;
saveNameSystemsection(b);//保存命名空间信息
context.checkCancelled();//检查是否取消了保存操作
saveINodes(b);//保存命名空间中的inode信息
saveSnapshots(b);//保存快照信息
saveSecretManagerSection(b);//保存安全信息
saveCacheManagerSection(b);//保存缓存信息
savestringTableSection(b);//保存stringTable 
flushSectionoutputStream();//flush 输出流
FileSummary summary=b.build();
saveFileSummary(underlyingoutputStream,summary);//将FileSummary写入文件
underlyingoutputStream.close();//关闭底层输出流
savedDigest=new MD5Hash(digester.digest());

2.加载fsimage文件

当Namenode启动时,首先会将fsimage文件中记录的命名空间加载到Namenode内存中, 然后再一条一条地将editlog文件中记录的更新操作加载并合并到命名空间中。接下来Namenode会等待各个Datanode向自己汇报数据块信息来组装blockMap,从而离开安全模式。 Namenode每次启动时都会调用FSImage.loadFSImage()方法执行加载fsimage和editlog文件的操作
我们看一下loadFSImage()方法的代码。

 boolean loadFSImage(FSNamesystem target, MetaRecoveryContext recovery) throws IOException {
/ / 准 备 工 作 •••
//获取editlog文件IO流
initEditLog();
if(LayoutVersion.supports(Feature.TXID_BASED_LAYOUT,getLayoutVersion())){
long toAtLeastTxId=editLog.isopenForWrite()?inspector.getMaxSeenTxId():0;editStreams=editLog.selectInputStreams(
imageFiles.get(0).getCheckpointTxId()+1,toAtLeastTxId,recovery,false);
}else{
editStreams=FSImagePreTransactionalstorageInspector
.getEditLogStreams(storage);
//调用1oadFSImageFile()方法,加载fsimage文件
for(inti=0;i<imageFiles.size();i++){
try{
imageFile=imageFiles.get(i);
loadFSImageFile(target,recovery,imageFile);
break;
)catch(IOException ioe){
LoG.error("Failed to load image from"+imageFile,ioe);
target.clear();
imageFile=nul1;
//如果加载失败,则抛出异常
if(imageFile==null){
FSEditLog.closeA11Streams(editStreams);
throw new IOException("Failed to load an FSImage file!");
//调用1oadEdit()方法加载并合并editlog 
long txnsAdvanced=loadEdits(editStreams,target,recovery);
needToSavel=needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),txnsAdvanced);
editLog.setNextTxId(lastAppliedTxId +1);
return needToSave;

在loadFSImage()方法中加载fsimage文件是通过调用FSImage.loadFSImageFIle()方法实现 的,加 载 editlog文件则是通过调用FSImage.loadEdits()方法实现
FSImage.loadFSImage()方法调用 FSImage.loadFSImageFile()方法执 行加载fsimage文件的操作。loadFSImageFile。方法的调用顺序如图所示。

图片说明

loadFSImageFile()方法根据当前Namenode的版本调用不同的加载方法,不同版本加载 方法的区别主要是在fsimage文件的校验操作上,例如对于2.6版本的加载操作,每个fsimage 文件都有一个对应的md5校验文件。fsimage文件最终的加载工作是由私有的loadFSImage() 方法实现的,loadFSImage()方法构造一个 FSImageFormat.LoaderDelegator 对象加载 fsimage 文件,这个对象的load()方法判断当前fsimage使用了什么序列化方式,如果在fsimage文 件 中 有MAGIC_HEADER,则 该 fsimage文件使用的是protobuf序列化方式,那么构造 FSImageFormatProtobuf.Loader执行加载操作,否则使用FSImageFormat.Loader类执行加载操作。
load()的代码如下:

public void load(File file,boolean requireSameLayoutVersion)
throws IOException{
FileInputStream is=null;

try{
is=new FileInputStream(file);
byte[]magic=new byte[FSImageUti1.MAGIC_HEADER.1ength];roUtils.readFully(is,magic,0,magic.length);
//fsimage文件中包括magicHeader,使用的是protobuf序列化方式if(Arrays.equals(magic,FSImageUtil.MAGIC_HEADER)){
//构造FSImageFormatProtobuf.Loader加载fsimage文件FSImageFormatProtobuf.Loader loader =new FSImageFormatProtobuf.Loader(
conf,fsn,requireSameLayoutVersion);
imp1=loader;
loader.load(file);
)else{
//否则构造FSImageFormat.Loader加载fsimage文件
Loader loader=new Loader(conf,fsn);
imp1=loader;
loader.1oad(file);
)finally{
roUtils.cleanup(LOG,is);

HDFS2.6版本使用protobuf作为fsimage的序列化工具。所以在加载fsimage操作中,最终会调用FSImageFormatProtobuf.Loader作为fsimage 文件的加载类。FSImageFormatProtobuf.Loader.loadlntenal()方法执行了加载 fsimage 文件的操作,loadlntenal()方法打开fsimage文件通道,然后读取fsimage文件中的FileSummary对象,FileSummary对象中记录了 fsimage中保存的所有section的信息。loadlntenal()会对 FileSummary对象中保存的section排序,然后遍历每个section并调用对应的方法从fsimage 文件中加载这个section。
loadlntenal()方法的代码如下:

private void loadInternal(RandomAccessFile raFile,FileInputStream fin)
throws IOException{
//从fsimage文件末尾加载FileSummary,也就是fsimage文件内容的描述FileSummary summary=FSImageUti1.1oadSummary(raFile);
//获取通道
FileChannel channel=fin.getchannel();
//构造FSImageFormatPBINode.Loader和FSImageFormatPBSnapshot.Loader 加载INode 以及Snapshot 
FSImageFormatPBINode.Loader inodeloader =new FSImageFormatPBINode.Loader(
fsn,this);
FSImageFormatPBSnapshot.Loader snapshotLoader=new FSImageFormatPBSnapshot.Loader(
fsn,this);
//对fsimage文件描述中记录的sections进行排序
ArrayList<FileSummary.Section>sections=Lists.newArrayList(summary
.getSectionsList());

Collections. sort(sections, new Comparator<FileSummary. Section>(){
@ override public int compare(FileSummary. Section s1, FileSummary. Section s2){
SectionName n1=SectionName. fromString(sl. getName()); 
SectionName n2=SectionName. fromString(s2. getName()); if (n1==nul1){
return n2==null?0:-1;) else if (n2== null){
return-1;
} else{
return n1. ordinal()-n2. ordinal();
}}
});


//遍历每个section,并调用对应的方法加载这个section for(FileSummary.Sections:sections){
channel.position(s.getoffset());
//在通道中定位这个section的起始位置
InputStream in=new BufferedInputStream(new LimitInputStream(fin,s.getLength()));in=FSImageUtil.wrapInputStreamForCompression(conf,summary.getCodec(),in);
string n=s.getName();
switch(SectionName.fromstring(n)){
//调用对应的方法加载不同的section 
case NS_INFO:
1oadNamesystemsection(in);
break;
case STRING_TABLE:loadstringTablesection(in);break;
case INODE:{
currentStep=new Step(StepType.INODES);inodeLoader.loadINodeSection(in);break;
case INODE REFERENCE:snapshotLoader.loadINodeReferenceSection(in);break;
case INODE_DIR:inodeloader.loadINodeDirectorysection(in);break;case FILES UNDERCONSTRUCTION:inodelLoader.loadFilesUnderConstructionsection(in);break;
case SNAPSHOT:
snapshotLoader. loadSnapshotSection(in); break; 
case SNAPSHOT DIFF: snapshotLoader. loadSnapshotDiffSection(in); break; case SECRET_MANAGER:{
1oadSecretManagerSection(in); break; 
case CACHE_MANAGER:{
1oadCacheManagerSection(in); break; 
default: LoG. warn("Unrecognized section"+n); break;
)}

对于不同类型的section,loadlnternal()方法调用不同的方法加载这个section,例如对于INodeSection会调用InodeLoader.loadINodeSection()方法加载。load()方法的实现都比较简单,就是按照protobuf格式加载不同的section


3.加载editlog文件

Namenode将fsimage中记录的特定时刻的命名空间镜像加载到内存之后,还需要加载后续对命名空间的修改,也就是editlog中记录的修改命名空间的操作。Namenode会调用FSImage.loadEdits()方法将editlog文件中记录的更新操作与当前Namenode的命名空间进行合并FSImage.loadEdits()方法构造一个FSEditLogLoader对象,然后遍历Namenode所有存储路径上保存的editlog文件的输入流,并调用FSEditLogLoader.loadFSEdits()方法加载指定路径上的editlog文件。loadEdits()方法的代码如下:

private long loadEdits(Iterable<EditLogInputStream>editStreams,FSNamesystem target,StartupOption startOpt,MetaRecoveryContext recovery)
throws IOException{
//记录命名空间中加载的最新的事务id 
long prevLastAppliedTxId=lastAppliedTxId;
try{
//构造FSEditLogLoader对象用于加载editlog文件
FSEditLogLoader loader =new FSEditLogLoader(target,lastAppliedTxId);
//遍历所有存储路径上editlog文件对应的输入流
for(EditLogInputStream editIn:editStreams){
//调用FSEditLogLoader.loadFSEdits()从某个存储路径上的edit1og文件加载修改操作
try{
loader.loadFSEdits(editIn,lastAppliedTxId +1,startopt,recovery);
}finally{
//lastAppliedTxId记录从editlog加载的最新的事务id lastAppliedTxId=loader.getLastAppliedTxId();if(editIn.getlastTxId()!=HdfsConstants.INVALID_TXID){
1astAppliedTxId=editIn.getLastTxId();
)finally{
//关闭所有editlog文件的输入流
FSEditLog.closeA11Streams(editStreams);updateCountForQuota(target.dir.rootDir);
return lastAppliedTxId-prevLastAppliedTxId;

FSEditLogLoader.loadFSEdits()会使用EditLoglnputStream对象读取并加载 editlog文件,这个方法的操作比较冗长,基本可以归纳为从ediltLog文件中读取一个操作,并使用FSEditLogOp对象封装,然后调用applyEditLogOp()方法修改Namenode的命名空间。
FSEditLogLoader.loadFSEdits()方法的简版代码如下:

//从editlog文件中读取一个操作
FSEditLogop op;
try{
op=in.readOp();
if(op==null){
break;
}
)catch(Throwable e){
//...
//.…
//在当前命名空间中执行对应的修改操作
1ong inodeId=applyEditLogop(op,fsDir,in.getVersion(),1astINoderd);
if(lastINodeId<inodeId){
lastINodeId=inodeId;

4.检查点机制

一个正常大小的editlog文件往往在几十到几百个字节之间,但在某些极端的情况下,editlog文件会变得非常大,甚至将磁盘空间写满。通过上面小节的学习我们知道,在Namenode 启动过程中一个很重要的部分就是逐条读取editlog文件中的记录,之后与Namenode命名空间合并。巨大的editlog文件会导致Namenode的启动时间过长,为了解决这个问题,HDFS 引入了检查点机制(checkpointing)

如图所示,HDFS的检查点机制会定时将editlog文件与fsimage文件合并以产生新 的fsimage文件。这样Namenode就可以直接从fsimage加载元数据,而不用读取与合并editlog 中的记录了。有了检查点机制,Namenode命名空间的重建效率会大大提高,同时Namenode 的启动时间也会相应减少。

但是合并fsimage与editlog以产生新的fsimage文件是一项非常消耗I/O和 CPU资源的 操作。在执行检查点操作期间,Namenode还需要限制其他用户对HDFS的访问和操作。为了预防这种情况的发生,HDFS将检查点操作从Active Namenode移动到了 Secondary Namenode 或者Standby Namenode 上 , 至于具体是哪一种Namenode,则取决于当前的HDFS是否开启了HA功能。

图片说明

在以下两种情况下,Namenode会触发一次检查点操作
① 超过了配置的检查点操作时 长 (dfs.namenode.checkpoint.period配置项配置);
② 从上一次检查点操作后,发生的事务 (transaction) 数超过了配置(dfs.namenode.checkpoint.txns配置项配置)。
(1) Secondary Namenode执行检查点操作
在非HA部署环境下,检查点操作是由Secondary Namenode来执行的。Secondary Namenode是 HDFS1.X版本中一个非热备的Namenode备份节点,整个检查点流程如图所示。

图片说明

整个检查点流程如下所示。

  • Secondary Namenode检查两个触发检查点流程的条件是否满足。由于在非HA状态下,Secondary Namenode和Namenode之间并没有共享的editlog文件目录,所以最新的事务 id (transactionld) 是 Secondary Namenode 通过调用 RPC 方法 Namenode Protocol.getTransactionld ()获取的。
  • Secondary Namenode 调用 RPC 方法 NamenodeProtocol.rollEditLog()触发 editlog 重置 操作,将当前正在写的editlog段落结束,并创建新的edit.new文件,这个操作还会 返回当前fsimage以及刚刚重置的editlog的事务id (seen id)o这样当Secondary Namenode从Namenode读取editlog文件时,新的操作就可以写入edit.new文件中, 不影响editlog记录功能。在 HA模式下,并不需要显式地触发editlog的重置操作, 因为 Standby Namenode 会定期重置 editlog
  • 有了最新的txid以及seen id, Secondary Namenode就会发起HTTP GET请求到 Namenode 的 GetlmageServlet 以获取新的 fsimage 和 editlog 文件。需要注意,Secondary Namenode在进行上一次的检查点操作时,可能已经获取了部分fsimage和edits文件。
  • Secondary Namenode会加载新下载的fsimage文件以重建Secondary Namenode的命 名空间。
  • Secondary Namenode读取edits中的记录,并与当前的命名空间合并,这样Secondary Namenode的命名空间和Namenode的命名空间就同步了。
  • Secondary Namenode将最新的同步的命名空间写入新的fsimage文件中。
  • Secondary Namenode 向 Namenode 的 ImageServlet 发送 HTTP GET 请求/getimage? putimage=l。这个请求的URL中还包含了新的fsimage文件的事务ID,以及Secondary Namenode用于下载的端口和IP地址。
  • Namenode 会根据 Secondary Namenode 提供的信息向 Secondary Namenode 的 GetlmageServlet发起HTTP GET请求下载fsimage文件。Namenode首先将下载文件 命名为fsimage.ckpt_,然后创建MD5校验和,最后将fsimage.ckpt_重命名为fsimage_。
    至此,一个完整的fsimage的检查点操作就完成了。

(2)Standby Namenode执行检查点操作
HDFS2.X版本中加入了 Namenode HA策略,这使得HA机制下检查点操作的流程与非 HA机制下的完全不同,因为在HA配置下已经没有Secondary Namenode这个节点了,而是 直接通过配置奇数个JoumalNode来实现Namenode热备HA策略。 这里我们首先介绍HDFS2.X版本中的HA策略,在同一个HA HDFS集群中,将会同时运行两个Namenode实例,其中一个为Active Namenode,用于实时处理所有客户端请求另一个为Standby Namenode, Standby Namenode 的命名空间与ActiveNamenode是完全保持一致的。所以当ActiveNamenode岀现故障时, Standby Namenode可以立即切换成Active状态。 为了让Standby Namenode的命名空间与Active Namenode保持同步,它们都将和 JoumalNodes守护进程通信。当Active Namenode执行任何修改命名空间的操作时,它至少需 要将产生的editlog文件持久化到N- (N-1)/2个JoumalNode节点上才能保证命名空间修改的安 全性。换句话说,如果在HA策略下启动了 N 个JoumalNode进程,那么整个JoumalNode集群最多允许(N-1)/2个进程死掉,这样才能保证editlog成功完整地被写入。比如集群中有3 个 JoumalNode时,最多允许1个JoumalNode挂掉;集群中有5 个JoumalNode时,最多允许2 个JoumalNode挂掉。Standby Namenode则负责观察editlog文件的变化,它能够从JoumalNodes 中读取editlog文件,然后合并更新到它自己的命名空间中。一旦Active Namenode岀现故障, Standby Namenode就会保证从JoumalNodes中读出全部的editlog文件,然后切换成Active状 态。Standby Namenode读取全部的editlog文件可确保在发生故障转移之前,和 Active Namenode拥有完全同步的命名空间状态。

Standby Namenode始终保持着一个最新版本的命名空间,它会不断地将读入的editlog文 件与当前的命名空间合并,所以检查点机制在HA模式下就简单了很多。Standby Namenode 只需判断当前是否满足触发检查点操作的两个条件,如果满足触发条件,则 将 Standby Namenode的命名空间写入一个新的fsimage文件中,并通过HTTP将这个fsimage文件传回 Active Namenode HA状态下的Standby Namenode检查点流程如图:

图片说明

  • Standby Namenode检查是否满足触发检査点操作的两个条件。
  • Standby Namenode将当前的命名空间保存到fsimage.ckpt_txid文件中,这里的txid是当前最新的editlog文件中记录的事务ido之后Standby Namenode写入fsimage文 件 的MD5校验和,最后重命名这个fsimage.ckpt_txid文件为fsimage_txido当执行 这个操作时,其他的Standby Namenode操作将会被阻塞,例如Active Namenode发 生错误时,需要进行主备切换或者访问Standby Namenode的Web接口等操作。注意, Active Namenode的操作并不会被影响,例如listing、readingwriting文件等。
  • Standby Namenode 向 Active Namenode 的 ImageServlet 发送 HTTP GET 请求 /getimage?putimage=1 这个请求的URL中包含了新的fsimage文件的事务id ,以及 Standby Namenode用于下载的端口和IP地址。
  • Active Namenode 会根据 Standby Namenode 提供的信息向 Standby Namenode 的 ImageServlet发起HTTP GET请求下载fsimage文件。Namenode首先将下载文件命 名 为 fsimage,ckpt_* , 然 后 创 建 M D5校验和,最 后 将 fsimage.ckpt_重命名为 fsimage_

知道了检查点操作的流程之后,我们看 一 下HDFS代码是如何实现检查点功能的。 StandbyNamenode会 持 有 一 个 StandbyCheckpointer类 , 这 个 类 维 护 着 一 个 叫 作 CheckpointerThread 的线程,CheckpointerThread 线程会每隔 1000*Math.min(checkpointCheck Period, checkpointPeriod)秒 检 测 是 否 执 行 一 次 检 查 点 逻 辑 (checkpointCheckPeriod 由 dfs.namenode.checkpoint.period 配置,,checkpointPeriod 则由dfknamenode.checkpoint.check.period 配置)。整个检查点逻辑是在CheckpointeiThread.doWork()方法中实现的,流程与上面介绍的 一 样 。doWork()方法首先会判断是否满足检查点操作的两个条件,如果满足则调用 doCheckpoint()执行检查点操作

private void dowork(){
while(shouldRun){
try{
//间隔时长1000*Math.min(checkpointCheckPeriod,checkpointPeriod)
Thread.sleep(1000*checkpointConf.getCheckPeriod());
}catch(InterruptedException ie){
if(!shouldRun){
break;
try{
long now=now();
//获得最后一次往JournalNode写入的txid和最近一次做检查点的txid的差值1ong uncheckpointed=countUncheckpointedTxns();
//计算当前时间和上一次检查点操作时间的间隔
long secssinceLast=(now-lastCheckpointTime)/1000;
boolean needCheckpoint=false;
//第一种符合合并的情况:当最后一次往 JournalNode写入的txid和最近一次做检查点的txid的差值大于或者等于dfs.namenode.checkpoint.txns配置的数量(默认为1000000)时做一次合并if(uncheckpointed>=checkpointConf.getTxncount()){
needCheckpoint=true;
//第二种符合合并的情况:当时间间隔大于或者等于dfs.namenode.checkpoint.period配置的时间时做合并
else if(secssincelast>=checkpointConf.getPeriod()){
needCheckpoint=true;
//满足检查点执行条件,则调用docheckpoint()方法执行检查点操作if(needcheckpoint){
docheckpoint();
lastCheckpointTime=now;
}catch(SaveNamespaceCancelledException ce){
canceledCount++;
}catch(InterruptedException ie){
continue;
}catch(Throwable t){
)finally{
synchronized(cancelLock){
canceler=nul1;
}
}
}
}
}

可以看到,整个检查点执行操作的逻辑都是在doCheckpoint方法中实现的doCheckpoint()方法首先获取当前保存的fsimage的prevCheckpointTxld,然后 获取最近更新的 editlog 的 thisCheckpointTxId, 只有新的 thisCheckpointTxId 大于 prevCheckpointTxId, 也就是当前命名空间有更新,但是并没有保存到新的fsimage文件中时,才有必要进行一次检査点操作。 判断完成后,doCheckpointO会调用saveNamespace()方法将最新的命名空间保存到fsimage文件 中。之后构造一个线程,将新产生的fsimage文件通过HTTP方式上传到AvtiveNamenode中

private void doCheckpoint()throws InterruptedException,IOException{
//.
namesystem.1ongReadLockInterruptibly();
try{
//获取当前Standby Namenode上保存的最新的fsimage对象
FSImage img =namesystem.getFSImage();
//获取fsimage中保存的txid,也就是上一次进行检查点操作时保存的txid 
long prevCheckpointTxId=img.getStorage().getMostRecentCheckpointTxId();
//获取当前命名空间的最新的txid,也就是收到的editlog的最新的txid
1ong thischeckpointTxId=img.getLastAppliedorwrittenTxId();
//thischeckpointTxId一定大于prevcheckpointTxId 
assert thisCheckpointTxId >=prevCheckpointTxId;
//如果相等则没有必要执行检查点操作,当前fsimage已经是最新的了
if(thischeckpointTxId==prevcheckpointTxId){
return;
}
if(namesystem.isRollingUpgrade()
&&!namesystem.getFSImage().hasRollbackFSImage()){
//如果当前Namenode正在执行升级操作,则创建fsimage rollback文件imageType=NameNodeFile.IMAGE_ROLLBACK;
)else{
//在正常情况下创建fsimage文件
imageType=NameNodeFile.IMAGE;
)
//调用saveNamespace()将当前命名空间保存到新的文件中img.saveNamespace(namesystem,imageType,canceler);
}finally{
namesystem.1ongReadunlock();
}
//构造一个线程,通过HTTP将fsimage上传到Active Namenode中
ExecutorService executor =Executors.newSingleThreadExecutor(uploadThreadFactory); Future<Void>upload=executor. submit(new Callable<Void>(){
@ override 
public Void cal1() throws IOException{
TransferFsImage. uploadImageFromStorage(activeNNAddress, conf, namesystem. getFSImage(). getstorage(), imageType, txid, canceler); 
return null;
}); 
executor. shutdown(); 
try{
upload. get();
) catch(InterruptedException e){
upload. cance1(true); 
throw e;
) catch(ExecutionException e){
throw new IOException("Exception during image upload:"+e. getMessage(), e. getCause());

doCheckpoint()方法调用了FSImage.saveNamespace()方法将当前命名空间保存到新的fsimage文件中。saveNamespace()方法首先重置(roll)了editlog文件,将当前的edit inprogress文件关闭并重命名,为与fsimage文件合并做准备。之后调用saveFSImagelnAlIDirs()将 fsimage和editlog文件加载到命名空间中,并将更新的命名空间保存到新的fsimage文件中。最后开启新的 editlog inprogress文件,用于记录新的操作。

public synchronized void saveNamespace(FSNamesystem source,NameNodeFile nnf,Canceler canceler)throws IOException{
boolean editLogwasopen=editLog.issegmentopen();if(editLogWasOpen){
//将当前editinprogress文件关闭,并重命名
editLog.endCurrentLogSegment(true);
long imageTxId=getLastAppliedorwrittenTxId();
try{
//调用saveFSImageInA11Dirs()方法将当前的命名空间保存到新的fsimage文件中
saveFSImageInA11Dirs(source,nnf,imageTxId,canceler);storage.writeAl1();
}finally{
if(editLogwasOpen){
//开启新的editlog_inprogress文件用于记录请求
editLog.startLogSegment(imageTxId+1,true);storage.writeTransactionIdFileToStorage(imageTxId +1);
}

将 fsimage 保存成功后,doCheckpoint()方法就会调用 TransferFsImage.uploadlmageFrom Storage()方法将新生成的fsimage文件上传到Active Namenode中了至此,HDFS中完整的检査点流程就介绍完了。