Netty Best Practices

Mars, December 13, 2019
  • Use pooled buffers
/*
 * Use unpooled buffers with caution: allocation/deallocation is slow.
 * Use pooling of buffers to reduce allocation/deallocation time!
 */
Bootstrap bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
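A hedged sketch of the server-side counterpart, assuming Netty 4.1 on the classpath: accepted connections are configured with `childOption` rather than `option`, and a buffer taken from a pooled allocator goes back to the pool when it is released. (In Netty 4.1 the default allocator is already pooled except on Android, so the explicit option mainly documents intent.) The class and method names are made up for illustration.

```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelOption;
import java.nio.charset.StandardCharsets;

public final class PooledBufferDemo {

    // option() applies to the listening channel, childOption() to every accepted connection.
    static ServerBootstrap usePooledAllocator(ServerBootstrap b) {
        return b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }

    // Encodes text into a buffer taken from the allocator and returns the encoded length.
    static int encodedLength(ByteBufAllocator alloc, String text) {
        ByteBuf buf = alloc.buffer();
        try {
            buf.writeCharSequence(text, StandardCharsets.UTF_8);
            return buf.readableBytes();
        } finally {
            buf.release(); // hands the memory back to the pool
        }
    }

    public static void main(String[] args) {
        usePooledAllocator(new ServerBootstrap());
        System.out.println(encodedLength(PooledByteBufAllocator.DEFAULT, "hello")); // prints 5
    }
}
```

Forgetting `release()` on a pooled buffer leaks pool memory, which is why the release sits in a `finally` block.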
  • Searching a ByteBuf
// Use ByteBuf.forEachByte() with a ByteBufProcessor instead of a hand-written loop
Slow search :(
ByteBuf buf = ...;
int index = -1;
for (int i = buf.readerIndex(); index == -1 && i <  buf.writerIndex(); i++) {
  if (buf.getByte(i) == '\n') {
    index = i;
  }
}
Fast search :)
ByteBuf buf = ...;
int index = buf.forEachByte(new ByteBufProcessor() {
  @Override
  public boolean process(byte value) {
    return value != '\n';
  }
});

ByteBufProcessor is faster because it:

  - can eliminate range (bounds) checks
  - can be created once and shared
  - is easier for the JIT to inline

Use it whenever you need to find some pattern in a ByteBuf.
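In Netty 4.1, `ByteBufProcessor` is deprecated in favour of `io.netty.util.ByteProcessor`, which also provides ready-made, shareable instances such as `ByteProcessor.FIND_LF`. A minimal sketch assuming Netty 4.1; the `LineSearch` helper is illustrative. Note that `forEachByte` returns an absolute index (not one relative to `readerIndex`), or `-1` if no byte matched.

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ByteProcessor;
import java.nio.charset.StandardCharsets;

public final class LineSearch {

    // Returns the absolute index of the first '\n' in the readable bytes, or -1 if there is none.
    static int indexOfLf(ByteBuf buf) {
        return buf.forEachByte(ByteProcessor.FIND_LF); // shared, stateless processor
    }

    // Convenience wrapper: builds a buffer from text, skips `skip` bytes, then searches.
    static int indexOfLf(String text, int skip) {
        ByteBuf buf = Unpooled.copiedBuffer(text, StandardCharsets.US_ASCII);
        try {
            buf.skipBytes(skip); // the search starts at readerIndex
            return indexOfLf(buf);
        } finally {
            buf.release();
        }
    }

    public static void main(String[] args) {
        System.out.println(indexOfLf("GET /\nHost: x\n", 0)); // prints 5
    }
}
```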

  • Other buffer-related tips
Prefer ctx.alloc() / channel.alloc() over Unpooled
Prefer slice() and duplicate() over copy()
Prefer bulk operations over loops
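A small sketch of the last two tips, assuming Netty 4.1 (names are illustrative): `slice()` returns a view that shares memory and reference count with its parent, while `copy()` allocates a new buffer and copies the bytes; one bulk `writeBytes` call replaces a per-byte loop. Inside a handler the buffer would come from `ctx.alloc()`; `Unpooled` is used here only so the example runs without a channel.

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public final class BufferTips {

    // Returns {first byte seen through the slice, first byte seen through the copy}
    // after the parent buffer has been modified.
    static byte[] sliceVsCopy() {
        ByteBuf parent = Unpooled.buffer(8);
        parent.writeBytes(new byte[] {1, 2, 3, 4}); // one bulk write instead of a loop of writeByte()
        ByteBuf slice = parent.slice(0, 2);        // view: no bytes copied
        ByteBuf copy = parent.copy(0, 2);          // new buffer: bytes copied
        parent.setByte(0, 9);                      // change the parent after slicing/copying
        byte[] seen = {slice.getByte(0), copy.getByte(0)};
        copy.release();   // the copy has its own reference count
        parent.release(); // the slice shares the parent's reference count
        return seen;
    }

    public static void main(String[] args) {
        byte[] seen = sliceVsCopy();
        System.out.println(seen[0] + " " + seen[1]); // prints "9 1"
    }
}
```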

  • Use zero-copy (or memory mapping) to transfer file contents
// Large file
File file = new File("");
FileInputStream in = new FileInputStream(file); // open the file
// The FileRegion lets the transport hand the file to the kernel without copying it into user space
FileRegion region = new DefaultFileRegion(in.getChannel(), 0, file.length());
ctx.channel().writeAndFlush(region).addListener(new ChannelFutureListener() { // notified when the transfer completes
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if (!future.isSuccess()) {
            Throwable cause = future.cause(); // the transfer failed
            // Do something
        }
    }
});


// This only works if you don't need to modify the data on the fly. If you do, use ChunkedWriteHandler and ChunkedNioFile.
channel.writeAndFlush(new DefaultFileRegion(fc, 0, fileLength));
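A self-contained sketch, assuming Netty 4.1. It uses the `DefaultFileRegion(File, long, long)` constructor, which opens the file lazily and closes it when the region is released, so no `FileInputStream` has to be managed by hand. To stay runnable without a network, the demo writes to an `EmbeddedChannel`, which simply queues the region; over a real NIO or epoll socket the same call lets the transport use `FileChannel.transferTo` / `sendfile`. `FileSender` and its methods are hypothetical names. Zero-copy is not possible when the data must pass through an `SslHandler`; that is the case for `ChunkedWriteHandler` with `ChunkedNioFile`.

```java
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.FileRegion;
import io.netty.channel.embedded.EmbeddedChannel;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;

public final class FileSender {

    // Sends the whole file as a FileRegion and reports failures through a listener.
    static ChannelFuture sendFile(Channel channel, File file) {
        FileRegion region = new DefaultFileRegion(file, 0, file.length());
        return channel.writeAndFlush(region).addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) {
                future.cause().printStackTrace(); // replace with real error handling
            }
        });
    }

    // Writes a 3-byte temp file through an EmbeddedChannel and returns the size of the written region.
    static long demo() {
        try {
            File file = File.createTempFile("region", ".bin");
            try {
                Files.write(file.toPath(), new byte[] {1, 2, 3});
                EmbeddedChannel channel = new EmbeddedChannel();
                sendFile(channel, file);
                FileRegion written = channel.readOutbound();
                long count = written.count();
                written.release(); // releasing the region closes the underlying file
                channel.finish();
                return count;
            } finally {
                file.delete();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 3
    }
}
```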
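  • Never block the EventLoop: don't do any of the following in an EventLoopGroup thread
// Sleeping, blocking, long computations and blocking DB queries are time-consuming work: hand them to application threads, not to the worker (child) threads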
Thread.sleep()
CountDownLatch.await() or any other blocking operation from java.util.concurrent
Long-lived computationally intensive operations
Blocking operations that might take a while (e.g. DB query)
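One way to keep such work off the I/O threads is to register the blocking handler with its own `EventExecutorGroup` via `ChannelPipeline.addLast(EventExecutorGroup, String, ChannelHandler)`; that handler's events then run on the group's threads instead of the EventLoop. A sketch assuming Netty 4.1: `BlockingQueryHandler`, the simulated 10 ms `lookup` and the pool size of 16 are hypothetical stand-ins for a real DB call.

```java
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;

public final class BlockingOffload {

    // Separate thread pool for blocking work; 16 threads is an arbitrary assumption.
    static final EventExecutorGroup BLOCKING_GROUP = new DefaultEventExecutorGroup(16);

    // Hypothetical handler that performs a blocking lookup (e.g. a JDBC query).
    static final class BlockingQueryHandler extends SimpleChannelInboundHandler<String> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String key) {
            String value = lookup(key); // runs on BLOCKING_GROUP, not on the EventLoop
            ctx.writeAndFlush(value);   // Netty hands the write back to the EventLoop
        }

        private static String lookup(String key) {
            try {
                Thread.sleep(10); // simulated slow query
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "value-of-" + key;
        }
    }

    static final class Initializer extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) {
            // Handlers added together with an EventExecutorGroup run on that group's threads.
            ch.pipeline().addLast(BLOCKING_GROUP, "db", new BlockingQueryHandler());
        }
    }
}
```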

  • Share EventLoopGroup thread groups
//Share EventLoopGroup between different Bootstraps
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap().group(group);
Bootstrap bootstrap2 = new Bootstrap().group(group);
// Sharing the thread resources makes the most efficient use of them
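A minimal sketch assuming Netty 4.1 and the NIO transport: both bootstraps draw their EventLoops from one group, so the group is created once and shut down once. The class name and the thread count are arbitrary choices for illustration.

```java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public final class SharedGroupDemo {

    // Returns true if both bootstraps were configured with the very same group instance.
    static boolean sharesGroup() {
        EventLoopGroup group = new NioEventLoopGroup(2); // one pool for every client bootstrap
        try {
            Bootstrap first = new Bootstrap().group(group).channel(NioSocketChannel.class);
            Bootstrap second = new Bootstrap().group(group).channel(NioSocketChannel.class);
            return first.config().group() == second.config().group();
        } finally {
            group.shutdownGracefully(); // shut the shared group down once, when the application stops
        }
    }

    public static void main(String[] args) {
        System.out.println(sharesGroup()); // prints true
    }
}
```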
  • Proxy-like applications should share the same EventLoop

Don't do this:

// Called once a new connection was accepted
// Uses a new EventLoopGroup instance to handle the connection to the remote peer
// Don't do this! This ties up more resources than needed and introduces extra context-switching overhead.

public class ProxyHandler extends ChannelInboundHandlerAdapter {
  @Override
  public void channelActive(ChannelHandlerContext ctx) { 
    final Channel inboundChannel = ctx.channel();
    Bootstrap b = new Bootstrap();
    b.group(new NioEventLoopGroup()); // a brand-new group for every connection
    ...
    ChannelFuture f = b.connect(remoteHost, remotePort);
    ...
  }
}

Do this instead:

// Called once a new connection was accepted
// Share the same EventLoop between both Channels. This means all I/O for both connected Channels is handled by the same thread.

public class ProxyHandler extends ChannelInboundHandlerAdapter {
  @Override
  public void channelActive(ChannelHandlerContext ctx) { 
    final Channel inboundChannel = ctx.channel();
    Bootstrap b = new Bootstrap();
    b.group(inboundChannel.eventLoop()); 
    ...
    ChannelFuture f = b.connect(remoteHost, remotePort);
    ...
  }
}

Always remember to share the EventLoop within your application. Here is an example from real development:

// Using the same thread group when relaying: a server-side handler receives a message in channelRead and forwards it to another server
@Override
public void channelRead(ChannelHandlerContext ctx, Object s) throws Exception {

    System.out.println("Relay server received message: " + s);

    /*
     * The key point is that the client and the server share the same group, obtained via ctx.channel().eventLoop().
     * Never call sync() here: it would block the current EventLoop thread. Add a listener instead.
     */
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)).addLast(new StringEncoder(CharsetUtil.UTF_8))
                            .addLast("client2", new ClientHandler());
                }
            }).group(ctx.channel().eventLoop());
    ChannelFuture connect = bootstrap.connect("127.0.0.1", 6667);
    connect.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (!future.isSuccess()) {
                future.cause().printStackTrace(); // the connection attempt failed
                return;
            }
            System.out.println(future.channel());
            // Close the forwarding connection only after the message has been written
            future.channel().writeAndFlush(s).addListener(ChannelFutureListener.CLOSE);
        }
    });
}


  • Calling the Channel's write methods from outside the EventLoop

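Before actually writing to the channel, every write method checks whether the calling thread belongs to the Channel's own EventLoop (the fast path; a call from an outside thread is the slow path). If it does, the write is performed directly; if not, the write is added as a task to the EventLoop's task queue. This is why all Channel I/O methods are thread-safe: they are always executed by Netty's own thread.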

Always combine operations where possible when acting on the Channel from outside the EventLoop, to reduce the overhead of wakeups and object creation!

Not recommended!

channel.write(msg1);
channel.writeAndFlush(msg3);

Combine for the WIN! (recommended: merge write operations wherever possible)

channel.eventLoop().execute(new Runnable() {
  @Override
  public void run() {
    channel.write(msg1);
    channel.writeAndFlush(msg3);
  }
});
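The rule can be wrapped in a small helper that writes directly when already on the EventLoop and otherwise submits a single task covering both writes. A sketch assuming Netty 4.1; `CombinedWrites.writeBoth` is a made-up name. The demo uses `EmbeddedChannel`, whose event loop always reports `inEventLoop() == true`, so only the direct branch runs there.

```java
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.embedded.EmbeddedChannel;

public final class CombinedWrites {

    // Writes two messages with a single flush and at most one task submission.
    static void writeBoth(Channel channel, Object first, Object second) {
        EventLoop loop = channel.eventLoop();
        if (loop.inEventLoop()) {
            channel.write(first);          // buffered only
            channel.writeAndFlush(second); // one flush for both messages
        } else {
            // One task means one wakeup of the EventLoop instead of one per operation.
            loop.execute(() -> {
                channel.write(first);
                channel.writeAndFlush(second);
            });
        }
    }

    public static void main(String[] args) {
        EmbeddedChannel channel = new EmbeddedChannel();
        writeBoth(channel, "msg1", "msg3");
        channel.runPendingTasks();
        Object a = channel.readOutbound();
        Object b = channel.readOutbound();
        System.out.println(a + " " + b); // prints "msg1 msg3"
        channel.finish();
    }
}
```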
  • Operating on the Channel from inside a handler

Use the shortest path possible to get maximal performance.


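Both ctx and channel have write methods. When you write through the context ctx, the message starts at the current handler and flows to the next outbound handler toward the head of the ChannelPipeline;
when you write through the channel, the message starts at the tail of the pipeline and flows through every outbound handler.
The two paths therefore traverse a different number of handlers, so their cost differs.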

public class YourHandler extends ChannelInboundHandlerAdapter {
  @Override
  public void channelActive(ChannelHandlerContext ctx) {
    // BAD (most of the times)
    ctx.channel().writeAndFlush(msg); 

    // GOOD
    ctx.writeAndFlush(msg); 
   }
}
Channel.* methods ⇒ the operation starts at the tail of the ChannelPipeline.
ChannelHandlerContext.* methods ⇒ the operation starts from this ChannelHandler and flows through the rest of the ChannelPipeline.
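The difference is easy to observe with an outbound handler placed after the writing handler. In this sketch (assuming Netty 4.1; `Tagger` and `Writer` are hypothetical handlers), `ctx.writeAndFlush` starts at `Writer` and moves toward the head, so it never reaches `Tagger`; `ctx.channel().writeAndFlush` starts at the tail and passes through it. This is also why the code above says "BAD (most of the times)": occasionally you do want the message to traverse the whole pipeline.

```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;

public final class WritePathDemo {

    // Outbound handler that sits AFTER Writer (closer to the tail) and appends "!".
    static final class Tagger extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            ctx.write(msg + "!", promise);
        }
    }

    // Inbound handler that echoes every message, either via ctx or via the channel.
    static final class Writer extends ChannelInboundHandlerAdapter {
        private final boolean viaContext;

        Writer(boolean viaContext) {
            this.viaContext = viaContext;
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            if (viaContext) {
                ctx.writeAndFlush(msg);           // starts at Writer: Tagger is skipped
            } else {
                ctx.channel().writeAndFlush(msg); // starts at the tail: Tagger runs
            }
        }
    }

    // Pipeline: head -> Writer -> Tagger -> tail
    static String echo(boolean viaContext) {
        EmbeddedChannel channel = new EmbeddedChannel(new Writer(viaContext), new Tagger());
        channel.writeInbound("ping");
        String out = channel.readOutbound();
        channel.finish();
        return out;
    }

    public static void main(String[] args) {
        System.out.println(echo(true));  // prints "ping"
        System.out.println(echo(false)); // prints "ping!"
    }
}
```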
  • Share handlers with @Sharable

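Share ChannelHandlers if they are stateless: a handler without state can be used as a single shared instance, which also reduces GC pressure.

If a ChannelHandler is annotated with @Sharable and only one global instance is used, that instance is shared by the pipelines of many Channels and is invoked concurrently from multiple threads. The annotation does not make it thread-safe: any instance-level state it holds, including variables shared across ChannelHandlers, needs special care.

Thread switches can also happen while an event passes through a ChannelPipeline (for example, when some handlers run on a separate EventExecutorGroup). If the same object is shared by several ChannelHandlers, it may then be accessed by multiple threads concurrently.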

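@ChannelHandler.Sharable
public class StatelessHandler extends ChannelInboundHandlerAdapter {
  private static final InternalLogger logger = InternalLoggerFactory.getInstance(StatelessHandler.class);

  @Override
  public void channelActive(ChannelHandlerContext ctx) {
    logger.debug("Now client from " + ctx.channel().remoteAddress());
    ctx.fireChannelActive(); // keep propagating the event to the next handler
  }
}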

public class MyInitializer extends ChannelInitializer<Channel> {
  private static final ChannelHandler INSTANCE = new StatelessHandler();
  @Override
  public void initChannel(Channel ch) {
    ch.pipeline().addLast(INSTANCE);
  }
}

Annotate ChannelHandlers that are stateless with @ChannelHandler.Sharable and use the same instance across Channels to reduce GC.
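Sharing is not limited to fully stateless handlers: a shared handler may keep state as long as that state is thread-safe. A sketch assuming Netty 4.1, where the hypothetical `ConnectionCounter` counts active connections across all channels with an `AtomicLong`. `isSharable()` reports whether the annotation is present; Netty refuses to add a non-`@Sharable` instance to more than one pipeline.

```java
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.embedded.EmbeddedChannel;
import java.util.concurrent.atomic.AtomicLong;

@ChannelHandler.Sharable
public final class ConnectionCounter extends ChannelInboundHandlerAdapter {

    // Shared by every pipeline, so the state must be thread-safe.
    private final AtomicLong active = new AtomicLong();

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        active.incrementAndGet();
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        active.decrementAndGet();
        ctx.fireChannelInactive();
    }

    long activeConnections() {
        return active.get();
    }

    public static void main(String[] args) {
        ConnectionCounter counter = new ConnectionCounter(); // one instance for all channels
        EmbeddedChannel a = new EmbeddedChannel(counter);
        EmbeddedChannel b = new EmbeddedChannel(counter);
        System.out.println(counter.activeConnections()); // prints 2
        a.finish();
        b.finish();
    }
}
```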
  • Remove handlers that are no longer needed, to reduce traversal overhead
Remove ChannelHandler once not needed anymore

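public class OneTimeHandler extends ChannelInboundHandlerAdapter {
  @Override
  public void channelActive(ChannelHandlerContext ctx) {
    doOneTimeAction();            // placeholder for the one-time work
    ctx.fireChannelActive();      // let the following handlers see the event
    ctx.pipeline().remove(this);  // then take this handler out of the pipeline
  }
}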

This keeps the ChannelPipeline as short as possible and so eliminates as much of the traversal overhead as possible.
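A runnable variant of the pattern, assuming Netty 4.1, where the one-time action is a hypothetical greeting message. `EmbeddedChannel` is used so the pipeline can be inspected after activation.

```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.embedded.EmbeddedChannel;

public final class GreetingOnceHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush("welcome"); // the one-time action (hypothetical greeting)
        ctx.fireChannelActive();      // let the following handlers see the event
        ctx.pipeline().remove(this);  // later events no longer traverse this handler
    }

    public static void main(String[] args) {
        EmbeddedChannel channel = new EmbeddedChannel(new GreetingOnceHandler());
        Object greeting = channel.readOutbound();
        System.out.println(greeting);                                                   // prints welcome
        System.out.println(channel.pipeline().get(GreetingOnceHandler.class) == null); // prints true
        channel.finish();
    }
}
```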

  • Whether to read from the Channel automatically when data is ready
To auto-read or not to auto-read
By default Netty will keep on reading data from the Channel once something is ready.

Need more fine grained control ?

channel.config().setAutoRead(false); 
channel.read(); 
channel.config().setAutoRead(true); 

setAutoRead(false) == no more data will be read automatically from this Channel.
read() == tell the Channel to do one read operation once new data is ready.
setAutoRead(true) == Netty will read automatically again.
This can also be quite useful when writing proxy like applications!
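In a proxy, auto-read is typically driven by the writability of the other side: stop reading from the client while the backend connection cannot keep up, and resume once its buffer drains below the low water mark. A sketch assuming Netty 4.1; `BackpressureRelay` and passing the client channel into its constructor are illustrative choices, not a Netty API.

```java
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// Installed on the backend (outbound) channel of a proxy.
public final class BackpressureRelay extends ChannelInboundHandlerAdapter {

    private final Channel inbound; // the client-facing channel whose reading is throttled

    public BackpressureRelay(Channel inbound) {
        this.inbound = inbound;
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        // Backend above the high water mark: stop reading from the client.
        // Backend below the low water mark again: read automatically again.
        inbound.config().setAutoRead(ctx.channel().isWritable());
        ctx.fireChannelWritabilityChanged();
    }
}
```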

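  • Transports faster than NIO
In order of increasing performance:

1) OioSocketChannel: traditional blocking I/O.
2) NioSocketChannel: select/poll or epoll through the JDK selector; on Linux, JDK 7 and later pick epoll automatically.
3) EpollSocketChannel: native epoll, Linux only, with additional options.
4) EpollDomainSocketChannel: IPC over Unix domain sockets, only when client and server run on the same host; supported since 4.0.26, see https://github.com/netty/netty/pull/3344.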

Switching to native transport is easy

Using NIO transport

Bootstrap bootstrap = new Bootstrap().group(new NioEventLoopGroup());
bootstrap.channel(NioSocketChannel.class);

Using native transport

Bootstrap bootstrap = new Bootstrap().group(new EpollEventLoopGroup());
bootstrap.channel(EpollSocketChannel.class);
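Because the native transport exists only on Linux (and needs the `netty-transport-native-epoll` artifact), a common pattern is to fall back to NIO when epoll is unavailable. A sketch assuming Netty 4.1 with that artifact on the classpath; `TransportSelector` and its methods are made-up names.

```java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public final class TransportSelector {

    // Epoll.isAvailable() is false on non-Linux systems or when the native library cannot be loaded.
    static EventLoopGroup newGroup(int threads) {
        return Epoll.isAvailable() ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
    }

    // The channel class must match the group type chosen above.
    static Class<? extends SocketChannel> channelClass() {
        return Epoll.isAvailable() ? EpollSocketChannel.class : NioSocketChannel.class;
    }

    public static void main(String[] args) {
        EventLoopGroup group = newGroup(1);
        try {
            Bootstrap bootstrap = new Bootstrap().group(group).channel(channelClass());
            System.out.println("Using " + channelClass().getSimpleName());
        } finally {
            group.shutdownGracefully();
        }
    }
}
```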
  • A better decoder

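ReplayingDecoder<S> extends ByteToMessageDecoder
With it you no longer need to check whether enough bytes are readable, as you must in a plain ByteToMessageDecoder like this one: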

public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx,
                          ByteBuf buf, List<Object> out) throws Exception {

        if (buf.readableBytes() < 4) {
            return;
        }

        buf.markReaderIndex();
        int length = buf.readInt();

        if (buf.readableBytes() < length) {
            buf.resetReaderIndex();
            return;
        }

        out.add(buf.readBytes(length));
    }
}
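For comparison, the same framing written on top of `ReplayingDecoder`, following the pattern in Netty's own `ReplayingDecoder` documentation: if `readInt()` or `readBytes()` runs out of data, the decoder rewinds and retries once more bytes arrive. A sketch assuming Netty 4.1; the class is renamed only to avoid clashing with the one above. The convenience has a price: `decode()` is re-run on every replay, and some `ByteBuf` operations are not allowed inside a replaying decoder.

```java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;

// Same framing as IntegerHeaderFrameDecoder: a 4-byte length followed by that many bytes.
public class ReplayingIntegerHeaderFrameDecoder extends ReplayingDecoder<Void> {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
        // No readableBytes() checks: if data is missing, ReplayingDecoder rewinds and retries later.
        out.add(buf.readBytes(buf.readInt()));
    }
}
```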