网络传输的基本单位是字节,在Java NIO中提供了ByteBuffer作为字节缓冲区容器,但该类的API使用起来不太方便,所以Netty实现了ByteBuf作为其替代品,下面是使用ByteBuf的优点:
ByteBuffer继承于abstract class Buffer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | RandomAccessFile aFile = new RandomAccessFile( "data/nio-data.txt", "rw"); FileChannel inChannel = aFile.getChannel(); // 分配一个48字节大小的缓冲区 ByteBuffer buf = ByteBuffer.allocate( 48); int bytesRead =; // 读取数据到缓冲区 while (bytesRead != - 1) { buf.flip(); // 将position重置为0 while(buf.hasRemaining()){ System.out.print(( char) buf.get()); // 读取数据并输出到控制台 } buf.clear(); // 清理缓冲区 bytesRead =; } aFile.close(); |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | /** * Sets this buffer's mark at its position. * * @return This buffer */ public final Buffer mark() { mark = position; // mark属性是用来标记当前索引位置的 return this; } // 将当前索引位置重置为mark所标记的位置 public final Buffer reset() { int m = mark; if (m < 0) throw new InvalidMarkException(); position = m; return this; } // 翻转这个Buffer,将limit设置为当前索引位置,然后再把position重置为0 public final Buffer flip() { limit = position; position = 0; mark = - 1; return this; } // 清理缓冲区 // 说是清理,也只是把postion与limit进行重置,之后再写入数据就会覆盖之前的数据了 public final Buffer clear() { position = 0; limit = capacity; mark = - 1; return this; } // 返回剩余空间 public final int remaining() { return limit - position; } |
Java NIO中的Buffer API操作的麻烦之处就在于读写转换需要手动重置指针。而ByteBuf没有这种繁琐性,它维护了两个不同的索引,一个用于读取,一个用于写入。当你从ByteBuf读取数据时,它的readerIndex将会被递增已经被读取的字节数,同样的,当你写入数据时,writerIndex则会递增。readerIndex的最大范围在writerIndex的所在位置,如果试图移动readerIndex超过该值则会触发异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 | public byte readByte() { this.checkReadableBytes0( 1); // 检查readerIndex是否已越界 int i = this.readerIndex; byte b = this._getByte(i); this.readerIndex = i + 1; // 递增readerIndex return b; } private void checkReadableBytes0(int minimumReadableBytes) { this.ensureAccessible(); if( this.readerIndex > this.writerIndex - minimumReadableBytes) { throw new IndexOutOfBoundsException(String.format( "readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s", new Object[]{Integer.valueOf( this.readerIndex), Integer.valueOf(minimumReadableBytes), Integer.valueOf( this.writerIndex), this})); } } public ByteBuf writeByte(int value) { this.ensureAccessible(); this.ensureWritable0( 1); // 检查writerIndex是否会越过capacity this._setByte( this.writerIndex++, value); return this; } private void ensureWritable0(int minWritableBytes) { if(minWritableBytes > this.writableBytes()) { if(minWritableBytes > this.maxCapacity - this.writerIndex) { throw new IndexOutOfBoundsException(String.format( "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s", new Object[]{Integer.valueOf( this.writerIndex), Integer.valueOf(minWritableBytes), Integer.valueOf( this.maxCapacity), this})); } else { int newCapacity = this.alloc().calculateNewCapacity( this.writerIndex + minWritableBytes, this.maxCapacity); this.capacity(newCapacity); } } } // get与set只对传入的索引进行了检查,然后对其位置进行get或set public byte getByte(int index) { this.checkIndex(index); return this._getByte(index); } public ByteBuf setByte(int index, int value) { this.checkIndex(index); this._setByte(index, value); return this; } |
1 2 3 4 5 6 7 8 | ByteBuf heapBuf = Unpooled.copiedBuffer(bytes); if (heapBuf.hasArray()) { // 判断是否有一个支撑数组 byte[] array = heapBuf.array(); // 计算第一个字节的偏移量 int offset = heapBuf.arrayOffset() + heapBuf.readerIndex(); int length = heapBuf.readableBytes(); // 获得可读字节 handleArray(array,offset,length); // 调用你的处理方法 } |
另一种模式为堆外分配,Java NIO ByteBuffer类在JDK1.4时就已经允许JVM实现通过JNI调用来在堆外分配内存(调用malloc()函数在JVM堆外分配内存),这主要是为了避免额外的缓冲区复制操作。
1 2 3 4 5 6 7 8 | ByteBuf directBuf = Unpooled.directBuffer(capacity); if (!directBuf.hasArray()) { int length = directBuf.readableBytes(); byte[] array = new byte[length]; // 将字节复制到数组中 directBuf.getBytes(directBuf.readerIndex(),array); handleArray(array, 0,length); } |
1 2 3 4 5 6 7 | CompositeByteBuf messageBuf = Unpooled.compositeBuffer(); ByteBuf headerBuf = ....; ByteBuf bodyBuf = ....; messageBuf.addComponents(headerBuf,bodyBuf); for (ByteBuf buf : messageBuf) { System.out.println(buf.toString()); } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | // 通过array.clone()来复制一个数组进行包装 public static ByteBuf copiedBuffer(byte[] array) { return array.length == 0?EMPTY_BUFFER:wrappedBuffer(( byte[])array.clone()); } // 默认是堆内分配 public static ByteBuf wrappedBuffer(byte[] array) { return (ByteBuf)(array.length == 0?EMPTY_BUFFER: new UnpooledHeapByteBuf(ALLOC, array, array.length)); } // 也提供了堆外分配的方法 private static final ByteBufAllocator ALLOC; public static ByteBuf directBuffer(int initialCapacity) { return ALLOC.directBuffer(initialCapacity); } |
1 2 3 4 | Channel channel = ...; ByteBufAllocator allocator = channel.alloc(); ByteBuf buffer = allocator.directBuffer(); do something....... |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 | package io.netty.buffer; public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf { // 由于ByteBuf的实例对象会非常多,所以这里没有将refCnt包装为AtomicInteger // 而是使用一个全局的AtomicIntegerFieldUpdater来负责操作refCnt private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt"); // 每个ByteBuf的初始引用值都为1 private volatile int refCnt = 1; public int refCnt() { return this.refCnt; } protected final void setRefCnt(int refCnt) { this.refCnt = refCnt; } public ByteBuf retain() { return this.retain0( 1); } // 引用计数值递增increment,increment必须大于0 public ByteBuf retain(int increment) { return this.retain0(ObjectUtil.checkPositive(increment, "increment")); } public static int checkPositive(int i, String name) { if(i <= 0) { throw new IllegalArgumentException(name + ": " + i + " (expected: > 0)"); } else { return i; } } // 使用CAS操作不断尝试更新值 private ByteBuf retain0(int increment) { int refCnt; int nextCnt; do { refCnt = this.refCnt; nextCnt = refCnt + increment; if(nextCnt <= increment) { throw new IllegalReferenceCountException(refCnt, increment); } } while(!refCntUpdater.compareAndSet( this, refCnt, nextCnt)); return this; } public boolean release() { return this.release0( 1); } public boolean release(int decrement) { return this.release0(ObjectUtil.checkPositive(decrement, "decrement")); } private boolean release0(int decrement) { int refCnt; do { refCnt = this.refCnt; if(refCnt < decrement) { throw new IllegalReferenceCountException(refCnt, -decrement); } } while(!refCntUpdater.compareAndSet( this, refCnt, refCnt - decrement)); if(refCnt == decrement) { this.deallocate(); return true; } else { return false; } } protected abstract void deallocate(); } |
Netty中的Channel与Java NIO的概念一样,都是对一个实体或连接的抽象,但Netty提供了一套更加通用的API。就以网络套接字为例,在Java中OIO与NIO是截然不同的两套API,假设你之前使用的是OIO而又想更改为NIO实现,那么几乎需要重写所有代码。而在Netty中,只需要更改短短几行代码(更改Channel与EventLoop的实现类,如把OioServerSocketChannel替换为NioServerSocketChannel),就可以完成OIO与NIO(或其他)之间的转换。
1 2 3 4 5 6 7 8 9 10 11 12 13 | final Channel channel = ... final ByteBuf buffer = Unpooled.copiedBuffer( "Hello,World!", CharsetUtil.UTF_8).retain(); Runnable writer = new Runnable() { public void run() { channel.writeAndFlush(buffer.duplicate()); } }; Executor executor = Executors.newCachedThreadPool(); executor.execute(writer); executor.execute(writer); ....... |
Nmae | Package | Description |
NIO | | 以Java NIO为基础实现 |
OIO | | 以java.net为基础实现,使用阻塞I/O模型 |
Epoll | | 由JNI驱动epoll()实现的更高性能的非阻塞I/O,它只能使用在Linux |
Local | | 本地传输,在JVM内部通过管道进行通信 |
Embedded | | 允许在不需要真实网络传输的环境下使用ChannelHandler,主要用于对ChannelHandler进行测试 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | Channel channel = ... ChannelFuture future = channel.connect( new InetSocketAddress( "", 6666)); // 注册一个*** future.addListener( new ChannelFutureListener() { public void operationComplete(ChannelFuture future) { if (future.isSuccess()) { // do something.... } else { // 输出错误信息 Throwable cause = future.cause(); cause.printStackTrace(); // do something.... } } }); |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | package; import; import io.netty.util.concurrent.GenericFutureListener; public interface ChannelFutureListener extends GenericFutureListener<ChannelFuture> { // 在Future完成时关闭 ChannelFutureListener CLOSE = new ChannelFutureListener() { public void operationComplete(ChannelFuture future) {; } }; // 如果失败则关闭 ChannelFutureListener CLOSE_ON_FAILURE = new ChannelFutureListener() { public void operationComplete(ChannelFuture future) { if(!future.isSuccess()) {; } } }; // 将异常信息传递给下一个ChannelHandler ChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = new ChannelFutureListener() { public void operationComplete(ChannelFuture future) { if(!future.isSuccess()) {; } } }; } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | package; public interface ChannelHandler { void handlerAdded(ChannelHandlerContext var1) throws Exception; void handlerRemoved(ChannelHandlerContext var1) throws Exception; /** @deprecated */ void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception; // 该注解表明这个ChannelHandler可被其他线程复用 public Sharable { } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 | package; import; import; public interface ChannelInboundHandler extends ChannelHandler { // 当channel被注册到EventLoop时被调用 void channelRegistered(ChannelHandlerContext var1) throws Exception; // 当channel已经被创建,但还未注册到EventLoop(或者从EventLoop中注销)被调用 void channelUnregistered(ChannelHandlerContext var1) throws Exception; // 当channel处于活动状态(连接到远程节点)被调用 void channelActive(ChannelHandlerContext var1) throws Exception; // 当channel处于非活动状态(没有连接到远程节点)被调用 void channelInactive(ChannelHandlerContext var1) throws Exception; // 当从channel读取数据时被调用 void channelRead(ChannelHandlerContext var1, Object var2) throws Exception; // 当channel的上一个读操作完成时被调用 void channelReadComplete(ChannelHandlerContext var1) throws Exception; // 当ChannelInboundHandler.fireUserEventTriggered()方法被调用时被调用 void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception; // 当channel的可写状态发生改变时被调用 void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception; // 当处理过程中发生异常时被调用 void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception; } package; import; import; import; import; public interface ChannelOutboundHandler extends ChannelHandler { // 当请求将Channel绑定到一个地址时被调用 // ChannelPromise是ChannelFuture的一个子接口,定义了如setSuccess(),setFailure()等方法 void bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) throws Exception; // 当请求将Channel连接到远程节点时被调用 void connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) throws Exception; // 当请求将Channel从远程节点断开时被调用 void disconnect(ChannelHandlerContext var1, ChannelPromise var2) throws Exception; // 当请求关闭Channel时被调用 void close(ChannelHandlerContext var1, ChannelPromise var2) throws Exception; // 当请求将Channel从它的EventLoop中注销时被调用 void deregister(ChannelHandlerContext var1, ChannelPromise var2) throws Exception; // 当请求从Channel读取数据时被调用 void read(ChannelHandlerContext var1) throws Exception; // 当请求通过Channel将数据写到远程节点时被调用 void write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) throws Exception; // 当请求通过Channel将缓冲中的数据冲刷到远程节点时被调用 void flush(ChannelHandlerContext var1) throws Exception; } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | public abstract class ChannelHandlerAdapter implements ChannelHandler { boolean added; public ChannelHandlerAdapter() { } // 该方法不允许将此ChannelHandler共享复用 protected void ensureNotSharable() { if( this.isSharable()) { throw new IllegalStateException( "ChannelHandler " + this.getClass().getName() + " is not allowed to be shared"); } } // 使用反射判断实现类有没有@Sharable注解,以确认该类是否为可共享复用的 public boolean isSharable() { Class clazz = this.getClass(); Map cache = InternalThreadLocalMap.get().handlerSharableCache(); Boolean sharable = (Boolean)cache.get(clazz); if(sharable == null) { sharable = Boolean.valueOf(clazz.isAnnotationPresent(Sharable.class)); cache.put(clazz, sharable); } return sharable.booleanValue(); } public void handlerAdded(ChannelHandlerContext ctx) throws Exception { } public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 | public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler { public ChannelInboundHandlerAdapter() { } public void channelRegistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelRegistered(); } public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelUnregistered(); } public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); } public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelInactive(); } public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); } public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); } public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { ctx.fireUserEventTriggered(evt); } public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelWritabilityChanged(); } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); } } public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler { public ChannelOutboundHandlerAdapter() { } public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { ctx.bind(localAddress, promise); } public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { ctx.connect(remoteAddress, localAddress, promise); } public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.disconnect(promise); } public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.close(promise); } public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.deregister(promise); } public void read(ChannelHandlerContext ctx) throws Exception {; } public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ctx.write(msg, promise); } public void flush(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 | // 这个泛型为消息对象的类型 public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter { private final TypeParameterMatcher matcher; private final boolean autoRelease; protected SimpleChannelInboundHandler() { this( true); } protected SimpleChannelInboundHandler(boolean autoRelease) { this.matcher = TypeParameterMatcher.find( this, SimpleChannelInboundHandler.class, "I"); this.autoRelease = autoRelease; } protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType) { this(inboundMessageType, true); } protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType, boolean autoRelease) { this.matcher = TypeParameterMatcher.get(inboundMessageType); this.autoRelease = autoRelease; } public boolean acceptInboundMessage(Object msg) throws Exception { return this.matcher.match(msg); } // SimpleChannelInboundHandler只是替你做了ReferenceCountUtil.release() public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { boolean release = true; try { if( this.acceptInboundMessage(msg)) { this.channelRead0(ctx, msg); } else { release = false; ctx.fireChannelRead(msg); } } finally { if( this.autoRelease && release) { ReferenceCountUtil.release(msg); } } } // 这个方法才是我们需要实现的方法 protected abstract void channelRead0(ChannelHandlerContext var1, I var2) throws Exception; } // ReferenceCountUtil中的源码,release方法对消息对象的类型进行判断然后调用它的release()方法 public static boolean release(Object msg) { return msg instanceof ReferenceCounted?((ReferenceCounted)msg).release(): false; } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 | public class DefaultChannelPipeline implements ChannelPipeline { ......... // 头部节点和尾部节点的引用变量 // ChannelHandlerContext在ChannelPipeline中是以链表的形式组织的 final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail; ......... // 添加一个ChannelHandler到链表尾部 public final ChannelPipeline addLast(String name, ChannelHandler handler) { return this.addLast((EventExecutorGroup) null, name, handler); } public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized( this) { // 检查ChannelHandler是否为一个共享对象(@Sharable) // 如果该ChannelHandler没有@Sharable注解,并且是已被添加过的那么就抛出异常 checkMultiplicity(handler); // 返回一个DefaultChannelHandlerContext,注意该对象持有了传入的ChannelHandler newCtx = this.newContext(group, this.filterName(name, handler), handler); this.addLast0(newCtx); // 如果当前ChannelPipeline没有被注册,那么就先加到未决链表中 if(! this.registered) { newCtx.setAddPending(); this.callHandlerCallbackLater(newCtx, true); return this; } // 否则就调用ChannelHandler中的handlerAdded() EventExecutor executor = newCtx.executor(); if(!executor.inEventLoop()) { newCtx.setAddPending(); executor.execute( new Runnable() { public void run() { DefaultChannelPipeline. this.callHandlerAdded0(newCtx); } }); return this; } } this.callHandlerAdded0(newCtx); return this; } // 将新的ChannelHandlerContext插入到尾部与尾部之前的节点之间 private void addLast0(AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext prev = this.tail.prev; newCtx.prev = prev; = this.tail; = newCtx; this.tail.prev = newCtx; } ..... } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 | public class DefaultChannelPipeline implements ChannelPipeline { public final ChannelPipeline fireChannelRead(Object msg) { // 注意这里将头节点传入了进去 AbstractChannelHandlerContext.invokeChannelRead( this.head, msg); return this; } } abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint { static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if(executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute( new Runnable() { public void run() { next.invokeChannelRead(m); } }); } } private void invokeChannelRead(Object msg) { if( this.invokeHandler()) { try { ((ChannelInboundHandler) this.handler()).channelRead( this, msg); } catch (Throwable var3) { this.notifyHandlerException(var3); } } else { // 寻找下一个ChannelHandler this.fireChannelRead(msg); } } public ChannelHandlerContext fireChannelRead(Object msg) { invokeChannelRead( this.findContextInbound(), msg); return this; } private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx =; } while(!ctx.inbound); // 直到找到一个ChannelInboundHandler return ctx; } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 | package; public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop { protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max( 16, SystemPropertyUtil.getInt( "io.netty.eventLoop.maxPendingTasks", 2147483647)); private final Queue<Runnable> tailTasks; protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject()); } protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp) { this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject()); } protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler); this.tailTasks = this.newTaskQueue(maxPendingTasks); } protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler); this.tailTasks = this.newTaskQueue(maxPendingTasks); } // 返回它所在的EventLoopGroup public EventLoopGroup parent() { return (EventLoopGroup) super.parent(); } public EventLoop next() { return (EventLoop); } // 注册Channel,这里ChannelPromise和Channel关联到了一起 public ChannelFuture register(Channel channel) { return this.register((ChannelPromise)( new DefaultChannelPromise(channel, this))); } public ChannelFuture register(ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); this, promise); return promise; } // 剩下这些函数都是用于调度任务 public final void executeAfterEventLoopIteration(Runnable task) { ObjectUtil.checkNotNull(task, "task"); if( this.isShutdown()) { reject(); } if(! this.tailTasks.offer(task)) { this.reject(task); } if( this.wakesUpForTask(task)) { this.wakeup( this.inEventLoop()); } } final boolean removeAfterEventLoopIterationTask(Runnable task) { return this.tailTasks.remove(ObjectUtil.checkNotNull(task, "task")); } protected boolean wakesUpForTask(Runnable task) { return !(task instanceof SingleThreadEventLoop.NonWakeupRunnable); } protected void afterRunningAllTasks() { this.runAllTasksFrom( this.tailTasks); } protected boolean hasTasks() { return super.hasTasks() || ! this.tailTasks.isEmpty(); } public int pendingTasks() { return super.pendingTasks() + this.tailTasks.size(); } interface NonWakeupRunnable extends Runnable { } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { ..... public void execute(Runnable task) { if(task == null) { throw new NullPointerException( "task"); } else { boolean inEventLoop = this.inEventLoop(); if(inEventLoop) { this.addTask(task); } else { this.startThread(); this.addTask(task); if( this.isShutdown() && this.removeTask(task)) { reject(); } } if(! this.addTaskWakesUp && this.wakesUpForTask(task)) { this.wakeup(inEventLoop); } } } public boolean inEventLoop(Thread thread) { return thread == this.thread; } ..... } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | // 该方法在Bootstrap的父类AbstractBootstrap中,泛型B为它当前子类的类型(为了链式调用) public B group(EventLoopGroup group) { if(group == null) { throw new NullPointerException( "group"); } else if( != null) { throw new IllegalStateException( "group set already"); } else { = group; return this; } } // ServerBootstrap中的实现,它也支持只用一个EventLoopGroup public ServerBootstrap group(EventLoopGroup group) { return, group); } public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {; if(childGroup == null) { throw new NullPointerException( "childGroup"); } else if( this.childGroup != null) { throw new IllegalStateException( "childGroup set already"); } else { this.childGroup = childGroup; return this; } } |
Bootstrap其实没有什么可以好说的,它就只是一个装配工,将各个组件拼装组合到一起,然后进行一些配置,有关它的详细API请参考Netty JavaDoc。下面我们将通过一个经典的Echo客户端与服务器的例子,来梳理一遍创建Netty应用的流程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | public class EchoServerInboundHandler extends ChannelInboundHandlerAdapter { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; System.out.printf( "Server received: %s \n", in.toString(CharsetUtil.UTF_8)); // 由于读事件不是一次性就能把完整消息发送过来的,这里并没有调用writeAndFlush ctx.write(in); // 直接把消息写回给客户端(会被出站消息处理器处理,不过我们的应用没有实现任何出站消息处理器) } public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // 等读事件已经完成时,冲刷之前写数据的缓冲区 // 然后添加了一个***,它会在Future完成时进行关闭该Channel. ctx.writeAndFlush(Unpooled.EMPTY_BUFFER) .addListener(ChannelFutureListener.CLOSE); } // 处理异常,输出异常信息,然后关闭Channel public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 | public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public void start() throws Exception { final EchoServerInboundHandler serverHandler = new EchoServerInboundHandler(); EventLoopGroup group = new NioEventLoopGroup(); // 传输类型使用NIO try { ServerBootstrap b = new ServerBootstrap(); // 配置EventLoopGroup .channel(NioServerSocketChannel.class) // 配置Channel的类型 .localAddress( new InetSocketAddress(port)) // 配置端口号 .childHandler( new ChannelInitializer<SocketChannel>() { // 实现一个ChannelInitializer,它可以方便地添加多个ChannelHandler protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(serverHandler); } }); // i绑定地址,同步等待它完成 ChannelFuture f = b.bind().sync(); // 关闭这个Future; } finally { // 关闭应用程序,一般来说Netty应用只需要调用这个方法就够了 group.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { if (args.length != 1) { System.err.printf( "Usage: %s <port> \n", EchoServer.class.getSimpleName() ); return; } int port = Integer.parseInt(args[ 0]); new EchoServer(port).start(); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | public class EchoClientInboundHandler extends SimpleChannelInboundHandler<ByteBuf> { /** * 我们在Channel连接到远程节点直接发送一条消息给服务器 */ public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer( "Hello, Netty!", CharsetUtil.UTF_8)); } protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { // 输出从服务器Echo的消息 System.out.printf( "Client received: %s \n", byteBuf.toString(CharsetUtil.UTF_8)); } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 | public class EchoClient { private final String host; private final int port; public EchoClient(String host, int port) { = host; this.port = port; } public void start() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); .channel(NioSocketChannel.class) .remoteAddress( new InetSocketAddress(host, port)) // 服务器的地址 .handler( new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast( new EchoClientInboundHandler()); } }); ChannelFuture f = b.connect().sync(); // 连接到服务器; } finally { group.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.printf( "Usage: %s <host> <port> \n", EchoClient.class.getSimpleName()); return; } String host = args[ 0]; int port = Integer.parseInt(args[ 1]); new EchoClient(host, port).start(); } } |