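Outline
1. The NameServer startup script
2. Which configuration NameServer parses at startup
3. How NameServer initializes its Netty server
4. How NameServer starts its Netty server
5. How the Broker initializes its configuration at startup
6. Creating the BrokerController and the components it contains
7. Initializing the BrokerController
8. Starting the BrokerController
9. How the Broker registers itself with NameServer
10. How BrokerOuterAPI sends the registration request
11. How NameServer handles the Broker's registration request
16. How the Broker sends periodic heartbeats, and how failures are detected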
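10. How BrokerOuterAPI sends the registration request
(1) The entry point for sending the Broker registration request
(2) NettyClient's request method invokeSync()
(3) How the Broker establishes a network connection to NameServer
(4) How a request is sent over the Channel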
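(1) The entry point for sending the Broker registration request
Now let's step into the network request method that actually registers the Broker with NameServer. Its entry point is: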
// This is the line that actually performs the registration
RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
// Once registered, the result is added to a List
if (result != null) {
    registerBrokerResultList.add(result);
}
BrokerOuterAPI's registerBroker() method looks like this:
public class BrokerOuterAPI {
    ...
    private RegisterBrokerResult registerBroker(final String namesrvAddr, final boolean oneway, final int timeoutMills,
        final RegisterBrokerRequestHeader requestHeader, final byte[] body)
        throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException,
        RemotingTimeoutException, InterruptedException {
        // Wrap both the request header and the request body in a RemotingCommand
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
        request.setBody(body);
        // oneway is a special kind of request: fire-and-forget, without waiting for the registration result
        if (oneway) {
            try {
                this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
            } catch (RemotingTooMuchRequestException e) {
                // Ignore
            }
            return null;
        }
        // In the normal case, the actual network send happens below.
        // remotingClient is in fact the NettyClient, which puts the request on the wire
        RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
        // What follows handles the response:
        // on success, the registration result is wrapped in a RegisterBrokerResult and returned
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                RegisterBrokerResponseHeader responseHeader =
                    (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
                RegisterBrokerResult result = new RegisterBrokerResult();
                result.setMasterAddr(responseHeader.getMasterAddr());
                result.setHaServerAddr(responseHeader.getHaServerAddr());
                if (response.getBody() != null) {
                    result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
                }
                return result;
            }
            default:
                break;
        }
        // Any other response code is turned into an MQBrokerException
        throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr());
    }
    ...
}
As the code shows, the registration request is ultimately sent to NameServer through the NettyClient component.
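To make the oneway/sync split concrete, here is a minimal sketch of the same control flow. All types here are hypothetical stand-ins, not RocketMQ classes: `SketchClient` plays the role of `remotingClient`, `SketchResponse` plays `RemotingCommand`, and the success code `0` is a placeholder. A oneway send returns `null` because there is no response to decode. A sync send either yields a result or throws.

```java
import java.util.Objects;

// Hypothetical stand-in for the response RemotingCommand.
final class SketchResponse {
    static final int SUCCESS = 0; // placeholder success code
    final int code;
    final String masterAddr;
    final String haServerAddr;
    final String remark;

    SketchResponse(int code, String masterAddr, String haServerAddr, String remark) {
        this.code = code;
        this.masterAddr = masterAddr;
        this.haServerAddr = haServerAddr;
        this.remark = remark;
    }
}

// Hypothetical stand-in for RegisterBrokerResult.
final class SketchRegisterResult {
    final String masterAddr;
    final String haServerAddr;

    SketchRegisterResult(String masterAddr, String haServerAddr) {
        this.masterAddr = masterAddr;
        this.haServerAddr = haServerAddr;
    }
}

// Hypothetical transport, playing the role of remotingClient.
interface SketchClient {
    void sendOneway(String addr, String request, long timeoutMillis);

    SketchResponse sendSync(String addr, String request, long timeoutMillis);
}

final class SketchOuterApi {
    private final SketchClient client;

    SketchOuterApi(SketchClient client) {
        this.client = Objects.requireNonNull(client);
    }

    SketchRegisterResult registerBroker(String namesrvAddr, boolean oneway, long timeoutMillis, String request) {
        if (oneway) {
            client.sendOneway(namesrvAddr, request, timeoutMillis); // fire-and-forget: nothing to decode
            return null;
        }
        SketchResponse response = client.sendSync(namesrvAddr, request, timeoutMillis);
        if (response.code == SketchResponse.SUCCESS) {
            // Only the success path produces a result object
            return new SketchRegisterResult(response.masterAddr, response.haServerAddr);
        }
        // Every other code becomes an exception, like the MQBrokerException thrown above
        throw new IllegalStateException("register failed, code=" + response.code + ", remark=" + response.remark);
    }
}
```

Because the transport sits behind an interface, the decode-or-throw logic can be exercised without any network at all.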
(2) NettyClient's request method invokeSync()
Next, look at NettyClient's request method invokeSync():
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
    ...
    @Override
    public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
        // Record the start time
        long beginStartTime = System.currentTimeMillis();
        // Get a Channel: the network connection between this Broker machine and the NameServer machine.
        // The NameServer address is passed in because that is the machine to connect to;
        // once the connection is established, it is represented by a Channel
        final Channel channel = this.getAndCreateChannel(addr);
        // The request can be sent only if the connection to the NameServer is usable
        if (channel != null && channel.isActive()) {
            try {
                // Run the RPC hooks that fire before the request is sent
                doBeforeRpcHooks(addr, request);
                // Fail fast if the time spent so far already exceeds the timeout
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTime) {
                    throw new RemotingTimeoutException("invokeSync call the addr[" + addr + "] timeout");
                }
                // This line actually sends the request, using only the time budget that is left
                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
                // Run the RPC hooks that fire after the response arrives
                doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                return response;
            } catch (RemotingSendRequestException e) {
                log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            } catch (RemotingTimeoutException e) {
                if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                    this.closeChannel(addr, channel);
                    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
                }
                log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }
    ...
}
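Based on this code, a Channel can be added to the diagram below to represent the network connection between the Broker and NameServer. Through this Channel, the Broker sends the actual network request to NameServer.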
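invokeSync() treats `timeoutMillis` as a single budget for the whole call. Time spent in the pre-send hooks is subtracted, and only the remainder is passed to invokeSyncImpl(). The helper below (`TimeBudget` is hypothetical and not part of RocketMQ) isolates that arithmetic. Timestamps are passed in rather than read from the clock, so the behavior is deterministic.

```java
import java.util.function.LongFunction;

// Hypothetical helper illustrating invokeSync()'s timeout budget; not part of RocketMQ.
final class TimeBudget {
    private TimeBudget() { }

    /** Milliseconds left for the send; throws if pre-send work already used up the timeout. */
    static long remaining(long beginMillis, long nowMillis, long timeoutMillis) {
        long costTime = nowMillis - beginMillis;
        if (timeoutMillis < costTime) {
            // Same condition as the RemotingTimeoutException check in invokeSync()
            throw new IllegalStateException("timeout: " + costTime + "ms spent before sending");
        }
        return timeoutMillis - costTime;
    }

    /** Hands only the remaining budget to the send step, like invokeSyncImpl(channel, request, timeoutMillis - costTime). */
    static <T> T invoke(long beginMillis, long nowMillis, long timeoutMillis, LongFunction<T> send) {
        return send.apply(remaining(beginMillis, nowMillis, timeoutMillis));
    }
}
```

For example, with a 3000 ms timeout and 500 ms already spent, the send step receives a 2500 ms budget. The caller therefore never waits longer than the timeout it asked for.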
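(3) How the Broker establishes a network connection to NameServer
Next, step into this.getAndCreateChannel(addr) to see how the Broker connects to NameServer. It first tries to get a connection from the cache, and creates a new one only if no usable connection is cached.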
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
    ...
    private Channel getAndCreateChannel(final String addr) throws RemotingConnectException, InterruptedException {
        // No address given: fall back to a connection to one of the configured NameServers
        if (null == addr) {
            return getAndCreateNameserverChannel();
        }
        // Reuse a cached connection if it is still healthy
        ChannelWrapper cw = this.channelTables.get(addr);
        if (cw != null && cw.isOK()) {
            return cw.getChannel();
        }
        // Otherwise create a new connection
        return this.createChannel(addr);
    }
    ...
}
Next, see how this.createChannel(addr) creates a network connection from a NameServer address.
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
    ...
    private Channel createChannel(final String addr) throws InterruptedException {
        // First try the cache: if a healthy connection is cached, return it
        ChannelWrapper cw = this.channelTables.get(addr);
        if (cw != null && cw.isOK()) {
            return cw.getChannel();
        }
        if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                // Check the cache again now that the lock is held, and decide whether a new connection is needed
                boolean createNewConnection;
                cw = this.channelTables.get(addr);
                if (cw != null) {
                    if (cw.isOK()) {
                        return cw.getChannel();
                    } else if (!cw.getChannelFuture().isDone()) {
                        // A connect attempt is still in progress: reuse it
                        createNewConnection = false;
                    } else {
                        // The previous attempt finished but is unusable: drop it
                        this.channelTables.remove(addr);
                        createNewConnection = true;
                    }
                } else {
                    createNewConnection = true;
                }
                if (createNewConnection) {
                    // This is where the connection is actually created: Netty's Bootstrap.connect()
                    // starts an asynchronous connect and returns a ChannelFuture for the new Channel
                    ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
                    log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
                    cw = new ChannelWrapper(channelFuture);
                    this.channelTables.put(addr, cw);
                }
            } catch (Exception e) {
                log.error("createChannel: create channel exception", e);
            } finally {
                this.lockChannelTables.unlock();
            }
        } else {
            log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
        }
        // Wait (at most connectTimeoutMillis) for the connect to finish, then return the Channel
        if (cw != null) {
            ChannelFuture channelFuture = cw.getChannelFuture();
            if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
                if (cw.isOK()) {
                    log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
                    return cw.getChannel();
                } else {
                    log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());
                }
            } else {
                log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(), channelFuture.toString());
            }
        }
        return null;
    }
    ...
}
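The pattern in getAndCreateChannel() and createChannel() has four parts:

- a lock-free cache check;
- a re-check under a `tryLock` with a timeout;
- an asynchronous connect whose future is cached, so concurrent callers share one in-flight attempt;
- a bounded wait outside the lock.

The sketch below reproduces that pattern under stated assumptions. It is not Netty code: a `CompletableFuture<String>` stands in for `ChannelFuture`, its value stands in for a connected `Channel`, and `ConnCache` and its `connector` function are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

// Hypothetical sketch: CompletableFuture<String> stands in for Netty's ChannelFuture,
// and its String value stands in for a connected Channel.
final class ConnCache {
    private final Map<String, CompletableFuture<String>> channelTables = new ConcurrentHashMap<>();
    private final ReentrantLock lockChannelTables = new ReentrantLock();
    private final Function<String, CompletableFuture<String>> connector; // plays bootstrap.connect(addr)
    final AtomicInteger connectCalls = new AtomicInteger();

    ConnCache(Function<String, CompletableFuture<String>> connector) {
        this.connector = connector;
    }

    private static boolean isOk(CompletableFuture<String> f) {
        return f.isDone() && !f.isCompletedExceptionally();
    }

    /** Returns a connected "channel", or null on connect failure or connect timeout. */
    String getOrCreate(String addr, long lockTimeoutMillis, long connectTimeoutMillis) {
        CompletableFuture<String> f = channelTables.get(addr);
        if (f != null && isOk(f)) {
            return f.join(); // fast path: healthy cached connection, no locking
        }
        try {
            if (lockChannelTables.tryLock(lockTimeoutMillis, TimeUnit.MILLISECONDS)) {
                try {
                    f = channelTables.get(addr); // re-check: another thread may have connected meanwhile
                    if (f == null || (f.isDone() && !isOk(f))) {
                        connectCalls.incrementAndGet();
                        f = connector.apply(addr); // start an async connect; do not wait under the lock
                        channelTables.put(addr, f);
                    } // otherwise a healthy or still-pending attempt exists: reuse it
                } finally {
                    lockChannelTables.unlock();
                }
            }
            if (f == null) {
                return null;
            }
            return f.get(connectTimeoutMillis, TimeUnit.MILLISECONDS); // bounded wait outside the lock
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException | TimeoutException e) {
            return null; // mirrors createChannel() returning null when the connect fails or times out
        }
    }
}
```

The wait happens outside the lock, as in the original. A slow connect to one NameServer therefore does not block threads that are looking up connections to other addresses.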
(4) How a request is sent over the Channel
The entry point for sending any kind of request over a Channel is this line in NettyRemotingClient's invokeSync() method:
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
Step into invokeSyncImpl():
public abstract class NettyRemotingAbstract {
    ...
    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        // opaque is the request ID; the response carries the same ID so it can be matched to this request
        final int opaque = request.getOpaque();
        try {
            // Register a ResponseFuture under the opaque before sending, so the response handler can find it
            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
            // With Netty, sending a request comes down to writing it to the Channel
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                // Called asynchronously once the write has completed (successfully or not)
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        responseFuture.setSendRequestOK(false);
                    }
                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());
                    responseFuture.putResponse(null);
                    log.warn("send a request command to channel <" + addr + "> failed.");
                }
            });
            // Block here until the response comes back or the timeout expires
            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            if (null == responseCommand) {
                // No response: tell a timeout (request was sent) apart from a send failure
                if (responseFuture.isSendRequestOK()) {
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause());
                } else {
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }
            return responseCommand;
        } finally {
            // Always remove the pending entry, whatever the outcome
            this.responseTable.remove(opaque);
        }
    }
    ...
}
The key point of this code is that the registration request is ultimately sent to NameServer through Netty's Channel API.
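invokeSyncImpl() turns Netty's asynchronous write into a blocking call. It registers a pending entry keyed by the request's `opaque` ID, writes the request, and then waits until the inbound handler completes that entry or the timeout expires. The sketch below is loosely modeled on `responseTable`/`ResponseFuture`, using a `CountDownLatch` for the wait. `PendingCall`, `SyncCaller`, and the `Wire` interface are hypothetical, and strings replace `RemotingCommand`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of correlating replies by opaque ID.
final class PendingCall {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String response;

    void putResponse(String r) { // called by the inbound side when the matching reply arrives
        this.response = r;
        latch.countDown();
    }

    String waitResponse(long timeoutMillis) { // returns null on timeout
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return response;
    }
}

final class SyncCaller {
    interface Wire {
        void write(int opaque, String request); // asynchronous send, playing channel.writeAndFlush()
    }

    private final AtomicInteger nextOpaque = new AtomicInteger();
    final Map<Integer, PendingCall> responseTable = new ConcurrentHashMap<>();

    String call(Wire wire, String request, long timeoutMillis) {
        int opaque = nextOpaque.incrementAndGet();
        PendingCall pending = new PendingCall();
        responseTable.put(opaque, pending); // register before writing so an early reply is not lost
        try {
            wire.write(opaque, request);
            return pending.waitResponse(timeoutMillis);
        } finally {
            responseTable.remove(opaque); // always clean up, as in invokeSyncImpl()'s finally block
        }
    }

    void onResponse(int opaque, String response) { // the inbound response-handler side
        PendingCall pending = responseTable.get(opaque);
        if (pending != null) {
            pending.putResponse(response);
        }
    }
}
```

The entry is registered before the write. A reply that arrives before `waitResponse()` starts therefore still finds its `PendingCall`.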
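11. How NameServer handles the Broker's registration request
The previous section showed how, at startup, the Broker sends a registration request to NameServer through BrokerOuterAPI, as shown in the figure below:
Next we look at how NameServer processes the registration request once it arrives. This involves Netty network communication.
Go back to NamesrvController's initialization method, initialize(). Here is a snippet of it: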
public class NamesrvController {
    ...
    public boolean initialize() {
        // Load the KV configuration
        this.kvConfigManager.load();
        // Create the Netty server
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
        // Worker thread pool that processes the requests received by the Netty server
        this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
        // Register the request processor, together with the worker thread pool, on the Netty server
        this.registerProcessor();
        ...
    }
    ...
}
Continue with the source of registerProcessor():
public class NamesrvController {
    ...
    private void registerProcessor() {
        if (namesrvConfig.isClusterTest()) {
            // Used for cluster testing
            this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), this.remotingExecutor);
        } else {
            // Register NameServer's default request processor with the NettyServer,
            // so every network request the NettyServer receives is handed to this processor
            this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
        }
    }
    ...
}
As the source shows, the NettyServer inside NameServer (see the figure below) receives network requests and hands them to the DefaultRequestProcessor request-processing component.
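registerDefaultProcessor() receives both a processor and an executor. Requests that reach the server are therefore processed on `remotingExecutor` rather than on the network I/O thread. The sketch below shows only that pairing idea. `MiniServer` is hypothetical, a `Function<String, String>` stands in for `NettyRequestProcessor`, and `CompletableFuture.supplyAsync` replaces Netty's actual handoff mechanism.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;

// Hypothetical sketch of pairing a default processor with its own worker pool,
// in the spirit of registerDefaultProcessor(processor, remotingExecutor).
final class MiniServer {
    private Function<String, String> defaultProcessor;
    private ExecutorService defaultExecutor;

    void registerDefaultProcessor(Function<String, String> processor, ExecutorService executor) {
        this.defaultProcessor = processor;
        this.defaultExecutor = executor;
    }

    // A real server would call this from a network I/O thread; the processing itself
    // runs on the registered worker pool so that the I/O thread is not blocked.
    CompletableFuture<String> onRequest(String request) {
        Function<String, String> processor = defaultProcessor;
        return CompletableFuture.supplyAsync(() -> processor.apply(request), defaultExecutor);
    }
}
```

Keeping request processing off the I/O threads means that slow work, such as taking the route-table write lock during a registration, does not stall reads and writes on other connections.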
So, to see how NameServer processes a Broker's registration request, we only need to read DefaultRequestProcessor. Here is part of its source:
public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    ...
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        // Print a debug log
        if (ctx != null) {
            log.debug("receive request, {} {} {}", request.getCode(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()), request);
        }
        // Handle the request according to its type (request code)
        switch (request.getCode()) {
            case RequestCode.PUT_KV_CONFIG:
                return this.putKVConfig(ctx, request);
            case RequestCode.GET_KV_CONFIG:
                return this.getKVConfig(ctx, request);
            case RequestCode.DELETE_KV_CONFIG:
                return this.deleteKVConfig(ctx, request);
            case RequestCode.QUERY_DATA_VERSION:
                return queryBrokerTopicConfig(ctx, request);
            case RequestCode.REGISTER_BROKER:
                // This is where Broker registration requests are handled
                Version brokerVersion = MQVersion.value2Version(request.getVersion());
                if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                    return this.registerBrokerWithFilterServer(ctx, request);
                } else {
                    // The Broker registration logic is in the registerBroker() method below
                    return this.registerBroker(ctx, request);
                }
            ...
        }
        return null;
    }
    ...
}
Next, look at DefaultRequestProcessor's registerBroker() method to see how a Broker registration is completed.
public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    ...
    public RemotingCommand registerBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
        // Decode the registration request and prepare the response
        final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
        final RegisterBrokerRequestHeader requestHeader = (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
        if (!checksum(ctx, request, requestHeader)) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("crc32 not match");
            return response;
        }
        TopicConfigSerializeWrapper topicConfigWrapper;
        if (request.getBody() != null) {
            topicConfigWrapper = TopicConfigSerializeWrapper.decode(request.getBody(), TopicConfigSerializeWrapper.class);
        } else {
            topicConfigWrapper = new TopicConfigSerializeWrapper();
            topicConfigWrapper.getDataVersion().setCounter(new AtomicLong(0));
            topicConfigWrapper.getDataVersion().setTimestamp(0);
        }
        // The core step: call registerBroker() on RouteInfoManager, the core functional component
        // that manages route information
        RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
            requestHeader.getClusterName(),
            requestHeader.getBrokerAddr(),
            requestHeader.getBrokerName(),
            requestHeader.getBrokerId(),
            requestHeader.getHaServerAddr(),
            topicConfigWrapper,
            null,
            ctx.channel()
        );
        // Build the response to return
        responseHeader.setHaServerAddr(result.getHaServerAddr());
        responseHeader.setMasterAddr(result.getMasterAddr());
        byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
        response.setBody(jsonValue);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
    ...
}
Based on this code, the figure below adds RouteInfoManager, the route data management component. Broker registration is actually carried out by this component.
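RouteInfoManager's registerBroker() method puts a Broker's data into the route tables that RouteInfoManager maintains. The idea is to store the Broker's core routing data (ClusterName, BrokerId, BrokerName, and so on) in several Map data structures. An update touches several in-memory Maps at once, so it is performed under the write lock of a ReadWriteLock from the Java concurrency package, which lets only one thread update them at a time.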
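Here is a minimal sketch of that design. It keeps two simplified tables: cluster name to broker names, and broker name to (brokerId to address). Registration updates both under one write lock, while lookups take the read lock and can run concurrently. `MiniRouteTable` is hypothetical and much smaller than RouteInfoManager. It uses `ReentrantReadWriteLock`, the standard JDK implementation of `ReadWriteLock`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified route tables; not RocketMQ's RouteInfoManager.
final class MiniRouteTable {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, Set<String>> clusterAddrTable = new HashMap<>();        // cluster -> broker names
    private final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();   // broker name -> (id -> addr)

    /** Returns true when this brokerId had no address recorded before (a first registration). */
    boolean register(String cluster, String brokerName, long brokerId, String brokerAddr) {
        lock.writeLock().lock(); // several maps change together, so allow one writer at a time
        try {
            clusterAddrTable.computeIfAbsent(cluster, k -> new HashSet<>()).add(brokerName); // Set de-duplicates
            Map<Long, String> addrs = brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>());
            return addrs.put(brokerId, brokerAddr) == null;
        } finally {
            lock.writeLock().unlock();
        }
    }

    String addrOf(String brokerName, long brokerId) {
        lock.readLock().lock(); // readers do not block each other
        try {
            Map<Long, String> addrs = brokerAddrTable.get(brokerName);
            return addrs == null ? null : addrs.get(brokerId);
        } finally {
            lock.readLock().unlock();
        }
    }

    int brokerCount(String cluster) {
        lock.readLock().lock();
        try {
            Set<String> names = clusterAddrTable.get(cluster);
            return names == null ? 0 : names.size();
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Registering the same broker twice returns `false` the second time, and the tables do not grow. This idempotence is what later lets the same request serve as a heartbeat.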
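16. How the Broker sends periodic heartbeats, and how failures are detected
(1) How NameServer processes Broker registration requests
(2) How the Broker periodically sends heartbeats to NameServer
(1) How NameServer processes Broker registration requests
At its core, NameServer receives Broker registration requests through its Netty server and hands them to the DefaultRequestProcessor component. The actual registration logic lives in RouteInfoManager, the route data management component. The Broker's route data ends up in route tables built from several Map data structures inside RouteInfoManager.
(2) How the Broker periodically sends heartbeats to NameServer
Next we look at how the Broker periodically sends heartbeats so that NameServer knows it is still alive. We also look at how NameServer detects that a Broker has died when it stops sending heartbeats for some time.
First, look at the entry point in the Broker that sends registration requests to NameServer. It is in BrokerController's start() method. At startup, BrokerController does not send just one registration request; it also starts a scheduled task that sends a registration request at a fixed interval.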
public class BrokerController {
    ...
    public void start() throws Exception {
        // Start the message store component
        if (this.messageStore != null) { this.messageStore.start(); }
        // Start the Netty servers so the Broker can accept requests
        if (this.remotingServer != null) { this.remotingServer.start(); }
        if (this.fastRemotingServer != null) { this.fastRemotingServer.start(); }
        // Start the file-related service component fileWatchService
        if (this.fileWatchService != null) { this.fileWatchService.start(); }
        // brokerOuterAPI is a core component that lets the Broker send requests out through a Netty client;
        // for example, registering with NameServer and sending heartbeats both go through it
        if (this.brokerOuterAPI != null) { this.brokerOuterAPI.start(); }
        if (this.pullRequestHoldService != null) { this.pullRequestHoldService.start(); }
        if (this.clientHousekeepingService != null) { this.clientHousekeepingService.start(); }
        if (this.filterServerManager != null) { this.filterServerManager.start(); }
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            startProcessorByHa(messageStoreConfig.getBrokerRole());
            handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
            // Register with NameServer once, immediately
            this.registerBrokerAll(true, false, true);
        }
        // Submit a periodic task to the scheduled thread pool that registers with NameServer again and again:
        // first run after 10 s, then every registerNameServerPeriod ms, clamped to the range [10 s, 60 s]
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
        // Start some other functional components
        if (this.brokerStatsManager != null) { this.brokerStatsManager.start(); }
        if (this.brokerFastFailure != null) { this.brokerFastFailure.start(); }
    }
    ...
}
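The code above starts a scheduled task that repeats the Broker registration every 30 s by default. registerNameServerPeriod defaults to 30 s, and the code clamps the period to between 10 s and 60 s.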
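The period expression and the `try/catch (Throwable)` inside the task are both worth a closer look. The `ScheduledExecutorService.scheduleAtFixedRate` Javadoc states that if any execution throws, subsequent executions are suppressed. Without the catch, a single failed registration round would silently stop all future heartbeats. In the sketch below, `HeartbeatSchedule` is a hypothetical helper, and the 10 s initial delay is copied from the code above.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper reproducing the re-registration schedule from BrokerController.start().
final class HeartbeatSchedule {
    static final long INITIAL_DELAY_MS = 10_000;

    private HeartbeatSchedule() { }

    /** Same clamping as Math.max(10000, Math.min(registerNameServerPeriod, 60000)). */
    static long effectivePeriod(long configuredMillis) {
        return Math.max(10_000L, Math.min(configuredMillis, 60_000L));
    }

    static ScheduledFuture<?> start(ScheduledExecutorService ses, long configuredMillis, Runnable registerAll) {
        return ses.scheduleAtFixedRate(() -> {
            try {
                registerAll.run();
            } catch (Throwable t) {
                // Swallow the error: an exception escaping here would cancel all later runs
                System.err.println("registerBrokerAll failed: " + t);
            }
        }, INITIAL_DELAY_MS, effectivePeriod(configuredMillis), TimeUnit.MILLISECONDS);
    }
}
```

A configured period of 30 s is used as-is. A period of 5 s is raised to 10 s, and 90 s is capped at 60 s, so the heartbeat interval always stays well inside NameServer's 120 s expiry window.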
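So by default, the first registration request performs the actual registration: it puts the Broker's route data into the route tables of NameServer's RouteInfoManager. The registration requests that follow every 30 s are, in essence, heartbeats from the Broker to NameServer.
That raises a question. When the Broker sends a registration request every 30 s as a heartbeat, how does NameServer handle these repeated requests? Here is the source of RouteInfoManager's registerBroker() method: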
public class RouteInfoManager {
    ...
    public RegisterBrokerResult registerBroker(final String clusterName, final String brokerAddr, final String brokerName,
        final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList, final Channel channel) {
        RegisterBrokerResult result = new RegisterBrokerResult();
        try {
            try {
                // Acquire the write lock: only one thread at a time can run this section
                this.lock.writeLock().lockInterruptibly();
                // clusterAddrTable maps a cluster name to a Set holding the names of the Brokers in that cluster
                Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                if (null == brokerNames) {
                    brokerNames = new HashSet<String>();
                    this.clusterAddrTable.put(clusterName, brokerNames);
                }
                // Add the brokerName straight into that Set.
                // Repeated registrations (the heartbeat every 30 s) have no effect here,
                // because the Set de-duplicates the same brokerName automatically
                brokerNames.add(brokerName);
                boolean registerFirst = false;
                // Look up the BrokerData by brokerName.
                // brokerAddrTable is the core route table that holds the detailed route data of every Broker
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                // Core logic:
                // on a Broker's first registration, brokerData is null,
                // so a new BrokerData is created and put into the route table; this is the actual registration.
                // Heartbeats (repeated registrations every 30 s) are unaffected,
                // because the BrokerData already exists and is not created again
                if (null == brokerData) {
                    registerFirst = true;
                    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                    this.brokerAddrTable.put(brokerName, brokerData);
                }
                // Further processing of the route data
                Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
                // Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
                // The same IP:PORT must only have one record in brokerAddrTable
                Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<Long, String> item = it.next();
                    if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                        it.remove();
                    }
                }
                String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
                registerFirst = registerFirst || (null == oldAddr);
                if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
                    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
                        ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();
                        if (tcTable != null) {
                            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                this.createAndUpdateQueueData(brokerName, entry.getValue());
                            }
                        }
                    }
                }
                // Core logic:
                // every registration, including the heartbeat every 30 s, puts a new BrokerLiveInfo into the brokerLiveTable Map,
                // so every 30 s the latest BrokerLiveInfo overwrites the previous one.
                // BrokerLiveInfo holds the current timestamp, i.e. the time of the Broker's most recent heartbeat.
                // This is how a registration request sent every 30 s doubles as a heartbeat
                BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                    new BrokerLiveInfo(System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr));
                if (null == prevBrokerLiveInfo) {
                    log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
                }
                // The code below can be skipped for now
                if (filterServerList != null) {
                    if (filterServerList.isEmpty()) {
                        this.filterServerTable.remove(brokerAddr);
                    } else {
                        this.filterServerTable.put(brokerAddr, filterServerList);
                    }
                }
                if (MixAll.MASTER_ID != brokerId) {
                    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                    if (masterAddr != null) {
                        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                        if (brokerLiveInfo != null) {
                            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                            result.setMasterAddr(masterAddr);
                        }
                    }
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("registerBroker Exception", e);
        }
        return result;
    }
    ...
}
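As shown in the figure below, each time the Broker sends its 30-second registration request as a heartbeat, the RouteInfoManager route data management component refreshes that Broker's heartbeat time.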
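The heartbeat refresh comes down to one `Map.put()` call. Every registration replaces the Broker's liveness entry with a new timestamp. Because `put()` returns the previous value, a `null` return distinguishes a first registration from a heartbeat, which is what the "new broker registered" log line checks.

The sketch below makes a few assumptions that differ from RouteInfoManager:

- `LiveTable` and `LiveInfo` are hypothetical classes.
- The timestamp is passed in rather than read from the clock, to keep it testable.
- It uses a `ConcurrentHashMap`, whereas the original uses a `HashMap` guarded by the write lock.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for BrokerLiveInfo: only the heartbeat timestamp is kept.
final class LiveInfo {
    final long lastUpdateTimestamp;

    LiveInfo(long lastUpdateTimestamp) {
        this.lastUpdateTimestamp = lastUpdateTimestamp;
    }
}

final class LiveTable {
    final Map<String, LiveInfo> brokerLiveTable = new ConcurrentHashMap<>(); // brokerAddr -> latest heartbeat

    /** Records a registration; returns true for a first registration, false for a heartbeat. */
    boolean onRegister(String brokerAddr, long nowMillis) {
        // put() overwrites the old entry, so the newest timestamp always wins
        LiveInfo prev = brokerLiveTable.put(brokerAddr, new LiveInfo(nowMillis));
        return prev == null;
    }
}
```

The first call for an address reports a new registration. Each later call simply moves the stored timestamp forward, which is the only thing the expiry scan looks at.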
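Now suppose a Broker has crashed or failed and has not sent its 30-second registration request for a long time. How does NameServer detect that the Broker is down?
Go back to NamesrvController's initialize() method. Part of it starts a scheduled task in which RouteInfoManager scans for inactive Brokers.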
public class NamesrvController {
    ...
    public boolean initialize() {
        ...
        // Start a background thread that runs a scheduled task (first run after 5 s, then every 10 s).
        // As the name scanNotActiveBroker() suggests, it periodically looks for Brokers that have stopped sending heartbeats and are presumed dead
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);
        ...
    }
    ...
}
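This code starts a scheduled thread that scans for inactive Brokers every 10 s. Each scan calls RouteInfoManager's scanNotActiveBroker() method, which determines whether a Broker has died.
The source of RouteInfoManager.scanNotActiveBroker() is as follows: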
public class RouteInfoManager {
    ...
    public void scanNotActiveBroker() {
        // Scan brokerLiveTable, the Map that stores the BrokerLiveInfo heartbeat data.
        // Iterating it yields each Broker's most recently refreshed BrokerLiveInfo, i.e. the time of its last heartbeat
        Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BrokerLiveInfo> next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            // Core check:
            // has more time passed since the last heartbeat than the Broker heartbeat timeout (120 s by default)?
            // In other words, a Broker that has not sent a heartbeat for two minutes is considered dead
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                RemotingUtil.closeChannel(next.getValue().getChannel());
                it.remove();
                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                // Remove this Broker from the route tables
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
            }
        }
    }
    ...
}
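The expiry check can be tried in isolation. The sketch below reproduces the scan under simplifying assumptions. `ExpiryScanner` is hypothetical, the liveness table is reduced to brokerAddr → last heartbeat timestamp, and the current time is a parameter so the logic is testable. The real method also closes the Channel and calls onChannelDestroy(); here the expired addresses are simply returned. Entries are removed through the iterator, which is the safe way to delete from a `HashMap` while iterating over it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of scanNotActiveBroker(); the clock is passed in so the logic is testable.
final class ExpiryScanner {
    static final long BROKER_CHANNEL_EXPIRED_TIME = 1000L * 60 * 2; // 120 s, as described above

    final Map<String, Long> lastHeartbeat = new HashMap<>(); // brokerAddr -> last heartbeat timestamp

    /** Removes expired brokers and returns their addresses. */
    List<String> scan(long nowMillis) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + BROKER_CHANNEL_EXPIRED_TIME < nowMillis) {
                expired.add(entry.getKey()); // read the key before removing the entry
                it.remove();                 // safe removal during iteration
            }
        }
        return expired;
    }
}
```

Combined with the 10 s scan interval, this means a Broker that stops sending heartbeats is removed roughly 120 to 130 s after its last heartbeat. A Broker that keeps re-registering every 30 s never comes close to the threshold.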