Dubbo通信框架Netty4

Netty4

Posted by Jay on April 1, 2019

Dubbo通信框架Netty4

Dubbo 从 2.5.6 版本开始引入对 Netty4 的支持,2.5.6 之前的版本使用的是 Netty3。在 Dubbo 源码中,Netty4 的实现相较于 Netty3 主要改动了三处:NettyServer、NettyClient,以及编解码适配器 NettyCodecAdapter。

使用方式:

提供者端(以下两种配置任选其一):

<dubbo:protocol server="netty4" />
<dubbo:provider server="netty4" />

消费者端:

<dubbo:consumer client="netty4" />

一、服务提供者端NettyServer

/**
 * Netty 4 implementation of the provider-side {@link Server}.
 *
 * <p>{@link #doOpen()} builds a {@link ServerBootstrap} with a single boss thread and a
 * URL-configurable number of worker threads, installs the codec adapter and a shared
 * {@link NettyServerHandler} on every accepted connection, and binds to
 * {@link #getBindAddress()}. {@link #doClose()} tears everything down on a best-effort
 * basis: each step is attempted independently and failures are only logged.
 */
class NettyServer extends AbstractServer implements Server {

    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

    // Channels attached to this server, looked up by the remote address string
    // produced by NetUtils.toAddressString (see getChannel). The map instance is
    // obtained from NettyServerHandler in doOpen().
    private Map<String, Channel> channels;

    // Server bootstrap.
    private ServerBootstrap bootstrap;

    // The Netty channel returned by bind(), i.e. the listening channel.
    private io.netty.channel.Channel channel;

    // Event loop group passed as the parent (boss) group to the bootstrap.
    private EventLoopGroup bossGroup;
    // Event loop group passed as the child (worker) group to the bootstrap.
    private EventLoopGroup workerGroup;

    /**
     * Creates a NettyServer instance.
     *
     * @param url     provider URL
     * @param handler handler to wrap; a DecodeHandler instance according to the original notes
     * @throws RemotingException declared by the superclass constructor
     */
    NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

    /**
     * Configures the bootstrap and binds the listening socket, blocking until the bind completes.
     */
    @Override
    protected void doOpen() throws Throwable {
        // Route Netty's internal logging through the logger factory set up by NettyHelper.
        NettyHelper.setNettyLoggerFactory();

        bootstrap = new ServerBootstrap();

        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));

        // One handler instance is shared by every accepted connection; it also owns the channel map.
        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                // Address reuse matters for the listening socket (the one that gets re-bound),
                // so it is set with option(...) rather than childOption(...).
                .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                // TCP_NODELAY=true disables Nagle's algorithm for lower latency.
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                // Use the pooled allocator so buffers are reused.
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // Constructor arguments: codec, provider URL, this NettyServer as the channel handler.
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("handler", nettyServerHandler);
                    }
                });

        // e.g. bind [id: 0x7b324585, /0.0.0.0:20881] - listening on any local address.
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        // Block until the bind has completed.
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();
    }

    /**
     * Closes the listening channel, every connected channel, both event loop groups and
     * finally clears the channel map. Each step is isolated so that a failure in one does
     * not prevent the remaining steps from running.
     */
    @Override
    protected void doClose() throws Throwable {
        try {
            if (channel != null) {
                // unbind.
                channel.close();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            Collection<com.alibaba.dubbo.remoting.Channel> connected = getChannels();
            if (connected != null && connected.size() > 0) {
                for (com.alibaba.dubbo.remoting.Channel ch : connected) {
                    try {
                        ch.close();
                    } catch (Throwable e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        // Shut the groups down independently and guard each one on its own reference,
        // so a problem with the boss group cannot leave the worker group running.
        try {
            if (bossGroup != null) {
                bossGroup.shutdownGracefully();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            if (workerGroup != null) {
                workerGroup.shutdownGracefully();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            if (channels != null) {
                channels.clear();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    }

    /**
     * Returns the currently connected channels and evicts disconnected ones from the map.
     */
    @Override
    public Collection<Channel> getChannels() {
        Collection<Channel> chs = new HashSet<Channel>();
        // NOTE(review): entries are removed from the map while its values() view is being
        // iterated; this presumably relies on the handler supplying a concurrent map - confirm.
        for (Channel channel : this.channels.values()) {
            if (channel.isConnected()) {
                chs.add(channel);
            } else {
                // Drop channels that are found to be disconnected.
                channels.remove(NetUtils.toAddressString(channel.getRemoteAddress()));
            }
        }
        return chs;
    }

    /**
     * Looks up the channel registered for the given remote address.
     */
    @Override
    public Channel getChannel(InetSocketAddress remoteAddress) {
        return channels.get(NetUtils.toAddressString(remoteAddress));
    }

    /**
     * Reports whether the listening channel exists and is active.
     */
    @Override
    public boolean isBound() {
        return channel != null && channel.isActive();
    }

}

Netty4的写法与Netty3有很大不同,下面是Netty3版本NettyServer的doOpen实现,可与上面的Netty4版本对照:

// Starts the Netty 3 server and begins listening for client connections.
protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();

    // Boss and worker thread pools, plus the number of I/O worker threads taken from the URL.
    final ExecutorService bossExecutor =
            Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    final ExecutorService workerExecutor =
            Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    final int ioThreads =
            getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS);

    // The channel factory determines the threading and I/O model of the bootstrap.
    bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(bossExecutor, workerExecutor, ioThreads));

    // Shared handler for inbound/outbound events; getUrl() is the provider URL (dubbo://...).
    final NettyHandler sharedHandler = new NettyHandler(getUrl(), this);
    // Channel map owned by the handler: consumer "ip:port" -> NettyChannel.
    channels = sharedHandler.getChannels();

    // An idle-state timer is intentionally not installed here; see
    // https://issues.jboss.org/browse/NETTY-365 and https://issues.jboss.org/browse/NETTY-379
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        @Override
        public ChannelPipeline getPipeline() {
            // Adapter arguments: codec, provider URL, this NettyServer as the channel handler.
            final NettyCodecAdapter codecAdapter =
                    new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
            final ChannelPipeline result = Channels.pipeline();
            result.addLast("decoder", codecAdapter.getDecoder());
            result.addLast("encoder", codecAdapter.getEncoder());
            result.addLast("handler", sharedHandler);
            return result;
        }
    });

    // e.g. bind [id: 0x7b324585, /0.0.0.0:20881] - listening on any local address.
    channel = bootstrap.bind(getBindAddress());
}

二、消费者端NettyClient

class NettyClient extends AbstractClient {

    private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);

    // Netty客户端的NIO事件轮询事件组
    private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS,
            new DefaultThreadFactory("NettyClientWorker", true));

    // 客户端引导程序
    private Bootstrap bootstrap;

    // Netty数据通道
    private volatile Channel channel;

    // 初始化
    // @param url 合并消费者参数等之后的提供者url
    // @param handler DecodeHandler实例
    NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
        // wrapChannelHandler(url, handler)--MultiMessageHandler实例
        super(url, wrapChannelHandler(url, handler));
    }

    @Override
    protected void doOpen() throws Throwable {
        // 设置logger factory
        NettyHelper.setNettyLoggerFactory();

        bootstrap = new Bootstrap();
        bootstrap.group(nioEventLoopGroup)
                // 设置TCP底层相关的属性
                // 这个选项用于可能长时间没有数据交流的连接。当设置该选项以后,如果在两小时内没有数据的通信时,
                // TCP会自动发送一个活动探测数据报文。
                .option(ChannelOption.SO_KEEPALIVE, true)
                // 是否开启Nagle算法,true表示关闭,要求数据的高实时性
                .option(ChannelOption.TCP_NODELAY, true)
                // 使用对象池,重用缓冲区
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
                .channel(NioSocketChannel.class);

        // 设置连接超时时间。getTimeout--RPC调用超时时间
        if (getTimeout() < 3000) {
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
        } else {
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout());
        }

        // Netty客户端处理器
        final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
        bootstrap.handler(new ChannelInitializer() {

            @Override
            protected void initChannel(Channel ch) throws Exception {
                // 构造器参数: <DubboCountCodec,合并消费者参数之后的提供者url,ChannelHandler(NettClient实例)>
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                        .addLast("decoder", adapter.getDecoder()) // 解码器
                        .addLast("encoder", adapter.getEncoder()) // 编码器
                        .addLast("handler", nettyClientHandler);  // 客户端逻辑处理器,处理入站、出站事件
            }
        });
    }

    @Override
    protected void doConnect() throws Throwable {
        long start = System.currentTimeMillis(); // 连接开始时间
        // getConnectAddress()返回服务端地址
        ChannelFuture future = bootstrap.connect(getConnectAddress());  // 异步操作,返回的是一个future
        try {
            // 在指定的时间内,等待连接建立
            boolean ret = future.awaitUninterruptibly(3000, TimeUnit.MILLISECONDS);

            // 指定时间内建立连接成功
            if (ret && future.isSuccess()) {
                Channel newChannel = future.channel();
                try {
                    // 关闭旧的连接(copy reference)
                    Channel oldChannel = NettyClient.this.channel;
                    if (oldChannel != null) {
                        try {
                            if (logger.isInfoEnabled()) {
                                logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
                            }
                            oldChannel.close();
                        } finally {
                            // 移除channel缓存
                            NettyChannel.removeChannelIfDisconnected(oldChannel);
                        }
                    }
                } finally {
                    // 客户端是否关闭
                    if (NettyClient.this.isClosed()) {
                        try {
                            if (logger.isInfoEnabled()) {
                                logger.info("Close new netty channel " + newChannel + ", because the client closed.");
                            }
                            newChannel.close();
                        } finally {
                            NettyClient.this.channel = null;
                            NettyChannel.removeChannelIfDisconnected(newChannel);
                        }
                    } else {
                        // 这里成员变量channel进行复制
                        NettyClient.this.channel = newChannel;
                    }
                }
            } else if (future.cause() != null) {
                // 建立连接失败,发生了异常
                throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                        + getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
            } else {
                // 在建立连接过程中超时了
                throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                        + getRemoteAddress() + " client-side timeout "
                        + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
                        + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
            }
        } finally {
            if (!isConnected()) {
                //future.cancel(true);
            }
        }
    }

    @Override
    protected void doDisconnect() throws Throwable {
        try {
            // 移除channel缓存
            NettyChannel.removeChannelIfDisconnected(channel);
        } catch (Throwable t) {
            logger.warn(t.getMessage());
        }
    }

    @Override
    protected void doClose() throws Throwable {
        //can't shutdown nioEventLoopGroup
        //nioEventLoopGroup.shutdownGracefully();
    }

    @Override
    protected com.alibaba.dubbo.remoting.Channel getChannel() {
        Channel c = channel;
        // channel为null或channel连接断开,则为null
        if (c == null || !c.isActive()) {
            return null;
        }
        return NettyChannel.getOrAddChannel(c, getUrl(), this);
    }

}

Netty3:

/**
 * Prepares the Netty 3 client bootstrap: TCP options, connect timeout and pipeline factory.
 * No connection is made here; that happens in doConnect().
 */
@Override
protected void doOpen() throws Throwable {
    // Route Netty's internal logging through the logger factory set up by NettyHelper.
    NettyHelper.setNettyLoggerFactory();
    bootstrap = new ClientBootstrap(channelFactory);
    // Channel TCP options.
    // @see org.jboss.netty.channel.socket.SocketChannelConfig
    bootstrap.setOption("keepAlive", true);  // TCP keep-alive probes
    bootstrap.setOption("tcpNoDelay", true); // disable Nagle's algorithm for lower latency
    // Use the connect timeout here, matching what doConnect() waits for and reports,
    // rather than the RPC call timeout.
    bootstrap.setOption("connectTimeoutMillis", getConnectTimeout());
    // One handler instance is reused for every pipeline this client creates.
    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        @Override
        public ChannelPipeline getPipeline() {
            // Constructor arguments: codec, provider URL (merged with consumer parameters on the
            // consumer side), this NettyClient as the channel handler.
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);

            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
}

/**
 * Connects to getConnectAddress() and waits up to getConnectTimeout() ms. On success the
 * previously held channel is closed and replaced by the new one; on failure or timeout a
 * RemotingException is thrown and the pending future is cancelled.
 */
@Override
protected void doConnect() throws Throwable {
    final long startedAt = System.currentTimeMillis();
    // connect() is asynchronous; the returned future completes when the attempt finishes.
    final ChannelFuture connectFuture = bootstrap.connect(getConnectAddress());
    try {
        final boolean finished = connectFuture.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);

        // Failure paths first: either the attempt raised an error or it ran out of time.
        if (!finished || !connectFuture.isSuccess()) {
            if (connectFuture.getCause() != null) {
                throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                        + getRemoteAddress() + ", error message is: " + connectFuture.getCause().getMessage(),
                        connectFuture.getCause());
            }
            throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                    + getRemoteAddress() + " client-side timeout "
                    + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - startedAt) + "ms) from netty client "
                    + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
        }

        // Connected within the timeout.
        final Channel fresh = connectFuture.getChannel();
        // Register interest in both read and write events.
        fresh.setInterestOps(Channel.OP_READ_WRITE);
        try {
            // Work on a copy of the reference while retiring the previous connection.
            final Channel previous = NettyClient.this.channel;
            if (previous != null) {
                try {
                    if (logger.isInfoEnabled()) {
                        logger.info("Close old netty channel " + previous + " on create new netty channel " + fresh);
                    }
                    previous.close();
                } finally {
                    // Evict the old channel from the NettyChannel cache.
                    NettyChannel.removeChannelIfDisconnected(previous);
                }
            }
        } finally {
            if (!NettyClient.this.isClosed()) {
                // Publish the new channel.
                NettyClient.this.channel = fresh;
            } else {
                // The client was closed in the meantime: discard the new channel again.
                try {
                    if (logger.isInfoEnabled()) {
                        logger.info("Close new netty channel " + fresh + ", because the client closed.");
                    }
                    fresh.close();
                } finally {
                    NettyClient.this.channel = null;
                    NettyChannel.removeChannelIfDisconnected(fresh);
                }
            }
        }
    } finally {
        if (!isConnected()) {
            // Abandon the connection attempt.
            connectFuture.cancel();
        }
    }
}

三、编解码(后面详细分析)

Netty4:

/**
 * Bridges a Dubbo {@link Codec2} into a Netty 4 pipeline by exposing one encoder and one
 * decoder handler that delegate to the codec.
 */
final class NettyCodecAdapter {

    // Outbound handler that serialises messages through the codec.
    private final ChannelHandler encoder = new InternalEncoder();

    // Inbound handler that turns received bytes into messages through the codec.
    private final ChannelHandler decoder = new InternalDecoder();

    private final Codec2 codec;

    private final URL url;

    // Dubbo-side channel handler (the NettyServer or NettyClient that created this adapter).
    private final com.alibaba.dubbo.remoting.ChannelHandler handler;

    /**
     * @param codec   codec used for encoding and decoding
     * @param url     provider URL (on the consumer side, merged with consumer parameters)
     * @param handler the owning NettyServer / NettyClient
     */
    NettyCodecAdapter(Codec2 codec, URL url, com.alibaba.dubbo.remoting.ChannelHandler handler) {
        this.handler = handler;
        this.url = url;
        this.codec = codec;
    }

    ChannelHandler getEncoder() {
        return this.encoder;
    }

    ChannelHandler getDecoder() {
        return this.decoder;
    }

    /** Encodes an outbound message directly into the ByteBuf supplied by Netty. */
    private class InternalEncoder extends MessageToByteEncoder {

        @Override
        protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
            final Channel nettyChannel = ctx.channel();
            // Expose Netty's ByteBuf through Dubbo's ChannelBuffer abstraction.
            final com.alibaba.dubbo.remoting.buffer.ChannelBuffer target = new NettyBackedChannelBuffer(out);
            final NettyChannel dubboChannel = NettyChannel.getOrAddChannel(nettyChannel, url, handler);
            try {
                codec.encode(dubboChannel, target, msg);
            } finally {
                NettyChannel.removeChannelIfDisconnected(nettyChannel);
            }
        }
    }

    /** Decodes as many complete messages as the accumulated input currently holds. */
    private class InternalDecoder extends ByteToMessageDecoder {

        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
            final Channel nettyChannel = ctx.channel();
            // Expose Netty's ByteBuf through Dubbo's ChannelBuffer abstraction.
            final ChannelBuffer source = new NettyBackedChannelBuffer(input);
            final NettyChannel dubboChannel = NettyChannel.getOrAddChannel(nettyChannel, url, handler);

            try {
                do {
                    final int mark = source.readerIndex();
                    final Object decoded = codec.decode(dubboChannel, source);

                    if (decoded == Codec2.DecodeResult.NEED_MORE_INPUT) {
                        // Incomplete frame: rewind and wait for more bytes.
                        source.readerIndex(mark);
                        break;
                    }
                    // A result without any bytes consumed would loop forever.
                    //is it possible to go here ?
                    if (mark == source.readerIndex()) {
                        throw new IOException("Decode without read data.");
                    }
                    if (decoded != null) {
                        out.add(decoded);
                    }
                } while (source.readable());
            } finally {
                NettyChannel.removeChannelIfDisconnected(nettyChannel);
            }
        }
    }
}

Netty3:

/**
 * Bridges a Dubbo Codec2 into a Netty 3 pipeline by exposing one encoder and one decoder
 * handler that delegate to the codec. Unlike the encoder, the decoder keeps the undecoded
 * remainder of previous reads in its own {@code buffer} field.
 */
final class NettyCodecAdapter {

    // Outbound handler that serialises messages through the codec.
    private final ChannelHandler encoder = new InternalEncoder();

    // Inbound handler that turns received bytes into messages through the codec.
    private final ChannelHandler decoder = new InternalDecoder();

    private final Codec2 codec;

    private final URL url;

    // Minimum size used when the decoder has to allocate a new accumulation buffer.
    private final int bufferSize;

    // Dubbo-side channel handler passed to NettyChannel.getOrAddChannel.
    private final com.alibaba.dubbo.remoting.ChannelHandler handler;

    /**
     * @param codec   codec used for encoding and decoding (a DubboCountCodec instance per the original notes)
     * @param url     provider URL (on the consumer side, merged with consumer parameters)
     * @param handler the owning NettyServer / NettyClient
     */
    public NettyCodecAdapter(Codec2 codec, URL url, com.alibaba.dubbo.remoting.ChannelHandler handler) {
        this.codec = codec;
        this.url = url;
        this.handler = handler;
        int b = url.getPositiveParameter(Constants.BUFFER_KEY, Constants.DEFAULT_BUFFER_SIZE);
        // Accept the configured size only inside [MIN_BUFFER_SIZE, MAX_BUFFER_SIZE]; otherwise use the default.
        this.bufferSize = b >= Constants.MIN_BUFFER_SIZE && b <= Constants.MAX_BUFFER_SIZE ? b : Constants.DEFAULT_BUFFER_SIZE;
    }

    public ChannelHandler getEncoder() {
        return encoder;
    }

    public ChannelHandler getDecoder() {
        return decoder;
    }

    // Outbound (write) path: encodes one message into one buffer.
    @Sharable
    private class InternalEncoder extends OneToOneEncoder {

        @Override
        protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {
            // Encode into a growable Dubbo buffer with an initial capacity of 1024.
            com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
                    com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024);
            NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
            try {
                codec.encode(channel, buffer, msg);
            } finally {
                NettyChannel.removeChannelIfDisconnected(ch);
            }
            // Hand the encoded bytes back wrapped via the unqualified ChannelBuffers
            // (presumably Netty's own buffer type - confirm against the imports).
            return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer());
        }
    }

    // Inbound (read) path: accumulates bytes and decodes every complete message.
    private class InternalDecoder extends SimpleChannelUpstreamHandler {

        // Undecoded remainder carried over from earlier messageReceived calls.
        private com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
                com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;

        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
            Object o = event.getMessage();
            // Anything that is not a byte buffer is passed upstream untouched.
            if (!(o instanceof ChannelBuffer)) {
                ctx.sendUpstream(event);
                return;
            }

            ChannelBuffer input = (ChannelBuffer) o;
            int readable = input.readableBytes();
            if (readable <= 0) {
                return;
            }

            // Build the buffer to decode from: leftover bytes (if any) followed by the new input.
            com.alibaba.dubbo.remoting.buffer.ChannelBuffer message;
            if (buffer.readable()) {
                if (buffer instanceof DynamicChannelBuffer) {
                    // The leftover buffer can grow: append in place.
                    buffer.writeBytes(input.toByteBuffer());
                    message = buffer;
                } else {
                    // Otherwise copy leftover + input into a new dynamic buffer of at least bufferSize.
                    int size = buffer.readableBytes() + input.readableBytes();
                    message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(
                            size > bufferSize ? size : bufferSize);
                    message.writeBytes(buffer, buffer.readableBytes());
                    message.writeBytes(input.toByteBuffer());
                }
            } else {
                // Nothing left over: decode straight from a wrapper around the input.
                message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.wrappedBuffer(
                        input.toByteBuffer());
            }

            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
            Object msg;
            int saveReaderIndex;

            try {
                // Decode messages until the input is exhausted or a frame is incomplete.
                do {
                    saveReaderIndex = message.readerIndex();
                    try {
                        msg = codec.decode(channel, message);
                    } catch (IOException e) {
                        // NOTE(review): this reset is overwritten by the finally block below, which
                        // always reassigns buffer (to message when it is still readable) - confirm
                        // whether leftover bytes are meant to survive a decode failure.
                        buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                        throw e;
                    }
                    if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                        // Incomplete frame: rewind and wait for more bytes.
                        message.readerIndex(saveReaderIndex);
                        break;
                    } else {
                        // A result without any bytes consumed would loop forever.
                        if (saveReaderIndex == message.readerIndex()) {
                            // NOTE(review): same as above - the finally block reassigns buffer afterwards.
                            buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                            throw new IOException("Decode without read data.");
                        }
                        if (msg != null) {
                            Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress());
                        }
                    }
                } while (message.readable());
            } finally {
                // Keep whatever is still unread for the next call; otherwise start from empty.
                if (message.readable()) {
                    message.discardReadBytes();
                    buffer = message;
                } else {
                    buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                }
                NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
            // Exceptions are not handled here; forward them to the next upstream handler.
            ctx.sendUpstream(e);
        }
    }
}