Syncing PostgreSQL data to Kafka in real time with Debezium

Debezium is a very actively developed change data capture tool. This article shows how to replicate data from PostgreSQL into Kafka using Kafka Connect and Debezium.
PostgreSQL configuration
Change the wal_level setting in postgresql.conf from its default of replica to logical. PostgreSQL must be restarted for this change to take effect.
wal_level=logical
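After the restart, it is worth confirming that the new level is active. A quick check from the shell (the connection parameters below are assumptions; adjust them to your environment):

psql -U postgres -c "SHOW wal_level;"   # should print: logical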
Installing Redpanda
As PostgreSQL data changes in real time, two basic kinds of data are produced: changes to the rows of a table, and changes to the table's structure. Kafka stores the row data, while the Schema Registry holds the table structure.

A Kafka-based solution involves quite a few components: ZooKeeper, Kafka, and a Schema Registry all need to be deployed, which makes the whole deployment and maintenance chain long. Redpanda is a new open-source project created to relieve exactly this pain point. It is compatible with the Kafka API, but it uses the Raft protocol, removing the ZooKeeper dependency, and it ships with a built-in Schema Registry.
docker run \
--name=redpanda-1 \
--network host \
--detach \
--rm \
vectorized/redpanda:v21.9.5 start \
--overprovisioned \
--smp 1 \
--memory 1G \
--reserve-memory 0M \
--node-id 0 \
--check=false \
--pandaproxy-addr 0.0.0.0:8082 \
--advertise-pandaproxy-addr 127.0.0.1:8082 \
--kafka-addr 0.0.0.0:9092 \
--advertise-kafka-addr 127.0.0.1:9092 \
--set "redpanda.enable_transactions=true" \
--set "redpanda.enable_idempotence=true"
Installing Kafka Connect
Use Docker to pull Kafka Connect together with its dependencies.
docker pull debezium/connect
Run debezium/connect. The --link zookeeper/kafka options from the official Debezium tutorial assume separate ZooKeeper and Kafka containers; with Redpanda listening on the host network they are unnecessary, so point the container at Redpanda via BOOTSTRAP_SERVERS instead:

docker run -it --name connect \
  --network host \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my-connect-configs \
  -e OFFSET_STORAGE_TOPIC=my-connect-offsets \
  -e BOOTSTRAP_SERVERS=127.0.0.1:9092 \
  debezium/connect
Copy the /kafka directory from the Docker container to the host machine:
docker cp connect:/kafka /tmp
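Listing the copied directory confirms the layout used in the following steps: bin/ holds the launch scripts, config/ the properties files, and connect/ the Debezium connector plugins.

ls /tmp/kafka
# expect to see: bin  config  connect  libs  ...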
Edit config/connect-distributed.properties:
bootstrap.servers=127.0.0.1:9092
group.id=2
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=my-connect-offsets
offset.storage.replication.factor=1
config.storage.topic=my-connect-configs
config.storage.replication.factor=1
status.storage.topic=my_connect_statuses
status.storage.replication.factor=1
offset.flush.interval.ms=60000
rest.host.name=127.0.0.1
rest.port=8083
rest.advertised.host.name=127.0.0.1
rest.advertised.port=8083
plugin.path=/usr/share/java,/etc/kafka-connect/jars,/opt/bigdata/kafka-connect/connect
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
offset.flush.timeout.ms=5000
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
task.shutdown.graceful.timeout.ms=10000
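Note the last entry in plugin.path above. Before starting the worker, make sure the connect directory copied out of the container actually sits at one of the plugin.path locations (the path below assumes you moved it there):

ls /opt/bigdata/kafka-connect/connect/
# debezium-connector-postgres should appear in the listing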
Run Kafka Connect from the copied directory:
bin/connect-distributed.sh config/connect-distributed.properties
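When the worker is up, the REST endpoint configured above should respond; this serves as a simple liveness check:

curl -s http://127.0.0.1:8083/
# e.g. {"version":"...","commit":"...","kafka_cluster_id":"..."}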
Registering the connector
Create register-postgres-avro.json (adjust the database credentials, including the password, to your environment):
{
  "name": "anticrawler-connector-inst",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "database.server.name": "dbserver0",
    "publication.autocreate.mode": "filtered",
    "schema.include.list": "inst,prim",
    "table.include.list": "inst\\.mock_.*,prim\\.partitioned_demo_p20.*",
    "plugin.name": "pgoutput",
    "heartbeat.interval.ms": "5",
    "database.history.kafka.bootstrap.servers": "127.0.0.1:9092",
    "database.history.kafka.topic": "schema-changes.anticrawler",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://127.0.0.1:8081",
    "value.converter.schema.registry.url": "http://127.0.0.1:8081",
    "transforms": "Reroute",
    "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.Reroute.topic.regex": "(.*)partitioned_demo(.*)",
    "transforms.Reroute.topic.replacement": "$1partitioned_demo",
    "transforms.Reroute.key.enforce.uniqueness": "false"
  }
}
Register the connector with the Connect REST API:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://127.0.0.1:8083/connectors/ -d @register-postgres-avro.json
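The same REST API reports the state of the connector and its tasks; both should show RUNNING once the initial snapshot has started:

curl -s http://127.0.0.1:8083/connectors/anticrawler-connector-inst/status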
Checking the status
On the PostgreSQL side, verify the progress of logical replication:
select * from pg_stat_replication;
select * from pg_replication_slots;
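As an additional check beyond the two views above, this query shows how much WAL each slot still has to deliver; a lag of 0 means the consumer is fully caught up:

select slot_name, pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) as lag_bytes from pg_replication_slots;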
On the Redpanda side, check whether the topics have been created:
rpk topic list
If the topics exist, you can inspect the replicated change events:
rpk topic consume dbserver0.inst.tblname
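Because of the ByLogicalTableRouter transform in the connector config, change events from all partitions of partitioned_demo are merged into a single topic, which can be consumed the same way (topic name derived from the config above):

rpk topic consume dbserver0.prim.partitioned_demo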