一道面试题

 

让我们开门见山,直面主题:Dubbo 服务里面有个服务端,还有个消费端你知道吧?

服务端和消费端都各有一个线程池你知道吧?

那么面试题来了:一般情况下,服务提供者比服务消费者多吧。一个服务消费方可能会并发调用多个服务提供者,每个用户线程发送请求后,会进行超时时间内的等待。多个服务提供者可能同时做完业务,然后返回,服务消费方的线程池会收到多个响应对象。这个时候要考虑一个问题,如何将线程池里面的每个响应对象传递给相应等待的用户线程,且不出错呢?

先说答案。

这个题和答案其实就写在 Dubbo 的官网上:

http://dubbo.apache.org/zh-cn/docs/source_code_guide/service-invoking-process.html

以下回答来自官网:

答案是通过调用编号进行串联。

DefaultFuture 被创建时(下面我们会讲这个 DefaultFuture 是个什么东西),会要求传入一个 Request 对象。

此时 DefaultFuture 可从 Request 对象中获取调用编号,并将 <调用编号, DefaultFuture 对象> 映射关系存入到静态 Map 中,即 FUTURES。

线程池中的线程在收到 Response 对象后,会根据 Response 对象中的调用编号到 FUTURES 集合中取出相应的 DefaultFuture 对象,然后再将 Response 对象设置到 DefaultFuture 对象中。

最后再唤醒用户线程,这样用户线程即可从 DefaultFuture 对象中获取调用结果了。整个过程大致如下图:

上面是官网上的答案,写的比较清楚了,但是官网上是在写服务调用过程的时候顺便讲解了这个考察点,源码散布在各处,看起来比较散乱,不太直观。有的读者反映看的不是特别的明白。

我知道你为什么看的不是那么明白,我在之前的文章里面说过了,你根本就只是在官网白嫖,也不自己动手,像极了看我文章时候的样子:

好了,反正我也习惯被白嫖了,蹭我还写的动,你们就可劲嫖吧。

源码之中无秘密。带你从源码之中寻找答案,让你把官网上的回答和源码能对应起来,这样就更方便你自己动手了。

需要说明一下的是本文 Dubbo 源码版本为 2.7.5。而官网文档演示的源码版本是 2.6.4 。这两个版本上还是有一点差异的,写到的地方我会进行强调。

Demo演示

 

Demo 大家可以直接参照官方的快速启动:

dubbo.apache.org/zh-cn/docs/user/quick-start.html

我这里就是一个非常简单的服务端:

客户端在单元测试里面进行消费:

是的,细心的老朋友肯定看出来了,这个 Demo 我已经用过非常多次了。基本上我每篇 Dubbo 相关的文章里面都会出现这个 Demo。

我建议你自己也花了 10 分钟时间搭一个吧。对你的学习有帮助。别懒,好吗?

我给你一个地址,然后你拉下来就能跑,这种也不是不行。这种我也考虑过。主要是治一治你不想自己动手的毛病,其次那不是我也懒得弄嘛。

好了,上面的 Demo 跑一下:

输出也是在我们的意料之中。当然了,大家都知道这个输出也必须是这样的。

那么你再细细的品一品。

我们扣一下题,把最开始的问题简化一下。

最开始的问题是一个服务消费端,多个服务提供者,然后服务提供者同时返回响应数据,消费端怎么处理。

其实核心问题就是服务消费端同时收到了多个响应数据,它应该怎么把响应数据对应的请求找到,只有正确找到了请求,才能正确返回数据。

所以我们把重心放到客户端。

在上面的例子中:参数 why1 和 why2 几乎是同时发到服务端的请求 ,然后服务端对于这两个请求也几乎同时响应到了客户端。

在服务端没有返回的时候客户端的两个请求是在干什么?是不是在用户线程上里面等着的接收数据?

那么问题就来了:Dubbo 是怎么把这两个响应对象和两个等待接收数据的用户线程配对成功的?

接下来,我们就带着这个问题,去源码里面寻找答案。

请求发起,等待响应

 

首先前面两节我们都说到了客户端用户线程的等待,也就是一次请求在等待响应。

这个等待在代码里面是怎么体现的呢?

答案藏在这个方法里面:

org.apache.dubbo.rpc.protocol.AsyncToSyncInvoker#invoke

首先你看这个类名,AsyncToSyncInvoker,异步调用转同步调用,就感觉不简单,里面肯定搞事情了。

标号为 ① 的地方,是 invoker 调用,调用之后的返回是一个AsyncRpcResult。

在这个方法继续往下 Debug,没几步就可以走到这个地方:

org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(java.lang.Object, int, java.util.concurrent.ExecutorService)

135 行就是 channel.send(req)。在往外发请求了,在发请求之前构建了一个 DefaultFuture。然后在请求发送出去之后,140 行返回了这个 future 。

最关键的秘密就藏在 133 行的这个 newFuture 里面。

看一看对应代码:

这个 newFuture 主要干了两件事:

  • 初始化 DefaultFuture 。

  • 检测是否超时。

我们看看初始化 DefaultFuture 的时候干了啥事:

首先我们在这里看到了 FUTURES 对象,这是一个 ConcurrentHashMap。这个 FUTURES 就是官网上说的静态 Map:

Map 里面的 key 是调用编号,也就是第 82 行代码中,从 request 里面获得的 id:

这个 id 是 AtomicLong 从 0 开始自增出来的。

代码里面还给了这样一行注释:

getAndIncrement() When it grows to MAX_VALUE, it will grow to MIN_VALUE, and the negative can be used as ID

说这个方法当增加到 MAX_VALUE 后再次调用会变成 MIN_VALUE。但是没有关系,负数也是可以当做 ID 来用的。

这个 DefaultFuture 对象构建完成后是返回回去了。

返回到哪里去呢?

就是 DubboInvoker 的 doInvoker 方法中下面框起来的代码:

在 103 行,包装之后的 DefaultFuture 会通过构造方法放到 AsyncRpcResult 对象中:

而 DubboInvoker 的 doInvoker 方法返回的这个 result,即 AsyncRpcResult 就是前面标号为 ① 这里的返回值:

接着说说标号为 ② 的地方。

首先是判断当前调用模式是否是同步调用。我们这里就是同步调用,于是进入到 if 判断里面的逻辑。在这里面一看,调用的 get 方法,还带有超时时间。

看一下这个 get 方法是怎么样的:

可以看到这个 get 方法不是一个简单的异步编程的 CompletableFuture.get 。里面还包含了一个 ThreadlessExecutor 的 waitAndDrain 方法的逻辑。

这个方法一进来就是 queue.take 方法,阻塞等待。

这个队列里面装的是什么东西?

全局查找往这个队列里面放东西的逻辑,只有下面这一处:

说明这个队列里面扔的是一个 runable 的任务。

这个任务是什么呢?

我们这里先买个关子,放到下一小节里面去讲。

你只要知道:如果队列里面没有任务,那么用户线程就会一直在 take 这里阻塞等待。

有的小伙伴就要问了:这里怎么能是阻塞式的无限等待呢?接口调用不是有超时时间吗?

注意了,这里并不是无限等待。Dubbo 会保证当接口不管是否超时,都会有一个 Runable 的任务被扔到队列里面。所以 take 这里最多也就是等待超时时间这么长时间。

先记着这里,下面会给大家讲到超时检测的逻辑。

看到这里,我们已经和官网上的回答产生一点联系了,我再给大家捋一捋我们现在有的东西:

第一点:用户线程在 AsyncToSyncInvoker 类里面调用了下面这个方法,在等结果。代码和官网上的描述的对应关系如下:

官网上说:会调用不同 DefaultFuture 对象的 get 方法进行等待,这应该是 2.6.x 版本的做法了。

在 2.7.5 版本中是在 AsyncRpcResult 对象的 get 方法中进行等待。而在该方法中,其实是调用了队列的 take 方法,阻塞等待。

在这两个不同对象上的等待是两种完全不同的实现方式。2.7.5 版本里面这样做也是为了做客户端的共享线程池。实现起来优雅了很多,大家可以拿着两个版本的代码自行比较一下,理解到他的设计思路之后觉得真的是妙啊。

但是不论哪个版本,万变不离其宗,请求发出去后,还是需要在用户线程等待。

第二点:发送 request 对象之前构建了一个 DefaultFuture 对象。在这个对象里面维护了一个静态 MAP:

有了调用编号和 DefaultFuture 对象的映射关系。等收到 Response 响应之后,我们从 Response 中取出这个调用编号,就知道这个调用编号对应的是哪个 DefaultFuture 了,妙啊。

但是,等等。“从 Response 中取出这个调用编号”,那不是意外着我们得把调用编号送到服务端去?在哪送的?

答案是在协议里面,还记得上一篇文章中讲协议的时候里面也有个调用编号吗?

呼应上了没有?

每个请求和响应的 header 里面都有一个请求编号,这个编号是一一对应的,这是协议规定好的。

在发送 request 之前,对其进行 encode 的时候写进去的:

org.apache.dubbo.remoting.exchange.codec.ExchangeCodec#encodeRequest

然后 Dubbo 就拿着这个携带着 requestId 的请求这么轻轻的一发。

你猜怎么着?

就等着响应了。

接受响应,寻找请求

 

请求发出去是一件很简单的事情。

但是作为响应回来之后就懵逼。一个响应回来了,找不到是谁发起的它,你说它难受不难受?难受就算了,你就不怕它随便找一个请求就返回了,当场让你懵逼。

你说响应消息是在哪儿处理的?

上篇文章专门讲过哈,说不知道的都是假粉丝:

org.apache.dubbo.rpc.protocol.dubbo.DubboCodec#decodeBody

你看上门代码截图的第 66 行:get request id(获取请求编号)。

从哪里获取?

从 header 中获取。

header 中的请求编号是哪里来的?

发起 request 请求的时候,从 request 对象中取出来写到协议里面的。

request 对象中的请求编号是哪里来的?

通过 AtomicLong 从 0 开始自增来的。

好了,知道这个 id 是怎么来的了,也获取到了。它是在哪里用的呢?

org.apache.dubbo.remoting.exchange.support.DefaultFuture#received(org.apache.dubbo.remoting.Channel, org.apache.dubbo.remoting.exchange.Response, boolean)

标号为 ① 的地方就是根据 response 里面的 id,即调用编号从 FUTURES 这个 MAP 中移除并获取出对应的请求。

如果获取到的请求是 null,说明超时了。

如果获取到的请求不为 null,则判断是否超时了。超时逻辑我们最后再讲。

标号为 ② 地方是要把响应返回给对应的用户线程了。

在 doReceived 里面使用了响应式编程:

这的 this 就是当前类,即 DefaultFuture。

那么这个 doReceived 方法是怎么调到这里的呢?

之前的文章说过 Dubbo 默认的派发策略是 ALL,所以所有的响应都会被派发到客户端线程池里面去,也就是这个地方:

当接收到服务端的响应后,响应事件也会被扔到线程池里面,从代码中可以看到,扔进去的就是一个 Runable 任务。

然后执行了 execute 方法,这个方法就和上一小节讲请求的地方呼应上了。

还记得我们的请求是调用了 queue.take  方法,进入阻塞等待吗?

而这里就是在往 queue 里面添加任务。

队列里面有任务啦!在阻塞等待的用户线程就活过来了!

接下来用户线程怎么执行?

看代码:

取到任务后执行了任务的 run 方法。注意是 run 方法哦,并不会起新的线程。

而这个任务是什么任务?

是 ChannelEventRunnable。看一下这个任务重写的 run 方法:

这不是巧了吗,这不是?

上周的文章也说到了这个方法。

而 handler.received 方法最终就会调用到我们前说的 doReceived 方法:

闭环完成。

所以当用户线程执行完这个 Runable 任务后,继续往下执行:

这里返回的 Result 就是最终的服务端返回的数据了,或者是返回的异常。

现在你再回过头去看官网这张图,应该就能看明白了:

超时检查

 

前面说 newFuture 的时候不是说它还干了一件事就是检测是否超时嘛。其实原理也是很简单:

首先有一个 TimeoutCheckTask 类,这是一个待执行的任务。

触发后会根据调用编号去 FUTURES 里面取 DefaultFuture。

前面我刚刚说了:如果一个 future 正常完成之后,会从 FUTURES 里面移除掉。

那么如果到点了,根据编号没有取到 Future 或者取到的这个 Future 的状态是 done 了,则说明这个请求没有超时。

如果这个 Future 还在 FUTURES 里面,含义就是到点了你咋还在里面呢?那肯定是超时了,调用 notifyTimeout 方法,是否超时参数给 true:

这个 received 方法全局只有两个调用的地方,一个是前面讲的正常返回后的调用,一个就是这里超时之后的调用:

也就是不论怎样,最终都会调用这个 received 方法,最终都会通过这个方法把对应调用编号的 DefaultFuture 对象从 FUTURE 这个 MAP 中移除。

上面这个任务怎么触发呢?

Dubbo 自己搞了个 HashedWheelTimer ,这是什么东西?

时间轮调度算法呀:

你发起一个请求,指定时间内没有返回结果,于是就取消(future.cancel)这个请求。

这个需求不就类似于你下单买个东西,30 分钟还没有支付,于是平台自动给你取消了订单吗?

时间轮,可以解决你这个问题。之前的这篇文章中有介绍:《面试时遇到『看门狗』脖子上挂着『时间轮』,我就问你怕不怕?》

一个 2.7.5 版本关于检查 Dubbo 超时的小知识点,送给大家。

验证编号

 

前面一直在强调,这个调用编号很重要。

所以为了让大家有个更加直观的认识,我截个简单的图,给大家验证一下这个编号确实是贯穿请求和响应的。

首先,改造一下我们的服务端:

当传进来的 name 是指定参数(why-debug)时,直接返回。否则都睡眠 10 秒,目的是让客户端用户线程一直等待响应。

客户端改造如下:

先连续发 40 个请求到服务端,对于这些请求服务端都需要 10 秒的时间才能处理完成。

然后再发生一个特定请求到服务端,能即使返回。并在 39 行打上断点。

首先,看一下 DefaultFuture 里面的调用编号。

没看之前,你先猜一下,当前 debug 的这个请求的调用编号是多少?

是不是 40 号(编号从 0 开始)?

来验证一下:

所以在发送请求的地方,在 header 里面设置调用编号为 40:

然后看一下响应回来之后,对应的调用编号是否是 40:

这样,一个调用编号,串联起了请求和响应。让请求必有回应,让响应必定能找到是哪个请求发起的。

这就是:事事有回音。

 如果觉得本文对你有帮助,可以点赞关注支持一下,也可以关注我公众号,上面有更多技术干货文章以及相关资料共享,大家一起学习进步!