- 下一篇:【笔记】RPC 架构 - 2 - netty客户端、源码分析
- 笔记视频: https://www.bilibili.com/video/av44457831/?p=4
- 源码下载【包含视频资料】 - Github
- RPC简介及框架选择
文章目录
传统 Socket 服务器端
下面代码为例
# 单线程
运行下面代码
package test.nio;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
public class OioServer {
public static void main(String[] args) throws IOException {
// 创建 Socket 服务, 监听10101端口
ServerSocket server = new ServerSocket(10101);
System.out.println("服务器启动!");
while (true) {
// 获取一个套接字(阻塞1)
final Socket socket = server.accept();
System.out.println("来了一个新客户端!");
// 业务处理
handler(socket);
}
}
/** * 读取数据 */
private static void handler(Socket socket) {
try {
byte[] bytes = new byte[1024];
InputStream in = socket.getInputStream() ;
while (true) {
// 读取数据(阻塞2)
int read = in.read(bytes);
if (read != -1) {
System.out.println(new String(bytes, 0, read));
}else {
break ;
}
}
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
System.out.println("socket关闭");
socket.close();
}catch (IOException e) {
e.printStackTrace();
}
}
}
}
来到第一个阻塞点
客户端尝试连接 - 【工具】windows - telnet
telnet 127.0.0.1 10101
来到第二个阻塞点
服务端接收到数据之后,继续阻塞
<mark>单线程阻塞情况,无法接收到其他客户端响应</mark>
传统 IO 特点
- 阻塞点:
server.accept()
inputStream
# 线程池
<mark>为了同时响应 多个客户端,可我们单线程情况有两个阻塞点,需要多线程</mark>
这里我们优化代码,使用线程池
package test.nio;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class OioServer {
public static void main(String[] args) throws IOException {
ExecutorService pool = Executors.newCachedThreadPool();
// 创建 Socket 服务, 监听10101端口
ServerSocket server = new ServerSocket(10101);
System.out.println("服务器启动!");
while (true) {
// 获取一个套接字(阻塞1)
final Socket socket = server.accept();
System.out.println("来了一个新客户端!");
pool.execute(new Runnable() {
@Override
public void run() {
// 业务处理
handler(socket);
}
});
}
}
/** * 读取数据 */
private static void handler(Socket socket) {
try {
byte[] bytes = new byte[1024];
InputStream in = socket.getInputStream();
while (true) {
// 读取数据(阻塞2)
int read = in.read(bytes);
if (read != -1) {
System.out.println(new String(bytes, 0, read));
} else {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
System.out.println("socket关闭");
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
新问题:
每一个客户端请求,都创建一个线程,资源浪费
导致结果:
无法做长连接,只能做短连接(如 tomcat,底层使用的就是 socket)
# 总结
海底捞:一个客人一个服务员
NIO
下面看看 NIO
下面代码为例
package test.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/** * NIO服务器 */
public class NioServer {
// 通道管理器
private Selector selector;
/** * 获得一个 ServerSocket 通道, 并且对该通道做一些初始化的工作 * * @param port 绑定的端口号 * @throws IOException */
public void initServer(int port) throws IOException {
// 获得一个ServerSocket通道
ServerSocketChannel socketChannelServer = ServerSocketChannel.open();
// 设置通道为非阻塞
socketChannelServer.configureBlocking(false);
// 将该通道对应的 SeverSocket 绑定到 port 端口
socketChannelServer.socket().bind(new InetSocketAddress(port));
// 获得一个通道管理器
this.selector = Selector.open();
// 将通道管理器和该通道绑定, 并且为该通道注册 SelectionKey.OP_ACCEPT 事件
// 注册该事件后, 当该事件到达时, selector.select() 会返回
// 如果该事件没有到达, selector.select() 会一直阻塞
socketChannelServer.register(selector, SelectionKey.OP_ACCEPT);
}
/** * 采用轮询的方式监听 selector 上是否有需要处理的事件。 如果有,则进行处理 */
public void listen() throws IOException {
System.out.println("服务端启动成功!");
// 轮询访问 selector
while (true) {
// 当注册的事件到达时, 方法返回; 否则, 该方***一直阻塞
this.selector.select();
// 获得 selector 中选中的项的迭代器, 选中的项为注册的事件
Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 删除已经选中的 key, 以防重复处理
iterator.remove();
handler(key);
}
}
}
/** * 处理请求 * * @param key selector 轮询出的被触发事件 * @throws IOException */
private void handler(SelectionKey key) throws IOException {
if (key.isAcceptable()) {
// 客户端请求连接事件 OP_ACCEPT
handlerAccept(key);
} else if (key.isReadable()) {
// 获得了可读的事件 OP_READ
handlerRead(key);
}
}
/** * 处理连接请求 * * @param key OP_ACCEPT 事件 * @throws IOException */
private void handlerAccept(SelectionKey key) throws IOException {
ServerSocketChannel socketChannelServer = (ServerSocketChannel) key.channel();
// 获得和客户端连接的通道
SocketChannel channel = socketChannelServer.accept();
// 设置成非阻塞
channel.configureBlocking(false);
// 在这里可以给客户端发送信息哦
System.out.println("新的客户端连接");
// 在和客户端连接成功后, 为了可以接收到客户端的信息, 需要给通道设置可读的权限
channel.register(this.selector, SelectionKey.OP_READ);
}
/** * 处理读的事件 * * @param key OP_READ * @throws IOException */
private void handlerRead(SelectionKey key) throws IOException {
// 服务器可读信息:得到事件发生的Socket通道
SocketChannel channel = (SocketChannel) key.channel();
// 创建读取的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(10); // bytes
channel.read(buffer);
byte[] data = buffer.array();
String msg = new String(data);
System.out.println("服务端收到信息:" + msg);
}
/** * 启动服务端测试 * @throws IOException */
public static void main(String[] args) throws IOException {
NioServer server = new NioServer();
server.initServer(8000);
server.listen();
}
}
# 单客户端连接
启动
阻塞点(下图)
客户端尝试连接
telnet 127.0.0.1 8000
连接成功
然后回到原先的 阻塞点,继续阻塞
# 多客户端连接
这里因为只用了10字节,所以换行…
# 总结
这里整理下api
ServerSocketChannel
:对应传统 ServerSocketSocketChannel
:对应 SocketSelector
:nio 核心,用于监听 SocketChannel 和 ServerSocketChannel
<mark>因为同时监听:连接和消息事件(同时还能注册其他事件)。</mark>
<mark>所以,实现了单线程为多个客户端服务 ! ! !</mark>SelectionKey
:
大排档:服务员(selector)面向事件服务
⇒ 一个服务员对应多个客人
# 上面NIO代码的优化
我们给 可读事件 添加 回写功能 和 客户端的功能
/** * 处理读的事件 * * @param key OP_READ * @throws IOException */
private void handlerRead(SelectionKey key) throws IOException {
// 服务器可读信息:得到事件发生的Socket通道
SocketChannel channel = (SocketChannel) key.channel();
// 创建读取的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(10); // bytes
// 跟传统socket类似
int read = channel.read(buffer);
if(read<=0) {
System.out.println("客户端关闭");
key.cancel();
return ;
}
byte[] data = buffer.array();
String msg = new String(data);
System.out.println("服务端收到信息:" + msg);
// 回写数据
ByteBuffer outBuffer = ByteBuffer.wrap("好的".getBytes());
// 将消息回送给客户端
channel.write(outBuffer);
}
## 说明!
必须添加
...
if(read<=0) {
System.out.println("客户端关闭");
key.cancel();
return ;
}
...
这段代码,用于检测客户端是否已断开。
否则,当客户端断开,回写会报错
或者出现死循环
<mark>原因是:服务端会循环从 channel 读取数据 !</mark>
netty
netty版本:3.x、4.x、4.x
# 应用领域
-
分布式进程通信
如:hadoop、dubbo、akka等具有分布式功能的框架,底层RPC通信都是基于netty实现的,这些框架版本通常都还在用netty3.x(2019年2月10日) -
游戏服务器开发
最新的游戏服务器都有部分公司可能已经开始采用 netty4.x 或者 netty5.x
# netty hello! (MessageEvent 声明周期)
创建 maven jar 项目
引入 netty 依赖
<dependencies>
<!-- https://mvnrepository.com/artifact/io.netty/netty -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>3.10.5.Final</version>
</dependency>
</dependencies>
创建 netty 的服务类
package cn.edut.server;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
public class Server {
public static void main(String[] args) {
// 服务类
ServerBootstrap bootstrap = new ServerBootstrap();
ExecutorService bossExecutor = Executors.newCachedThreadPool();
ExecutorService workerExecutor = Executors.newCachedThreadPool();
// 设置niosocket工厂
bootstrap.setFactory(new NioServerSocketChannelFactory(bossExecutor, workerExecutor));
// 设置管道的工厂
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
// 管道可以理解为:一堆过滤器
@Override
public ChannelPipeline getPipeline() throws Exception {
// 通常服务类都是xxx+s,如channels
ChannelPipeline pipeline = Channels.pipeline();
// 添加handler
pipeline.addLast("helloHandler", new HelloHandler());
return pipeline;
}
});
// 为服务类绑定端口
bootstrap.bind(new InetSocketAddress(10101));
System.out.println("服务启动!");
}
}
上面的 HelloHandler 是自定义的 handler 类
下面实现一下下
继承 SimpleChannelHandler ,并(目前阶段)关注下面打钩的方法
package cn.edut.server;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
public class HelloHandler extends SimpleChannelHandler {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
System.out.println("MessageEvent:"+e);
System.out.println("messageReceived");
super.messageReceived(ctx, e);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
System.out.println("exceptionCaught");
super.exceptionCaught(ctx, e);
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
System.out.println("channelConnected");
super.channelConnected(ctx, e);
}
@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
System.out.println("channelDisconnected");
super.channelDisconnected(ctx, e);
}
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
System.out.println("channelClosed");
super.channelClosed(ctx, e);
}
}
启动 server 服务
客户端连接:
telnet 127.0.0.1 10101
客户端发送信息
send hello
可以看到,MessageEvent
记录了来自客户端的信息
断开连接
channelClosed
和channelDisconnected
的区别:
channelClosed
: channel 关闭的时候就触发channelDisconnected
:<mark>必须是连接已经建立</mark>,关闭通道的时候才会触发
## 内置Handler:StringDecoder
获取客户端信息,我们需要在 messageReceived
方法上修改
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
System.out.println("messageReceived");
System.out.println("MessageEvent:"+e);
//获取客户端信息
ChannelBuffer buffer = (ChannelBuffer) e.getMessage() ;
System.out.println("message:"+new String(buffer.array()));
super.messageReceived(ctx, e);
}
但是这样很麻烦, netty 提供了 handler:StringDecoder
在 channelPipeline 里面添加 handler
这个handler 下面的handler 需要获取 客户端发送的信息,在 messageReceived
中只需要
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
System.out.println("messageReceived");
System.out.println("MessageEvent:" + e);
// 获取客户端信息
String message = (String) e.getMessage();
System.out.println("message:" + message);
super.messageReceived(ctx, e);
}
即可
## 回写数据
修改 messageReceived
方法,添加回写代码
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
System.out.println("messageReceived");
System.out.println("MessageEvent:" + e);
// 获取客户端信息
String message = (String) e.getMessage();
System.out.println("message:" + message);
// 回写数据
ChannelBuffer copiedBuffer = ChannelBuffers.copiedBuffer("hi from server".getBytes());
ctx.getChannel().write(copiedBuffer);
super.messageReceived(ctx, e);
}
<mark>在 telnet 的 命令行模式下 按回车</mark>,可以看到服务端回写的数据
注意:!
回写数据需要 ChannelBuffer 或者 FileRegion
否则抛出IllegalArgumentException
## 内置Handler:StringEncoder
同理,netty 也内置了 回写的 handler方便我们使用
# 名词说明
## ExecutorService
- boss:监听端口
- worker:监听读写
(这里介绍,后面看源码就完全懂了)