四、Netty核心组成

boss Group整体图

Worker Group整体图

整体关联图

1、Bootstrap、ServerBootstrap

  • Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,NettyBootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类

常见方法:

方法名称 方法介绍
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) 该方法用于服务器端,用来设置两个EventLoop
public B group(EventLoopGroup group) 该方法用于客户端,用来设置一个EventLoop
public B channel(Class channelClass) 该方法用来设置一个服务器端的通道实现
public B option(ChannelOption option, T value) 用来给 ServerChannel 添加配置
public ServerBootstrap childOption(ChannelOption childOption, T value) 用来给接收到的通道添加配置
public ServerBootstrap childHandler(ChannelHandler childHandler) 该方法用来设置业务处理类(自定义的 handler)
public ChannelFuture bind(int inetPort) 该方法用于服务器端,用来设置占用的端口号
public ChannelFuture connect(String inetHost, int inetPort) 该方法用于客户端,用来连接服务器

2、Future、ChannelFuture

  • Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过 FutureChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件

常见的方法有

方法名 方法介绍
Channel channel() 返回当前正在进行 IO 操作的通道
ChannelFuture sync() 等待异步操作执行完毕,相当于将阻塞在当前。

3、Channel

  • Netty 网络通信的组件,能够用于执行网络 I/O 操作。
  • 通过Channel 可获得当前网络连接的通道的状态
  • 通过Channel 可获得 网络连接的配置参数 (例如接收缓冲区大小)
  • Channel 提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回,并且不保证在调用结束时所请求的 I/O 操作已完成
  • 调用立即返回一个 ChannelFuture 实例,通过注册***到 ChannelFuture 上,可以 I/O 操作成功、失败或取消时回调通知调用方
  • 支持关联 I/O 操作与对应的处理程序
  • 不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应

常用的 Channel 类型

名称 介绍
NioSocketChannel 异步的客户端 TCP Socket 连接。
NioServerSocketChannel 异步的服务器端 TCP Socket 连接
NioDatagramChannel 异步的 UDP 连接。
NioSctpChannel 异步的客户端 Sctp 连接。
NioSctpServerChannel 异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO。

4、Selector

  • Netty 基于 Selector 对象实现 I/O 多路复用,通过 Selector 一个线程可以监听多个连接的 Channel 事件。
  • 当向一个 Selector 中注册 Channel 后,Selector 内部的机制就可以自动不断地查询(Select) 这些注册的 Channel 是否有已就绪的 I/O 事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel
  • 同时,Netty中对selector中的selectedKey集合进行了替换,它替换成了一个它自己实现的一个set集合,这样效率更高。

5、ChannelHandler及其实现类

  • ChannelHandler 是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序。
  • ChannelHandler 本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类
  • 我们经常需要自定义一个 Handler 类去继承 ChannelInboundHandlerAdapter,然后通过重写相应方法实现业务逻辑,我们接下来看看一般都需要重写哪些方法
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
   

    //通道注册事件
    @Skip
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
   
        ctx.fireChannelRegistered();
    }

    //通道取消注册事件
    @Skip
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
   
        ctx.fireChannelUnregistered();
    }

    //通道就绪事件
    @Skip
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
   
        ctx.fireChannelActive();
    }

    /** * Calls {@link ChannelHandlerContext#fireChannelInactive()} to forward * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}. * * Sub-classes may override this method to change behavior. */
    @Skip
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
   
        ctx.fireChannelInactive();
    }

    //通道读取数据事件
    @Skip
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
   
        ctx.fireChannelRead(msg);
    }

    //通道数据读取完毕事件
    @Skip
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
   
        ctx.fireChannelReadComplete();
    }

    /** * Calls {@link ChannelHandlerContext#fireUserEventTriggered(Object)} to forward * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}. * * Sub-classes may override this method to change behavior. */
    @Skip
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
   
        ctx.fireUserEventTriggered(evt);
    }

    /** * Calls {@link ChannelHandlerContext#fireChannelWritabilityChanged()} to forward * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}. * * Sub-classes may override this method to change behavior. */
    @Skip
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
   
        ctx.fireChannelWritabilityChanged();
    }

    //通道发生异常事件
    @Skip
    @Override
    @SuppressWarnings("deprecation")
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
   
        ctx.fireExceptionCaught(cause);
    }
}

  • ChannelInboundHandler 用于处理入站 I/O 事件。
  • ChannelOutboundHandler 用于处理出站 I/O 操作。

适配器

  • ChannelInboundHandlerAdapter 用于处理入站 I/O 事件。
  • ChannelOutboundHandlerAdapter 用于处理出站 I/O 操作。
  • ChannelDuplexHandler 用于处理入站和出站事件。

6、Pipeline和ChannelPipeline

  • ChannelPipeline 是一个 Handler 的集合,它负责处理和拦截 inbound 或者 outbound 的事件和操作,相当于一个贯穿 Netty 的链。(也可以这样理解:ChannelPipeline 是 保存 ChannelHandlerList,用于处理或拦截 Channel 的入站事件和出站操作)

  • ChannelPipeline 实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel 中各个的 ChannelHandler 如何相互交互

  • Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应,它们的组成关系如下

  • 一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler

  • 入站事件和出站事件在一个双向链表中,入站事件会从链表 head 往后传递到最后一个入站的 handler,出站事件会从链表 tail 往前传递到最前一个出站的 handler,两种类型的 handler 互不干扰

注意: serverBootstrap.childHandler(new TestServerInitializer()); 每个客户端连接有一个独立的channel 也就说不同连接的 ChannelHandler 和ChannelHandlerContext(上下文)是独立不共享的

常用方法:

方法名 介绍
ChannelPipeline addFirst(ChannelHandler... handlers) 把一个业务处理类(handler)添加到链中的第一个位置
ChannelPipeline addLast(ChannelHandler… handlers) 把一个业务处理类(handler)添加到链中的最后一个位置

7、ChannelHandlerContext

  • 保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象
  • ChannelHandlerContext 中 包 含 一 个 具 体 的 事 件 处 理 器 ChannelHandler , 同 时ChannelHandlerContext 中也绑定了对应的 pipelineChannel 的信息,方便对 ChannelHandler进行调用.

常用方法:

方法名 介绍
ChannelFuture close() 关闭通道
ChannelOutboundInvoker flush() 刷新
ChannelFuture writeAndFlush(Object msg) 将 数 据 写 到 ChannelPipeline 中 当 前ChannelHandler 的下一个 ChannelHandler 开始处理(出站)

8、ChannelOption

  • Netty 在创建 Channel 实例后,一般都需要设置 ChannelOption 参数。

ChannelOption 参数如下:

  • ChannelOption.SO_BACKLOG
    • 对应 TCP/IP 协议 listen 函数中的 backlog 参数,用来初始化服务器可连接队列大小。
    • 服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog 参数指定了队列的大小。
  • ChannelOption.SO_KEEPALIVE
    • 一直保持连接活动状态

9、EventLoopGroup和其实现类NioEventLoopGroup

  • EventLoopGroup是一组 EventLoop的抽象,Netty为了更好的利用多核 CPU资源,一般会有多个 EventLoop同时工作,每个 EventLoop维护着一个 Selector 实例。
  • EventLoopGroup 提供 next 接口,可以从组里面按照一定规则获取其中一个 EventLoop来处理任务。在 Netty 服务器端编程中,我们一般都需要提供两个 EventLoopGroup,例如:BossEventLoopGroup 和WorkerEventLoopGroup。
  • 通常一个服务端口即一个 ServerSocketChannel对应一个Selector 和一个EventLoop线程。BossEventLoop 负责接收客户端的连接并将 SocketChannel 交给 WorkerEventLoopGroup 来进行 IO 处理,如下图所示
  • BossEventLoopGroup 通常是一个单线程的 EventLoop,EventLoop 维护着一个注册了ServerSocketChannel 的 Selector 实例BossEventLoop 不断轮询 Selector 将连接事件分离出来
  • 通常是 OP_ACCEPT 事件,然后将接收到的 SocketChannel 交给 WorkerEventLoopGroup
  • WorkerEventLoopGroup 会由 next 选择其中一个 EventLoop来将这个 SocketChannel 注册到其维护的 Selector 并对其后续的 IO 事件进行处理

常用方法:

方法名 介绍
public NioEventLoopGroup() 构造方法
public Future shutdownGracefully() 断开连接,关闭线程

10、Unpooled

  • Netty 提供一个专门用来操作缓冲区(即Netty的数据容器)的工具类

常用方法

方法名 介绍
public static ByteBuf copiedBuffer(CharSequence string, Charset charset) 通过给定的数据和字符编码返回一个 ByteBuf 对象(类似于 NIO 中的 ByteBuffer 但有区别)

代码示例

//创建一个ByteBuf
//1、创建对象,该对象包含一个数组,是一个byte[10]
//2、在netty的buffer中,写入数据后再读取数据不需要使用 flip 进行反转
// 底层维护了 readerIndex 和 writeIndex
//往buffer中写的范围为 [writeIndex, capacity)
//往buffer中可读的范围为 [readerIndex, writeIndex)。使用 buf.readByte() 会往后移动 readerIndex 指针,使用 buf.getByte(i) 通过索引获取就不会移动该指针
ByteBuf byteBuf = Unpooled.buffer(10);
for (int i = 0; i < 10; i++) {
   
    byteBuf.writeByte(i);
}
//获取该buf的大小
int capacity = byteBuf.capacity();
//输出
for (int i = 0; i < byteBuf.capacity(); i++) {
   
    System.out.println(byteBuf.getByte(i));
    System.out.println(byteBuf.readByte());
}
byte[] content = byteBuf.array();
//将content转成字符串
String c = new String(content, StandardCharsets.UTF_8);
//数组偏移量
int offset = byteBuf.arrayOffset();
//获取读取偏移量
int readerIndex = byteBuf.readerIndex();
//获取写偏移量
int writerIndex = byteBuf.writerIndex();
//获取容量
int capacity = byteBuf.capacity();
//获取可读取的字节数
int readableBytes = byteBuf.readableBytes();
//通过索引获取某个位置的字节
byte aByte = byteBuf.getByte(0);
//获取Buf中某个范围的字符序列
CharSequence charSequence = byteBuf.getCharSequence(0, 4, StandardCharsets.UTF_8);

11、Netty群聊系统

实例要求:

  • 编写一个 Netty 群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)
  • 实现多人群聊
  • 服务器端:可以监测用户上线,离线,并实现消息转发功能
  • 客户端:通过channel 可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息(有服务器转发得到)
  • 目的:进一步理解Netty非阻塞网络编程机制

服务器端代码:

/*************启动类**************/
public class GroupChatServer {
   
    private int port; //监听端口
    public GroupChatServer(int port){
   
        this.port = port;
    }
    //编写run 方法,处理客户端的请求
    public void run() throws InterruptedException {
   
        //创建两个线程组
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
   
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
   
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
   
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast("decoder", new StringDecoder()); //向pipeline加入解码器
                            pipeline.addLast("encoder", new StringEncoder()); //加入编码器
                            pipeline.addLast(new GroupChatServerHandler());
                        }
                    });
            System.out.println("netty服务器启动");
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();

            //监听关闭事件
            channelFuture.channel().closeFuture().sync();
        } finally {
   
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    public static void main(String[] args) throws InterruptedException {
   
        GroupChatServer groupChatServer = new GroupChatServer(7000);
        groupChatServer.run();
    }
}

/***********************Handler**********************/
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
   

    //定义一个Channel组,管理所有的channel
    //GlobalEventExecutor.INSTANCE 是全局事件执行器,是一个单例
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    //此方法表示连接建立,一旦建立连接,就第一个被执行
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
   
        Channel channel = ctx.channel();
        //该方***将 channelGroup 中所有 channel 遍历,并发送消息,而不需要我们自己去遍历
        channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + sdf.format(new Date()) + "加入聊天\n");
        //将当前的Channel加入到 ChannelGroup
        channelGroup.add(channel);
    }

    //表示 channel 处于活动状态,提示 xxx 上线
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
   
        System.out.println(ctx.channel().remoteAddress() + " " + sdf.format(new Date()) + "上线了~");
    }

    //表示 channel 处于不活动状态,提示 xxx 离线
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
   
        System.out.println(ctx.channel().remoteAddress() + " " + sdf.format(new Date()) + "离线了~");
    }

    //表示 channel 断开连接,将xx客户离开信息推送给当前在线客户
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
   
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() +" "+ sdf.format(new Date()) + "离开了\n");
        System.out.println("当前channelGroup大小 :" + channelGroup.size());
    }

    //读取数据,并进行消息转发
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
   
        //获取当前channel
        Channel channel = ctx.channel();
        //这时,遍历channelGroup,根据不同的情况,回送不同的消息
        channelGroup.forEach(item -> {
   
            if (item != channel) {
   
                item.writeAndFlush("[客户]" + channel.remoteAddress() + "发送了消息:" + msg + "\n");
            } else {
    //把自己发送的消息发送给自己
                item.writeAndFlush("[自己]发送了消息:" + msg + "\n");
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
   
        ctx.close();
    }
}

客户端代码:

/*********************启动类******************/
public class GroupChatClient {
   

    //属性
    private final String host;
    private final int port;

    public GroupChatClient(String host, int port) {
   
        this.port = port;
        this.host = host;
    }

    public void run() throws InterruptedException {
   
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();

        try {
   
            Bootstrap bootstrap = new Bootstrap()
                    .group(eventExecutors)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
   
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
   
                            ChannelPipeline pipeline = ch.pipeline();
                            //加入Handler
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            pipeline.addLast(new GroupChatClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            //得到channel
            Channel channel = channelFuture.channel();
            System.out.println("--------" + channel.localAddress() + "---------");
            //客户端需要输入信息,创建一个扫描器
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()){
   
                String msg = scanner.nextLine();
                //通过channel发送到服务器端
                channel.writeAndFlush(msg + "\r\n");
            }
        } finally {
   
            eventExecutors.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
   
        new GroupChatClient("127.0.0.1", 7000).run();
    }
}

/*****************Handler*****************/
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
   
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
   
        System.out.println(msg.trim());
    }
}

私聊实现
源码地址:gitee仓库
方式一:
public static Map<User, Channel> channels2 = new HashMap<User, Channel>();

  1. 服务器端存放所有连接的用户信息和用户对应管道映射
  2. 服务器再转发消息的时候直接把私聊的消息转发给指定用户对应的管道

方式二: 根据端口号发送消息
public static Map<String, Channel> channels = new HashMap<String, Channel>();
1. 根据端口号找到对应的客户端channel
2. channels.put(port, channel); 注册到当前类全局的channels 中 当用户发来消息指定端口号时候转发给指定channel

12、Netty心跳检测机制案例

实例要求:

  • 编写一个 Netty心跳检测机制案例, 当服务器超过3秒没有读时,就提示读空闲
  • 当服务器超过5秒没有写操作时,就提示写空闲
  • 实现当服务器超过7秒没有读或者写操作时,就提示读写空闲

启动类:

public static void main(String[] args) throws InterruptedException {
   
    NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
    NioEventLoopGroup workerGroup = new NioEventLoopGroup();

    try {
   
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .handler(new LoggingHandler(LogLevel.INFO))  //为BossGroup中的请求添加日志处理Handler
            .childHandler(new ChannelInitializer<SocketChannel>() {
   
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
   
                    ChannelPipeline pipeline = ch.pipeline();
                    //加入一个 netty 提供的 IdleStateHandler
                    /** * 1、IdleStateHandler 是 netty 提供的检测空闲状态的处理器 * 2、long readerIdleTime:表示多长时间没有读,就会发送一个心跳检测包检测是否还是连接的状态 * 3、long writerIdleTime:表示多长时间没有写,就会发送一个心跳检测包检测是否还是连接的状态 * 4、long allIdleTime:表示多长时间没有读写,就会发送一个心跳检测包检测是否还是连接的状态 * 5、当 IdleStateEvent 触发后,就会传递给管道的下一个 Handler,通过调用(触发)下一个Handler的 userEventTriggered,在该方法区处理这个事件。 */
                    pipeline.addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS));

                    //加入一个对空闲检测进一步处理的Handler(自定义)
                    pipeline.addLast(new MyServerHandler());
                }
            });
        //启动服务器,设置为同步模式。
        ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
        channelFuture.channel().closeFuture().sync();
    } finally {
   
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

Handler:

public class MyServerHandler extends ChannelInboundHandlerAdapter {
   

    /** * @param ctx 上下文 * @param evt 事件 * @throws Exception */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
   
        if (evt instanceof IdleStateEvent) {
   
            //将 evt 向下转型 IdleStateEvent
            IdleStateEvent event = (IdleStateEvent)evt;
            String eventTye = null;
            switch (event.state()) {
   
                case READER_IDLE:
                    eventTye = "读空闲";
                    break;
                case WRITER_IDLE:
                    eventTye = "写空闲";
                    break;
                case ALL_IDLE:
                    eventTye = "读写空闲";
                    break;
            }
            System.out.println(ctx.channel().remoteAddress() +"---超时时间--" + eventTye);
            System.out.println("服务器做相应处理。。");
        }
    }
}

13、Netty建立Websocket连接

实例要求:

  • Http协议是无状态的, 浏览器和服务器间的请求响应一次,下一次会重新创建连接.
  • 要求:实现基于webSocket的长连接的全双工的交互
  • 改变Http协议多次请求的约束,实现长连接了, 服务器可以发送消息给浏览器
  • 客户端浏览器和服务器端会相互感知,比如服务器关闭了,浏览器会感知,同样浏览器关闭了,服务器会感知

启动代码:

public static void main(String[] args) throws InterruptedException {
   
    NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
    NioEventLoopGroup workerGroup = new NioEventLoopGroup();
    try{
   
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new ChannelInitializer<SocketChannel>() {
   
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
   
                    ChannelPipeline pipeline = ch.pipeline();
                    //因为是基于Http协议,所以要使用Http的编码和解码器
                    pipeline.addLast(new HttpServerCodec());
                    //是以块方式写,添加ChunkedWriter处理器
                    pipeline.addLast(new ChunkedWriteHandler());
                    /** * 1、http数据在传输过程中是分裂的,HttpObjectAggregator就可以将多个段聚合 * 2、这就是为什么,当浏览器发送大量数据时,就会发出多次http请求 */
                    pipeline.addLast(new HttpObjectAggregator(8192));

                    /** * 1、对于websocket,它的数据是以帧的形式传递的 * 2、可以看到 WebsocketFrame 下面有六个子类 * 3、浏览器请求时:ws://localhost:7000/hello 表示请求的uri * 4、WebSocketServerProtocolHandler 核心功能是将 http 协议升级为 ws 协议,保持长连接 * 5、从Http协议升级到Websocket协议,是通过StatusCode 101(Switching Protocols)来切换的。 */
                    pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));

                    //自定义Handler,处理业务逻辑
                    pipeline.addLast(new MyTextWebSocketFrameHandler());
                }
            });
        ChannelFuture sync = serverBootstrap.bind(7000).sync();
        sync.channel().closeFuture().sync();
    } finally {
   
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

handle处理器

/** * @author 夏天 * @date 2020年11月05日 18:01 * 这里TextWebSocketFrame类型标识一个文本帧(frame) */
public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
   
    /** * 有消息过来了触发该事件 */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
   
        System.out.println("服务器收到消息" + msg.text());
        ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间" + LocalDateTime.now() + msg.text()));
    }

    /** * 当web客户端连接后触发方法 */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
   
        //id 标识唯一的值 LongText是唯一的 ShortText不是唯一的有可能重复
        System.out.println("handlerAdder被调用" + ctx.channel().id().asLongText());
        System.out.println("handlerAdder被调用" + ctx.channel().id().asShortText());
    }

    /** * 断开连接触发该事件 */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
   
        System.out.println("handlerRemoved被调用" + ctx.channel().id().asLongText());
    }

    /** * 发生异常触发该事件 */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
   
        System.out.println("异常发生" + cause.getMessage());
        //关闭通道
        ctx.close();
    }
}

页面

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<script> var socket //判断当前浏览器是否支持webSocket if (window.WebSocket) {
     //go on socket = new WebSocket('ws://localhost:7000/hello') //相当于channelReado ev 收到服务端回送的消息 socket.onmessage = function (ev) {
     let rt = document.getElementById('responseText') rt.value = rt.value + '\n' + ev.data } //相当于连接开启(感知到连接开启) socket.onopen = function (ev) {
     let rt = document.getElementById('responseText') rt.value = rt.value + '\n' + '连接开启了' } //相当于连接关闭(感知到连接关闭) socket.onclose = function (ev) {
     let rt = document.getElementById('responseText') rt.value = rt.value + '\n' + '连接关闭了' } } else {
     alert('当前浏览器不支持WebSocket') } //发送下消息到服务器 function send(message) {
     //先判断socket是否创建好 if (!window.socket) {
     return } //连接已经开启 if (socket.readyState == WebSocket.OPEN) {
     //通过socket 发送消息 socket.send(message) } else alert('连接没有开启') } </script>
<form onsubmit="return false">
    <textarea name="message" style="height: 300px;width: 300px;"></textarea>
    <input type="button" value="发送消息" onclick="send(this.form.message.value)">
    <textarea id="responseText" style="height: 300px;width: 300px;"></textarea>
    <input type="button" value="清空内容" onclick="document.getElementById('responseText').value=''">
</form>
</body>
</html>

14、案例简单总结

创建启动类:

  • 首先初始化两个NioEventLoopGroup。其中BossGroup一般设置线程为1
  • 初始化一个ServerBootStrap类。并调用它设置很多参数。
    • group():服务端设置两个Group,客户端设置一个Group
    • chnnel():服务端传入NioServerSocketChannel,客户端传入NioSocketChannel
    • option():服务端给BossGroup设置SO_BACKLOG任务队列大小
    • childOption():服务端给WorkerGroup设置连接SO_KEEPALIVE保持连接状态
    • handler():服务端给BossGroup设置Handler,客户端设置Handler
    • childHandler():服务端给WorkerGroup设置Handler。
  • 通过BootStrap去绑定端口,监听关闭事件。设置为同步

Handler:

  • SimpleChannelInboundHandler
    • 可以继承它来处理很多通信。经过上面几个案例推敲,一般写自己的Handler继承它就可以了
  • ChannelInboundHandlerAdapter
    • 这个是上一个的父类,我们在心跳检测的时候通过继承它的userEventTriggered去判断连接状态
    • 其实通过上面那个simple也可以继承这个trigger
  • IdleStateHandler
    • 在心跳检测时我们要通过这个Handler去触发上面的trigger
  • HttpServerCodec
    • 提供好的用于Http编码解码,一般用于Http请求
  • ChunkedWriteHandler
    • 提供好的Handler,以块方式写,添加ChunkedWriter处理器
    • 我搜了一下,它一般用于发送大文件。这个东西使我们在Websocket的时候用的。
  • HttpObjectAggregator
    • 它会将http数据聚合在一起发送
  • WebSocketServerProtocolHandler
    • 传入ws路径,将Http协议升级成为ws协议

Netty中通信数据实体:

  • TextWebSocketFrame
    • 这是我们在websocket连接的时候用的,它表示一个文本帧,是websocket进行通信的数据形式
  • HttpObject
    • 这是我们在建立Http连接的时候用到的,可以将它转换成一个HttpRequest

Hander常用方法:

方法名 介绍
channelRead0(ChannelHandlerContext channelHandlerContext, T t) 读取数据,并进行消息转发
handlerAdded(ChannelHandlerContext ctx) 连接建立,一旦建立连接,就第一个被执行
channelActive(ChannelHandlerContext ctx) 表示 channel 处于活动状态,提示 xxx 上线
channelInactive(ChannelHandlerContext ctx) 表示 channel 处于不活动状态,提示 xxx 离线
handlerRemoved(ChannelHandlerContext ctx) 表示 channel 断开连接,将xx客户离开信息推送给当前在线客户
exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 出现错误如何进行处理
userEventTriggered(ChannelHandlerContext ctx, Object evt) 事件触发器,通过判断evt的类型去判断发生了什么事件,再通过里面的属性判断事件发生的类型。我们在IdleStateHandler后面加上一个触发器,可以检测心跳。

原文链接 https://blog.csdn.net/qq_35751014/article/details/104524889
本文只是在原文的基础上添加自己笔记与理解