Dubbo Provider的服务导出其实就是初始化好一个Netty服务端以供其他Consumer连接,因此本文的重点是学习一下Dubbo如何使用Netty Server提供RPC服务,Netty Server如何初始化和应用,这也是阅读源码的好处。
reactor线程模型
首先看一下reactor线程模型的使用:
// 设置Netty的boss线程池和worker线程池
bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
workerGroup = NettyEventLoopFactory.eventLoopGroup(
getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
"NettyServerWorker");
这里设置了2个线程池,bossGroup主要用于处理客户端的accept连接、断开连接等,workerGroup处理数据的传输,包括read、write事件和pipeline链条中的handler。
简单来说,reactor线程模型就是将线程池的职责分开,处理连接的属于低频操作且阻塞时间相对较长(因为需要进行TCP3次握手),处理数据的IO传输属于高频且和业务相关性紧密,使用互不干扰的线程池可以提高效率。
同时如果将pipeline链条中的业务处理handler里面,使用新的线程池进行所有的业务处理,然后完成业务处理的时候唤醒外部的workerGroup,由workerGroup只处理编解码等其他handler操作。这样就是kafka使用的线程模型,也就是3级线程处理,这样的效率将进一步提升。
netty server参数优化
// 设置netty的业务处理类
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NettyEventLoopFactory.serverSocketChannelClass())
//一般来说,一个端口释放后会等待两分钟之后才能再被使用,SO_REUSEADDR是让端口释放后立即就可以被再次使用,用于断开后的重连。
.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
// 不启用开启Nagle算法,Nagle算法会收集网络小包再一次性发送,不启用则是即时发送,提高时效性
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
// ByteBuf的分配器(重用缓冲区),也就是使用对象池重复利用内存块
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
上面有几个使用netty server的必要设置的参数都有写了注释:SO_REUSEADDR、TCP_NODELAY、ALLOCATOR。但其实除了上面那几个参数外,还有一些比较重要的参数在这块源码中没有体现,因此我从Seata(分布式事务框架)将相关的源码摘出来:
this.serverBootstrap
.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
.channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ)
// TCP 3次握手的队列缓冲区大小
.option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())
//一般来说,一个端口释放后会等待两分钟之后才能再被使用,SO_REUSEADDR是让端口释放后立即就可以被再次使用。
.option(ChannelOption.SO_REUSEADDR, true)
// 连接保活
.childOption(ChannelOption.SO_KEEPALIVE, true)
// 不启用开启Nagle算法,Nagle算法会收集网络小包再一次性发送,不启用则是即时发送
.childOption(ChannelOption.TCP_NODELAY, true)
// 发送数据缓冲区大小
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())
// 接收数据缓冲区大小
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())
// 控制网络水位,将网络传输速度维持在比较平稳的状态
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(),
nettyServerConfig.getWriteBufferHighWaterMark()))
.localAddress(new InetSocketAddress(listenPort))
其他比较重要的参数就是上面几个:SO_BACKLOG、SO_KEEPALIVE、SO_SNDBUF、SO_RCVBUF。而这个参数WRITE_BUFFER_WATER_MARK是属于新版本才有的,原理大概就是设置一个高水位、一个低水位,当输出的数据速率高于高水位时,则暂停一下write事件,改为处理read事件,这样就可以控制网络传输的速度比较平稳,避免大流量打死网卡。
netty server 处理链条
下面我们可以看到pipeline()里面加入了很多handler,其实这里就是使用了责任链设计模式,接收到的网络请求,都会依次使用每个handler处理一下:
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
// 默认不启用SSL
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
ch.pipeline().addLast("negotiation",
SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
}
ch.pipeline()
// 解码器
.addLast("decoder", adapter.getDecoder())
// 编码器
.addLast("encoder", adapter.getEncoder())
// 心跳检查handler
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
// 业务处理handler
.addLast("handler", nettyServerHandler);
}
});
// bind
// 绑定本地端口,并启动监听服务
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
netty的pipeline处理顺序:如果是输入的请求,则是从上到下依次执行handler;如果是输出的响应,则是从下到上逆序执行handler。
这里我们再了解一下IdleStateHandler心跳处理器,这个handler其实是netty提供的,目的是维持server端和client端的长连接,如果没有设置这个handler,就会经常出现TCP的超时时间到了,然后客户端直接断开和服务端的连接,这个笔者之前遇到过这个bug,现象就是建立连接一段时间之后,没有新的网络传输的时候,莫名其妙地断开了连接。
更多详细的内容请订阅CSDN专栏:Dubbo源码深度剖析_Gemini的博客-CSDN博客
现在订阅专栏Dubbo源码深度剖析_Gemini的博客-CSDN博客,发送订阅截图的私信给作者,将可以获得一份丰富的面试题整理资料,里面收录的面试题多达几百道。