总结
客户端发送请求:
- 调用某个接口的方法会调用之前生成的代理类,代理类会通过 cluster(默认是 FailoverCluster) 从 Directory 获取一堆 invokers
- 然后会经过 router 路由的过滤(看配置也会添加 mockInvoker 用于服务降级)、然后再通过 SPI 得到 LoadBalance 进行负载均衡选择一个 Invoker 发起远程调用
- 封装 request 请求并记录此请求和请求的 ID 等待服务端的响应,再通过 NettyClient 发起远程调用。
服务端接收请求:
- 服务端接受请求之后,根据协议得到信息并反序列化成对象,再按照派发策略派发消息,默认是 ALL,全部请求扔给业务线程池
- 业务线程池会根据消息类型判断得到 serviceKey,从之前服务暴露的 exporterMap 中获取相应的 exporter,得到Invoker,然后最终调用真正的实现类
- 再组装好结果返回,这个响应会带上之前请求的 ID。客户端收到这个响应之后会通过 ID 找到存储的 Future,然后塞入响应再唤醒等待 future 的线程,最后客户端得到响应,完成远程调用的过程。
客户端源码解析
客户端告知服务端的具体信息应该包含哪些?
- 首先客户端肯定要告知要调用是服务器的哪个接口,当然还需要方法名、方法的参数类型、方法的参数值,还有可能存在多个版本的情况,所以还得带上版本号
- 服务端可以清晰的得知客户端要调用的是哪个方法、可以进行精确调用
MockClusterInvoker.invoke()
- 客户端调用具体的接口会调用生成的代理类,而代理类会生成一个 RPCInvocation 对象调用 MockClusterInvoker.invoke()
- 判断配置里面有没有配置 mock
- this.invoker.invoke(invocation) 调用 AbstractClusterInvoker.invoker()
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
// 获取 mock 配置
String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
if (value.length() == 0 || value.equalsIgnoreCase("false")) {
// 不走 mock 逻辑
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
// 强制走 mock 逻辑
result = doMockInvoke(invocation, null);
} else {
// 调用失败了失败的 mock逻辑
try {
result = this.invoker.invoke(invocation);
} catch (RpcException e) {
·················
}
}
return result;
}
AbstractClusterInvoker.invoke()
- 调用 directory.list() 进行路由过滤
- 调用 ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension() SPI 机制获取 LoadBalance 实现类
- 最后调用子类 doInvoke() 默认调用 FailoverClusterInvoker.doInvoker()
- 这个结构体现 模板方法,因为路由过滤、获取LoadBalance实现类都是每个子类都要执行的,所以抽象到抽象类中
public Result invoke(final Invocation invocation) throws RpcException {
// 检查是否被销毁
checkWhetherDestroyed();
LoadBalance loadbalance = null;
// 路由过滤
List<Invoker<T>> invokers = list(invocation);
// 通过 SPI 机制获取 LoadBalance 实现类
if (invokers != null && !invokers.isEmpty()) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
// 调用子类方法,使用模板方法
return doInvoke(invocation, invokers, loadbalance);
}
AbstractDirectory.list()
- 通过方法名找到 Invoker,然后经过服务的路由过滤
- 在进行 LoadBalance 的选择,得到一个 Invoker,默认使用 FailoverClusterInvoker 的容错方式
- 容错机制、负载均衡
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
if (destroyed) {
throw new RpcException("Directory already destroyed .url: " + getUrl());
}
// 通过方法名找到对应的 Invoker
List<Invoker<T>> invokers = doList(invocation);
// 路由过滤
List<Router> localRouters = this.routers; // local reference
if (localRouters != null && !localRouters.isEmpty()) {
for (Router router : localRouters) {
try {
// 根据参数判断是否需要进行路由
if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
invokers = router.route(invokers, getConsumerUrl(), invocation);
}
} catch (Throwable t) {
logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
}
}
}
return invokers;
}
FailoverClusterInvoker.doInvoke()
- FailoverClusterInvoker 拿到 Directory 返回的 Invoker 列表,并且经过路由之后,会让 LoadBalance 从 Invoker 列表中选择一个 Invoker
- select() 通过负载均衡选择一个 Invoker
- RpcContext.getContext().setInvokers() 上下文保存调用过的 Invoker
- invoker.invoke() 发起远程调用、会调用 AbstractInvoker.invoke() 会返回具体实现的子类 DubboInvoker.doInvoke()
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
······················
// 重试次数
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
························
// 负载均衡选择一个 Invoker
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
invoked.add(invoker);
// 上下文保存调用过的 Invoker
RpcContext.getContext().setInvokers((List) invoked);
try {
// 发起调用
Result result = invoker.invoke(invocation);
·············
return result;
} catch (RpcException e) {
·················
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(·················);
}
DubboInvoker.doInvoke()
- 远程调用三种方式:oneway、异步、同步
- oneway:当你不关心你的请求是否发送成功的情况下,就用 oneway 的方式发送,这种方式消耗最小,啥都不用记、啥都不用管;
- 异步调用:Dubbo 天然就是异步的,可以看到 client 发送请求之后会得到一个 ResponseFuture,然后把 future 包装一下塞到上下文中,这样用户就可以从上下文拿到这个 future,然后用户可以做了一波操作之后再调用 future.get() 等待结果;
- 同步调用:Dubbo 框架帮助我们异步转同步了,从代码可以看到在 Dubbo 源码中就调用了 future.get(), 之后就阻塞住了,必须等待结果到了之后才能返回,所以是同步的
Dubbo 本质是异步的,dubbo 框架帮我们转化为同步,区别其实就是 future.get() 在用户代码被调用还是框架代码被调用。
其实我个人总结就是因为会调用 RpcContext.getContext().setFuture() 存储 future,实现了异步调用。而同步就不使用这个上下文,直接等待 currentClient.request() 结果的返回
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
// 设置 path 到 attachment
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
// 选择 client
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
// 是否异步调用
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
// 是否 oneway 方式发送
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
// 获取超时时间
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 不需要返回值
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent); // 发送
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
// 异步发送
ResponseFuture future = currentClient.request(inv, timeout);
// 设置 future
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
// 同步发送
RpcContext.getContext().setFuture(null);
// 调用 future.get() 等待
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
··················
}
}
currentClient.request()
- 会调用 HeaderExchangeChannel.request() 组装 request,
- 通过 DefaultFuture() 构造 future
- 再调用 channel.send() 也就是调用子类 NettyClient 发起请求
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
···············
}
// create request.
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
// 调用 nettyClient 的 sent 发送请求
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
Request、DefaultFuture
由于是异步,那么 future 通过上下文保存之后,等待响应回来了如何找到对应的 future 呢?
- 通过唯一 ID,Request 会生成一个全局唯一 ID,然后 future 内部会将自己和 ID 存储到一个 ConcurrentHashMap。这个 ID 发送到服务端之后,服务端也会把这个 ID 返回来,这样通过 ID 再去 ConcurrentHashMap 里面就可以找到对应的 future。
介绍 Request、DefaultFuture:
- Request类:通过构造函数调用静态方法 newId() 创造唯一ID, 在 newId() 静态方法通过原子类的方式自增
- DefaultFuture类:通过 ConcurrentHashMap 存储唯一 ID 与 Future 的映射关系,调用 received() 方法,通过 Response 中的 ID 从 map 中获取 Future
// Request
public class Request {
// 原子类
private static final AtomicLong INVOKE_ID = new AtomicLong(0);
// 构造函数调用 newId() 创建唯一 ID
public Request() {
mId = newId();
}
// 通过原子类实现自增 ++
private static long newId() {
return INVOKE_ID.getAndIncrement();
}
}
// DefaultFuture
public class DefaultFuture implements ResponseFuture {
// 存储对应的Future
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
public DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 将唯一 ID 与 future 的关系存放到 ConcurrentHashMap中
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}
// 接受响应、调用received()
public static void received(Channel channel, Response response) {
try {
// 通过 ID 从 map 中获取 future
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
logger.warn(···················);
}
} finally {
CHANNELS.remove(response.getId());
}
}
}
服务端源码解析
服务端接收到请求之后就会解析请求得到消息,这消息有五种派发策略:
默认走 all 策略,也就是所有消息都派发到业务线程池中。
策略 | 用途 |
---|---|
all | 所有消息都派发到线程池,包括请求、响应、连接事件、断开事件等 |
direct | 所有消息都不派发到线程池,全部在 IO 线程上直接执行 |
message | 只有请求和响应消息派发到线程池,其他消息均在 IO 线程上执行 |
execution | 只有请求消息派发到线程池,不含响应。其他消息均在 IO 线程上执行 |
connection | 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其他消息派发到线程池 |
AllChannelHandler.received()
- getExecutorService():获取线程池,通过调用 WrappedChannelHandler 类构造函数的 SPI 机制获取自定义的线程池
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
- 将消息封装成 ChannelEventRunnable() 放入业务线程池中执行
- 根据 ChannelState 调用对应的处理方法,这里是 ChannelState.RECEIVED,所以调用 handler.received,最终调用 HandlerExchangeHandler.handleRequest()
public void received(Channel channel, Object message) throws RemotingException {
// 获取线程池
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if(message instanceof Request && t instanceof RejectedExecutionException){
Request request = (Request)message;
// 如果需要返回响应,将错误封装到响应中
if(request.isTwoWay()){
String msg = ;
return;
}
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
HandlerExchangeHandler.handleRequest()
- 通过请求 ID 构造响应 response,并获取请求的信息
- 通过调用 handler.reply() 调用 DubboProtocol.reply()
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
// 通过请求 ID 构造 Response
Response res = new Response(req.getId(), req.getVersion());
if (req.isBroken()) {
······················
}
// 获取请求的信息,方法名等
Object msg = req.getData();
try {
// 最终调用 DubboProtocol.reply()
Object result = handler.reply(channel, msg);
res.setStatus(Response.OK);
res.setResult(result);
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
}
return res;
}
DubboProtocol.reply()
- 根据 message 封装成 Invocation
- 通过 DubboProtocol.this.getInvoker() 根据请求信息获取对应的 Invoker
- 调用 invoker.invoke(),会通过 ProxyFactory 使用 javassist 动态代理生成 Invoker,通过protocol.export() 生成 exporter。参考:服务暴露
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (!(message instanceof Invocation)) {
························
} else {
Invocation inv = (Invocation)message;
// 通过 message 获取对应的 Invoker
Invoker<?> invoker = DubboProtocol.this.getInvoker(channel, inv);
if (Boolean.TRUE.toString().equals(inv.getAttachments().get("_isCallBackServiceInvoke"))) {
// 回调逻辑
·························
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return invoker.invoke(inv);
}
}
DubboProtocol.this.getInvoker()
前提了解:
服务暴露通过ProxyFactory.getInvoker() 生成 Invoker, 包装后调用 protocol.exporter() 生成 exporter,并将 serviceKey、exporter 存放在 exporterMap 中。详细过程参考服务暴露
- 通过请求信息构造出 serviceKey,从服务暴露的时候存储的 exporterMap 中获取对应的 exporter 服务
- 通过 exporter.getInvoker() 返回 Invoker
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
····························
String serviceKey = serviceKey(port, path, (String)inv.getAttachments().get("version"), (String)inv.getAttachments().get("group"));
// 通过 serviceKey 从 map 中获取之前暴露的服务
DubboExporter<?> exporter = (DubboExporter)this.exporterMap.get(serviceKey);
if (exporter == null) {
throw new RemotingException(·················);
} else {
// 返回 Invoker
return exporter.getInvoker();
}
}