单元测试

一、概述和原理

    Netty 的单元测试,主要是对业务逻辑的 ChannelHandler 做测试(毕竟对
Bootstrap、EventLoop
这些做测试着实没有多大意义),模拟一次入站数据或者出站数据,查看数据流经
ChannelHandler 变成什么样了,以此达到测试的目的。

    Netty 的单元测试将Junit4作为测试框架,将 EmbeddedChannel
作为测试通道。基本原理就是:将入站数据或者出站数据写入 EmbeddedChannel
中,然后检查是否有任何东西到达了 ChannelPipeline
的尾端。以这种方式,你便可以知道消息是否流经了 ChannelHandler
以及是否触发了任何 ChannelHandler 的动作,如下图:

图片 1

    EmbeddedChannel 提供了如下方法进行单元测试:

writeInbound(Object… msgs):
将入站消息写到 EmbeddedChannel 中。如果可以通过 readInbound()方法从
EmbeddedChannel 中读取数据,则返回 true。
readInbound() :从 EmbeddedChannel
中读取一个入站消息。任何返回的东西都穿越了整个
ChannelPipeline。如果没有任何可供读取的, 则返回 null。
writeOutbound(Object… msgs):
将出站消息写到EmbeddedChannel中。如果现在可以通过readOutbound()方法从
EmbeddedChannel 中读取到什么东西,则返回 true。
readOutbound(): 从 EmbeddedChannel
中读取一个出站消息。任何返回的东西都穿越了整个
ChannelPipeline。如果没有任何可供读取的,则返回 null。
finish() :将 EmbeddedChannel
标记为完成,并且如果有可被读取的入站数据或者出站数据,则返回
true。这个方法还将会调用 EmbeddedChannel 上的close()方法。

一、概念

   
先来整体的介绍一下这篇博文要介绍的几个概念(Channel、ChannelHandler、ChannelPipeline、ChannelHandlerContext、ChannelPromise):

Channel:Netty 中传入或传出数据的载体;
ChannelHandler:Netty 中处理入站和出站数据的应用程序逻辑的容器;
ChannelPipeline:ChannelHandler链 的容器;
ChannelHandlerContext:代表了 ChannelHandler 和 ChannelPipeline
之间的关联,每当有ChannelHandler 添加到 ChannelPipeline 中时,都会创建
ChannelHandlerContext;
ChannelPromise:ChannelPromise是ChannelFuture的一个子类,其定义了一些可写的方法,如setSuccess()和setFailure(),
从而使ChannelFuture不可变。

   
我们来举一个例子描述这些概念之间的逻辑关系:服务端接收到客户端的连接请求,创建一个Channel同客户端进行绑定,新创建的
Channel
会都将会被分配一个新的ChannelPipeline(这项关联是永久性的,Channel
既不会附加另外一个ChannelPipeline,也不能分离当前的)。而
ChannelPipeline 作为 ChannelHandler链 的容器,当Channel
生命周期中状态发生改变时,将会生成对应的事件,这些事件将会被
ChannelPipeline 中 ChannelHandler 所响应,响应方法的参数一般都有一个
ChannelHandlerContext ,一个 ChannelHandler 对应一个
ChannelHandlerContext,ChannelHandlerContext 代表了 ChannelHandler 和
ChannelPipeline 之间的关联,这项关联也是永远不会改变的。

图片 2

本文是Netty文集中“Netty in action”系列的文章。主要是对Norman Maurer
and Marvin Allen Wolfthal 的 《Netty in
action》一书简要翻译,同时对重要点加上一些自己补充和扩展。

二、测试入站数据

    1、将我们要测试的 ChannelHandler 写入 EmbeddedChannel 进行测试。

    2、writeInbound(Object… msgs)
将数据写入EmbeddedChannel(模拟接收数据)。

    3、ChannelHandler 处理后如果有返回数据,可以通过readInbound()
验证数据结果。如果没有返回数据,可以在 ChannelHandler
业务逻辑中,打印日志,以达到测试目的。

public class DecoderTest {


    //1、利用Junit执行单元测试
    @Test
    public void decoderTest() throws IllegalAccessException {
        ByteBuf buf = Unpooled.buffer();
        for (int i = 0; i < 9; i++) {
            buf.writeByte(i);
        }
        ByteBuf buf1 = buf.duplicate();
        //2、创建EmbeddedChannel,并添加一个Decoder(我们的要测试 ChannelHandler) 其将以3字节帧长度被测试
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(new Decoder(3));
        //3、将数据写入 EmbeddedChannel
        boolean writeInbound = embeddedChannel.writeInbound(buf1.retain());
        assertTrue(writeInbound);
        //4、标记 Channel 为已完成状态
        boolean finish = embeddedChannel.finish();
        assertTrue(finish);

        //5、读取数据
        ByteBuf readInbound =  embeddedChannel.readInbound();
        ByteBuf readSlice = buf.readSlice(3);
        assertEquals(readInbound, readSlice);
        readInbound.release();

        readInbound =  embeddedChannel.readInbound();
        readSlice = buf.readSlice(3);
        assertEquals(readInbound, readSlice);
        readInbound.release();

        readInbound =  embeddedChannel.readInbound();
        readSlice = buf.readSlice(3);
        assertEquals(readInbound, readSlice);
        readInbound.release();

        //是否读取完数据了
        assertNull(embeddedChannel.readInbound());
        //释放资源
        buf.release();

    }
}

二、ChannelHandler

 
  Netty提供了大量预定义的可以开箱即用的ChannelHandler实现,包括用于各种协议的ChannelHandler。因此,我们在自定义ChannelHandler实现用于处理我们的程序逻辑时,只需要继承Netty
的一些默认实现即可,主要有两种:

1、继承 ChannelHandlerAdapter (在4.0 中 处理入站事件继承
ChannelInboundHandlerAdapter,处理出站事件继承 ChannelOutboundHandlerAdapter ;在5.0 推荐直接继承
ChannelHandlerAdapter)

2、继承 SimpleChannelInboundHandler

    这两种方式有什么区别呢?  当我们处理
入站数据 和
出站数据时,都需要确保没有任何的资源泄露。在入站方向,继承 SimpleChannelInboundHandler
的实现类会在消息被处理之后自动处理消息,而继承 ChannelHandlerAdapter
的实现类需要手动的释放消息(ReferenceCountUtil.release(msg));在出站方向,不管继承的是哪一种的实现类,当你处理了
write() 操作并丢弃了一个消息,那么你就应该释放它,不仅如此,还要通知
ChannelPromise。否则可能会出现 ChannelFutureListener
收不到某个消息已经被处理了的通知的情况。

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ReferenceCountUtil.release(msg);
    //ChannelPromise 是ChannelFuture的一个子类,设置成 true 通知 ChannelFutureListener 消息已经被处理了
    //当一个 Promise 被完成之后,其对应的 Future 的值便不能再进行任何修改了
    promise.setSuccess();
}

    tips:总之,如果一个消息被消费或者丢弃了,
并且没有传递给 ChannelPipeline 中的下一个ChannelOutboundHandler,
那么用户就有责任调用 ReferenceCountUtil.release()。

    来看看 ChannelHandler 的
API:

isSharable :如果其对应的实现被标注为
Sharable, 那么这个方法将返回 true, 表示它可以被添加到多个
ChannelPipeline中

— ChannelHandler 生命周期方法 —
handlerAdded :当把 ChannelHandler 添加到
ChannelPipeline 中时被调用
handlerRemoved :当从 ChannelPipeline
中移除 ChannelHandler 时被调用
exceptionCaught : 当处理过程中在
ChannelPipeline 中有错误产生时被调用

— 处理入站数据以及各种状态变化 —
channelRegistered : 当 Channel
已经注册到它的 EventLoop 并且能够处理 I/O 时被调用
channelUnregistered : 当 Channel 从它的
EventLoop 注销并且无法处理任何 I/O 时被调用
channelActive : 当 Channel
处于活动状态时被调用;Channel 已经连接/绑定并且已经就绪
channelInactive : 当 Channel
离开活动状态并且不再连接它的远程节点时被调用
channelReadComplete :
当Channel上的一个读操作完成时被调用
channelRead : 当从 Channel
读取数据时被调用
ChannelWritabilityChanged :当 Channel
的可写状态发生改变时被调用。
userEventTriggered : 当
ChannelnboundHandler.fireUserEventTriggered()方法被调用时被调用,因为一个
POJO 被传经了 ChannelPipeline

— 处理出站数据并且允许拦截所有的操作 —
bind : 当请求将 Channel
绑定到本地地址时被调用
connect : 当请求将 Channel
连接到远程节点时被调用
disconnect : 当请求将 Channel
从远程节点断开时被调用
close : 当请求关闭 Channel
时被调用
deregister(5.0中被废弃) : 当请求将
Channel 从它的 EventLoop 注销时被调用
read : 当请求从 Channel
读取更多的数据时被调用
flush : 当请求通过 Channel
将入队数据冲刷到远程节点时被调用
write :当请求通过 Channel
将数据写到远程节点时被调用

概要

  • ChannelHandler 和 ChannelPipeline 的 API
  • 资源泄漏检测
  • 异常处理

三、测试出站数据

    1、将我们要测试的 ChannelHandler 写入 EmbeddedChannel 进行测试。

    2、writeOutbound(Object… msgs)
将数据写入EmbeddedChannel(模拟发送数据)。

    3、ChannelHandler 处理后如果有返回数据,可以通过readOutbound()
验证数据结果。如果没有返回数据,可以在 ChannelHandler
业务逻辑中,打印日志,以达到测试目的。

public class EncoderTest {

    @Test
    public void encoderTest(){
        ByteBuf buf = Unpooled.buffer();
        for (int i =1; i < 10; i++){
            buf.writeInt(i * -1);
        }
        //1、创建一个EmbeddedChannel 并安装要测试的Encoder
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(new Encoder());
        //2、写入数据
        assertTrue(embeddedChannel.writeOutbound(buf));
        assertTrue(embeddedChannel.finish());
        //3、读取数据
        for (int i = 1; i < 10; i++){
            Object o = embeddedChannel.readOutbound();
            System.out.println(o);
        }
        assertNull(embeddedChannel.readOutbound());
    }
}

三、ChannelPipeline

    ChannelPipeline 是一个拦截流经 Channel
的入站和出站事件的ChannelHandler 实例链,它和 ChannelHandler
之间的交互组成了应用程序数据和事件处理逻辑的核心,而它们之间的关联交互就是通过 ChannelHandlerContext。

    如果一个入站事件被触发,它将被从 ChannelPipeline
的头部开始一直被传播到 Channel Pipeline 的尾端。如图,Netty 总是将
ChannelPipeline
的入站口作为头部,而将出站口作为尾端,如图,第一个被入站事件看到的
ChannelHandler 将是1,而第一个被出站事件看到的是 ChannelHandler 将是
5。稍微总结下这句拗口的话,入站事件顺序执行(1—>2—>3—>4—>5)、出站事件逆序执行(5—>4—>3—>2—>1)。

图片 3

    既然 ChannelPipeline 是 ChannelHandler链
的容器,让我们来看看ChannelPipeline 是如何管理 ChannelHandler的吧!

addFirst : 将一个 ChannelHandler 添加到
ChannelPipeline 最开始位置中
addBefore :将一个 ChannelHandler 添加到
ChannelPipeline 某个ChannelHandler前
addAfter:将一个 ChannelHandler 添加到
ChannelPipeline 某个ChannelHandler后
addLast : 将一个 ChannelHandler 添加到
ChannelPipeline 最末尾位置
remove :将一个 ChannelHandler 从
ChannelPipeline 中移除
replace :将 ChannelPipeline 中的一个
ChannelHandler 替换为另一个 ChannelHandler
get :通过类型或者名称返回
ChannelHandler
context :返回和 ChannelHandler 绑定的
ChannelHandlerContext
names :返回 ChannelPipeline 中所有
ChannelHandler 的名称

    ChannelPipeline 的API
用于调用入站操作的附加方法:

fireChannelRegistered: 调用
ChannelPipeline 中下一个 ChannelInboundHandler 的
channelRegistered(ChannelHandlerContext)方法
fireChannelUnregistered: 调用
ChannelPipeline 中下一个 ChannelInboundHandler
的channelUnregistered(ChannelHandlerContext)方法
fireChannelActive: 调用 ChannelPipeline
中下一个 ChannelInboundHandler
的channelActive(ChannelHandlerContext)方法
fireChannelInactive: 调用
ChannelPipeline 中下一个 ChannelInboundHandler
的channelInactive(ChannelHandlerContext)方法
fireExceptionCaught: 调用
ChannelPipeline 中下一个 ChannelInboundHandler
的exceptionCaught(ChannelHandlerContext, Throwable)方法
fireUserEventTriggered: 调用
ChannelPipeline 中下一个 ChannelInboundHandler
的userEventTriggered(ChannelHandlerContext, Object)方法
fireChannelRead: 调用 ChannelPipeline
中下一个 ChannelInboundHandler 的channelRead(ChannelHandlerContext,
Object msg)方法
fireChannelReadComplete: 调用
ChannelPipeline 中下一个 ChannelInboundHandler
的channelReadComplete(ChannelHandlerContext)方法
fireChannelWritabilityChanged: 调用
ChannelPipeline 中下一个 ChannelInboundHandler
的channelWritabilityChanged(ChannelHandlerContext)方法

    ChannelPipeline 的API
用于调用出站操作的附加方法:

bind: 将 Channel
绑定到一个本地地址,这将调用 ChannelPipeline
中的下一个ChannelOutboundHandler 的 bind方法
connect: 将 Channel
连接到一个远程地址,这将调用 ChannelPipeline
中的下一个ChannelOutboundHandler 的 connect方法
disconnect: 将 Channel
断开连接。这将调用 ChannelPipeline 中的下一个 ChannelOutboundHandler 的
disconnect方法
close: 将 Channel 关闭。这将调用
ChannelPipeline 中的下一个 ChannelOutboundHandler 的 close方法
deregister(5.0中被废弃): 将 Channel
从它先前所分配的 EventExecutor(即 EventLoop)中注销。这将调用
ChannelPipeline 中的下一个 ChannelOutboundHandler 的
deregister方法
flush:
冲刷Channel所有挂起的写入。这将调用ChannelPipeline中的下一个ChannelOutboundHandler
的 flush方法
write: 将消息写入 Channel。这将调用
ChannelPipeline 中的下一个
ChannelOutboundHandler的write方法。注意:这并不会将消息写入底层的
Socket,而只会将它放入队列中。要将它写入 Socket,需要调用 flush()或者
writeAndFlush()方法
writeAndFlush: 这是一个先调用
write()方法再接着调用 flush()方法的便利方法
read: 请求从 Channel
中读取更多的数据。这将调用 ChannelPipeline
中的下一个ChannelOutboundHandler 的
read(ChannelHandlerContext)方法

The ChannelHandler family

四、结语

    截止到这篇文章,Netty
的基础部分差不多就结束了。不幸的是,公司安排我去研究下 Docker
技术,想在我们项目中使用起来。所以 Netty
的学习就不得不告一段落了~~才翻完《Netty
实战》一半的内容,后面还有编解码器、网络协议、案例研究三个部分,只能先欠下了~

 

参考资料:《Netty IN ACTION》

演示源代码:https://github.com/JMCuixy/NettyDemo/tree/master/src/main/java/org/netty/demo/unit

四、ChannelHandlerContext

    ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline
之间的关联。

    ChannelHandlerContext 有很多的方法,其中一些方法也存在于 Channel 和
ChannelPipeline 本身上,但是有一点重要的不同。如果调用 Channel 或者
ChannelPipeline 上的这些方法,它们将沿着整个 ChannelPipeline
进行传播。而调用位于
ChannelHandlerContext上的相同方法,则将从当前所关联的 ChannelHandler
开始,并且只会传播给位于该ChannelPipeline 中的下一个能够处理该事件的
ChannelHandler。因此,尽量使用 ChannelHandlerContext
的同名方法来处理逻辑,因为它将产生更短的事件流,
应该尽可能地利用这个特性来获得最大的性能。

Channel生命周期

Channel接口定义了简单但强大的状态模式来紧密的联系ChannelInboundHandler
API。包括了4种状态

图片 4

Channel正常生命周期状态改变如下图:

图片 5

当遇到状态改变时,相应的事件会被产生。该事件会在ChannelPipeline中的ChannelHandlers传递。

五、异常处理

    入站异常处理:

1、ChannelHandler.exceptionCaught()的默认实现是简单地将当前异常转发给ChannelPipeline
中的下一个 ChannelHandler;
2、如果异常到达了 ChannelPipeline 的尾端,它将会被记录为未被处理;
3、要想定义自定义的处理逻辑,你需要重写
exceptionCaught()方法。一般将这个Channelhandler放在 ChannelPipeline
的最后,确保所有的入站异常都总会被处理。

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    cause.printStackTrace();
    ctx.channel().close();
}

    出站异常处理:

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ChannelFuture channelFuture = ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rock!", CharsetUtil.UTF_8));
    //出站异常处理
    channelFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (!future.isSuccess()) {
                future.cause().printStackTrace();
                future.channel().close();
            }
        }
    });
}
ChannelHandler的生命周期

图片 6

当一个ChannelHandler添加到一个ChannelPipeline或从一个ChannelPipeline中移除时,会调用ChannelHandler生命周期中相应的方法。每个方法都会接收一个ChannelHandlerContext参数。

注意,Channel生命周期状态改变时会有相应的事件产生,并且该事件会在ChannelPipeline中的ChannelHandlers传递。但,ChannelHandler生命周期状态改变时并不会有相应的事件产生与ChannelPipeline中传播,只有回调当前这个ChannelHandler的某个方法而已。这是很好理解的,因为ChannelPipeline中所有的ChannelHandler都是服务于一个Channel的,因此Channel的状态改变了,ChannelPipeline中的ChannelHandler都有可能需要做出相应的处理。而ChannelHandler自身的状态和其他ChannelHandler并无关系,所以ChannelHandler状态发生改变时,会有当前这个ChannelHandler相应的方法会被回调而已。

Netty定义了两个非常重要的ChannelHandler的子接口:

  • ChannelInboundHandler —— 处理入站数据和所有类型状态的改变
  • ChannelOutboundHandler —— 处理所有的出站数据和允许拦截所有的操作。

六、寄语

   
有时候也会迷茫,身边的人理论基础差一些的的,代码不一样敲的好好的?而我花大量时间细细的去研究这么理论真的值得吗?仔细想想,人生很多事情本来就是徒劳无功的啊,没必要急功近利,欲速则不达。要坚信,一切的付出总会在人生的某个时刻回报在我们身上。

ChannelInboundHandler 接口

下表列出了ChannelInboundHandler接口生命周期的所有方法。这些方法将被调用在接收数据或者Channel相关状态改变时。正如我们早前提及的,这些方法与Channel的生命周期紧密映射。

图片 7

当一个ChannelInboundHandler的实现重写了channelRead(),它需要负责明确释放ByteBuf实例相关的内存。为了达到这个目的,Netty提供了一个实用的方法:ReferenceCountUtil.release()

图片 8

Netty会通过警告级别的日志消息打印未释放的资源,这将能够简单的从代码中寻找出未释放的实例。但是通过这种方式管理资源是非常笨拙的。一个简单的替换方式是使用SimpleChannelInboundHandler,如下所示

图片 9

因为SimpleChannelInboundHandler会自动释放资源,你不能够存储任何消息的引用用于后面再次使用,因为这些消息会变得无效。

ChannelOutboundHandler 接口

通过ChannelOutboundHandler进行出站的操作和数据的处理。ChannelOutboundHandler的方法可以通过Channel、ChannelPipeline、ChannelHandlerContext调用。
ChannelOutboundHandler有一个强大的功能是根据需要可延迟一个操作或事件,这将允许通过一个复杂的方式去处理请求。例如,如果写数据到远端被暂停了,你能够延迟刷新操作并稍后恢复写操作。

图片 10

图片 11

ChannelPromise VS ChannelFuture
ChannelOutboundHandler的许多方法都会接受一个ChannelPromise参数用于当操作完成时得到一个通知。ChannelPromise是ChannelFuture的一个子类,ChannelPromise定义了可写入的方法,比如:setSuccess()或setFailure(),它使得ChannelFuture不可变(
因为,方法参数使用final域修饰,如下例子所示 )。
也就是说,ChannelPromise是一个特殊的ChannelFuture,ChannelPromise是可写的。从源码上看,ChannelOutboundHandler的write方法返回的ChannelFuture实际上就是ChannelPromise,ChannelPromise会在ChannelPipeline中传递。

图片 12

ChannelHandler 适配器

你能够使用ChannelInboundHandlerAdapter类和ChannelOutboundHandlerAdapter类作为你自己ChannelHandlers的入口。这两个适配器分别提供了ChannelInboundHandler和ChannelOutboundHandler的基本实现。

图片 13

ChannelHandlerAdapter提供了一个非常实用的方法isSharable()。如果ChannelHandlerAdapter的实现类被注解了”@Sharable”,那么该方法将返回true,表明着该实现类实例能被添加到多个ChannelPipeline中,或者可以被添加到同个ChannelPipeline中多次。
ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter中方法通过调用ChannelHandlerContext等价的方法来将事件传递给pipeline中的下一个ChannelHandler。
如:

图片 14

你自己的处理器可以简单的实现适配器类,然后重写你想要自定义的方法。

资源管理

无论你通过调用 ChannelInboundHandler.channelRead() 或
ChannelOutboundHandler.writer()
来处理你的数据,你需要确保没有资源的泄漏。
Netty使用引用计数来处理ByteBufs。所以在你使用完一个ByteBuf时去调整引用计数是非常重要的。
为了帮助你诊断可能存在的问题,Netty提供了ResourceLeakDetector类,它将获取你应用已经分配的缓存中的1%数据作为采样来进行内存泄漏的检查。它的开销是非常小的。

如果一个泄漏被检测出来了,日志将打印类似下面的消息:

图片 15

图片 16

泄漏检查等级的设置是通过在下面的java系统属性来设置的:java -Dio.netty.leakDetectionLevel=ADVANCED或者通过调用ResourceLeakDetector.setLevel()方法来设置。

下面是一个典型的单元测试产生的泄漏报告:

图片 17

当入站消息不再传递给下一个ChannelInboundHandler时,通过ReferenceCountUtil.release(msg);来释放资源

图片 18

消费入站消息的简单方式:Netty提供了一个叫做SimpleChannelInboundHandler的ChannelInboundHandler的实现,该实现将自动释放一个消息,当该消息被channelRead0()消费后。源码如下:

图片 19

对于出站来说,如果你处理了一个write()操作并且需要废弃一个消息,那么你需要负责去释放该消息。

图片 20

注意,这里不仅需要释放资源,还需要通知ChannelPromise(即,上面的例子是通过调用promise.setSuccess()来通知异步的write操作已经成功完成)。
另一方面,还一种情况可能出现:ChannelFutureListener可能没有收到关于一个消息已经被处理的通知。

总而言之,用户有责任去通过调用ReferenceCountUtil.release()来释放一个已经被消费的消息或废弃并不会传递给ChannelPipeline中下一个ChannelOutboundHandler的消息。如果消息到真实的传输层,当他写完或Channel被关闭时将会被自动释放。

重要:

  • 内存泄漏针对于使用了池的ByteBuf,在从池中分配完ByteBuf后使用完又没有放回到池中。如,从池中分配ByteBuf,ctx.alloc()默认返回PooledByteBufAllocator

    图片 21

    👆这里要说明一点,如果是在正式项目中使用,没有rep.copy(),而是直接将rep传给writeAndFlush(…)是不会导致内存泄漏的,因为出站操作时,编码器encoder调用会自动释放资源。如果我们在正式项目中写rep.release();反而会导致错误。
    而在使用EmbeddedChannel测试入站操作时,直接将rep传给writeAndFlush(…)也是可以测出内存泄漏的,因为EmbeddedChannel测试入站操作时没有走出站流程,所以就导致从池中分配的ByteBuf一直无法得到释放。

    内存泄漏报告:

    图片 22

    非池分配,则不会有内存泄漏报告:

    图片 23

  • writeAndFlush(Bytebuf
    buf),如果是EmbeddedChannel单元测试且测试入站数据时,需要手动释放资源

    图片 24

    如果是EmbeddedChannel的出站数据测试,或正常程序运行,则不需要。因为编码器的encode()方法调用会自动释放资源。如果再手动调用反倒会报错:

    图片 25

Q:所以如果是涉及到I/O操作的就不需要用户去释放出站资源了吗?因为数据在网络层传输出去后会被自动释放?  
A:展开下encoder的底层:  
MessageToMessageEncoder:如果有执行encode(…)函数,则会在finally中释放资源,因为调用了encode函数则会生成新的ByteBuf或其他对象传递给下一个Encoder,那么当前传入的msg就已经不会再使用了,所以需要释放msg。  
如果没有执行encode(…)函数,则直接将msg传给下一个Encoder,由下一个Encoder执行一样的操作。

![](https://upload-images.jianshu.io/upload_images/4235178-94e778d9494af221.png)

MessageToByteEncoder:也会有相关的释放资源的操作。

图片 26

ChannelPipeline 接口

每一个新的Channel被创建时都会分配一个新的ChannelPipeline。他们的关系是永久不变的,Channel既不能依附于其他ChannelPipeline也不能和当前ChannelPipeline分离。这是一个固定的操作在Netty组件的生命周期中并且不需要开发者做任何的额外的操作。
一个事件要么被一个ChannelInboundHandler处理要么被一个ChannelOutboundHandler处理。随后该事件通过一个ChannelHandlerContext来实现传递给下一个具有一样父类的处理器,即同一个方向的处理器。

ChannelHandlerContext
ChannelHandlerContext使一个ChannelHandler能和它所在的ChannelPipeline中的其他的ChannelHandler交互。一个处理器能够通知ChannelPipeline中下一个ChannelHandler并且能够动态修改它所属的ChannelPipeline。
也就是说,事件是通过ChannelHandlerContext在ChannelPipeline中的ChannelHandler传递的。处理器在获得ChannelPipeline后,可以对其进行动态修改,即修改后即生效。

图片 27

ChannelPipeline根本上是一系列的ChannelHandlers。ChannelPipeline还提供了方法用于传播事件通过ChannelPipeline本身。

ChannelHandler在ChannelPipeline中的顺序是由我们通过ChannelPipeline.add*()方法来确定的。

当管道传播一个事件时,它会确定是否管道中下一个ChannelHandler符合移动的方向。如果不符合,ChannelPipeline会跳过这个ChannelHandler并继续操作下一个,直到它找到一个符合它想要的方向的ChannelHandler。(当然,一个handler可能同时实现了ChannelInboundHandler和ChannelOutboundHandler接口)

修改一个ChannelPipeline

ChannelHandler能够实时修改ChannelPipeline的布局,通过添加、删除、替换为其他ChannelHandlers
( 它也能将自己从ChannelPipeline中移除
)。这是ChannelHandler非常重要的功能。

图片 28

图片 29

ChannelHandler 的执行和阻塞
通常,ChannelPipeline中的每个ChannelHandler将事件传递给它所在的EventLoop
( I/O线程
)进行处理。非常重要的是不要阻塞这个线程,这样做会对该线程上所有的I/O处理造成负面影响。

有时候我们可能需要对接使用了阻塞的遗留代码。在这种情况下,可以通过ChannelPipeline的add()方法来接收一个EventExecutorGroup参数。如果一个事件被传给了一个自定义的EventExecutorGroup,它将被这个EventExecutorGroup中的某个EventExecutor所处理,并且会从Channel所在的EventLoop中移除。对于这种使用情况Netty提供了一个叫做DefaultEventExecutorGroup的实现。

图片 30

也就是说,只有传入的EventExecutorGroup的ChannelHandler才会放到EventExecutorGroup中执行,不会影响到ChannelPipeline中其他的ChannelHandler。
在通过ChannelHandlerContext将事件传播给下一个ChannelHandler的时候,又会依据下一个ChannelHandler关联的ChannelHandlerContext的executor()获得执行ChannelHandler的执行器,如果在将ChannelHandler添加到ChannelPipeline中时没有指定执行的EventExecutorGroup,那么默认就是在Channel所注册到的EventLoop所在线程上执行。

图片 31

事件触发

ChannelPipeline API 暴露了附加的方法用于调用入站和出站的操作。

图片 32

在出站方面,处理一个事件将导致底层socket采取某些动作。

图片 33

注意,ChannelPipeline附加的对入站和出站的操作,都只是触发ChannelPipeline中消息从管道头(入站操作)或管道尾(出站操作)开始处理该消息,ChannelPipeline这些方法本身并不会去对事件做一个逻辑处理。如下:

图片 34

总结:

  • 一个ChannelPipeline持有一个Channel相关的所有ChannelHandlers。
  • 能够根据需要通过添加和删除来动态修改一个ChannelPipeline。
  • ChannelPipeline拥有丰富的API用于调用action来回应入站和出站事件。

ChannelHandlerContext 接口

一个ChannelHandlerContext代表一个ChannelHandler和一个ChannelPipeline之间的关联,并且无论何时一个ChannelHandler被添加到一个ChannelPipeline时它都会被创建。ChannelHandlerContext一个主要的功能是管理与它相关的ChannelHandler在ChannelPipeline中与其他ChannelHandler间的交互。
ChannelHandlerContext有许多的方法,有些方法也出现在了Channel和ChannelPipeline类中,但是它们有着很重要的不同处。如果你通过一个Channel实例或ChannelPipeline实例调用这些方法,它们将会传播通过整个管道。但相同的方法通过ChannelHandlerContext被调用时,它将从当前关联的ChannelHandler开始并传播给管道中下一个能够处理该事件的ChannelHandler。
也就是说,ChannelPipeline和Channel中调用的方法都会通过整个管道;而ChannelHandlerContext调用的方法会从当前ChannelHandler对应方向的下一个ChannelHandler开始执行(
当前ChannelHandler也不会处理该事件 )。

图片 35

图片 36

从源码上看write操作是当前ChannelHandler对应的下一个ChannelOutboundHandler开始处理写操作,注意,出站的方向是从tail
-> head;

Q:从👆的API来看,read方法是会从pipeline中第一个入站缓冲区,read是通过整个pipeline吗?
A:是的,Channel会将每次读取到的数据传到pipeline中。注意,这里是每一次读到的数据,而不是读到完整的消息或全部读完数据,所以才有后面需要handler来解析收到的数组以组装成一个消息。
ChannelPipeline中的head的invokeChannelRead(..)的操作就是执行AbstractChannelHandlerContext的fireChannelRead方法,将msg传递给pipeline中的下个ChannelInboundHandler。

图片 37

当使用ChannelHandlerContext API时请注意下面几点:

  • ChannelHandlerContext关联的ChannelHandler是不会改变的,所以缓存ChannelHandlerContext引用是安全的。
  • 与其他类中有着相同名字的有效方法比较,ChannelHandlerContext方法提供了一个更短的事件流。这可被利用于提供一个更好的性能。
ChannelHandlerContext 的使用

图片 38

图片 39

从一个ChannelHandlerContext对象来访问Channel

图片 40

从一个ChannelHandlerContext对象来访问ChannelPipeline

上面两个例子是相同的。这里重要的是要注意,当通过Channel或ChannelPipeline调用write()的时候,事件会传播通过整个管道。它们通过ChannelHandlerContext将事件从一个处理器传递到下一个处理器。

图片 41

为什么你想要传播一个事件从ChannelPipeline中的指定的某一点开始?

  • 减小传递事件通过不感兴趣的ChannelHandlers的开销。即,通过事件不传递给不感兴趣的ChannelHandler来减小开销。
  • 为了防止对事件感兴趣的处理器处理事件

为了调用程序从一个指定的ChannelHandler开始,你必须引用一个指定ChannelHandler前一个的ChannelHandler相关联的ChannelHandlerContext。
👆这里再次说明,通过ChannelHandlerContext调用方法时,当前ChannelHandler是不会对事件进行处理的,而是从下一个ChannelHandler开始对事件进行处理。

图片 42

图片 43

ChannelHandler 和 ChannelHandlerContext 的高级用法

你能够通过ChannelHandlerContext的pipeline()方法获取的一个ChannelPipeline引用。它能在运行时操作管道中的ChannelHandlers,这能够被利用与实现复杂的设计。比如,你能够添加一个ChannelHandler到一个管道以支持一个动态协议的改变。
另一个高级用法是支持缓存一个ChannelHandlerContext引用,用于稍后使用,该引用能在任何ChannelHandler以外的方法中使用,甚至可以在不同的线程中使用。

图片 44

因为一个ChannelHandler能够属于多个ChannelPipeline,它能够被绑定到多个ChannelHandlerContext实例上。一个用于该目的的ChannelHandler必须有@Sharable注解;如果一个不具有@Sharable注解的ChannelHandler被尝试去添加到多个ChannelPipeline中将会触发一个异常。显然,为了安全的在多个并发Channel中使用ChannelHandler,可共享的ChannelHandler必须是线程安全的。

图片 45

图片 46

👆这个例子是错误的,因为它是非线程安全类。

为什么要共享一个ChannelHandler ?
一个常见的原因是:构建一个单例的ChannelHandler在多个ChannelPipeline中为了跨越多个Channels来收集统计资料。

ChannelHandler、ChannelHandlerContext 补充:
如果一个ChannelHandler被标识为了’Sharable’的,并且“该ChannelHandler对象被添加到一个ChannelPipeline中多次”
或者 “该ChannelHandler对象被添加到多个ChannelPipeline中一次或者多次”
都会导致该单一ChannelHandler拥有了多个ChannelHandlerContext。
当然,如果你添加一个非共享(即,未被标示为’Sharable’)的ChannelHandle到一个ChannelPipeline多次或则添加到多个ChannelPipeline中都会导致抛出一个ChannelPipelineException异常。

异常的处理

异常处理是非常重要的部分在任何实质应用中,并且它能通过多种方式进行处理。因此,Netty提供了几种选择用于处理异常的抛出在入站或出站处理中。

处理入站异常

如果一个异常在处理一个入站事件期间被抛出,它将从被触发该异常的ChannelInboundHandler所在的位置开始流经ChannelPipeline。
Q:这句话的意思是异常也是从抛出异常的当前handler开始处理,然后传递到管道中的下一个ChannelInboundHandler?
A:是的。异常会从当前的handler开始处理,如果当前的ChannelHandler的exceptionCaught方法有调用‘ctx.fireExceptionCaught(cause);’或者‘super.exceptionCaught(ctx,
cause);[其底层就是调用ctx.fireExceptionCaught(cause)]’就会将异常传递给管道中的一个ChannelInboundHandler

为了处理入站异常,你需要重写ChannelInboundHandler实现的exceptionCaught(…)方法:

图片 47

因为异常将继续流进入站方向的ChannelHandler( 就和其他入站事件一样
),实现该逻辑的ChannelInboundHandler通常位于ChannelPipeline中的最后面(
即,最后一个ChannelInboundHandler
)。这能确保所有入站中的异常总能被处理,无论该异常在ChannelPipeline中的哪里发生。

如何应对一个异常可能和你的应用的具体情况而定。你可能想要关闭这个channel或者你可能试图恢复这个channel。如果你没有实现任何入站异常的处理(
或者说,没有消费任何异常 ),Netty将日志记录未处理异常的真实情况。

总结:

  • ChannelHandler.exceptionCaught()方法的默认实现是传递当前异常到管道中的下一个处理器中。
  • 如果一个异常到达了管道的结尾,该异常将被记录为未处理。
  • 你可以通过重写exceptionCaught()方法来自定义异常处理。然后你能够觉得是否要让该异常跨过该点(
    即,是否需要将该异常传递到管道中的下一个处理器中 )。
处理出站异常

在出站操作中处理正常完成和异常的选项是基于下面的通知机制:

  • 每一个出站操作将返回一个ChannelFuture。注册ChannelFutureListeners到一个ChannelFuture中,无论该事件成功与否ChannelFutureListeners在事件完成时都将得到一个通知。
  • 几乎所有的ChannelOutboundHandler方法都会传递一个ChannelPromise实例(
    作为方法的参数来传递
    )。作为ChannelFuture的子类,ChannelPromise也能给指定的监听进行异步事件的通知。但是ChannelPromise还提供了可写的方法来提供即时通知:

    图片 48

    也就是说,ChannelPromise相比于ChannelFuture来说多了可以设置异步操作完成(成功或失败)的方法。

通过一个ChannelFuture实例的addListener(ChannelFutureListener)方法来添加一个ChannelFutureListener,并且有两种方式来实现这。其中一个最常见的用法是在ChannelFuture上调用addListener()方法,该ChannelFuture是一个出站操作(
如,write() )的返回值。

图片 49

第二个方式是,添加一个ChannelFutureListener到一个ChannelPromise,该ChannelPromise将以参数的方式在ChannelOutboundHandler方法中传递。

图片 50

6.14示例和6.13示例是等价的。
但使用6.14示例的方式来处理异常不如6.13示例来的专业,而6.14示例通过自定义OutboundExceptionHandler方式处理异常虽然不专业但实现更简单。

总的来说,入站操作异常由exceptionCaught()方法处理;出站异常由ChannelFuture处理。

出站异常示例:

图片 51

控制台:

图片 52

👆这里是没有异常堆栈信息的。异常在ChannelFuture中持有的,我们可以通过👇的方式来获取异常。

图片 53

ChannelPromise 的可写方法
通过调用ChannelPromise的setSuccess()和setFailure()方法,你能立即得知操作的状态在该ChannelHandler方法返回给调用者后。
我想这里想要表述的是,一旦调用了ChannelPromise的setSuccess()和setFailure()方法,则说明当前的异步操作就完成了。所以在ChannelPromise的setSuccess()和setFailure()方法调用后,你就能够得到异步操作的状态了。

如果你的ChannelOutboundHandler自己抛出了一个异常,那么将会发生什么?在这种情况下,Netty将会通知所有已经注册到相应ChannelPromise的监听器

后记

本文主要对Netty的ChannelHandler和ChannelPipeline进行了介绍,这两个都是是Netty中非常重要的组件,也涉及到了不少的知识点。
若文章有任何错误,望大家不吝指教:)

参考

《Netty in
action》

Post Author: admin

发表评论

电子邮件地址不会被公开。 必填项已用*标注