ZooKeeper 典型的应用场景

Zookeeper 从设计模式角度来看,是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper 就将负责通知已经在 Zookeeper 上注册的那些观察者做出相应的反应,从而实现集群中类似 Master/Slave 管理模式,关于 Zookeeper 的详细架构等内部细节可以阅读 Zookeeper 的源码

下面详细介绍这些典型的应用场景,也就是 Zookeeper 到底能帮我们解决那些问题?下面将给出答案。

统一命名服务(Name Service)

分布式应用中,通常需要有一套完整的命名规则,既能够产生唯一的名称又便于人识别和记住,通常情况下用树形的名称结构是一个理想的选择,树形的名称结构是一个有层次的目录结构,既对人友好又不会重复。说到这里你可能想到了 JNDI,没错 Zookeeper 的 Name Service 与 JNDI 能够完成的功能是差不多的,它们都是将有层次的目录结构关联到一定资源上,但是 Zookeeper 的 Name Service 更加是广泛意义上的关联,也许你并不需要将名称关联到特定资源上,你可能只需要一个不会重复名称,就像数据库中产生一个唯一的数字主键一样。

Name Service 已经是 Zookeeper 内置的功能,你只要调用 Zookeeper 的 API 就能实现。如调用 create 接口就可以很容易创建一个目录节点。

配置管理(Configuration Management)

配置的管理在分布式应用环境中很常见,例如同一个应用系统需要多台 PC Server 运行,但是它们运行的应用系统的某些配置项是相同的,如果要修改这些相同的配置项,那么就必须同时修改每台运行这个应用系统的 PC Server,这样非常麻烦而且容易出错。

像这样的配置信息完全可以交给 Zookeeper 来管理,将配置信息保存在 Zookeeper 的某个目录节点中,然后将所有需要修改的应用机器监控配置信息的状态,一旦配置信息发生变化,每台应用机器就会收到 Zookeeper 的通知,然后从 Zookeeper 获取新的配置信息应用到系统中。

图 2. 配置管理结构图
图 2. 配置管理结构图

集群管理(Group Membership)

Zookeeper 能够很容易的实现集群管理的功能,如有多台 Server 组成一个服务集群,那么必须要一个“总管”知道当前集群中每台机器的服务状态,一旦有机器不能提供服务,集群中其它集群必须知道,从而做出调整重新分配服务策略。同样当增加集群的服务能力时,就会增加一台或多台 Server,同样也必须让“总管”知道。

Zookeeper 不仅能够帮你维护当前的集群中机器的服务状态,而且能够帮你选出一个“总管”,让这个总管来管理集群,这就是 Zookeeper 的另一个功能 Leader Election。

它们的实现方式都是在 Zookeeper 上创建一个 EPHEMERAL 类型的目录节点,然后每个 Server 在它们创建目录节点的父目录节点上调用 getChildren(String path, boolean watch) 方法并设置 watch 为 true,由于是 EPHEMERAL 目录节点,当创建它的 Server 死去,这个目录节点也随之被删除,所以 Children 将会变化,这时 getChildren上的 Watch 将会被调用,所以其它 Server 就知道已经有某台 Server 死去了。新增 Server 也是同样的原理。

Zookeeper 如何实现 Leader Election,也就是选出一个 Master Server。和前面的一样每台 Server 创建一个 EPHEMERAL 目录节点,不同的是它还是一个 SEQUENTIAL 目录节点,所以它是个 EPHEMERAL_SEQUENTIAL 目录节点。之所以它是 EPHEMERAL_SEQUENTIAL 目录节点,是因为我们可以给每台 Server 编号,我们可以选择当前是最小编号的 Server 为 Master,假如这个最小编号的 Server 死去,由于是 EPHEMERAL 节点,死去的 Server 对应的节点也被删除,所以当前的节点列表中又出现一个最小编号的节点,我们就选择这个节点为当前 Master。这样就实现了动态选择 Master,避免了传统意义上单 Master 容易出现单点故障的问题。

图 3. 集群管理结构图
图 3. 集群管理结构图

这部分的示例代码如下,完整的代码请看附件:

清单 3. Leader Election 关键代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void findLeader() throws InterruptedException {
        byte[] leader = null;
        try {
            leader = zk.getData(root + "/leader", true, null);
        } catch (Exception e) {
            logger.error(e);
        }
        if (leader != null) {
            following();
        } else {
            String newLeader = null;
            try {
                byte[] localhost = InetAddress.getLocalHost().getAddress();
                newLeader = zk.create(root + "/leader", localhost,
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            } catch (Exception e) {
                logger.error(e);
            }
            if (newLeader != null) {
                leading();
            } else {
                mutex.wait();
            }
        }
    }

同步锁(Locks)

同步锁在同一个进程中很容易实现,但是在跨进程或者在不同 Server 之间就不好实现了。Zookeeper 却很容易实现这个功能,实现方式也是需要获得锁的 Server 创建一个 EPHEMERAL_SEQUENTIAL 目录节点,然后调用 getChildren方法获取当前的目录节点列表中最小的目录节点是不是就是自己创建的目录节点,如果正是自己创建的,那么它就获得了这个锁,如果不是那么它就调用 exists(String path, boolean watch) 方法并监控 Zookeeper 上目录节点列表的变化,一直到自己创建的节点是列表中最小编号的目录节点,从而获得锁,释放锁很简单,只要删除前面它自己所创建的目录节点就行了。

图 4. Zookeeper 实现 Locks 的流程图
图 4. Zookeeper 实现 Locks 的流程图

同步锁的实现代码如下,完整的代码请看附件:

清单 4. 同步锁的关键代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void getLock() throws KeeperException, InterruptedException{
        List< String > list = zk.getChildren(root, false);
        String[] nodes = list.toArray(new String[list.size()]);
        Arrays.sort(nodes);
        if(myZnode.equals(root+"/"+nodes[0])){
            doAction();
        }
        else{
            waitForLock(nodes[0]);
        }
    }
    void waitForLock(String lower) throws InterruptedException, KeeperException {
        Stat stat = zk.exists(root + "/" + lower,true);
        if(stat != null){
            mutex.wait();
        }
        else{
            getLock();
        }
    }

队列管理

Zookeeper 可以处理两种类型的队列:

  1. 当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达,这种是同步队列。
  2. 队列按照 FIFO 方式进行入队和出队操作,例如实现生产者和消费者模型。

同步队列用 Zookeeper 实现的实现思路如下:

创建一个父目录 /synchronizing,每个成员都监控标志(Set Watch)位目录 /synchronizing/start 是否存在,然后每个成员都加入这个队列,加入队列的方式就是创建 /synchronizing/member_i 的临时目录节点,然后每个成员获取 / synchronizing 目录的所有目录节点,也就是 member_i。判断 i 的值是否已经是成员的个数,如果小于成员个数等待 /synchronizing/start 的出现,如果已经相等就创建 /synchronizing/start。

用下面的流程图更容易理解:

图 5. 同步队列流程图
图 5. 同步队列流程图

同步队列的关键代码如下,完整的代码请看附件:

清单 5. 同步队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
void addQueue() throws KeeperException, InterruptedException{
        zk.exists(root + "/start",true);
        zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
        CreateMode.EPHEMERAL_SEQUENTIAL);
        synchronized (mutex) {
            List< String > list = zk.getChildren(root, false);
            if (list.size() < size) {
                mutex.wait();
            } else {
                zk.create(root + "/start", new byte[0], Ids.OPEN_ACL_UNSAFE,
                 CreateMode.PERSISTENT);
            }
        }
}

当队列没满是进入 wait(),然后会一直等待 Watch 的通知,Watch 的代码如下:

1
2
3
4
5
6
7
8
public void process(WatchedEvent event) {
        if(event.getPath().equals(root + "/start") &&
         event.getType() == Event.EventType.NodeCreated){
            System.out.println("得到通知");
            super.process(event);
            doAction();
        }
    }

FIFO 队列用 Zookeeper 实现思路如下:

实现的思路也非常简单,就是在特定的目录下创建 SEQUENTIAL 类型的子目录 /queue_i,这样就能保证所有成员加入队列时都是有编号的,出队列时通过 getChildren( ) 方法可以返回当前所有的队列中的元素,然后消费其中最小的一个,这样就能保证 FIFO。

下面是生产者和消费者这种队列形式的示例代码,完整的代码请看附件:

清单 6. 生产者代码
1
2
3
4
5
6
7
8
9
boolean produce(int i) throws KeeperException, InterruptedException{
        ByteBuffer b = ByteBuffer.allocate(4);
        byte[] value;
        b.putInt(i);
        value = b.array();
        zk.create(root + "/element", value, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT_SEQUENTIAL);
        return true;
    }
清单 7. 消费者代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
int consume() throws KeeperException, InterruptedException{
        int retvalue = -1;
        Stat stat = null;
        while (true) {
            synchronized (mutex) {
                List< String > list = zk.getChildren(root, true);
                if (list.size() == 0) {
                    mutex.wait();
                } else {
                    Integer min = new Integer(list.get(0).substring(7));
                    for(String s : list){
                        Integer tempValue = new Integer(s.substring(7));
                        if(tempValue < min) min = tempValue;
                    }
                    byte[] b = zk.getData(root + "/element" + min,false, stat);
                    zk.delete(root + "/element" + min, 0);
                    ByteBuffer buffer = ByteBuffer.wrap(b);
                    retvalue = buffer.getInt();
                    return retvalue;
                }
            }
        }
}

总结

Zookeeper 作为 Hadoop 项目中的一个子项目,是 Hadoop 集群管理的一个必不可少的模块,它主要用来控制集群中的数据,如它管理 Hadoop 集群中的 NameNode,还有 Hbase 中 Master Election、Server 之间状态同步等。

本文介绍的 Zookeeper 的基本知识,以及介绍了几个典型的应用场景。这些都是 Zookeeper 的基本功能,最重要的是 Zoopkeeper 提供了一套很好的分布式集群管理的机制,就是它这种基于层次型的目录树的数据结构,并对树中的节点进行有效管理,从而可以设计出多种多样的分布式的数据管理模型,而不仅仅局限于上面提到的几个常用应用场景。

zookeeper分布式锁与分布式队列实战

下面再深入探索一下两种比较常用的场景,一个是分布式锁,一个是分布式队列。首先先使用Java API连接zookeeper

6.1 Java 使用zookeeper

客户端要连接 Zookeeper服务器可以通过创建 org.apache.zookeeper.ZooKeeper 的一个实例对象,然后调用这个类提供的接口来和服务器交互。

ZooKeeper 主要是用来维护和监控一个目录节点树中存储的数据的状态,所有我们能够操作 ZooKeeper 和操作目录节点树大体一样,如创建一个目录节点,给某个目录节点设置数据,获取某个目录节点的所有子目录节点,给某个目录节点设置权限和监控这个目录节点的状态变化。

下面通过代码实例,来熟悉一下JavaAPI的常用方法。

[java]  view plain  copy
  1. import java.util.List;  
  2.   
  3. import org.apache.zookeeper.CreateMode;  
  4. import org.apache.zookeeper.WatchedEvent;  
  5. import org.apache.zookeeper.Watcher;  
  6. import org.apache.zookeeper.ZooDefs.Ids;  
  7. import org.apache.zookeeper.ZooKeeper;  
  8. import org.apache.zookeeper.data.Stat;  
  9.   
  10. public class ZkTest {  
  11.   
  12.     private static final String CONNECT_STRING = "127.0.0.1:2181";  
  13.     private static final int SESSION_TIMEOUT = 3000;  
  14.   
  15.     public static void main(String[] args) throws Exception {  
  16.   
  17.         // 定义一个监控所有节点变化的Watcher  
  18.         Watcher allChangeWatcher = new Watcher() {  
  19.             @Override  
  20.             public void process(WatchedEvent event) {  
  21.                 System.out.println("**watcher receive WatchedEvent** changed path: " + event.getPath()  
  22.                         + "; changed type: " + event.getType().name());  
  23.             }  
  24.         };  
  25.   
  26.         // 初始化一个与ZK连接。三个参数:  
  27.         // 1、要连接的服务器地址,"IP:port"格式;  
  28.         // 2、会话超时时间  
  29.         // 3、节点变化监视器  
  30.         ZooKeeper zk = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, allChangeWatcher);  
  31.   
  32.         // 新建节点。四个参数:1、节点路径;2、节点数据;3、节点权限;4、创建模式  
  33.         zk.create("/myName""chenlongfei".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);  
  34.         System.out.println("create new node '/myName'");  
  35.   
  36.         // 判断某路径是否存在。两个参数:1、节点路径;2、是否监控(Watcher即初始化ZooKeeper时传入的Watcher)  
  37.         Stat beforSstat = zk.exists("/myName"true);  
  38.         System.out.println("Stat of '/myName' before change : " + beforSstat.toString());  
  39.   
  40.         // 修改节点数据。三个参数:1、节点路径;2、新数据;3、版本,如果为-1,则匹配任何版本  
  41.         Stat afterStat = zk.setData("/myName""clf".getBytes(), -1);  
  42.         System.out.println("Stat of '/myName' after change: " + afterStat.toString());  
  43.   
  44.         // 获取所有子节点。两个参数:1、节点路径;2、是否监控该节点  
  45.         List<String> children = zk.getChildren("/"true);  
  46.         System.out.println("children of path '/': " + children.toString());  
  47.   
  48.         // 获取节点数据。三个参数:1、节点路径;2、书否监控该节点;3、版本等信息可以通过一个Stat对象来指定  
  49.         byte[] nameByte = zk.getData("/myName"truenull);  
  50.         String name = new String(nameByte, "UTF-8");  
  51.         System.out.println("get data from '/myName': " + name);  
  52.   
  53.         // 删除节点。两个参数:1、节点路径;2、 版本,-1可以匹配任何版本,会删除所有数据  
  54.         zk.delete("/myName", -1);  
  55.         System.out.println("delete '/myName'");  
  56.   
  57.         zk.close();  
  58.     }  

运行程序,打印结果如下:

更详细的API请参考官方网站。

 

Zookeeper 从设计模式角度来看,是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper 将负责通知已经在 Zookeeper 上注册的那些观察者做出相应的反应。

下面通过两个ZooKeeper的典型用用场景来体会下ZooKeeper的特性与使用方法。


6.2 zookeeper分布式锁详解

先来回顾一下多线程中的锁控制。

[java]  view plain  copy
  1. public class MultiThreadTest {  
  2.   
  3.     // 以一个静态变量来模拟公共资源  
  4.     private static int counter = 0;  
  5.   
  6.     // 多线程环境下,会出现并发问题  
  7.     public static void plus() {  
  8.           
  9.         // 计数器加一  
  10.         counter++;  
  11.           
  12.         // 线程随机休眠数毫秒,模拟现实中的耗时操作  
  13.         int sleepMillis = (int) (Math.random() * 100);  
  14.         try {  
  15.             Thread.sleep(sleepMillis);  
  16.         } catch (InterruptedException e) {  
  17.             e.printStackTrace();  
  18.         }  
  19.     }  
  20.   
  21.     // 线程实现类  
  22.     static class CountPlus extends Thread {  
  23.         @Override  
  24.         public void run() {  
  25.             for (int i = 0; i < 20; i++) {  
  26.                 plus();  
  27.             }  
  28.             System.out.println(Thread.currentThread().getName() + "执行完毕:" + counter);  
  29.         }  
  30.   
  31.         public CountPlus(String threadName) {  
  32.             super(threadName);  
  33.         }  
  34.   
  35.     }  
  36.   
  37.     public static void main(String[] args) throws Exception {  
  38.   
  39.         // 开启五个线程  
  40.         CountPlus threadA = new CountPlus("threadA");  
  41.         threadA.start();  
  42.   
  43.         CountPlus threadB = new CountPlus("threadB");  
  44.         threadB.start();  
  45.   
  46.         CountPlus threadC = new CountPlus("threadC");  
  47.         threadC.start();  
  48.   
  49.         CountPlus threadD = new CountPlus("threadD");  
  50.         threadD.start();  
  51.   
  52.         CountPlus threadE = new CountPlus("threadE");  
  53.         threadE.start();  
  54.     }  
  55. }  

 上例中,开启了五个线程,每个线程通过plus()方法对静态变量counter分别进行20次累加,预期counter最后会变成100。运行程序:


可以发现,五个线程执行完毕之后,counter并没有变成100。plus()方法涉及到对公共资源的改动,但是并没有对它进行同步控制,可能会造成多个线程同时对公共资源发起改动,进而出现并发问题。问题的根源在于,上例中没有保证同一时刻只能有一个线程可以改动公共资源。

给plus()方法加上synchronized关键字,重新运行程序:

可见,最终达到了预期结果。

synchronized关键字的作用是对plus()方法加入锁控制,一个线程想要执行该方法,首先需要获得锁(锁是唯一的),执行完毕后,再释放锁。如果得不到锁,该线程会进入等待池中等待,直到抢到锁才能继续执行。这样就保证了同一时刻只能有一个线程可以改动公共资源,避免了并发问题。

 

共享锁在同一个进程中很容易实现,可以靠Java本身提供的同步机制解决,但是在跨进程或者在不同 Server 之间就不好实现了,这时候就需要一个中间人来协调多个Server之间的各种问题,比如如何获得锁/释放锁、谁先获得锁、谁后获得锁等。


ZooKeeper Watcher机制

ZooKeeper是用来协调(同步)分布式进程的服务,提供了一个简单高性能的协调内核,用户可以在此之上构建更多复杂的分布式协调功能。

多个分布式进程通过ZooKeeper提供的API来操作共享的ZooKeeper内存数据对象ZNode来达成某种一致的行为或结果,这种模式本质上是基于状态共享的并发模型,与Java的多线程并发模型一致,他们的线程或进程都是”共享式内存通信“。Java没有直接提供某种响应式通知接口来监控某个对象状态的变化,只能要么浪费CPU时间毫无响应式的轮询重试,或基于Java提供的某种主动通知(Notif)机制(内置队列)来响应状态变化,但这种机制是需要循环阻塞调用。而ZooKeeper实现这些分布式进程的状态(ZNode的Data、Children)共享时,基于性能的考虑采用了类似的异步非阻塞的主动通知模式即Watch机制,使得分布式进程之间的“共享状态通信”更加实时高效,其实这也是ZooKeeper的主要任务决定的—协调。Consul虽然也实现了Watch机制,但它是阻塞的长轮询。

  • ZooKeeper VS JVM

从某种角度来说,可以这样对比(个人看法,可以讨论),ZooKeeper对等于JVM,ZooKeeper包含状态对象(ZNode)和分布式进程的底层执行引擎Zab,而JVM内部包含堆(多线程共享的大量对象存放区域)和多线程执行正确性约束规范JMM(Java内存模型),JMM确保了多线程的执行顺序是正确的。Zab协议使得ZooKeeper的内部修改状态操作直接是有序串行的,而JVM内部则是乱序并行的,需要添加额外的机制才能保证时序(内存屏障、处理器原子指令),而状态读取时,JVM和ZooKeeper都存在直接读取时读到旧数据,但ZooKeeper有Watch机制使得响应式读取更高效,而JVM只能使用底层的内存屏障刷新共享状态,以便其他线程再次读取时获得正确的新数据。

ZooKeeper提供的接口使得所有的分布式进程的执行都是异步非阻塞的(WaitFree算法),内部是基于Version的CAS操作,而JVM提供了阻塞的和非阻塞的多种接口,有Synchronized、Volatile、AtomicOperations。基于接口之上构建线程或分布式进程之间更复杂的同步或协调功能时,Java并发库直接提供了闭锁、循环栅栏、信号量等同步工具以及基础的抽象队列同步器,而ZooKeeper则需要用户基于接口自行构建各种分布式协调功能(分布式锁、分布式发布订阅、集群成员关系管理)。如下图:

  ZooKeeper JVM
共享状态对象 ZNode 堆中对象
底层执行模式 Zab顺序执行 多处理器并发执行(内存屏障、原子机器指令)
API接口 Get、Watch_Get、Cas_Set、Exist Synchronized、volatile、final、Atomic
协调或同步功能 分布式发布订阅、锁、读写锁 并发库同步工具、基于抽象队列同步器构建的同步组件
  • ZooKeeper的Watch架构

Watch的整体流程如下图所示,客户端先向ZooKeeper服务端成功注册想要监听的节点状态,同时客户端本地会存储该***相关的信息在WatchManager中,当ZooKeeper服务端监听的数据状态发生变化时,ZooKeeper就会主动通知发送相应事件信息给相关会话客户端,客户端就会在本地响应式的回调相关Watcher的Handler。

  • ZooKeeper的Watch特性
  1. Watch是一次性的,每次都需要重新注册,并且客户端在会话异常结束时不会收到任何通知,而快速重连接时仍不影响接收通知。
  2. Watch的回调执行都是顺序执行的,并且客户端在没有收到关注数据的变化事件通知之前是不会看到最新的数据,另外需要注意不要在Watch回调逻辑中阻塞整个客户端的Watch回调
  3. Watch是轻量级的,WatchEvent是最小的通信单元,结构上只包含通知状态、事件类型和节点路径。ZooKeeper服务端只会通知客户端发生了什么,并不会告诉具体内容。
  • Watcher接口设计


如上图所示,Watch被设计成一个接口,任何实现了Watcher接口的类就是一个新的Watcher,Watcher内部包含2个枚举类,一个KeeperState,表示当事件发生时ZooKeeper的状态,另一个是事件发生的类型,主要分为2类(一类是ZNode内容的变化,另一类是ZNode子节点的变化),具体的描述见下表。

KeeperState EventType TriggerCondition EnableCalls Desc

SyncConnected

(3)

None

(-1)

客户端与服务器成功建立会话   此时客户端与服务器处于连接状态
同上 NodeCreated

(1)

Watcher监听的对应数据节点被创建 Exists 同上
同上 NodeDeleted

(2)

Watcher监听的对应数据节点被删除 Exists, GetData, and GetChildren 同上
同上 NodeDataChanged

(3)

Watcher监听的数据节点的数据内容和数据版本号发生变化 Exists and GetData 同上
同上 NodeChildrenChanged

(4)

Watcher监听的数据节点的子节点列表发生变化,子节点内容变化不会触发 GetChildren 同上
Disconnected

(0)

None

(-1)

客户端与ZooKeeper服务器断开连接   此时客户端与服务器处于断开连接的状态
Expried

(-112)

None

(-1)

会话超时   此时客户端会话失效,通常同时也会收到SessionExpiredException异常
AuthFailed

(4)

None

(-1)

通常有两种情况:

1.使用错误的scheme进行权限检查

2.SASL权限检查失败

  收到AuthFailedException异常
  • WatchEvent的设计


如上图所示,WatchEvent有2种表示模式,一种是逻辑表示即WatchedEvent,是直接封装了各种抽象的逻辑状态(KeeperState,EventType),适用于客户端和服务端各自内部处理,另一种是物理表示即封装的更多是底层基础的传输数据结构(int,String),并且实现了序列化接口,主要用来做底层的数据传输。



借助Zookeeper 可以实现这种分布式锁:需要获得锁的 Server 创建一个 EPHEMERAL_SEQUENTIAL 目录节点,然后调用 getChildren()方法获取列表中最小的目录节点,如果最小节点就是自己创建的目录节点,那么它就获得了这个锁,如果不是那么它就调用 exists() 方法并监控前一节点的变化,一直到自己创建的节点成为列表中最小编号的目录节点,从而获得锁。释放锁很简单,只要删除它自己所创建的目录节点就行了。

流程图如下:

 

下面我们对刚才的代码进行改造,不用synchronize关键字而是使用ZooKeeper达到锁控制的目的,模拟分布式锁的实现。 

[java]  view plain  copy
  1. import java.util.Collections;  
  2. import java.util.List;  
  3.   
  4. import org.apache.zookeeper.CreateMode;  
  5. import org.apache.zookeeper.KeeperException;  
  6. import org.apache.zookeeper.WatchedEvent;  
  7. import org.apache.zookeeper.Watcher;  
  8. import org.apache.zookeeper.ZooDefs.Ids;  
  9. import org.apache.zookeeper.ZooKeeper;  
  10. import org.apache.zookeeper.data.Stat;  
  11.   
  12. public class ZkDistributedLock {  
  13.   
  14.     // 以一个静态变量来模拟公共资源  
  15.     private static int counter = 0;  
  16.   
  17.     public static void plus() {  
  18.   
  19.         // 计数器加一  
  20.         counter++;  
  21.   
  22.         // 线程随机休眠数毫秒,模拟现实中的费时操作  
  23.         int sleepMillis = (int) (Math.random() * 100);  
  24.         try {  
  25.             Thread.sleep(sleepMillis);  
  26.         } catch (InterruptedException e) {  
  27.             e.printStackTrace();  
  28.         }  
  29.     }  
  30.   
  31.     // 线程实现类  
  32.     static class CountPlus extends Thread {  
  33.   
  34.         private static final String LOCK_ROOT_PATH = "/Locks";  
  35.         private static final String LOCK_NODE_NAME = "Lock_";  
  36.   
  37.         // 每个线程持有一个zk客户端,负责获取锁与释放锁  
  38.         ZooKeeper zkClient;  
  39.   
  40.         @Override  
  41.         public void run() {  
  42.   
  43.             for (int i = 0; i < 20; i++) {  
  44.   
  45.                 // 访问计数器之前需要先获取锁  
  46.                 String path = getLock();  
  47.   
  48.                 // 执行任务  
  49.                 plus();  
  50.   
  51.                 // 执行完任务后释放锁  
  52.                 releaseLock(path);  
  53.             }  
  54.               
  55.             closeZkClient();  
  56.             System.out.println(Thread.currentThread().getName() + "执行完毕:" + counter);  
  57.         }  
  58.   
  59.         /** 
  60.          * 获取锁,即创建子节点,当该节点成为序号最小的节点时则获取锁 
  61.          */  
  62.         private String getLock() {  
  63.             try {  
  64.                 // 创建EPHEMERAL_SEQUENTIAL类型节点  
  65.                 String lockPath = zkClient.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME,  
  66.                         Thread.currentThread().getName().getBytes(), Ids.OPEN_ACL_UNSAFE,  
  67.                         CreateMode.EPHEMERAL_SEQUENTIAL);  
  68.                 System.out.println(Thread.currentThread().getName() + " create path : " + lockPath);  
  69.   
  70.                 // 尝试获取锁  
  71.                 tryLock(lockPath);  
  72.   
  73.                 return lockPath;  
  74.             } catch (Exception e) {  
  75.                 e.printStackTrace();  
  76.             }  
  77.             return null;  
  78.         }  
  79.   
  80.         /** 
  81.          * 该函数是一个递归函数 如果获得锁,直接返回;否则,阻塞线程,等待上一个节点释放锁的消息,然后重新tryLock 
  82.          */  
  83.         private boolean tryLock(String lockPath) throws KeeperException, InterruptedException {  
  84.   
  85.             // 获取LOCK_ROOT_PATH下所有的子节点,并按照节点序号排序  
  86.             List<String> lockPaths = zkClient.getChildren(LOCK_ROOT_PATH, false);  
  87.             Collections.sort(lockPaths);  
  88.   
  89.             int index = lockPaths.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));  
  90.             if (index == 0) { // lockPath是序号最小的节点,则获取锁  
  91.                 System.out.println(Thread.currentThread().getName() + " get lock, lockPath: " + lockPath);  
  92.                 return true;  
  93.             } else { // lockPath不是序号最小的节点  
  94.   
  95.                 // 创建Watcher,监控lockPath的前一个节点  
  96.                 Watcher watcher = new Watcher() {  
  97.                     @Override  
  98.                     public void process(WatchedEvent event) {  
  99.                         System.out.println(event.getPath() + " has been deleted");  
  100.                         synchronized (this) {  
  101.                             notifyAll();  
  102.                         }  
  103.                     }  
  104.                 };  
  105.                 String preLockPath = lockPaths.get(index - 1);  
  106.                 Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);  
  107.   
  108.                 if (stat == null) { // 由于某种原因,前一个节点不存在了(比如连接断开),重新tryLock  
  109.                     return tryLock(lockPath);  
  110.                 } else { // 阻塞当前进程,直到preLockPath释放锁,重新tryLock  
  111.                     System.out.println(Thread.currentThread().getName() + " wait for " + preLockPath);  
  112.                     synchronized (watcher) {  
  113.                         watcher.wait();  
  114.                     }  
  115.                     return tryLock(lockPath);  
  116.                 }  
  117.             }  
  118.   
  119.         }  
  120.   
  121.         /** 
  122.          * 释放锁,即删除lockPath节点 
  123.          */  
  124.         private void releaseLock(String lockPath) {  
  125.             try {  
  126.                 zkClient.delete(lockPath, -1);  
  127.             } catch (InterruptedException | KeeperException e) {  
  128.                 e.printStackTrace();  
  129.             }  
  130.         }  
  131.   
  132.         public void setZkClient(ZooKeeper zkClient) {  
  133.             this.zkClient = zkClient;  
  134.         }  
  135.           
  136.         public void closeZkClient(){  
  137.             try {  
  138.                 zkClient.close();  
  139.             } catch (InterruptedException e) {  
  140.                 e.printStackTrace();  
  141.             }  
  142.         }  
  143.   
  144.         public CountPlus(String threadName) {  
  145.             super(threadName);  
  146.         }  
  147.     }  
  148.   
  149.     public static void main(String[] args) throws Exception {  
  150.   
  151.         // 开启五个线程  
  152.         CountPlus threadA = new CountPlus("threadA");  
  153.         setZkClient(threadA);  
  154.         threadA.start();  
  155.   
  156.         CountPlus threadB = new CountPlus("threadB");  
  157.         setZkClient(threadB);  
  158.         threadB.start();  
  159.   
  160.         CountPlus threadC = new CountPlus("threadC");  
  161.         setZkClient(threadC);  
  162.         threadC.start();  
  163.   
  164.         CountPlus threadD = new CountPlus("threadD");  
  165.         setZkClient(threadD);  
  166.         threadD.start();  
  167.   
  168.         CountPlus threadE = new CountPlus("threadE");  
  169.         setZkClient(threadE);  
  170.         threadE.start();  
  171.     }  
  172.   
  173.     public static void setZkClient(CountPlus thread) throws Exception {  
  174.         ZooKeeper zkClient = new ZooKeeper("127.0.0.1:2181"3000null);  
  175.         thread.setZkClient(zkClient);  
  176.     }  
  177.   
  178. }  

注意:运行程序之前需要创建“/Locks”作为存放锁信息的根节点。

一旦某个Server想要获得锁,就会在/Locks”下创建一个EPHEMERAL_SEQUENTIAL类型的名为“Lock_”子节点,ZooKeeper会自动为每个子节点附加一个递增的编号,该编号为int类型,长度为10,左端以0补全。“/Locks”下会维持着这样一系列的节点:

Lock_0000000001,Lock_0000000002, Lock_0000000003, Lock_0000000004…

一旦这些创建这些节点的Server断开连接,该节点就会被清除(当然也可以主动清除)。

由于节点的编号是递增的,创建越晚排名越靠后。遵循先到先得的原则,Server创建完节点之后会检查自己的节点是不是最小的,如果是,那就获得锁,如果不是,排队等待。执行完任务之后,Server清除自己创建的节点,这样后面的节点会依次获得锁。

程序的运行结果如下:



总结分布式锁算法流程如下:

  1. 客户端连接zookeeper,并在/lock下创建临时的有序的子节点,第一个客户端对应的子节点为/lock/lock-0000000000,第二个为/lock/lock-0000000001,以此类推。
  2. 客户端获取/lock下的子节点列表,判断自己创建的子节点是否为当前子节点列表中序号最小的子节点,如果是则认为获得锁,否则监听刚好在自己之前一位的子节点删除消息,获得子节点变更通知后重复此步骤直至获得锁;
  3. 执行业务代码;
  4. 完成业务流程后,删除对应的子节点释放锁。

开源Curator实现分布式锁

虽然zookeeper原生客户端暴露的API已经非常简洁了,但是实现一个分布式锁还是比较麻烦的…我们可以直接使用curator这个开源项目提供的zookeeper分布式锁实现。

我们只需要引入下面这个包(基于maven):

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
</dependency>

然后就可以用啦!代码如下:

public static void main(String[] args) throws Exception {
    //创建zookeeper的客户端
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    CuratorFramework client = CuratorFrameworkFactory.newClient("10.21.41.181:2181,10.21.42.47:2181,10.21.49.252:2181", retryPolicy);
    client.start();

    //创建分布式锁, 锁空间的根节点路径为/curator/lock
    InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");
    mutex.acquire();
    //获得了锁, 进行业务流程
    System.out.println("Enter mutex");
    //完成业务流程, 释放锁
    mutex.release();
    
    //关闭客户端
    client.close();
}

可以看到关键的核心操作就只有mutex.acquire()和mutex.release(),简直太方便了!

下面来分析下获取锁的源码实现。acquire的方法如下:

/* * 获取锁,当锁被占用时会阻塞等待,这个操作支持同线程的可重入(也就是重复获取锁),acquire的次数需要与release的次数相同。 * @throws Exception ZK errors, connection interruptions */
@Override
public void acquire() throws Exception
{
    if ( !internalLock(-1, null) )
    {
        throw new IOException("Lost connection while trying to acquire lock: " + basePath);
    }
}

这里有个地方需要注意,当与zookeeper通信存在异常时,acquire会直接抛出异常,需要使用者自身做重试策略。代码中调用了internalLock(-1, null),参数表明在锁被占用时永久阻塞等待。internalLock的代码如下:

private boolean internalLock(long time, TimeUnit unit) throws Exception
{

    //这里处理同线程的可重入性,如果已经获得锁,那么只是在对应的数据结构中增加acquire的次数统计,直接返回成功
    Thread currentThread = Thread.currentThread();
    LockData lockData = threadData.get(currentThread);
    if ( lockData != null )
    {
        // re-entering
        lockData.lockCount.incrementAndGet();
        return true;
    }

    //这里才真正去zookeeper中获取锁
    String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    if ( lockPath != null )
    {
        //获得锁之后,记录当前的线程获得锁的信息,在重入时只需在LockData中增加次数统计即可
        LockData newLockData = new LockData(currentThread, lockPath);
        threadData.put(currentThread, newLockData);
        return true;
    }

    //在阻塞返回时仍然获取不到锁,这里上下文的处理隐含的意思为zookeeper通信异常
    return false;
}

代码中增加了具体注释,不做展开。看下zookeeper获取锁的具体实现:

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
    //参数初始化,此处省略
    //...
   
    //自旋获取锁
    while ( !isDone )
    {
        isDone = true;

        try
        {
            //在锁空间下创建临时且有序的子节点
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            //判断是否获得锁(子节点序号最小),获得锁则直接返回,否则阻塞等待前一个子节点删除通知
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        }
        catch ( KeeperException.NoNodeException e )
        {
            //对于NoNodeException,代码中确保了只有发生session过期才会在这里抛出NoNodeException,因此这里根据重试策略进行重试
            if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
            {
                isDone = false;
            }
            else
            {
                throw e;
            }
        }
    }

    //如果获得锁则返回该子节点的路径
    if ( hasTheLock )
    {
        return ourPath;
    }

    return null;
}

上面代码中主要有两步操作:

  • driver.createsTheLock:创建临时且有序的子节点,里面实现比较简单不做展开,主要关注几种节点的模式:1)PERSISTENT(永久);2)PERSISTENT_SEQUENTIAL(永久且有序);3)EPHEMERAL(临时);4)EPHEMERAL_SEQUENTIAL(临时且有序)。
  • internalLockLoop:阻塞等待直到获得锁。

看下internalLockLoop是怎么判断锁以及阻塞等待的,这里删除了一些无关代码,只保留主流程:

//自旋直至获得锁
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
    //获取所有的子节点列表,并且按序号从小到大排序
    List<String>        children = getSortedChildren();
    
    //根据序号判断当前子节点是否为最小子节点
    String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
    PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
    if ( predicateResults.getsTheLock() )
    {
        //如果为最小子节点则认为获得锁
        haveTheLock = true;
    }
    else
    {
        //否则获取前一个子节点
        String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

        //这里使用对象监视器做线程同步,当获取不到锁时监听前一个子节点删除消息并且进行wait(),当前一个子节点删除(也就是锁释放)时,回调会通过notifyAll唤醒此线程,此线程继续自旋判断是否获得锁
        synchronized(this)
        {
            try 
            {
                //这里使用getData()接口而不是checkExists()是因为,如果前一个子节点已经被删除了那么会抛出异常而且不会设置事件***,而checkExists虽然也可以获取到节点是否存在的信息但是同时设置了***,这个***其实永远不会触发,对于zookeeper来说属于资源泄露
                client.getData().usingWatcher(watcher).forPath(previousSequencePath);

                //如果设置了阻塞等待的时间
                if ( millisToWait != null )
                {
                    millisToWait -= (System.currentTimeMillis() - startMillis);
                    startMillis = System.currentTimeMillis();
                    if ( millisToWait <= 0 )
                    {
                        doDelete = true;    // 等待时间到达,删除对应的子节点
                        break;
                    }
                    
                    //等待相应的时间
                    wait(millisToWait);
                }
                else
                {
                   //永远等待
                    wait();
                }
            }
            catch ( KeeperException.NoNodeException e ) 
            {
                //上面使用getData来设置***时,如果前一个子节点已经被删除那么会抛出NoNodeException,只需要自旋一次即可,无需额外处理
            }
        }
    }
}

6.3 分布式队列

很多单机上很平常的事情,放在集群环境中都会发生质的变化。

以一个常见的生产者-消费者模型举例:有一个容量有限的邮筒,寄信者(即生产者)不断地将信件塞入邮筒,邮递员(即消费者)不断地从邮筒取出信件发往目的地。运行期间需要保证:

(1)邮筒已达上限时,寄信者停止活动,等带邮筒恢复到非满状态

(2)邮筒已空时,邮递员停止活动,等带邮筒恢复到非空状态

该邮筒用有序队列实现,保证FIFO(先进先出)特性。

在一台机器上,可以用有序队列来实现邮筒,保证FIFO(先进先出)特性,开启两个线程,一个充当寄信者,一个充当邮递员,通过wait()/notify()很容易实现上述功能。

但是,如果在跨进程或者分布式环境下呢?比如,一台机器运行生产者程序,另一台机器运行消费者程序,代表邮筒的有序队列无法跨机器共享,但是两者需要随时了解邮筒的状态(是否已满、是否已空)以及保证信件的有序(先到达的先发送)。

这种情况下,可以借助ZooKeeper实现一个分布式队列。新建一个“/mailBox”节点代表邮筒。一旦有信件到达,就在该节点下创建PERSISTENT_SEQUENTIAL类型的子节点,当子节点总数达到上限时,阻塞生产者,然后使用getChildren(String path, Watcher watcher)方法监控子节点的变化,子节点总数减少后再回复生产;而消费者每次选取序号最小的子节点进行处理,然后删除该节点,当子节点总数为0时,阻塞消费者,同样设置监控,子节点总数增加后再回复消费。

代码如下:

[java]  view plain  copy
  1. import java.io.IOException;  
  2. import java.util.Collections;  
  3. import java.util.List;  
  4.   
  5. import org.apache.zookeeper.CreateMode;  
  6. import org.apache.zookeeper.KeeperException;  
  7. import org.apache.zookeeper.WatchedEvent;  
  8. import org.apache.zookeeper.Watcher;  
  9. import org.apache.zookeeper.ZooDefs.Ids;  
  10. import org.apache.zookeeper.ZooKeeper;  
  11. import org.apache.zookeeper.data.Stat;  
  12.   
  13. public class ZkDistributedQueue {  
  14.   
  15.     // 邮箱上限为10封信  
  16.     private static final int MAILBOX_MAX_SIZE = 10;  
  17.   
  18.     // 邮箱路径  
  19.     private static final String MAILBOX_ROOT_PATH = "/mailBox";  
  20.   
  21.     // 信件节点  
  22.     private static final String LETTER_NODE_NAME = "letter_";  
  23.   
  24.     // 生产者线程,负责发送信件  
  25.     static class Producer extends Thread {  
  26.   
  27.         ZooKeeper zkClient;  
  28.   
  29.         @Override  
  30.         public void run() {  
  31.             while (true) {  
  32.                 try {  
  33.                     if (getLetterNum() == MAILBOX_MAX_SIZE) { // 信箱已满  
  34.                         System.out.println("mailBox has been full");  
  35.                         // 创建Watcher,监控子节点的变化  
  36.                         Watcher watcher = new Watcher() {  
  37.                             @Override  
  38.                             public void process(WatchedEvent event) {  
  39.                                 // 生产者已停止,只有消费者在活动,所以只可能出现接收信件的动作  
  40.                                 System.out.println("mailBox has been not full");  
  41.                                 synchronized (this) {  
  42.                                     notify(); // 唤醒生产者  
  43.                                 }  
  44.                             }  
  45.                         };  
  46.                         zkClient.getChildren(MAILBOX_ROOT_PATH, watcher);  
  47.   
  48.                         synchronized (watcher) {  
  49.                             watcher.wait(); // 阻塞生产者  
  50.                         }  
  51.                     } else {  
  52.                         // 线程随机休眠数毫秒,模拟现实中的费时操作  
  53.                         int sleepMillis = (int) (Math.random() * 1000);  
  54.                         Thread.sleep(sleepMillis);  
  55.                           
  56.                         //发送信件,创建新的子节点  
  57.                         String newLetterPath = zkClient.create(  
  58.                                 MAILBOX_ROOT_PATH + "/" + LETTER_NODE_NAME,  
  59.                                 "letter".getBytes(),   
  60.                                 Ids.OPEN_ACL_UNSAFE,   
  61.                                 CreateMode.PERSISTENT_SEQUENTIAL);  
  62.                         System.out.println("a new letter has been received: "   
  63.                                 + newLetterPath.substring(MAILBOX_ROOT_PATH.length()+1)   
  64.                                 + ", letter num: " + getLetterNum());  
  65.                     }  
  66.                 } catch (Exception e) {  
  67.                     System.out.println("producer equit task becouse of exception !");  
  68.                     e.printStackTrace();  
  69.                     break;  
  70.                 }  
  71.             }  
  72.         }  
  73.   
  74.         private int getLetterNum() throws KeeperException, InterruptedException {  
  75.             Stat stat = zkClient.exists(MAILBOX_ROOT_PATH, null);  
  76.             int letterNum = stat.getNumChildren();  
  77.             return letterNum;  
  78.         }  
  79.   
  80.         public void setZkClient(ZooKeeper zkClient) {  
  81.             this.zkClient = zkClient;  
  82.         }  
  83.     }  
  84.   
  85.     // 消费者线程,负责接收信件  
  86.     static class Consumer extends Thread {  
  87.   
  88.         ZooKeeper zkClient;  
  89.   
  90.         @Override  
  91.         public void run() {  
  92.             while (true) {  
  93.                 try {  
  94.                     if (getLetterNum() == 0) { // 信箱已空  
  95.                         System.out.println("mailBox has been empty");  
  96.                         // 创建Watcher,监控子节点的变化  
  97.                         Watcher watcher = new Watcher() {  
  98.                             @Override  
  99.                             public void process(WatchedEvent event) {  
  100.                                 // 消费者已停止,只有生产者在活动,所以只可能出现发送信件的动作  
  101.                                 System.out.println("mailBox has been not empty");  
  102.                                 synchronized (this) {  
  103.                                     notify(); // 唤醒消费者  
  104.                                 }  
  105.                             }  
  106.                         };  
  107.                         zkClient.getChildren(MAILBOX_ROOT_PATH, watcher);  
  108.   
  109.                         synchronized (watcher) {  
  110.                             watcher.wait(); // 阻塞消费者  
  111.                         }  
  112.                     } else {  
  113.                         // 线程随机休眠数毫秒,模拟现实中的费时操作  
  114.                         int sleepMillis = (int) (Math.random() * 1000);  
  115.                         Thread.sleep(sleepMillis);  
  116.   
  117.                         // 接收信件,删除序号最小的子节点  
  118.                         String firstLetter = getFirstLetter();  
  119.                         zkClient.delete(MAILBOX_ROOT_PATH + "/" +firstLetter, -1);  
  120.                         System.out.println("a letter has been delivered: " + firstLetter   
  121.                                 + ", letter num: " + getLetterNum());  
  122.                     }  
  123.                 } catch (Exception e) {  
  124.                     System.out.println("consumer equit task becouse of exception !");  
  125.                     e.printStackTrace();  
  126.                     break;  
  127.                 }  
  128.             }  
  129.         }  
  130.   
  131.         private int getLetterNum() throws KeeperException, InterruptedException {  
  132.             Stat stat = zkClient.exists(MAILBOX_ROOT_PATH, false);  
  133.             int letterNum = stat.getNumChildren();  
  134.             return letterNum;  
  135.         }  
  136.   
  137.         private String getFirstLetter() throws KeeperException, InterruptedException {  
  138.             List<String> letterPaths = zkClient.getChildren(MAILBOX_ROOT_PATH, false);  
  139.             Collections.sort(letterPaths);  
  140.             return letterPaths.get(0);  
  141.         }  
  142.   
  143.         public void setZkClient(ZooKeeper zkClient) {  
  144.             this.zkClient = zkClient;  
  145.         }  
  146.     }  
  147.   
  148.     public static void main(String[] args) throws IOException {  
  149.         // 开启生产者线程  
  150.         Producer producer = new Producer();  
  151.         ZooKeeper zkClientA = new ZooKeeper("127.0.0.1:2181"3000null);  
  152.         producer.setZkClient(zkClientA);  
  153.         producer.start();  
  154.   
  155.         // 开启消费者线程  
  156.         Consumer consumer = new Consumer();  
  157.         ZooKeeper zkClientB = new ZooKeeper("127.0.0.1:2181"3000null);  
  158.         consumer.setZkClient(zkClientB);  
  159.         consumer.start();  
  160.     }  
  161. }  

打印结果如下:


上例中还有一个可以改进的地方,在分布式环境下,像MAILBOX_MAX_SIZE这类常量是被多台机器共用的,而且运行期间可能发生改变,比如邮筒上限需要从10改为20,只能停掉机器,然后改动每台机器上的参数,再重新部署。可是,如果该服务不允许停机,而且部署在数十台机器上,让参数在运行时生效且保持一致,怎么办?

这就涉及到了ZooKeeper另一个典型的应用场景——配置中心。被多台机器共享的参数可以托管在ZNode上,对该参数关心的机器在Znode上注册Watcher,一旦该参数发生变化,注册者会收到消息,然后做出相应的调整。

 

ZooKeeper的作用当然不止于此,更多的应用场景就需要使用者在实际项目中发掘跟探索了,毕竟,纸上得来终觉浅,实践出真知。

ZooKeeper典型使用场景总结

        ZooKeeper是一个高可用的分布式数据管理与系统协调框架。基于对Paxos算法的实现,使该框架保证了分布式环境中数据的强一致性,也正是基 于这样的特性,使得zookeeper能够应用于很多场景。网上对zk的使用场景也有不少介绍,本文将结合作者身边的项目例子,系统的对zk的使用场景进 行归类介绍。 值得注意的是,zk并不是生来就为这些场景设计,都是后来众多开发者根据框架的特性,摸索出来的典型使用方法。因此,也非常欢迎你分享你在ZK使用上的奇 技淫巧。

        

场景类别

典型场景描述(ZK特性,使用方法)

应用中的具体使用

数据发布与订阅

发布与订阅即所谓的配置管理,顾名思义就是将数据发布到zk节点上,供订阅者动态获取数据,实现配置信息的集中式管理和动态更新。例如全局的配置信息,地址列表等就非常适合使用。

1. 索引信息和集群中机器节点状态存放在zk的一些指定节点,供各个客户端订阅使用。2. 系统日志(经过处理后的)存储,这些日志通常2-3天后被清除。

 

3. 应用中用到的一些配置信息集中管理,在应用启动的时候主动来获取一次,并且在节点上注册一个Watcher,以后每次配置有更新,实时通知到应用,获取最新配置信息。

4. 业务逻辑中需要用到的一些全局变量,比如一些消息中间件的消息队列通常有个offset,这个offset存放在zk上,这样集群中每个发送者都能知道当前的发送进度。

5. 系统中有些信息需要动态获取,并且还会存在人工手动去修改这个信息。以前通常是暴露出接口,例如JMX接口,有了zk后,只要将这些信息存放到zk节点上即可。

Name Service

这个主要是作为分布式命名服务,通过调用zk的create node api,能够很容易创建一个全局唯一的path,这个path就可以作为一个名称。

 

分布通知/协调

ZooKeeper 中特有watcher注册与异步通知机制,能够很好的实现分布式环境下不同系统之间的通知与协调,实现对数据变更的实时处理。使用方法通常是不同系统都对 ZK上同一个znode进行注册,监听znode的变化(包括znode本身内容及子节点的),其中一个系统update了znode,那么另一个系统能 够收到通知,并作出相应处理。

1. 另一种心跳检测机制:检测系统和被检测系统之间并不直接关联起来,而是通过zk上某个节点关联,大大减少系统耦合。2. 另一种系统调度模式:某系统有控制台和推送系统两部分组成,控制台的职责是控制推送系统进行相应的推送工作。管理人员在控制台作的一些操作,实际上是修改 了ZK上某些节点的状态,而zk就把这些变化通知给他们注册Watcher的客户端,即推送系统,于是,作出相应的推送任务。

 

3. 另一种工作汇报模式:一些类似于任务分发系统,子任务启动后,到zk来注册一个临时节点,并且定时将自己的进度进行汇报(将进度写回这个临时节点),这样任务管理者就能够实时知道任务进度。

总之,使用zookeeper来进行分布式通知和协调能够大大降低系统之间的耦合。

分布式锁

分布式锁,这个主要得益于ZooKeeper为我们保证了数据的强一致性,即用户只要完全相信每时每刻,zk集群中任意节点(一个zk server)上的相同znode的数据是一定是相同的。锁服务可以分为两类,一个是保持独占,另一个是控制时序。

 

所 谓保持独占,就是所有试图来获取这个锁的客户端,最终只有一个可以成功获得这把锁。通常的做法是把zk上的一个znode看作是一把锁,通过create znode的方式来实现。所有客户端都去创建 /distribute_lock 节点,最终成功创建的那个客户端也即拥有了这把锁。

控 制时序,就是所有视图来获取这个锁的客户端,最终都是会被安排执行,只是有个全局时序了。做法和上面基本类似,只是这里 /distribute_lock 已经预先存在,客户端在它下面创建临时有序节点(这个可以通过节点的属性控制:CreateMode.EPHEMERAL_SEQUENTIAL来指 定)。Zk的父节点(/distribute_lock)维持一份sequence,保证子节点创建的时序性,从而也形成了每个客户端的全局时序。

 

集群管理

1. 集群机器监 控:这通常用于那种对集群中机器状态,机器在线率有较高要求的场景,能够快速对集群中机器变化作出响应。这样的场景中,往往有一个监控系统,实时检测集群 机器是否存活。过去的做法通常是:监控系统通过某种手段(比如ping)定时检测每个机器,或者每个机器自己定时向监控系统汇报“我还活着”。 这种做法可行,但是存在两个比较明显的问题:1. 集群中机器有变动的时候,牵连修改的东西比较多。2. 有一定的延时。

 

利 用ZooKeeper有两个特性,就可以实时另一种集群机器存活性监控系统:a. 客户端在节点 x 上注册一个Watcher,那么如果 x 的子节点变化了,会通知该客户端。b. 创建EPHEMERAL类型的节点,一旦客户端和服务器的会话结束或过期,那么该节点就会消失。

例 如,监控系统在 /clusterServers 节点上注册一个Watcher,以后每动态加机器,那么就往 /clusterServers 下创建一个 EPHEMERAL类型的节点:/clusterServers/{hostname}. 这样,监控系统就能够实时知道机器的增减情况,至于后续处理就是监控系统的业务了。
2. Master选举则是zookeeper中最为经典的使用场景了。

在 分布式环境中,相同的业务应用分布在不同的机器上,有些业务逻辑(例如一些耗时的计算,网络I/O处理),往往只需要让整个集群中的某一台机器进行执行, 其余机器可以共享这个结果,这样可以大大减少重复劳动,提高性能,于是这个master选举便是这种场景下的碰到的主要问题。

利用ZooKeeper的强一致性,能够保证在分布式高并***况下节点创建的全局唯一性,即:同时有多个客户端请求创建 /currentMaster 节点,最终一定只有一个客户端请求能够创建成功。

利用这个特性,就能很轻易的在分布式环境中进行集群选取了。

另外,这种场景演化一下,就是动态Master选举。这就要用到 EPHEMERAL_SEQUENTIAL类型节点的特性了。

上 文中提到,所有客户端创建请求,最终只有一个能够创建成功。在这里稍微变化下,就是允许所有请求都能够创建成功,但是得有个创建顺序,于是所有的请求最终 在ZK上创建结果的一种可能情况是这样: /currentMaster/{sessionId}-1 , /currentMaster/{sessionId}-2 , /currentMaster/{sessionId}-3 ….. 每次选取序列号最小的那个机器作为Master,如果这个机器挂了,由于他创建的节点会马上小时,那么之后最小的那个机器就是Master了。

1. 在搜索系统中,如果集群中每个机器都生成一份全量索引,不仅耗时,而且不能保证彼此之间索引数据一致。因此让集群中的Master来进行全量索引的生成, 然后同步到集群中其它机器。2. 另外,Master选举的容灾措施是,可以随时进行手动指定master,就是说应用在zk在无法获取master信息时,可以通过比如http方式,向 一个地方获取master。

分布式队列

队列方面,我目前感觉有两种,一种是常规的先进先出队列,另一种是要等到队列成员聚齐之后的才统一按序执行。对于第二种先进先出队列,和分布式锁服务中的控制时序场景基本原理一致,这里不再赘述。

 

第 二种队列其实是在FIFO队列的基础上作了一个增强。通常可以在 /queue 这个znode下预先建立一个/queue/num 节点,并且赋值为n(或者直接给/queue赋值n),表示队列大小,之后每次有队列成员加入后,就判断下是否已经到达队列大小,决定是否可以开始执行 了。这种用法的典型场景是,分布式环境中,一个大任务Task A,需要在很多子任务完成(或条件就绪)情况下才能进行。这个时候,凡是其中一个子任务完成(就绪),那么就去 /taskList 下建立自己的临时时序节点(CreateMode.EPHEMERAL_SEQUENTIAL),当 /taskList 发现自己下面的子节点满足指定个数,就可以进行下一步按序进行处理了。


收藏
评论加载中...