Syncing PostgreSQL Data to Kafka with Debezium

Debezium is a very actively maintained data synchronization tool. This article shows how to replicate data from PostgreSQL to Kafka using Kafka Connect and Debezium.

PostgreSQL Configuration

In postgresql.conf, change wal_level from its default of replica to logical. The PostgreSQL process must be restarted for this change to take effect.

wal_level=logical
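
A minimal sketch of applying and verifying the change; the service name postgresql is an assumption and depends on how PostgreSQL was installed:

sudo systemctl restart postgresql          # service name varies by installation
psql -U postgres -c "SHOW wal_level;"      # expect: logical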

Install Redpanda

As PostgreSQL data changes in real time, two basic kinds of information are produced: changes to the rows in a table, and changes to the table structure.

Kafka stores the row data, while the Schema Registry stores the table structure.

A Kafka-based solution involves quite a few components: you need to deploy ZooKeeper, Kafka, and a schema registry, which makes the whole deployment and maintenance chain long. Redpanda is a new open source project born to address exactly this pain point: it is compatible with the Kafka API, but uses the Raft protocol to remove the ZooKeeper dependency, and it ships with built-in schema registry support.

docker run \
  --name=redpanda-1 \
  --network host \
  --detach \
  --rm \
  vectorized/redpanda:v21.9.5 start \
  --overprovisioned \
  --smp 1 \
  --memory 1G \
  --reserve-memory 0M \
  --node-id 0 \
  --check=false \
  --pandaproxy-addr 0.0.0.0:8082 \
  --advertise-pandaproxy-addr 127.0.0.1:8082 \
  --kafka-addr 0.0.0.0:9092 \
  --advertise-kafka-addr 127.0.0.1:9092 \
  --set "redpanda.enable_transactions=true" \
  --set "redpanda.enable_idempotence=true"

Install kafka-connect

Use docker to download kafka-connect and its dependencies.

docker pull debezium/connect

Run debezium/connect

Redpanda advertises itself at 127.0.0.1:9092, so run the container on the host network and point it at that address:

docker run -it --name connect \
  --network host \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my-connect-configs \
  -e OFFSET_STORAGE_TOPIC=my-connect-offsets \
  -e BOOTSTRAP_SERVERS=127.0.0.1:9092 \
  debezium/connect

Copy the Kafka Connect installation (/kafka) out of the docker container onto the host machine:

docker cp connect:/kafka /tmp
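
Connect will be started from this host copy and would otherwise clash with the instance inside the container on port 8083, so the container can be stopped now:

docker stop connect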

Edit config/connect-distributed.properties so that Connect talks to the local Redpanda broker:

# Redpanda speaks the Kafka protocol, so Connect can point straight at it
bootstrap.servers=127.0.0.1:9092
group.id=2
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# Internal Connect topics; replication factor 1 suits a single-node broker
offset.storage.topic=my-connect-offsets
offset.storage.replication.factor=1
config.storage.topic=my-connect-configs
config.storage.replication.factor=1
status.storage.topic=my_connect_statuses
status.storage.replication.factor=1
offset.flush.interval.ms=60000
# REST API used later to register the connector
rest.host.name=127.0.0.1
rest.port=8083
rest.advertised.host.name=127.0.0.1
rest.advertised.port=8083
# Directories scanned for connector plugins (the Debezium jars)
plugin.path=/usr/share/java,/etc/kafka-connect/jars,/opt/bigdata/kafka-connect/connect
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
offset.flush.timeout.ms=5000
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
task.shutdown.graceful.timeout.ms=10000

Run Kafka Connect

cd /tmp/kafka
bin/connect-distributed.sh config/connect-distributed.properties
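
Once Connect is up, its REST API can confirm that the Debezium PostgreSQL connector was picked up from plugin.path:

curl -s http://127.0.0.1:8083/connector-plugins
# the response should include io.debezium.connector.postgresql.PostgresConnector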

Register the connector

Create register-postgres-avro.json with the connector configuration (add a database.password entry if your PostgreSQL requires password authentication):

{
    "name": "anticrawler-connector-inst",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "127.0.0.1",
        "database.port": "5432",
        "database.user": "postgres",
        "database.dbname" : "postgres",
        "database.server.name": "dbserver0",
        "publication.autocreate.mode":"filtered",
        "schema.include.list": "inst",
        "table.include.list": "inst\\.mock_.*,prim\\.partitioned_demo_p20.*",
        "plugin.name":"pgoutput",
        "heartbeat.interval.ms": "5",
        "database.history.kafka.bootstrap.servers": "127.0.0.1:9092",
        "database.history.kafka.topic": "schema-changes.anticrawler",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://127.0.0.1:8081",
        "value.converter.schema.registry.url": "http://127.0.0.1:8081",
        "transforms":"Reroute",
        "transforms.Reroute.type":"io.debezium.transforms.ByLogicalTableRouter",
        "transforms.Reroute.topic.regex": "(.*)partitioned_demo(.*)",
        "transforms.Reroute.topic.replacement": "$1partitioned_demo",
        "transforms.Reroute.key.enforce.uniqueness":"false"
    }
}
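
The Reroute transform merges the per-partition topics into one logical topic. With the dbserver0 server name configured above, the routing works like this (the partition suffixes are illustrative):

dbserver0.prim.partitioned_demo_p202101  ->  dbserver0.prim.partitioned_demo
dbserver0.prim.partitioned_demo_p202102  ->  dbserver0.prim.partitioned_demo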

Use the Connect REST API to register the connector:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://127.0.0.1:8083/connectors/ -d @register-postgres-avro.json
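
After registration, the same REST API reports whether the connector and its task are RUNNING:

curl -s http://127.0.0.1:8083/connectors/anticrawler-connector-inst/status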

Status Checks

On the PostgreSQL side, verify the progress of logical replication:

select * from pg_stat_replication;
select * from pg_replication_slots;
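
A sketch of a lag check, assuming PostgreSQL 10 or later; pg_wal_lsn_diff shows how many bytes of WAL the slot has not yet confirmed:

select slot_name,
       confirmed_flush_lsn,
       pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) as lag_bytes
from pg_replication_slots;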

On the Redpanda side, check whether the topics have been created:

rpk topic list

If the topics exist, you can consume one to inspect the replicated change events:

rpk topic consume dbserver0.inst.tblname