Building Real-Time Incremental Views with PostgreSQL and Materialize

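Materialize is open-source software that supports real-time, incrementally maintained views, and it is arguably the only such product currently on the market. Since version 0.9.0, Materialize can use PostgreSQL as a data source, reading the latest data through PostgreSQL's logical replication. This article shows how to combine the two and how to write the latest aggregate results back to PostgreSQL with a simple Python script.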

PostgreSQL

In PostgreSQL, wal_level must be set to logical. Create the table and set its replica identity to FULL. The table person serves as the example.

create table person(name varchar(32), age int, gender int, primary key(name));
alter table person replica identity full;

Create a publication and add the table to it.

create publication pg_mz_publication;
alter publication pg_mz_publication add table person;

Check which tables in the publication will be logically replicated.

select * from pg_publication_tables;
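
The three prerequisites above (wal_level, replica identity, publication membership) can also be checked from a script. The following is a minimal sketch, not part of the original setup: the function name check_replication_setup is hypothetical, and conn is assumed to be a DB-API connection to PostgreSQL such as the one returned by psycopg2.connect.

```python
def check_replication_setup(conn, publication, table):
    """Return a list of problems found; an empty list means the setup looks fine.

    conn is assumed to be a DB-API connection to PostgreSQL
    (for example the result of psycopg2.connect).
    """
    problems = []
    cur = conn.cursor()
    try:
        # wal_level must be 'logical' for logical replication to work.
        cur.execute("show wal_level")
        wal_level = cur.fetchone()[0]
        if wal_level != "logical":
            problems.append("wal_level is {!r}, expected 'logical'".format(wal_level))

        # pg_class.relreplident is 'f' when the table uses REPLICA IDENTITY FULL.
        cur.execute("select relreplident from pg_class where oid = %s::regclass", (table,))
        row = cur.fetchone()
        if row is None or row[0] != "f":
            problems.append("table {} does not use replica identity full".format(table))

        # pg_publication_tables lists every table a publication replicates.
        cur.execute(
            "select 1 from pg_publication_tables where pubname = %s and tablename = %s",
            (publication, table),
        )
        if cur.fetchone() is None:
            problems.append("table {} is not in publication {}".format(table, publication))
    finally:
        cur.close()
    return problems
```

Calling check_replication_setup(conn, 'pg_mz_publication', 'person') before creating the Materialize source should return an empty list if the steps above were applied.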

Materialize Source

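Materialize has three main concepts: Source, View, and Sink.

A source specifies where data is read from. Kafka, PostgreSQL, Avro files, and others are currently supported.

A view holds the logic that analyzes and processes the data from a source.

A sink stores the changing results of a view. At present, sinks integrate best with Kafka.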

create materialized source mz_pg_source 
from postgres connection 'host=localhost user=postgres dbname=postgres' 
publication 'pg_mz_publication';

Create the views

create views from source mz_pg_source (person as person);

Create the analytical view

create materialized view stat_person as select gender, count(*) from person group by gender;
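
To make the idea of an incrementally maintained view concrete, a per-gender count can be kept current by applying each inserted or deleted row as a +1 or -1 delta, instead of rescanning the whole table. The sketch below is a conceptual illustration only and makes no claim about how Materialize is implemented internally. The function name apply_changes and the sample values are hypothetical.

```python
from collections import Counter

def apply_changes(counts, changes):
    """Apply (gender, diff) changes to per-gender counts in place.

    diff is +1 for an inserted person row and -1 for a deleted one.
    """
    for gender, diff in changes:
        counts[gender] += diff
        # Drop groups that no longer have any rows, as GROUP BY would.
        if counts[gender] == 0:
            del counts[gender]
    return counts

counts = Counter()
apply_changes(counts, [(1, +1), (1, +1), (0, +1)])  # three inserts
apply_changes(counts, [(1, -1)])                    # one delete
print(dict(counts))  # {1: 1, 0: 1}
```

Each call touches only the groups affected by the incoming changes, which is what keeps the result cheap to update as the table grows.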

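Storing View Changes in PostgreSQL

The TAIL command captures the latest changes to a view. To write the data to the sink (here, PostgreSQL) promptly, the fetch uses the timeout option, so that each fetch completes once no further rows are ready within that interval.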

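import psycopg2
import psycopg2.extras

def watch_mv_change(mv_name, mz_dsn, pg_dsn):
    # mz_dsn and pg_dsn are the connection strings for Materialize and the target PostgreSQL.
    mz_conn = psycopg2.connect(mz_dsn)
    pg_conn = psycopg2.connect(pg_dsn)
    # generate_insert_sql is a helper (not shown here) that builds the INSERT statement for mv_name.
    dynamic_insert_sql = generate_insert_sql(mv_name)
    try:
        l_rows = []
        with mz_conn.cursor() as cur, pg_conn.cursor() as pg_cur:
            # psycopg2 opens a transaction implicitly, which a TAIL cursor needs.
            cur.execute('declare c cursor for tail {}'.format(mv_name))
            while True:
                # The timeout lets the fetch complete when no more rows are ready within 100 ms.
                cur.execute("fetch all c with (timeout='100ms')")
                l_rows.clear()
                for row in cur:
                    # row[0] is the timestamp and row[1] the diff; only insertions (diff == 1) are kept.
                    if row[1] == 1:
                        l_rows.append(tuple(row[2:]))
                        # Flush in batches so a large burst of changes does not pile up in memory.
                        if len(l_rows) > 200:
                            psycopg2.extras.execute_batch(pg_cur, dynamic_insert_sql, l_rows)
                            pg_conn.commit()
                            l_rows.clear()
                # Write whatever is left from this fetch.
                if len(l_rows) > 0:
                    psycopg2.extras.execute_batch(pg_cur, dynamic_insert_sql, l_rows)
                    pg_conn.commit()
    except Exception as ex:
        print(mv_name)
        print(ex)
    finally:
        mz_conn.close()
        pg_conn.close()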
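
The function above relies on a generate_insert_sql helper that is not shown, and it keeps only rows whose diff is 1. The sketch below illustrates what such helpers might look like. It is an assumption, not the original code: build_insert_sql takes the column list explicitly (the original helper takes only the view name), split_changes is a hypothetical addition that also collects retracted rows, and each TAIL row is assumed to have the layout (timestamp, diff, columns...). The target table name stat_person and the sample rows are made up for illustration.

```python
def build_insert_sql(table, columns):
    """Build a parameterised INSERT statement with psycopg2-style placeholders.

    table and columns are interpolated directly, so they must come from
    trusted code, never from user input.
    """
    column_list = ", ".join(columns)
    placeholders = ", ".join(["%s"] * len(columns))
    return "insert into {} ({}) values ({})".format(table, column_list, placeholders)

def split_changes(tail_rows):
    """Split TAIL rows into inserted and retracted tuples.

    Each row is assumed to be (timestamp, diff, col1, col2, ...).
    A diff of n > 0 means the row was added n times; n < 0 means it was removed.
    """
    inserted, retracted = [], []
    for row in tail_rows:
        diff, values = row[1], tuple(row[2:])
        target = inserted if diff > 0 else retracted
        target.extend([values] * abs(diff))
    return inserted, retracted

# Sample rows in the assumed layout (timestamp, diff, gender, count).
rows = [
    (1000, 1, 0, 5),    # first result for gender 0
    (2000, -1, 0, 5),   # the old count is retracted ...
    (2000, 1, 0, 6),    # ... and replaced by the new one
]
inserted, retracted = split_changes(rows)
print(build_insert_sql("stat_person", ["gender", "count"]))
print(inserted)   # [(0, 5), (0, 6)]
print(retracted)  # [(0, 5)]
```

Because an aggregate update arrives as a retraction plus an insertion, a writer that ignores retractions appends every intermediate value to the target table. Handling the retracted list (for example by deleting those rows, or by upserting on the grouping key) would keep the PostgreSQL copy equal to the current view contents.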