RocketMQ Internals 2: A Brief Analysis of the Source Code Design, Part 3

Outline

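1. The NameServer startup script

2. Which configuration the NameServer parses at startup

3. How the NameServer initializes its Netty network server

4. How the NameServer starts its Netty network server

5. How the Broker initializes its configuration at startup

6. Creating the BrokerController and the components it contains

7. Initializing the BrokerController

8. Starting the BrokerController

9. How the Broker registers itself with the NameServer

10. How BrokerOuterAPI sends the registration request

11. How the NameServer handles the Broker's registration request

16. How the Broker sends periodic heartbeats, and how failures are detected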


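10. How BrokerOuterAPI sends the registration request

(1) The entry method for sending the Broker registration request

(2) NettyClient's network request method invokeSync()

(3) How the Broker establishes a network connection to the NameServer

(4) How a request is sent over the Channel connection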


(1) The entry method for sending the Broker registration request

Now let's step into the network request method that actually registers the Broker with the NameServer. Its entry point is:

//This is the line that actually performs the registration
RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
//Once registration finishes, the result is added to a List
if (result != null) {
    registerBrokerResultList.add(result);
}
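The snippet adds the non-null result of each registerBroker() call to a list, which implies one call per NameServer address. The sketch below shows that fan-out in minimal form. It assumes each address is handled on a pool thread and that the caller waits on a CountDownLatch with a timeout. RegisterFanOut, registerAll() and the registerOne callback are hypothetical names, not RocketMQ APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical sketch: registerOne stands in for BrokerOuterAPI.registerBroker()
// against a single NameServer address; it returns null for oneway calls.
class RegisterFanOut {
    static <R> List<R> registerAll(List<String> namesrvAddrs,
                                   Function<String, R> registerOne,
                                   long timeoutMillis) {
        List<R> results = new CopyOnWriteArrayList<>();       // written by several pool threads
        CountDownLatch latch = new CountDownLatch(namesrvAddrs.size());
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, namesrvAddrs.size()));
        try {
            for (String addr : namesrvAddrs) {
                pool.execute(() -> {
                    try {
                        R result = registerOne.apply(addr);
                        if (result != null) {                 // oneway registration yields no result
                            results.add(result);
                        }
                    } catch (RuntimeException e) {
                        // one unreachable NameServer must not stop registration with the others
                    } finally {
                        latch.countDown();
                    }
                });
            }
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS); // wait for all, bounded by the timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(results);
    }
}
```

Because each NameServer is independent and holds its own copy of the routing data, one slow or failed NameServer should only cost its own result, not block the others.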

BrokerOuterAPI's registerBroker() method looks like this:

public class BrokerOuterAPI {
    ...
    private RegisterBrokerResult registerBroker(final String namesrvAddr, final boolean oneway, final int timeoutMills, final RegisterBrokerRequestHeader requestHeader, final byte[] body) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
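        //Wrap both the request header and the request body into a RemotingCommand
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
        request.setBody(body);
        //oneway means the caller does not wait for the registration result; it is a special kind of request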
        if (oneway) {
            try {
                this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
            } catch (RemotingTooMuchRequestException e) {
                // Ignore
            }
            return null;
        }
        //In the normal case, the logic that actually sends the network request is below
        //remotingClient is in fact the NettyClient, which sends the network request out
        RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);

        //Next, handle the response returned by the network request
        //The registration result is wrapped into a RegisterBrokerResult and returned
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
                RegisterBrokerResult result = new RegisterBrokerResult();
                result.setMasterAddr(responseHeader.getMasterAddr());
                result.setHaServerAddr(responseHeader.getHaServerAddr());
                if (response.getBody() != null) {
                    result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
                }
                return result;
            }
            default:
                break;
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr());
    }
    ...
}

As the code shows, the registration request is ultimately sent to the NameServer through the NettyClient component.

(2) NettyClient's network request method invokeSync()

Next, let's step into NettyClient's network request method invokeSync():

public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
    ...
    @Override
    public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
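        //Record the current time
        long beginStartTime = System.currentTimeMillis();
        //The line below obtains a Channel, which represents the connection between this Broker machine and the NameServer machine
        //That is why the NameServer address is passed in: it identifies the NameServer machine to connect to
        //Once the connection is established, it is represented by a Channel
        final Channel channel = this.getAndCreateChannel(addr);
        //If the network connection to the NameServer is OK, the request can be sent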
        if (channel != null && channel.isActive()) {
            try {
                //Run the RPC hooks before sending the request
                doBeforeRpcHooks(addr, request);
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTime) {
                    throw new RemotingTimeoutException("invokeSync call the addr[" + addr + "] timeout");
                }
                //The line below actually sends the network request
                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
                //Run the RPC hooks after the response comes back
                doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                return response;
            } catch (RemotingSendRequestException e) {
                log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            } catch (RemotingTimeoutException e) {
                if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                    this.closeChannel(addr, channel);
                    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
                }
                log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }
    ...
}

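As the code shows, a Channel can be added to the architecture diagram to represent the network connection between the Broker and the NameServer. Through this Channel, the Broker sends the actual network requests to the NameServer.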
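One detail of invokeSync() is that the timeout is a total budget. Time spent getting the channel and running the pre-RPC hooks is subtracted before invokeSyncImpl() is called. The helper below isolates that arithmetic as a sketch. TimeoutBudget and remaining() are hypothetical names. IllegalStateException stands in for RocketMQ's RemotingTimeoutException so the example needs no RocketMQ dependency.

```java
// Hypothetical helper mirroring the check in invokeSync(): whatever was spent before
// sending (channel lookup, RPC hooks) is deducted from the caller's timeout.
class TimeoutBudget {
    static long remaining(long timeoutMillis, long beginStartTime, long now) {
        long costTime = now - beginStartTime;
        if (timeoutMillis < costTime) {
            // the real code throws RemotingTimeoutException here
            throw new IllegalStateException("timeout before sending, cost " + costTime + "ms");
        }
        return timeoutMillis - costTime;   // budget left for waiting on the response
    }
}
```

For example, with a 3000 ms timeout and 200 ms already spent, remaining(3000, 1000, 1200) leaves 2800 ms for the request itself.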

(3) How the Broker establishes a network connection to the NameServer

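Next, step into this.getAndCreateChannel(addr) to see how the Broker establishes a network connection to the NameServer. It first tries to get a connection from the cache and creates a new one only if nothing usable is cached: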

public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
    ...
    private Channel getAndCreateChannel(final String addr) throws RemotingConnectException, InterruptedException {
        if (null == addr) {
            return getAndCreateNameserverChannel();
        }
        ChannelWrapper cw = this.channelTables.get(addr);
        if (cw != null && cw.isOK()) {
            return cw.getChannel();
        }
        return this.createChannel(addr);
    }
    ...
}

Next, let's see how this.createChannel(addr) creates a network connection from a NameServer address.

public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
    ...
    private Channel createChannel(final String addr) throws InterruptedException {
        //The line below tries to get a cached connection; if a healthy one is cached, it is returned
        ChannelWrapper cw = this.channelTables.get(addr);
        if (cw != null && cw.isOK()) {
            return cw.getChannel();
        }

        if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                //The next dozen lines check the cache again under the lock and decide whether a new connection is needed
                boolean createNewConnection;
                cw = this.channelTables.get(addr);
                if (cw != null) {
                    if (cw.isOK()) {
                        return cw.getChannel();
                    } else if (!cw.getChannelFuture().isDone()) {
                        createNewConnection = false;
                    } else {
                        this.channelTables.remove(addr);
                        createNewConnection = true;
                    }
                } else {
                    createNewConnection = true;
                }

                if (createNewConnection) {
                    //This is where the connection is actually created: Netty's Bootstrap.connect() builds a real Channel connection (asynchronously)
                    ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
                    log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
                    cw = new ChannelWrapper(channelFuture);
                    this.channelTables.put(addr, cw);
                }
            } catch (Exception e) {
                log.error("createChannel: create channel exception", e);
            } finally {
                this.lockChannelTables.unlock();
            }
        } else {
            log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
        }

        //The code below waits for the connection to complete and returns the Channel
        if (cw != null) {
            ChannelFuture channelFuture = cw.getChannelFuture();
            if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
                if (cw.isOK()) {
                    log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
                    return cw.getChannel();
                } else {
                    log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());
                }
            } else {
                log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(), channelFuture.toString());
            }
        }
        return null;
    }
    ...
}
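The pattern in createChannel() has four parts. There is a lock-free cache read, then a tryLock-guarded double check, then an asynchronous connect that is published immediately, and finally a bounded wait that happens outside the lock. The sketch below reproduces that pattern without Netty. A "channel" is just a String. The connector function stands in for Bootstrap.connect(). A future that completed normally plays the role of ChannelWrapper.isOK(). ChannelCache and getOrCreate() are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

// Hypothetical sketch of the createChannel() caching pattern (no Netty involved).
class ChannelCache {
    private final ConcurrentMap<String, CompletableFuture<String>> channelTables = new ConcurrentHashMap<>();
    private final Lock lockChannelTables = new ReentrantLock();
    private final Function<String, CompletableFuture<String>> connector;
    int connectCalls = 0;   // number of real connects started (only modified under the lock)

    ChannelCache(Function<String, CompletableFuture<String>> connector) {
        this.connector = connector;
    }

    private static boolean isOk(CompletableFuture<String> f) {
        return f != null && f.isDone() && !f.isCompletedExceptionally();
    }

    String getOrCreate(String addr, long lockTimeoutMillis, long connectTimeoutMillis) {
        CompletableFuture<String> cw = channelTables.get(addr);
        if (isOk(cw)) {
            return cw.join();                          // fast path: cached, healthy connection
        }
        try {
            if (lockChannelTables.tryLock(lockTimeoutMillis, TimeUnit.MILLISECONDS)) {
                try {
                    cw = channelTables.get(addr);      // double check under the lock
                    boolean createNewConnection;
                    if (cw == null) {
                        createNewConnection = true;
                    } else if (isOk(cw)) {
                        return cw.join();
                    } else if (!cw.isDone()) {
                        createNewConnection = false;   // a connect is already in progress
                    } else {
                        channelTables.remove(addr);    // finished but failed: replace it
                        createNewConnection = true;
                    }
                    if (createNewConnection) {
                        connectCalls++;
                        cw = connector.apply(addr);    // asynchronous, like bootstrap.connect()
                        channelTables.put(addr, cw);
                    }
                } finally {
                    lockChannelTables.unlock();
                }
            }
            if (cw == null) {
                return null;
            }
            // Wait for the connection outside the lock, bounded by the connect timeout.
            return cw.get(connectTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException | TimeoutException e) {
            return null;                               // connect failed or timed out
        }
    }
}
```

Waiting outside the lock matters here. A slow connect to one NameServer then does not block threads that only need an already-cached connection.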

(4) How a request is sent over the Channel connection

The entry point for sending any request over the Channel connection is this line in NettyRemotingClient's invokeSync() method:

RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);

Stepping into invokeSyncImpl():

public abstract class NettyRemotingAbstract {
    ...
    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        final int opaque = request.getOpaque();
        try {
            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
            //With Netty, the core step is writing the request out through the Channel
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        responseFuture.setSendRequestOK(false);
                    }
                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());
                    responseFuture.putResponse(null);
                    log.warn("send a request command to channel <" + addr + "> failed.");
                }
            });
            //The line below blocks until the response comes back or the timeout expires
            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            if (null == responseCommand) {
                if (responseFuture.isSendRequestOK()) {
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause());
                } else {
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }
            return responseCommand;
        } finally {
            this.responseTable.remove(opaque);
        }
    }
    ...
}

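The key point of this code is that the registration request is written to the NameServer through Netty's Channel API. The calling thread then blocks on a ResponseFuture, stored in responseTable under the request's opaque id, until the matching response arrives or the timeout expires.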
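The opaque id is what lets a synchronous call sit on top of an asynchronous connection. The caller registers a future under the id and sends the request. The I/O side later completes whichever future matches the id carried by the response. The sketch below models that with plain JDK classes. PendingResponse stands in for ResponseFuture. OpaqueCorrelation, invokeSync() and onResponse() are hypothetical names, and the "network" is whatever the sender callback does.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

// Hypothetical sketch of opaque-based request/response matching.
class OpaqueCorrelation {
    static final class PendingResponse {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String response;

        void putResponse(String r) {
            response = r;
            latch.countDown();
        }

        String waitResponse(long timeoutMillis) throws InterruptedException {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return response;                            // still null if the wait timed out
        }
    }

    private final AtomicInteger requestId = new AtomicInteger();
    final ConcurrentMap<Integer, PendingResponse> responseTable = new ConcurrentHashMap<>();

    // Caller thread: register under a fresh opaque id, send, then block for the matching reply.
    String invokeSync(IntConsumer sender, long timeoutMillis) {
        int opaque = requestId.incrementAndGet();
        PendingResponse pending = new PendingResponse();
        responseTable.put(opaque, pending);              // registered before sending, so no reply is lost
        try {
            sender.accept(opaque);                       // stands in for channel.writeAndFlush(request)
            return pending.waitResponse(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            responseTable.remove(opaque);                // always clean up, like the finally block above
        }
    }

    // I/O thread: a response carrying this opaque id arrived; wake up the waiting caller.
    void onResponse(int opaque, String body) {
        PendingResponse pending = responseTable.get(opaque);
        if (pending != null) {
            pending.putResponse(body);
        }
    }
}
```

A null return corresponds to the timeout branch above. The real code also distinguishes "send failed" from "no reply in time" using isSendRequestOK().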


11. How the NameServer handles the Broker's registration request

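The previous section covered how the Broker, at startup, sends its registration request to the NameServer through BrokerOuterAPI, as illustrated in the diagram: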

Next, we look at how the NameServer processes this registration request once it arrives. This involves the Netty networking layer.


Let's return to NamesrvController's initialization method, initialize(). One snippet of its source is shown below:

public class NamesrvController {
    ...
    public boolean initialize() {
        //Load the KV configuration
        this.kvConfigManager.load();
        //Create the Netty server
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
        //Worker thread pool for the Netty server
        this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
        //The line below registers the request processor, together with this worker thread pool, with the Netty server
        this.registerProcessor();
        ...
    }
    ...
}

Next, the source of registerProcessor():

public class NamesrvController {
    ...
    private void registerProcessor() {
        if (namesrvConfig.isClusterTest()) {
            //This branch handles cluster test mode
            this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), this.remotingExecutor);
        } else {
            //Register the NameServer's default request processor with the NettyServer, so every network request the NettyServer receives is handed to this component
            this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
        }
    }
    ...
}

From this code, the NettyServer inside the NameServer (see the diagram) receives network requests and hands them to the DefaultRequestProcessor component for processing.
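Registering a default processor together with an executor means two things. Every incoming request goes to that one processor, and the work runs on the executor's threads rather than on the I/O thread. The sketch below shows that shape. MiniRemotingServer, its nested Processor interface and dispatch() are hypothetical names. It does not reproduce how NettyRemotingServer stores the processor/executor pair internally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

// Hypothetical sketch: a default processor plus the executor it runs on.
class MiniRemotingServer {
    interface Processor {
        String processRequest(int code, String body);
    }

    private Processor defaultProcessor;
    private ExecutorService defaultExecutor;

    // Assumed to be called once during initialization, before requests arrive.
    void registerDefaultProcessor(Processor processor, ExecutorService executor) {
        this.defaultProcessor = processor;
        this.defaultExecutor = executor;
    }

    // Called for each decoded request: the I/O thread only submits work and returns.
    CompletableFuture<String> dispatch(int code, String body) {
        Processor p = defaultProcessor;
        if (p == null) {
            return CompletableFuture.completedFuture("no processor registered for code " + code);
        }
        return CompletableFuture.supplyAsync(() -> p.processRequest(code, body), defaultExecutor);
    }
}
```

Offloading to a separate pool keeps slow request handling, such as a registration that waits for the write lock, from stalling the threads that read and write sockets.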

So to understand how the NameServer handles a Broker's registration request, it is enough to read the code in DefaultRequestProcessor. Here is part of its source:

public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    ...
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        //Print a debug log
        if (ctx != null) {
            log.debug("receive request, {} {} {}", request.getCode(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()), request);
        }

        //Dispatch on the request code
        switch (request.getCode()) {
            case RequestCode.PUT_KV_CONFIG:
                return this.putKVConfig(ctx, request);
            case RequestCode.GET_KV_CONFIG:
                return this.getKVConfig(ctx, request);
            case RequestCode.DELETE_KV_CONFIG:
                return this.deleteKVConfig(ctx, request);
            case RequestCode.QUERY_DATA_VERSION:
                return queryBrokerTopicConfig(ctx, request);
            case RequestCode.REGISTER_BROKER:
                //This case handles Broker registration requests
                Version brokerVersion = MQVersion.value2Version(request.getVersion());
                if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                    return this.registerBrokerWithFilterServer(ctx, request);
                } else {
                    //Brokers older than V3_0_11 take this branch; both branches end in RouteInfoManager.registerBroker()
                    return this.registerBroker(ctx, request);
                }
            ...
        }
        return null;
    }
    ...
}
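The version check relies on an enum idiom. If the constants are declared oldest-first, ordinal() follows release order, and "at least version X" becomes an int comparison. The sketch below shows the idiom with a hypothetical MiniVersion enum holding only a handful of constants; it is not RocketMQ's MQVersion.Version. Mapping out-of-range values to the newest known version is an assumption of this sketch.

```java
// Hypothetical mini version enum: declaration order == release order.
enum MiniVersion {
    V3_0_10, V3_0_11, V4_0_0, V4_9_4;

    // Assumed behavior: out-of-range values map to the nearest end of the table.
    static MiniVersion value2Version(int value) {
        MiniVersion[] all = values();
        if (value < 0) {
            return all[0];
        }
        if (value >= all.length) {
            return all[all.length - 1];
        }
        return all[value];
    }
}

class RegisterBranch {
    // Picks the handler name the same way the REGISTER_BROKER case does.
    static String handlerFor(int requestVersion) {
        MiniVersion v = MiniVersion.value2Version(requestVersion);
        return v.ordinal() >= MiniVersion.V3_0_11.ordinal()
            ? "registerBrokerWithFilterServer"
            : "registerBroker";
    }
}
```

The idiom only works as long as new versions are always appended to the end of the enum. Inserting a constant in the middle would silently shift every later ordinal.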

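Next, step into DefaultRequestProcessor's registerBroker() method to see how the registration is completed. Brokers at V3_0_11 or later go through registerBrokerWithFilterServer() instead, which also decodes the filter server list from the request body. Both paths end in the same RouteInfoManager.registerBroker() call.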

public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    ...
    public RemotingCommand registerBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
        //The code below parses the registration request and prepares the response
        final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
        final RegisterBrokerRequestHeader requestHeader = (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);

        if (!checksum(ctx, request, requestHeader)) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("crc32 not match");
            return response;
        }

        TopicConfigSerializeWrapper topicConfigWrapper;
        if (request.getBody() != null) {
            topicConfigWrapper = TopicConfigSerializeWrapper.decode(request.getBody(), TopicConfigSerializeWrapper.class);
        } else {
            topicConfigWrapper = new TopicConfigSerializeWrapper();
            topicConfigWrapper.getDataVersion().setCounter(new AtomicLong(0));
            topicConfigWrapper.getDataVersion().setTimestamp(0);
        }
    
        //The core step: call registerBroker() on RouteInfoManager
        //RouteInfoManager is the component that manages routing information
        RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
            requestHeader.getClusterName(),
            requestHeader.getBrokerAddr(),
            requestHeader.getBrokerName(),
            requestHeader.getBrokerId(),
            requestHeader.getHaServerAddr(),
            topicConfigWrapper,
            null,
            ctx.channel()
        );
      
        //Build the response
        responseHeader.setHaServerAddr(result.getHaServerAddr());
        responseHeader.setMasterAddr(result.getMasterAddr());

        byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
        response.setBody(jsonValue);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
    ...
}

Based on this code, the RouteInfoManager routing-data component can be added to the diagram. Broker registration is actually carried out by this component.

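RouteInfoManager's registerBroker() method puts a Broker machine's data into the routing tables that RouteInfoManager maintains. These tables are Map-based data structures holding the Broker's core routing data: ClusterName, BrokerId, BrokerName, and so on. Updates are guarded by a ReadWriteLock from the java.util.concurrent package. Because one registration modifies several in-memory Maps, it must hold the write lock, so only one thread can update them at a time.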
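The sketch below shows the locking scheme just described in minimal form. Plain HashMaps are guarded by one ReentrantReadWriteLock. Writers (registration) take the exclusive write lock. Readers (route queries) share the read lock and get a defensive copy. It keeps only the clusterAddrTable and brokerAddrTable shapes seen in the code. MiniRouteTable, register(), brokerNames() and addr() are hypothetical names.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, heavily simplified route table guarded by a single ReadWriteLock.
class MiniRouteTable {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, Set<String>> clusterAddrTable = new HashMap<>();        // cluster -> broker names
    private final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();   // broker name -> (brokerId -> addr)

    void register(String cluster, String brokerName, long brokerId, String addr) {
        lock.writeLock().lock();          // both maps change together, so take the write lock
        try {
            clusterAddrTable.computeIfAbsent(cluster, k -> new HashSet<>()).add(brokerName);
            brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, addr);
        } finally {
            lock.writeLock().unlock();
        }
    }

    Set<String> brokerNames(String cluster) {
        lock.readLock().lock();           // many readers may hold this at once
        try {
            Set<String> names = clusterAddrTable.get(cluster);
            return names == null ? new HashSet<>() : new HashSet<>(names);   // defensive copy
        } finally {
            lock.readLock().unlock();
        }
    }

    String addr(String brokerName, long brokerId) {
        lock.readLock().lock();
        try {
            Map<Long, String> addrs = brokerAddrTable.get(brokerName);
            return addrs == null ? null : addrs.get(brokerId);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

One lock over several HashMaps keeps the tables mutually consistent. Per-map ConcurrentHashMaps would be thread-safe individually, but a reader could still observe one table updated and the other not.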


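16. How the Broker sends periodic heartbeats, and how failures are detected

(1) How the NameServer handles Broker registration requests

(2) How the Broker periodically sends heartbeats to the NameServer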


(1) How the NameServer handles Broker registration requests

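At its core, the NameServer receives Broker registration requests on its Netty server and hands them to the DefaultRequestProcessor component. The actual registration logic lives in the RouteInfoManager routing-data component. In the end, the Broker's routing data is stored in RouteInfoManager's routing tables, which are built from several Map data structures.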

(2) How the Broker periodically sends heartbeats to the NameServer

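Next we look at two things. The first is how the Broker periodically sends heartbeats so that the NameServer knows it is still alive. The second is how the NameServer detects that a Broker has died once it stops sending heartbeats for a while.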


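First, look at where the Broker sends registration requests to the NameServer: BrokerController's start() method. At startup, BrokerController does not just send a single registration request. It also starts a scheduled task that sends a registration request at a fixed interval.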

public class BrokerController {
    ...
    public void start() throws Exception {
        //Start the message store component
        if (this.messageStore != null) { this.messageStore.start(); }
        //Start the Netty servers so that requests can be received
        if (this.remotingServer != null) { this.remotingServer.start(); }
        if (this.fastRemotingServer != null) { this.fastRemotingServer.start(); }
        //Start the file-related service component fileWatchService
        if (this.fileWatchService != null) { this.fileWatchService.start(); }
        //brokerOuterAPI is a core component that lets the Broker send requests out through a Netty client
        //For example, registration and heartbeats to the NameServer both go through this component
        if (this.brokerOuterAPI != null) { this.brokerOuterAPI.start(); }
        if (this.pullRequestHoldService != null) { this.pullRequestHoldService.start(); }
        if (this.clientHousekeepingService != null) { this.clientHousekeepingService.start(); }
        if (this.filterServerManager != null) { this.filterServerManager.start(); }
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            startProcessorByHa(messageStoreConfig.getBrokerRole());
            handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
            this.registerBrokerAll(true, false, true);
        }

        //Schedule a periodic task on the thread pool that registers the Broker with the NameServer
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

        //Start some other functional components
        if (this.brokerStatsManager != null) { this.brokerStatsManager.start(); }
        if (this.brokerFastFailure != null) { this.brokerFastFailure.start(); }
    }
    ...
}

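The code above starts a scheduled task that repeats the Broker registration, by default every 30 s. The first scheduled run happens 10 s after startup. The period is always clamped to the range 10 s to 60 s, whatever value brokerConfig.getRegisterNameServerPeriod() returns.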
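The scheduling expression is easier to read in isolation. The sketch below extracts the clamp and wires it to a single-thread ScheduledExecutorService with the same 10 s initial delay. HeartbeatSchedule, effectivePeriodMillis() and start() are hypothetical names. The task body is wrapped in a catch-all, as in the original. The JDK documents that if any execution of a task scheduled with scheduleAtFixedRate throws, all subsequent executions are suppressed, so one failed registration would otherwise end the heartbeats for good.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class HeartbeatSchedule {
    // Same as Math.max(10000, Math.min(period, 60000)) in start(): clamp to [10 s, 60 s].
    static long effectivePeriodMillis(long configuredPeriodMillis) {
        return Math.max(10_000L, Math.min(configuredPeriodMillis, 60_000L));
    }

    // Hypothetical wiring: first run after 10 s, then at the clamped fixed rate.
    static ScheduledExecutorService start(Runnable registerBrokerAll, long configuredPeriodMillis) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        ses.scheduleAtFixedRate(() -> {
            try {
                registerBrokerAll.run();
            } catch (Throwable t) {
                // swallow (the real code logs): an escaping exception would cancel all later runs
            }
        }, 10_000L, effectivePeriodMillis(configuredPeriodMillis), TimeUnit.MILLISECONDS);
        return ses;
    }
}
```

With the default of 30000 the period stays at 30 s. A misconfigured 5000 is raised to 10 s, and 120000 is capped at 60 s, which stays well inside the NameServer's 120 s expiry window.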


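So by default, the first registration request is a real registration: it puts the Broker's routing data into the routing tables of the NameServer's RouteInfoManager. The registration requests sent every 30 s after that are, in essence, the Broker's heartbeats to the NameServer.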

Since the Broker keeps sending a registration request every 30 s as a heartbeat, how does the NameServer handle these repeated requests? Let's look at RouteInfoManager's registerBroker() source:

public class RouteInfoManager {
    ...
    public RegisterBrokerResult registerBroker(final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final Channel channel) {
        RegisterBrokerResult result = new RegisterBrokerResult();
        try {
            try {
                //Take the write lock so that only one thread at a time can run this
                this.lock.writeLock().lockInterruptibly();
                //Get this cluster's set from clusterAddrTable: it records which Brokers exist in the cluster
                Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                if (null == brokerNames) {
                    brokerNames = new HashSet<String>();
                    this.clusterAddrTable.put(clusterName, brokerNames);
                }
                //Then add brokerName to the set
                //When the Broker re-sends a registration request every 30 s as a heartbeat, this has no effect,
                //because the set automatically de-duplicates the same brokerName
                brokerNames.add(brokerName);

                boolean registerFirst = false;

                //Look up the BrokerData by brokerName
                //brokerAddrTable is the core routing table, holding the detailed routing data of every Broker
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);

                //The core logic:
                //If this is the Broker's first registration request, brokerData is null,
                //so a new BrokerData is created and put into the routing table; this is the registration itself
                //When the Broker re-sends the registration request every 30 s as a heartbeat, this has no effect,
                //because the BrokerData already exists and is not created again
                if (null == brokerData) {
                    registerFirst = true;
                    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                    this.brokerAddrTable.put(brokerName, brokerData);
                }

                //Some processing of the routing data follows
                Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
                //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
                //The same IP:PORT must only have one record in brokerAddrTable
                Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<Long, String> item = it.next();
                    if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                        it.remove();
                    }
                }

                String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
                registerFirst = registerFirst || (null == oldAddr);

                if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
                    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
                        ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();
                        if (tcTable != null) {
                            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                this.createAndUpdateQueueData(brokerName, entry.getValue());
                            }
                        }
                    }
                }

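                //The core logic:
                //Each time the Broker's 30-second registration request (heartbeat) arrives, a new BrokerLiveInfo is put into the brokerLiveTable Map,
                //so every 30 s the latest BrokerLiveInfo overwrites the previous one
                //BrokerLiveInfo carries the current timestamp, i.e. the time of the Broker's latest heartbeat
                //This is how the 30-second registration requests are handled as heartbeats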
                BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, new BrokerLiveInfo(System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr));
                if (null == prevBrokerLiveInfo) {
                    log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
                }

                //The code below can be ignored for now
                if (filterServerList != null) {
                    if (filterServerList.isEmpty()) {
                        this.filterServerTable.remove(brokerAddr);
                    } else {
                        this.filterServerTable.put(brokerAddr, filterServerList);
                    }
                }

                if (MixAll.MASTER_ID != brokerId) {
                    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                    if (masterAddr != null) {
                        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                        if (brokerLiveInfo != null) {
                            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                            result.setMasterAddr(masterAddr);
                        }
                    }
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("registerBroker Exception", e);
        }
        return result;
    }
    ...
}
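The "Switch slave to master" loop enforces one invariant. Within one brokerName, a given IP:PORT may appear under only one brokerId. The helper below isolates that step as a sketch. BrokerAddrUpdate and put() are hypothetical names. The map has the same brokerId-to-address shape as BrokerData.getBrokerAddrs().

```java
import java.util.Iterator;
import java.util.Map;

class BrokerAddrUpdate {
    // Removes any entry with the same address but a different brokerId, then records the
    // new mapping. Returns the previous address for brokerId, or null if the id is new.
    static String put(Map<Long, String> brokerAddrs, long brokerId, String brokerAddr) {
        Iterator<Map.Entry<Long, String>> it = brokerAddrs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> item = it.next();
            if (brokerAddr != null && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                it.remove();    // e.g. the old <1, IP:PORT> slave record when it comes back as master
            }
        }
        return brokerAddrs.put(brokerId, brokerAddr);
    }
}
```

When a machine previously registered as slave (brokerId 1) re-registers as master (brokerId 0), its stale slave entry is dropped. The null return then counts as a first registration, which matches how the source computes registerFirst from oldAddr.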

As the diagram illustrates, whenever the Broker sends its 30-second registration request as a heartbeat, RouteInfoManager refreshes that Broker's heartbeat time.
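The heartbeat refresh comes down to a single Map.put(). Each registration stores a fresh live-info object with the current time. The previous value returned by put() tells a first registration apart from a heartbeat. The sketch below injects the clock as a parameter so the behavior is easy to check. LiveTable, LiveInfo and onRegister() are hypothetical names. It uses a ConcurrentHashMap only so that it stands alone; the source uses a plain map under the write lock.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of the brokerLiveTable refresh on every registration request.
class LiveTable {
    static final class LiveInfo {
        final long lastUpdateTimestamp;

        LiveInfo(long lastUpdateTimestamp) {
            this.lastUpdateTimestamp = lastUpdateTimestamp;
        }
    }

    final ConcurrentMap<String, LiveInfo> brokerLiveTable = new ConcurrentHashMap<>();

    // Returns true for a brand-new broker, false for a heartbeat that only refreshes the time.
    boolean onRegister(String brokerAddr, long nowMillis) {
        LiveInfo prev = brokerLiveTable.put(brokerAddr, new LiveInfo(nowMillis));
        return prev == null;
    }
}
```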

Now suppose the Broker has crashed or failed and has not sent its 30-second registration request for a long time. How does the NameServer detect that the Broker is down?


Back in NamesrvController's initialize() method, one piece of code starts a scheduled task that calls RouteInfoManager to scan for inactive Brokers.

public class NamesrvController {
    ...
    public boolean initialize() {
        ...
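        //The line below starts a background thread that runs a scheduled task
        //As the name scanNotActiveBroker() suggests, it periodically looks for Brokers that have stopped sending heartbeats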
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);
        ...
    }
    ...
}

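This code starts a scheduled thread that scans for inactive Brokers every 10 s, with the first scan 5 s after startup. Each scan calls RouteInfoManager's scanNotActiveBroker() method, which decides whether a Broker has died.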



The source of RouteInfoManager.scanNotActiveBroker() is as follows:

public class RouteInfoManager {
    ...
    public void scanNotActiveBroker() {
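        //Scan the brokerLiveTable Map, which stores the BrokerLiveInfo heartbeat data
        //Iterating over it yields each Broker's most recently refreshed BrokerLiveInfo, and thus the time of its latest heartbeat
        Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BrokerLiveInfo> next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            //The core check:
            //If more time has passed since the last heartbeat than the Broker heartbeat timeout (120 s by default),
            //that is, if a Broker has not sent a heartbeat for two minutes, it is considered dead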
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                RemotingUtil.closeChannel(next.getValue().getChannel());
                it.remove();
                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                //The Broker is then removed from the routing tables
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
            }
        }
    }
    ...
}
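The expiry check can be reproduced with the clock passed in explicitly. The sketch below keeps only the core of scanNotActiveBroker(). It removes every entry whose last heartbeat is older than 120 s and returns the removed addresses. In RocketMQ, that is the point where the channel is closed and onChannelDestroy() cleans the other tables. ExpiredBrokerScanner and scan() are hypothetical names, and the map holds only timestamps instead of BrokerLiveInfo objects.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class ExpiredBrokerScanner {
    static final long BROKER_CHANNEL_EXPIRED_TIME = 1000L * 60 * 2;   // 120 s, as described above

    // lastHeartbeat maps brokerAddr -> last heartbeat time in ms; expired entries are removed.
    static List<String> scan(Map<String, Long> lastHeartbeat, long nowMillis) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> next = it.next();
            if (next.getValue() + BROKER_CHANNEL_EXPIRED_TIME < nowMillis) {
                expired.add(next.getKey());   // record before removing the entry
                it.remove();                  // iterator removal is safe during iteration
            }
        }
        return expired;
    }
}
```

Because the scan runs every 10 s and the threshold is 120 s, a Broker that stops heartbeating is removed roughly 120 to 130 s after its last successful heartbeat. That is about four missed 30-second heartbeats.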