1. ByteBuffer

1.1、基本介绍

缓冲区(Buffer):缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个容器对象(含数组),该对象提供了一组方法,可以更轻松地使用内存块,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。Channel提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经由Buffer

1.2、Buffer类介绍

  • 基类是Buffer抽象类
  • 基类派生出基于基本数据类型的7个xxxBuffer 抽象类,没有boolean相关的buffer类。
  • 除了ByteBuffer外,每个基本数据的抽象类 xxxBuffer 类下面都派生出转向 ByteBuffer 的类 ByteBufferXxxAsBufferLByteBufferAsXxxBufferB实现类;以及 DirectXxxBufferUDirectXxxBufferSHeapXxxBuffer==(具体实例对象类)==这五个类。
  • 就只有抽象类CharBuffer 派生出了第六个类StringCharBuffer
  • ByteBuffer只派生出了 HeapByteBufferMappedByteBufferR 两个类
  • 类图如下:

1.2.1、Buffer类主要属性

属性 描述
Capacity 容量,即可以容纳的最大数据量;在缓冲区创建时被设定并且不能改变
Limit 表示缓冲区的当前终点,不能对缓冲区超过极限的位置进行读写操作。且极限是可以修改的
Position 位置,下一个要被读或写的元素的索引,每次读写缓冲区数据时都会改变改值,为下次读写作准备
Mark 标记 ,一般不会主动修改,在flip()被调用后,mark就作废了。

mark <= position <= limit <= capacity

1.2.2、Buffer类使用示例

//创建一个Buffer,大小为5,即可以存放5个int
IntBuffer intBuffer = IntBuffer.allocate(5);
//向buffer中存放数据
for (int i = 0; i < intBuffer.capacity(); i++) {
   
    intBuffer.put(i * 2);
}
//如何从buffer中读取数据
//将buffer转换,读写切换
intBuffer.flip();
while (intBuffer.hasRemaining()) {
   
    System.out.println(intBuffer.get());
}
123456789101112
  • Buffer 刚创建时,capacity = 5 ,固定不变。limit指针指向5position指向0mark指向-1
  • 之后调用 intBuffer.put方法,向buffer中添加数据,会不断移动position指针,最后position变量会和limit指向相同。
  • 调用 buffer.flip()实际上是重置了positionlimit两个变量,将limit放在position的位置,position放在0的位置。这里只是最后的positionlimit位置相同,所以fliplimit位置没变。
  • 调用 intBuffer.get()实际上是不断移动position指针,直到它移动到limit的位置

1.2.3、Buffer类主要方法

Buffer基类(抽象类)
  • public final int capacity();
    • 直接返回了此缓冲区的容量,capacity
  • public final int position();
    • 直接返回了此缓冲区指针的当前位置
  • public final Buffer position(int newPosition);
    • 设置此缓冲区的位置,设置position
  • public final int limit();
    • 返回此缓冲区的限制
  • public final Buffer limit(int newLimit);
    • 设置此缓冲区的限制,设置limit
  • public final Buffer clear();
    • 清除此缓冲区,即将各个标记恢复到初识状态, position = 0;limit = capacity; mark = -1,但是并没有删除数据。
  • public final Buffer flip();
    • 切换为读模式
    • 反转此缓冲区, limit = position;position = 0;mark = -1
    • 当指定数据存放在缓冲区中后,position所指向的即为此缓冲区数据最后的位置。只有当数据大小和此缓冲区大小相同时,position才和limit的指向相同。
    • flip()方法将limit置向positionposition0,那么从position读取数据到limit即为此缓冲区中所有的数据。
  • public final boolean hasRemaining();
    • 告知当前位置和限制之间是否有元素。return position < limit;
  • public abstract boolean isReadOnly();
    • 此方法为抽象方法,告知此缓冲区是否为只读缓冲区,具体实现在各个实现类中。
  • public abstract boolean hasArray();
    • 告知此缓冲区是否具有可访问的底层实现数组
  • public abstract Object array();
    • 返回此缓冲区的底层实现数组
Buffer具体实现类(ByteBuffer为例)

从前面可以看出来对于Java中的基本数据类型(boolean除外),都有一个Buffer类型与之对应,最常用的自然是ByteBuffer类(二进制数据),该类的主要方法如下:

  • public static ByteBuffer allocateDirect(int capacity);

    • 创建直接缓冲区
  • public static ByteBuffer allocate(int capacity) ;

    • 设置缓冲区的初识容量
  • public abstract byte get();

    • 从当前位置positionget数据,获取之后,position会自动加1
  • public abstract byte get(int index);

    • 通过绝对位置获取数据。
  • public abstract ByteBuffer put(byte b);

    • 从当前位置上添加,put之后,position会自动加1
  • public abstract ByteBuffer put(int index, byte b);

    • 从绝对位置上添加数据
  • public abstract ByteBuffer putXxx(Xxx value [, int index]);

    • position当前位置插入元素。Xxx表示基本数据类型

    • 此方法时类型化的 putgetput放入的是什么数据类型,get就应该使用相应的数据类型来取出,否则可能有 BufferUnderflowException 异常。

    • 示例如下:

      ByteBuffer buf = ByteBuffer.allocate(64);
      
      //类型化方式放入数据
      buf.putInt(100);
      buf.putLong(20);
      buf.putChar('上');
      buf.putShort((short)44);
      
      //取出,当取出的顺序和上面插入的数据类型的顺序不对时,就会抛出BufferUnderflowException异常
      buf.flip();
      System.out.println(buf.getInt());
      System.out.println(buf.getLong());
      System.out.println(buf.getChar());
      System.out.println(buf.getShort());
      1234567891011121314
      
  • 可以将一个普通的Buffer转成只读的Buffer

    //创建一个Buffer
    ByteBuffer byteBuffer = ByteBuffer.allocate(64);
    for (int i = 0; i < 64; i++) {
         
        byteBuffer.put((byte)i);
    }
    //读取
    byteBuffer.flip();
    //得到一个只读的Buffer
    ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer();
    System.out.println(readOnlyBuffer.getClass());
    //读取
    while (readOnlyBuffer.hasRemaining()){
         
        System.out.println(readOnlyBuffer.get());
    }
    readOnlyBuffer.put((byte)100); //会抛出 ReadOnlyBufferException
    123456789101112131415
    
  • MappedByteBuffer可以让文件直接在内存(堆外内存)中进行修改,而如何同步到文件由NIO来完成

    /** * 1、MappedByteBuffer可以让文件直接在内存中(堆外内存)修改,操作系统不需要拷贝一次 */
    @Test
    public void test() throws IOException {
         
        RandomAccessFile randomAccessFile = new RandomAccessFile("1.txt", "rw");
        //获取对应的文件通道
        FileChannel channel = randomAccessFile.getChannel();
        /** * 参数1: FileChannel.MapMode.READ_WRITE,使用的读写模式 * 参数2: 0,可以直接修改的起始位置 * 参数3: 5,是映射到内存的大小(不是文件中字母的索引位置),即将 1.txt 的多少个字节映射到内存,也就是可以直接修改的范围就是 [0, 5) * 实际的实例化类型:DirectByteBuffer */
        MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
    
        mappedByteBuffer.put(0,(byte)'N');
        mappedByteBuffer.put(3, (byte)'M');
        mappedByteBuffer.put(5, (byte)'Y'); //会抛出 IndexOutOfBoundsException
    
        randomAccessFile.close();
        System.out.println("修改成功~");
    }
    1234567891011121314151617181920212223
    

1.flip相关

客户端: 写入 socketChannel.write(byteBuffer)之前一定要使用flip切换为读模式才可以成功写入到服务端 注意:(ByteBuffer.wrap(str.getBytes()))特殊他已经是写模式不需要切换
服务端: 在读取(socketChannel.read(byteBuffer)) 之前一定要使用flip切换为读模式才可以读取到数据

3、Selector(选择器)

3.1、基本介绍

  • Java 的 NIO,用非阻塞的 IO 方式。可以用一个线程,处理多个的客户端连接,就会使用到Selector(选择器)
  • Selector能够检测多个注册的通道上是否有事件发生(注意:多个Channel以事件的方式可以注册到同一个Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。
  • 只有在 连接/通道 真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程
  • 避免了多线程之间的上下文切换导致的开销
  • NettyIO线程NioEventLoop聚合了Selector(选择器,也叫多路复用器),可以同时并发处理成百上千个客户端连接。
  • 当线程从某客户端 Socket 通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。
  • 线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。
  • 由于读写操作都是非阻塞的,这就可以充分提升 IO 线程的运行效率,避免由于频繁 I/O 阻塞导致的线程挂起。
  • 一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这从根本上解决了传统同步阻塞 I/O 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。

3.2、SelectionKey介绍


主要作用:

Selector通过管理SelectionKey的集合从而去监听各个Channel。当Channel注册到Selector上面时,会携带该Channel关注的事件**(SelectionKey包含Channel以及与之对应的事件)**,并会返回一个SelectionKey的对象,Selector将该对象加入到它统一管理的集合中去,从而对Channel进行管理。SelectionKey表示的是Selector和网络通道的注册关系,固FileChannel是没有办法通过SelectionKey注册到Selector上去的。

四大事件:

  • public static final int OP_READ = 1 << 0
    • 值为1,表示读操作,
    • 代表本Channel已经接受到其他客户端传过来的消息,需要将Channel中的数据读取到Buffer中去
  • public static final int OP_WRITE = 1 << 2
    • 值为4,表示写操作
    • 一般临时将Channel的事件修改为它,在处理完后又修改回去。我暂时也没明白具体的作用。
  • public static final int OP_CONNECT = 1 << 3
    • 值为8,代表建立连接。
    • 一般在ServerSocketChannel上绑定该事件,结合 channel.finishConnect()在连接建立异常时进行异常处理
  • public static final int OP_ACCEPT = 1 << 4
    • 值为16,表示由新的网络连接可以accept
    • ServerSocketChannel进行绑定,用于创建新的SocketChannel,并把其注册到Selector上去

事件介绍

  • OP_ACCEPT事件就绪:当收到一个客户端的连接请求时,该事件就绪。这是ServerSocketChannel上唯一有效的操作。(服务器端)

  • OP_CONNECT事件就绪:只有客户端SocketChannel会注册该操作,当客户端调用SocketChannel.connect()时,该事件就绪。(客户端)

  • OP_READ事件就绪:该操作对客户端和服务端的SocketChannel都有效,当OS的读缓冲区中有数据可读时,该事件就绪。

  • OP_WRITE事件就绪:该操作对客户端和服务端的SocketChannel都有效,当OS的写缓冲区中有空闲的空间时,该事件就绪。

  • acceptconnect的区别:accept是服务器端接收到连接,connect则是客户端发起连接成功。后者用在客户端,当连接成功后注册相关读写事件。

  • IO复用更多的是指一个线程监控多个IO通道,这个监控线程的复用。

相关方法

  • public abstract Selector selector()
    • 得到该SelectionKey具体是属于哪个Selector对象的
  • public abstract SelectableChannel channel()
    • 通过SelectionKey的到对应的Channel
  • public final Object attachment()
    • 得到与之关联的共享数据,一般用于获取buffer
    • 在使用register注册通道时,也可以为该Channel绑定一个Buffer,可以通过本方法获取这个Buffer
    • 通过selectionKey.attach(Object ob)绑定的数据,也是通过该方法获取
  • public abstract SelectionKey interestOps()
    • 获取该SelectionKey下面的事件
  • public abstract SelectionKey interestOps(int ops)
    • 用于设置或改变某个Channel关联的事件
    • 增加事件:key.interestOps(key.interestOps | SelectionKey.OP_WRITE)
    • 减少事件:key.interestOps(key.interestOps & ~SelectionKey.OP_WRITE)
  • public final boolean isAcceptable(),isReadable(),isWritable(),isConnectable()
    • 用于判断这个SelectionKey产生的是什么事件,与上面的事件类型一一对应

3.3、Selector常见方法

  • public static Selector open();
    • 得到一个选择器对象,实例化出 WindowsSelectorImpl对象。
  • public int select(long timeout)
    • 监控所有注册的通道,当其中有IO操作可以进行时,将对应的SelectionKey加入到内部集合中并返回,返回的结果为Channel响应的事件总和,当结果为0时,表示本Selector监听的所有Channel中没有Channel产生事件。
    • 如果不传入timeout值,就会阻塞线程,传入值则为阻塞多少毫秒,通过它设置超时时间。
    • 之所以需要传入时间,是为了让它等待几秒钟再看有没有Channel会产生事件,从而获取一段时间内产生事件的Channel的总集合再一起处理。
  • selector.selectNow();
    • 不会阻塞,立马返回冒泡的事件数
  • public Set selectedKeys()
    • 从内部集合中得到所有触发事件的SelectionKey
  • public Set\<SelectionKey> keys()
    • 获取所有已经注册的SelectionKey
  • wakeUp()
    • 由于调用select()而被阻塞的线程,可以通过调用Selector.wakeup()来唤醒即便此时已然没有channel处于就绪状态。具体操作是,在另外一个线程调用wakeup,被阻塞与select方法的线程就会立刻返回。

4、Demo实例

编码步骤:

  1. 当客户端连接时,会通过ServerSocketChannel 得到 SocketChannel
  2. Selector 进行监听 select 方法, 返回有事件发生的通道的个数.
  3. 将socketChannel注册到Selector上, register(Selector sel, int ops), 一个selector上可以注册多个SocketChannel
  4. 注册后返回一个 SelectionKey, 会和该Selector 关联(集合)
  5. 进一步得到各个 SelectionKey (有事件发生)
  6. 在通过 SelectionKey 反向获取 SocketChannel , 方法 channel()
  7. 判断该Channel的事件类型,对不同事件进行不同的业务处理

4.1、NIO入门案例:实现服务器和客户端的简单通讯

public class NioTest {
   
    private static void handleInput(Selector selector, SelectionKey selectionKey) throws IOException {
   
        if (selectionKey.isValid()) {
   
            // 根据key,对应通道发生的事件做相应处理
            // 如果是连接事件(OP_ACCEPT)
            if (selectionKey.isAcceptable()) {
   
                //通过事件(SelectionKey) 反向获取通道
                ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel();
                //响应客户端的连接 并获取连接客户端的通道
                SocketChannel socketChannel = channel.accept();
                //设置为非阻塞状态
                socketChannel.configureBlocking(false);
                //给客户端的通道注册一个读事件 同时给socketChannel关联一个缓存区(可选)
                socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                System.out.println("获取到一个客户端连接 已经成功注册到selector");
            }
            //如果是读事件(OP_READ)
            if (selectionKey.isReadable()) {
   
                //通过事件(selectionKey) 反向获取对应的管道(Channel)
                SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                //获取注册事件时候 socketChannel关联的缓冲区
                ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment();
                //把管道的东西读进缓冲区
                int readByte = socketChannel.read(byteBuffer);
                if (readByte > 0) {
   
                    //缓冲区切换为写模式
                    byteBuffer.flip();
                    System.out.println("来自客户端的消息:" + new String(byteBuffer.array()));
                } else if (readByte < 0) {
   
                    //对端链路关闭(传输中断)
                    selectionKey.cancel();
                    socketChannel.close();
                } else {
   

                }
            }
        }
    }

    @Test
    public void Server() throws IOException {
   
        //创建ServerSocketChannel -> ServerSocket
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //得到一个Selector对象
        Selector selector = Selector.open();
        //绑定一个端口6666
        serverSocketChannel.socket().bind(new InetSocketAddress(6666));
        //设置非阻塞
        serverSocketChannel.configureBlocking(false);

        //把 serverSocketChannel 注册到 selector ,关心事件为:OP_ACCEPT,有新的客户端连接
        SelectionKey register = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println();
        //循环等待客户端连接
        while (true) {
   
            //等待1秒,如果没有事件发生,就返回
            if (selector.select(1000) == 0) {
   
                System.out.println("服务器等待了1秒,无连接");
                continue;
            }
            //如果返回的 > 0,表示已经获取到关注的事件
            // 就获取到相关的 selectionKey 集合,反向获取通道
            Set<SelectionKey> selectionKeys = selector.selectedKeys();

            //遍历 Set<SelectionKey>,使用迭代器遍历
            Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
            while (keyIterator.hasNext()) {
   
                if (selector.select(1000) == 0) {
   
                    System.out.println("服务端等待一秒,没有客户端进行连接");
                    continue;
                }
                // 返回关注事件触发的集合
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
                SelectionKey selectionKey = null;
                while (keyIterator.hasNext()) {
   
                    //通过迭代器遍历set 获取其中存放的事件(SelectionKey)
                    selectionKey = keyIterator.next();
                    //获取到SelectionKey后 从原set中手动删除事件(SelectionKey)防止重复触发
                    keyIterator.remove();
                    try {
   
                        //处理事件
                        handleInput(selector, selectionKey);
                    } catch (IOException e) {
   
                        if (selectionKey != null) {
   
                            selectionKey.cancel();
                            System.out.println("客户端关闭了");
                            if (selectionKey.channel() != null)
                                selectionKey.channel().close();
                        }
                    }
                }
            }
        }
    }

    @Test
    public void Client() throws IOException {
   
        //得到一个网络通道
        SocketChannel socketChannel = SocketChannel.open();
        //设置非阻塞
        socketChannel.configureBlocking(false);
        //提供服务器端的IP和端口
        InetSocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 6666);
        //连接服务器
        if (!socketChannel.connect(socketAddress)) {
    //如果不成功
            while (!socketChannel.finishConnect()) {
   
                System.out.println("因为连接需要时间,客户端不会阻塞,可以做其他工作。。。");
            }
        }

        //如果连接成功,就发送数据
        String str = "hello, 尚硅谷";
        ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes());
        //发送数据,实际上就是将buffer数据写入到channel
        socketChannel.write(byteBuffer);
        System.in.read();
    }
}

4.2、群聊系统Demo

客户端

package com.itcode.nio.groupchat;


import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;

/** * @author 夏天 * @date 2020年11月04日 13:49 */
public class GroupChatClient {
   

    //服务器ip
    private final String HOST = "127.0.0.1";
    private final int PORT = 6667;
    private Selector selector;
    private SocketChannel socketChannel;
    private String username;
    private boolean stop = false;

    public GroupChatClient() throws IOException {
   
        selector = Selector.open();
        socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));
        username = socketChannel.getLocalAddress().toString();
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
    }

    /** * 向服务器发送消息 */
    public void sendInfo(String info) throws IOException {
   
        info = username + "说: " + info;
        //把消息发送到服务端
        socketChannel.write(ByteBuffer.wrap(info.getBytes()));
    }

    /** * 读取从服务器端回复的消息 */
    public void readInfo() {
   
        try {
   
            while (selector.select(3000) == 0) {
   
            }
            Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
            while (keyIterator.hasNext()) {
   
                SelectionKey key = keyIterator.next();
                //删除事件 避免重复
                keyIterator.remove();
                //读事件
                if (key.isReadable()) {
   
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    int len = socketChannel.read(byteBuffer);
                    if (len > 0) {
   
                        System.out.println(new String(byteBuffer.array()));
                    }
                }
            }
        } catch (IOException e) {
   
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException {
   
        //启动客户端
        GroupChatClient chatClient = new GroupChatClient();
        //启动一个线程用于读取服务器的消息
        new Thread(() -> {
   
            while (true) {
   
                chatClient.readInfo();
            }
        }).start();

        //主线程用于发送数据给服务器端
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()) {
   
            String s = scanner.nextLine();
            chatClient.sendInfo(s);
        }
    }
}

服务端

package com.itcode.nio.groupchat;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

/** * @author 夏天 * @date 2020年11月04日 12:51 */
public class GroupChatServer {
   
    private Selector selector;
    private ServerSocketChannel listenChannel;
    private static final int PORT = 6667;

    public GroupChatServer() {
   
        try {
   
            //得到选择器
            selector = Selector.open();
            //ServerSocketChannel
            listenChannel = ServerSocketChannel.open();
            //绑定端口
            listenChannel.socket().bind(new InetSocketAddress(PORT));
            //设置非阻塞模式
            listenChannel.configureBlocking(false);
            listenChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
   
            e.printStackTrace();
        }
    }

    public void listen() {
   
        try {
   
            while (true) {
   
                while (selector.select(2000) == 0) {
   
                    System.out.println("等待");
                }
                //遍历得到SelectionKey 集合
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
   
                    SelectionKey key = iterator.next();
                    //手动从集合中删除事件 防止重复触发
                    iterator.remove();
                    if (key.isValid()) {
   
                        //监听到accept(如果有连接会产生该事件)事件
                        if (key.isAcceptable()) {
   
                            //进行连接
                            SocketChannel socketChannel = listenChannel.accept();
                            socketChannel.configureBlocking(false);
                            //把连接的SocketChannel的读事件注册到selector
                            socketChannel.register(selector, SelectionKey.OP_READ);
                            System.out.println(socketChannel.getRemoteAddress() + " 上线");
                        }
                        //监听读事件 (如果某个注册的通道是可读状态触发读事件)
                        if (key.isReadable()) {
   
                            //处理读
                            readData(key);
                        }
                    }
                }
            }
        } catch (Throwable e) {
   
            e.printStackTrace();
        }
    }

    private void readData(SelectionKey key) throws IOException {
   
        SocketChannel channel = null;
        //得到channel
        try {
   
            channel = (SocketChannel) key.channel();
            //创建缓冲区
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int count = channel.read(buffer);
            //根据count值做处理
            if (count > 0) {
   
                //把缓冲区数据转成字符串
                String msg = new String(buffer.array());
                //输出该消息
                System.out.println("form 客户端" + msg);
                //向其他的客户端转发消息,不需要向当前客户端转发消息
                sendInfoToOtherClients(msg, channel);
            } else if (count < 0) {
   
                // 对端链路关闭
                System.out.println(channel.getRemoteAddress() + "离线了");
                key.cancel();
                channel.close();
            } else {
   

            }
        } catch (Exception e) {
   
            if (channel != null) {
   
                System.out.println(channel.getRemoteAddress() + "离线了");
            }
            //取消注册
            key.channel();
            key.channel().close();
        }
    }

    /** * 向其他的客户端转发消息,不需要向当前客户端转发消息 * * @param msg 消息 * @param self 发送消息的客户端 */
    private void sendInfoToOtherClients(String msg, SocketChannel self) throws IOException {
   
        System.out.println("服务器转发消息中...");
        //遍历注册的所有Channel
        for (SelectionKey key : selector.keys()) {
   
            //通过key取出对应的SocketChannel
            Channel targetChannel = key.channel();
            //是SocketChannel实现类 不是发送消息的客户端
            if (targetChannel instanceof SocketChannel && !targetChannel.equals(self)) {
   
                SocketChannel dest = (SocketChannel) targetChannel;
                //将msg存储到buffer
                ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());
                //将buffer的数据写入通道
                dest.write(byteBuffer);
            }
        }
    }

    public static void main(String[] args) {
   
        new GroupChatServer().listen();
    }
}

注意事项:

  • 使用 int read = channel.read(buffer)

    读取数据时,读取的结果情况:

    • read=-1时,说明客户端的数据发送完毕,并且主动的关闭socket。所以这种情况下,服务器程序需要关闭socketSocket,并且取消key的注册。注意:这个时候继续使用SocketChannel进行读操作的话,就会抛出:远程主机强迫关闭一个现有的连接的IO异常

    • read=0时:

      • 某一时刻SocketChannel中当前没有数据可读。
      • 客户端的数据发送完毕。
  • socketChannel.connect()会导致 socketChannel.getLocalAddress()为null

5、NIO的零拷贝

零拷贝是网络编程的关键,很多性能优化都离不开它。零拷贝是指:从操作系统的角度来看,文件的传输不存在CPU的拷贝,只存在DMA拷贝。在Java程序中,常用的零拷贝有 mmap(内存映射)和 sendFile。

零拷贝不仅仅带来更少的数据复制,还能减少线程的上下文切换,减少CPU缓存伪共享以及无CPU校验和计算。

传统IO的读写:

File file = new File("test.txt");
RandomAccessFile raf = new RandomAccessFile(file, "rw");

byte[] arr = new byte[(int) file.length()];
raf.read(arr);

Socket socket = new ServerSocket(8080).accept();
socket.getOutputStream().write(arr);

mmap优化的IO读写:

  • mmap通过内存映射,将文件映射到内核缓冲区,同时,用户空间可以共享内存空间的数据。这样,在进行网络传输时,就可以减少内核空间到用户空间的拷贝次数。

  • 需要进行4次上下文切换,3次数据拷贝。

  • 适合小数据量的读写。

    sendFile优化的IO读写:

  • Linux2.1 版本提供了 sendFile 函数,其基本原理如下:数据根本不经过用户态,直接从内核缓冲区进入到SocketBuffer,同时,由于和用户态完全无关,就减少了一次上下文切换。

  • 需要3次上下文切换和最少2此数据拷贝。

  • 适合大文件的传输。

  • 而 Linux 在 2.4 版本中,做了一些修改,避免了从内核缓冲区拷贝到 Socket Buffer 的操作,直接拷贝到协议栈,从而再一次减少了数据拷贝。
  • 注:这里其实有一次CPU拷贝,kernel buffer -> socket buffer。但是,拷贝的信息很少,只拷贝了数据的长度、偏移量等关键信息,消耗低,可以忽略不计。

NIO中的零拷贝(transferTo):

  • transferTo 以当前channel为标准 拷贝的目标管道
  • transferForm 和transferTo 类似:把目标管道拷贝的当前管道中
  • 在linux下transferTo 方法就可以完成传输
  • 在windows下调用transferTo 只能发送8m就需要分段传输文件
    算法 : 用文件大小除8 并向上取整 为发送次数
public static void main(String[] args) throws IOException {
   
    SocketChannel socketChannel = SocketChannel.open();
    socketChannel.connect(new InetSocketAddress("localhost", 7001));
    //得到一个文件CHANNEl
    FileChannel channel = new FileInputStream("a.zip").getChannel();

    //准备发送
    long startTime = System.currentTimeMillis();

    //在Linux下一个 transferTo 方法就可以完成传输
    //在windows 下一次调用 transferTo 只能发送 8M,就需要分段传输文件
    //传输时的位置
    //transferTo 底层使用到零拷贝
    long transferCount = channel.transferTo(0, channel.size(), socketChannel);

    System.out.println("发送的总的字节数:" + transferCount + " 耗时:" + (System.currentTimeMillis() - startTime));
    channel.close();
}

6、BIO、NIO、AIO对比