Netty 基础 java NIO

Selector

三个元素: Selector选择器、SelectableChannel可选择的通道、SelectionKey选择键

本质上,Selector是***,监听的是通道是否有我们关心的操作产生,操作对应的是事件(连接、接收、读/写),使用SelectionKey代表具体的事件,在确保通道是可选择的情况下,将通道注册进选择器中,此时Selector维护的是,通道和事件之间的关联关系。

Selector,管理被注册的通道集合,以及他们的状态
SelectableChannel,是一个抽象类,提供了通道可被选择需要实现的api。
FileChannel就不是可选择的,Socket相关的通道都是可选择的
一个通道可以被注册到多个选择器上吗? 可以的
多个通道可以注册到一个选择器上,但一个通道只能在一个选择器中注册一次

SelectionKey,封装了要监听的事件,连接、接收、读、写。
一方面,Selector关心通道要处理哪些事件
另一方面,当事件触发时,通道要处理哪些事件

使用 NIO 实现 server与 clinet 通信(没有使用 selector的)

服务端

public static void main(String[] args) throws Exception {
     
//创建服务端通道,用 open 获取
        ServerSocketChannel ServerChannel = ServerSocketChannel.open();
        //设置 ip 和端口号
        SocketAddress address = new InetSocketAddress("127.0.0.1", 4321);
        //绑定到 服务通道 的 socket
        ServerChannel.socket().bind(address);

        //等待客户端连接
        SocketChannel socketChannel = ServerChannel.accept();

        // 处理数据 通过 buffer 来
        ByteBuffer writebuffer = ByteBuffer.allocate(128);
        writebuffer.put("hello client i am server".getBytes());
        writebuffer.flip();
        socketChannel.write(writebuffer);

        // 读取客户端的数据
        ByteBuffer readbuffer = ByteBuffer.allocate(128);
        StringBuffer stringBuffer = new StringBuffer();
        socketChannel.read(readbuffer);
        readbuffer.flip();
        while (readbuffer.hasRemaining()) {
   
            stringBuffer.append((char) readbuffer.get());
        }
        System.out.println("client data :" + stringBuffer.toString());

        socketChannel.close();
        ServerChannel.close();
}

客户端

  public static void main(String[] args) throws Exception {
   
        //开启一个 socket 通道
        SocketChannel socketChannel = SocketChannel.open();
        //设置 ip 和端口号
        SocketAddress address = new InetSocketAddress("127.0.0.1", 4321);
        //连接 这个 address
        socketChannel.connect(address);

        // 先写后读
        // 处理数据 通过 buffer 来
        ByteBuffer writebuffer = ByteBuffer.allocate(128);
        writebuffer.put("hello server i am client ".getBytes());
        writebuffer.flip();
        socketChannel.write(writebuffer);

        // 读取客户端的数据
        ByteBuffer readbuffer = ByteBuffer.allocate(128);
        StringBuffer stringBuffer = new StringBuffer();
        socketChannel.read(readbuffer);
        readbuffer.flip();
        while (readbuffer.hasRemaining()) {
   
            stringBuffer.append((char) readbuffer.get());
        }
        System.out.println("server data :" + stringBuffer.toString());
        socketChannel.close();
    }

这里我们并没有使用 selector 接下来我们编写一个 Nio selector server 来对比学习

Nio selector server

SelectionKey 中我们常用判断的几种操作类型

  • isAcceptable() : 连接
  • isConnectable() : 就绪
  • isReadable() : 读取
  • isWritable() : 写入

代码执行和编写细节 : 见注释

public static void main(String[] args) throws Exception {
   
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //设置 ip 和端口号
        SocketAddress address = new InetSocketAddress("127.0.0.1", 4321);
        //绑定到 服务通道 的 socket
        serverSocketChannel.socket().bind(address);

        // 将这个 channel 设置成非阻塞的
        serverSocketChannel.configureBlocking(false);
        // 打开一个选择器
        Selector selector = Selector.open();
        // 将通道注册到选择其中 声明选择器监听事件
        // 通常来说,我们监听的是 连接事件
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        /* * 通过选择器来管理通道 * 需要感知, 被监听的通道 有没有事件触发 * 当 select 方法 返回值 >0 的时候就代表 当前有多少个操作要处理 * 所以我们需要一直轮询 它是否有时间要处理 * */
        while (true) {
   
            int ready = selector.select();
            if (ready == 0) {
   
                continue;
            }
            // 通过 Selected keys 获取到操作集合
            Set<SelectionKey> set = selector.selectedKeys();
            Iterator<SelectionKey> iterator = set.iterator();
            while (iterator.hasNext()) {
   
                SelectionKey key = iterator.next();
                // 为了 避免重复操作 我们处理一个就移出一个
                iterator.remove();
                //我们通过 key 中的方法 来判断要处理什么操作
                /* * isAcceptable() : 连接 * isConnectable() : 就绪 * isReadable() : 读取 * isWritable() : 写入 * */
                if (key.isAcceptable()) {
   
                    //处理 accpt 事件
                    //获得客户端连接 并且注册 写事件
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    // 处理完 连接状态之后 后续将写操作加入到选择其中
                    socketChannel.register(selector, SelectionKey.OP_WRITE);
                } else if (key.isWritable()) {
   
                    // 我们处理完连接之后,可以用key来获得对应的事件通道
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    //处理 write 事件
                    ByteBuffer writebuffer = ByteBuffer.allocate(128);
                    writebuffer.put("hello client i am server from 4321".getBytes());
                    writebuffer.flip();
                    socketChannel.write(writebuffer);
                    //我们可以通过 key 来注册接下来发生在这个通道的事件
                    key.interestOps(SelectionKey.OP_READ);
                } else if (key.isReadable()) {
   
                    //处理 read 事件
                    SocketChannel socketChannel = (SocketChannel) key.channel();

                    // 读取客户端的数据
                    ByteBuffer readbuffer = ByteBuffer.allocate(128);
                    //读取数据
                    int read = socketChannel.read(readbuffer);
                    //read = -1 代表着已经读完了
                    if (read == -1) {
   
                        key.cancel();
                    }
                    //刷新 buffer
                    readbuffer.flip();
                    //用string buffer 来拼接读取的数据
                    StringBuffer stringBuffer = new StringBuffer();
                    while (readbuffer.hasRemaining()) {
   
                        stringBuffer.append((char) readbuffer.get());
                    }
                    System.out.println("client data :" + stringBuffer.toString());


                } else if (key.isConnectable()) {
   

                }
            }

        }

    }


【使用方式】

a、首先通过open方法,获取通道,将通道设置为非阻塞的
b、通过open方法,获取选择器,将通道注册进选择器中,伴随设置通道要处理的事件(OP_ACCEPT)
c、轮询选择器,当前是否有要处理的操作 select() > 0?
如果有,要获取,待处理操作的集合Set<SelectionKey> , 进行遍历
遍历到SelectionKey时,判断对应哪种操作,不同的操作设置不同的处理方式
如OP_ACCEPT,接收客户端通道并进行注册,监听后续处理的事件,如OP_WRITE
如OP_WRITE,通过key的方法获取通道本身,读取数据并继续监听事件,如OP_READ