文章目录
1. ByteBuffer
1.1、基本介绍
缓冲区(Buffer):缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个容器对象(含数组),该对象提供了一组方法,可以更轻松地使用内存块,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。Channel
提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经由Buffer
。
1.2、Buffer类介绍
- 基类是
Buffer
抽象类 - 基类派生出基于基本数据类型的7个
xxxBuffer
抽象类,没有boolean
相关的buffer
类。 - 除了
ByteBuffer
外,每个基本数据的抽象类xxxBuffer
类下面都派生出转向ByteBuffer
的类ByteBufferXxxAsBufferL
和ByteBufferAsXxxBufferB
实现类;以及DirectXxxBufferU
和DirectXxxBufferS
和HeapXxxBuffer
==(具体实例对象类)==这五个类。 - 就只有抽象类
CharBuffer
派生出了第六个类StringCharBuffer
。 ByteBuffer
只派生出了HeapByteBuffer
和MappedByteBufferR
两个类- 类图如下:
1.2.1、Buffer类主要属性
属性 | 描述 |
---|---|
Capacity | 容量,即可以容纳的最大数据量;在缓冲区创建时被设定并且不能改变 |
Limit | 表示缓冲区的当前终点,不能对缓冲区超过极限的位置进行读写操作。且极限是可以修改的 |
Position | 位置,下一个要被读或写的元素的索引,每次读写缓冲区数据时都会改变改值,为下次读写作准备 |
Mark | 标记 ,一般不会主动修改,在flip() 被调用后,mark就作废了。 |
mark <= position <= limit <= capacity
1.2.2、Buffer类使用示例
//创建一个Buffer,大小为5,即可以存放5个int
IntBuffer intBuffer = IntBuffer.allocate(5);
//向buffer中存放数据
for (int i = 0; i < intBuffer.capacity(); i++) {
intBuffer.put(i * 2);
}
//如何从buffer中读取数据
//将buffer转换,读写切换
intBuffer.flip();
while (intBuffer.hasRemaining()) {
System.out.println(intBuffer.get());
}
123456789101112
Buffer
刚创建时,capacity = 5
,固定不变。limit
指针指向5
,position
指向0
,mark
指向-1
- 之后调用
intBuffer.put
方法,向buffer
中添加数据,会不断移动position
指针,最后position
变量会和limit
指向相同。 - 调用
buffer.flip()
实际上是重置了position
和limit
两个变量,将limit
放在position
的位置,position
放在0
的位置。这里只是最后的position
和limit
位置相同,所以flip
后limit
位置没变。
- 调用
intBuffer.get()
实际上是不断移动position
指针,直到它移动到limit
的位置
1.2.3、Buffer类主要方法
Buffer基类(抽象类)
public final int capacity();
- 直接返回了此缓冲区的容量,
capacity
- 直接返回了此缓冲区的容量,
public final int position();
- 直接返回了此缓冲区指针的当前位置
public final Buffer position(int newPosition);
- 设置此缓冲区的位置,设置
position
- 设置此缓冲区的位置,设置
public final int limit();
- 返回此缓冲区的限制
public final Buffer limit(int newLimit);
- 设置此缓冲区的限制,设置
limit
- 设置此缓冲区的限制,设置
public final Buffer clear();
- 清除此缓冲区,即将各个标记恢复到初识状态,
position = 0;limit = capacity; mark = -1
,但是并没有删除数据。
- 清除此缓冲区,即将各个标记恢复到初识状态,
public final Buffer flip();
- 切换为读模式
- 反转此缓冲区,
limit = position;position = 0;mark = -1
。 - 当指定数据存放在缓冲区中后,
position
所指向的即为此缓冲区数据最后的位置。只有当数据大小和此缓冲区大小相同时,position
才和limit
的指向相同。 flip()
方法将limit
置向position
,position
置0
,那么从position
读取数据到limit
即为此缓冲区中所有的数据。
public final boolean hasRemaining();
- 告知当前位置和限制之间是否有元素。
return position < limit;
- 告知当前位置和限制之间是否有元素。
public abstract boolean isReadOnly();
- 此方法为抽象方法,告知此缓冲区是否为只读缓冲区,具体实现在各个实现类中。
public abstract boolean hasArray();
- 告知此缓冲区是否具有可访问的底层实现数组
public abstract Object array();
- 返回此缓冲区的底层实现数组
Buffer具体实现类(ByteBuffer为例)
从前面可以看出来对于Java中的基本数据类型(boolean除外),都有一个Buffer
类型与之对应,最常用的自然是ByteBuffer
类(二进制数据),该类的主要方法如下:
-
public static ByteBuffer allocateDirect(int capacity);
- 创建直接缓冲区
-
public static ByteBuffer allocate(int capacity) ;
- 设置缓冲区的初识容量
-
public abstract byte get();
- 从当前位置
position
上get
数据,获取之后,position
会自动加1
- 从当前位置
-
public abstract byte get(int index);
- 通过绝对位置获取数据。
-
public abstract ByteBuffer put(byte b);
- 从当前位置上添加,
put
之后,position
会自动加1
- 从当前位置上添加,
-
public abstract ByteBuffer put(int index, byte b);
- 从绝对位置上添加数据
-
public abstract ByteBuffer putXxx(Xxx value [, int index]);
-
从
position
当前位置插入元素。Xxx
表示基本数据类型 -
此方法时类型化的
put
和get
,put
放入的是什么数据类型,get
就应该使用相应的数据类型来取出,否则可能有BufferUnderflowException
异常。 -
示例如下:
ByteBuffer buf = ByteBuffer.allocate(64); //类型化方式放入数据 buf.putInt(100); buf.putLong(20); buf.putChar('上'); buf.putShort((short)44); //取出,当取出的顺序和上面插入的数据类型的顺序不对时,就会抛出BufferUnderflowException异常 buf.flip(); System.out.println(buf.getInt()); System.out.println(buf.getLong()); System.out.println(buf.getChar()); System.out.println(buf.getShort()); 1234567891011121314
-
-
可以将一个普通的Buffer转成只读的Buffer
//创建一个Buffer ByteBuffer byteBuffer = ByteBuffer.allocate(64); for (int i = 0; i < 64; i++) { byteBuffer.put((byte)i); } //读取 byteBuffer.flip(); //得到一个只读的Buffer ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer(); System.out.println(readOnlyBuffer.getClass()); //读取 while (readOnlyBuffer.hasRemaining()){ System.out.println(readOnlyBuffer.get()); } readOnlyBuffer.put((byte)100); //会抛出 ReadOnlyBufferException 123456789101112131415
-
MappedByteBuffer
可以让文件直接在内存(堆外内存)中进行修改,而如何同步到文件由NIO来完成/** * 1、MappedByteBuffer可以让文件直接在内存中(堆外内存)修改,操作系统不需要拷贝一次 */ @Test public void test() throws IOException { RandomAccessFile randomAccessFile = new RandomAccessFile("1.txt", "rw"); //获取对应的文件通道 FileChannel channel = randomAccessFile.getChannel(); /** * 参数1: FileChannel.MapMode.READ_WRITE,使用的读写模式 * 参数2: 0,可以直接修改的起始位置 * 参数3: 5,是映射到内存的大小(不是文件中字母的索引位置),即将 1.txt 的多少个字节映射到内存,也就是可以直接修改的范围就是 [0, 5) * 实际的实例化类型:DirectByteBuffer */ MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5); mappedByteBuffer.put(0,(byte)'N'); mappedByteBuffer.put(3, (byte)'M'); mappedByteBuffer.put(5, (byte)'Y'); //会抛出 IndexOutOfBoundsException randomAccessFile.close(); System.out.println("修改成功~"); } 1234567891011121314151617181920212223
1.flip相关
客户端: 写入 socketChannel.write(byteBuffer)之前一定要使用flip切换为读模式才可以成功写入到服务端 注意:(ByteBuffer.wrap(str.getBytes()))特殊他已经是写模式不需要切换
服务端: 在读取(socketChannel.read(byteBuffer)) 之前一定要使用flip切换为读模式才可以读取到数据
3、Selector(选择器)
3.1、基本介绍
- Java 的 NIO,用非阻塞的 IO 方式。可以用一个线程,处理多个的客户端连接,就会使用到Selector(选择器)
- Selector能够检测多个注册的通道上是否有事件发生(注意:多个Channel以事件的方式可以注册到同一个Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。
- 只有在 连接/通道 真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程
- 避免了多线程之间的上下文切换导致的开销
Netty
的IO
线程NioEventLoop
聚合了Selector
(选择器,也叫多路复用器),可以同时并发处理成百上千个客户端连接。- 当线程从某客户端
Socket
通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。 - 线程通常将非阻塞
IO
的空闲时间用于在其他通道上执行IO
操作,所以单独的线程可以管理多个输入和输出通道。 - 由于读写操作都是非阻塞的,这就可以充分提升
IO
线程的运行效率,避免由于频繁I/O
阻塞导致的线程挂起。 - 一个
I/O
线程可以并发处理N
个客户端连接和读写操作,这从根本上解决了传统同步阻塞I/O
一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。
3.2、SelectionKey介绍
主要作用:
Selector
通过管理SelectionKey
的集合从而去监听各个Channel
。当Channel
注册到Selector
上面时,会携带该Channel
关注的事件**(SelectionKey包含Channel以及与之对应的事件)**,并会返回一个SelectionKey
的对象,Selector
将该对象加入到它统一管理的集合中去,从而对Channel
进行管理。SelectionKey表示的是Selector和网络通道的注册关系,固FileChannel是没有办法通过SelectionKey注册到Selector上去的。
四大事件:
public static final int OP_READ = 1 << 0
- 值为
1
,表示读操作, - 代表本
Channel
已经接受到其他客户端传过来的消息,需要将Channel
中的数据读取到Buffer
中去
- 值为
public static final int OP_WRITE = 1 << 2
- 值为
4
,表示写操作 - 一般临时将
Channel
的事件修改为它,在处理完后又修改回去。我暂时也没明白具体的作用。
- 值为
public static final int OP_CONNECT = 1 << 3
- 值为
8
,代表建立连接。 - 一般在
ServerSocketChannel
上绑定该事件,结合channel.finishConnect()
在连接建立异常时进行异常处理
- 值为
public static final int OP_ACCEPT = 1 << 4
- 值为
16
,表示由新的网络连接可以accept
。 - 与
ServerSocketChannel
进行绑定,用于创建新的SocketChannel
,并把其注册到Selector
上去
- 值为
事件介绍
-
OP_ACCEPT
事件就绪:当收到一个客户端的连接请求时,该事件就绪。这是ServerSocketChannel上唯一有效的操作。(服务器端) -
OP_CONNECT
事件就绪:只有客户端SocketChannel会注册该操作,当客户端调用SocketChannel.connect()时,该事件就绪。(客户端) -
OP_READ
事件就绪:该操作对客户端和服务端的SocketChannel都有效,当OS的读缓冲区中有数据可读时,该事件就绪。 -
OP_WRITE
事件就绪:该操作对客户端和服务端的SocketChannel都有效,当OS的写缓冲区中有空闲的空间时,该事件就绪。 -
accept
和connect
的区别:accept是服务器端接收到连接,connect则是客户端发起连接成功。后者用在客户端,当连接成功后注册相关读写事件。 -
IO复用更多的是指一个线程监控多个IO通道,这个监控线程的复用。
相关方法
public abstract Selector selector()
- 得到该
SelectionKey
具体是属于哪个Selector
对象的
- 得到该
public abstract SelectableChannel channel()
- 通过
SelectionKey
的到对应的Channel
- 通过
public final Object attachment()
- 得到与之关联的共享数据,一般用于获取
buffer
- 在使用
register
注册通道时,也可以为该Channel
绑定一个Buffer
,可以通过本方法获取这个Buffer
。 - 通过
selectionKey.attach(Object ob)
绑定的数据,也是通过该方法获取
- 得到与之关联的共享数据,一般用于获取
public abstract SelectionKey interestOps()
- 获取该
SelectionKey
下面的事件
- 获取该
public abstract SelectionKey interestOps(int ops)
- 用于设置或改变某个
Channel
关联的事件 - 增加事件:
key.interestOps(key.interestOps | SelectionKey.OP_WRITE)
- 减少事件:
key.interestOps(key.interestOps & ~SelectionKey.OP_WRITE)
- 用于设置或改变某个
public final boolean isAcceptable(),isReadable(),isWritable(),isConnectable()
- 用于判断这个
SelectionKey
产生的是什么事件,与上面的事件类型一一对应
- 用于判断这个
3.3、Selector常见方法
public static Selector open();
- 得到一个选择器对象,实例化出
WindowsSelectorImpl
对象。
- 得到一个选择器对象,实例化出
public int select(long timeout)
- 监控所有注册的通道,当其中有
IO
操作可以进行时,将对应的SelectionKey
加入到内部集合中并返回,返回的结果为Channel
响应的事件总和,当结果为0
时,表示本Selector
监听的所有Channel
中没有Channel
产生事件。 - 如果不传入
timeout
值,就会阻塞线程,传入值则为阻塞多少毫秒,通过它设置超时时间。 - 之所以需要传入时间,是为了让它等待几秒钟再看有没有
Channel
会产生事件,从而获取一段时间内产生事件的Channel
的总集合再一起处理。
- 监控所有注册的通道,当其中有
selector.selectNow();
- 不会阻塞,立马返回冒泡的事件数
public Set selectedKeys()
- 从内部集合中得到所有触发事件的
SelectionKey
- 从内部集合中得到所有触发事件的
public Set\<SelectionKey> keys()
- 获取所有已经注册的
SelectionKey
- 获取所有已经注册的
- wakeUp()
- 由于调用
select()
而被阻塞的线程,可以通过调用Selector.wakeup()
来唤醒即便此时已然没有channel处于就绪状态。具体操作是,在另外一个线程调用wakeup
,被阻塞与select
方法的线程就会立刻返回。
- 由于调用
4、Demo实例
编码步骤:
- 当客户端连接时,会通过ServerSocketChannel 得到 SocketChannel
- Selector 进行监听 select 方法, 返回有事件发生的通道的个数.
- 将socketChannel注册到Selector上, register(Selector sel, int ops), 一个selector上可以注册多个SocketChannel
- 注册后返回一个 SelectionKey, 会和该Selector 关联(集合)
- 进一步得到各个 SelectionKey (有事件发生)
- 在通过 SelectionKey 反向获取 SocketChannel , 方法 channel()
- 判断该Channel的事件类型,对不同事件进行不同的业务处理
4.1、NIO入门案例:实现服务器和客户端的简单通讯
public class NioTest {
private static void handleInput(Selector selector, SelectionKey selectionKey) throws IOException {
if (selectionKey.isValid()) {
// 根据key,对应通道发生的事件做相应处理
// 如果是连接事件(OP_ACCEPT)
if (selectionKey.isAcceptable()) {
//通过事件(SelectionKey) 反向获取通道
ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel();
//响应客户端的连接 并获取连接客户端的通道
SocketChannel socketChannel = channel.accept();
//设置为非阻塞状态
socketChannel.configureBlocking(false);
//给客户端的通道注册一个读事件 同时给socketChannel关联一个缓存区(可选)
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
System.out.println("获取到一个客户端连接 已经成功注册到selector");
}
//如果是读事件(OP_READ)
if (selectionKey.isReadable()) {
//通过事件(selectionKey) 反向获取对应的管道(Channel)
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
//获取注册事件时候 socketChannel关联的缓冲区
ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment();
//把管道的东西读进缓冲区
int readByte = socketChannel.read(byteBuffer);
if (readByte > 0) {
//缓冲区切换为写模式
byteBuffer.flip();
System.out.println("来自客户端的消息:" + new String(byteBuffer.array()));
} else if (readByte < 0) {
//对端链路关闭(传输中断)
selectionKey.cancel();
socketChannel.close();
} else {
}
}
}
}
@Test
public void Server() throws IOException {
//创建ServerSocketChannel -> ServerSocket
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//得到一个Selector对象
Selector selector = Selector.open();
//绑定一个端口6666
serverSocketChannel.socket().bind(new InetSocketAddress(6666));
//设置非阻塞
serverSocketChannel.configureBlocking(false);
//把 serverSocketChannel 注册到 selector ,关心事件为:OP_ACCEPT,有新的客户端连接
SelectionKey register = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println();
//循环等待客户端连接
while (true) {
//等待1秒,如果没有事件发生,就返回
if (selector.select(1000) == 0) {
System.out.println("服务器等待了1秒,无连接");
continue;
}
//如果返回的 > 0,表示已经获取到关注的事件
// 就获取到相关的 selectionKey 集合,反向获取通道
Set<SelectionKey> selectionKeys = selector.selectedKeys();
//遍历 Set<SelectionKey>,使用迭代器遍历
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
if (selector.select(1000) == 0) {
System.out.println("服务端等待一秒,没有客户端进行连接");
continue;
}
// 返回关注事件触发的集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
SelectionKey selectionKey = null;
while (keyIterator.hasNext()) {
//通过迭代器遍历set 获取其中存放的事件(SelectionKey)
selectionKey = keyIterator.next();
//获取到SelectionKey后 从原set中手动删除事件(SelectionKey)防止重复触发
keyIterator.remove();
try {
//处理事件
handleInput(selector, selectionKey);
} catch (IOException e) {
if (selectionKey != null) {
selectionKey.cancel();
System.out.println("客户端关闭了");
if (selectionKey.channel() != null)
selectionKey.channel().close();
}
}
}
}
}
}
@Test
public void Client() throws IOException {
//得到一个网络通道
SocketChannel socketChannel = SocketChannel.open();
//设置非阻塞
socketChannel.configureBlocking(false);
//提供服务器端的IP和端口
InetSocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 6666);
//连接服务器
if (!socketChannel.connect(socketAddress)) {
//如果不成功
while (!socketChannel.finishConnect()) {
System.out.println("因为连接需要时间,客户端不会阻塞,可以做其他工作。。。");
}
}
//如果连接成功,就发送数据
String str = "hello, 尚硅谷";
ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes());
//发送数据,实际上就是将buffer数据写入到channel
socketChannel.write(byteBuffer);
System.in.read();
}
}
4.2、群聊系统Demo
客户端
package com.itcode.nio.groupchat;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
/** * @author 夏天 * @date 2020年11月04日 13:49 */
public class GroupChatClient {
//服务器ip
private final String HOST = "127.0.0.1";
private final int PORT = 6667;
private Selector selector;
private SocketChannel socketChannel;
private String username;
private boolean stop = false;
public GroupChatClient() throws IOException {
selector = Selector.open();
socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));
username = socketChannel.getLocalAddress().toString();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
/** * 向服务器发送消息 */
public void sendInfo(String info) throws IOException {
info = username + "说: " + info;
//把消息发送到服务端
socketChannel.write(ByteBuffer.wrap(info.getBytes()));
}
/** * 读取从服务器端回复的消息 */
public void readInfo() {
try {
while (selector.select(3000) == 0) {
}
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
//删除事件 避免重复
keyIterator.remove();
//读事件
if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int len = socketChannel.read(byteBuffer);
if (len > 0) {
System.out.println(new String(byteBuffer.array()));
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
//启动客户端
GroupChatClient chatClient = new GroupChatClient();
//启动一个线程用于读取服务器的消息
new Thread(() -> {
while (true) {
chatClient.readInfo();
}
}).start();
//主线程用于发送数据给服务器端
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String s = scanner.nextLine();
chatClient.sendInfo(s);
}
}
}
服务端
package com.itcode.nio.groupchat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
/** * @author 夏天 * @date 2020年11月04日 12:51 */
public class GroupChatServer {
private Selector selector;
private ServerSocketChannel listenChannel;
private static final int PORT = 6667;
public GroupChatServer() {
try {
//得到选择器
selector = Selector.open();
//ServerSocketChannel
listenChannel = ServerSocketChannel.open();
//绑定端口
listenChannel.socket().bind(new InetSocketAddress(PORT));
//设置非阻塞模式
listenChannel.configureBlocking(false);
listenChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
}
public void listen() {
try {
while (true) {
while (selector.select(2000) == 0) {
System.out.println("等待");
}
//遍历得到SelectionKey 集合
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
//手动从集合中删除事件 防止重复触发
iterator.remove();
if (key.isValid()) {
//监听到accept(如果有连接会产生该事件)事件
if (key.isAcceptable()) {
//进行连接
SocketChannel socketChannel = listenChannel.accept();
socketChannel.configureBlocking(false);
//把连接的SocketChannel的读事件注册到selector
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println(socketChannel.getRemoteAddress() + " 上线");
}
//监听读事件 (如果某个注册的通道是可读状态触发读事件)
if (key.isReadable()) {
//处理读
readData(key);
}
}
}
}
} catch (Throwable e) {
e.printStackTrace();
}
}
private void readData(SelectionKey key) throws IOException {
SocketChannel channel = null;
//得到channel
try {
channel = (SocketChannel) key.channel();
//创建缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
int count = channel.read(buffer);
//根据count值做处理
if (count > 0) {
//把缓冲区数据转成字符串
String msg = new String(buffer.array());
//输出该消息
System.out.println("form 客户端" + msg);
//向其他的客户端转发消息,不需要向当前客户端转发消息
sendInfoToOtherClients(msg, channel);
} else if (count < 0) {
// 对端链路关闭
System.out.println(channel.getRemoteAddress() + "离线了");
key.cancel();
channel.close();
} else {
}
} catch (Exception e) {
if (channel != null) {
System.out.println(channel.getRemoteAddress() + "离线了");
}
//取消注册
key.channel();
key.channel().close();
}
}
/** * 向其他的客户端转发消息,不需要向当前客户端转发消息 * * @param msg 消息 * @param self 发送消息的客户端 */
private void sendInfoToOtherClients(String msg, SocketChannel self) throws IOException {
System.out.println("服务器转发消息中...");
//遍历注册的所有Channel
for (SelectionKey key : selector.keys()) {
//通过key取出对应的SocketChannel
Channel targetChannel = key.channel();
//是SocketChannel实现类 不是发送消息的客户端
if (targetChannel instanceof SocketChannel && !targetChannel.equals(self)) {
SocketChannel dest = (SocketChannel) targetChannel;
//将msg存储到buffer
ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());
//将buffer的数据写入通道
dest.write(byteBuffer);
}
}
}
public static void main(String[] args) {
new GroupChatServer().listen();
}
}
注意事项:
-
使用
int read = channel.read(buffer)
读取数据时,读取的结果情况:
-
当
read=-1
时,说明客户端的数据发送完毕,并且主动的关闭socket
。所以这种情况下,服务器程序需要关闭socketSocket
,并且取消key
的注册。注意:这个时候继续使用SocketChannel
进行读操作的话,就会抛出:远程主机强迫关闭一个现有的连接的IO异常 -
当
read=0
时:- 某一时刻
SocketChannel
中当前没有数据可读。 - 客户端的数据发送完毕。
- 某一时刻
-
-
socketChannel.connect()
会导致socketChannel.getLocalAddress()
为null
5、NIO的零拷贝
零拷贝是网络编程的关键,很多性能优化都离不开它。零拷贝是指:从操作系统的角度来看,文件的传输不存在CPU的拷贝,只存在DMA拷贝。在Java程序中,常用的零拷贝有 mmap(内存映射)和 sendFile。
零拷贝不仅仅带来更少的数据复制,还能减少线程的上下文切换,减少CPU缓存伪共享以及无CPU校验和计算。
传统IO的读写:
File file = new File("test.txt");
RandomAccessFile raf = new RandomAccessFile(file, "rw");
byte[] arr = new byte[(int) file.length()];
raf.read(arr);
Socket socket = new ServerSocket(8080).accept();
socket.getOutputStream().write(arr);
mmap优化的IO读写:
-
mmap通过内存映射,将文件映射到内核缓冲区,同时,用户空间可以共享内存空间的数据。这样,在进行网络传输时,就可以减少内核空间到用户空间的拷贝次数。
-
需要进行4次上下文切换,3次数据拷贝。
-
适合小数据量的读写。
sendFile优化的IO读写: -
Linux2.1 版本提供了
sendFile
函数,其基本原理如下:数据根本不经过用户态,直接从内核缓冲区进入到SocketBuffer
,同时,由于和用户态完全无关,就减少了一次上下文切换。 -
需要3次上下文切换和最少2此数据拷贝。
-
适合大文件的传输。
- 而 Linux 在 2.4 版本中,做了一些修改,避免了从内核缓冲区拷贝到 Socket Buffer 的操作,直接拷贝到协议栈,从而再一次减少了数据拷贝。
- 注:这里其实有一次CPU拷贝,
kernel buffer -> socket buffer
。但是,拷贝的信息很少,只拷贝了数据的长度、偏移量等关键信息,消耗低,可以忽略不计。
NIO中的零拷贝(transferTo):
- transferTo 以当前channel为标准 拷贝的目标管道
- transferForm 和transferTo 类似:把目标管道拷贝的当前管道中
- 在linux下transferTo 方法就可以完成传输
- 在windows下调用transferTo 只能发送8m就需要分段传输文件
算法 : 用文件大小除8 并向上取整 为发送次数
public static void main(String[] args) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", 7001));
//得到一个文件CHANNEl
FileChannel channel = new FileInputStream("a.zip").getChannel();
//准备发送
long startTime = System.currentTimeMillis();
//在Linux下一个 transferTo 方法就可以完成传输
//在windows 下一次调用 transferTo 只能发送 8M,就需要分段传输文件
//传输时的位置
//transferTo 底层使用到零拷贝
long transferCount = channel.transferTo(0, channel.size(), socketChannel);
System.out.println("发送的总的字节数:" + transferCount + " 耗时:" + (System.currentTimeMillis() - startTime));
channel.close();
}