Materialize 是一款支持实时增量视图（incrementally maintained views）的开源软件，甚至可以说是当前市面上唯一的一款。在 0.9.0 版本之后，Materialize 支持以 PostgreSQL 为数据源，通过 PostgreSQL 的逻辑复制（logical replication）读取 PG 中的最新数据。本文演示如何把两者结合起来，并用一个简单的 Python 脚本把最新的聚合结果写回 PostgreSQL。
PostgreSQL
PostgreSQL 中需要将 wal_level 设置为 logical（该参数修改后需重启 PostgreSQL 才能生效）。然后创建表，并将其 replica identity 调整为 FULL。下面以表 person 为例：
-- Source table whose changes are logically replicated into Materialize.
-- Each column is written as "<name> <type>"; gender is an int column.
create table person (
    name   varchar(32),
    age    int,
    gender int,
    primary key (name)
);

-- FULL replica identity makes PostgreSQL log the complete old row for
-- UPDATE/DELETE, so the logical-replication consumer sees every column.
alter table person replica identity full;
创建 publication，并将表加入其中：
-- Publication consumed by the Materialize source; it publishes the person table.
create publication pg_mz_publication for table person;
检查 publication 中有哪些表会参与逻辑复制：
-- List the tables that pg_mz_publication will replicate.
select
    pubname,
    schemaname,
    tablename
from pg_publication_tables
where pubname = 'pg_mz_publication'
order by schemaname, tablename;
Materialize Source
Materialize 中有三个主要概念：Source、View 和 Sink。
Source 表示从哪里读取数据，目前支持 Kafka、PostgreSQL、Avro 文件等。
View 用于定义对 Source 进行分析和处理的逻辑。
Sink 用于将 View 中变化的结果输出并存储起来，目前与 Kafka 的集成较好。
-- Materialize source reading pg_mz_publication from PostgreSQL via logical replication.
-- NOTE(review): the connection string carries no port/password; adjust for the target server.
create materialized source mz_pg_source
    from postgres
        connection 'host=localhost user=postgres dbname=postgres'
        publication 'pg_mz_publication';
创建视图
-- Create a Materialize view for the upstream table person carried by mz_pg_source,
-- exposed under the view name person (the "as person" alias makes the name explicit).
create views from source mz_pg_source (person as person);
创建分析视图
-- Incrementally maintained number of persons per gender.
-- gender is non-aggregated, so it must appear in GROUP BY; the count column
-- keeps its default name (count) because downstream insert generation may rely on it.
create materialized view stat_person as
select
    gender,
    count(*)
from person
group by gender;
视图变化结果存储到 PostgreSQL
使用 TAIL 命令可以捕获视图中最新的变化数据。为了及时地把数据写入 Sink（这里即 PostgreSQL），在 FETCH 时指定 timeout 参数：如果在超时时间内没有更多新数据，FETCH 就会返回，程序因此能把已收到的变化及时写出，而不会一直等待。
def _flush_rows(insert_sql, rows):
    # Write buffered rows through module-level pg_cur, commit pg_conn, then empty the buffer.
    if not rows:
        return
    psycopg2.extras.execute_batch(pg_cur, insert_sql, rows)
    pg_conn.commit()
    rows.clear()


def watch_mv_change(mv_name):
    """Stream insertions from a Materialize view into PostgreSQL, forever.

    Connects to Materialize using the module-level ``dsn``, declares a
    ``TAIL`` cursor over ``mv_name`` and polls it in an endless loop with
    ``FETCH ALL ... WITH (timeout='100ms')``. TAIL rows look like
    ``(timestamp, diff, *columns)``. Rows whose diff is exactly 1 have the two
    leading metadata columns stripped. They are then inserted with the SQL
    from ``generate_insert_sql(mv_name)`` via the module-level
    ``pg_cur``/``pg_conn``. A batch is flushed whenever more than 200 rows are
    buffered, and again after every fetch.

    Args:
        mv_name: name of the Materialize view to tail. It is interpolated
            into SQL text, so it must be a trusted identifier.

    Exceptions from ``psycopg2.connect`` and ``generate_insert_sql``
    propagate to the caller. Any exception raised while tailing or writing
    is printed (view name, then the error) and the function returns. The
    Materialize connection is always closed before returning.
    """
    mz_conn = psycopg2.connect(dsn)
    try:
        dynamic_insert_sql = generate_insert_sql(mv_name)
        try:
            with mz_conn.cursor() as cur:
                # DECLARE needs an open transaction; psycopg2 (non-autocommit)
                # begins one implicitly. NOTE(review): mv_name is string-formatted
                # into the statement, so do not pass untrusted input.
                cur.execute('declare c cursor for tail {}'.format(mv_name))
                pending = []
                while True:
                    # The timeout lets FETCH return when no more rows are ready,
                    # so buffered changes are written promptly.
                    cur.execute("fetch all c with (timeout='100ms')")
                    for row in cur:
                        # NOTE(review): only diff == 1 is handled; retractions (-1)
                        # and multiplicities > 1 are ignored.
                        if row[1] == 1:
                            pending.append(tuple(row[2:]))
                            if len(pending) > 200:
                                _flush_rows(dynamic_insert_sql, pending)
                    _flush_rows(dynamic_insert_sql, pending)
        except Exception as ex:
            # Best-effort reporting, matching the original behaviour.
            # NOTE(review): if the failure came from execute_batch, pg_conn is
            # left in an aborted transaction; the caller may need pg_conn.rollback().
            print(mv_name)
            print(ex)
    finally:
        mz_conn.close()