前言

上一次文章我们主要讲了一些模型框架,最后引出了Netty的框架结构,Netty 主要基于主从 Reactor多线程模型做了⼀定的改进,主从 Reactor 多线程模型有多个 Reactor,如下图:

今天会带在大家一起学习上述架构中的一些核心组件,废话不多说了,下面我们进入正题。

Bootstrap、ServerBootstrap

Bootstrap是引导,⼀个Netty应⽤通常由⼀个Bootstrap开始,主要作⽤是配置Netty程序,采用链式编程模式,串联Netty各个组件。

它们和其它组件之间的关系是它们将 Netty 的其它组件进⾏组装和配置,所以它们会组合和 直接或间接依赖其它的类;如果熟悉Spring Boot 的同学也可认为其实就是程序的启动的入口。

那Bootstrap和ServerBootstrap 有什么区别呢?

其实从名字就可以区分开来,Bootstrap类是客户端程序的启动引导类,ServerBootstrap是服务端启动引导类。

伪代码:

            ServerBootstrap b = new ServerBootstrap();
            b.option(ChannelOption.SO_BACKLOG, 1024);
            b.childOption(ChannelOption.TCP_NODELAY,true);
            b.childOption(ChannelOption.SO_KEEPALIVE,true);
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new HttpHelloWorldServerInitializer());

ChannelFuture

						ServerBootstrap b = new ServerBootstrap();
           Channel ch = b.bind(PORT).sync().channel();

Netty 中所有的 IO 操作都是异步的,不能⽴刻得知消息是否被正确处理,但是可以过⼀会等它执⾏完成或者直接注册⼀个监听,具体的实现就是通过 Future 和 ChannelFutures,他们可以注册⼀个监听,当操作执⾏成功或失败时监听会⾃动触发注册的监听事件。、

常用方法:

Channel channel(),返回当前正在进⾏ IO 操作的通道 
ChannelFuture sync(),等待异步操作执⾏完毕

JDK 中的 Future 接⼝:

public interface Future { 
// 取消该任务 
boolean cancel(boolean mayInterruptIfRunning); 
// 任务是否已取消 
boolean isCancelled(); 
// 任务是否已完成 
boolean isDone();
 // 阻塞获取任务执⾏结果 
V get() throws InterruptedException, ExecutionException; 
// 带超时参数的获取任务执⾏结果
 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }

Netty 中的 Future 接⼝(同名)继承了 JDK 中的 Future 接⼝,然后添加了⼀些⽅法:

public interface Future extends java.util.concurrent.Future {
// 是否成功 
boolean isSuccess();
// 是否可取消
 boolean isCancellable();
 // 如果任务执⾏失败,这个⽅法返回异常信息 
Throwable cause(); 
// 添加 Listener 来进⾏回调 
Future<V> addListener(GenericFutureListener<? extends Future<? super V>>
listener);

  Future<V> addListeners(GenericFutureListener<? extends Future<? super
V>>... listeners);
 
  Future<V> removeListener(GenericFutureListener<? extends Future<? super
V>> listener);
  
  Future<V> removeListeners(GenericFutureListener<? extends Future<? super
V>>... listeners);
  // 阻塞等待任务结束,如果任务失败,将“导致失败的异常”重新抛出来
 Future<V> sync() throws InterruptedException;
 // 不响应中断的 sync(),这个⼤家应该都很熟了
 Future<V> syncUninterruptibly();
 // 阻塞等待任务结束,和 sync() 功能是⼀样的,不过如果任务失败,它不会抛出执⾏过程中的异
//常
  Future<V> await() throws InterruptedException;
 Future<V> awaitUninterruptibly();
 boolean await(long timeout, TimeUnit unit) throws InterruptedException;
 boolean await(long timeoutMillis) throws InterruptedException;
 boolean awaitUninterruptibly(long timeout, TimeUnit unit);
 boolean awaitUninterruptibly(long timeoutMillis);
  // 获取执⾏结果,不阻塞。我们都知道 java.util.concurrent.Future 中的 get() 是阻塞
的
 V getNow();
 // 取消任务执⾏,如果取消成功,任务会因为 CancellationException 异常⽽导致失败
 // 也就是 isSuccess()==false,同时上⾯的 cause() ⽅法返回
CancellationException 的实例。
 // mayInterruptIfRunning 说的是:是否对正在执⾏该任务的线程进⾏中断(这样才能停⽌该任
务的执⾏),
 // 似乎 Netty 中 Future 接⼝的各个实现类,都没有使⽤这个参数
 @Override
 boolean cancel(boolean mayInterruptIfRunning);
}

这里重要的部分就是sync() 和 await(),Netty 的 Future 接⼝,加了 sync() 和 await() ⽤于阻塞等待,然后加入Listeners,所以只要任务结束后回调 Listener 们就可以了,那么就不需要主动调⽤ isDone() 来获取状态,或者通过 get() 阻塞⽅法来获取值。

sync() 和 await() 有什么区别呢?仔细阅读源码可以看到:

sync()调用wait()

wait()

checkDeadLock()

  • sync() 内部会先调⽤ await() ⽅法,等 await() ⽅法返回后,会检查下任务是否失败, 如果失败,重新将导致失败的异常抛出来。
  • 也就是说,如果使⽤ await(),任务抛出异常后,await() ⽅***返回,但是不会抛出异常,⽽ sync() ⽅法返回的同时会抛出异常。

Selector

Netty基于
java.nio.channels.Selector对象实现IO多路复⽤,通过Selector⼀个线程可以监听多 个连接的Channel事件。当向⼀个Selector中注册Channel后,Selector内部的机制就可以⾃动不断的 Select这些注册的Channel是否有就绪的IO事件(可读、可写、⽹络连接完成等)。

⼀个NioEventLoop中会有⼀个线程以及⼀个Selector, 这个线程就是我们所说的I/O线程

Channel

Channel 是 Netty ⽹络操作抽象类,使⽤了Facade 模式聚合了⼀组功能,除了包括基本的 I/O 操 作,如 bind、connect、read、write 之外,还包括 Netty 框架相关的⼀些功能,如获取该 Channel 的 EventLoop 。

Facade 模式

外观(Facade)模式的:⼜叫⻔⾯模式,是⼀种通过为多个复杂的⼦系统提供⼀个⼀致的接⼝,使 这些⼦系统更加容易被访问的模式。该模式对外有⼀个统⼀接⼝,外部应⽤程序不⽤关⼼内部⼦系统的 具体的细节,这样会⼤⼤降低应⽤程序的复杂度,提⾼了程序的可维护性。

外观(Facade)模式的结构⽐较简单,主要是定义了⼀个⾼层接⼝。它包含了对各个⼦系统的引 ⽤,客户端可以通过它访问各个⼦系统的功能。   

外观(Facade)模式包含以下主要⻆⾊:

  •  外观(Facade)⻆⾊:为多个⼦系统对外提供⼀个共同的接⼝。
  • ⼦系统(Sub System)⻆⾊:实现系统的部分功能,客户可以通过外观⻆⾊访问它。
  • 客户(Client)⻆⾊:通过⼀个外观⻆⾊访问各个⼦系统的功能。

外观模式代码:

Channel主要功能:

  • 1. ⽹络的读写
  • 2. 客户端发起连接、主动关闭连接
  • 3. 链路关闭
  • 4. 获取通信双⽅的⽹络地址

Channel设计理念

  • 1.Channel 接⼝层,采⽤ Facade 模式进⾏统⼀封装,将⽹络I/O 操作、⽹络I/O 相关联的其他操作 封装起来,统⼀对外提供。
  • 2.Channel 接⼝:⼤⽽全,为SocketChannel 和ServerSocketChannel 提供统⼀试图,由不同⼦类 现实不同的功能,公共功能在抽象⽗类中实现,最⼤程度地实现功能和接⼝的重⽤。
  • 3.具体实现采⽤聚合模式⽽⾮组合模式,将相关的功能类聚合在Channel中,由Channel 统⼀负责 分配和调度,功能实现更加灵活。

常⽤的 Channel 类型:

  • NioSocketChannel,异步的客户端 TCP Socket 连接。
  • NioServerSocketChannel,异步的服务器端 TCP Socket 连接。
  • NioDatagramChannel,异步的 UDP 连接。
  • NioSctpChannel,异步的客户端 Sctp 连接。 N
  • ioSctpServerChannel,异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP ⽹络 IO 以及 ⽂件 IO。

ChannelHandler

ChannelHandler属于业务的核⼼接⼝,处理 I/O 事件或拦截 I/O 操作,并将信息转发到其 ChannelPipeline,可以看作为:拦截器或过滤器。

入站和出站

以客户端为主体:客户端发出数据的时候就是出站,服务端为入站。

以服务端为主体:服务端发出数据的时候就是出站,客户端为入站。

我们使用Netty实现业务代码,一般都是处理入站数据。

ChannelHandler下主要是两个⼦接⼝:

1.ChannelInboundHandler(⼊站): 处理输⼊数据和Channel状态类型改变。

  •  适配器: ChannelInboundHandlerAdapter(适配器设计模式)
  •  常⽤的: SimpleChannelInboundHandler

2.ChannelOutboundHandler(出站): 处理输出数据

  •  适配器: ChannelOutboundHandlerAdapter

Channel&ChannelHandler⽣命周期

开启连接: handlerAdded -> channelRegister -> channelActive -> channelRead -> channelReadComplete

  • handlerAdded 指的是当检测到新连接之后,调⽤ ch.pipeline().addLast(new ***Handler()); 之后的回调,表示在当前的 channel 中,已经成功添加⼀个 handler 处理器。
  • channelRegistered 表示当前的 channel 所有的逻辑处理已经和线程建⽴了绑定关系,类似 socket.accept 到新的连接,然后创建⼀个线程来处理这条连接的读写,Netty ⾥⾯是使⽤了线程 池的⽅式, 只需要从线程池⾥⾯去抓⼀个线程绑定在这个 channel 上。
  • channelActive 当 channel 所有的业务逻辑链准备完毕(也就是说 channel 的 pipeline 中已经 添加完所有的 handler)以及绑定好⼀个 NIO 线程之后,这条连接算是真正激活了。
  •  channelRead
  • channelReadComplete

关闭连接: channelInactive -> channelUnregistered -> handlerRemoved

  • channelInactive 表⾯这条连接已经被关闭,这条连接在 TCP 层⾯已经不再是 ESTABLISH 状态了
  • channelUnregistered 连接已经被关闭,那么与这条连接绑定的线程就不需要对这条连接负责了, 表明与这条连接对应的 NIO 线程移除掉对这条连接的处理
  • handlerRemoved 给这条连接上添加的所有的业务逻辑处理器都给移除掉

ChannelHandlerContext

保存 Channel 相关的上下⽂信息,同时关联⼀个 ChannelHandler 对象。

ChannelHandlerContext 中包含⼀个具体的事件处理器 ChannelHandler,同时 ChannelHandlerContext 中也绑定了对应的 pipeline 和 Channel 的信息,⽅便对 ChannelHandler 进⾏调⽤。

每个ChannelHandlerContext之间形成双向链表,DefaultChannelPipeline中保存第⼀个ChannelHandlerContext及最后⼀个 ChannelHandlerContext的引⽤,伪代码:

public class DefaultChannelPipeline implements ChannelPipeline {
 ..... 
   final AbstractChannelHandlerContext head; 
    final AbstractChannelHandlerContext tail; 
.... 
}


AbstractChannelHandlerContext中可以看到

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
       ........... 
      volatile AbstractChannelHandlerContext next; 
      volatile AbstractChannelHandlerContext prev; 
      .... 
}

双向链表(double linked list)

注意:双向链表并不是在channel上实现的,是在Context上实现的,Context包含channel。

ChannelPipeline

ChannelPipeline 是⼀个 Handler 的集合,它负责处理和拦截 inbound 或者 outbound 的事件和 操作,相当于⼀个贯穿 Channel 的链。如下图:

⼊站事件和出站事件在⼀个双向链表中,⼊站事件会从链表head往后传递到最后⼀个⼊站的 handler,出站事件会从链表tail往前传递到最前⼀个出站的handler,两种类型的handler互不⼲扰。

在Channel创建的时候,会同时创建ChannelPipeline。

Channel ChannelPipeline

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { 
..... 
private final DefaultChannelPipeline pipeline;
			 ..... 
          protected AbstractChannel(Channel parent) 
          { 
                  this.parent = parent; 
                  id = newId(); 
                  unsafe = newUnsafe(); 
                  pipeline = newChannelPipeline(); 
          } 
		.... 
}

在ChannelPipeline中也会持有Channel的引⽤

public class DefaultChannelPipeline implements ChannelPipeline{
        .... 
        private final Channel channel; 
        .... 
            protected DefaultChannelPipeline(Channel channel) { 
                this.channel = ObjectUtil.checkNotNull(channel, "channel"); 
                succeededFuture = new SucceededChannelFuture(channel, null); 
                voidPromise = new VoidChannelPromise(channel, true); 
                tail = new TailContext(this); 
                head = new HeadContext(this);
                 head.next = tail; tail.prev = head; 
        }
         ....
 }

这里是不是感觉挺乱,看一下以下代码实例:

打印结果:

从结果上可以看出Channel和Pipeline其实存在上下文关系。

⼊站事件和出站事件

pipeline保存通道所有的处理器信息,在创建⼀个channel的时候,会创建⼀个这个channel专有的 pipeline,⼊站事件和出站事件都会调⽤这个pipeline上⾯的处理器。

伪代码:

ChannelPipeline p = ...;
 p.addLast("1", new InboundHandlerA()); 
p.addLast("2", new InboundHandlerB());
 p.addLast("3", new OutboundHandlerA()); 
p.addLast("4", new OutboundHandlerB()); 
p.addLast("5", new InboundOutboundHandlerX());

结果:

对于inbound事件传播,顺序是1,2,5,跟添加顺序相同。 
对于outbound事件传播,顺序是5,4,3,跟添加顺序相反。

重点记住: InboundHandler顺序执⾏,OutboundHandler逆序执⾏。

ChannelOption

hannelOption定义了对⼀个Channel的各种属性配置选项,包括了各种底层连接的详细信息,如 keep-alive或者超时属性以及缓冲区的设置等。

EventLoopGroup 和 NioEventLoopGroup

ServerSocketChannel 与 SocketChannel

EventLoopGroup 是⼀组 EventLoop 的抽象,Netty 为了更好的利⽤多核 CPU 资源,⼀般会有多 个 EventLoop 同时⼯作,每个 EventLoop 维护着⼀个 Selector 实例。

EventLoopGroup 提供 next 接⼝,可以从组⾥⾯按照⼀定规则获取其中⼀个 EventLoop 来处理任 务。在 Netty 服务器端编程中,⼀般都需要提供两个 EventLoopGroup,例如:BossEventLoopGroup 和 WorkerEventLoopGroup。

通常⼀个服务端⼝即⼀个 ServerSocketChannel 对应⼀个 Selector 和⼀个 EventLoop 线程。 BossEventLoop 负责接收客户端的连接并将 SocketChannel 交给 WorkerEventLoopGroup 来进⾏ IO 处理

BossEventLoopGroup通常是⼀个单线程的EventLoop,EventLoop维护着⼀个注册了 ServerSocketChannel的Selector实例BossEventLoop不断轮询Selector将连接事件分离出来。

通常是OP_ACCEPT事件,然后将收到SocketChannel交给WokerEventLoopGroup。

WokerEventLoopGroup会有next选择其中⼀个EventLoop来将这个SocketChannel注册到其维护 的Selector并对其后续的IO事件进⾏处理。

EventLoop与Channel

任务执⾏

 关于EventLoop以及EventLoopGroup的映射关系为:

  •  ⼀个EventLoopGroup 包含⼀个或者多个EventLoop;
  • ⼀个EventLoop 在它的⽣命周期内只和⼀个Thread 绑定; 所有由EventLoop 处理的I/O 事件都将在它专有的Thread 上被处理;
  • ⼀个Channel 在它的⽣命周期内只注册于⼀个EventLoop;
  • ⼀个EventLoop 可能会被分配给⼀个或多个Channel。

当⼀个连接到达时,Netty 就会注册⼀个 Channel,然后从 EventLoopGroup 中分配⼀个 EventLoop 绑定到这个Channel上,在该Channel的整个⽣命周期中都是有这个绑定的 EventLoop 来服 务的。

ByteBuf

Netty 提供⼀个专⻔⽤来操作缓冲区的⼯具类Unpooled。

常⽤⽅法伪代码:

/** 通过给定的数据和字符编码返回⼀个ByteBuf 对象(类似NIO中的ByteBuffer,但是有区别)1.10.2 ByteBuf readerIndex(读指针) writerIndex(写指针) maxCapacity(最⼤容量) **/ 
public static ByteBuf copiedBuffer(CharSequence string, Charset charset) { 
        ObjectUtil.checkNotNull(string, "string"); 
        if (CharsetUtil.UTF_8.equals(charset)) { 
                return copiedBufferUtf8(string); 
        } 
        if (CharsetUtil.US_ASCII.equals(charset)) { 
                return copiedBufferAscii(string);
         }
         if (string instanceof CharBuffer) { 
                return copiedBuffer((CharBuffer) string, charset); 
        } 
        return copiedBuffer(CharBuffer.wrap(string), charset); 
}

Netty中实现

Netty中也是通过指针实现,但是与java nio中不同的是采用了又指针。

  • readerIndex(读指针)
  • writerIndex(写指针)
  • maxCapacity(最⼤容量)

源码备注图:

调⽤discardReadBytes⽅法后,readIndex置为0,writeIndex也往前移动了Discardable bytes⻓ 度的距离,扩⼤了可写区域。但是这种做***严重影响效率,它进⾏了⼤量的拷⻉⼯作。

如果要进⾏数据的清除操作,建议使⽤clear⽅法,调⽤clear()⽅法将会将readIndex和writeIndex 同时置为0,不会进⾏内存的拷⻉⼯作,同时要注意,clear⽅法不会清除内存中的内容,只是改变了索引 位置⽽已