多中心kafka集群同步方案:聊聊mm2那些事

时光闹钟app开发者,请关注我,后续分享更精彩!
坚持原创,共同进步!

1.概述

kafka作为一款性能高效的消息中间件,在企业中被广泛使用。在一些灾备,异地备份等场景下,如何实现多中心机房间kafka集群的数据同步呢?本文将向大家介绍kafka官方的mirrormaker2(简称mm2)跨集群同步方案。希望对有类似需求的朋友们,有所帮助和参考。

跨集群同步示意图

2.环境资源

虚拟机(Centos)

安装服务

host地址: 192.168.10.10 dev1

源kafka集群

host地址: 192.168.10.11 dev2

公共zookeeper集群实例1,

目标kafka集群实例1,

mm2实例1,

kafdrop监控(docker)

host地址: 192.168.10.12 dev3

公共zookeeper集群实例2,

目标kafka集群实例2,

mm2实例2,

kafdrop监控(docker)

host地址: 192.168.10.13 dev4

公共zookeeper集群实例3,

目标kafka集群实例3,

mm2实例3

如上表格,环境说明如下:

  • 4台虚拟机,虚拟机系统为centos 7,各虚拟机间以host地址访问。host地址以安装虚拟机网络ip为准,本文ip仅为示例说明。
  • dev1机器上安装源kafka集群,3个kafka实例,不同实例间以不同端口区分。
  • zookeeper集群分别安装到dev2-dev4 虚拟机上,源kafka和目标kafka集群公用zookeeper集群,以节约机器资源。
  • mm2集群分别安装到dev2-dev4虚拟机上。
  • 4台虚拟机分别绑定好host域名和hostname映射,设置方式请自行度娘,这里不作过多赘诉。
  • 4台虚拟机分别安装jdk8环境,kafka安装依赖jdk环境。

3.kafka下载

本文以kafka_2.12-2.7.1版本作为演示示例,不同kafka版本间存在差异,或有兼容性和配置差异问题,请务必确认相关版本。

#下载地址
wget -c https://archive.apache.org/dist/kafka/2.7.1/kafka_2.12-2.7.1.tgz --no-check-certificate

#下载完成,copy到其他2台机器并解压到以下路径
/home/kafka/kafka_2.12-2.7.1

4.zookeeper集群安装

配置文件修改

机器dev2-dev4 3台机器上分别修改zookeeper.properties以下内容:

#创建zookeeper的配置数据存储目录
mkdir -p /data/zookeeper

#切换到kafka配置目录
cd {kafka目录}/config/

#修改配置
vim zookeeper.properties

#---zookeeper.properties中添加或修改以下配置项---

dataDir=/data/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
tickTime=2000
initLimit=10
syncLimit=5
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
server.1=dev2:2888:3888
server.2=dev3:2888:3888
server.3=dev4:2888:3888

myid文件添加

dev2-dev4 3台机器添加zookeeper myid文件

#dev2机器
cat >/data/zookeeper/myid<<eof
1
eof

#dev3机器
cat >/data/zookeeper/myid<<eof
2
eof

#dev4机器
cat >/data/zookeeper/myid<<eof
3
eof

zookeeper启动

dev2-dev4 3台机器分别启动zookeeper

cd /home/kafka/kafka_2.12-2.7.1
nohup bin/zookeeper-server-start.sh ./config/zookeeper.properties > ./zookeeper.log 2>&1 &

5.kafka broker集群安装

源kafka集群安装

源kafka集群,依赖公共zookeeper集群(dev2-dev4)。部署在dev1一个虚拟机上,基于不同端口的方式部署3个broker实例。

dev1虚拟机分别修改server1.properties、server2.properties、server3.properties配置

#切换到配置目录
cd /home/kafka/kafka_2.12-2.7.1/config

# 复制kafka server配置文件
cp server.properties server1.properties
cp server.properties server2.properties
cp server.properties server3.properties

#创建日志目录
mkdir -p /data/kafka/log1
mkdir -p /data/kafka/log2
mkdir -p /data/kafka/log3
#server1.properties配置修改
# 日志目录
log.dirs=/data/kafka/log1

# 集群ip配置。注意:2个kafka集群共用一个zookeeper集群。不同kafka集群数据写不同zookeeper目录。这里是kafka1目录
zookeeper.connect=dev2:2181,dev3:2181,dev4:2181/kafka1

#broker id每个实例必须唯一
broker.id=1

#broker server监听地址
listeners=PLAINTEXT://dev1:9092
#server2.properties配置修改
# 日志目录
log.dirs=/data/kafka/log2

# 集群ip配置。注意:2个kafka集群共用一个zookeeper集群。不同kafka集群数据写不同zookeeper目录。这里是kafka1目录
zookeeper.connect=dev2:2181,dev3:2181,dev4:2181/kafka1

#broker id每个实例必须唯一
broker.id=2

#broker server监听地址
listeners=PLAINTEXT://dev1:9093
#server3.properties配置修改
# 日志目录
log.dirs=/data/kafka/log3

# 集群ip配置。注意:2个kafka集群共用一个zookeeper集群。不同kafka集群数据写不同zookeeper目录。这里是kafka1目录
zookeeper.connect=dev2:2181,dev3:2181,dev4:2181/kafka1

#broker id每个实例必须唯一
broker.id=3

#broker server监听地址
listeners=PLAINTEXT://dev1:9094

kafka启动,dev1机器上分别启动3个不同实例

cd /home/kafka/kafka_2.12-2.7.1
nohup bin/kafka-server-start.sh ./config/server1.properties > ./kafka1.log 2>&1 &
nohup bin/kafka-server-start.sh ./config/server2.properties > ./kafka2.log 2>&1 &
nohup bin/kafka-server-start.sh ./config/server3.properties > ./kafka3.log 2>&1 &

目标kafka集群安装

目标kafka集群,依赖公共zookeeper集群(dev2-dev4)。部署在dev2-dev4 3个虚拟机上,每个虚拟机部署1个broker实例。

dev2-dev4虚拟机分别修改配置

#dev2 broker配置
#切换到配置目录
cd /home/kafka/kafka_2.12-2.7.1/config

# 复制kafka server配置文件
cp server.properties server1.properties

#创建日志目录
mkdir -p /data/kafka/log1

#-------server1.properties配置修改
# 日志目录
log.dirs=/data/kafka/log1

# 集群ip配置。注意:2个kafka集群共用一个zookeeper集群。不同kafka集群数据写不同zookeeper目录。这里是kafka1目录
zookeeper.connect=dev2:2181,dev3:2181,dev4:2181/kafka2

#broker id每个实例必须唯一
broker.id=1

#broker server监听地址
listeners=PLAINTEXT://dev2:9092
#dev3 broker配置
#切换到配置目录
cd /home/kafka/kafka_2.12-2.7.1/config

# 复制kafka server配置文件
cp server.properties server1.properties

#创建日志目录
mkdir -p /data/kafka/log1

#-------server1.properties配置修改
# 日志目录
log.dirs=/data/kafka/log1

# 集群ip配置。注意:2个kafka集群共用一个zookeeper集群。不同kafka集群数据写不同zookeeper目录。这里是kafka1目录
zookeeper.connect=dev2:2181,dev3:2181,dev4:2181/kafka2

#broker id每个实例必须唯一
broker.id=2

#broker server监听地址
listeners=PLAINTEXT://dev3:9092
#dev4 broker配置
#切换到配置目录
cd /home/kafka/kafka_2.12-2.7.1/config

# 复制kafka server配置文件
cp server.properties server1.properties

#创建日志目录
mkdir -p /data/kafka/log1

#-------server1.properties配置修改
# 日志目录
log.dirs=/data/kafka/log1

# 集群ip配置。注意:2个kafka集群共用一个zookeeper集群。不同kafka集群数据写不同zookeeper目录。这里是kafka1目录
zookeeper.connect=dev2:2181,dev3:2181,dev4:2181/kafka2

#broker id每个实例必须唯一
broker.id=3

#broker server监听地址
listeners=PLAINTEXT://dev4:9092

kafka启动,dev2-dev4 3台机器分别启动

cd /home/kafka/kafka_2.12-2.7.1
nohup bin/kafka-server-start.sh ./config/server1.properties > ./kafka1.log 2>&1 &

6.kafdrop监控安装

kafdrop是kafka集群监控工具。基于docker安装,使用前请提前在部署机器上安装docker运行环境。

源kafka集群监控,安装在虚拟机dev2上

docker run -d --name kafdrop1 --net=host --restart=always \
  -e KAFKA_BROKERCONNECT=dev1:9092,dev1:9093,dev1:9094 \
  -e JVM_OPTS="-Xms32M -Xmx64M" \
  -e SERVER_SERVLET_CONTEXTPATH="/" \
  obsidiandynamics/kafdrop:latest

#访问界面:  
http://dev2:9000

目标kafka集群监控,安装在虚拟机dev3上

docker run -d --name kafdrop2 --net=host --restart=always \
  -e KAFKA_BROKERCONNECT=dev2:9092,dev3:9092,dev4:9092 \
  -e JVM_OPTS="-Xms32M -Xmx64M" \
  -e SERVER_SERVLET_CONTEXTPATH="/" \
  obsidiandynamics/kafdrop:latest

#访问界面:  
http://dev3:9000  

启动成功后浏览器分别访问以下地址。

源kafka集群监控
http://dev2:9000

目标kafka集群监控
http://dev3:9000

7. mirror maker2介绍

1.参考文档

网友更新:
https://www.cnblogs.com/listenfwind/p/14269259.html
https://zhuanlan.zhihu.com/p/343819512
官方文档:https://kafka.apache.org/31/documentation/#georeplication
官方wiki文档:https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0
github: https://github.com/apache/kafka/tree/trunk/connect/mirror

其他同步方案:
https://www.zhihu.com/question/339709372

2.mm2介绍

Geo-Replication(跨地域复制)应用场景

  • Geo-replication(跨地域复制)
  • Disaster recovery(灾备)
  • Feeding edge clusters into a central, aggregate cluster(边沿集群向中心集群数据收集,数据汇集)
  • Physical isolation of clusters (such as production vs. testing)(物理集群隔离和同步,如生成环境到测试环境数据验证等)
  • Cloud migration or hybrid cloud deployments(云迁移或易购云部署)
  • Legal and compliance requirements(合规等需求)

支持特性

  • Replicates topics (data plus configurations)(主题复制)
  • Replicates consumer groups including offsets to migrate applications between clusters(消费组及其消费offsets值)
  • Replicates ACLs(权限)
  • Preserves partitioning(分区)
  • Automatically detects new topics and partitions(自动检测新的主题和分区)
  • Provides a wide range of metrics, such as end-to-end replication latency across multiple data centers/clusters(提供端到端统计复制延迟等指标)
  • Fault-tolerant and horizontally scalable operations(故障转移和水平扩展)

Replication Flows(复制流)

mm2使用connectors消费源集群消息,然后生产投递消息到目标集群。这种从源集群复制数据到目标集群的流向称为复制流replication flows。mm2配置文件中的定义格式{source_cluster}->{target_cluster}。以下是一些示例说明:
- 主主模式,双向复制:A->B, B->A
- 主备模式,单向复制:A->B
- 聚合模式:A->K, B->K, C->K
- 广播模式:K->A, K->B, K->C
- 传递模式:A->B, B->C, C->D

3.mm2配置启动

mm2配置文件{kafka路径}/config/connect-mirror-maker.properties

cd /home/kafka/kafka_2.12-2.7.1
vim ./config/connect-mirror-maker.properties
# mm2默认任务连接数。推荐至少为2.依赖硬件资源和复制的主题分区数
tasks.max = 5

# specify any number of cluster aliases
clusters = kafka1, kafka2

# connection information for each cluster
# This is a comma separated host:port pairs for each cluster
# for e.g. "A_host1:9092, A_host2:9092, A_host3:9092"
kafka1.bootstrap.servers = dev1:9092,dev1:9093,dev1:9094
kafka2.bootstrap.servers = dev2:9092,dev3:9092,dev4:9092

# enable and configure individual replication flows
kafka1->kafka2.enabled = true

# 同步消息主题,支持正则匹配。多个主题以逗号分隔
kafka1->kafka2.topics = foo.*, bar.* 

mm2启动

目标集群对应机器上(dev2,dev3,dev4),分别启动

cd /home/kafka/kafka_2.12-2.7.1
nohup bin/connect-mirror-maker.sh ./config/connect-mirror-maker.properties --clusters kafka2 > ./mm1.log 2>&1 &

4.验证

往源kafka集群推送测试topic消息(kafka推送消息实现本文不作介绍,请自行度娘)。这里推送了3w条消息。分别通过kafdrop监控查看源和目标集群监控界面,对比统计情况。如下截图所示,可以看到两边消息数量一致。

源kafka集群topic2消息3w条

目标kafka集群topic2同步消息数量3w条。


5.QA

1.问题:使用mm2常见问题说明

解答:

- mm2原生支撑集群模式。不同机器上分别启动对应实例即可。
- mm2建议与目标集群部署在同一机房下。在启动命令中通过--clusters kafka2参数指定目标集群别名,对程序性能优化有帮助。
- 配置修改后需重启mm2程序生效。

2.问题:通过mm从source cluster到target cluster同步时,copy到target cluster 的topic名称默认策略会修改为{源集群别名}.{topic 名称}. 解答:

参考地址:https://www.jianshu.com/p/5d5c98811e33
https://github.com/opencore/mirrormaker_topic_rename

stackover解决方案
https://stackoverflow.com/questions/59390555/is-it-possible-to-replicate-kafka-topics-without-alias-prefix-with-mirrormaker2

#配置文件添加策略设置 2.7.1版本好像有问题
replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy
#或者自定义实现
replication.policy.class=com.dctl.ea.mm2.plug.ext.SameNameReplicationPolicy

自定义实现:

新增工程,添加maven依赖

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>connect-mirror-client</artifactId>
        <version>2.7.1</version>
    </dependency>
</dependencies>

实现类SameNameReplicationPolicy

public class SameNameReplicationPolicy extends DefaultReplicationPolicy {

    @Override
    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
        return topic;
    }
}

maven打包后,将扩展jar放到{kafka安装目录}/libs目录下,添加connect-mirror-maker.properties配置项

replication.policy.class=com.dctl.ea.mm2.plug.ext.SameNameReplicationPolicy

3.问题: mm默认不支持只消费一次exactly-once semantic(eos)的精确语义。即通过mm复制source集群到target集群时,可能在target集群中产生重复的同步消息。 解答:

mm2工具官方版本不支持只消费一次的精确语义,所以存在特殊情况,消息同步过程中mm2进程被kill掉后程序恢复,目标集群消息重复推送的问题。故需要消费目标kafka集群的业务程序自行保证topic消费的幂等。

以下是一些关于mm2实现精确语义的讨论,供有兴趣的同学参考。
https://cwiki.apache.org/confluence/display/KAFKA/KIP-656:+MirrorMaker2+Exactly-once+Semantics

8. 小结

本文向大家介绍了跨kafka集群同步的方案:使用mm2实现多中心kafka集群的复制。包含环境准备、工具安装、测试验证、以及mm2使用的一些注意点。相信大家对kafka跨集群同步有了比较完整的整体认识。由于准备比较仓促,难免存在不足之处。若有疏漏,欢迎留言讨论。