wal2json

1. 概述

wal2json 是一个用于 PostgreSQL 逻辑解码的输出插件,这个插件为每个事务生成一个JSON对象。

2. 安装

源码安装环境为 Ubuntu 24.04(x86_64),环境中已经安装了IvorySQL5及以上版本,安装路径为/usr/ivory-5

2.1. 源码安装

# 从 https://github.com/eulerto/wal2json/releases/tag/wal2json_2_6 下载 2.6 的源码包 wal2json_2_6.zip

unzip wal2json_2_6.zip
cd wal2json_2_6

# 编译安装插件
make PG_CONFIG=/usr/ivory-5/bin/pg_config
make PG_CONFIG=/usr/ivory-5/bin/pg_config install
如果出现找不到xlocale.h的错误,需要手动修改 /usr/ivory-5/include/postgresql/server/pg_config.h 删除或者注释掉 #define HAVE_XLOCALE_H 1 这一行

2.2. 修改数据库配置文件

修改 postgresql.conf 文件 启用wal_level为logical,并设置最大复制槽数和最大wal发送进程数

wal_level = logical
max_replication_slots = 10
max_wal_senders = 10

pg_hba.conf不需要修改,有以下内容即可(仅限本地链接测试):

     local   replication     all                                     trust
     host    replication     all             127.0.0.1/32            trust
     host    replication     all             ::1/128                 trust

重启数据库后配置生效。

3. 使用

在第一个终端中执行命令:

sudo -u ivorysql /usr/ivory-5/bin/bin/pg_recvlogical -d postgres --slot wal2json_slot --create-slot -P wal2json

启动监听,实时输出变更的JSON格式
sudo -u ivorysql /usr/ivory-5/bin/bin/pg_recvlogical -d postgres --slot wal2json_slot --start -o pretty-print=1 -f -

在第二个终端中连接数据库:

/usr/ivory-5/bin/psql -d postgres -p 1521

执行下面的SQL语句:

CREATE TABLE test_cdc (id int primary key, name varchar(50));
INSERT INTO test_cdc VALUES (1, 'test1');
UPDATE test_cdc SET name = 'test1_update' WHERE id = 1;
DELETE FROM test_cdc WHERE id = 1;
DROP TABLE test_cdc;

此时在第一个终端上可以看到下面的输出:

{
        "change": [
        ]
}
{
        "change": [
                {
                        "kind": "insert",
                        "schema": "public",
                        "table": "test_cdc",
                        "columnnames": ["id", "name"],
                        "columntypes": ["integer", "sys.oravarcharbyte(50)"],
                        "columnvalues": [1, "test1"]
                }
        ]
}
{
        "change": [
                {
                        "kind": "update",
                        "schema": "public",
                        "table": "test_cdc",
                        "columnnames": ["id", "name"],
                        "columntypes": ["integer", "sys.oravarcharbyte(50)"],
                        "columnvalues": [1, "test1_update"],
                        "oldkeys": {
                                "keynames": ["id"],
                                "keytypes": ["integer"],
                                "keyvalues": [1]
                        }
                }
        ]
}
{
        "change": [
                {
                        "kind": "delete",
                        "schema": "public",
                        "table": "test_cdc",
                        "oldkeys": {
                                "keynames": ["id"],
                                "keytypes": ["integer"],
                                "keyvalues": [1]
                        }
                }
        ]
}
{
        "change": [
        ]
}