Netty笔记

Mars 2019年11月20日 65次浏览
  • 一个 EventLoopGroup当中会包含一个或多个 EventLoop。
  • 一个 EventLoop在它的整个生命周期当中都只会与唯一个 thread进行绑定。
  • 所有由 EventLoop所处理的各种工/o事件都将在它所关联的那个 hread上进行处理。
  • 一个 Channe1在它的整个生命周期中只会注册在一个 EventLoop上EventLoop在运行过程当中,会被分配给一个或者多个 Channe1

  • 重要结论:在 Netty中, Channel的实现一定是线程安全的;基于此,我们可以存储一个 Channe的引用,并且在需要向远程端点发送数据时,通过这个引用来调用 Channe1相应的方法;即便当时有很多线程都在使用它也不会出现多线程问题;而且,消息一定会按照顺序发送出去重要结论:我们在业务开发中,不要将长时间执行的耗时任务放入到 EventLoop的执行队列中,因为它将会一直阻塞该线程所对应的所有 Channe1上的其他执行任务,如果我们需要进行阻塞调用或是耗时的操作(实际开发中很常见),那么我们就需要使用一个专门的 EventExecutor(业务线程池)。
  • 通常会有两种实现方式
  • 在 ChannelHandler的回调方法中,使用自己定义的业务线程池,这样就可以实现异步调用。
  • 借助于Nety提供的向 ChannelPipeline添加 ChannelHandler时调用的 addLast方法来传递 EventExecute说明:默认情况下(调用 addLast(hand1ex), ChannelHandler中的回调方法都是由i/o线程所执行,如果调用了 ChannelPipeline addLast( EventExecutorGroup group, Channelhandler,, handlers);方法,那么 ChannelHandler中的回调方法就是由参数中的 group线程组来执行的。

  • JDK所提供的 Future只能通过手工方式检查执行结果,而这个操作是会阻塞的; Netty则对 ChannelFuture进行了增强,通过 Channelfuturelistener以回调的方式来获取 执行结果,去除了手工检查阻塞的操作;值得注意的是: Channe1 Futurelistener的 operationcomplete方法是由i/o线程执行的,因此要注意的是不要在这里执行耗时操作,否则需要通过另外的线程或线程池来执行

  • 在 Netty中有两种发送消息的方式,可以直接写到 Channe中,也可以写到与 ChannelHandler所关联ChannelHandlerContext中。对于前一种方式来说,消息会从ChannelPipeline的末尾开始流动;对于后一种方式来说,消息将从 ChannelPiple1ine中的下一个 Channelhandler开始流动。
  • 结论
  • ChannelHandlerContext与 ChannelHandler之间的关联绑定关系是永远都不会发生改变的,因此对其进行缓存是没有任何问题的。
  • 对于与 Channel的同名方法来说, ChannelHandlercontext的方法将会产生更短的事件流,所以我们应该在可能的情况下利用这个特性来提升应用性能。

  • Netty对于OIO和NIO几乎无缝未还,原理是通过设置Socket的Timeout来实现的,假如OIO方式,设置超时为100ms,之后没完成会抛出超时异常,Netty自己捕获,在下一个handler继续处理,以此类推,直到完成。

  • Netty的“消息中转最佳实践”,在服务端的handler中新建客户端,转发该服务端收到的消息,并且使用当前线程组EventLoopGroup的当前线程EventLoop来进行转发。 需要注意的是,Netty权威指南和Netty实战中都有谈到,不要在ChannelHandler方法中调用sync()或await()方法,会有可能引起死锁。因此如果不使用 connect.addListener()这种异步方式发送,而使用 connect.sync()之后再调用writeAndFlush()则会报错io.netty.util.concurrent.BlockingOperationException: DefaultChannelPromise@7
 @Override
    public void channelRead(ChannelHandlerContext ctx, Object s) throws Exception {
        System.out.println("中转站服务端收到消息 :" + s);

        /**
         * 当有客户端发送消息来得时候,就连接上下个服务端(此时这个服务端就作为客户端),当然在channelRead0调用也可以
         * 重点在于client和server使用同一个group,使用ctx.channel().eventLoop()来获取。
         */
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)).addLast(new StringEncoder(CharsetUtil.UTF_8))
                                .addLast("client2", new ClientHandler());
                    }
                }).group(ctx.channel().eventLoop());
        ChannelFuture connect = bootstrap.connect("127.0.0.1", 6667);
        connect.addListener(new GenericFutureListener<Future<? super Void>>() {
            @Override
            public void operationComplete(Future<? super Void> future) throws Exception {
                System.out.println(connect.channel());
                connect.channel().writeAndFlush(s);
                connect.addListener(ChannelFutureListener.CLOSE);
            }
        });
    }

错误代码:在handler中调用sync()将会报错。

 Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)).addLast(new StringEncoder(CharsetUtil.UTF_8))
                                .addLast("client2", new ClientHandler());
                    }
                }).group(ctx.channel().eventLoop());
        ChannelFuture connect = bootstrap.connect("127.0.0.1", 6667).sync();
        System.out.println(connect.channel());
        connect.channel().writeAndFlush(s);
        connect.addListener(ChannelFutureListener.CLOSE);

  • Netty之所以说是异步非阻塞网络框架是因为通过NioSocketChannel的write系列方法向连接里面写入数据时候是非阻塞的,马上会返回的,即使调用写入的线程是我们的业务线程。这是Netty通过在ChannelPipeline中判断调用NioSocketChannel的write的调用线程是不是其对应的NioEventLoop中的线程来实现的(即是不是在Netty的EventLoop线程中执行的,在handler中是在本身的EventLoop线程执行),如果发现不是则会把写入请求封装为WriteTask投递到其对应的NioEventLoop中的队列里面,然后等其对应的NioEventLoop中的线程轮询连接套接字的读写事件时候捎带从队列里面取出来执行总结说就是每个NioSocketChannel对应的读写事件都是在其对应的NioEventLoop管理的单线程内执行,对同一个NioSocketChannel不存在并发读写,所以无需加锁处理。即channel的相关读写方法是线程安全的。

  • 另外当从NioSocketChannel中读取数据时候,并不是使用业务线程来阻塞等待,而是等NioEventLoop中的IO轮询线程发现Selector上有数据就绪时候,通过事件通知方式来通知我们业务数据已经就绪,可以来读取并处理了。观察者模式
  • 下面我们讨论两个细节,第一是完成TCP三次握手的套接字应该注册到worker线程池中的哪一个NioEventLoop的Selector上,第二个是NioEventLoop中的线程负责监听注册到Selector上的所有连接的读写事件和处理队列里面的消息,那么会不会导致由于处理队列里面任务耗时太长导致来不及处理连接的读写事件?
  • 1.Netty默认使用的是PowerOfTwoEventExecutorChooser,采用的轮询取模方式来进行分配。
  • 2.Netty默认是采用时间均分策略来避免某一方处于饥饿状态:

如上代码1.1处理所有注册到当前NioEventLoop的Selector上的所有连接套接字的读写事件,代码1.2用来统计其耗时,由于默认情况下ioRatio为50,所以代码1.3尝试使用与代码1.2执行相同的时间来运行队列里面的任务。也就是处理套接字读写事件与运行队列里面任务是使用时间片轮转方式轮询执行。


  • 使用No进行文件读取所涉及的步骤: 1.从i1 eInputstream对象获取到 Channe1对象。 2.创建 Buffer。 3.将数据从 Channe1中读取到 Buffer对象中。

  • ByteBuffer 满足关系: 0 < mark < position < limit < capacity

  • flip()方法 1.将1imi值设为当前的 position。 2.将 positon设为0。

  • clear()方法。 1.将1imit值设为 capacity 2.将 position值设为0。

  • compact()方法。 1.将所有未读的数据复制到 buffer起始位置处。 2.将 position设为最后一个未读元素的后面 3.将1imit设为 capacity。 4.现在 buffer就准备好了,但是不会覆盖未读的数据。


Nety处理器重要概念

  • 1 Netty的处理器可以分为两类:入站处理器与出站处理器 2.入站处理器的顶层是 Channe1 nboundHandler,出站处理器的顶层是 ChanneloutboundHandler。 3.数据处理时常用的各种编解码器本质上都是处理器 4.编解码器:无论我们向网络中写入的数据是什么类型(int、char、 String、二进制等),数据在网络中传递时,其都是以字节流的形式呈现的;将数据由原本的形式转换 为字节流的操作成为编码( encode),将数据由字节转换为它原本的格式或是其他格式的操作成为解码( decode),编解码统一称为 codec 5.编码:本质上是一种出站处理器;因此,编码一定是一种 ChanneloutboundHandler 6,解码:本质上是一种入站处理器;因此,解码一定是一种 ChannelInboundHandler 7.在 Netty中,编码器通常以 XXXEncoder命名;解码器通常以 XXXDecoder命名。