传统 Socket 服务器端

下面代码为例

# 单线程

运行下面代码

package test.nio;

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;

public class OioServer {

	public static void main(String[] args) throws IOException {
		// 创建 Socket 服务, 监听10101端口
		ServerSocket server = new ServerSocket(10101);
		System.out.println("服务器启动!");
		while (true) {
			// 获取一个套接字(阻塞1)
			final Socket socket = server.accept();
			System.out.println("来了一个新客户端!");
			// 业务处理
			handler(socket);
		}
	}

	/** * 读取数据 */
	private static void handler(Socket socket) {
		try {
			byte[] bytes = new byte[1024];
			InputStream in = socket.getInputStream() ;
			while (true) {
				// 读取数据(阻塞2)
				int read = in.read(bytes);
				if (read != -1) {
					System.out.println(new String(bytes, 0, read));
				}else {
					break ; 
				}
			}
		} catch (IOException e) {
			e.printStackTrace();
		}finally {
			try {
				System.out.println("socket关闭");
				socket.close();
			}catch (IOException e) {
				e.printStackTrace();
			}
		}

	}

}

来到第一个阻塞点

客户端尝试连接 - 【工具】windows - telnet

telnet 127.0.0.1 10101


来到第二个阻塞点


服务端接收到数据之后,继续阻塞

<mark>单线程阻塞情况,无法接收到其他客户端响应</mark>

传统 IO 特点

  • 阻塞点:
    server.accept()
    inputStream

# 线程池

<mark>为了同时响应 多个客户端,可我们单线程情况有两个阻塞点,需要多线程</mark>

这里我们优化代码,使用线程池

package test.nio;

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OioServer {

	public static void main(String[] args) throws IOException {

		ExecutorService pool = Executors.newCachedThreadPool();
		// 创建 Socket 服务, 监听10101端口
		ServerSocket server = new ServerSocket(10101);
		System.out.println("服务器启动!");
		while (true) {
			// 获取一个套接字(阻塞1)
			final Socket socket = server.accept();
			System.out.println("来了一个新客户端!");
			pool.execute(new Runnable() {
				@Override
				public void run() {
					// 业务处理
					handler(socket);
				}
			});
		}
	}

	/** * 读取数据 */
	private static void handler(Socket socket) {
		try {
			byte[] bytes = new byte[1024];
			InputStream in = socket.getInputStream();
			while (true) {
				// 读取数据(阻塞2)
				int read = in.read(bytes);
				if (read != -1) {
					System.out.println(new String(bytes, 0, read));
				} else {
					break;
				}
			}
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			try {
				System.out.println("socket关闭");
				socket.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}

	}

}

新问题:
每一个客户端请求,都创建一个线程,资源浪费
导致结果:
无法做长连接,只能做短连接(如 tomcat,底层使用的就是 socket)

# 总结

海底捞:一个客人一个服务员

NIO

参考 Java NIO原理 图文分析及代码实现

下面看看 NIO

下面代码为例

package test.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/** * NIO服务器 */
public class NioServer {
	// 通道管理器
	private Selector selector;

	/** * 获得一个 ServerSocket 通道, 并且对该通道做一些初始化的工作 * * @param port 绑定的端口号 * @throws IOException */
	public void initServer(int port) throws IOException {
		// 获得一个ServerSocket通道
		ServerSocketChannel socketChannelServer = ServerSocketChannel.open();
		// 设置通道为非阻塞
		socketChannelServer.configureBlocking(false);
		// 将该通道对应的 SeverSocket 绑定到 port 端口
		socketChannelServer.socket().bind(new InetSocketAddress(port));
		// 获得一个通道管理器
		this.selector = Selector.open();
		// 将通道管理器和该通道绑定, 并且为该通道注册 SelectionKey.OP_ACCEPT 事件
		// 注册该事件后, 当该事件到达时, selector.select() 会返回
		// 如果该事件没有到达, selector.select() 会一直阻塞
		socketChannelServer.register(selector, SelectionKey.OP_ACCEPT);
	}

	/** * 采用轮询的方式监听 selector 上是否有需要处理的事件。 如果有,则进行处理 */
	public void listen() throws IOException {
		System.out.println("服务端启动成功!");
		// 轮询访问 selector
		while (true) {
			// 当注册的事件到达时, 方法返回; 否则, 该方***一直阻塞
			this.selector.select();
			// 获得 selector 中选中的项的迭代器, 选中的项为注册的事件
			Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();
			while (iterator.hasNext()) {
				SelectionKey key = iterator.next();
				// 删除已经选中的 key, 以防重复处理
				iterator.remove();
				handler(key);
			}
		}
	}

	/** * 处理请求 * * @param key selector 轮询出的被触发事件 * @throws IOException */
	private void handler(SelectionKey key) throws IOException {
		if (key.isAcceptable()) {
			// 客户端请求连接事件 OP_ACCEPT
			handlerAccept(key);
		} else if (key.isReadable()) {
			// 获得了可读的事件 OP_READ
			handlerRead(key);
		}
	}

	/** * 处理连接请求 * * @param key OP_ACCEPT 事件 * @throws IOException */
	private void handlerAccept(SelectionKey key) throws IOException {
		ServerSocketChannel socketChannelServer = (ServerSocketChannel) key.channel();
		// 获得和客户端连接的通道
		SocketChannel channel = socketChannelServer.accept();
		// 设置成非阻塞
		channel.configureBlocking(false);

		// 在这里可以给客户端发送信息哦
		System.out.println("新的客户端连接");
		// 在和客户端连接成功后, 为了可以接收到客户端的信息, 需要给通道设置可读的权限
		channel.register(this.selector, SelectionKey.OP_READ);
	}

	/** * 处理读的事件 * * @param key OP_READ * @throws IOException */
	private void handlerRead(SelectionKey key) throws IOException {
		// 服务器可读信息:得到事件发生的Socket通道
		SocketChannel channel = (SocketChannel) key.channel();
		// 创建读取的缓冲区
		ByteBuffer buffer = ByteBuffer.allocate(10); // bytes
		channel.read(buffer);
		byte[] data = buffer.array();
		String msg = new String(data);
		System.out.println("服务端收到信息:" + msg);
	}
	
	/** * 启动服务端测试 * @throws IOException */
	public static void main(String[] args) throws IOException {
		NioServer server = new NioServer();
		server.initServer(8000);
		server.listen();
	}
}

# 单客户端连接

启动

阻塞点(下图)

客户端尝试连接

telnet 127.0.0.1 8000

连接成功

然后回到原先的 阻塞点,继续阻塞

# 多客户端连接


这里因为只用了10字节,所以换行…


# 总结

这里整理下api

  • ServerSocketChannel:对应传统 ServerSocket
  • SocketChannel:对应 Socket
  • Selector:nio 核心,用于监听 SocketChannel 和 ServerSocketChannel
    <mark>因为同时监听:连接和消息事件(同时还能注册其他事件)。</mark>
    <mark>所以,实现了单线程为多个客户端服务 ! ! !</mark>
  • SelectionKey

大排档:服务员(selector)面向事件服务
⇒ 一个服务员对应多个客人

# 上面NIO代码的优化

我们给 可读事件 添加 回写功能 和 客户端的功能

	/** * 处理读的事件 * * @param key OP_READ * @throws IOException */
	private void handlerRead(SelectionKey key) throws IOException {
		// 服务器可读信息:得到事件发生的Socket通道
		SocketChannel channel = (SocketChannel) key.channel();
		// 创建读取的缓冲区
		ByteBuffer buffer = ByteBuffer.allocate(10); // bytes
		// 跟传统socket类似
		int read = channel.read(buffer);
		if(read<=0) {
			System.out.println("客户端关闭");
			key.cancel();
			return ;
		}
		byte[] data = buffer.array();
		String msg = new String(data);
		System.out.println("服务端收到信息:" + msg);

		// 回写数据
		ByteBuffer outBuffer = ByteBuffer.wrap("好的".getBytes());
		// 将消息回送给客户端
			channel.write(outBuffer);
	}

## 说明!

必须添加

... 

if(read<=0) {
	System.out.println("客户端关闭");
	key.cancel();
	return ;
}

...

这段代码,用于检测客户端是否已断开。

否则,当客户端断开,回写会报错

或者出现死循环

<mark>原因是:服务端会循环从 channel 读取数据 !</mark>

netty

netty版本:3.x、4.x、4.x

# 应用领域

  1. 分布式进程通信
    如:hadoop、dubbo、akka等具有分布式功能的框架,底层RPC通信都是基于netty实现的,这些框架版本通常都还在用netty3.x(2019年2月10日)

  2. 游戏服务器开发
    最新的游戏服务器都有部分公司可能已经开始采用 netty4.x 或者 netty5.x

# netty hello! (MessageEvent 声明周期)

创建 maven jar 项目

引入 netty 依赖

	<dependencies>
		<!-- https://mvnrepository.com/artifact/io.netty/netty -->
		<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty</artifactId>
			<version>3.10.5.Final</version>
		</dependency>
	</dependencies>

创建 netty 的服务类

package cn.edut.server;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

public class Server {

	public static void main(String[] args) {
		// 服务类
		ServerBootstrap bootstrap = new ServerBootstrap();

		ExecutorService bossExecutor = Executors.newCachedThreadPool();
		ExecutorService workerExecutor = Executors.newCachedThreadPool();

		// 设置niosocket工厂
		bootstrap.setFactory(new NioServerSocketChannelFactory(bossExecutor, workerExecutor));

		// 设置管道的工厂
		bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
			// 管道可以理解为:一堆过滤器
			@Override
			public ChannelPipeline getPipeline() throws Exception {
				// 通常服务类都是xxx+s,如channels
				ChannelPipeline pipeline = Channels.pipeline();
				// 添加handler
				pipeline.addLast("helloHandler", new HelloHandler());
				return pipeline;
			}
		});

		// 为服务类绑定端口
		bootstrap.bind(new InetSocketAddress(10101));
		
		System.out.println("服务启动!");
	}
}

上面的 HelloHandler 是自定义的 handler 类

下面实现一下下

继承 SimpleChannelHandler ,并(目前阶段)关注下面打钩的方法

package cn.edut.server;

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

public class HelloHandler extends SimpleChannelHandler {

	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
		System.out.println("MessageEvent:"+e);
		System.out.println("messageReceived");
		super.messageReceived(ctx, e);
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
		System.out.println("exceptionCaught");
		super.exceptionCaught(ctx, e);
	}

	@Override
	public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		System.out.println("channelConnected");
		super.channelConnected(ctx, e);
	}

	@Override
	public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		System.out.println("channelDisconnected");
		super.channelDisconnected(ctx, e);
	}

	@Override
	public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		System.out.println("channelClosed");
		super.channelClosed(ctx, e);
	}

}

启动 server 服务

客户端连接:

telnet 127.0.0.1 10101

客户端发送信息

send hello 

可以看到,MessageEvent 记录了来自客户端的信息

断开连接

channelClosedchannelDisconnected 的区别:

  • channelClosed : channel 关闭的时候就触发
  • channelDisconnected :<mark>必须是连接已经建立</mark>,关闭通道的时候才会触发

## 内置Handler:StringDecoder

获取客户端信息,我们需要在 messageReceived 方法上修改

	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
		System.out.println("messageReceived");
		System.out.println("MessageEvent:"+e);
		
		//获取客户端信息
		ChannelBuffer buffer = (ChannelBuffer) e.getMessage() ; 
		System.out.println("message:"+new String(buffer.array()));
		
		super.messageReceived(ctx, e);
	}

但是这样很麻烦, netty 提供了 handler:StringDecoder

在 channelPipeline 里面添加 handler


这个handler 下面的handler 需要获取 客户端发送的信息,在 messageReceived 中只需要

	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
		System.out.println("messageReceived");
		System.out.println("MessageEvent:" + e);

		// 获取客户端信息
		String message = (String) e.getMessage();
		System.out.println("message:" + message);

		super.messageReceived(ctx, e);
	}

即可

## 回写数据

修改 messageReceived 方法,添加回写代码

	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
		System.out.println("messageReceived");
		System.out.println("MessageEvent:" + e);

		// 获取客户端信息
		String message = (String) e.getMessage();
		System.out.println("message:" + message);

		// 回写数据
		ChannelBuffer copiedBuffer = ChannelBuffers.copiedBuffer("hi from server".getBytes());
		ctx.getChannel().write(copiedBuffer);

		super.messageReceived(ctx, e);
	}


<mark>在 telnet 的 命令行模式下 按回车</mark>,可以看到服务端回写的数据

注意:!
回写数据需要 ChannelBuffer 或者 FileRegion
否则抛出 IllegalArgumentException

## 内置Handler:StringEncoder

同理,netty 也内置了 回写的 handler方便我们使用

# 名词说明

## ExecutorService

  • boss:监听端口
  • worker:监听读写
    (这里介绍,后面看源码就完全懂了)