大纲
1.NameServer的启动脚本
2.NameServer启动时会解析哪些配置
3.NameServer如何初始化Netty网络服务器
4.NameServer如何启动Netty网络服务器
5.Broker启动时是如何初始化配置的
6.BrokerController的创建以及包含的组件
7.BrokerController的初始化
8.BrokerController的启动
9.Broker如何把自己注册到NameServer上
10.BrokerOuterAPI是如何发送注册请求的
11.NameServer如何处理Broker的注册请求
16.Broker如何发送定时心跳的以及故障感知
1.NameServer的启动脚本
NameServer会通过rocketmq-master源码中distribution/bin目录下的mqnamesrv脚本来启动。在mqnamesrv脚本中,用于启动NameServer进程的命令如下。也就是使用sh命令执行runserver.sh脚本,然后通过这个脚本去启动NamesrvStartup这个Java类。
sh ${ROCKETMQ_HOME}/bin/runserver.sh org.apache.rocketmq.namesrv.NamesrvStartup $@
在runserver.sh脚本中,启动NamesrvStartup这个Java类的命令如下:
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails"
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
$JAVA ${JAVA_OPT} $@
可以看到,上述命令大致简化一下就是类似如下这样的一行命令:
java -server -Xms4g -Xmx4g -Xmn2g org.apache.rocketmq.namesrv.NamesrvStartup
通过Java命令 + 一个有main()方法的NamesrvStartup类,就会启动一个JVM进程。这个JVM进程会执行NamesrvStartup类中的main()方法,从而完成启动NameServer需要的所有工作。
所以,启动NameServer的过程如下图示:
2.NameServer启动时会解析哪些配置
(1)什么是NamesrvController
(2)NamesrvController是如何被创建出来的
(3)NameServer的两个配置类
(4)NameServer两个配置类的解析
(5)完成NamesrvController组件的创建
(1)什么是NamesrvController
当执行NamesrvStartup的main()方法时,会执行NamesrvStartup的main0()方法:
//下面这个NamesrvStartup类,就是最为关键的NameServer进程的启动类
public class NamesrvStartup {
...
//NameServer进程启动时,会执行NamesrvStartup类的main()方法
public static void main(String[] args) {
main0(args);
}
public static NamesrvController main0(String[] args) {
//NamesrvController是NameServer的核心代码组件
//NameServer一启动,就会去启动这个NamesrvController
try {
NamesrvController controller = createNamesrvController(args);
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
...
}
在上述源码中,有这么一行代码:
NamesrvController controller = createNamesrvController(args);
这行代码就是在创建一个NamesrvController类,这个类是NameServer中的一个核心组件。由于NameServer需要接收Broker发送过来的要把自己注册到NameServer上的请求(因为知道有哪些Broker和管理Broker),以及需要接收客户端发送过来的从NameServer拉取元数据的请求(因为客户端需要知道一个Topic的MessageQueue都在哪些Broker上),所以才需要在NameServer中创建NamesrvController这个类专门用来接收Broker和客户端的网络请求。如下图示,NamesrvController组件是NameServer中的核心组件,用来负责接受网络请求的。
(2)NamesrvController是如何被创建出来的
NamesrvStartup的createNamesrvController()方法会创建出NamesrvController这个关键组件。
public class NamesrvStartup {
...
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
//remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
return controller;
}
...
}
(3)NameServer的两个配置类
public class NamesrvStartup {
...
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
...
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
...
}
...
}
在createNamesrvController()方法中,创建了NamesrvConfig和NettyServerConfig两个配置类。其中,NamesrvConfig配置了NameServer自身运行的一些参数,NettyServerConfig配置了用于接收网络请求的Netty服务器的一些参数。
在这里也能知道NameServer对外接收Broker和客户端的网络请求时,是基于Netty来实现网络服务器的,而且可知NameServer默认的监听请求端口号就是9876。
NameServer的两个配置类如下:
public class NamesrvConfig {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
//RocketMQ的home主目录地址,它其实就是尝试去获取ROCKETMQ_HOME这个环境变量的值
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
//NameServer存放kv配置属性的路径
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
//NameServer自己的配置存储路径
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
//生产环境的名称,默认为center
private String productEnvName = "center";
//是否启动了clusterTest测试集群,默认是false
private boolean clusterTest = false;
//是否支持有序消息,默认是false,不支持
private boolean orderMessageEnable = false;
...
}
public class NettyServerConfig implements Cloneable {
//NettyServer默认的监听端口号是8888,但在NamesrvController中这个端口号会被设置为9876
private int listenPort = 8888;
//NettyServer的工作线程数量,默认是8
private int serverWorkerThreads = 8;
//Netty的public线程池的线程数量,默认是0
private int serverCallbackExecutorThreads = 0;
//Netty的IO线程池的线程数量,默认是3,这里的线程是负责解析网络请求的
//这里的线程解析完网络请求后,就会把请求转发给work线程来处理
private int serverSelectorThreads = 3;
//下面两个是Broker端的参数:就是Broker端在基于Netty构建网络服务器时,会使用下面两个参数
private int serverOnewaySemaphoreValue = 256;
private int serverAsyncSemaphoreValue = 64;
//如果一个网络连接空闲超时120s,就会被关闭
private int serverChannelMaxIdleTimeSeconds = 120;
//socket send buffer缓冲区以及receive buffer缓冲区的大小
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
private int writeBufferHighWaterMark = NettySystemConfig.writeBufferHighWaterMark;
private int writeBufferLowWaterMark = NettySystemConfig.writeBufferLowWaterMark;
private int serverSocketBacklog = NettySystemConfig.socketBacklog;
//ByteBuffer是否开启缓存,默认是开启的
private boolean serverPooledByteBufAllocatorEnable = true;
//是否启动epoll IO模型,默认是不开启的
private boolean useEpollNativeSelector = false;
...
}
(4)NameServer两个配置类的解析
public class NamesrvStartup {
...
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
...
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
//下面这段代码的意思就是,如果用mqnamesrv启动时,带上了"-c"这个选项
//那么"-c"这个选项的意思就是带上一个配置文件的地址,接着它就可以读取配置文件里的内容
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
//基于输入流从配置文件里读取配置,读取的配置会放入一个Properties里
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
//然后可以基于工具类,把读取到的配置都放入到两个核心配置类里
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
...
}
...
}
上述代码意思是:在启动NameServer时,如果使用了-c选项带上一个配置文件的地址,那么就会把配置文件里的配置放入这两个配置类中。比如有一个配置文件是nameserver.properties,里面有一个配置是serverWorkerThreads=16,那么就会读取出这个配置,然后覆盖到NettyServerConfig中。
接着看配置相关的剩余代码;
public class NamesrvStartup {
...
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
...
//下面这段代码的意思是,如果用mqnamesrv启动时,带上了"-p"这个选项
//那么它的意思就是print,会打印处出NameServer的所有配置信息
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}
//下面一行代码会把mqnamesrv命令行中带上的配置选项,都读取出来,然后覆盖到NamesrvConfig里
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
//如果发现ROCKETMQ_HOME是空的,那么就会输出一个异常日志,提示设置ROCKETMQ_HOME这个环境变量
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
//下面5行代码也是日志、配置相关的,与Logger、Configurator相关
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
//这里会打印一下NameServer的所有配置信息
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
...
}
...
}
所以,NameServer启动时,刚开始就是在初始化和解析NameServerConfig、NettyServerConfig相关的配置信息。
由于NameServer刚启动会初始化和解析一些核心配置信息,尤其是NettyServer的一些网络配置信息,所以初始化配置信息后,会打印如下启动日志:
2020-02-05 15:10:05 INFO main - rocketmqHome=rocketmq-nameserver
2020-02-05 15:10:05 INFO main - kvConfigPath=namesrv/kvConfig.json
2020-02-05 15:10:05 INFO main - configStorePath=namesrv/namesrv.properties
2020-02-05 15:10:05 INFO main - productEnvName=center
2020-02-05 15:10:05 INFO main - clusterTest=false
2020-02-05 15:10:05 INFO main - orderMessageEnable=false
2020-02-05 15:10:05 INFO main - listenPort=9876
2020-02-05 15:10:05 INFO main - serverWorkerThreads=8
2020-02-05 15:10:05 INFO main - serverCallbackExecutorThreads=0
2020-02-05 15:10:05 INFO main - serverSelectorThreads=3
2020-02-05 15:10:05 INFO main - serverOnewaySemaphoreValue=256
2020-02-05 15:10:05 INFO main - serverAsyncSemaphoreValue=64
2020-02-05 15:10:05 INFO main - serverChannelMaxIdleTimeSeconds=120
2020-02-05 15:10:05 INFO main - serverSocketSndBufSize=65535
2020-02-05 15:10:05 INFO main - serverSocketRcvBufSize=65535
2020-02-05 15:10:05 INFO main - serverPooledByteBufAllocatorEnable=true
2020-02-05 15:10:05 INFO main - useEpollNativeSelector=false
(5)完成NamesrvController组件的创建
public class NamesrvStartup {
...
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
...
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
//remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
...
}
...
}
这里直接创建了NamesrvController组件,同时传递NamesrvConfig和NettyServerConfig这两个配置类给它。如下图示:
3.NameServer如何初始化Netty网络服务器
(1)NamesrvController的创建总结
(2)NamesrvController的构造函数
(3)NamesrvController启动时的工作
(4)创建NettyRemotingServer网络服务器组件
(5)NettyRemotingServer的初始化
(1)NamesrvController的创建总结
NameServer架构图如下:
由前面分析可知:NameServer启动时首先会解析配置文件,然后初始化NamesrvConfig和NettyServerConfig两个配置类,接着基于这两个配置类构建出NamesrvController组件。
而且根据类名可以推测出,NamesrvController内部肯定会包含基于Netty实现的网络通信组件。所以上面NameServer架构图中的Netty网络服务器,会负责监听和处理Broker与客户端发来的网络请求。
(2)NamesrvController的构造函数
NamesrvController被创建出来后,需要启动Netty网络服务器,这样NameServer才能在默认的9876端口上接收Broker和客户端的网络请求,比如Broker注册自己、客户端拉取Broker路由数据等。
在NamesrvController的构造函数中,其实就是保存一些实例变量的值而已,并没做什么实质性事情。
public class NamesrvController {
...
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
this.namesrvConfig = namesrvConfig;
this.nettyServerConfig = nettyServerConfig;
this.kvConfigManager = new KVConfigManager(this);
this.routeInfoManager = new RouteInfoManager();
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
this.configuration = new Configuration(log, this.namesrvConfig, this.nettyServerConfig);
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}
...
}
所以仅仅创建出一个NamesrvController实例还是不够,后续必须要有一些关键代码来启动里面的Netty服务器,才能接收网络请求。
(3)NamesrvController启动时的工作
回到NameServer启动时执行的main0()方法中:
public class NamesrvStartup {
...
public static NamesrvController main0(String[] args) {
//NamesrvController是NameServer的核心代码组件
//NameServer一启动,就会去启动这个NamesrvController
try {
NamesrvController controller = createNamesrvController(args);
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
...
}
由上述代码可知,启动NameServer有两项工作:
工作一:创建NamesrvController,其中会创建和初始化两个配置类
工作二:执行start(controller)方法,也就是启动NamesrvController这个核心组件
NamesrvStartup的start()方法如下,首先会执行NamesrvController的initialize()方法初始化请求处理组件,然后会执行NamesrvController的start()方法启动NamesrvController请求处理组件。
public class NamesrvStartup {
...
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
//执行NamesrvController的initialize()方法进行初始化操作
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
//执行NamesrvController的start()方法启动NamesrvController
controller.start();
return controller;
}
...
}
(4)创建NettyRemotingServer网络服务器组件
执行NamesrvController的initialize()方法进行初始化操作时,会创建一个NettyRemotingServer网络服务器组件。
public class NamesrvController {
...
public boolean initialize() {
this.kvConfigManager.load();
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
...
}
...
}
(5)NettyRemotingServer的初始化
NettyRemotingServer构造方法的代码如下:
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
...
public NettyRemotingServer(final NettyServerConfig nettyServerConfig, final ChannelEventListener channelEventListener) {
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
...
}
...
}
其中便通过Netty的ServerBootstrap类,创建一个Netty网络服务器。NettyRemotingServer是RocketMQ开发的一个网络服务器组件,它会基于Netty提供的ServerBootstrap类来实现一个Netty网络服务器。
4.NameServer如何启动Netty网络服务器
(1)NamesrvController初始化过程的完整代码
(2)NamesrvStartup.start()方法的工作
(3)Netty网络服务器是如何启动的
(1)NamesrvController初始化过程的完整代码
在执行NamesrvController的initialize()方法进行请求处理组件的初始化时,会创建一个NettyRemotingServer网络服务器组件。在创建NettyRemotingServer时,会创建一个ServerBootstrap的Netty网络服务器。
接下来是
NamesrvController.initialize()方法的完整源码:
public class NamesrvController {
...
public boolean initialize() {
//加载kv之类的配置
this.kvConfigManager.load();
//初始化Netty网络服务器
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
//Netty网络服务器的工作线程池
this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
//下面这行代码会把工作线程池交给Netty网络服务器
this.registerProcessor();
//下面这行代码就是启动一个后台线程,执行定时任务
//从scanNotActiveBroker()可知这里会定时扫描哪些Broker没发送心跳而挂掉的
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
//下面这行代码就也是启动一个后台线程,执行定时任务
//不过只是定时打印kv配置信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
//与FileWatchService相关
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
//Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
}
}
);
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
return true;
}
...
}
可见,
NamesrvController.initialize()方法的主要工作还是初始化Netty网络服务器,其他的工作就是启动后台线程执行一些定时任务。
(2)NamesrvStartup.start()方法的工作
public class NamesrvStartup {
...
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
//1.初始化NamesrvController请求处理组件
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
//2.通过Runtime类注册一个JVM关闭时的shutdown钩子
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
//3.启动NamesrvController请求处理组件
controller.start();
return controller;
}
public void shutdown() {
//1.首先关闭NettyRemotingServer释放网络资源
this.remotingServer.shutdown();
//2.然后关闭RemotingExecutor释放Netty网络服务器的工作线程池资源
this.remotingExecutor.shutdown();
//3.最后关闭ScheduledExecutorService释放执行定时任务的后台线程资源
this.scheduledExecutorService.shutdown();
if (this.fileWatchService != null) {
this.fileWatchService.shutdown();
}
}
...
}
在NamesrvStartup的start()方法中:首先会执行NamesrvController的initialize()方法初始化请求处理组件,在初始化的过程中,会创建出一个Netty网络服务器。
然后会通过Runtime类注册一个JVM关闭时的shutdown钩子,当JVM关闭时需要执行注册的回调函数,在回调函数中会执行NamesrvController的shutdown()方法关闭Netty服务器释放网络资源和关闭线程池释放线程资源。
最后便会执行NamesrvController的start()方法启动NamesrvController请求处理组件。这样,Netty网络服务器才会监听9876这个默认的端口号。
(3)Netty网络服务器是如何启动的
NamesrvController.start()方法的源码如下所示。可以看到,启动NamesrvContorller的核心就是启动NettyRemotingServer,即启动Netty网络服务器。
public class NamesrvController {
...
public void start() throws Exception {
this.remotingServer.start();
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}
...
}
NettyRemotingServer.start()方法的源码如下:
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
...
@Override
public void start() {
...
//下面是基于Netty的API去配置和启动一个Netty网络服务器
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
//设置Netty网络服务器要监听的端口号9876
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer() {
//设置一些网络请求处理器,只要Netty网络服务器收到一个请求,就会依次使用下面的处理器来处理请求
//比如handshakeHandler就是负责连接握手、NettyDecoder是负责编码解码、IdleStateHandler是负责连接空闲管理
//connectionManageHandler是负责网络连接管理、serverHandler是负责最关键的网络请求的处理
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
}
);
...
try {
//下面这行代码就是启动Netty网络服务器,其中的bind()方法就是绑定和监听一个端口号
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
...
}
...
}
至此Netty网络服务器便启动了,开始监听端口号9876,如下图示:
(4)总结
NameServer启动的核心就是:基于Netty实现了一个网络服务器,然后监听默认的9876端口号来接收Broker和客户端发送的网络请求。
当NameServer启动完之后,就需要关注:Broker是如何启动的、如何向NameServer进行注册、如何进行心跳、NameServer又是如何管理Broker的。
5.Broker启动时是如何初始化配置的
(1)NameServer的启动过程回顾
(2)BrokerStartup的入口源码分析
(3)创建Broker的几个配置组件
(4)为Broker的配置组件解析和填充信息
(1)NameServer的启动过程回顾
前面从NameServer的启动脚本开始,首先介绍了它的配置的初始化,然后介绍核心的NamesrvController请求处理组件的初始化和启动,最后通过源码发现其底层会构建一个Netty网络服务器监听9876端口。
如下图示:当NameServer启动后,有一个Netty网络服务器监听9876端口,此时Broker和客户端就可以与NameServer建立长连接进行网络通信。
(2)BrokerStartup的入口源码分析
既然NameServer已经启动了,而且会有一个Netty网络服务器监听9876端口,等待接收Broker和客户端的连接和请求,接下来就看Broker是如何启动的了。
由于启动Broker是通过mqbroker脚本来实现的,所以脚本里一定会启动一个JVM进程来执行BrokerStartup的main()方法。这里就不再重复介绍Broker的启动脚本了,而直接分析BrokerStartup的main()方法。
这个BrokerStartup类,就在rocketmq源码中的broker模块里。其源码如下:
public class BrokerStartup {
...
public static void main(String[] args) {
start(createBrokerController(args));
}
...
}
和NamesrvStratup的main()方法类似,同样会先创建一个Controller组件,再用start()方法启动这个Controller组件。
(3)创建Broker的几个配置组件
一.解析传递进来命令行参数
BrokerStartup的createBrokerContorller()方法源码如下:首先会通过ServerUtil的parseCmdLine()方法来解析传递进来命令行参数。
public class BrokerStartup {
...
public static BrokerController createBrokerController(String[] args) {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
try {
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
}
...
}
...
}
...
}
二.创建Broker的几个配置组件
接着,createBrokerContorller()方法会创建Broker的几个配置组件。这些配置组件包括:Broker自己的配置BrokerConfig、Broker作为一个Netty服务器的配置NettyServerConfig、Broker作为一个Netty客户端的配置NettyClientConfig、Broker消息存储的配置MessageStoreConfig。
public class BrokerStartup {
...
public static BrokerController createBrokerController(String[] args) {
...
//下面三个是Broker的核心配置类,分别是Broker的配置、Netty服务器的配置、Netty客户端的配置
final BrokerConfig brokerConfig = new BrokerConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
//这里设置了Netty客户端是否使用TLS的配置,TLS是一个加密机制
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE, String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
//设置Netty服务器的监听端口号是10911
nettyServerConfig.setListenPort(10911);
//接着新建一个很关键的配置组件MessageStoreConfig,这些配置与在Broker中存储消息有关
final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
//如果当前这个Broker是Slave,那么就要设置一个特殊的参数accessMessageInMemoryMaxRatio
if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
}
...
}
...
}
为什么Broker自己又是Netty服务器,又是Netty客户端?因为当客户端向Broker上发送请求时,Broker就是一个Netty服务器,负责监听客户端的连接请求。当Broker向NameServer发送请求时,Broker就是一个Netty客户端,要和NameServer的Netty服务器建立连接。
Broker的几个配置组件如下图示:
(4)为Broker的配置组件解析和填充信息
接下来看Broker启动时是如何为几个配置组件解析和填充信息的。如果在启动Broker时,用了-c选项带了一个配置文件的地址,那么就会读取配置文件里自定义的配置信息,然后读取出来覆盖到Broker的4个配置组件中。
public class BrokerStartup {
...
public static BrokerController createBrokerController(String[] args) {
...
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
configFile = file;
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
properties2SystemEnv(properties);
MixAll.properties2Object(properties, brokerConfig);
MixAll.properties2Object(properties, nettyServerConfig);
MixAll.properties2Object(properties, nettyClientConfig);
MixAll.properties2Object(properties, messageStoreConfig);
BrokerPathConfigHelper.setBrokerConfigPath(file);
in.close();
}
}
...
}
...
}
一般用mqbroker脚本启动Broker时,都需要提前创建好一个Broker配置文件,然后再使用-c选项带上这个配置文件的地址路径。这样,createBrokerController()方法便会读取到自定义的配置文件,填充到对应的Broker配置类里。
下面的源码,便介绍了Broker启动时是如何解析和填充配置的。
public class BrokerStartup {
...
public static BrokerController createBrokerController(String[] args) {
...
//下面这段代码,就是判断一下Broker的角色,针对不同的角色做处理
switch (messageStoreConfig.getBrokerRole()) {
case ASYNC_MASTER:
case SYNC_MASTER:
brokerConfig.setBrokerId(MixAll.MASTER_ID);
break;
case SLAVE:
if (brokerConfig.getBrokerId() <= 0) {
System.out.printf("Slave's brokerId must be > 0");
System.exit(-3);
}
break;
default:
break;
}
//这里会判断是否是基于dleger技术来管理主从同步和commitlog
//如果是的话,就把Broker设置为-1
if (messageStoreConfig.isEnableDLegerCommitLog()) {
brokerConfig.setBrokerId(-1);
}
//下面这一行的配置,就是设置了HA监听端口号
messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
//下面10行代码与日志相关
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
System.setProperty("brokerLogDir", "");
if (brokerConfig.isIsolateLogEnable()) {
System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + brokerConfig.getBrokerId());
}
if (brokerConfig.isIsolateLogEnable() && messageStoreConfig.isEnableDLegerCommitLog()) {
System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + messageStoreConfig.getdLegerSelfId());
}
configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");
//如果命令行中包含了-p参数,就在启动Broker时打印一下所有配置类的启动参数
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
MixAll.printObjectProperties(console, brokerConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
MixAll.printObjectProperties(console, nettyClientConfig);
MixAll.printObjectProperties(console, messageStoreConfig);
System.exit(0);
} else if (commandLine.hasOption('m')) {
//如果命令行包含了-m参数,同样也是打印各种配置参数
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
MixAll.printObjectProperties(console, brokerConfig, true);
MixAll.printObjectProperties(console, nettyServerConfig, true);
MixAll.printObjectProperties(console, nettyClientConfig, true);
MixAll.printObjectProperties(console, messageStoreConfig, true);
System.exit(0);
}
//下面5行代码会打印Broker的配置参数
log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
MixAll.printObjectProperties(log, brokerConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
MixAll.printObjectProperties(log, nettyClientConfig);
MixAll.printObjectProperties(log, messageStoreConfig);
...
}
...
}
其实其他的开源项目,可能也有类似上述的代码。就是构建配置类、读取配置文件的配置、解析命令行的配置参数,然后进行各种配置的校验和设置。
6.BrokerController的创建以及包含的组件
(1)BrokerController是在哪里创建出来的
(2)为什么叫BrokerController
(3)BrokerController的构造函数会创建大量的组件和线程队列
(1)BrokerController是在哪里创建出来的
上面介绍了Broker在启动时,首先会执行BrokerStartup的createBrokerController()方法。在这个方法里,会初始化以及解析Broker的4个配置组件,如下图示。
这4个配置组件在初始化时,实际上就是用默认的配置参数值以及配置文件里的配置参数值、包括命令行传递的配置参数值,填充到配置组件中。然后在后续Broker运行的过程中,各种行为都会根据这些配置组件里的配置参数值来决定。
那么在准备好上述4个配置组件后,接下来就会创建最核心的Broker组件了。
public class BrokerStartup {
...
public static BrokerController createBrokerController(String[] args) {
...
final BrokerController controller = new BrokerController(
brokerConfig,
nettyServerConfig,
nettyClientConfig,
messageStoreConfig
);
controller.getConfiguration().registerConfig(properties);
...
}
...
}
上述源码会创建一个BrokerController组件,这个BrokerController组件可以认为就是Broker自己。
(2)为什么叫BrokerController
下面介绍BrokerStartup、BrokerController和Broker之间的关系,以及为什么要这么设计。
首先BrokerStartup这个类是用来启动Broker的一个类,BrokerStartup里包含了对Broker进行初始化和完成Broker的启动工作的逻辑,所以BrokerStartup本身并不能代表一个Broker。
然后BrokerController类似于Java Web开发中使用Spring MVC框架开发的一系列Controller,专门用来负责接收和处理请求。但是毕竟中间件系统的架构设计思想和普通的Java Web业务系统还是不一样的,这两种Contorller还是存在区别。
BrokerController可以认为是Broker管理控制组件,也就是这个组件被创建出来以及完成初始化之后,是用来控制当前正在运行的Broker的。因此,使用mqbroker脚本启动的JVM进程,实际上也可以认为就是一个Broker。这里的Broker代表了一个JVM进程,而不是一个代码组件。而BrokerStartup作为一个拥有main()方法的类,则是一个代码组件。
因此,BrokerStartup的作用就是准备好4个配置组件,然后创建和启动BrokerController这个核心组件。也就是启动一个Broker管理控制组件,让BrokerController去控制和管理Broker这个JVM进程运行过程中的一切行为,包括接收网络请求、管理磁盘上的消息数据,以及管理后台线程的运行。
Broker、BrokerStartup、BrokerController之间的关系如下图示:
总结:首先Broker不是一个代码组件,而是用mqbroker脚本启动的JVM进程。然后BrokerStartup是用来启动JVM进程的拥有一个main()方法的类,它是一个启动组件,负责初始化4个配置组件,并基于这4个配置组件去启动BrokerControler这个管理控制组件。接着,在Broker这个JVM进程的运行期间,都是由BrokerController管理控制组件去管理Broker的请求处理、后台线程以及磁盘数据的。
(3)BrokerController的构造函数会创建大量的组件和线程队列
public class BrokerController {
...
public BrokerController(final BrokerConfig brokerConfig, final NettyServerConfig nettyServerConfig, final NettyClientConfig nettyClientConfig, final MessageStoreConfig messageStoreConfig) {
//下面的4行代码就是把4个配置组件保存到本地缓存
this.brokerConfig = brokerConfig;
this.nettyServerConfig = nettyServerConfig;
this.nettyClientConfig = nettyClientConfig;
this.messageStoreConfig = messageStoreConfig;
//接下来都是Broker的各种功能对应的组件
//比如consumerOffsetManager是专门管理Consumer消费offset的
//topicConfigManager是管理Topic配置的,pullMessageProcessor是处理Consumer发送请求过来拉取消息的
//所以可以认为,Broker会有很多很多功能,每一个功能都是由一个组件来负责的
//因此Broker在初始化时,内部就会有很多代码组件
this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new LmqConsumerOffsetManager(this) : new ConsumerOffsetManager(this);
this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new LmqTopicConfigManager(this) : new TopicConfigManager(this);
this.pullMessageProcessor = new PullMessageProcessor(this);
this.pullRequestHoldService = messageStoreConfig.isEnableLmq() ? new LmqPullRequestHoldService(this) : new PullRequestHoldService(this);
this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
this.consumerFilterManager = new ConsumerFilterManager(this);
this.producerManager = new ProducerManager();
this.clientHousekeepingService = new ClientHousekeepingService(this);
this.broker2Client = new Broker2Client(this);
this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this);
this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
this.filterServerManager = new FilterServerManager(this);
this.slaveSynchronize = new SlaveSynchronize(this);
//接下来是线程池的队列,它们是用来实现某些功能的后台线程池的队列
//不同的后台线程和处理请求的线程放在不同的线程池里去执行
//因为有些Broker的功能是接收请求进行处理时会用到上面的一些组件、有些Broker的功能就是由自己的后台线程去执行
this.sendThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getSendThreadPoolQueueCapacity());
this.putThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getPutThreadPoolQueueCapacity());
this.pullThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getPullThreadPoolQueueCapacity());
this.replyThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getReplyThreadPoolQueueCapacity());
this.queryThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getQueryThreadPoolQueueCapacity());
this.clientManagerThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
this.heartbeatThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
this.endTransactionThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getEndTransactionPoolQueueCapacity());
//下面这些同样是Broker的一些功能性组件
//比如brokerStatsManager就是metric统计组件,用于对Broker内部进行统计的
//比如brokerFastFailure是用来处理Broker故障的组件
this.brokerStatsManager = messageStoreConfig.isEnableLmq() ?
new LmqBrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()) :
new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat());
this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
this.brokerFastFailure = new BrokerFastFailure(this);
this.configuration = new Configuration(
log,
BrokerPathConfigHelper.getBrokerConfigPath(),
this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
);
}
...
}
BrokerController内部有一系列的功能性组件,以及大量的后台线程池,如下图示: