前言
上一次文章我们主要讲了一些模型框架,最后引出了Netty的框架结构,Netty 主要基于主从 Reactor多线程模型做了⼀定的改进,主从 Reactor 多线程模型有多个 Reactor,如下图:
今天会带在大家一起学习上述架构中的一些核心组件,废话不多说了,下面我们进入正题。
Bootstrap、ServerBootstrap
Bootstrap是引导,⼀个Netty应⽤通常由⼀个Bootstrap开始,主要作⽤是配置Netty程序,采用链式编程模式,串联Netty各个组件。
它们和其它组件之间的关系是它们将 Netty 的其它组件进⾏组装和配置,所以它们会组合和 直接或间接依赖其它的类;如果熟悉Spring Boot 的同学也可认为其实就是程序的启动的入口。
那Bootstrap和ServerBootstrap 有什么区别呢?
其实从名字就可以区分开来,Bootstrap类是客户端程序的启动引导类,ServerBootstrap是服务端启动引导类。
伪代码:
ServerBootstrap b = new ServerBootstrap();
b.option(ChannelOption.SO_BACKLOG, 1024);
b.childOption(ChannelOption.TCP_NODELAY,true);
b.childOption(ChannelOption.SO_KEEPALIVE,true);
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new HttpHelloWorldServerInitializer());
ChannelFuture
ServerBootstrap b = new ServerBootstrap();
Channel ch = b.bind(PORT).sync().channel();
Netty 中所有的 IO 操作都是异步的,不能⽴刻得知消息是否被正确处理,但是可以过⼀会等它执⾏完成或者直接注册⼀个监听,具体的实现就是通过 Future 和 ChannelFutures,他们可以注册⼀个监听,当操作执⾏成功或失败时监听会⾃动触发注册的监听事件。、
常用方法:
Channel channel(),返回当前正在进⾏ IO 操作的通道
ChannelFuture sync(),等待异步操作执⾏完毕
JDK 中的 Future 接⼝:
public interface Future {
// 取消该任务
boolean cancel(boolean mayInterruptIfRunning);
// 任务是否已取消
boolean isCancelled();
// 任务是否已完成
boolean isDone();
// 阻塞获取任务执⾏结果
V get() throws InterruptedException, ExecutionException;
// 带超时参数的获取任务执⾏结果
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
Netty 中的 Future 接⼝(同名)继承了 JDK 中的 Future 接⼝,然后添加了⼀些⽅法:
public interface Future extends java.util.concurrent.Future {
// 是否成功
boolean isSuccess();
// 是否可取消
boolean isCancellable();
// 如果任务执⾏失败,这个⽅法返回异常信息
Throwable cause();
// 添加 Listener 来进⾏回调
Future<V> addListener(GenericFutureListener<? extends Future<? super V>>
listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super
V>>... listeners);
Future<V> removeListener(GenericFutureListener<? extends Future<? super
V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super
V>>... listeners);
// 阻塞等待任务结束,如果任务失败,将“导致失败的异常”重新抛出来
Future<V> sync() throws InterruptedException;
// 不响应中断的 sync(),这个⼤家应该都很熟了
Future<V> syncUninterruptibly();
// 阻塞等待任务结束,和 sync() 功能是⼀样的,不过如果任务失败,它不会抛出执⾏过程中的异
//常
Future<V> await() throws InterruptedException;
Future<V> awaitUninterruptibly();
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);
// 获取执⾏结果,不阻塞。我们都知道 java.util.concurrent.Future 中的 get() 是阻塞
的
V getNow();
// 取消任务执⾏,如果取消成功,任务会因为 CancellationException 异常⽽导致失败
// 也就是 isSuccess()==false,同时上⾯的 cause() ⽅法返回
CancellationException 的实例。
// mayInterruptIfRunning 说的是:是否对正在执⾏该任务的线程进⾏中断(这样才能停⽌该任
务的执⾏),
// 似乎 Netty 中 Future 接⼝的各个实现类,都没有使⽤这个参数
@Override
boolean cancel(boolean mayInterruptIfRunning);
}
这里重要的部分就是sync() 和 await(),Netty 的 Future 接⼝,加了 sync() 和 await() ⽤于阻塞等待,然后加入Listeners,所以只要任务结束后回调 Listener 们就可以了,那么就不需要主动调⽤ isDone() 来获取状态,或者通过 get() 阻塞⽅法来获取值。
sync() 和 await() 有什么区别呢?仔细阅读源码可以看到:
sync()调用wait()
wait()
checkDeadLock()
- sync() 内部会先调⽤ await() ⽅法,等 await() ⽅法返回后,会检查下任务是否失败, 如果失败,重新将导致失败的异常抛出来。
- 也就是说,如果使⽤ await(),任务抛出异常后,await() ⽅***返回,但是不会抛出异常,⽽ sync() ⽅法返回的同时会抛出异常。
Selector
Netty基于
java.nio.channels.Selector对象实现IO多路复⽤,通过Selector⼀个线程可以监听多 个连接的Channel事件。当向⼀个Selector中注册Channel后,Selector内部的机制就可以⾃动不断的 Select这些注册的Channel是否有就绪的IO事件(可读、可写、⽹络连接完成等)。
⼀个NioEventLoop中会有⼀个线程以及⼀个Selector, 这个线程就是我们所说的I/O线程
Channel
Channel 是 Netty ⽹络操作抽象类,使⽤了Facade 模式聚合了⼀组功能,除了包括基本的 I/O 操 作,如 bind、connect、read、write 之外,还包括 Netty 框架相关的⼀些功能,如获取该 Channel 的 EventLoop 。
Facade 模式
外观(Facade)模式的:⼜叫⻔⾯模式,是⼀种通过为多个复杂的⼦系统提供⼀个⼀致的接⼝,使 这些⼦系统更加容易被访问的模式。该模式对外有⼀个统⼀接⼝,外部应⽤程序不⽤关⼼内部⼦系统的 具体的细节,这样会⼤⼤降低应⽤程序的复杂度,提⾼了程序的可维护性。
外观(Facade)模式的结构⽐较简单,主要是定义了⼀个⾼层接⼝。它包含了对各个⼦系统的引 ⽤,客户端可以通过它访问各个⼦系统的功能。
外观(Facade)模式包含以下主要⻆⾊:
- 外观(Facade)⻆⾊:为多个⼦系统对外提供⼀个共同的接⼝。
- ⼦系统(Sub System)⻆⾊:实现系统的部分功能,客户可以通过外观⻆⾊访问它。
- 客户(Client)⻆⾊:通过⼀个外观⻆⾊访问各个⼦系统的功能。
外观模式代码:
Channel主要功能:
- 1. ⽹络的读写
- 2. 客户端发起连接、主动关闭连接
- 3. 链路关闭
- 4. 获取通信双⽅的⽹络地址
Channel设计理念
- 1.Channel 接⼝层,采⽤ Facade 模式进⾏统⼀封装,将⽹络I/O 操作、⽹络I/O 相关联的其他操作 封装起来,统⼀对外提供。
- 2.Channel 接⼝:⼤⽽全,为SocketChannel 和ServerSocketChannel 提供统⼀试图,由不同⼦类 现实不同的功能,公共功能在抽象⽗类中实现,最⼤程度地实现功能和接⼝的重⽤。
- 3.具体实现采⽤聚合模式⽽⾮组合模式,将相关的功能类聚合在Channel中,由Channel 统⼀负责 分配和调度,功能实现更加灵活。
常⽤的 Channel 类型:
- NioSocketChannel,异步的客户端 TCP Socket 连接。
- NioServerSocketChannel,异步的服务器端 TCP Socket 连接。
- NioDatagramChannel,异步的 UDP 连接。
- NioSctpChannel,异步的客户端 Sctp 连接。 N
- ioSctpServerChannel,异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP ⽹络 IO 以及 ⽂件 IO。
ChannelHandler
ChannelHandler属于业务的核⼼接⼝,处理 I/O 事件或拦截 I/O 操作,并将信息转发到其 ChannelPipeline,可以看作为:拦截器或过滤器。
入站和出站
以客户端为主体:客户端发出数据的时候就是出站,服务端为入站。
以服务端为主体:服务端发出数据的时候就是出站,客户端为入站。
我们使用Netty实现业务代码,一般都是处理入站数据。
ChannelHandler下主要是两个⼦接⼝:
1.ChannelInboundHandler(⼊站): 处理输⼊数据和Channel状态类型改变。
- 适配器: ChannelInboundHandlerAdapter(适配器设计模式)
- 常⽤的: SimpleChannelInboundHandler
2.ChannelOutboundHandler(出站): 处理输出数据
- 适配器: ChannelOutboundHandlerAdapter
Channel&ChannelHandler⽣命周期
开启连接: handlerAdded -> channelRegister -> channelActive -> channelRead -> channelReadComplete
- handlerAdded 指的是当检测到新连接之后,调⽤ ch.pipeline().addLast(new ***Handler()); 之后的回调,表示在当前的 channel 中,已经成功添加⼀个 handler 处理器。
- channelRegistered 表示当前的 channel 所有的逻辑处理已经和线程建⽴了绑定关系,类似 socket.accept 到新的连接,然后创建⼀个线程来处理这条连接的读写,Netty ⾥⾯是使⽤了线程 池的⽅式, 只需要从线程池⾥⾯去抓⼀个线程绑定在这个 channel 上。
- channelActive 当 channel 所有的业务逻辑链准备完毕(也就是说 channel 的 pipeline 中已经 添加完所有的 handler)以及绑定好⼀个 NIO 线程之后,这条连接算是真正激活了。
- channelRead
- channelReadComplete
关闭连接: channelInactive -> channelUnregistered -> handlerRemoved
- channelInactive 表⾯这条连接已经被关闭,这条连接在 TCP 层⾯已经不再是 ESTABLISH 状态了
- channelUnregistered 连接已经被关闭,那么与这条连接绑定的线程就不需要对这条连接负责了, 表明与这条连接对应的 NIO 线程移除掉对这条连接的处理
- handlerRemoved 给这条连接上添加的所有的业务逻辑处理器都给移除掉
ChannelHandlerContext
保存 Channel 相关的上下⽂信息,同时关联⼀个 ChannelHandler 对象。
ChannelHandlerContext 中包含⼀个具体的事件处理器 ChannelHandler,同时 ChannelHandlerContext 中也绑定了对应的 pipeline 和 Channel 的信息,⽅便对 ChannelHandler 进⾏调⽤。
每个ChannelHandlerContext之间形成双向链表,DefaultChannelPipeline中保存第⼀个ChannelHandlerContext及最后⼀个 ChannelHandlerContext的引⽤,伪代码:
public class DefaultChannelPipeline implements ChannelPipeline {
.....
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
....
}
⽽
AbstractChannelHandlerContext中可以看到
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
...........
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
....
}
双向链表(double linked list)
注意:双向链表并不是在channel上实现的,是在Context上实现的,Context包含channel。
ChannelPipeline
ChannelPipeline 是⼀个 Handler 的集合,它负责处理和拦截 inbound 或者 outbound 的事件和 操作,相当于⼀个贯穿 Channel 的链。如下图:
⼊站事件和出站事件在⼀个双向链表中,⼊站事件会从链表head往后传递到最后⼀个⼊站的 handler,出站事件会从链表tail往前传递到最前⼀个出站的handler,两种类型的handler互不⼲扰。
在Channel创建的时候,会同时创建ChannelPipeline。
Channel ChannelPipeline
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
.....
private final DefaultChannelPipeline pipeline;
.....
protected AbstractChannel(Channel parent)
{
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
....
}
在ChannelPipeline中也会持有Channel的引⽤
public class DefaultChannelPipeline implements ChannelPipeline{
....
private final Channel channel;
....
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail; tail.prev = head;
}
....
}
这里是不是感觉挺乱,看一下以下代码实例:
打印结果:
从结果上可以看出Channel和Pipeline其实存在上下文关系。
⼊站事件和出站事件
pipeline保存通道所有的处理器信息,在创建⼀个channel的时候,会创建⼀个这个channel专有的 pipeline,⼊站事件和出站事件都会调⽤这个pipeline上⾯的处理器。
伪代码:
ChannelPipeline p = ...;
p.addLast("1", new InboundHandlerA());
p.addLast("2", new InboundHandlerB());
p.addLast("3", new OutboundHandlerA());
p.addLast("4", new OutboundHandlerB());
p.addLast("5", new InboundOutboundHandlerX());
结果:
对于inbound事件传播,顺序是1,2,5,跟添加顺序相同。
对于outbound事件传播,顺序是5,4,3,跟添加顺序相反。
重点记住: InboundHandler顺序执⾏,OutboundHandler逆序执⾏。
ChannelOption
hannelOption定义了对⼀个Channel的各种属性配置选项,包括了各种底层连接的详细信息,如 keep-alive或者超时属性以及缓冲区的设置等。
EventLoopGroup 和 NioEventLoopGroup
ServerSocketChannel 与 SocketChannel
EventLoopGroup 是⼀组 EventLoop 的抽象,Netty 为了更好的利⽤多核 CPU 资源,⼀般会有多 个 EventLoop 同时⼯作,每个 EventLoop 维护着⼀个 Selector 实例。
EventLoopGroup 提供 next 接⼝,可以从组⾥⾯按照⼀定规则获取其中⼀个 EventLoop 来处理任 务。在 Netty 服务器端编程中,⼀般都需要提供两个 EventLoopGroup,例如:BossEventLoopGroup 和 WorkerEventLoopGroup。
通常⼀个服务端⼝即⼀个 ServerSocketChannel 对应⼀个 Selector 和⼀个 EventLoop 线程。 BossEventLoop 负责接收客户端的连接并将 SocketChannel 交给 WorkerEventLoopGroup 来进⾏ IO 处理
BossEventLoopGroup通常是⼀个单线程的EventLoop,EventLoop维护着⼀个注册了 ServerSocketChannel的Selector实例BossEventLoop不断轮询Selector将连接事件分离出来。
通常是OP_ACCEPT事件,然后将收到SocketChannel交给WokerEventLoopGroup。
WokerEventLoopGroup会有next选择其中⼀个EventLoop来将这个SocketChannel注册到其维护 的Selector并对其后续的IO事件进⾏处理。
EventLoop与Channel
任务执⾏
关于EventLoop以及EventLoopGroup的映射关系为:
- ⼀个EventLoopGroup 包含⼀个或者多个EventLoop;
- ⼀个EventLoop 在它的⽣命周期内只和⼀个Thread 绑定; 所有由EventLoop 处理的I/O 事件都将在它专有的Thread 上被处理;
- ⼀个Channel 在它的⽣命周期内只注册于⼀个EventLoop;
- ⼀个EventLoop 可能会被分配给⼀个或多个Channel。
当⼀个连接到达时,Netty 就会注册⼀个 Channel,然后从 EventLoopGroup 中分配⼀个 EventLoop 绑定到这个Channel上,在该Channel的整个⽣命周期中都是有这个绑定的 EventLoop 来服 务的。
ByteBuf
Netty 提供⼀个专⻔⽤来操作缓冲区的⼯具类Unpooled。
常⽤⽅法伪代码:
/** 通过给定的数据和字符编码返回⼀个ByteBuf 对象(类似NIO中的ByteBuffer,但是有区别)1.10.2 ByteBuf readerIndex(读指针) writerIndex(写指针) maxCapacity(最⼤容量) **/
public static ByteBuf copiedBuffer(CharSequence string, Charset charset) {
ObjectUtil.checkNotNull(string, "string");
if (CharsetUtil.UTF_8.equals(charset)) {
return copiedBufferUtf8(string);
}
if (CharsetUtil.US_ASCII.equals(charset)) {
return copiedBufferAscii(string);
}
if (string instanceof CharBuffer) {
return copiedBuffer((CharBuffer) string, charset);
}
return copiedBuffer(CharBuffer.wrap(string), charset);
}
Netty中实现
Netty中也是通过指针实现,但是与java nio中不同的是采用了又指针。
- readerIndex(读指针)
- writerIndex(写指针)
- maxCapacity(最⼤容量)
源码备注图:
调⽤discardReadBytes⽅法后,readIndex置为0,writeIndex也往前移动了Discardable bytes⻓ 度的距离,扩⼤了可写区域。但是这种做***严重影响效率,它进⾏了⼤量的拷⻉⼯作。
如果要进⾏数据的清除操作,建议使⽤clear⽅法,调⽤clear()⽅法将会将readIndex和writeIndex 同时置为0,不会进⾏内存的拷⻉⼯作,同时要注意,clear⽅法不会清除内存中的内容,只是改变了索引 位置⽽已