多中心kafka集群同步方案:聊聊mm2那些事
时光闹钟app开发者,请关注我,后续分享更精彩!
坚持原创,共同进步!
1.概述
kafka作为一款性能高效的消息中间件,在企业中被广泛使用。在一些灾备,异地备份等场景下,如何实现多中心机房间kafka集群的数据同步呢?本文将向大家介绍kafka官方的mirrormaker2(简称mm2)跨集群同步方案。希望对有类似需求的朋友们,有所帮助和参考。
跨集群同步示意图
2.环境资源
虚拟机(Centos) | 安装服务 |
host地址: 192.168.10.10 dev1 | 源kafka集群 |
host地址: 192.168.10.11 dev2 | 公共zookeeper集群实例1, 目标kafka集群实例1, mm2实例1, kafdrop监控(docker) |
host地址: 192.168.10.12 dev3 | 公共zookeeper集群实例2, 目标kafka集群实例2, mm2实例2, kafdrop监控(docker) |
host地址: 192.168.10.13 dev4 | 公共zookeeper集群实例3, 目标kafka集群实例3, mm2实例3 |
如上表格,环境说明如下:
- 4台虚拟机,虚拟机系统为centos 7,各虚拟机间以host地址访问。host地址以安装虚拟机网络ip为准,本文ip仅为示例说明。
- dev1机器上安装源kafka集群,3个kafka实例,不同实例间以不同端口区分。
- zookeeper集群分别安装到dev2-dev4 虚拟机上,源kafka和目标kafka集群公用zookeeper集群,以节约机器资源。
- mm2集群分别安装到dev2-dev4虚拟机上。
- 4台虚拟机分别绑定好host域名和hostname映射,设置方式请自行度娘,这里不作过多赘诉。
- 4台虚拟机分别安装jdk8环境,kafka安装依赖jdk环境。
3.kafka下载
本文以kafka_2.12-2.7.1版本作为演示示例,不同kafka版本间存在差异,或有兼容性和配置差异问题,请务必确认相关版本。
#下载地址
wget -c https://archive.apache.org/dist/kafka/2.7.1/kafka_2.12-2.7.1.tgz --no-check-certificate
#下载完成,copy到其他2台机器并解压到以下路径
/home/kafka/kafka_2.12-2.7.1
4.zookeeper集群安装
配置文件修改
机器dev2-dev4 3台机器上分别修改zookeeper.properties以下内容:
#创建zookeeper的配置数据存储目录
mkdir -p /data/zookeeper
#切换到kafka配置目录
cd {kafka目录}/config/
#修改配置
vim zookeeper.properties
#---zookeeper.properties中添加或修改以下配置项---
dataDir=/data/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
tickTime=2000
initLimit=10
syncLimit=5
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
server.1=dev2:2888:3888
server.2=dev3:2888:3888
server.3=dev4:2888:3888
myid文件添加
dev2-dev4 3台机器添加zookeeper myid文件
#dev2机器
cat >/data/zookeeper/myid<<eof
1
eof
#dev3机器
cat >/data/zookeeper/myid<<eof
2
eof
#dev4机器
cat >/data/zookeeper/myid<<eof
3
eof
zookeeper启动
dev2-dev4 3台机器分别启动zookeeper
cd /home/kafka/kafka_2.12-2.7.1
nohup bin/zookeeper-server-start.sh ./config/zookeeper.properties > ./zookeeper.log 2>&1 &
5.kafka broker集群安装
源kafka集群安装
源kafka集群,依赖公共zookeeper集群(dev2-dev4)。部署在dev1一个虚拟机上,基于不同端口的方式部署3个broker实例。
dev1虚拟机分别修改server1.properties、server2.properties、server3.properties配置
#切换到配置目录
cd /home/kafka/kafka_2.12-2.7.1/config
# 复制kafka server配置文件
cp server.properties server1.properties
cp server.properties server2.properties
cp server.properties server3.properties
#创建日志目录
mkdir -p /data/kafka/log1
mkdir -p /data/kafka/log2
mkdir -p /data/kafka/log3
#server1.properties配置修改
# 日志目录
log.dirs=/data/kafka/log1
# 集群ip配置。注意:2个kafka集群共用一个zookeeper集群。不同kafka集群数据写不同zookeeper目录。这里是kafka1目录
zookeeper.connect=dev2:2181,dev3:2181,dev4:2181/kafka1
#broker id每个实例必须唯一
broker.id=1
#broker server监听地址
listeners=PLAINTEXT://dev1:9092
#server2.properties配置修改
# 日志目录
log.dirs=/data/kafka/log2
# 集群ip配置。注意:2个kafka集群共用一个zookeeper集群。不同kafka集群数据写不同zookeeper目录。这里是kafka1目录
zookeeper.connect=dev2:2181,dev3:2181,dev4:2181/kafka1
#broker id每个实例必须唯一
broker.id=2
#broker server监听地址
listeners=PLAINTEXT://dev1:9093
#server3.properties配置修改
# 日志目录
log.dirs=/data/kafka/log3
# 集群ip配置。注意:2个kafka集群共用一个zookeeper集群。不同kafka集群数据写不同zookeeper目录。这里是kafka1目录
zookeeper.connect=dev2:2181,dev3:2181,dev4:2181/kafka1
#broker id每个实例必须唯一
broker.id=3
#broker server监听地址
listeners=PLAINTEXT://dev1:9094
kafka启动,dev1机器上分别启动3个不同实例
cd /home/kafka/kafka_2.12-2.7.1
nohup bin/kafka-server-start.sh ./config/server1.properties > ./kafka1.log 2>&1 &
nohup bin/kafka-server-start.sh ./config/server2.properties > ./kafka2.log 2>&1 &
nohup bin/kafka-server-start.sh ./config/server3.properties > ./kafka3.log 2>&1 &
目标kafka集群安装
目标kafka集群,依赖公共zookeeper集群(dev2-dev4)。部署在dev2-dev4 3个虚拟机上,每个虚拟机部署1个broker实例。
dev2-dev4虚拟机分别修改配置
#dev2 broker配置
#切换到配置目录
cd /home/kafka/kafka_2.12-2.7.1/config
# 复制kafka server配置文件
cp server.properties server1.properties
#创建日志目录
mkdir -p /data/kafka/log1
#-------server1.properties配置修改
# 日志目录
log.dirs=/data/kafka/log1
# 集群ip配置。注意:2个kafka集群共用一个zookeeper集群。不同kafka集群数据写不同zookeeper目录。这里是kafka1目录
zookeeper.connect=dev2:2181,dev3:2181,dev4:2181/kafka2
#broker id每个实例必须唯一
broker.id=1
#broker server监听地址
listeners=PLAINTEXT://dev2:9092
#dev3 broker配置
#切换到配置目录
cd /home/kafka/kafka_2.12-2.7.1/config
# 复制kafka server配置文件
cp server.properties server1.properties
#创建日志目录
mkdir -p /data/kafka/log1
#-------server1.properties配置修改
# 日志目录
log.dirs=/data/kafka/log1
# 集群ip配置。注意:2个kafka集群共用一个zookeeper集群。不同kafka集群数据写不同zookeeper目录。这里是kafka1目录
zookeeper.connect=dev2:2181,dev3:2181,dev4:2181/kafka2
#broker id每个实例必须唯一
broker.id=2
#broker server监听地址
listeners=PLAINTEXT://dev3:9092
#dev4 broker配置
#切换到配置目录
cd /home/kafka/kafka_2.12-2.7.1/config
# 复制kafka server配置文件
cp server.properties server1.properties
#创建日志目录
mkdir -p /data/kafka/log1
#-------server1.properties配置修改
# 日志目录
log.dirs=/data/kafka/log1
# 集群ip配置。注意:2个kafka集群共用一个zookeeper集群。不同kafka集群数据写不同zookeeper目录。这里是kafka1目录
zookeeper.connect=dev2:2181,dev3:2181,dev4:2181/kafka2
#broker id每个实例必须唯一
broker.id=3
#broker server监听地址
listeners=PLAINTEXT://dev4:9092
kafka启动,dev2-dev4 3台机器分别启动
cd /home/kafka/kafka_2.12-2.7.1
nohup bin/kafka-server-start.sh ./config/server1.properties > ./kafka1.log 2>&1 &
6.kafdrop监控安装
kafdrop是kafka集群监控工具。基于docker安装,使用前请提前在部署机器上安装docker运行环境。
源kafka集群监控,安装在虚拟机dev2上
docker run -d --name kafdrop1 --net=host --restart=always \
-e KAFKA_BROKERCONNECT=dev1:9092,dev1:9093,dev1:9094 \
-e JVM_OPTS="-Xms32M -Xmx64M" \
-e SERVER_SERVLET_CONTEXTPATH="/" \
obsidiandynamics/kafdrop:latest
#访问界面:
http://dev2:9000
目标kafka集群监控,安装在虚拟机dev3上
docker run -d --name kafdrop2 --net=host --restart=always \
-e KAFKA_BROKERCONNECT=dev2:9092,dev3:9092,dev4:9092 \
-e JVM_OPTS="-Xms32M -Xmx64M" \
-e SERVER_SERVLET_CONTEXTPATH="/" \
obsidiandynamics/kafdrop:latest
#访问界面:
http://dev3:9000
启动成功后浏览器分别访问以下地址。
源kafka集群监控
http://dev2:9000
目标kafka集群监控
http://dev3:9000
7. mirror maker2介绍
1.参考文档
网友更新:
https://www.cnblogs.com/listenfwind/p/14269259.html
https://zhuanlan.zhihu.com/p/343819512
官方文档:https://kafka.apache.org/31/documentation/#georeplication
官方wiki文档:https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0
github: https://github.com/apache/kafka/tree/trunk/connect/mirror
其他同步方案:
https://www.zhihu.com/question/339709372
2.mm2介绍
Geo-Replication(跨地域复制)应用场景
- Geo-replication(跨地域复制)
- Disaster recovery(灾备)
- Feeding edge clusters into a central, aggregate cluster(边沿集群向中心集群数据收集,数据汇集)
- Physical isolation of clusters (such as production vs. testing)(物理集群隔离和同步,如生成环境到测试环境数据验证等)
- Cloud migration or hybrid cloud deployments(云迁移或易购云部署)
- Legal and compliance requirements(合规等需求)
支持特性
- Replicates topics (data plus configurations)(主题复制)
- Replicates consumer groups including offsets to migrate applications between clusters(消费组及其消费offsets值)
- Replicates ACLs(权限)
- Preserves partitioning(分区)
- Automatically detects new topics and partitions(自动检测新的主题和分区)
- Provides a wide range of metrics, such as end-to-end replication latency across multiple data centers/clusters(提供端到端统计复制延迟等指标)
- Fault-tolerant and horizontally scalable operations(故障转移和水平扩展)
Replication Flows(复制流)
mm2使用connectors消费源集群消息,然后生产投递消息到目标集群。这种从源集群复制数据到目标集群的流向称为复制流replication flows。mm2配置文件中的定义格式{source_cluster}->{target_cluster}。以下是一些示例说明:
- 主主模式,双向复制:A->B, B->A
- 主备模式,单向复制:A->B
- 聚合模式:A->K, B->K, C->K
- 广播模式:K->A, K->B, K->C
- 传递模式:A->B, B->C, C->D
3.mm2配置启动
mm2配置文件{kafka路径}/config/connect-mirror-maker.properties
cd /home/kafka/kafka_2.12-2.7.1
vim ./config/connect-mirror-maker.properties
# mm2默认任务连接数。推荐至少为2.依赖硬件资源和复制的主题分区数
tasks.max = 5
# specify any number of cluster aliases
clusters = kafka1, kafka2
# connection information for each cluster
# This is a comma separated host:port pairs for each cluster
# for e.g. "A_host1:9092, A_host2:9092, A_host3:9092"
kafka1.bootstrap.servers = dev1:9092,dev1:9093,dev1:9094
kafka2.bootstrap.servers = dev2:9092,dev3:9092,dev4:9092
# enable and configure individual replication flows
kafka1->kafka2.enabled = true
# 同步消息主题,支持正则匹配。多个主题以逗号分隔
kafka1->kafka2.topics = foo.*, bar.*
mm2启动
目标集群对应机器上(dev2,dev3,dev4),分别启动
cd /home/kafka/kafka_2.12-2.7.1
nohup bin/connect-mirror-maker.sh ./config/connect-mirror-maker.properties --clusters kafka2 > ./mm1.log 2>&1 &
4.验证
往源kafka集群推送测试topic消息(kafka推送消息实现本文不作介绍,请自行度娘)。这里推送了3w条消息。分别通过kafdrop监控查看源和目标集群监控界面,对比统计情况。如下截图所示,可以看到两边消息数量一致。
源kafka集群topic2消息3w条
目标kafka集群topic2同步消息数量3w条。
5.QA
1.问题:使用mm2常见问题说明
解答:
- mm2原生支撑集群模式。不同机器上分别启动对应实例即可。
- mm2建议与目标集群部署在同一机房下。在启动命令中通过--clusters kafka2参数指定目标集群别名,对程序性能优化有帮助。
- 配置修改后需重启mm2程序生效。
2.问题:通过mm从source cluster到target cluster同步时,copy到target cluster 的topic名称默认策略会修改为{源集群别名}.{topic 名称}. 解答:
参考地址:https://www.jianshu.com/p/5d5c98811e33
https://github.com/opencore/mirrormaker_topic_rename
stackover解决方案
https://stackoverflow.com/questions/59390555/is-it-possible-to-replicate-kafka-topics-without-alias-prefix-with-mirrormaker2
#配置文件添加策略设置 2.7.1版本好像有问题
replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy
#或者自定义实现
replication.policy.class=com.dctl.ea.mm2.plug.ext.SameNameReplicationPolicy
自定义实现:
新增工程,添加maven依赖
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-mirror-client</artifactId>
<version>2.7.1</version>
</dependency>
</dependencies>
实现类SameNameReplicationPolicy
public class SameNameReplicationPolicy extends DefaultReplicationPolicy {
@Override
public String formatRemoteTopic(String sourceClusterAlias, String topic) {
return topic;
}
}
maven打包后,将扩展jar放到{kafka安装目录}/libs目录下,添加connect-mirror-maker.properties配置项
replication.policy.class=com.dctl.ea.mm2.plug.ext.SameNameReplicationPolicy
3.问题: mm默认不支持只消费一次exactly-once semantic(eos)的精确语义。即通过mm复制source集群到target集群时,可能在target集群中产生重复的同步消息。 解答:
mm2工具官方版本不支持只消费一次的精确语义,所以存在特殊情况,消息同步过程中mm2进程被kill掉后程序恢复,目标集群消息重复推送的问题。故需要消费目标kafka集群的业务程序自行保证topic消费的幂等。
以下是一些关于mm2实现精确语义的讨论,供有兴趣的同学参考。
https://cwiki.apache.org/confluence/display/KAFKA/KIP-656:+MirrorMaker2+Exactly-once+Semantics
8. 小结
本文向大家介绍了跨kafka集群同步的方案:使用mm2实现多中心kafka集群的复制。包含环境准备、工具安装、测试验证、以及mm2使用的一些注意点。相信大家对kafka跨集群同步有了比较完整的整体认识。由于准备比较仓促,难免存在不足之处。若有疏漏,欢迎留言讨论。