1 启动过程源码剖析

1.1 说明:

源码需要剖析到Netty 调用doBind方法, 追踪到 NioServerSocketChannel的doBind
并且要Debug 程序到 NioEventLoop类 的run代码 ,无限循环,在服务器端运行

1.2 Netty启动过程梳理

  1. 创建2个 EventLoopGroup 线程池数组。数组默认大小CPU*2,方便chooser选择线程池时提高性能
  2. BootStrap 将 boss 设置为 group属性,将 worker 设置为 childer 属性
  3. 通过 bind 方法启动,内部重要方法为 initAndRegister 和 dobind 方法
    4 . initAndRegister 方***反射创建 NioServerSocketChannel 及其相关的 NIO 的对象, pipeline , unsafe,同时也为 pipeline 初始了 head 节点和 tail 节点。
  4. 在register0 方法成功以后调用在 dobind 方法中调用 doBind0 方法,该方*** 调用 NioServerSocketChannel 的 doBind 方法对 JDK 的 channel 和端口进行绑定,完成 Netty 服务器的所有启动,并开始监听连接事件

2 Netty 接受请求过程源码剖析

2.1 源码剖析目的

- 服务器启动后肯定是要接受客户端请求并返回客户端想要的信息的,下面源码分析 Netty 在启动之后是如何接受客户端请求的
- 在 io.netty.example 包下

2.2 源码剖析

说明:

  1. 从之前服务器启动的源码中,我们得知,服务器最终注册了一个 Accept 事件等待客户端的连接。我们也知道,NioServerSocketChannel 将自己注册到了 boss 单例线程池(reactor 线程)上,也就是 EventLoop 。
  2. 先简单说下EventLoop的逻辑(后面我们详细讲解EventLoop)
    • EventLoop 的作用是一个死循环,而这个循环中做3件事情:
    • 有条件的等待 Nio 事件。
    • 处理 Nio 事件。
    • 处理消息队列中的任务。
  3. 仍用前面的项目来分析:进入到 NioEventLoop 源码中后,在private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) 方法开始调试
  4. 最终我们要分析到AbstractNioChannel 的 doBeginRead 方法, 当到这个方法时,针对于这个客户端的连接就完成了,接下来就可以监听读事件了

2.3 Netty接受请求过程梳理

总体流程:接受连接----->创建一个新的NioSocketChannel----------->注册到一个 worker EventLoop 上--------> 注册selecot Read 事件。

  1. 服务器轮询 Accept 事件,获取事件后调用 unsafe 的 read 方法,这个 unsafe 是 ServerSocket 的内部类,该方法内部由2部分组成
  2. doReadMessages 用于创建 NioSocketChannel 对象,该对象包装 JDK 的 Nio Channel 客户端。该方***像创建 ServerSocketChanel 类似创建相关的 pipeline , unsafe,config
  3. 随后执行 执行 pipeline.fireChannelRead 方法,并将自己绑定到一个 chooser 选择器选择的 workerGroup 中的一个 EventLoop。并且注册一个0,表示注册成功,但并没有注册读(1)事件

3 Pipeline Handler HandlerContext创建源码剖析

3.1 创建过程梳理

  1. 每当创建 ChannelSocket 的时候都会创建一个绑定的 1. ChannelPipeline,一对一的关系,创建 pipeline 的时候也会创建 tail 节点和 head 节点,形成最初的链表。
  2. 在调用 1. ChannelPipeline 的 addLast 方法的时候,会根据给定的 ChannelHandler创建一个 ChannelHandlerContext,然后,将这个 ChannelHandlerContext插入到链表的尾端(tail 前面)。
  3. ChannelHandlerContext包装ChannelHandler,多个 Context 在 pipeline 中形成了双向链表
  4. 入站方向叫 inbound,由 head 节点开始,出站方法叫 outbound ,由 tail 节点开始

3.2 源码剖析目的

  1. 当一个请求进来的时候,ChannelPipeline 是如何调用内部的这些 handler 的呢?我们一起来分析下。
  2. 首先,当一个请求进来的时候,会第一个调用 pipeline 的 相关方法,如果是入站事件,这些方法由 fire 开头,表示开始管道的流动。让后面的 handler 继续处理

3.3 源码剖析

说明

  1. 当浏览器输入 http://localhost:8007。可以看到会执行handler
  2. 在Debug时,可以将断点下在 DefaultChannelPipeline 类的
  public final ChannelPipeline fireChannelActive() {
   
 	 AbstractChannelHandlerContext.invokeChannelActive(head); //断点 
 	 return this;
  }

三、Pipeline Handler HandlerContext创建源码剖析

1、ChannelPipeline | ChannelHandler | ChannelHandlerContext介绍

1.1 三者关系

  1. 每当 ServerSocket 创建一个新的连接,就会创建一个 Socket,对应的就是目标客户端。

  2. 每一个新创建的 Socket 都将会分配一个全新的 ChannelPipeline(以下简称 pipeline)

  3. 每一个 ChannelPipeline 内部都含有多个 ChannelHandlerContext(以下简称 Context)

  4. 他们一起组成了双向链表,这些 Context 用于包装我们调用 addLast 方法时添加的 ChannelHandler(以下简称 handler)

  1. 上图中:ChannelSocket 和 ChannelPipeline 是一对一的关联关系,而 pipeline 内部的多个 Context 形成了链表,Context只是对Handle的封装

  2. 当一个请求进来的时候,会进入 Socket 对应的 pipeline,并经过 pipeline 所有的 handler,对,就是设计模式中的过滤器模式。

1.2 ChannelPipeline 作用及设计

  1. pipeline 的接口设计

部分源码

可以看到该接口继承了 inBound,outBound,Iterable 接口,表示他可以调用数据出站的方法和入站的方法,同时也能遍历内部的链表, 看看他的几个代表性的方法,基本上都是针对 handler 链表的插入,追加,删除,替换操作,类似是一个 LinkedList。同时,也能返回 channel(也就是 socket)

  1. 在 pipeline 的接口文档上,提供了一幅图

对上图的解释说明

  • 这是一个 handler 的 list,handler 用于处理或拦截入站事件和出站事件,pipeline 实现了过滤器的高级形式,以便用户控制事件如何处理以及 handler 在 pipeline 中如何交互。

  • 上图描述了一个典型的 handler 在 pipeline 中处理 I/O 事件的方式,IO 事件由 inboundHandler 或者 outBoundHandler 处理,并通过调用 ChannelHandlerContext.fireChannelRead 方法转发给其最近的处理程序 。

  • 入站事件由入站处理程序以自下而上的方向处理,如图所示。入站处理程序通常处理由图底部的I / O线程生成入站数据。入站数据通常从如 SocketChannel.read(ByteBuffer) 获取。

  • 通常一个 pipeline 有多个 handler,例如,一个典型的服务器在每个通道的管道中都会有以下处理程序

    • 协议解码器 - 将二进制数据转换为Java对象。

    • 协议编码器 - 将Java对象转换为二进制数据。

  • 业务逻辑处理程序 - 执行实际业务逻辑(例如数据库访问)

    • 你的业务程序不能将线程阻塞,会影响 IO 的速度,进而影响整个 Netty 程序的性能。如果你的业务程序很快,就可以放在 IO 线程中,反之,你需要异步执行。或者在添加 handler 的时候添加一个线程池,例如:
// 下面这个任务执行的时候,将不会阻塞 IO 线程,执行的线程来自 group 线程池
 pipeline.addLast(group,“handler”,new MyBusinessLogicHandler());
//或者在handler中使用任务队列的方式异步执行任务

1.3 ChannelHandler 作用及设计

  1. 源码
public interface ChannelHandler {
   

 //当把 ChannelHandler 添加到 pipeline 时被调用

 void handlerAdded(ChannelHandlerContext ctx) throws Exception;

//当从 pipeline 中移除时调用

 void handlerRemoved(ChannelHandlerContext ctx) throws Exception;

// 当处理过程中在 pipeline 发生异常时调用

@Deprecated

  void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

}

  1. ChannelHandler 的作用就是处理 IO 事件或拦截 IO 事件,并将其转发给下一个处理程序 ChannelHandler。

Handler 处理事件时分入站和出站的,两个方向的操作都是不同的,因此,Netty 定义了两个子接口继承 ChannelHandler

  1. ChannelInboundHandler 入站事件接口

  • channelActive 用于当 Channel 处于活动状态时被调用;

  • channelRead 当从Channel 读取数据时被调用等等方法。

  • 程序员需要重写一些方法,当发生关注的事件,需要在方法中实现我们的业务逻辑,因为当事件发生时,Netty 会回调对应的方法。

  1. ChannelOutboundHandler 出站事件接口

  • bind 方法,当请求将 Channel 绑定到本地地址时调用

  • close 方法,当请求关闭 Channel 时调用等等

  • 出站操作都是一些连接和写出数据类似的方法。

  1. ChannelDuplexHandler 处理出站和入站事件

ChannelDuplexHandler 间接实现了入站接口并直接实现了出站接口。

是一个通用的能够同时处理入站事件和出站事件的类。

1.4 ChannelHandlerContext 作用及设计

  1. ChannelHandlerContext UML图

ChannelHandlerContext 继承了出站方法调用接口和入站方法调用接口

  1. ChannelOutboundInvoker 和 ChannelInboundInvoker 部分源码

这两个 invoker 就是针对入站或出站方法来的,就是在 入站或出站 handler 的外层再包装一层,达到在方法前后拦截并做一些特定操作的目的

  1. ChannelHandlerContext部分源码
  • ChannelHandlerContext 不仅仅时继承了他们两个的方法,同时也定义了一些自己的方法

  • 这些方法能够获取 Context 上下文环境中对应的比如 channel,executor,handler ,pipeline,内存分配器,关联的 handler 是否被删除。

  • Context 就是包装了 handler 相关的一切,以方便 Context 可以在 pipeline 方便的操作 handler

  1. ChannelPipeline | ChannelHandler | ChannelHandlerContext 创建过

分为3个步骤来看创建的过程:

  1. 任何一个 ChannelSocket 创建的同时都会创建 一个 pipeline。
  2. 当用户或系统内部调用 pipeline 的 add*** 方法添加 handler 时,都会创建一个包装这 handler 的 Context。
  3. 这些 Context 在 pipeline 中组成了双向链表。

2.1 Socket 创建的时候创建 pipeline

在 SocketChannel 的抽象父类 AbstractChannel 的构造方法中

protected AbstractChannel(Channel parent) {
   
   this.parent = parent; //断点测试
   id = newId();
   unsafe = newUnsafe();
   pipeline = newChannelPipeline(); 
 }

//Debug 一下, 可以看到代码会执行到这里, 然后继续追踪到

protected DefaultChannelPipeline(Channel channel) {
   
   this.channel = ObjectUtil.checkNotNull(channel, "channel");
   succeededFuture = new SucceededChannelFuture(channel, null);
   voidPromise = new VoidChannelPromise(channel, true);
   tail = new TailContext(this);
   head = new HeadContext(this);
   head.next = tail;
   tail.prev = head;
 }

说明:

1)将 channel 赋值给 channel 字段,用于 pipeline 操作 channel。

2)创建一个 future 和 promise,用于异步回调使用。

3)创建一个 inbound 的 tailContext,创建一个既是 inbound 类型又是 outbound 类型的 headContext.

4)最后,将两个 Context 互相连接,形成双向链表

5)tailContext 和 HeadContext 非常的重要,所有 pipeline 中的事件都会流经他们,

2.2 在 add 添加处理器的时候创建 Context

看下 DefaultChannelPipeline 的 addLast 方法如何创建的 Context,代码如下

  @Override
  public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
   
    if (handlers == null) {
    //断点
      throw new NullPointerException("handlers");
    }

    for (ChannelHandler h: handlers) {
   
       if (h == null) {
   
        break;
      }
      addLast(executor, null, h);
    }
    return this;
  }

//继续Debug

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
   

    final AbstractChannelHandlerContext newCtx;

    synchronized (this) {
   

      checkMultiplicity(handler);
      newCtx = newContext(group, filterName(name, handler), handler);
      addLast0(newCtx);

      // If the registered is false it means that the channel was not registered on an eventloop yet.
      // In this case we add the context to the pipeline and add a task that will call
      // ChannelHandler.handlerAdded(...) once the channel is registered.
      if (!registered) {
   
        newCtx.setAddPending();
        callHandlerCallbackLater(newCtx, true);
        return this;
      }

 
      EventExecutor executor = newCtx.executor();
      if (!executor.inEventLoop()) {
   
        newCtx.setAddPending();
        executor.execute(new Runnable() {
   
          @Override
          public void run() {
   
            callHandlerAdded0(newCtx);
          }
        });
        return this;
      }
    }
    callHandlerAdded0(newCtx);
    return this;
  }

说明

  1. pipeline 添加 handler,参数是线程池,name 是null, handler 是我们或者系统传入的handler。Netty 为了防止多个线程导致安全问题,同步了这段代码,步骤如下:

  2. 检查这个 handler 实例是否是共享的,如果不是,并且已经被别的 pipeline 使用了,则抛出异常。

  3. 调用 newContext(group, filterName(name, handler), handler) 方法,创建一个 Context**。从这里可以看出来了,每次添加一个** handler 都会创建一个关联 Context

  4. 调用 addLast 方法,将 Context 追加到链表中

  5. 如果这个通道还没有注册到 selecor 上,就将这个 Context 添加到这个 pipeline 的待办任务中。当注册好了以后,就会调用 callHandlerAdded0 方法(默认是什么都不做,用户可以实现这个方法)。

  6. 到这里,针对三对象创建过程,了解的差不多了,和最初说的一样,每当创建 ChannelSocket 的时候都会创建一个绑定的 pipeline,一对一的关系,创建 pipeline 的时候也会创建 tail 节点和 head 节点,形成最初的链表。tail 是入站 inbound 类型的 handler, head 既是 inbound 也是 outbound 类型的 handler。在调用 pipeline 的 addLast 方法的时候,会根据给定的 handler 创建一个 Context,然后,将这个 Context 插入到链表的尾端(tail 前面)。到此就 OK了

3.4 ChannelPipeline 调度 handler 梳理

handler请参考:六、Netty的Handler

  1. Context 包装 handler,多个 Context 在 pipeline 中形成了双向链表,入站方向叫 inbound,由 head 节点开始,出站方法叫 outbound ,由 tail 节点开始。
  2. 而节点中间的传递通过 AbstractChannelHandlerContext 类内部的 fire 系列方法,找到当前节点的下一个节点不断的循环传播。是一个过滤器形式完成对handler 的调度