前言

高能预警,本文是我一个月前就开始写的,所以内容会非常长,当然也非常硬核。我也一直就想着写一下netty系列的,但是netty的源码概念又非常多,所以才写到了现在。

我相信90%的读者都不会一口气看完的,因为实在太长了,长到我现在顶配的mbp打字编辑框都是卡的,但是我希望大家日后想看netty或者在面试前需要了解的朋友回头翻一下就够了,那我写这个文章的意义也就有了。

也不多BB,直接开整。

NIO 基本概念

阻塞(Block)与非阻塞(Non-Block)

阻塞和非阻塞是进程在访问数据的时候,数据是否准备就绪的一种处理方式,当数据没有准备的时候。

阻塞:往往需要等待缓冲区中的数据准备好过后才处理其他的事情,否则一直等待在那里。

非阻塞:当我们的进程访问我们的数据缓冲区的时候,如果数据没有准备好则直接返回,不会等待。如果数据已经准备好,也直接返回。

阻塞 IO :

非阻塞 IO :

同步(Synchronous)与异步(Asynchronous)

同步和异步都是基于应用程序和操作系统处理 IO 事件所采用的方式。比如

**同步:**是应用程序要直接参与 IO 读写的操作。

**异步:**所有的 IO 读写交给操作系统去处理,应用程序只需要等待通知。

同步方式在处理 IO 事件的时候,必须阻塞在某个方法上面等待我们的 IO 事件完成(阻塞 IO 事件或者通过轮询 IO事件的方式),对于异步来说,所有的 IO 读写都交给了操作系统。这个时候,我们可以去做其他的事情,并不需要去完成真正的 IO 操作,当操作完成 IO 后,会给我们的应用程序一个通知。

所以异步相比较于同步带来的直接好处就是在我们处理IO数据的时候,异步的方式我们可以把这部分等待所消耗的资源用于处理其他事务,提升我们服务自身的性能。

同步 IO :

异步 IO :

Java BIO与NIO对比

BIO(传统IO):

BIO是一个同步并阻塞的IO模式,传统的 java.io 包,它基于流模型实现,提供了我们最熟知的一些 IO 功能,比如File抽象、输入输出流等。交互方式是同步、阻塞的方式,也就是说,在读取输入流或者写入输出流时,在读、写动作完成之前,线程会一直阻塞在那里,它们之间的调用是可靠的线性顺序。

NIO(Non-blocking/New I/O)

NIO 是一种同步非阻塞的 I/O 模型,于 Java 1.4 中引入,对应 java.nio 包,提供了 Channel , Selector,Buffer 等抽象。NIO 中的 N 可以理解为 Non-blocking,不单纯是 New。它支持面向缓冲的,基于通道的 I/O 操作方法。 NIO 提供了与传统 BIO 模型中的 Socket 和 ServerSocket 相对应的 SocketChannel 和 ServerSocketChannel 两种不同的套接字通道实现,两种通道都支持阻塞和非阻塞两种模式。对于高负载、高并发的(网络)应用,应使用 NIO 的非阻塞模式来开发

BIO与NIO的对比

IO模型

BIO

NIO

通信

面向流

面向缓冲

处理

阻塞 IO

非阻塞 IO

触发

选择器

NIO 的 Server 通信的简单模型:

BIO 的 Server 通信的简单模型:

NIO的特点:

  1. 一个线程可以处理多个通道,减少线程创建数量;
  2. 读写非阻塞,节约资源:没有可读/可写数据时,不会发生阻塞导致线程资源的浪费

Reactor 模型

单线程的 Reactor 模型

多线程的 Reactor 模型

多线程主从 Reactor 模型

Netty 基础概念

Netty 简介

Netty 是一个 NIO 客户端服务器框架,可快速轻松地开发网络应用程序,例如协议服务器和客户端。它极大地简化和简化了网络编程,例如 TCP 和 UDP 套接字服务器。

“快速简便”并不意味着最终的应用程序将遭受可维护性或性能问题的困扰。Netty 经过精心设计,结合了许多协议(例如FTP,SMTP,HTTP 以及各种基于二进制和文本的旧式协议)的实施经验。结果,Netty 成功地找到了一种无需妥协即可轻松实现开发,性能,稳定性和灵活性的方法。

Netty 执行流程

Netty 核心组件

Channel

​ Channel是 Java NIO 的一个基本构造。可以看作是传入或传出数据的载体。因此,它可以被打开或关闭,连接或者断开连接。

EventLoop 与 EventLoopGroup

​ EventLoop 定义了Netty的核心抽象,用来处理连接的生命周期中所发生的事件,在内部,将会为每个Channel分配一个EventLoop。

​ EventLoopGroup 是一个 EventLoop 池,包含很多的 EventLoop。

​ Netty 为每个 Channel 分配了一个 EventLoop,用于处理用户连接请求、对用户请求的处理等所有事件。EventLoop 本身只是一个线程驱动,在其生命周期内只会绑定一个线程,让该线程处理一个 Channel 的所有 IO 事件。

​ 一个 Channel 一旦与一个 EventLoop 相绑定,那么在 Channel 的整个生命周期内是不能改变的。一个 EventLoop 可以与多个 Channel 绑定。即 Channel 与 EventLoop 的关系是 n:1,而 EventLoop 与线程的关系是 1:1。

ServerBootstrap 与 Bootstrap

​ Bootstarp 和 ServerBootstrap 被称为引导类,指对应用程序进行配置,并使他运行起来的过程。Netty处理引导的方式是使你的应用程序和网络层相隔离。

​ Bootstrap 是客户端的引导类,Bootstrap 在调用 bind()(连接UDP)和 connect()(连接TCP)方法时,会新创建一个 Channel,仅创建一个单独的、没有父 Channel 的 Channel 来实现所有的网络交换。

​ ServerBootstrap 是服务端的引导类,ServerBootstarp 在调用 bind() 方法时会创建一个 ServerChannel 来接受来自客户端的连接,并且该 ServerChannel 管理了多个子 Channel 用于同客户端之间的通信。

ChannelHandler 与 ChannelPipeline

​ ChannelHandler 是对 Channel 中数据的处理器,这些处理器可以是系统本身定义好的编解码器,也可以是用户自定义的。这些处理器会被统一添加到一个 ChannelPipeline 的对象中,然后按照添加的顺序对 Channel 中的数据进行依次处理。

ChannelFuture

​ Netty 中所有的 I/O 操作都是异步的,即操作不会立即得到返回结果,所以 Netty 中定义了一个 ChannelFuture 对象作为这个异步操作的“代言人”,表示异步操作本身。如果想获取到该异步操作的返回值,可以通过该异步操作对象的addListener() 方法为该异步操作添加监 NIO 网络编程框架 Netty 听器,为其注册回调:当结果出来后马上调用执行。

​ Netty 的异步编程模型都是建立在 Future 与回调概念之上的。

Netty 源码阅读

源码阅读,最好可以再 Debug 的情况下进行,这样更容易帮助理解,因此在分析 Netty 前的我准备一个客户端和服务端的代码。

Netty - Server 代码

public class NettyServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup();
        try {

            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(parentGroup, childGroup)
                     .channel(NioServerSocketChannel.class)
                     .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new SomeSocketServerHandler());
                         }
                    });

            ChannelFuture future = bootstrap.bind(8888).sync();
            System.out.println("服务器已启动。。。");

            future.channel().closeFuture().sync();
        } finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}
复制代码

Server 端 Handler:

public class DemoSocketServerHandler
                       extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        System.out.println("Client Address ====== " + ctx.channel().remoteAddress());
        ctx.channel().writeAndFlush("from server:" + UUID.randomUUID());
        ctx.fireChannelActive();
        TimeUnit.MILLISECONDS.sleep(500);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,
                                Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
复制代码

Netty - Client 代码

public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
                            pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
                            pipeline.addLast(new DemoSocketClientHandler());
                        }
                    });

            ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
            future.channel().closeFuture().sync();
        } finally {
            if(eventLoopGroup != null) {
                eventLoopGroup.shutdownGracefully();
            }
        }
    }
}
复制代码

Client 端 Handler :

public class DemoSocketClientHandler
               extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        System.out.println(msg);
        ctx.channel().writeAndFlush("from client: " + System.currentTimeMillis());
        TimeUnit.MILLISECONDS.sleep(5000);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx)
            throws Exception {
        ctx.channel().writeAndFlush("from client:begin talking");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,
                                Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
复制代码

NioEventLoopGroup 初始化分析

首先根据 Server 服务端代码,分析 NioEventLoopGroup 的初始化过程。而在分析 NioEventLoopGroup 之前,有必要简单的说一说 NioEventLoopGroup 与 NioEventLoop ,方便后续源码的理解。

NioEventLoop 源码分析前了解

NioEventLoop 的继承体系

从 NioEventLoop 的继承体系中可以看到,NioEventLoop 本身就是一个 Executor,并且还是一个 单线程的 Executor。Executor 必然拥有一个 execute(Runnable command) 的实现方法,而 NioEventLoop 的 execute() 实现方法在其父类 SingleThreadEventExecutor 中,找到具体代码:

    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            startThread();
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                    // The task queue does not support removal so the best thing we can do is to just move on and
                    // hope we will be able to pick-up the task before its completely terminated.
                    // In worst case we will log on termination.
                }
                if (reject) {
                    reject();
                }
            }
        }
        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

复制代码

这里不细说,但是贴出这段代码主要为了引出 startThread(); 这句代码,在跟这句代码会发现,它最终调用了 NioEventLoop 的一个成员 Executor 执行了当前成员的 execute() 方法。对应的成员 io.netty.util.concurrent.SingleThreadEventExecutor#executor

而 executor 成员的初始化也是在当前代码执行时创建的匿名 Executor ,也就是执行到即新建并且执行当前 匿名 executr() 方法。

总结:

  1. NioEventLoop 本身就是一个 Executor。
  2. NioEventLoop 内部封装这一个新的线程 Executor 成员。
  3. NioEventLoop 有两个 execute 方法,除了本身的 execute() 方法对应的还有成员属性 Executor 对应的 execute() 方法。

备注: 因为这里出现了四个 Executor,为了区分,我们给其新的名称:

NioEventLoop 本身 Executor:NioEventLoop

NioEventLoop 的成员 Executor:子 Executor

NioEventLoopGroup 本身 Executor :NioEventLoopGroup

NioEventLoopGroup 的构造参数 Executor :总Executor

NioEventLoopGroup 的继承体系

看到继承体系可以直接知道 NioEventLoopGroup 也是一个 Executor,并且是一个线程池的 Executor,所以他也有 execute() 方法。对应的实现再其父类之中:io.netty.util.concurrent.AbstractEventExecutorGroup#execute

而这里还需要说到的一点是:在 NioEventLoopGroup 的构造中,再其父类 MultithreadEventExecutorGroup 的构造再次引入了一个新的 Executor,

之所以这里提到这个 Executor,是因为这个 Executor 是对应的 execute() 就是在 NioEventLoop 中的成员 Executor 的 execute() 执行时调用的。也就是下面对应的代码调用。io.netty.util.internal.ThreadExecutorMap#apply(java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutor)

到这如果不明白,没关系,因为只是为了引入 NioEventLoopGroup 和 NioEventLoop 的对应的两个 Executor,和两个 Executor 对应的两个 execute() 方法。这个后面还会有详细分析。

总结:

  1. NioEventLoopGroup 是一个线程池线程 Executor。
  2. NioEventLoopGroup 也封装了一个线程 Executor。
  3. NioEventLoopGroup 也有两个 execute()方法。

NioEventLoopGroup 初始化代码分析

上面说了基本的了解内容,下面具体分析,从 NioEventLoopGroup 的初始化进入源码分析。

入口我们直接找 NioEventLoopGroup 的无参构造。

    public NioEventLoopGroup() {
        this(0);
    }
复制代码
    public NioEventLoopGroup(int nThreads) {
        // 第二个参数是这个group所包含的executor
        this(nThreads, (Executor) null);
    }
复制代码
    public NioEventLoopGroup(int nThreads, Executor executor) {
        // 第三个参数是provider,其用于提供selector及selectable的channel,
        // 这个provider是当前JVM中唯一的一个单例的provider
        this(nThreads, executor, SelectorProvider.provider());
    }
复制代码
    public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        // 第四个参数是一个选择策略工厂实例
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }
复制代码
    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }
复制代码
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
复制代码
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        // 第三个参数是选择器工厂实例
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }
复制代码

跟到此,可以发现无参构造的基本参数被初始化, nThreads :DEFAULT_EVENT_LOOP_THREADS//默认当前CPU逻辑核心数的两倍,selectorProvide:SelectorProvider.provider()//当前JVM中唯一的一个单例的provider,SelectStrategyFactory:DefaultSelectStrategyFactory.INSTANCE//默认选择策略工厂实例,chooserFactory:DefaultEventExecutorChooserFactory.INSTANCE//选择器工厂实例。到这里只是基本的初始化参数,重点方法为MultithreadEventExecutorGroup 的构造方法。下面重点分析:

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (executor == null) {
            // 这个executor是group所包含的executor,其将来会为其所包含的每个eventLoop创建一个线程
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                // 创建eventLoop
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                // 在创建这些eventLoop过程中,只要有一个创建失败,则关闭之前所有已经创建好的eventLoop
                if (!success) {
                    // 关闭之前所有已经创建好的eventLoop
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    // 终止所有eventLoop上所执行的任务
                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        // 创建一个选择器
        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }
复制代码

根据无参构造直接往下跟,可以看到核心部分在最后一个父类的构造里。也就是 io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)。

再这里完成整个 NioEventLoopGroup 的实例初始化,这里分析下,然后再画个图回顾下。

初始化构造参数中的 Executor 参数,当其为空时,将其初始化

executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
复制代码

首先 newDefaultThreadFactory()) 创建默认的线程工厂,有兴趣可以跟进去看看。然后再创建ThreadPerTaskExecutor线程 Executor 对象。(PS:这里创建的 Executor 就是 NioEventLoopGroup 内的 Executor 对象,并不是当前 NioEventLoopGroup 自身,可以称其为 总 Executor)。

然后可以看到这里创建了一个 children 数组,根据需要创建的线程数创建对应数量的数组。

children = new EventExecutor[nThreads];
复制代码

因为每个 NioEventLoopGroup 都是 NioEventLoop 的集合,所以这里的 children 数组就是当前 NioEventLoopGroup 的 NioEventLoop。所以 NioEventLoop 的创建的实在 NioEventLoopGroup 初始化的时候。下面看 NioEventLoop 的初始化:

// 逐个创建nioEventLoop实例
for (int i = 0; i < nThreads; i ++) {
    boolean success = false;
    try {
        // 创建eventLoop
        children[i] = newChild(executor, args);
        success = true;
    } catch (Exception e) {
        // TODO: Think about if this is a good exception type
        throw new IllegalStateException("failed to create a child event loop", e);
    } finally {
        // 在创建这些eventLoop过程中,只要有一个创建失败,则关闭之前所有已经创建好的eventLoop
        if (!success) {
            // 闭之前所有已经创建好的eventLoop
            for (int j = 0; j < i; j ++) {
                children[j].shutdownGracefully();
            }

            // 终止所有eventLoop上所执行的任务
            for (int j = 0; j < i; j ++) {
                EventExecutor e = children[j];
                try {
                    while (!e.isTerminated()) {
                        e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                    }
                } catch (InterruptedException interrupted) {
                    // Let the caller handle the interruption.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}
复制代码

先整体看这段 NioEventLoop 的创建代码,可以看到整个过程中存在一个成功标志,catch 每个 NioEventLoop 创建完成过程,如果发生异常则将所有已经创建的 NioEventLoop 关闭。重点的代码也就在 NioEventLoop 的创建了。所以我们继续跟:children[i] = newChild(executor, args);往下走,直接找到 io.netty.channel.nio.NioEventLoopGroup#newChild ,因为当前是 NioEventLoopGroup 的创建,所以知道找到子类的 newChild 实现。

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
复制代码

又将之前合并的 args 参数强转回来,继续跟进 NioEventLoop 构造:

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    // 创建一个selector的二元组
    final SelectorTuple selectorTuple = openSelector();
    selector = selectorTuple.selector;
    unwrappedSelector = selectorTuple.unwrappedSelector;
    selectStrategy = strategy;
}
复制代码

这里我们先整体看下,将之前的默认参数初始化到 NioEventLoop 属性中。其中有两处:openSelector() 和 super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler)。这里先看父类构造:

往下跟,直接就是 SingleThreadEventLoop -> SingleThreadEventExecutor 的初始化,这些也可以在 NioEventLoop 的继承体系可以看到:

// io.netty.channel.SingleThreadEventLoop#SingleThreadEventLoop
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                boolean addTaskWakesUp, int maxPendingTasks,
                                RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
    // 创建一个收尾队列
    tailTasks = newTaskQueue(maxPendingTasks);
}

// io.netty.util.concurrent.SingleThreadEventExecutor#SingleThreadEventExecutor
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    // 这是当前NioEventLoop所包含的executor
    this.executor = ThreadExecutorMap.apply(executor, this);
    // 创建一个任务队列
    taskQueue = newTaskQueue(this.maxPendingTasks);
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
复制代码

这里首先创建的是 SingleThreadEventExecutor ,这里重点需要关注的代码是:

this.executor = ThreadExecutorMap.apply(executor, this);
复制代码

这里this 是 NioEventLoop ,所以this.executor 就是前面说的 NioEventLoop 里的 Executor,这里我们先称为 子 Executor(子:对应的就是 NioEventLoop ,前面说的 总:对应的是 NioEventLoopGroup )。

而这里 子 Executor 的初始化是由一个 executor 参数的,这个就是前面 NioEventLoopGroup 构造方法一直带入的 总 Executor。那我们继续往下跟,看看这个子 Executor 是如何完成的初始化的。

    public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
        ObjectUtil.checkNotNull(executor, "executor");
        ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
        // 这里创建的executor是子executor
        return new Executor() {
            // 这个execute()是子executor的execute()
            @Override
            public void execute(final Runnable command) {
                // 这里调用了NioEventLoopGroup所包含的executor的execute()
                // 即调用了“总的executor”的execute()
                executor.execute(apply(command, eventExecutor));
            }
        };
    }
复制代码

这段代码细看就会明白,这里创建的 子 Executor的创建也就是一个线程的创建,但是重点却在这个线程 Executor 的 execute()方法实现,只做了一件事情:就是调用 传入的 总 Executor 的 execute()方法。所以这里 子 Executor 做的事情就是调用 总 Executor 的 execute()。不要觉得这里绕,因为这还只是初始化,后面这里执行会更绕。[手动捂脸哭]

其实这里的 apply(command, eventExecutor),这里再执行 总 Executor 的 execute() 时还是会记录当前正在执行的线程,并且再执行完成时将当前记录值删除。

public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
    ObjectUtil.checkNotNull(command, "command");
    ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
    return new Runnable() {
        @Override
        public void run() {
            setCurrentEventExecutor(eventExecutor);
            try {
                command.run();
            } finally {
                setCurrentEventExecutor(null);
            }
        }
    };
}
复制代码

这里再 NioEventLoop 的属性 Executor 创建完成时,又去创建了一个普通任务队列taskQueue = newTaskQueue(this.maxPendingTasks);并且还创建了一个收尾任务队列tailTasks = newTaskQueue(maxPendingTasks);。这几个队列后面会说到。这里继续跟 NioEventLoop 主流程初始化。

到这我们再回去看看 openSelector(),这里我们要先知道 SelectorTuple :

private static final class SelectorTuple {
    final Selector unwrappedSelector;  // NIO原生selector
    final Selector selector;  // 优化过的selector

    SelectorTuple(Selector unwrappedSelector) {
        this.unwrappedSelector = unwrappedSelector;
        this.selector = unwrappedSelector;
    }

    SelectorTuple(Selector unwrappedSelector, Selector selector) {
        this.unwrappedSelector = unwrappedSelector;
        this.selector = selector;
    }
}
复制代码

SelectorTuple 只是一个包含两个 Selector 的内部类,用于封装优化前后的 Selector。而 openSelector() 方法就是为了返回 Selector 并且根据配置判断是否需要优化当前 Selector 。下面看具体代码:

而具体的优化过程有兴趣的可以自己去看看,这里只要知道,若是禁用了优化则 SelectorTuple 的优化后的 Selector 和为优化的 Selector 均为 Nio 原生的 Selector。

而这io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)后面还有在 NioEventLoop 数组创建完成后,还有选择器创建和关闭***绑定等,感兴趣可以自己看看,这里不再介绍。

到这一个 NioEventLoop 的创建过程的代码也全部看完了。我想如果只看这个肯定还是有点懵,源码这个东西需要自己跟进去去看,debug 一点点的跟,跟着运行的代码去想为何这么实现,不过这里我也画个图,让大家更直观的了解到 NioEventLoopGroup 的创建流程以及主要操作。

我想大家结合这个图,再结合上面的分析过程,最好可以自己找到源码,跟一遍,应该可以理解 NioEvnetLoopGroup 的创建。



原文链接:https://juejin.cn/post/6921858121774137352