Kafka主题的创建过程(kafka 创建主题)
本篇将深入分析Kafka主题的创建过程。
可以使用kafka-topics.sh脚本创建主题,命令如下:
$ ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 2 --partitions 3 --topic hello-topic
Created topic hello-topic.
使用上面的命令创建主题时,Kafka会自动为主题分区分配副本列表。另外,我们也可以使用以下命令,指定每个分区对应的副本列表:
$ ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replica-assignment 1:3,1:2,2:3 --topic hello-topic
- --replica-assignment:依次指定每个分区的AR副本列表对应的Broker节点,格式为“p0-r0:p1-r1:..., p1-r0:p1-r1:...”(以“:”分隔同一个分区的不同副本,以“,”分隔不同的分区)。
在上面的例子中指定了新主题第一个分区的副本位于节点Broker1、Broker3,第二个分区的副本位于节点Broker1、Broker2,第三个分区的副本位于节点Broker2、Broker3。注意,分区的第一个副本将成为leader副本。
提示:--replica-assignment参数不能与--partitions、--replication-facto参数同时使用。
另外,Kafka默认会自动创建主题。例如,当我们发送消息到一个不存在的主题时,Kafka会为我们创建一个新的主题。
CreateTopics请求的处理流程
当我们使用kafka-topics.sh脚本时,该脚本会给KafkaController节点发送CreateTopics请求,从而在Kafka集群中创建主题。
创建主题
KafkaApis负责处理所有的Kafka请求。KafkaApis会调用KafkaApis#handle- CreateTopicsRequest方法处理CreateTopics命令,执行如下操作:
(1)检查请求内容是否正确并对用户进行认证、权限检查(如果开启了用户认证鉴权机制)。
(2)为新主题的分区生成AR副本列表(分区副本分配规则)。
(3)在ZooKeeper中创建节点,并存储主题相关的元数据。
(4)KafkaController节点给集群所有Broker节点发送信息,通知它们更新集群元数据。如果Broker节点发现自己被分配了新的副本,则需要完成对应的工作:如果被分配了leader副本,则需要接收生产者发送的消息,如果被分配了follow副本,则需要与leader副本同步数据。
KafkaApis#handleCreateTopicsRequest会调用ZkAdminManager#createTopics方法完成创建主题的逻辑:
def createTopics(timeout: Int,
validateOnly: Boolean,
toCreate: Map[String, CreatableTopic],
...): Unit = {
val brokers = metadataCache.getAliveBrokers()
val metadata = toCreate.values.map(topic =>
try {
...
// 【1】
val resolvedNumPartitions = if (topic.numPartitions == NO_NUM_PARTITIONS)
defaultNumPartitions else topic.numPartitions
val resolvedReplicationFactor = if (topic.replicationFactor == NO_REPLICATION_FACTOR)
defaultReplicationFactor else topic.replicationFactor
// 【2】
val assignments = if (topic.assignments.isEmpty) {
AdminUtils.assignReplicasToBrokers(
brokers, resolvedNumPartitions, resolvedReplicationFactor)
} else {
topic.assignments.forEach { assignment =>
assignments(assignment.partitionIndex) = assignment.brokerIds.asScala.map(a => a: Int)
}
assignments
}
...
// 【3】
if (validateOnly) {
CreatePartitionsMetadata(topic.name, assignments.keySet)
} else {
controllerMutationQuota.record(assignments.size)
adminZkClient.createTopicWithAssignment(topic.name, configs, assignments, validate = false, config.usesTopicId)
populateIds(includeConfigsAndMetadata, topic.name)
CreatePartitionsMetadata(topic.name, assignments.keySet)
}
} ...
}
【1】如果用户指定了分区的数量、每个分区的副本数量,则使用用户指定值,否则使用Broker配置项指定的默认数量。
【2】如果用户指定了分区的副本列表,则使用用户指定的副本列表,否则调用AdminUtils#assignReplicasToBrokers方法为主题分区生成副本列表。
【3】将主题元数据(包括主题名、分区、分区AR的副本列表等内容)写入ZooKeeper。注意,如果validateOnly参数为true,则仅验证该创建主题的方案是否正确,不实际创建主题,所以不需要写入元数据。
内容摘自《深入理解Kafka与Pulsar》,本书详细介绍了Kafka与Pulsar的使用方式,并深入分析了它们的实现机制。通过阅读本书,读者可以快速入门和使用Kafka与Pulsar,并深入理解它们的实现原理。
适读人群 :Kafka、Pulsar的专业技术人员。 大数据相关应用的开发者、运维者和爱好者。