Note: this post covers installation and basic usage. Debezium can also be replaced with Alibaba's Canal. If you find this useful or helpful, please take a moment to leave a comment or a like. Thanks, everyone.
Download and install Kafka
- Download: http://archive.apache.org/dist/kafka/2.4.0/kafka_2.11-2.4.0.tgz
- Extract: tar -zxvf /apps/kafka_2.11-2.4.0.tgz
- Edit server.properties (here ZooKeeper and Kafka are spread across three machines):
broker.id=3
port=9094
listeners=PLAINTEXT://flink-master:9094
advertised.listeners=PLAINTEXT://flink-master:9094
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=flink-master:2181,flink-slave1:2182,flink-slave2:2183
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
Download and install Debezium
- Download: https://debezium.io/releases/1.2/
- Extract into Kafka's plugins directory: tar -zxvf debezium-connector-mysql-1.2.5.Final-plugin.tar.gz -C /usr/local/share/kafka/plugins/debezium
- After extraction it looks like this:
Edit Kafka's connect-distributed.properties
- Only two settings need to change:
- bootstrap.servers=flink-master:9094
- plugin.path=/usr/local/share/kafka/plugins/debezium
Create mysql.properties
1. Create:
name=inventory-connector
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=mysql-hostname
database.port=3306
database.user=root
database.password=123456
database.server.id=987654321
database.server.name=flink-slave2
database.whitelist=cdc
database.history.kafka.bootstrap.servers=flink-master:9094
database.history.kafka.topic=dbhistory.flink-slave2
include.schema.changes=true
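Since Connect runs in distributed mode below, a connector is normally registered by POSTing its configuration as JSON to the Connect REST API rather than by loading a properties file. The mysql.properties above maps to a payload like this (the REST port 8083 is Connect's default and is an assumption here):

```json
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql-hostname",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "123456",
    "database.server.id": "987654321",
    "database.server.name": "flink-slave2",
    "database.whitelist": "cdc",
    "database.history.kafka.bootstrap.servers": "flink-master:9094",
    "database.history.kafka.topic": "dbhistory.flink-slave2",
    "include.schema.changes": "true"
  }
}
```

Saved as mysql.json, it can be submitted with, for example, curl -X POST -H "Content-Type: application/json" --data @mysql.json http://flink-master:8083/connectors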
2. Debezium property reference
- Docs: https://debezium.io/documentation/reference/0.10/connectors/mysql.html
Start Kafka and Kafka Connect
1. Start Kafka: kafka-server-start.sh server.properties
2. Start Connect: connect-distributed.sh connect-distributed.properties (in distributed mode, the connector configuration in mysql.properties is not picked up from the command line; it is registered through the Connect REST API instead)
3. After a successful start, the following topics appear (as shown): connect-configs, connect-offsets, connect-status, flink-slave2.cdc.customers, flink-slave2.cdc.order, flink-slave2.cdc.products
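The flink-slave2.cdc.* topics carry Debezium change-event envelopes whose payload has before, after, and op fields. A minimal sketch of pulling the changed row out of one such event, assuming the default JSON converter with schemas disabled (the sample event below is hand-written for illustration, not captured from a real topic):

```python
import json

def parse_change_event(raw: str):
    """Return (operation, row) from a Debezium change-event envelope."""
    envelope = json.loads(raw)
    # "c"=create, "u"=update, "d"=delete, "r"=snapshot read
    op = envelope.get("op")
    # For deletes "after" is null, so fall back to the old row state.
    row = envelope.get("after") or envelope.get("before")
    return op, row

# Hand-written sample envelope for a row inserted into cdc.products
sample = json.dumps({
    "before": None,
    "after": {"id": 1001, "name": "widget"},
    "source": {"db": "cdc", "table": "products"},
    "op": "c",
    "ts_ms": 1600000000000,
})

op, row = parse_change_event(sample)
print(op, row)  # c {'id': 1001, 'name': 'widget'}
```

In a real pipeline the raw string would come from a Kafka consumer subscribed to one of the table topics; the parsing logic is the same.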
4. customers, order, and products correspond to tables in the database; if the topics do not appear, you can register the connector manually through the Connect REST API
Official documentation:
https://kafka.apache.org/24/documentation.html#connect
Start the Flink cluster and run Flink SQL
1. Start the cluster: start-cluster.sh
2. Run Flink SQL: sql-client.sh embedded
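The post does not show the exact DDL used in the SQL client. A sketch of what it could look like, assuming Flink 1.11+ with the built-in debezium-json format (the column names for the order table are hypothetical and must match your MySQL schema):

```sql
-- Hypothetical schema; adjust the columns to your actual order table.
CREATE TABLE orders (
  order_id INT,
  product_id INT,
  quantity INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'flink-slave2.cdc.order',
  'properties.bootstrap.servers' = 'flink-master:9094',
  'properties.group.id' = 'flink-cdc-demo',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'
);

SELECT * FROM orders;
```

With debezium-json, Flink interprets the envelopes as a changelog, so inserts, updates, and deletes in MySQL are reflected in the query result.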
3. Result (as shown):
4. Check the tables in MySQL
Data in the order table:
5. Modify data in the order table and watch the change show up in Flink SQL
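Any DML against the whitelisted cdc database will exercise step 5; for example (table and column names hypothetical):

```sql
-- Hypothetical example; adjust to your actual order table schema.
UPDATE cdc.`order` SET quantity = quantity + 1 WHERE order_id = 1001;
```

Debezium emits an update event ("op": "u") to flink-slave2.cdc.order, and the running Flink SQL query result updates accordingly.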
That completes our Flink + Debezium + Kafka setup.