http://blog.csdn.net/allwefantasy/article/details/50663533

 

前言

Kafka 我个人感觉是性能优化的典范。而且使用Scala开发,代码写的也很漂亮的。重点我觉得有四个

  • NIO
  • Zero Copy
  • 磁盘顺序读写
  • Queue数据结构的极致使用

Zero-Copy 实际的原理,大家还是去Google下。这篇文章重点会分析这项技术是怎么被嵌入到Kafa里的。包含两部分:

  1. Kafka在什么场景下用了这个技术
  2. Zero-Copy 是如何被调用,并且发挥作用的。

Kafka在什么场景下使用该技术

答案是:

消息消费的时候

包括外部Consumer以及Follower 从partiton Leader同步数据,都是如此。简单描述就是:

Consumer从Broker获取文件数据的时候,直接通过下面的方法进行channel到channel的数据传输。

<code class="hljs axapta has-numbering" style="display: block; padding: 0px; background: transparent; color: inherit; box-sizing: border-box; font-family: "Source Code Pro", monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal;">java.nio.FileChannel.transferTo(
<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">long</span> position, 
<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">long</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">count</span>,                                
WritableByteChannel target)`</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li></ul><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li></ul>

也就是说你的数据源是一个Channel,数据接收端也是一个Channel(SocketChannel),则通过该方式进行数据传输,是直接在内核态进行的,避免拷贝数据导致的内核态和用户态的多次切换。

Kafka 如何使用Zero-Copy流程分析

估计看完这段内容,你对整个Kafka的数据处理流程也差不多了解了个大概。为了避免过于繁杂,以至于将整个Kafka的体系都拖进来,我们起始点从KafkaApis相关的类开始。

数据的生成

对应的类名称为:

kaka.server.KafkaApis

该类是负责真正的Kafka业务逻辑处理的。在此之前的,譬如 SocketServer等类似Tomcat服务器一样,侧重于交互,属于框架层次的东西。KafkaApis 则类似于部署在Tomcat里的应用。

<code class="hljs vbscript has-numbering" style="display: block; padding: 0px; background: transparent; color: inherit; box-sizing: border-box; font-family: "Source Code Pro", monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal;">def handle(<span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">request</span>: RequestChannel.<span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">Request</span>) {
       ApiKeys.forId(<span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">request</span>.requestId) match {
        <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">case</span> ApiKeys.PRODUCE => handleProducerRequest(<span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">request</span>)
        <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">case</span> ApiKeys.FETCH => handleFetchRequest(<span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">request</span>)
        .....</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li></ul><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li></ul>

handle 方法是所有处理的入口,然后根据请求的不同,有不同的处理逻辑。这里我们关注ApiKeys.FETCH这块,也就是有消费者要获取数据的逻辑。进入 handleFetchRequest方法,你会看到最后一行代码如下:

<code class="hljs avrasm has-numbering" style="display: block; padding: 0px; background: transparent; color: inherit; box-sizing: border-box; font-family: "Source Code Pro", monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal;">replicaManager<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.fetchMessages</span>(  
       fetchRequest<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.maxWait</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.toLong</span>, 
      fetchRequest<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.replicaId</span>, 
      fetchRequest<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.minBytes</span>,  
      authorizedRequestInfo,  
      sendResponseCallback)
</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li></ul><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li></ul>

ReplicaManager 包含所有主题的所有partition消息。大部分针对Partition的操作都是通过该类来完成的。

replicaManager.fetchMessages 这个方法非常的长。我们只关注一句代码:

<code class="hljs bash has-numbering" style="display: block; padding: 0px; background: transparent; color: inherit; box-sizing: border-box; font-family: "Source Code Pro", monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal;">val logReadResults = <span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">read</span>FromLocalLog(fetchOnlyFromLeader, fetchOnlyCommitted, fetchInfo)</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li></ul><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li></ul>

该方法获取本地日志信息数据。内部会调用kafka.cluster.Log对象的read方法:

<code class="hljs livecodeserver has-numbering" style="display: block; padding: 0px; background: transparent; color: inherit; box-sizing: border-box; font-family: "Source Code Pro", monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal;"><span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">log</span>.<span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">read</span>(<span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">offset</span>, fetchSize, maxOffsetOpt)</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li></ul><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li></ul>

Log 对象是啥呢?其实就是对应的一个Topic的Partition. 一个Partition是由很多端(Segment)组成的,这和Lucene非常相似。一个Segment就是一个文件。实际的数据自然是从这里读到的。代码如下:

<code class="hljs avrasm has-numbering" style="display: block; padding: 0px; background: transparent; color: inherit; box-sizing: border-box; font-family: "Source Code Pro", monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal;">val fetchInfo = entry<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.getValue</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.read</span>(startOffset, maxOffset, maxLength, maxPosition)</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li></ul><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li></ul>

这里的fetchInfo(FetchDataInfo)对象包含两个字段:

  • offsetMetadata
  • FileMessageSet

FileMessageSet 其实就是用户在这个Partition这一次消费能够拿到的数据集合。当然,真实的数据还躺在byteBuffer里,并没有记在到内存中。FileMessageSet 里面包含了一个很重要的方法:

<code class="hljs python has-numbering" style="display: block; padding: 0px; background: transparent; color: inherit; box-sizing: border-box; font-family: "Source Code Pro", monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal;"><span class="hljs-function" style="box-sizing: border-box;"><span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">def</span> <span class="hljs-title" style="box-sizing: border-box;">writeTo</span><span class="hljs-params" style="color: rgb(102, 0, 102); box-sizing: border-box;">(destChannel: GatheringByteChannel, writePosition: Long, size: Int)</span>:</span> Int = {
    ......

    val bytesTransferred = (destChannel match {
      case tl: TransportLayer => tl.transferFrom(channel, position, count)
      case dc => channel.transferTo(position, count, dc)
    }).toInt

    bytesTransferred
  }</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li></ul><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li></ul>

这里我们看到了久违的transferFrom方法。那么这个方法什么时候被调用呢?我们先搁置下,因为那个是另外一个流程。我们继续分析上面的代码。也就是接着从这段代码开始分析:

<code class="hljs bash has-numbering" style="display: block; padding: 0px; background: transparent; color: inherit; box-sizing: border-box; font-family: "Source Code Pro", monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal;">val logReadResults = <span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">read</span>FromLocalLog(fetchOnlyFromLeader, fetchOnlyCommitted, fetchInfo)</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li></ul><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li></ul>

获取到这个信息后,会执行如下操作:

<code class="hljs avrasm has-numbering" style="display: block; padding: 0px; background: transparent; color: inherit; box-sizing: border-box; font-family: "Source Code Pro", monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal;">val fetchPartitionData = logReadResults<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.mapValues</span>(result =>  FetchResponsePartitionData(result<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.errorCode</span>, result<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.hw</span>, result<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.info</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.messageSet</span>))
responseCallback(fetchPartitionData)</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li></ul><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li></ul>

logReadResults 的信息被包装成FetchResponsePartitionData, FetchResponsePartitionData 包喊了我们的FileMessageSet 对象。还记得么,这个对象包含了我们要跟踪的tranferTo方法。然后FetchResponsePartitionData 会给responseCallback作为参数进行回调。

responseCallback 的函数签名如下(我去掉了一些我们不关心的信息):

<code class="hljs vbscript has-numbering" style="display: block; padding: 0px; background: transparent; color: inherit; box-sizing: border-box; font-family: "Source Code Pro", monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal;">def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
      val mergedResponseStatus = responsePartitionData ++ unauthorizedResponseStatus

      def fetchResponseCallback(delayTimeMs: <span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">Int</span>) {
        val <span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">response</span> = FetchResponse(fetchRequest.correlationId, mergedResponseStatus, fetchRequest.versionId, delayTimeMs)
        requestChannel.sendResponse(<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> RequestChannel.<span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">Response</span>(<span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">request</span>, <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> FetchResponseSend(<span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">request</span>.connectionId, <span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">response</span>)))
      }

    }</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li></ul><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li></ul>

我们重点关注这个回调方法里的fetchResponseCallback。 我们会发现这里 FetchResponsePartitionData 会被封装成一个FetchResponseSend ,然后由requestChannel发送出去。

因为Kafka完全应用是NIO的异步机制,所以到这里,我们无法再跟进去了,需要从另外一部分开始分析。

数据的发送

前面只是涉及到数据的获取。读取日志,并且获得对应MessageSet对象。MessageSet 是一段数据的集合,但是该数据没有真实的被加载。 
这里会涉及到Kafka 如何将数据发送回Consumer端。

在SocketServer,也就是负责和所有的消费者打交道,建立连接的中枢里,会不断的进行poll操作

<code class="hljs scss has-numbering" style="display: block; padding: 0px; background: transparent; color: inherit; box-sizing: border-box; font-family: "Source Code Pro", monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal;">override def <span class="hljs-function" style="box-sizing: border-box;">run()</span> {
    <span class="hljs-function" style="box-sizing: border-box;">startupComplete()</span>
    <span class="hljs-function" style="box-sizing: border-box;">while(isRunning)</span> {
      try {
        <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// setup any new connections that have been queued up</span>
        <span class="hljs-function" style="box-sizing: border-box;">configureNewConnections()</span>
        <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// register any new responses for writing</span>
        <span class="hljs-function" style="box-sizing: border-box;">processNewResponses()</span></code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li></ul><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li></ul>

首先会注册新的连接,如果有的话。接着就是处理新的响应了。还记得刚刚上面我们通过requestChannelFetchResponseSend发出来吧。

<code class="hljs cs has-numbering" style="display: block; padding: 0px; background: transparent; color: inherit; box-sizing: border-box; font-family: "Source Code Pro", monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal;"><span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">private</span> def <span class="hljs-title" style="box-sizing: border-box;">processNewResponses</span>() {
    <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">var</span> curr = requestChannel.receiveResponse(id)
    <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">while</span>(curr != <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">null</span>) {
      <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">try</span> {
        curr.responseAction match {         
          <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">case</span> RequestChannel.SendAction =>
            selector.send(curr.responseSend)
            inflightResponses += (curr.request.connectionId -> curr)

        }
      } <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">finally</span> {
        curr = requestChannel.receiveResponse(id)
      }
    }
  }</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li><li style="box-sizing: border-box; padding: 0px 5px;">11</li><li style="box-sizing: border-box; padding: 0px 5px;">12</li><li style="box-sizing: border-box; padding: 0px 5px;">13</li><li style="box-sizing: border-box; padding: 0px 5px;">14</li><li style="box-sizing: border-box; padding: 0px 5px;">15</li></ul><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li><li style="box-sizing: border-box; padding: 0px 5px;">11</li><li style="box-sizing: border-box; padding: 0px 5px;">12</li><li style="box-sizing: border-box; padding: 0px 5px;">13</li><li style="box-sizing: border-box; padding: 0px 5px;">14</li><li style="box-sizing: border-box; padding: 0px 5px;">15</li></ul>

这里类似的,processNewResponses方***先通过send方法把FetchResponseSend注册到selector上。 这个操作其实做的事情如下:

<code class="hljs cs has-numbering" style="display: block; padding: 0px; background: transparent; color: inherit; box-sizing: border-box; font-family: "Source Code Pro", monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal;"><span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//SocketServer.scala    </span>
<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">void</span> <span class="hljs-title" style="box-sizing: border-box;">send</span>(Send send) {
        KafkaChannel channel = channelOrFail(send.destination());
        channel.setSend(send);
    }

<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//KafkaChannel.scala</span>
   <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">void</span> <span class="hljs-title" style="box-sizing: border-box;">setSend</span>(Send send) {
         <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">this</span>.send = send;          <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">this</span>.transportLayer.addInterestOps(SelectionKey.OP_WRITE);     
    }</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li></ul><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li></ul>

为了方便看代码,我对代码做了改写。我们看到,其实send就是做了一个WRITE时间注册。这个是和NIO机制相关的。如果大家看的有障碍,不妨先学习下相关的机制。

回到 SocketServer 的run方法里,也就是上面已经贴过的代码:

<code class="hljs scala has-numbering" style="display: block; padding: 0px; background: transparent; color: inherit; box-sizing: border-box; font-family: "Source Code Pro", monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal;">  <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">override</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">def</span> run() {
    startupComplete()
    <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">while</span>(isRunning) {
      <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">try</span> {
        <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// setup any new connections that have been queued up</span>
        configureNewConnections()
        <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// register any new responses for writing</span>
        processNewResponses()

        <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">try</span> {
          selector.poll(<span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">300</span>)
        } <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">catch</span> {
          <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">case</span>...
        }</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li><li style="box-sizing: border-box; padding: 0px 5px;">11</li><li style="box-sizing: border-box; padding: 0px 5px;">12</li><li style="box-sizing: border-box; padding: 0px 5px;">13</li><li style="box-sizing: border-box; padding: 0px 5px;">14</li></ul><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li><li style="box-sizing: border-box; padding: 0px 5px;">11</li><li style="box-sizing: border-box; padding: 0px 5px;">12</li><li style="box-sizing: border-box; padding: 0px 5px;">13</li><li style="box-sizing: border-box; padding: 0px 5px;">14</li></ul>

SocketServer 会poll队列,一旦对应的KafkaChannel 写操作ready了,就会调用KafkaChannel的write方法:

<code class="hljs java has-numbering" style="display: block; padding: 0px; background: transparent; color: inherit; box-sizing: border-box; font-family: "Source Code Pro", monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal;"><span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//KafkaChannel.scala</span>
<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> Send <span class="hljs-title" style="box-sizing: border-box;">write</span>() <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">throws</span> IOException {
        <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> (send != <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">null</span> && send(send)) 
    }
<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//</span>
<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//KafkaChannel.scala</span>
<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">private</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">boolean</span> <span class="hljs-title" style="box-sizing: border-box;">send</span>(Send send) <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">throws</span> IOException {
        send.writeTo(transportLayer);
        <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> (send.completed())
            transportLayer.removeInterestOps(SelectionKey.OP_WRITE);

        <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">return</span> send.completed();
    }
</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li><li style="box-sizing: border-box; padding: 0px 5px;">11</li><li style="box-sizing: border-box; padding: 0px 5px;">12</li><li style="box-sizing: border-box; padding: 0px 5px;">13</li><li style="box-sizing: border-box; padding: 0px 5px;">14</li></ul><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li><li style="box-sizing: border-box; padding: 0px 5px;">11</li><li style="box-sizing: border-box; padding: 0px 5px;">12</li><li style="box-sizing: border-box; padding: 0px 5px;">13</li><li style="box-sizing: border-box; padding: 0px 5px;">14</li></ul>

依然的,为了减少代码,我做了些调整,其中write会调用 send方法,对应的Send对象其实就是上面我们注册的FetchResponseSend 对象。

这段代码里真实发送数据的代码是send.writeTo(transportLayer);

对应的writeTo方法为:

<code class="hljs python has-numbering" style="display: block; padding: 0px; background: transparent; color: inherit; box-sizing: border-box; font-family: "Source Code Pro", monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal;">private val sends = new MultiSend(dest, JavaConversions.seqAsJavaList(fetchResponse.dataGroupedByTopic.toList.map {
    case(topic, data) => new TopicDataSend(dest, TopicData(topic,
                                                     data.map{case(topicAndPartition, message) => (topicAndPartition.partition, message)}))
    }))
override <span class="hljs-function" style="box-sizing: border-box;"><span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">def</span> <span class="hljs-title" style="box-sizing: border-box;">writeTo</span><span class="hljs-params" style="color: rgb(102, 0, 102); box-sizing: border-box;">(channel: GatheringByteChannel)</span>:</span> Long = {
    .....    
     written += sends.writeTo(channel)
    ....
  }</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li></ul><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li></ul>

这里我依然做了代码简化,只让我们关注核心的。 这里最后是调用了sends的writeTo方法,而sends 其实是个MultiSend。 
这个MultiSend 里有两个东西:

  • topicAndPartition.partition: 分区
  • message:FetchResponsePartitionData

还记得这个FetchResponsePartitionData 么?我们的MessageSet 就被放在了FetchResponsePartitionData这个对象里。

TopicDataSend 也包含了sends,该sends 包含了 PartitionDataSend,而 PartitionDataSend则包含了FetchResponsePartitionData。

最后进行writeTo的时候,其实是调用了

<code class="hljs fsharp has-numbering" style="display: block; padding: 0px; background: transparent; color: inherit; box-sizing: border-box; font-family: "Source Code Pro", monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal;"><span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//partitionData 就是 FetchResponsePartitionData</span>
<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//messages 其实就是FileMessageSet</span>
<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">val</span> bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize)
</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li></ul><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li></ul>

如果你还记得的话,FileMessageSet 也有个writeTo方法,就是我们之前已经提到过的那段代码:

<code class="hljs python has-numbering" style="display: block; padding: 0px; background: transparent; color: inherit; box-sizing: border-box; font-family: "Source Code Pro", monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal;"><span class="hljs-function" style="box-sizing: border-box;"><span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">def</span> <span class="hljs-title" style="box-sizing: border-box;">writeTo</span><span class="hljs-params" style="color: rgb(102, 0, 102); box-sizing: border-box;">(destChannel: GatheringByteChannel, writePosition: Long, size: Int)</span>:</span> Int = {
    ......

    val bytesTransferred = (destChannel match {
      case tl: TransportLayer => tl.transferFrom(channel, position, count)
      case dc => channel.transferTo(position, count, dc)
    }).toInt

    bytesTransferred
  }</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li></ul><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li></ul>

终于走到最底层了,最后其实是通过tl.transferFrom(channel, position, count) 来完成最后的数据发送的。这里你可能比较好奇,不应该是调用transferTo 方法么? transferFrom其实是Kafka自己封装的一个方法,最终里面调用的也是transerTo:

<code class="hljs java has-numbering" style="display: block; padding: 0px; background: transparent; color: inherit; box-sizing: border-box; font-family: "Source Code Pro", monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal;">  <span class="hljs-annotation" style="color: rgb(155, 133, 157); box-sizing: border-box;">@Override</span>
    <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">long</span> <span class="hljs-title" style="box-sizing: border-box;">transferFrom</span>(FileChannel fileChannel, <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">long</span> position, <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">long</span> count) <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">throws</span> IOException {
        <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">return</span> fileChannel.transferTo(position, count, socketChannel);
    }</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li></ul><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right: 1px solid rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li></ul>

总结

Kafka的整个调用栈还是非常绕的。尤其是引入了NIO的事件机制,有点类似Shuffle,把流程调用给切断了,无法简单通过代码引用来进行跟踪。Kafka还有一个非常优秀的机制就是DelayQueue机制,我们在分析的过程中,为了方便,把这块完全给抹掉了。