Dubbo源码深度剖析:Dubbo如何使用Netty服务端

Dubbo Provider的服务导出其实就是初始化好一个Netty服务端以供其他Consumer连接,因此本文的重点是学习一下Dubbo如何使用Netty Server提供RPC服务,Netty Server如何初始化和应用,这也是阅读源码的好处。


reactor线程模型

首先看一下reactor线程模型的使用:

// 设置Netty的boss线程池和worker线程池
bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
workerGroup = NettyEventLoopFactory.eventLoopGroup(
        getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
        "NettyServerWorker");

这里设置了2个线程池,bossGroup主要用于处理客户端的accept连接、断开连接等,workerGroup处理数据的传输,包括read、write事件和pipeline链条中的handler。

简单来说,reactor线程模型就是将线程池的职责分开,处理连接的属于低频操作且阻塞时间相对较长(因为需要进行TCP3次握手),处理数据的IO传输属于高频且和业务相关性紧密,使用互不干扰的线程池可以提高效率。

同时如果将pipeline链条中的业务处理handler里面,使用新的线程池进行所有的业务处理,然后完成业务处理的时候唤醒外部的workerGroup,由workerGroup只处理编解码等其他handler操作。这样就是kafka使用的线程模型,也就是3级线程处理,这样的效率将进一步提升。

netty server参数优化

// 设置netty的业务处理类
  final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
   channels = nettyServerHandler.getChannels();
 
   bootstrap.group(bossGroup, workerGroup)
           .channel(NettyEventLoopFactory.serverSocketChannelClass())
           //一般来说,一个端口释放后会等待两分钟之后才能再被使用,SO_REUSEADDR是让端口释放后立即就可以被再次使用,用于断开后的重连。
           .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
           // 不启用开启Nagle算法,Nagle算法会收集网络小包再一次性发送,不启用则是即时发送,提高时效性
           .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
           // ByteBuf的分配器(重用缓冲区),也就是使用对象池重复利用内存块
           .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)

上面有几个使用netty server的必要设置的参数都有写了注释:SO_REUSEADDR、TCP_NODELAY、ALLOCATOR。但其实除了上面那几个参数外,还有一些比较重要的参数在这块源码中没有体现,因此我从Seata(分布式事务框架)将相关的源码摘出来:

this.serverBootstrap
      .group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
      .channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ)
      // TCP 3次握手的队列缓冲区大小
  .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())
      //一般来说,一个端口释放后会等待两分钟之后才能再被使用,SO_REUSEADDR是让端口释放后立即就可以被再次使用。
  .option(ChannelOption.SO_REUSEADDR, true)
      // 连接保活
  .childOption(ChannelOption.SO_KEEPALIVE, true)
      // 不启用开启Nagle算法,Nagle算法会收集网络小包再一次性发送,不启用则是即时发送
  .childOption(ChannelOption.TCP_NODELAY, true)
      // 发送数据缓冲区大小
  .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())
      // 接收数据缓冲区大小
  .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())
      // 控制网络水位,将网络传输速度维持在比较平稳的状态
  .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
      new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(),
          nettyServerConfig.getWriteBufferHighWaterMark()))
  .localAddress(new InetSocketAddress(listenPort))

其他比较重要的参数就是上面几个:SO_BACKLOG、SO_KEEPALIVE、SO_SNDBUF、SO_RCVBUF。而这个参数WRITE_BUFFER_WATER_MARK是属于新版本才有的,原理大概就是设置一个高水位、一个低水位,当输出的数据速率高于高水位时,则暂停一下write事件,改为处理read事件,这样就可以控制网络传输的速度比较平稳,避免大流量打死网卡。

netty server 处理链条

下面我们可以看到pipeline()里面加入了很多handler,其实这里就是使用了责任链设计模式,接收到的网络请求,都会依次使用每个handler处理一下:

.childHandler(new ChannelInitializer() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        // FIXME: should we use getTimeout()?
        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
        // 默认不启用SSL
        if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
            ch.pipeline().addLast("negotiation",
                    SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
        }
        ch.pipeline()
                // 解码器
                .addLast("decoder", adapter.getDecoder())
                // 编码器
                .addLast("encoder", adapter.getEncoder())
                // 心跳检查handler
                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                // 业务处理handler
                .addLast("handler", nettyServerHandler);
    }
});
// bind
// 绑定本地端口,并启动监听服务
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();

netty的pipeline处理顺序:如果是输入的请求,则是从上到下依次执行handler;如果是输出的响应,则是从下到上逆序执行handler。

这里我们再了解一下IdleStateHandler心跳处理器,这个handler其实是netty提供的,目的是维持server端和client端的长连接,如果没有设置这个handler,就会经常出现TCP的超时时间到了,然后客户端直接断开和服务端的连接,这个笔者之前遇到过这个bug,现象就是建立连接一段时间之后,没有新的网络传输的时候,莫名其妙地断开了连接。

更多详细的内容请订阅CSDN专栏Dubbo源码深度剖析_Gemini的博客-CSDN博客

现在订阅专栏Dubbo源码深度剖析_Gemini的博客-CSDN博客,发送订阅截图的私信给作者,将可以获得一份丰富的面试题整理资料,里面收录的面试题多达几百道。