我们在服务暴露的时候可以选择往注册中心注册,在服务引用的时候,可以选择从注册中心获取服务的地址。注册中心本身是一个服务,它用来管理着服务的一些信息,比如保存了服务提供者、服务消费者的地址信息以及服务接口的全限定名,并且管理着这些信息的关系。
如果系统进行直连的话,在服务在运行期间,如果系统崩溃或者新增机器的时候,服务消费者不能动态感知到服务提供者的变化。
使用注册中心的好处:
-
服务提供者和服务消费者实现解耦;
-
服务提供者能够从注册中心获取当前有多少服务消费者再跟它本身通信。可以为限流策略动态提供数据支持;
-
服务消费者在进行通信的时候,服务出现异常,在消费者侧的感知是该连接断开,通信超时,在注册中心也会把相应的节点移除,防止其他消费者从注册中心获取该服务时获取到不可用的机器地址信息。服务消费者可以直接从集群中再次选择别的服务提供者进行通信;
-
当出现横向扩展或者缩减,也就是当服务提供者集群机器新增或者缩减时,注册中心可以感知到变化,修改相应的数据,动态通知订阅该服务的消费者。可以进一步减少消费者在选择服务提供者的时候选择到不可用的服务提供者;
- 当出现纵向扩展,也就是新的服务暴露的时候,服务注册到注册中心,服务消费者可以直接从注册中心中获取新服务的地址信息。
注册中心存在的问题:
如果只有一个注册中心,就会出现单点问题,也就是当这个注册中心服务挂了,那么服务消费者将无法获取到服务提供者的地址信息,服务提供者也无法注册服务,导致服务之间无法进行调用,出现灾难性问题。所以必然会有多个注册中心被部署来解决单点问题。由于注册中心内维护着信息是实时改变的,所以将面临多个注册中心数据一致性问题。针对分布式中数据一致性问题,就会涉及到CAP定理。如何在一致性(C)、可用性(A)、分区容错性(P)中做选择,将是实现注册中心一直会面临的问题,到底使用了哪种一致性协议来解决分布式一致性问题。
分布式的弊端:数据不一致的问题,也就是数据被动修改后没有及时同步到所有服务或者有多个服务同时去修改数据,导致数据不正确。
zookeeper就是负责协调多个服务器间的数据和状态同步的。
zookeeper的数据结构:
项目中对Zookeeper的创建持久化节点,获取子节点,清空注册中心的数据的代码如下:
@Slf4j public final class CuratorUtils { private static final int BASE_SLEEP_TIME = 1000; private static final int MAX_RETRIES = 5; private static final String CONNECT_STRING = "127.0.0.1:2181"; public static final String ZK_REGISTER_ROOT_PATH = "/my-rpc"; private static Map<String, List<String>> serviceAddressMap = new ConcurrentHashMap<>(); private static Set<String> registeredPathSet = ConcurrentHashMap.newKeySet(); private static CuratorFramework zkClient; static { zkClient = getZkClient(); } private CuratorUtils() { } /** * 创建持久化节点。不同于临时节点,持久化节点不会因为客户端断开连接而被删除 * * @param path 节点路径 */ public static void createPersistentNode(String path) { try { if (registeredPathSet.contains(path) || zkClient.checkExists().forPath(path) != null) { log.info("节点已经存在,节点为:[{}]", path); } else { //eg: /my-rpc/github.javaguide.HelloService/127.0.0.1:9999 zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path); log.info("节点创建成功,节点为:[{}]", path); } registeredPathSet.add(path); } catch (Exception e) { throw new RpcException(e.getMessage(), e.getCause()); } } /** * 获取某个字节下的子节点,也就是获取所有提供服务的生产者的地址 * * @param serviceName 服务对象接口名 eg:github.javaguide.HelloService * @return 指定字节下的所有子节点 */ public static List<String> getChildrenNodes(String serviceName) { if (serviceAddressMap.containsKey(serviceName)) { return serviceAddressMap.get(serviceName); } List<String> result; String servicePath = ZK_REGISTER_ROOT_PATH + "/" + serviceName; try { result = zkClient.getChildren().forPath(servicePath); serviceAddressMap.put(serviceName, result); registerWatcher(zkClient, serviceName); } catch (Exception e) { throw new RpcException(e.getMessage(), e.getCause()); } return result; } /** * 清空注册中心的数据 */ public static void clearRegistry() { registeredPathSet.stream().parallel().forEach(p -> { try { zkClient.delete().forPath(p); } catch (Exception e) { throw new RpcException(e.getMessage(), e.getCause()); } }); log.info("服务端(Provider)所有注册的服务都被清空:[{}]", registeredPathSet.toString()); } private static CuratorFramework getZkClient() { // 重试策略。重试3次,并且会增加重试之间的睡眠时间。 RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES); CuratorFramework curatorFramework = CuratorFrameworkFactory.builder() //要连接的服务器(可以是服务器列表) .connectString(CONNECT_STRING) .retryPolicy(retryPolicy) .build(); curatorFramework.start(); return curatorFramework; } /** * 注册监听指定节点。 * * @param serviceName 服务对象接口名 */ private static void registerWatcher(CuratorFramework zkClient, String serviceName) { String servicePath = ZK_REGISTER_ROOT_PATH + "/" + serviceName; PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, servicePath, true); PathChildrenCacheListener pathChildrenCacheListener = (curatorFramework, pathChildrenCacheEvent) -> { List<String> serviceAddresses = curatorFramework.getChildren().forPath(servicePath); serviceAddressMap.put(serviceName, serviceAddresses); }; pathChildrenCache.getListenable().addListener(pathChildrenCacheListener); try { pathChildrenCache.start(); } catch (Exception e) { throw new RpcException(e.getMessage(), e.getCause()); } } }
每一个节点都存储数据,叫Znode
zookeeper的一致性协议采用原子消息广播协议(ZAB)。
ZAB中的角色、流程、两种模式以及服务器事务同步:
角色
- leader节点:全局唯一,负责协调所要处理的事务,可用作读或写。选举时间:1. 集群初始化的时候;2.leader节点崩溃后 3. leader节点与集群中超过一半的节点断开
- follower节点:leader节点的副本,用作读操作。
流程
1. zookeeper客户端随机连接到集群中的一个节点,如果是写请求且当前节点不是leader,节点会向leader提交事务,leader会广播事务;如果是读请求,直接从该节点读取数据;
2. 事务请求代打leader服务器后,leader将事务转化为一个提议,并将提议分发给群集中的follower节点,等待follower节点反馈
3. follower节点如果正常,则会给leader节点正确的反馈
4. leader如果收到半数以上的follower的正确反馈,会向所有follower分发commit消息。要求follower提交该事务
两种模式
消息广播模式:正常的模式
崩溃恢复模式:解决leader单点问题,在恢复后解决数据不一致问题
多服务器事务同步
事务同步就是用来保障集群中存在过半的机器能够和leader服务器的数据状态保持一致。同步操作会发生在:
1. leader节点被重新选举,需要进行事务同步
2. 集群中新增了一台服务器
leader为每一个follower提供一个队列,leader将没有被follower执行的事务发送到对应的follower。在每条消息后面会加上commit信息,表示该事务已经被提交。当所有事务都被提交后,leader会将follower加入可用的机器列表里。
zookeeper的优势
数据模型简单
支持事件监听
崩溃恢复模式
劣势
CP原则,强调强一致性,放弃了部分可用性 【分布式系统网络异常不可避免,必须满足P,分区容错性】
只有leader节点能够写,写操作存在瓶颈
基于zookeeper实现服务注册
@Slf4j public class ZkServiceRegistry implements ServiceRegistry { @Override public void registerService(String serviceName, InetSocketAddress inetSocketAddress) { //根节点下注册子节点:服务 String servicePath = CuratorUtils.ZK_REGISTER_ROOT_PATH + "/" + serviceName + inetSocketAddress.toString(); CuratorUtils.createPersistentNode(servicePath); } }
基于 zookeeper 实现服务发现
package github.javaguide.registry; import github.javaguide.loadbalance.LoadBalance; import github.javaguide.loadbalance.RandomLoadBalance; import github.javaguide.utils.zk.CuratorUtils; import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; import java.util.List; /** * 基于 zookeeper 实现服务发现 * * @author Yuan Yangxin * @createTime 2020年06月01日 15:16:00 */ @Slf4j public class ZkServiceDiscovery implements ServiceDiscovery { private final LoadBalance loadBalance; public ZkServiceDiscovery() { this.loadBalance = new RandomLoadBalance(); } @Override public InetSocketAddress lookupService(String serviceName) { // TODO(Yuan Yangxin):feat: 负载均衡 // 这里采用了随机负载均衡的算法,从可选节点中随机选择节点 List<String> serviceAddresses = CuratorUtils.getChildrenNodes(serviceName); String serviceAddress = loadBalance.selectServiceAddress(serviceAddresses); log.info("成功找到服务地址:{}", serviceAddress); String[] socketAddressArray = serviceAddress.split(":"); String host = socketAddressArray[0]; int port = Integer.parseInt(socketAddressArray[1]); return new InetSocketAddress(host, port); } }
随机负载均衡的具体实现为:
@Override protected String doSelect(List<String> serviceAddresses) { Random random = new Random(); return serviceAddresses.get(random.nextInt(serviceAddresses.size())); }