1、引言
京东的京麦商家后台2014年构建网关,从HTTP网关发展到TCP网关。在2016年重构完成基于Netty4.x+Protobuf3.x实现对接PC和App上下行通信的高可用、高性能、高稳定的TCP长连接网关。
早期京麦搭建HTTP和TCP长连接功能主要用于消息通知的推送,并未应用于API网关。随着逐步对NIO的深入学习和对Netty框架的了解,以及对系统通信稳定能力的愈加高要求,采用NIO技术应用网关实现API请求调用的想法,最终在2016年实现,并完全支撑业务化运行。由于诸多的改进,包括TCP长连接容器、Protobuf的序列化、服务泛化调用框架等等,性能比HTTP网关提升10倍以上,稳定性也远远高于HTTP网关。
本文重点介绍京麦TCP网关的技术架构及Netty的应用实践。
简单介绍一下京麦是什么:
京麦工作台是京东商城为京东的商家准备的一款后台管理工具,它可以使您不登陆商家后台就能进行订单生产,快速实现订单下载发货流程。类似于淘宝的旺旺商家版(现在叫淘宝千牛)这样的东西。
2、本文作者
张松然:
- 京东商家研发部架构师;
- 丰富的构建高性能高可用大规模分布式系统的研发、架构经验;
- 2013年加入京东,目前负责京麦服务网关和京麦服务市场的系统研发工作。
3、TCP网关的网络结构
基于Netty构建京麦TCP网关的长连接容器,作为网关接入层提供服务API请求调用。
客户端通过域名+端口访问TCP网关,域名不同的运营商对应不同的VIP,VIP发布在LVS上,LVS将请求转发给后端的HAProxy,再由HAProxy把请求转发给后端的Netty的IP+Port。
LVS转发给后端的HAProxy,请求经过LVS,但是响应是HAProxy直接反馈给客户端的,这也就是LVS的DR模式。
4、TCP网关长连接容器架构
TCP网关的核心组件是Netty,而Netty的NIO模型是Reactor反应堆模型(Reactor相当于有分发功能的多路复用器Selector)。每一个连接对应一个Channel(多路指多个Channel,复用指多个连接复用了一个线程或少量线程,在Netty指EventLoop),一个Channel对应唯一的ChannelPipeline,多个Handler串行的加入到Pipeline中,每个Handler关联唯一的ChannelHandlerContext。
TCP网关长连接容器的Handler就是放在Pipeline的中。我们知道TCP属于OSI的传输层,所以建立Session管理机制构建会话层来提供应用层服务,可以极大的降低系统复杂度。所以,每一个Channel对应一个Connection,一个Connection又对应一个Session,Session由Session Manager管理,Session与Connection是一一对应,Connection保存着ChannelHandlerContext(ChannelHanderContext可以找到Channel),Session通过心跳机制来保持Channel的Active状态。
每一次Session的会话请求(ChannelRead)都是通过Proxy代理机制调用Service层,数据请求完毕后通过写入ChannelHandlerConext再传送到Channel中。数据下行主动推送也是如此,通过Session Manager找到Active的Session,轮询写入Session中的ChannelHandlerContext,就可以实现广播或点对点的数据推送逻辑。如下图所示。
京麦TCP网关使用Netty Channel进行数据通信,使用Protobuf进行序列化和反序列化,每个请求都将被封装成Byte二进制字节流,在整个生命周期中,Channel保持长连接,而不是每次调用都重新创建Channel,达到链接的复用。
我们接下来来看看基于Netty的具体技术实践。
5、TCP网关Netty Server的IO模型
具体的实现过程如下:
- 1)创建ServerBootstrap,设定BossGroup与WorkerGroup线程池;
- 2)bind指定的port,开始侦听和接受客户端链接(如果系统只有一个服务端port需要监听,则BossGroup线程组线程数设置为1);
- 3)在ChannelPipeline注册childHandler,用来处理客户端链接中的请求帧。
6、TCP网关的线程模型
TCP网关使用Netty的线程池,共三组线程池,分别为BossGroup、WorkerGroup和ExecutorGroup。其中,BossGroup用于接收客户端的TCP连接,WorkerGroup用于处理I/O、执行系统Task和定时任务,ExecutorGroup用于处理网关业务加解密、限流、路由,及将请求转发给后端的抓取服务等业务操作。
NioEventLoop是Netty的Reactor线程,其角色:
- 1)Boss Group:作为服务端Acceptor线程,用于accept客户端链接,并转发给WorkerGroup中的线程;
- 2)Worker Group:作为IO线程,负责IO的读写,从SocketChannel中读取报文或向SocketChannel写入报文;
- 3)Task Queue/Delay Task Queu:作为定时任务线程,执行定时任务,例如链路空闲检测和发送心跳消息等。
7、TCP网关执行时序图
如上图所示,其中步骤一至步骤九是Netty服务端的创建时序,步骤十至步骤十三是TCP网关容器创建的时序。
步骤一:创建ServerBootstrap实例,ServerBootstrap是Netty服务端的启动辅助类。
步骤二:设置并绑定Reactor线程池,EventLoopGroup是Netty的Reactor线程池,EventLoop负责所有注册到本线程的Channel。
步骤三:设置并绑定服务器Channel,Netty Server需要创建NioServerSocketChannel对象。
步骤四:TCP链接建立时创建ChannelPipeline,ChannelPipeline本质上是一个负责和执行ChannelHandler的职责链。
步骤五:添加并设置ChannelHandler,ChannelHandler串行的加入ChannelPipeline中。
步骤六:绑定监听端口并启动服务端,将NioServerSocketChannel注册到Selector上。
步骤七:Selector轮训,由EventLoop负责调度和执行Selector轮询操作。
步骤八:执行网络请求事件通知,轮询准备就绪的Channel,由EventLoop执行ChannelPipeline。
步骤九:执行Netty系统和业务ChannelHandler,依次调度并执行ChannelPipeline的ChannelHandler。
步骤十:通过Proxy代理调用后端服务,ChannelRead事件后,通过发射调度后端Service。
步骤十一:创建Session,Session与Connection是相互依赖关系。
步骤十二:创建Connection,Connection保存ChannelHandlerContext。
步骤十三:添加SessionListener,SessionListener监听SessionCreate和SessionDestory等事件。
8、TCP网关源码分析
8.1Session管理
Session是客户端与服务端建立的一次会话链接,会话信息中保存着SessionId、连接创建时间、上次访问事件,以及Connection和SessionListener,在Connection中保存了Netty的ChannelHandlerContext上下文信息。Session会话信息会保存在SessionManager内存管理器中。
创建Session的源码:
通过源码分析,如果Session已经存在销毁Session,但是这个需要特别注意,创建Session一定不要创建那些断线重连的Channel,否则会出现Channel被误销毁的问题。因为如果在已经建立Connection(1)的Channel上,再建立Connection(2),进入session.close方***将cxt关闭,Connection(1)和Connection(2)的Channel都将会被关闭。在断线之后再建立连接Connection(3),由于Session是有一定延迟,Connection(3)和Connection(1/2)不是同一个,但Channel可能是同一个。
所以,如何处理是否是断线重练的Channel,具体的方法是在Channel中存入SessionId,每次事件请求判断Channel中是否存在SessionId,如果Channel中存在SessionId则判断为断线重连的Channel,代码如下图所示。
8.2心跳
心跳是用来检测保持连接的客户端是否还存活着,客户端每间隔一段时间就会发送一次心跳包上传到服务端,服务端收到心跳之后更新Session的最后访问时间。在服务端长连接会话检测通过轮询Session集合判断最后访问时间是否过期,如果过期则关闭Session和Connection,包括将其从内存中删除,同时注销Channel等。如下图代码所示。
通过源码分析,在每个Session创建成功之后,都会在Session中添加TcpHeartbeatListener这个心跳检测的监听,TcpHeartbeatListener是一个实现了SessionListener接口的守护线程,通过定时休眠轮询Sessions检查是否存在过期的Session,如果轮训出过期的Session,则关闭Session。如下图代码所示。
同时,注意到session.connect方法,在connect方法中会对Session添加的Listeners进行添加时间,它会循环调用所有Listner的sessionCreated事件,其中TcpHeartbeatListener也是在这个过程中被唤起。如下图代码所示。
8.3数据上行
数据上行特指从客户端发送数据到服务端,数据从ChannelHander的channelRead方法获取数据。数据包括创建会话、发送心跳、数据请求等。这里注意的是,channelRead的数据包括客户端主动请求服务端的数据,以及服务端下行通知客户端的返回数据,所以在处理object数据时,通过数据标识区分是请求-应答,还是通知-回复。如下图代码所示。
8.4数据下行
数据下行通过MQ广播机制到所有服务器,所有服务器收到消息后,获取当前服务器所持有的所有Session会话,进行数据广播下行通知。如果是点对点的数据推送下行,数据也是先广播到所有服务器,每个服务器判断推送的端是否是当前服务器持有的会话,如果判断消息数据中的信息是在当前服务,则进行推送,否则抛弃。如下图代码所示。
通过源码分析,数据下行则通过NotifyProxy的方式发送数据,需要注意的是Netty是NIO,如果下行通知需要获取返回值,则要将异步转同步,所以NotifyFuture是实现java.util.concurrent.Future的方法,通过设置超时时间,在channelRead获取到上行数据之后,通过seq来关联NotifyFuture的方法。如下图代码所示。
下行的数据通过TcpConnector的send方法发送,send方式则是通过ChannelHandlerContext的writeAndFlush方法写入Channel,并实现数据下行,这里需要注意的是,之前有另一种写法就是cf.await,通过阻塞的方式来判断写入是否成功,这种写法偶发出现BlockingOperationException的异常。如下图代码所示。
使用阻塞获取返回值的写法:
关于BlockingOperationException的问题我在StackOverflow进行提问,非常幸运的得到了Norman Maurer(Netty的核心贡献者之一)的解答:
最终结论大致分析出,在执行write方法时,Netty会判断current thread是否就是分给该Channe的EventLoop,如果是则行线程执行IO操作,否则提交executor等待分配。当执行await方法时,会从executor里fetch出执行线程,这里就需要checkDeadLock,判断执行线程和current threads是否时同一个线程,如果是就检测为死锁抛出异常BlockingOperationException。
9、本文小结
本篇文章粗浅的向大家介绍了京麦TCP网关中使用的Netty实现长连接容器的架构,涉及TCP长连接容器搭建的关键点一一进行了阐述,以及对源码进行简单的分析。在京麦发展过程里Netty还有很多的实践应用,例如Netty4.11+HTTP2实现APNs的消息推送等等。