ChannelHandler的添加

ChannelHandler的添加在DefaultChannelPipeline#addLast方法里

public final ChannelPipeline addLast(ChannelHandler... handlers) {
    return addLast(null, handlers);
}

public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    ObjectUtil.checkNotNull(handlers, "handlers");

    for (ChannelHandler h: handlers) {
        if (h == null) {
            break;
        }
        addLast(executor, null, h);
    }

    return this;
}
复制代码

可以看到,这个函数能添加多个handler,继续跟进

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        //检查handler是否被重复添加
        checkMultiplicity(handler);

        //创建节点
        newCtx = newContext(group, filterName(name, handler), handler);

        //将节点添加到链表
        addLast0(newCtx);

        //省略部分代码

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            //若不在当前eventLoop,把回调添加完成事件这个任务添加到异步任务队列
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    //回调添加完成事件
    callHandlerAdded0(newCtx);
    return this;
}
复制代码

概括一下,添加ChannelHandler主要做了三件事

  • 判断是否重复添加

  • 创建节点并添加至链表

  • 回调添加完成事件

判断是否重复添加

来看看checkMultiplicity这个函数

private static void checkMultiplicity(ChannelHandler handler) {
    if (handler instanceof ChannelHandlerAdapter) {
        ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
        if (!h.isSharable() && h.added) {
            //如果该handler不是共享的并且已经被添加,抛出异常
            throw new ChannelPipelineException(
                h.getClass().getName() +
                " is not a @Sharable handler, so can't be added or removed multiple times.");
        }
        //标记该handler已被添加
        h.added = true;
    }
}
复制代码

如果这个handler是ChannelHandlerAdapter,并且该handler已被添加并且不是共享的,那么就会抛出异常

关于判断是否是共享的,这里会判断这个handler是否被@Sharable注解

创建节点并添加到链表

来看创建节点的这行代码

newCtx = newContext(group, filterName(name, handler), handler);
复制代码

这里调用了一个filterName函数,这个函数是用来判断handler的名字是否与已有的handler名字重复,默认这里传入的name为null,netty会帮我们自动生成一个name

接着看看newContext这个方法

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
复制代码

这里会创建一个DefaultChannelHandlerContext类型的节点(DefaultChannelHandlerContext继承自AbstractChannelHandlerContext

接着看添加添加到链表的addLast0方法

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}
复制代码

在 [Netty——pipeline初始化](https://juejin.im/post/6888879138606678030) 这篇文章里说过,pipeline内部维护了一个双向链表(链表节点类型为AbstractChannelHandlerContext),这里就是在向tail节点前插入一个新创建的节点

并且注意到,DefaultChannelHandlerContext的构造函数里传入了handler参数,也就是说,每个节点管理着一个handler,一个节点和一个handler对应

回调添加完成事件

接下来看看callHandlerAdded0这个函数

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    try {
        ctx.callHandlerAdded();
    } catch (Throwable t) {
        //省略
    }
}

final void callHandlerAdded() throws Exception {
    if (setAddComplete()) {
        handler().handlerAdded(this);
    }
}
复制代码

可以看到,这里在回调handler重写的handlerAdded方法并标记添加完成ChannelInitializer

ChannelInitializer

这里说一下比较特殊的handler即ChannelInitializer

通常在用户代码里,通过ChannelInitializer来添加childHandler,如下

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG, 100)
    .handler(new LoggingHandler(LogLevel.INFO))
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline p = ch.pipeline();
            p.addLast(new LoggingHandler(LogLevel.INFO));
            p.addLast(serverHandler);
        }
    });
复制代码

这个initChannel方法在哪里被调用呢,跟进ChannelInitializerhandlerAdded看看

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        if (initChannel(ctx)) {
            // We are done with init the Channel, removing the initializer now.
            removeState(ctx);
        }
    }
}
复制代码

可以看到,在回调handlerAdded方法里会调用initChannel方法添加handler

继续看看removeState方法

private void removeState(final ChannelHandlerContext ctx) {
    if (ctx.isRemoved()) {
        initMap.remove(ctx);
    } else {
        ctx.executor().execute(new Runnable() {
            @Override
            public void run() {
                initMap.remove(ctx);
            }
        });
    }
}
复制代码

这里,主要是在删除自己即ChannelInitializer,因为添加完handler后ChannelInitializer的使命也就完成了

ChannelHandler的删除

ChannelHandler的删除在DefaultChannelPipeline#remove方法里

public final ChannelPipeline remove(ChannelHandler handler) {
    remove(getContextOrDie(handler));
    return this;
}
复制代码

概括一下,删除ChannelHandler主要分为3步

  • 找到包含该ChannelHandler的节点

  • 从链表中删除该节点

  • 回调删除handler事件

找到节点

来看看getContextOrDie这个方法

private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
    //找到包含该handler的节点
    AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
    if (ctx == null) {
        throw new NoSuchElementException(handler.getClass().getName());
    } else {
        return ctx;
    }
}
复制代码

context方法里就是在找包含该handler的节点,找的方法也很简单,就是从head节点开始向后遍历判断当前节点里的handler是否等于该handler,若找不到该节点,就抛出异常

删除节点

接着来看看remove方法

private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
    //不能删除head和tail节点
    assert ctx != head && ctx != tail;

    synchronized (this) {
        //删除节点
        atomicRemoveFromHandlerList(ctx);

        //省略部分代码

        EventExecutor executor = ctx.executor();
        if (!executor.inEventLoop()) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerRemoved0(ctx);
                }
            });
            return ctx;
        }
    }
    //回调删除handler事件
    callHandlerRemoved0(ctx);
    return ctx;
}
复制代码

来看看atomicRemoveFromHandlerList方法

private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) {
    AbstractChannelHandlerContext prev = ctx.prev;
    AbstractChannelHandlerContext next = ctx.next;
    prev.next = next;
    next.prev = prev;
}
复制代码

这里用synchronized关键字保证线程安全,方法体里就是很简单的链表删除节点的代码

回调删除handler事件

来看看callHandlerRemoved0方法

private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
    // Notify the complete removal.
    try {
        ctx.callHandlerRemoved();
    } catch (Throwable t) {
        fireExceptionCaught(new ChannelPipelineException(
            ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
    }
}

final void callHandlerRemoved() throws Exception {
    try {
        // Only call handlerRemoved(...) if we called handlerAdded(...) before.
        if (handlerState == ADD_COMPLETE) {
            handler().handlerRemoved(this);
        }
    } finally {
        // Mark the handler as removed in any case.
        setRemoved();
    }
}
复制代码

这里就是回调handler重写的handlerRemoved方法并标记已被removed