Watcher机制原理

我们知道ZooKeeper提供了分布式数据的发布/订阅功能,一个典型的发布/订阅模型系统定义了一种一对多的订阅关系,能够让多个订阅者同时监听某一个主题对象,当这个主题对象自身状态变化时,会通知所有订阅者,使它们能够做出相应的处理。在ZooKeeper中,引入了Watcher机制实现这种分布式的通知功能。
ZooKeeper 允许客户端向服务端注册一个Watcher监听,当服务端的一些指定事件触发了这个Watcher,那么就会向指定客户端发送一个事件通知来实现分布式的通知功能。

图片说明

ZooKeeper的Watcher机制主要包括客户端线程客户端WatchManagerZooKeeper服务器三部分。在具体工作流程上,客户端在向ZooKeeper 服务器注册Watcher的同时,会将Watcher 对象存储在客户端的WatchManager中。当ZooKeeper服务器端触发Watcher事件后,会向客户端发送通知,客户端线程从WatchManager中取出对应的Watcher对象来执行process方法的回调逻辑。

Watcher特性

一次性

无论是服务端还是客户端,一旦一个Watcher被触发,ZooKeeper都会将其从相应的存储中移除。因此,开发人员在Watcher的使用上要记住的一点是需要反复注册。这样的设计有效地减轻了服务端的压力。试想,如果注册一个Watcher之后一直有效,那么,针对那些更新非常频繁的节点,服务端会不断地向客户端发送事件通知,这无论对于网络还是服务端性能的影响都非常大。

客户端串行执行

客户端 Watcher回调的过程是一个串行同步的过程,这为我们保证了顺序,同时,需要开发人员注意的一点是,千万不要因为一个Watcher的处理逻辑影响了整个客户端的Watcher回调。

轻量

WatchedEvent是ZooKeeper整个Watcher通知机制的最小通知单元,这个数据结构中只包含三部分内容:通知状态事件类型节点路径。也就是说,Watcher通知非常简单,只会告诉客户端发生了事件,而不会说明事件的具体内容。例如针对NodeDataChanged事件,ZooKeeper的Watcher只会通知客户端指定数据节点的数据内容发生了变更,而对于原始数据以及变更后的新数据都无法从这个事件中直接获取到,而是需要客户端主动重新去获取数据一—这也是ZooKeeper的Watcher机制的一个非常重要的特性。
另外,客户端向服务端注册 Watcher的时候,并不会把客户端真实的Watcher对象传递到服务端,仅仅只是在客户端请求中使用boolean类型属性进行了标记,同时服务端也仅仅只是保存了当前连接的ServerCnxn对象
如此轻量的Watcher机制设计,在网络开销和服务端内存开销上都是非常廉价的。

Watcher 事件

同一个事件类型在不同的通知状态中代表的含义有所不同,如图:
图片说明

NodeDataChanged事件:此处说的变更包括节点的数据内容和数据的版本号 dataVersion。因此,即使使用相同的数据内容来更新,还是会触发这个事件通知,因为对于ZooKeeper来说,无论数据内容是否变更,一旦有客户端调用了数据更新的接口,且更新成功,就会更新dataVersion值。
NodeChildrenChanged事件:会在数据节点的子节点列表发生变更的时候被触发,这里说的子节点列表变化特指子节点个数和组成情况的变更,即新增子节点或删除子节点,而子节点内容的变化是不会触发这个事件的。

回调方法process()

process方法是Watcher接口中的一个回调方法,当Zookeeper向客户端发送一个Watcher事件通知时,客户端就会对相应的process方法进行回调,从而实现对事件的处理。
process方法定义如下:
abstract public void process(WatchedEvent event);
方法的参数定义:WatchedEvent。
WatchedEvent 包含了每一个事件的三个基本属性:通知状态(keeperState)事件类型(eventType)节点路径(path)。ZooKeeper使用WatchedEvent对象来封装服务端事件并传递给Watcher,从而方便回调方法process对服务端事件进行处理。

图片说明

提到WatchedEvent,不得不讲下WatcherEvent实体。笼统地讲,两者表示的是同一个事物,都是对一个服务端事件的封装。不同的是,WatchedEvent是一个逻辑事件,用于服务端和客户端程序执行过程中所需的逻辑对象,而WatcherEvent因为实现了序列化接口,因此可以用于网络传输,其数据结构如图所示。

图片说明

服务端在生成WatchedEvent事件之后,会调用getWrapper方法将自己包装成一个可序列化的WatcherEvent事件,以便通过网络传输到客户端。客户端在接收到服务端的这个事件对象后,首先会将WatcherEvent事件还原成一个WatchedEvent事件,并传递给process方法处理,回调方法process根据入参就能够解析出完整的服务端事件了。
需要注意的一点是,无论是WatchedEvent还是WatcherEvent,其对Zookeeper服务端事件的封装都是及其简单的。举个例子来说,当/zk-book 这个节点的数据发生变更时,服务端会发送给客户端一个“ZNode数据内容变更”事件,客户端只能接收到如下信息:

KeeperState:SyncConnected
EventType:NodeDataChanged
Path:/zk-book

从上面展示的信息中,我们可以看到,客户端无法直接从该时间的原始数据内容以及变更后的新数据内容,而是需要客户端再次主动去重新获取数据。

工作机制

ZooKeeper的Watcher机制,总的来说可以概括为以下三个过程:客户端注册Watcher服务端处理Watcher客户端回调 Watcher

客户端注册 Watcher

我们提到在创建一个ZooKeeper客户端对象实例时,可以向构造方法中传入一个默认的Watcher:
public Zookeeper(String connectstring,int sessionTimeout,Watcher watcher);
这个Watcher将作为整个ZooKeeper会话期间的默认Watcher,会一直被保存在客户端ZKWatchManager 的 defaultwatcher 中。另外,ZooKeeper客户端也可以通过getDatagetChildrenexist三个接口来向ZooKeeper 服务器注册Watcher,无论使用哪种方式,注册Watcher的工作原理都是一致的,这里我们以getData这个接口为例来说明。getData接口用于获取指定节点的数据内容,主要有两个方法:
public byte[]getData(String path,boolean watch,Stat stat)
public byte[]getData(final String path,Watcher watcher,Stat stat)
在这两个接口上都可以进行Watcher的注册,第一个接口通过一个boolean参数来标识是否使用上文中提到的默认Watcher来进行注册,具体的注册逻辑和第二个接口是一致的。
在向getData接口注册Watcher后,客户端首先会对当前客户端请求 request进行标记,将其设置为“使用Watcher监听”,同时会封装一个Watcher的注册信息WatchRegistration对象,用于暂时保存数据节点的路径和Watcher的对应关系,具体的逻辑代码如下:

public Stat getData(final String path, Watcher watcher, Stat stat)
…
WatchRegistration wcb=null; if(watcher!=null){
wcb=new DatawatchRegistration(watcher, clientPath);
}
request. setwatch(watcher!=null); ReplyHeader r=cnxn. submitRequest(h, request, response, wcb);
..…
}

在ZooKeeper中,Packet可以被看作一个最小的通信协议单元,用于进行客户端与服务端之间的网络传输,任何需要传输的对象都需要包装成一个Packet对象。因此,在ClientCnxn中WatchRegistration又会被封装到Packet中去,然后放入发送队列中等待客户端发送:

Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration){
Packet packet=null;
..…
synchronized(outgoingQueue){
packet=new Packet(h,r, request, response, watchRegistration);
.·
outgoingQueue. add(packet);
.…
}

随后,ZooKeeper客户端就会向服务端发送这个请求,同时等待请求的返回。完成请求发送后,会由客户端SendThread线程的readResponse方法负责接收来自服务端的响应,finishPacket方***从Packet中取出对应的Watcher并注册到ZKWatchManager中去:

private void finishPacket(Packet p){
if(p.watchRegistration!=null){
p.watchRegistration.register(p.replyHeader.getErr());
}

从上面的内容中,我们已经了解到客户端已经将Watcher 暂时封装在了WatchRegistration对象中,现在就需要从这个封装对象中再次提取出Watcher来:

protected Map<String, Set-watcher>> getwatches(int rc){
return watchManager. datawatches;
}
public void register(int rc){
if(shouldAddwatch(rc)){
Map<String, Set<watchers> watches=getwatches(rc); synchronized(watches){
Set<Watcher> watchers =watches. get(clientPath); if(watchers==null){
watchers=new HashSet<watcher>(); watches. put(clientPath, watchers);
}
watchers. add(watcher);
}

在register方法中,客户端会将之前暂时保存的Watcher对象转交给ZKWatchManager,并最终保存到datawatches中去。ZKWatchManager.datawatches是一个Map<String,Set-watcher>>类型的数据结构,用于将数据节点的路径和Watcher对象进行一一映射后管理起来。整个客户端Watcher的注册流程如图所示。
图片说明

当客户端每调用一次getData()接口,就会注册上一个Watcher,但是这些Watcher实体并不会随着客户端的请求发送到服务端去,如果客户端注册的所有Watcher都被传递到服务端的话,那么服务端肯定会出现内存紧张或其他性能问题了,幸运的是,在ZooKeeper的设计中充分考虑到了这个问题。
在上面的流程中,我们提到把WatchRegistration封装到了Packet对象中去,但事实上,在底层实际的网络传输序列化过程中,并没有将 WatchRegistration对象完全地序列化到底层字节数组中去。为了证实这一点,我们可以看下Packet内部的序列化过程:

public void createBB(){
try{
ByteArrayoutputStream baos=new ByteArrayoutputStream(); BinaryoutputArchive boa=BinaryoutputArchive. getArchive(baos); boa. writeInt(-1,"1en");//We'1l fill this in later if(requestHeader!=null){
requestHeader. serialize(boa,"header");
}
if(request instanceof ConnectRequest){
request. serialize(boa,"connect");
//append "am-I-allowed-to-be-readonly"flag boa. writeBool(readOnly,"readonly");
} else if(request !=null){
request. serialize(boa,"request");
}}

从上面的代码片段中,我们可以看到,在Packet.createBB()方法中,ZooKeeper只会将requestHeader和request两个属性进行序列化,也就是说,尽管WatchRegistration 被封装在了Packet中,但是并没有被序列化到底层字节数组中去,因此也就不会进行网络传输了。

服务端处理Watcher

ServerCnxn存储
我们首先来看下服务端接收Watcher并将其存储起来的过程,如图所示是ZooKeeper服务端处理Watcher的序列图。

图片说明
从图中我们可以看到,服务端收到来自客户端的请求之后,在FinalRequest Processor.processRequest()中会判断当前请求是否需要注册 Watcher:

case OpCode. getData:{
..…
byte b[]=zks. getZKDatabase(). getData(getDataRequest. getPath(), stat, getDataRequest. getwatch()? cnxn: null); rsp=new GetDataResponse(b, stat); break;
}

从getData 请求的处理逻辑中,我们可以看到,当getDataRequest.getwatch()
为true的时候,ZooKeeper就认为当前客户端请求需要进行Watcher注册,于是就会将当前的ServerCnxn对象和数据节点路径传入getData方法中去。那么为什么要传入ServerCnxn呢?ServerCnxn是一个ZooKeeper 客户端和服务器之间的连接接口,代表了一个客户端和服务器的连接。ServerCnxn接口的默认实现是NIOServerCnxn,同时从3.4.0版本开始,引入了基于Netty的实现:NettyServerCnxn。无论采用哪种实现方式,都实现了Watcher的process接口,因此我们可以把ServerCnxn看作是一个Watcher对象。数据节点的节点路径和ServerCnxn最终会被存储在WatchManager的watchTable和watch2Paths中。WatchManager是ZooKeeper 服务端 Watcher的管理者,其内部管理的watchTable和watchzPaths两个存储结构,分别从两个维度对watcher进行存储。

  • watchTable是从数据节点路径的粒度来托管Watcher。
  • watch2Paths是从Watcher的粒度来控制事件触发需要触发的数据节点。

同时,WatchManager还负责Watcher事件的触发,并移除那些已经被触发的Watcher。注意,WatchManager只是一个统称,在服务端,DataTree中会托管两个WatchManager,分别是datawatches和childwatches,分别对应数据变更Watcher和子节点变更Watcher。
在本例中,因为是getData接口,因此最终会被存储在datawatches中,其数据结构如图所示

图片说明

Watcher触发

在上面我们提到,NodeDataChanged事件的触发条件是“Watcher监听的对应数据节点的数据内容发生变更”,其具体实现如下:

public Stat setData(String path,byte data[],int version,long zxid,long time)throws KeeperException.NoNodeException{
Stats=new Stat();
DataNode n=nodes. get(path); if(n==null){
throw new KeeperException. NoNodeException();
}
byte lastdata[]=null; synchronized(n){ lastdata=n. data; n. data=data; n. stat. setMtime(time);n. stat. setMzxid(zxid); n. stat. setVersion(version); n. copystat(s);
}
//...…
dataMatches. triggerwatch(path, EventType. NodeDataChanged); returns;

在对指定节点进行数据更新后,通过调用WatchManager的triggerwatch方法来触发相关的事件:

public Setswatcher>triggerwatch(String path,EventType type){
return triggerwatch(path,type,null);
}
public Set<watcher> triggerwatch(String path,EventType type,Set-watcher>
supress){
WatchedEvent e=new WatchedEvent(type,KeeperState.SyncConnected,path);HashSet<Watcher>watchers;synchronized(this){
watchers=watchTable.remove(path);
//.……
//如果不存在Watcher,直接返回
for(Watcher w:watchers){
HashSet<String>paths=watch2Paths.get(w);if(paths!=null){
paths.remove(path);
}}
for(Watcher w:watchers){
if(supress!=null&&supress.contains(w)){
continue;
}
w.process(e);
}
return watchers;

无论是datawatches还是childwatches管理器,Watcher的触发逻辑都是一致的,基本步骤如下。

  • 1.封装WatchedEvent。
    首先将通知状态(KeeperState)、事件类型(EventType)以及节点路径(Path)
    封装成一个WatchedEvent对象。
  • 2.查询Watcher。
    根据数据节点的节点路径从watchTable中取出对应的Watcher。如果没有找到Watcher,说明没有任何客户端在该数据节点上注册过Watcher,直接退出。而如果找到了这个Watcher,会将其提取出来,同时会直接从watchTable和watch2Paths中将其删除——从这里我们也可以看出,Watcher在服务端是一次性的,即触发一次就失效了。
  • 3.调用process方法来触发Watcher。
    在这一步中,会逐个依次地调用从步骤2中找出的所有Watcher的process方法。那么这里的process方法究竟做了些什么呢?在上文中我们已经提到,对于需要注册Watcher的请求,ZooKeeper会把当前请求对应的ServerCnxn作为一个Watcher进行存储,因此,这里调用的process方法,事实上就是ServerCnxn的对应方法:
public class NIOServerCnxn extends ServerCnxn{
//...
synchronized public void process(WatchedEvent event){
ReplyHeader h=new ReplyHeader(-1,-1L,0);
//.……
//Convert WatchedEvent to a type that can be sent over the wire WatcherEvent e=event. getwrapper(); sendResponse(h,e,"notification");
}
//..…

从上面的代码片段中,我们可以看出在process方法中,主要逻辑如下。

  • 在请求头中标记“-1”,表明当前是一个通知。
  • 将WatchedEvent包装成WatcherEvent,以便进行网络传输序列化。
  • 向客户端发送该通知。

从以上几个步骤中可以看到,ServerCnxn的process方法中的逻辑非常简单,本质上并不是处理客户端Watcher真正的业务逻辑,而是借助当前客户端连接的ServerCnxn 对象来实现对客户端的WatchedEvent传递,真正的客户端Watcher回调与业务逻辑执行都在客户端。
客户端回调Watcher上面我们已经讲解了服务端是如何进行Watcher触发的,并且知道了最终服务端会通过使用ServerCnxn对应的TCP连接来向客户端发送一个WatcherEvent事件,下面我们来看看客户端是如何处理这个事件的。

SendThread接收事件通知

首先我们来看下ZooKeeper客户端是如何接收这个客户端事件通知的:

class SendThread extends Thread{
//..…
void readResponse(ByteBuffer incomingBuffer)throws IOException{
//.…
if(replyHdr.getXid()==-1){
//-1 means notification
//.……
WatcherEvent event=new WatcherEvent();event.deserialize(bbia,"response");
//convert from a server path to a client path if(chrootPath!=null){
String serverPath=event.getPath();if(serverPath.compareTo(chrootPath)==0)
event.setPath("/");else if(serverPath.tength()>chrootPath.tength())
event.setPath(serverPath.substring(chrootPath.length()));
//…
WatchedEvent we=new WatchedEvent(event);
//...
eventThread.queueEvent(we);return;}
//...

对于一个来自服务端的响应,客户端都是由SendThread.readResponse(ByteBuffer incomingBuffer)方法来统一进行处理的,如果响应头replyHdr中标识了XID为-1,表明这是一个通知类型的响应,对其的处理大体上分为以下4个主要步骤。

  • 1.反序列化。
    ZooKeeper客户端接到请求后,首先会将字节流转换成WatcherEvent对象。
  • 2.处理chrootPath。
    如果客户端设置了chrootPath属性,那么需要对服务端传过来的完整的节点路径进行chrootPath处理,生成客户端的一个相对节点路径。例如客户端设置了chrootPath为/appl,那么针对服务端传过来的响应包含的节点路径为/appl/locks,经过chrootPath处理后,就会变成一个相对路径:/locks。
  • 3.还原WatchedEvent。
    在本节的“回调方法process()部分”中提到,process接口的参数定义是WatchedEvent,因此这里需要将 WatcherEvent对象转换成Watched Event。
  • 4.回调Watcher。
    最后将WatchedEvent对象交给EventThread线程,在下一个轮询周期中进行Watcher回调。

EventThread处理事件通知

在上面内容中我们讲到,服务端的Watcher事件通知,最终交给了EventThread线程来处理,现在我们就来看看EventThread的一些核心逻辑。EventThread线程是ZooKeeper客户端中专门用来处理服务端通知事件的线程,其数据结构如图:

图片说明
在上文中,我们讲到SendThread接收到服务端的通知事件后,会通过调用EventThread.queueEvent方法将事件传给EventThread线程,其逻辑如下:

public void queueEvent(WatchedEvent event){
if(event.getType()==EventType.None
&& sessionState==event.getState()){
return;sessionState=event.getState();
//materialize the watchers based on the event WatcherSetEventPair pair=new WatcherSetEventPair(
watcher.materialize(event.getState(),event.getType(),event.getPath()),event);
//queue the pair(watch set&event)for later processing waitingEvents.add(pair);
}

queueEvent方法首先会根据该通知事件,从ZKWatchManager中取出所有相关的Watcher:

public Set<watcher>materialize(Watcher. Event. KeeperState state, Watcher. Event. EventType type, String clientPath){
Set<Watcher>result=new HashSet<watcher>(); switch(type){
//..…
case NodeDataChanged: case NodeCreated: synchronized(datawatches){
addTo(datawatches. remove(clientPath), result);
}
synchronized(existwatches){
addTo(existwatches. remove(clientPath), result);
}
break;//....
return result;
}
final private void addTo(Set-Watcher> from, Set-watcher> to){
if(froml=null){
to. addALL(from);
}}

客户端在识别出事件类型EventType后,会从相应的Watcher 存储(即datawatches、existwatches或childwatches中的一个或多个,本例中就是从datawatches和existwatches两个存储中获取)中去除对应的Watcher。
注意,此处使用的是remove接口,因此也表明了客户端的Watcher机制同样也是一次性的,即一旦被触发后,该Watcher就失效了。
获取到相关的所有Watcher之后,会将其放入waitingEvents这个队列中去。
WaitingEvents是一个待处理Watcher的队列,EventThready run方法不断对该队列进行处理:

public void run(){
try{
isRunning=true;while(true){
Object event=waitingEvents.take();if(event==eventofDeath){
wasKilled=true;
}else{
processEvent(event);
//..…
private void processEvent(Object event){
try{
if(event instanceof WatcherSetEventPair){
//each watcher will process the event WatcherSetEventPair pair=(WatcherSetEventPair)event;for(Watcher watcher:pair.watchers){
try{
watcher.process(pair.event);
}catch(Throwablet){
//.……

从上面的代码片段中我们可以看出,EventThread线程每次都会从waiting Events队列中取出一个Watcher,并进行串行同步处理。注意,此处processEvent方法中的Watcher 才是之前客户端真正注册的Watcher,调用其process方法就可以实现Watcher的回调了。