概述
一个问题
编码器实现了ChannelOutboundHandler
,并将出站数据从 一种格式转换为另一种格式,和我们方才学习的解码器的功能正好相反。Netty 提供了一组类, 用于帮助你编写具有以下功能的编码器:
- 将消息编码为字节
- 将消息编码为消息
我们将首先从抽象基类 MessageToByteEncoder 开始来对这些类进行考察
1 抽象类 MessageToByteEncoder
解码器通常需要在Channel
关闭之后产生最后一个消息(因此也就有了 decodeLast()
方法)
这显然不适于编码器的场景——在连接被关闭之后仍然产生一个消息是毫无意义的
1.1 ShortToByteEncoder
其接受一Short
型实例作为消息,编码为Short
的原子类型值,并写入ByteBuf
,随后转发给ChannelPipeline
中的下一个 ChannelOutboundHandler
每个传出的 Short 值都将会占用 ByteBuf 中的 2 字节
1.2 Encoder
Netty 提供了一些专门化的 MessageToByteEncoder
,可基于此实现自己的编码器
WebSocket08FrameEncoder
类提供了一个很好的实例
2 抽象类 MessageToMessageEncoder
你已经看到了如何将入站数据从一种消息格式解码为另一种
为了完善这幅图,将展示 对于出站数据将如何从一种消息编码为另一种。MessageToMessageEncoder
类的 encode()
方法提供了这种能力
为了演示,使用IntegerToStringEncoder
扩展了 MessageToMessageEncoder
- 编码器将每个出站 Integer 的 String 表示添加到了该 List 中
关于有趣的 MessageToMessageEncoder 的专业用法,请查看 io.netty.handler. codec.protobuf.ProtobufEncoder
类,它处理了由 Google 的 Protocol Buffers 规范所定义 的数据格式。
一个java对象最后是如何转变成字节流,写到socket缓冲区中去的
pipeline中的标准链表结构
java对象编码过程
write:写队列
flush:刷新写队列
writeAndFlush: 写队列并刷新
pipeline中的标准链表结构
数据从head节点流入,先拆包,然后解码成业务对象,最后经过业务Handler
处理,调用write
,将结果对象写出去
而写的过程先通过tail
节点,然后通过encoder
节点将对象编码成ByteBuf
,最后将该ByteBuf
对象传递到head
节点,调用底层的Unsafe写到JDK底层管道
Java对象编码过程
为什么我们在pipeline中添加了encoder节点,java对象就转换成netty可以处理的ByteBuf,写到管道里?
我们先看下调用write的code
业务处理器接受到请求之后,做一些业务处理,返回一个user
-
然后,user在pipeline中传递
-
情形一
-
情形二
handler 如果不覆盖 flush 方法,就会一直向前传递直到 head 节点
落到 Encoder
节点,下面是 Encoder
的处理流程
按照简单自定义协议,将Java对象 User 写到传入的参数 out中,这个out到底是什么?
需知User
对象,从BizHandler
传入到 MessageToByteEncoder
时,首先传到 write
1. 判断当前Handelr是否能处理写入的消息(匹配对象)
- 判断该对象是否是该类型参数匹配器实例可匹配到的类型
2 分配内存
3 编码实现
- 调用
encode
,这里就调回到Encoder
这个Handler
中
- 其为抽象方法,因此自定义实现类实现编码方法
4 释放对象
- 既然自定义Java对象转换成
ByteBuf
了,那么这个对象就已经无用,释放掉 (当传入的msg
类型是ByteBuf
时,就不需要自己手动释放了)
5 传播数据
//112 如果buf中写入了数据,就把buf传到下一个节点,直到 header 节点
6 释放内存
//115 否则,释放buf,将空数据传到下一个节点
// 120 如果当前节点不能处理传入的对象,直接扔给下一个节点处理
// 127 当buf在pipeline中处理完之后,释放
Encoder处理传入的Java对象
- 判断当前
Handler
是否能处理写入的消息- 如果能处理,进入下面的流程
- 否则,直接扔给下一个节点处理
- 将对象强制转换成
Encoder
可以处理的Response
对象 - 分配一个
ByteBuf
- 调用
encoder
,即进入到 Encoder 的 encode方法,该方法是用户代码,用户将数据写入ByteBuf - 既然自定义Java对象转换成ByteBuf了,那么这个对象就已经无用了,释放掉(当传入的msg类型是ByteBuf时,无需自己手动释放)
- 如果buf中写入了数据,就把buf传到下一个节点,否则,释放buf,将空数据传到下一个节点
- 最后,当buf在pipeline中处理完之后,释放节点
总结就是,Encoder
节点分配一个ByteBuf
,调用encode
方法,将Java对象根据自定义协议写入到ByteBuf,然后再把ByteBuf传入到下一个节点,在我们的例子中,最终会传入到head节点
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
这里的msg就是前面在Encoder节点中,载有java对象数据的自定义ByteBuf对象
write - 写buffer队列
以下过程分三步讲解
direct ByteBuf
- 首先,调用
assertEventLoop
确保该方法的调用是在reactor
线程中 - 然后,调用
filterOutboundMessage()
,将待写入的对象过滤,把非ByteBuf
对象和FileRegion
过滤,把所有的非直接内存转换成直接内存DirectBuffer
插入写队列
- 接下来,估算出需要写入的ByteBuf的size
- 最后,调用 ChannelOutboundBuffer 的addMessage(msg, size, promise) 方法,所以,接下来,我们需要重点看一下这个方法干了什么事情
想要理解上面这段代码,须掌握写缓存中的几个消息指针
ChannelOutboundBuffer 里面的数据结构是一个单链表结构,每个节点是一个 Entry,Entry 里面包含了待写出ByteBuf 以及消息回调 promise下面分别是
三个指针的作用
- flushedEntry
表第一个被写到OS Socket缓冲区中的节点
- unFlushedEntry
表第一个未被写入到OS Socket缓冲区中的节点
- tailEntry
表ChannelOutboundBuffer
缓冲区的最后一个节点
图解过程
-
初次调用write 即
addMessage
后
fushedEntry
指向空,unFushedEntry
和tailEntry
都指向新加入节点 -
第二次调用
addMessage
后
-
第n次调用
addMessage
后
可得,调用n次addMessage
后
flushedEntry
指针一直指向null
,表此时尚未有节点需写到Socket缓冲区unFushedEntry
后有n个节点,表当前还有n个节点尚未写到Socket缓冲区
设置写状态
-
统计当前有多少字节需要需要被写出
-
当前缓冲区中有多少待写字节
- 所以默认不能超过64k
- 自旋锁+CAS 操作,通过 pipeline 将事件传播到channelhandler 中监控
flush:刷新buffer队列
添加刷新标志并设置写状态
-
不管调用
channel.flush()
,还是ctx.flush()
,最终都会落地到pipeline
中的head
节点
-
之后进入到
AbstractUnsafe
-
flush方法中,先调用
-
结合前面的图来看,上述过程即
首先拿到unflushedEntry
指针,然后将flushedEntry
指向unflushedEntry
所指向的节点,调用完毕后
遍历 buffer 队列,过滤bytebuf
-
接下来,调用
flush0()
-
发现这里的核心代码就一个
doWrite
AbstractNioByteChannel
- 继续跟
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = -1;
boolean setOpWrite = false;
for (;;) {
// 拿到第一个需要flush的节点的数据
Object msg = in.current();
if (msg instanceof ByteBuf) {
boolean done = false;
long flushedAmount = 0;
// 拿到自旋锁迭代次数
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
// 自旋,将当前节点写出
for (int i = writeSpinCount - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (!buf.isReadable()) {
done = true;
break;
}
}
in.progress(flushedAmount);
// 写完之后,将当前节点删除
if (done) {
in.remove();
} else {
break;
}
}
}
}
-
第一步,调用
current()
先拿到第一个需要flush
的节点的数据
-
第二步,拿到自旋锁的迭代次数
-
第三步 调用 JDK 底层 API 进行自旋写
自旋的方式将ByteBuf
写到JDK NIO的Channel
强转为ByteBuf,若发现没有数据可读,直接删除该节点
-
拿到自旋锁迭代次数
-
在并发编程中使用自旋锁可以提高内存使用率和写的吞吐量,默认值为16
-
继续看源码
-
javaChannel()
,表明 JDK NIO Channel 已介入此次事件
-
得到向JDK 底层已经写了多少字节
-
从 Netty 的 bytebuf 写到 JDK 底层的 bytebuffer
-
第四步,删除该节点
节点的数据已经写入完毕,接下来就需要删除该节点
首先拿到当前被flush
掉的节点(flushedEntry
所指)
然后拿到该节点的回调对象ChannelPromise
, 调用removeEntry()
移除该节点
这里是逻辑移除,只是将flushedEntry指针移到下个节点,调用后
随后,释放该节点数据的内存,调用safeSuccess
回调,用户代码可以在回调里面做一些记录,下面是一段Example
ctx.write(xx).addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
// 回调
}
})
最后,调用 recycle
,将当前节点回收
writeAndFlush: 写队列并刷新
writeAndFlush
在某个Handler
中被调用之后,最终会落到 TailContext
节点
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
write(msg, true, promise);
return promise;
}
最终,通过一个boolean
变量,表示是调用invokeWriteAndFlush
,还是invokeWrite
,invokeWrite
便是我们上文中的write过程
可以看到,最终调用的底层方法和单独调用write
和flush
一样的
由此看来,invokeWriteAndFlush
基本等价于write
之后再来一次flush
总结
- 调用
write
并没有将数据写到Socket缓冲区中,而是写到了一个单向链表的数据结构中,flush
才是真正的写出 writeAndFlush
等价于先将数据写到netty的缓冲区,再将netty缓冲区中的数据写到Socket缓冲区中,写的过程与并发编程类似,用自旋锁保证写成功- netty中的缓冲区中的ByteBuf为DirectByteBuf
当 BizHandler 通过 writeAndFlush 方法将自定义对象往前传播时,其实可以拆分成两个过程
- 通过 pipeline逐渐往前传播,传播到其中的一个 encode 节点后,其负责重写 write 方法将自定义的对象转化为 ByteBuf,接着继续调用 write 向前传播
- pipeline中的编码器原理是创建一个
ByteBuf
,将Java对象转换为ByteBuf
,然后再把ByteBuf
继续向前传递,若没有再重写了,最终会传播到 head 节点,其中缓冲区列表拿到缓存写到 JDK 底层 ByteBuffer