RocketMQ原理—2.源码设计简单分析一

大纲

1.NameServer的启动脚本

2.NameServer启动时会解析哪些配置

3.NameServer如何初始化Netty网络服务器

4.NameServer如何启动Netty网络服务器

5.Broker启动时是如何初始化配置的

6.BrokerController的创建以及包含的组件

7.BrokerController的初始化

8.BrokerController的启动

9.Broker如何把自己注册到NameServer上

10.BrokerOuterAPI是如何发送注册请求的

11.NameServer如何处理Broker的注册请求

16.Broker如何发送定时心跳的以及故障感知


1.NameServer的启动脚本

NameServer会通过rocketmq-master源码中distribution/bin目录下的mqnamesrv脚本来启动。在mqnamesrv脚本中,用于启动NameServer进程的命令如下。也就是使用sh命令执行runserver.sh脚本,然后通过这个脚本去启动NamesrvStartup这个Java类

sh ${ROCKETMQ_HOME}/bin/runserver.sh org.apache.rocketmq.namesrv.NamesrvStartup $@ 

在runserver.sh脚本中,启动NamesrvStartup这个Java类的命令如下:

JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8  -XX:-UseParNewGC"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails"
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"

$JAVA ${JAVA_OPT} $@

可以看到,上述命令大致简化一下就是类似如下这样的一行命令:

java -server -Xms4g -Xmx4g -Xmn2g org.apache.rocketmq.namesrv.NamesrvStartup 

通过Java命令 + 一个有main()方法的NamesrvStartup类,就会启动一个JVM进程。这个JVM进程会执行NamesrvStartup类中的main()方法,从而完成启动NameServer需要的所有工作。


所以,启动NameServer的过程如下图示:


2.NameServer启动时会解析哪些配置

(1)什么是NamesrvController

(2)NamesrvController是如何被创建出来的

(3)NameServer的两个配置类

(4)NameServer两个配置类的解析

(5)完成NamesrvController组件的创建


(1)什么是NamesrvController

当执行NamesrvStartup的main()方法时,会执行NamesrvStartup的main0()方法:

//下面这个NamesrvStartup类,就是最为关键的NameServer进程的启动类
public class NamesrvStartup {
    ...
    //NameServer进程启动时,会执行NamesrvStartup类的main()方法
    public static void main(String[] args) {
        main0(args);
    }

    public static NamesrvController main0(String[] args) {
        //NamesrvController是NameServer的核心代码组件
        //NameServer一启动,就会去启动这个NamesrvController
        try {
            NamesrvController controller = createNamesrvController(args);
            start(controller);
            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }
    ...
}

在上述源码中,有这么一行代码:

NamesrvController controller = createNamesrvController(args);

这行代码就是在创建一个NamesrvController类,这个类是NameServer中的一个核心组件。由于NameServer需要接收Broker发送过来的要把自己注册到NameServer上的请求(因为知道有哪些Broker和管理Broker),以及需要接收客户端发送过来的从NameServer拉取元数据的请求(因为客户端需要知道一个Topic的MessageQueue都在哪些Broker上),所以才需要在NameServer中创建NamesrvController这个类专门用来接收Broker和客户端的网络请求。如下图示,NamesrvController组件是NameServer中的核心组件,用来负责接受网络请求的。

(2)NamesrvController是如何被创建出来的

NamesrvStartup的createNamesrvController()方法会创建出NamesrvController这个关键组件。

public class NamesrvStartup {
    ...
    public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
        //PackageConflictDetect.detectFastjson();

        Options options = ServerUtil.buildCommandlineOptions(new Options());
        commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
            return null;
        }

        final NamesrvConfig namesrvConfig = new NamesrvConfig();
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(9876);
        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);
                MixAll.properties2Object(properties, namesrvConfig);
                MixAll.properties2Object(properties, nettyServerConfig);

                namesrvConfig.setConfigStorePath(file);

                System.out.printf("load config properties file OK, %s%n", file);
                in.close();
            }
        }

        if (commandLine.hasOption('p')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
            MixAll.printObjectProperties(console, namesrvConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            System.exit(0);
        }

        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

        if (null == namesrvConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }

        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

        log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

        MixAll.printObjectProperties(log, namesrvConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);

        final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

        //remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);

        return controller;
    }
    ...
}

(3)NameServer的两个配置类

public class NamesrvStartup {
    ...
    public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
        ...
        final NamesrvConfig namesrvConfig = new NamesrvConfig();
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(9876);
        ...
    }
    ...
}

在createNamesrvController()方法中,创建了NamesrvConfig和NettyServerConfig两个配置类。其中,NamesrvConfig配置了NameServer自身运行的一些参数NettyServerConfig配置了用于接收网络请求的Netty服务器的一些参数


在这里也能知道NameServer对外接收Broker和客户端的网络请求时,是基于Netty来实现网络服务器的,而且可知NameServer默认的监听请求端口号就是9876。

NameServer的两个配置类如下:

public class NamesrvConfig {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    //RocketMQ的home主目录地址,它其实就是尝试去获取ROCKETMQ_HOME这个环境变量的值
    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
    //NameServer存放kv配置属性的路径
    private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
    //NameServer自己的配置存储路径
    private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
    //生产环境的名称,默认为center
    private String productEnvName = "center";
    //是否启动了clusterTest测试集群,默认是false
    private boolean clusterTest = false;
    //是否支持有序消息,默认是false,不支持
    private boolean orderMessageEnable = false;
    ...
}
public class NettyServerConfig implements Cloneable {
    //NettyServer默认的监听端口号是8888,但在NamesrvController中这个端口号会被设置为9876
    private int listenPort = 8888;
    //NettyServer的工作线程数量,默认是8
    private int serverWorkerThreads = 8;
    //Netty的public线程池的线程数量,默认是0
    private int serverCallbackExecutorThreads = 0;
    //Netty的IO线程池的线程数量,默认是3,这里的线程是负责解析网络请求的
    //这里的线程解析完网络请求后,就会把请求转发给work线程来处理
    private int serverSelectorThreads = 3;
    
    //下面两个是Broker端的参数:就是Broker端在基于Netty构建网络服务器时,会使用下面两个参数
    private int serverOnewaySemaphoreValue = 256;
    private int serverAsyncSemaphoreValue = 64;
    
    //如果一个网络连接空闲超时120s,就会被关闭
    private int serverChannelMaxIdleTimeSeconds = 120;
    
    //socket send buffer缓冲区以及receive buffer缓冲区的大小
    private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
    private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
    
    private int writeBufferHighWaterMark = NettySystemConfig.writeBufferHighWaterMark;
    private int writeBufferLowWaterMark = NettySystemConfig.writeBufferLowWaterMark;
    private int serverSocketBacklog = NettySystemConfig.socketBacklog;
    
    //ByteBuffer是否开启缓存,默认是开启的
    private boolean serverPooledByteBufAllocatorEnable = true;
    
    //是否启动epoll IO模型,默认是不开启的
    private boolean useEpollNativeSelector = false;
    ...
}

(4)NameServer两个配置类的解析

 public class NamesrvStartup {
    ...
    public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
        ...
        final NamesrvConfig namesrvConfig = new NamesrvConfig();
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(9876);
        //下面这段代码的意思就是,如果用mqnamesrv启动时,带上了"-c"这个选项
        //那么"-c"这个选项的意思就是带上一个配置文件的地址,接着它就可以读取配置文件里的内容
        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                //基于输入流从配置文件里读取配置,读取的配置会放入一个Properties里
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);

                //然后可以基于工具类,把读取到的配置都放入到两个核心配置类里
                MixAll.properties2Object(properties, namesrvConfig);
                MixAll.properties2Object(properties, nettyServerConfig);

                namesrvConfig.setConfigStorePath(file);

                System.out.printf("load config properties file OK, %s%n", file);
                in.close();
            }
        }
        ...
    }
    ...
}

上述代码意思是:在启动NameServer时,如果使用了-c选项带上一个配置文件的地址,那么就会把配置文件里的配置放入这两个配置类中。比如有一个配置文件是nameserver.properties,里面有一个配置是serverWorkerThreads=16,那么就会读取出这个配置,然后覆盖到NettyServerConfig中。


接着看配置相关的剩余代码;

public class NamesrvStartup {
    ...
    public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
        ...
        //下面这段代码的意思是,如果用mqnamesrv启动时,带上了"-p"这个选项
        //那么它的意思就是print,会打印处出NameServer的所有配置信息
        if (commandLine.hasOption('p')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
            MixAll.printObjectProperties(console, namesrvConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            System.exit(0);
        }

        //下面一行代码会把mqnamesrv命令行中带上的配置选项,都读取出来,然后覆盖到NamesrvConfig里
        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

        //如果发现ROCKETMQ_HOME是空的,那么就会输出一个异常日志,提示设置ROCKETMQ_HOME这个环境变量
        if (null == namesrvConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }

        //下面5行代码也是日志、配置相关的,与Logger、Configurator相关
        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

        //这里会打印一下NameServer的所有配置信息
        log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

        MixAll.printObjectProperties(log, namesrvConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);
        ...
    }
    ...
}

所以,NameServer启动时,刚开始就是在初始化和解析NameServerConfig、NettyServerConfig相关的配置信息。

由于NameServer刚启动会初始化和解析一些核心配置信息,尤其是NettyServer的一些网络配置信息,所以初始化配置信息后,会打印如下启动日志:

2020-02-05 15:10:05 INFO main - rocketmqHome=rocketmq-nameserver
2020-02-05 15:10:05 INFO main - kvConfigPath=namesrv/kvConfig.json
2020-02-05 15:10:05 INFO main - configStorePath=namesrv/namesrv.properties
2020-02-05 15:10:05 INFO main - productEnvName=center
2020-02-05 15:10:05 INFO main - clusterTest=false
2020-02-05 15:10:05 INFO main - orderMessageEnable=false
2020-02-05 15:10:05 INFO main - listenPort=9876
2020-02-05 15:10:05 INFO main - serverWorkerThreads=8
2020-02-05 15:10:05 INFO main - serverCallbackExecutorThreads=0
2020-02-05 15:10:05 INFO main - serverSelectorThreads=3
2020-02-05 15:10:05 INFO main - serverOnewaySemaphoreValue=256
2020-02-05 15:10:05 INFO main - serverAsyncSemaphoreValue=64
2020-02-05 15:10:05 INFO main - serverChannelMaxIdleTimeSeconds=120
2020-02-05 15:10:05 INFO main - serverSocketSndBufSize=65535
2020-02-05 15:10:05 INFO main - serverSocketRcvBufSize=65535
2020-02-05 15:10:05 INFO main - serverPooledByteBufAllocatorEnable=true
2020-02-05 15:10:05 INFO main - useEpollNativeSelector=false

(5)完成NamesrvController组件的创建

public class NamesrvStartup {
    ...
    public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
        ...
        final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
      
        //remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);
        ...
    }
    ...
}

这里直接创建了NamesrvController组件,同时传递NamesrvConfig和NettyServerConfig这两个配置类给它。如下图示:


3.NameServer如何初始化Netty网络服务器

(1)NamesrvController的创建总结

(2)NamesrvController的构造函数

(3)NamesrvController启动时的工作

(4)创建NettyRemotingServer网络服务器组件

(5)NettyRemotingServer的初始化


(1)NamesrvController的创建总结

NameServer架构图如下:

由前面分析可知:NameServer启动时首先会解析配置文件,然后初始化NamesrvConfig和NettyServerConfig两个配置类,接着基于这两个配置类构建出NamesrvController组件。


而且根据类名可以推测出,NamesrvController内部肯定会包含基于Netty实现的网络通信组件。所以上面NameServer架构图中的Netty网络服务器,会负责监听和处理Broker与客户端发来的网络请求。


(2)NamesrvController的构造函数

NamesrvController被创建出来后,需要启动Netty网络服务器,这样NameServer才能在默认的9876端口上接收Broker和客户端的网络请求,比如Broker注册自己、客户端拉取Broker路由数据等。


在NamesrvController的构造函数中,其实就是保存一些实例变量的值而已,并没做什么实质性事情。

public class NamesrvController {
    ...
    public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
        this.namesrvConfig = namesrvConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.kvConfigManager = new KVConfigManager(this);
        this.routeInfoManager = new RouteInfoManager();
        this.brokerHousekeepingService = new BrokerHousekeepingService(this);
        this.configuration = new Configuration(log, this.namesrvConfig, this.nettyServerConfig);
        this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
    }
    ...
}

所以仅仅创建出一个NamesrvController实例还是不够,后续必须要有一些关键代码来启动里面的Netty服务器,才能接收网络请求。


(3)NamesrvController启动时的工作

回到NameServer启动时执行的main0()方法中:

public class NamesrvStartup {
    ...
    public static NamesrvController main0(String[] args) {
        //NamesrvController是NameServer的核心代码组件
        //NameServer一启动,就会去启动这个NamesrvController
        try {
            NamesrvController controller = createNamesrvController(args);
            start(controller);
            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }
    ...
}

由上述代码可知,启动NameServer有两项工作:

工作一:创建NamesrvController,其中会创建和初始化两个配置类

工作二:执行start(controller)方法,也就是启动NamesrvController这个核心组件


NamesrvStartup的start()方法如下,首先会执行NamesrvController的initialize()方法初始化请求处理组件,然后会执行NamesrvController的start()方法启动NamesrvController请求处理组件

public class NamesrvStartup {
    ...
    public static NamesrvController start(final NamesrvController controller) throws Exception {
        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }
        
        //执行NamesrvController的initialize()方法进行初始化操作
        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }

        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));
        
        //执行NamesrvController的start()方法启动NamesrvController
        controller.start();
        return controller;
    }
    ...
}

(4)创建NettyRemotingServer网络服务器组件

执行NamesrvController的initialize()方法进行初始化操作时,会创建一个NettyRemotingServer网络服务器组件

public class NamesrvController {
    ...
    public boolean initialize() {
        this.kvConfigManager.load();
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
        ...
    }
    ...
}

(5)NettyRemotingServer的初始化

NettyRemotingServer构造方法的代码如下:

public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
    ...
    public NettyRemotingServer(final NettyServerConfig nettyServerConfig, final ChannelEventListener channelEventListener) {
        super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
        this.serverBootstrap = new ServerBootstrap();
        ...
    }
    ...
}

其中便通过Netty的ServerBootstrap类,创建一个Netty网络服务器。NettyRemotingServer是RocketMQ开发的一个网络服务器组件,它会基于Netty提供的ServerBootstrap类来实现一个Netty网络服务器。


4.NameServer如何启动Netty网络服务器

(1)NamesrvController初始化过程的完整代码

(2)NamesrvStartup.start()方法的工作

(3)Netty网络服务器是如何启动的


(1)NamesrvController初始化过程的完整代码

在执行NamesrvController的initialize()方法进行请求处理组件的初始化时,会创建一个NettyRemotingServer网络服务器组件。在创建NettyRemotingServer时,会创建一个ServerBootstrap的Netty网络服务器


接下来是
NamesrvController.initialize()方法的完整源码:

public class NamesrvController {
    ...
    public boolean initialize() {
        //加载kv之类的配置
        this.kvConfigManager.load();
        //初始化Netty网络服务器
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
        //Netty网络服务器的工作线程池
        this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
        //下面这行代码会把工作线程池交给Netty网络服务器
        this.registerProcessor();
        
        //下面这行代码就是启动一个后台线程,执行定时任务
        //从scanNotActiveBroker()可知这里会定时扫描哪些Broker没发送心跳而挂掉的
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);
        
        //下面这行代码就也是启动一个后台线程,执行定时任务
        //不过只是定时打印kv配置信息
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);


        //与FileWatchService相关
        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
            //Register a listener to reload SslContext
            try {
                fileWatchService = new FileWatchService(
                    new String[] {
                        TlsSystemConfig.tlsServerCertPath,
                        TlsSystemConfig.tlsServerKeyPath,
                        TlsSystemConfig.tlsServerTrustCertPath
                    },
                    new FileWatchService.Listener() {
                        boolean certChanged, keyChanged = false;
                        @Override
                        public void onChanged(String path) {
                            if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                log.info("The trust certificate changed, reload the ssl context");
                                reloadServerSslContext();
                            }
                            if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                certChanged = true;
                            }
                            if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                keyChanged = true;
                            }
                            if (certChanged && keyChanged) {
                                log.info("The certificate and private key changed, reload the ssl context");
                                certChanged = keyChanged = false;
                                reloadServerSslContext();
                            }
                        }
                        private void reloadServerSslContext() {
                            ((NettyRemotingServer) remotingServer).loadSslContext();
                        }
                    }
                );
            } catch (Exception e) {
                log.warn("FileWatchService created error, can't load the certificate dynamically");
            }
        }
        return true;
    }
    ...
}

可见,
NamesrvController.initialize()方法的主要工作还是
初始化Netty网络服务器,其他的工作就是启动后台线程执行一些定时任务


(2)NamesrvStartup.start()方法的工作

public class NamesrvStartup {
    ...
    public static NamesrvController start(final NamesrvController controller) throws Exception {
        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }
        //1.初始化NamesrvController请求处理组件
        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }
        //2.通过Runtime类注册一个JVM关闭时的shutdown钩子
        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));
        //3.启动NamesrvController请求处理组件
        controller.start();
        return controller;
    }
    
    public void shutdown() {
        //1.首先关闭NettyRemotingServer释放网络资源
        this.remotingServer.shutdown();
        //2.然后关闭RemotingExecutor释放Netty网络服务器的工作线程池资源
        this.remotingExecutor.shutdown();
        //3.最后关闭ScheduledExecutorService释放执行定时任务的后台线程资源
        this.scheduledExecutorService.shutdown();


        if (this.fileWatchService != null) {
            this.fileWatchService.shutdown();
        }
    }
    ...
}

在NamesrvStartup的start()方法中:首先会执行NamesrvController的initialize()方法初始化请求处理组件,在初始化的过程中,会创建出一个Netty网络服务器。


然后会通过Runtime类注册一个JVM关闭时的shutdown钩子,当JVM关闭时需要执行注册的回调函数,在回调函数中会执行NamesrvController的shutdown()方法关闭Netty服务器释放网络资源和关闭线程池释放线程资源。


最后便会执行NamesrvController的start()方法启动NamesrvController请求处理组件。这样,Netty网络服务器才会监听9876这个默认的端口号。


(3)Netty网络服务器是如何启动的

NamesrvController.start()方法的源码如下所示。可以看到,启动NamesrvContorller的核心就是启动NettyRemotingServer,即启动Netty网络服务器。

public class NamesrvController {
    ...
    public void start() throws Exception {
        this.remotingServer.start();

        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
    }
    ...
}

NettyRemotingServer.start()方法的源码如下:

public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
    ...
    @Override
    public void start() {
        ...
        //下面是基于Netty的API去配置和启动一个Netty网络服务器
        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
            .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .childOption(ChannelOption.TCP_NODELAY, true)
            //设置Netty网络服务器要监听的端口号9876
            .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
            .childHandler(new ChannelInitializer() {
                //设置一些网络请求处理器,只要Netty网络服务器收到一个请求,就会依次使用下面的处理器来处理请求
                //比如handshakeHandler就是负责连接握手、NettyDecoder是负责编码解码、IdleStateHandler是负责连接空闲管理
                //connectionManageHandler是负责网络连接管理、serverHandler是负责最关键的网络请求的处理
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                    .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                    .addLast(defaultEventExecutorGroup,
                        encoder,
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                        connectionManageHandler,
                        serverHandler
                    );
                }
            }
        );
        ...
        try {
            //下面这行代码就是启动Netty网络服务器,其中的bind()方法就是绑定和监听一个端口号
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }
        ...
    }
    ...
}

至此Netty网络服务器便启动了,开始监听端口号9876,如下图示:

(4)总结

NameServer启动的核心就是:基于Netty实现了一个网络服务器,然后监听默认的9876端口号来接收Broker和客户端发送的网络请求。


当NameServer启动完之后,就需要关注:Broker是如何启动的如何向NameServer进行注册如何进行心跳NameServer又是如何管理Broker的


5.Broker启动时是如何初始化配置的

(1)NameServer的启动过程回顾

(2)BrokerStartup的入口源码分析

(3)创建Broker的几个配置组件

(4)为Broker的配置组件解析和填充信息


(1)NameServer的启动过程回顾

前面从NameServer的启动脚本开始,首先介绍了它的配置的初始化,然后介绍核心的NamesrvController请求处理组件的初始化和启动,最后通过源码发现其底层会构建一个Netty网络服务器监听9876端口


如下图示:当NameServer启动后,有一个Netty网络服务器监听9876端口,此时Broker和客户端就可以与NameServer建立长连接进行网络通信。

(2)BrokerStartup的入口源码分析

既然NameServer已经启动了,而且会有一个Netty网络服务器监听9876端口,等待接收Broker和客户端的连接和请求,接下来就看Broker是如何启动的了。


由于启动Broker是通过mqbroker脚本来实现的,所以脚本里一定会启动一个JVM进程来执行BrokerStartup的main()方法。这里就不再重复介绍Broker的启动脚本了,而直接分析BrokerStartup的main()方法。


这个BrokerStartup类,就在rocketmq源码中的broker模块里。其源码如下:

public class BrokerStartup {
    ...
    public static void main(String[] args) {
        start(createBrokerController(args));
    }
    ...
}

和NamesrvStratup的main()方法类似,同样会先创建一个Controller组件再用start()方法启动这个Controller组件


(3)创建Broker的几个配置组件

一.解析传递进来命令行参数

BrokerStartup的createBrokerContorller()方法源码如下:首先会通过ServerUtil的parseCmdLine()方法来解析传递进来命令行参数

public class BrokerStartup {
    ...
    public static BrokerController createBrokerController(String[] args) {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
        try {
            Options options = ServerUtil.buildCommandlineOptions(new Options());
            commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options), new PosixParser());
            if (null == commandLine) {
                System.exit(-1);
            }
            ...
        }
        ...
    }
    ...
}

二.创建Broker的几个配置组件

接着,createBrokerContorller()方法会创建Broker的几个配置组件。这些配置组件包括:Broker自己的配置BrokerConfig、Broker作为一个Netty服务器的配置NettyServerConfig、Broker作为一个Netty客户端的配置NettyClientConfig、Broker消息存储的配置MessageStoreConfig。

public class BrokerStartup {
    ...
    public static BrokerController createBrokerController(String[] args) {
        ...
        //下面三个是Broker的核心配置类,分别是Broker的配置、Netty服务器的配置、Netty客户端的配置
        final BrokerConfig brokerConfig = new BrokerConfig();
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        final NettyClientConfig nettyClientConfig = new NettyClientConfig();
        //这里设置了Netty客户端是否使用TLS的配置,TLS是一个加密机制
        nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE, String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
        //设置Netty服务器的监听端口号是10911
        nettyServerConfig.setListenPort(10911);
        //接着新建一个很关键的配置组件MessageStoreConfig,这些配置与在Broker中存储消息有关
        final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();

        //如果当前这个Broker是Slave,那么就要设置一个特殊的参数accessMessageInMemoryMaxRatio
        if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
            int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
            messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
        }
        ...
    }
    ...
}

为什么Broker自己又是Netty服务器,又是Netty客户端?因为当客户端向Broker上发送请求时,Broker就是一个Netty服务器,负责监听客户端的连接请求。当Broker向NameServer发送请求时,Broker就是一个Netty客户端,要和NameServer的Netty服务器建立连接。


Broker的几个配置组件如下图示:

(4)为Broker的配置组件解析和填充信息

接下来看Broker启动时是如何为几个配置组件解析和填充信息的。如果在启动Broker时,用了-c选项带了一个配置文件的地址,那么就会读取配置文件里自定义的配置信息,然后读取出来覆盖到Broker的4个配置组件中。

public class BrokerStartup {
    ...
    public static BrokerController createBrokerController(String[] args) {
        ...
        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                configFile = file;
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);

                properties2SystemEnv(properties);
                MixAll.properties2Object(properties, brokerConfig);
                MixAll.properties2Object(properties, nettyServerConfig);
                MixAll.properties2Object(properties, nettyClientConfig);
                MixAll.properties2Object(properties, messageStoreConfig);

                BrokerPathConfigHelper.setBrokerConfigPath(file);
                in.close();
            }
        }
        ...
    }
    ...
}

一般用mqbroker脚本启动Broker时,都需要提前创建好一个Broker配置文件,然后再使用-c选项带上这个配置文件的地址路径。这样,createBrokerController()方法便会读取到自定义的配置文件,填充到对应的Broker配置类里。


下面的源码,便介绍了Broker启动时是如何解析和填充配置的。

public class BrokerStartup {
    ...
    public static BrokerController createBrokerController(String[] args) {
        ...
        //下面这段代码,就是判断一下Broker的角色,针对不同的角色做处理
        switch (messageStoreConfig.getBrokerRole()) {
            case ASYNC_MASTER:
            case SYNC_MASTER:
                brokerConfig.setBrokerId(MixAll.MASTER_ID);
                break;
            case SLAVE:
                if (brokerConfig.getBrokerId() <= 0) {
                    System.out.printf("Slave's brokerId must be > 0");
                    System.exit(-3);
                }
                break;
            default:
                break;
        }
        //这里会判断是否是基于dleger技术来管理主从同步和commitlog
        //如果是的话,就把Broker设置为-1
        if (messageStoreConfig.isEnableDLegerCommitLog()) {
            brokerConfig.setBrokerId(-1);
        }
        //下面这一行的配置,就是设置了HA监听端口号
        messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
        //下面10行代码与日志相关
        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        System.setProperty("brokerLogDir", "");
        if (brokerConfig.isIsolateLogEnable()) {
            System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + brokerConfig.getBrokerId());
        }
        if (brokerConfig.isIsolateLogEnable() && messageStoreConfig.isEnableDLegerCommitLog()) {
            System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + messageStoreConfig.getdLegerSelfId());
        }
        configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");

        //如果命令行中包含了-p参数,就在启动Broker时打印一下所有配置类的启动参数
        if (commandLine.hasOption('p')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
            MixAll.printObjectProperties(console, brokerConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            MixAll.printObjectProperties(console, nettyClientConfig);
            MixAll.printObjectProperties(console, messageStoreConfig);
            System.exit(0);
        } else if (commandLine.hasOption('m')) {
            //如果命令行包含了-m参数,同样也是打印各种配置参数
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
            MixAll.printObjectProperties(console, brokerConfig, true);
            MixAll.printObjectProperties(console, nettyServerConfig, true);
            MixAll.printObjectProperties(console, nettyClientConfig, true);
            MixAll.printObjectProperties(console, messageStoreConfig, true);
            System.exit(0);
        }
        //下面5行代码会打印Broker的配置参数
        log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
        MixAll.printObjectProperties(log, brokerConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);
        MixAll.printObjectProperties(log, nettyClientConfig);
        MixAll.printObjectProperties(log, messageStoreConfig);
        ...
    }
    ...
}

其实其他的开源项目,可能也有类似上述的代码。就是构建配置类读取配置文件的配置解析命令行的配置参数,然后进行各种配置的校验和设置


6.BrokerController的创建以及包含的组件

(1)BrokerController是在哪里创建出来的

(2)为什么叫BrokerController

(3)BrokerController的构造函数会创建大量的组件和线程队列


(1)BrokerController是在哪里创建出来的

上面介绍了Broker在启动时,首先会执行BrokerStartup的createBrokerController()方法。在这个方法里,会初始化以及解析Broker的4个配置组件,如下图示。

这4个配置组件在初始化时,实际上就是用默认的配置参数值以及配置文件里的配置参数值、包括命令行传递的配置参数值,填充到配置组件中。然后在后续Broker运行的过程中,各种行为都会根据这些配置组件里的配置参数值来决定。


那么在准备好上述4个配置组件后,接下来就会创建最核心的Broker组件了。

public class BrokerStartup {
    ...
    public static BrokerController createBrokerController(String[] args) {
        ...
        final BrokerController controller = new BrokerController(
            brokerConfig,
            nettyServerConfig,
            nettyClientConfig,
            messageStoreConfig
        );
        controller.getConfiguration().registerConfig(properties);
        ...
    }
    ...
}

上述源码会创建一个BrokerController组件,这个BrokerController组件可以认为就是Broker自己


(2)为什么叫BrokerController

下面介绍BrokerStartup、BrokerController和Broker之间的关系,以及为什么要这么设计。


首先BrokerStartup这个类是用来启动Broker的一个类,BrokerStartup里包含了对Broker进行初始化完成Broker的启动工作的逻辑,所以BrokerStartup本身并不能代表一个Broker


然后BrokerController类似于Java Web开发中使用Spring MVC框架开发的一系列Controller,专门用来负责接收和处理请求。但是毕竟中间件系统的架构设计思想和普通的Java Web业务系统还是不一样的,这两种Contorller还是存在区别。


BrokerController可以认为是Broker管理控制组件,也就是这个组件被创建出来以及完成初始化之后,是用来控制当前正在运行的Broker的。因此,使用mqbroker脚本启动的JVM进程,实际上也可以认为就是一个Broker。这里的Broker代表了一个JVM进程而不是一个代码组件。而BrokerStartup作为一个拥有main()方法的类,则是一个代码组件


因此,BrokerStartup的作用就是准备好4个配置组件,然后创建和启动BrokerController这个核心组件。也就是启动一个Broker管理控制组件,让BrokerController去控制和管理Broker这个JVM进程运行过程中的一切行为,包括接收网络请求、管理磁盘上的消息数据,以及管理后台线程的运行。


Broker、BrokerStartup、BrokerController之间的关系如下图示:

总结:首先Broker不是一个代码组件,而是用mqbroker脚本启动的JVM进程。然后BrokerStartup是用来启动JVM进程的拥有一个main()方法的类,它是一个启动组件,负责初始化4个配置组件,并基于这4个配置组件去启动BrokerControler这个管理控制组件。接着,在Broker这个JVM进程的运行期间,都是由BrokerController管理控制组件去管理Broker的请求处理、后台线程以及磁盘数据的。


(3)BrokerController的构造函数会创建大量的组件和线程队列

public class BrokerController {
    ...
    public BrokerController(final BrokerConfig brokerConfig, final NettyServerConfig nettyServerConfig, final NettyClientConfig nettyClientConfig, final MessageStoreConfig messageStoreConfig) {
        //下面的4行代码就是把4个配置组件保存到本地缓存
        this.brokerConfig = brokerConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.nettyClientConfig = nettyClientConfig;
        this.messageStoreConfig = messageStoreConfig;

        //接下来都是Broker的各种功能对应的组件
        //比如consumerOffsetManager是专门管理Consumer消费offset的
        //topicConfigManager是管理Topic配置的,pullMessageProcessor是处理Consumer发送请求过来拉取消息的
        //所以可以认为,Broker会有很多很多功能,每一个功能都是由一个组件来负责的
        //因此Broker在初始化时,内部就会有很多代码组件
        this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new LmqConsumerOffsetManager(this) : new ConsumerOffsetManager(this);
        this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new LmqTopicConfigManager(this) : new TopicConfigManager(this);
        this.pullMessageProcessor = new PullMessageProcessor(this);
        this.pullRequestHoldService = messageStoreConfig.isEnableLmq() ? new LmqPullRequestHoldService(this) : new PullRequestHoldService(this);
        this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
        this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
        this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
        this.consumerFilterManager = new ConsumerFilterManager(this);
        this.producerManager = new ProducerManager();
        this.clientHousekeepingService = new ClientHousekeepingService(this);
        this.broker2Client = new Broker2Client(this);
        this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this);
        this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
        this.filterServerManager = new FilterServerManager(this);

        this.slaveSynchronize = new SlaveSynchronize(this);

        //接下来是线程池的队列,它们是用来实现某些功能的后台线程池的队列
        //不同的后台线程和处理请求的线程放在不同的线程池里去执行
        //因为有些Broker的功能是接收请求进行处理时会用到上面的一些组件、有些Broker的功能就是由自己的后台线程去执行
        this.sendThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getSendThreadPoolQueueCapacity());
        this.putThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getPutThreadPoolQueueCapacity());
        this.pullThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getPullThreadPoolQueueCapacity());
        this.replyThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getReplyThreadPoolQueueCapacity());
        this.queryThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getQueryThreadPoolQueueCapacity());
        this.clientManagerThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
        this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
        this.heartbeatThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
        this.endTransactionThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getEndTransactionPoolQueueCapacity());

        //下面这些同样是Broker的一些功能性组件
        //比如brokerStatsManager就是metric统计组件,用于对Broker内部进行统计的
        //比如brokerFastFailure是用来处理Broker故障的组件
        this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? 
            new LmqBrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()) : 
            new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat());
        this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
        this.brokerFastFailure = new BrokerFastFailure(this);
        this.configuration = new Configuration(
            log,
            BrokerPathConfigHelper.getBrokerConfigPath(),
            this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
        );
    }
    ...
}

BrokerController内部有一系列的功能性组件,以及大量的后台线程池,如下图示: