Netty pipeline分析(二)

ChannelPipeline/ChannelHandlerContext

Posted by Jay on August 24, 2019

Netty pipeline分析(二)

Netty pipeline分析(一)这篇文章分析了pipeline在Netty中所处的角色,像是一条流水线,控制着字节流的读写。本文在这个基础上继续深挖pipeline在事件传播、异常传播等方面的细节。主要分为以下几点:

  • Netty中的Unsafe
  • inbound事件的传播
  • outbound事件的传播
  • 异常事件的传播

一、Netty中的Unsafe

pipeline中所有的IO操作最终都会由Unsafe完成,因此在分析pipeline事件传播方面的细节之前,先介绍Unsafe的作用。

1.Unsafe接口

Unsafe接口在Channel接口中定义,属于Channel的内部类,表明Unsafe和Channel密切相关。

interface Unsafe {
    RecvByteBufAllocator.Handle recvBufAllocHandle();

    SocketAddress localAddress();
    SocketAddress remoteAddress();

    void register(EventLoop eventLoop, ChannelPromise promise);
    void bind(SocketAddress localAddress, ChannelPromise promise);
    void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
    void disconnect(ChannelPromise promise);
    void close(ChannelPromise promise);
    void closeForcibly();
    void deregister(ChannelPromise promise);
    void beginRead();
    void write(Object msg, ChannelPromise promise);
    void flush();

    ChannelPromise voidPromise();
    ChannelOutboundBuffer outboundBuffer();
}

从Unsafe接口中定义的方法可以看出,Unsafe接口的主要功能是接收数据时分配内存、获取本地与远端地址、将channel注册到事件循环、绑定端口、socket的连接和关闭、socket的读写等,这些操作都和JDK底层相关。

2.Unsafe继承结构

  • Unsafe作为顶层接口,AbstractUnsafe是其抽象实现,子类必须继承AbstractUnsafe。

  • 针对基于Selector的NIO读写场景,增加了NioUnsafe接口,其扩展了Unsafe接口,并增加了以下方法:

    public interface NioUnsafe extends Unsafe {
        SelectableChannel ch();
        void finishConnect();
        void read();
        void forceFlush();
    }
    

    NioUnsafe增加了获取底层JDK NIO SelectableChannel的方法,同时定义了从SelectableChannel读取数据的方法。

  • AbstractNioUnsafe是基于Selector的NIO读写场景下的Unsafe抽象实现,能够通过其外部类AbstractNioChannel的相关方法如selectionKey()、javaChannel()等方法获得SelectionKey、SelectableChannel等。

  • NioByteUnsafe和NioSocketChannelUnsafe作为客户端NioSocketChannel的Unsafe实现,实现了channel基本的IO操作,如数据的读写,这些都与JDK底层相关。

  • NioMessageUnsafe和NioByteUnsafe是处在同一层次的实现,用于读取新连接。Netty将一个新连接的建立也当作一个IO操作来处理,这里的message的含义可以当作是一个SelectableChannel,读的意思就是accept一个SelectableChannel。

3.Unsafe的读写操作

从以上继承结构来看,可以总结出两种类型的Unsafe实现,一种是与连接的字节数据读写相关的NioByteUnsafe,一种是与新连接建立操作相关的NioMessageUnsafe。

NioByteUnsafe中的读:委托到外部类NioSocketChannel

protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.attemptedBytesRead(byteBuf.writableBytes());
    return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}

可以看到,doReadBytes()方法从JDK SocketChannel中读取字节数据到Netty的ByteBuf中。

NioByteUnsafe中的写:委托到外部类NioSocketChannel

protected int doWriteBytes(ByteBuf buf) throws Exception {
    final int expectedWrittenBytes = buf.readableBytes();
    return buf.readBytes(javaChannel(), expectedWrittenBytes);
}

doWriteBytes()方法将Netty ByteBuf中的字节数据写出到JDK SocketChannel中。

NioMessageUnsafe中的读:委托到外部类NioServerSocketChannel

protected int doReadMessages(List<Object> buf) throws Exception {
    // 调用JDK ServerSocketChannel.accept()方法接受连接
    SocketChannel ch = javaChannel().accept(); 

    try {
        if (ch != null) {
            // 将JDK连接的SocketChannel封装成Netty NioSocketChannel
            buf.add(new NioSocketChannel(this, ch)); 
            return 1; // 成功接收连接,返回1
        }
    } catch (Throwable t) {
				// 省略代码...
    }

    return 0;
}

NioMessageUnsafe的读操作中调用JDK ServerSocketChannel的accept()方法,接收一条新连接,并包装成Netty NioSocketChannel。

二、inbound事件的传播

1.inbound事件

首先看ChannelInboundHandler接口的定义:

public interface ChannelInboundHandler extends ChannelHandler {
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
    void channelActive(ChannelHandlerContext ctx) throws Exception;
    void channelInactive(ChannelHandlerContext ctx) throws Exception;
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

从ChannelInboundHandler接口的定义可以看出,inbound事件包含channelRegistered、channelUnregistered、channelActive、channelInactive、channelRead、channelReadComplete、exceptionCaught等事件,而且当比如channelRead()方法被调用时,该事件已经发生,handler是被动触发的。

2.channelRead事件的传播

下面以channelRead事件的传播为例,来分析inbound事件的传播细节。先看服务端ServerBootstrap的配置:

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                ch.pipeline().addLast(new InBoundHandlerA());
                ch.pipeline().addLast(new InBoundHandlerC());
                ch.pipeline().addLast(new InBoundHandlerB());
            }
        });

可知服务端在接收新连接时会为客户端channel pipeline添加InBoundHandlerA、InBoundHandlerC、InBoundHandlerB三个ChannelInboundHandler。

public class InBoundHandlerA extends ChannelInboundHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("InBoundHandlerA: " + msg);
        ctx.fireChannelRead(msg);
    }
}
public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("InBoundHandlerB: " + msg);
        ctx.fireChannelRead(msg);
    }
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.channel().pipeline().fireChannelRead("hello world");  // 1
//        ctx.fireChannelRead("hello world");  // 2
    }
}
public class InBoundHandlerC extends ChannelInboundHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("InBoundHandlerC: " + msg);
        ctx.fireChannelRead(msg);
    }
}

因此客户端channel pipeline的结构如下图所示:

可以看到,在新连接接入时,会先回调channelActive()方法,此时InBoundHandlerB的channelActive()方法得到执行,触发客户端pipeline.fireChannelRead()方法,将channlRead事件传播至pipeline。在实际工作中一般是由NioEventLoop轮询到读IO事件,并触发NioByteUnsafe.read()操作,如下:

// NioEventLoop.processSelectedKey方法局部
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    // 对于NioSocketChannel,是NioByteUnsafe.read()
    unsafe.read();
    // ...
}
// NioByteUnsafe
public final void read() {
    final ChannelConfig config = config(); // NioSocketChannel配置、pipeline
    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator(); // ByteBuf分配
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); // AdaptiveRecvByteBufAllocator.HandleImpl
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    do {
        byteBuf = allocHandle.allocate(allocator);  // doubt 分配ByteBuf
        allocHandle.lastBytesRead(doReadBytes(byteBuf)); // 将数据读取到分配的ByteBuf中去
        if (allocHandle.lastBytesRead() <= 0) {
            // nothing was read. release the buffer.
            byteBuf.release();
            byteBuf = null;
            close = allocHandle.lastBytesRead() < 0;
            break;
        }

        allocHandle.incMessagesRead(1);
        readPending = false;
        pipeline.fireChannelRead(byteBuf); // 触发事件,将会引发pipeline的读事件传播
        byteBuf = null;
    } while (allocHandle.continueReading());  // doubt

    allocHandle.readComplete(); // doubt
    pipeline.fireChannelReadComplete();
}

在read()方法中读取到IO数据之后,调用pipeline.fireChannelRead(byteBuf);代码将channelRead事件传播至pipeline,并从HeadContext开始处理该读到的数据。

这里为了分析方便,使用InBoundHandlerB的channelActive()方法模拟触发客户端channel读取到数据并传播至pipeline的逻辑,并分为两种情况分析:

  • ctx.channel().pipeline().fireChannelRead("hello world");: 调用pipeline的fireChannelRead()方法传播事件;
  • ctx.fireChannelRead("hello world");: 调用ChannelHandlerContext的fireChannelRead()方法传播事件。

分析两种触发方式的区别。

ctx.channel().pipeline().fireChannelRead("hello world");: 调用pipeline的fireChannelRead()方法传播事件

// DefaultChannelPipeline
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}
// AbstractChannelHandlerContext
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}
// HeadContext
private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

DefaultChannelPipeline.fireChannelRead()方法首先调用到HeadContext.channelRead()方法:

// HeadContext
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.fireChannelRead(msg);
}

HeadContext.channelRead()方法将事件往后传播:

// HeadContext
public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}
private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this; // 遍历链表
    do {
        ctx = ctx.next; // 往后查找inbound handler
    } while (!ctx.inbound);
    return ctx;
}

此时找到InBoundHandlerA,并调用invokeChannelRead()方法:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}
private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

就这样,InBoundHandlerA的channelRead()方法就会回调到。类似的,InBoundHandlerC、InBoundHandlerB的channelRead()方法也会回调到。最终channelRead()事件到达TailContext:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    onUnhandledInboundMessage(msg);
}
protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug(
                "Discarded inbound message {} that reached at the tail of the pipeline. " +
                        "Please check your pipeline configuration.", msg);
    } finally {
        ReferenceCountUtil.release(msg);
    }
}

对于未处理的msg,TailContext只是打印了一条debug日志并释放内存。

根据上面的分析可知,通过pipleline触发inbound事件传播时,从HeadContext开始传播。对于inbound事件,会按照ChannelInboundHandler添加的顺序处理该事件,HeadContext首先处理该事件,然后依次传递到pipeline中的ChannelInboundHandler中。

ctx.fireChannelRead("hello world");: 调用ChannelHandlerContext的fireChannelRead()方法传播事件

public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

通过调用ChannelHandlerContext的fireChannelRead()方法传播channelRead事件时,直接查找到当前节点的下一个inbound节点,将事件传播至该节点,不会从HeadContext开始传递。

以上便是inbound事件传播的流程分析。

3.SimpleChannelInboundHandler介绍
public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {

    private final boolean autoRelease;

    protected SimpleChannelInboundHandler() {
        this(true);
    }
    protected SimpleChannelInboundHandler(boolean autoRelease) {
        matcher = TypeParameterMatcher.find(this, SimpleChannelInboundHandler.class, "I");
        this.autoRelease = autoRelease;
    }

    protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType) {
        this(inboundMessageType, true);
    }

    protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType, boolean autoRelease) {
        matcher = TypeParameterMatcher.get(inboundMessageType);
        this.autoRelease = autoRelease;
    }

    public boolean acceptInboundMessage(Object msg) throws Exception {
        return matcher.match(msg);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean release = true;
        try {
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I imsg = (I) msg;
                channelRead0(ctx, imsg); // 处理数据
            } else {
                release = false;
                ctx.fireChannelRead(msg);
            }
        } finally {
            if (autoRelease && release) {
                ReferenceCountUtil.release(msg); // 释放内存
            }
        }
    }

    protected abstract void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception;
}

用户一般可以继承SimpleChannelInboundHandler类来实现自己的ChannelInboundHandler,同时可以指定该handler想要处理的数据类型。这样channelRead事件传播至该handler时,可以根据指定的数据类型决定是否处理传入的数据,如果匹配,则将传入的数据转为指定的类型,并调用channelRead0()方法,并在事件处理完毕后释放内存;如果类型不匹配,直接将事件往pipeline后面传播。

三、outbound事件的传播

1.outbound事件

首先来看ChannelOutboundHandler接口:

public interface ChannelOutboundHandler extends ChannelHandler {
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
    void connect(
            ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception;
    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
    void read(ChannelHandlerContext ctx) throws Exception;
    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
    void flush(ChannelHandlerContext ctx) throws Exception;
}

从ChannelOutboundHandler接口的定义可以看出,outbound事件包括端口绑定bind、连接connect、断开连接disconnect、关闭连接close、取消channel在EventLoop的注册、读写数据、刷新数据等。这些操作一般都是由用户主动触发的,这与inbound事件(如channelRead)被动触发的情况不同。

2.write事件的传播

下面以write事件的传播为例,来分析outbound事件的传播细节。先看服务端ServerBootstrap的配置:

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                ch.pipeline().addLast(new OutBoundHandlerA());
                ch.pipeline().addLast(new OutBoundHandlerC());
                ch.pipeline().addLast(new OutBoundHandlerB());
            }
        });

可知服务端在接收新连接时会为客户端channel pipeline添加OutBoundHandlerA、OutBoundHandlerC、OutBoundHandlerB三个ChannelOutboundHandler。

public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter {
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("OutBoundHandlerA: " + msg);
        ctx.write(msg, promise);
    }
}
public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter {
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("OutBoundHandlerB: " + msg);
        ctx.write(msg, promise);
    }
    public void handlerAdded(final ChannelHandlerContext ctx) {
        ctx.executor().schedule(() -> { // 定时任务。模拟用户写操作
            ctx.channel().write("hello world"); // 1
//            ctx.write("hello world");  // 2
        }, 3, TimeUnit.SECONDS);
    }
}
public class OutBoundHandlerC extends ChannelOutboundHandlerAdapter {
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("OutBoundHandlerC: " + msg);
        ctx.write(msg, promise);
    }
}

因此客户端channel pipeline的结构如下图所示:

可以看到,在新连接接入时,会先回调OutBoundHandlerB的handlerAdded()方法,该方法会调度一个定时任务,模拟用户触发的写操作,将write事件传播至pipeline。

这里分两种情况进行分析,介绍两种触发方式的区别:

  • ctx.channel().write("hello world");: 调用channel(也即pipeline)的write()方法传播事件;
  • ctx.write("hello world");: 调用ChannelHandlerContext的write()方法传播事件。

ctx.channel().write("hello world");: 调用channel(也即pipeline)的write()方法传播事件

// DefaultChannelPipeline
public final ChannelFuture write(Object msg) {
    return tail.write(msg);
}
// TailContext
public ChannelFuture write(Object msg) {
    return write(msg, newPromise());
}
// TailContext
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
    try {
        if (!validatePromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            // cancelled
            return promise;
        }
    } catch (RuntimeException e) {
        ReferenceCountUtil.release(msg);
        throw e;
    }
    write(msg, false, promise); // 这里!!!

    return promise;
}
// TailContext
private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound(); // 找到下一个outbound handler
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) { // false
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise); // 这里!!!
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

执行 ctx.channel().write("hello world");时,会调用到TailContxt.write()方法。TailContxt.write()方法中首先找出下一个outbound handler:

// TailContext
private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev; // 反向遍历
    } while (!ctx.outbound);
    return ctx;
}

findContextOutbound()方法通过链表反向遍历的方式查找下一个outbound handler,这里是找到了OutBoundHandlerB,并调用OutBoundHandlerB.invokeWrite()方法。

// OutBoundHandlerB对应的ChannelHandlerContext
private void invokeWrite(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
    } else {
        write(msg, promise);
    }
}
// OutBoundHandlerB对应的ChannelHandlerContext
private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}
// OutBoundHandlerB
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    System.out.println("OutBoundHandlerB: " + msg);
    ctx.write(msg, promise);
}

在调用完OutBoundHandlerB.write()方法后,通过ctx.write(msg, promise);继续传播事件:

// OutBoundHandlerB对应的ChannelHandlerContext
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
    try {
        if (!validatePromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            // cancelled
            return promise;
        }
    } catch (RuntimeException e) {
        ReferenceCountUtil.release(msg);
        throw e;
    }
    write(msg, false, promise);

    return promise;
}
// OutBoundHandlerB对应的ChannelHandlerContext
private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

跟上面类似的逻辑,调用ctx.write(msg, promise);时直接查找下一个outbound handler,这里是OutBoundHandlerC。接下来是通过next.invokeWrite(m, promise);调用OutBoundHandlerC.write()方法,与上面相同。就这样,write事件将传播至HeadContext。

// HeadContext
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

HeadContext将调用NioByteUnsafe.write()方法,最终处理这个写出的数据:

public final void write(Object msg, ChannelPromise promise) {

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
        ReferenceCountUtil.release(msg);
        return;
    }

    int size;
    try {
        msg = filterOutboundMessage(msg); // mas转换,比如堆内存转为直接内存
        size = pipeline.estimatorHandle().size(msg); // 计算消息的大小
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }

    outboundBuffer.addMessage(msg, size, promise); // 消息添加到ChannelOutboundBuffer
}

根据上面的分析可知,通过channel(也即pipleline)触发outbound事件传播时,从TailContext开始传播。对于outbound事件,会按照ChannelOutboundHandler添加的顺序逆序处理该事件,TailContext由于是inbound类型的ChannelHandler,它直接将outbound事件传播至下一个outbound节点,然后逐渐传递到pipeline中的HeadContext节点,最终事件由HeadContext节点处理。

ctx.write("hello world");: 调用ChannelHandlerContext的write()方法传播事件

// OutBoundHandlerB对应的ChannelHandlerContext
public ChannelFuture write(Object msg) {
    return write(msg, newPromise());
}
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
   
  	// ... 省略
  
    write(msg, false, promise);

    return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
    // 直接查找下一个outbound handler
    AbstractChannelHandlerContext next = findContextOutbound(); 
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise); // 这里!!!
        } 
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

调用ChannelHandlerContext的write()方法传播outbound事件时,直接从当前节点开始反向遍历context链表,查找下一个outbound handler,并调用其write方法,然后将write事件传播至HeadContext。

至此,outbound事件传播分析完毕。

四、异常事件的传播

下面分析异常事件传播的细节。先看服务端ServerBootstrap的配置:

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                ch.pipeline().addLast(new InBoundHandlerA());
                ch.pipeline().addLast(new InBoundHandlerB());
                ch.pipeline().addLast(new InBoundHandlerC());
                ch.pipeline().addLast(new OutBoundHandlerA());
                ch.pipeline().addLast(new OutBoundHandlerB());
                ch.pipeline().addLast(new OutBoundHandlerC());
            }
        });

可知服务端在接收新连接时会为客户端channel pipeline添加InBoundHandlerA、InBoundHandlerB、InBoundHandlerC、OutBoundHandlerA、OutBoundHandlerB、OutBoundHandlerC六个ChannelHandler。

因此客户端channel pipeline的结构如下图所示:

public class InBoundHandlerA extends ChannelInboundHandlerAdapter {
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("InBoundHandlerA.exceptionCaught()");
        ctx.fireExceptionCaught(cause);
    }
}
public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        throw new BusinessException("from InBoundHandlerB"); // 抛出异常
    }
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("InBoundHandlerB.exceptionCaught()");
        ctx.fireExceptionCaught(cause);
    }
}
public class InBoundHandlerC extends ChannelInboundHandlerAdapter {
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("InBoundHandlerC.exceptionCaught()");
        ctx.fireExceptionCaught(cause);
    }
}
public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter {
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("OutBoundHandlerA.exceptionCaught()");
        ctx.fireExceptionCaught(cause);
    }
}
public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter {
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("OutBoundHandlerB.exceptionCaught()");
        ctx.fireExceptionCaught(cause);
    }
}
public class OutBoundHandlerC extends ChannelOutboundHandlerAdapter {
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("OutBoundHandlerC.exceptionCaught()");
        ctx.fireExceptionCaught(cause);
    }
}
public class BusinessException extends Exception {
    public BusinessException(String message) {
        super(message);
    }
}

从InBoundHandlerB的定义可以看出,在接收到channelRead事件时将抛出BusinessException,这种情况模拟了inbound事件在pipeline传播以及处理过程中发生的异常。下面先来分析inbound事件传播过程中发生异常时,异常事件传播的细节。

1.inbound事件传播过程中发生异常

假设channel读取到了一定数据,并回调了InBoundHandlerB.channelRead()方法,此时抛出BusinessException异常:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    throw new BusinessException("from InBoundHandlerB");
}
private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
          	// handler(): InBoundHandlerB
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

此时将进入notifyHandlerException(t);:

private void notifyHandlerException(Throwable cause) {
    if (inExceptionCaught(cause)) {
        if (logger.isWarnEnabled()) {
            logger.warn(
                    "An exception was thrown by a user handler " +
                            "while handling an exceptionCaught event", cause);
        }
        return;
    }

    invokeExceptionCaught(cause);
}

notifyHandlerException()方法直接调用invokeExceptionCaught(cause);传播异常事件:

private void invokeExceptionCaught(final Throwable cause) {
    if (invokeHandler()) {
        try {
            handler().exceptionCaught(this, cause);
        } catch (Throwable error) {
            // 省略...
        }
    } else {
        fireExceptionCaught(cause);
    }
}

在inbound事件传播过程中发生异常时,首先调用发生异常所在handler的exceptionCaught方法,即InBoundHandlerB.exceptionCaught():

// InBoundHandlerB
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    System.out.println("InBoundHandlerB.exceptionCaught()");
    ctx.fireExceptionCaught(cause);
}

然后调用ctx.fireExceptionCaught(cause);继续传播异常事件:

// InBoundHandlerB对应的ChannelHandlerContext
public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
  	// next: InBoundHandlerC
    invokeExceptionCaught(next, cause); // 直接调用next节点的exceptionCaught方法
    return this;
}
static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
    ObjectUtil.checkNotNull(cause, "cause");
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeExceptionCaught(cause); // 这里!!!
    } else {
        try {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeExceptionCaught(cause);
                }
            });
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to submit an exceptionCaught() event.", t);
                logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
            }
        }
    }
}
private void invokeExceptionCaught(final Throwable cause) {
    if (invokeHandler()) {
        try {
            handler().exceptionCaught(this, cause); // InBoundHandlerC
        } catch (Throwable error) {
            // 省略..
        }
    } else {
        fireExceptionCaught(cause);
    }
}

可以看到,在异常发生节点InBoundHandlerB继续传播事件时,是直接调用了InBoundHandlerB对应context节点的next节点InBoundHandlerC的exceptionCaught方法,而不管下一个节点是inbound还是outbound类型。就这样,异常事件按顺序经过InBoundHandlerB、InBoundHandlerC、OutBoundHandlerA、OutBoundHandlerB、OutBoundHandlerC,最终到达TailContext:

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    onUnhandledInboundException(cause);
}
protected void onUnhandledInboundException(Throwable cause) {
    try {
        logger.warn(
                "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
                        "It usually means the last handler in the pipeline did not handle the exception.",
                cause);
    } finally {
        ReferenceCountUtil.release(cause);
    }
}

如果TailContext之前的handler都未处理该异常事件,在TailContext将以warn日志的方式记录该异常信息,并释放内存。

以上是inbound事件传播过程中发生异常,异常事件的传播过程。

2.outbound事件传播过程中发生异常

下面以channel.writeAndFlush事件的传播为例,分析outbound事件传播过程中发生异常时,异常事件的传播细节。

// AbstractChannelHandlerContext
private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound(); // 某个outbound handler
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise); // 这里!!!
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
        invokeFlush0(); // 异常捕获
    } else {
        writeAndFlush(msg, promise);
    }
}

假设writeAndFlush事件传播至context链表中的某个节点,因此将调用以上的invokeWriteAndFlush()方法。继续看 invokeWrite0(msg, promise);中的逻辑:

private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}
private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
    if (!(promise instanceof VoidChannelPromise)) {
        PromiseNotificationUtil.tryFailure(promise, cause, logger); // promise设置为失败
    }
}

可见,如果在ChannelOutboundHandler.write()方法中发生异常,只是调用notifyOutboundHandlerException()方法,将promise设置为失败状态,不抛出任何异常。

再来看 invokeFlush0();的逻辑:

private void invokeFlush0() {
    try {
        ((ChannelOutboundHandler) handler()).flush(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}

可见,如果在ChannelOutboundHandler.flush()方法中发生异常,将调用notifyHandlerException()方法:

private void notifyHandlerException(Throwable cause) {
    // ...省略

    invokeExceptionCaught(cause);
}
 private void invokeExceptionCaught(final Throwable cause) {
    if (invokeHandler()) {
        try {
            handler().exceptionCaught(this, cause);
        } catch (Throwable error) {
            // 省略...
        }
    } else {
        fireExceptionCaught(cause);
    }
}

同样的,会触发异常事件从当前节点向后传播,最后到达TailContext。

3.异常事件传播的总结

如果在inbound或者outbound事件传播的过程中抛出异常,异常事件exceptionCaught会按照ChannelHandler添加的顺序,从当前节点开始传播到尾部TailContext(与inbound、outbound无关)。

4.异常处理的最佳实践

根据以上异常事件传播的细节,针对上面的例子,异常处理的最佳实践如下:

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                ch.pipeline().addLast(new InBoundHandlerA());
                ch.pipeline().addLast(new InBoundHandlerB());
                ch.pipeline().addLast(new InBoundHandlerC());
                ch.pipeline().addLast(new OutBoundHandlerA());
                ch.pipeline().addLast(new OutBoundHandlerB());
                ch.pipeline().addLast(new OutBoundHandlerC());
                ch.pipeline().addLast(new ExceptionCaughtHandler()); // 异常处理器
            }
        });

public class ExceptionCaughtHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // ..
        if (cause instanceof BusinessException) {
            System.out.println("BusinessException");
        }
    }
}

由于异常事件最终会传播到pipeline尾部,因此可以在channel pipeline的尾部增加全局异常处理器,针对不同异常做不同的处理,用于处理pipeline前面的ChanelHandler未捕获的异常。

五、面试问题

1.Netty是如何判断ChannelHandler类型的?

​ 在添加ChannelHandler并创建ChannelHandlerContext的时候,通过instanceof判断handler是否是ChannelInboundHandler和ChannelOutboundHanler,并将结果保存到AbstractChannelHandlerContext的inbound和outbound两个boolean变量中。

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

    private final ChannelHandler handler;

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler; // 保存handler
    }

    @Override
    public ChannelHandler handler() {
        return handler;
    }

    private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler; // 判断是否是ChannelInboundHandler
    }

    private static boolean isOutbound(ChannelHandler handler) { // 判断是否是ChannelOutboundHandler
        return handler instanceof ChannelOutboundHandler;
    }
}
2.对于ChannelHandler的添加应该遵循什么样的顺序?

对于inbound事件的传播,事件的处理顺序与ChannelInboundHandler的添加顺序相同;对于outbound事件的传播,事件的处理顺序与ChannelOutboundHandler的添加顺序相反。对于异常事件的传播,事件的处理顺序与ChannelHandler的添加顺序相同,与inbound、outbound无关。

3.用户手动触发事件传播,不同的触发方式有什么样的区别?

​ (1) ctx.channel.xxx(): 对于inbound事件,从pipeline头部节点head开始传播;对于outbound事件,从pipeline尾部节点tail开始传播。

​ (2) ctx.xxx() : 对于inbound事件,从当前节点下一节点开始传播(指向尾部tail);对于outbound事件,从当前节点下一节点开始传播(指向头部head)。

xxx()方法指fireChannelRead、write等方法。

参考文章