在Postgresql中,随着逻辑复制的广发使用,在逻辑复制中会出现各种各样的问题,今天介绍一下,如果逻辑复制出现冲突,我们该如何解决。
在PG15版本中,逻辑复制的订阅端如果设置disable_on_error为true,订阅端由于违反约束导致逻辑复制错误,从而会终止逻辑复制。这种情况必须认为干预,逻辑复制才可以恢复正常。
新增了disable_on_error参数,订阅端在从发布端复制数据期间,如果检测到错误,是否应自动禁用该订阅。默认值为false,不会禁用订阅,会一直循环错误。如果设置为true,就会打破循环,终止报错,禁用该逻辑复制。

finish LSN。通常,LSN是一个指向WAL中某个位置的指针。在逻辑复制中,finish LSN表示已提交的事务commit_lsn,和二阶段事物中,已准备的事务所表示的prepare_lsn。
Replication origin 名字,包含了跟踪复制进度的复制源的名称。在逻辑复制中,随着创建订阅,会自动创建相应的复制源。
因为在使用pg_replication_origin_advance函数跳过事物的时候,需要上面的信息。
常规操作建立一个发布和订阅
#发布端:
#创建一个简单测试表,插入一条数据1
postgres=# create table tb1 (a int);
CREATE TABLE
postgres=# create publication pub for table tb1;
CREATE PUBLICATION
postgres=# INSERT INTO tb1 VALUES (1);
INSERT 0 1
#订阅端:
CREATE TABLE tb1 (a int primary key);
CREATE SUBSCRIPTION sub CONNECTION 'dbname=postgres host=10.4.9.166 port=1919 user=postgres password=postgres' PUBLICATION pub WITH (disable_on_error = true);
postgres=# select * from tb1;
a
---
1
(1 row)
模拟数据插入,通过跳过冲突的事物,恢复逻辑复制
#发布端开启两个事物分别插入数据:
postgres=# begin; --事物1
BEGIN
postgres=*# INSERT INTO tb1 VALUES (generate_series(2, 4));
INSERT 0 3
postgres=*# INSERT INTO tb1 VALUES (1); --插入重复数据
INSERT 0 1
postgres=*# INSERT INTO tb1 VALUES (generate_series(6, 8));
INSERT 0 3
postgres=*# commit ;
COMMIT
postgres=# begin; -事物2
BEGIN
postgres=*# INSERT INTO tb1 VALUES (9);
INSERT 0 1
postgres=*# commit ;
COMMIT
postgres=# select * from tb1;
a
---
1
2
3
4
1
6
7
8
9
(9 rows)
#订阅端:
#查看数据字典发现有相关错误,因为设置disable_on_error为true,sub已经disable了
postgres=# SELECT * FROM pg_stat_subscription_stats;
subid | subname | apply_error_count | sync_error_count | stats_reset
-------+---------+-------------------+------------------+-------------
16394 | sub | 1 | 0 |
(1 row)
postgres=# SELECT oid, subname, subenabled, subdisableonerr FROM pg_subscription;
oid | subname | subenabled | subdisableonerr
-------+---------+------------+-----------------
16394 | sub | f | t
(1 row)
#查看数据库日志,看到相关信息
2022-11-01 11:12:47.576 CST,,,22328,,636088c7.5738,2,,2022-11-01 10:47:35 CST,4/104,743,ERROR,23505,"duplicate key value violates unique constraint ""tb1_pkey""","Key (a)=(1) already exists.",,,,"processing remote data for replication origin ""pg_16394"" during message type ""INSERT"" for replication target relation ""public.tb1"" in transaction 740, finished at 0/154E278",,,,"","logical replication worker",,0
2022-11-01 11:12:47.576 CST,,,22328,,636088c7.5738,3,,2022-11-01 10:47:35 CST,4/0,0,LOG,00000,"subscription ""sub"" has been disabled because of an error",,,,,,,,,"","logical replication worker",,0
#使用pg_replication_origin_advance跳过该冲突事物,为了跳过该事物我们在0/154E278的基础上加1使用0/154E279。
postgres=# SELECT pg_replication_origin_advance('pg_16394', '0/154E279'::pg_lsn);
pg_replication_origin_advance
-------------------------------
(1 row)
#跳过后,启用该订阅
postgres=# alter subscription sub enable ;
ALTER SUBSCRIPTION
#可以看到事物1的数据全部被跳过,因为里面三条插入语句是一个事物。事物2的数据因为没有冲突,所以启用后,正常复制过来了。
postgres=# select * from tb1;
a
---
1
9
(2 rows)
使用pg_replication_origin_advance也有弊端,如果跳过了正常的lsn,那么数据也会不一致。所以在使用的时候要非常小心
#发布端删除数据重新测试一遍:
postgres=# truncate tb1;
TRUNCATE TABLE
postgres=# insert into tb1 values (1);
INSERT 0 1
postgres=# begin;
BEGIN
postgres=*# INSERT INTO tb1 VALUES (generate_series(2, 4));
INSERT 0 3
postgres=*# INSERT INTO tb1 VALUES (1);
INSERT 0 1
postgres=*# INSERT INTO tb1 VALUES (generate_series(6, 8));
INSERT 0 3
postgres=*# end ;
COMMIT
postgres=# begin;
BEGIN
postgres=*# INSERT INTO tb1 VALUES (9);
INSERT 0 1
postgres=*# end;
COMMIT
postgres=# INSERT INTO tb1 VALUES (10);
INSERT 0 1
postgres=# INSERT INTO tb1 VALUES (11);
INSERT 0 1
postgres=# INSERT INTO tb1 VALUES (12);
INSERT 0 1
#找出大于9,等于10的LSN为0/01557800
/opt/pgsql15/bin/pg_waldump 000000010000000000000001
rmgr: Heap len (rec/tot): 59/ 59, tx: 762, lsn: 0/01557790, prev 0/01557760, desc: INSERT off 9 flags 0x08, blkref #0: rel 1663/5/16395 blk 0
rmgr: Transaction len (rec/tot): 46/ 46, tx: 762, lsn: 0/015577D0, prev 0/01557790, desc: COMMIT 2022-11-01 13:37:54.421163 CST
rmgr: Heap len (rec/tot): 59/ 59, tx: 763, lsn: 0/01557800, prev 0/015577D0, desc: INSERT off 10 flags 0x08, blkref #0: rel 1663/5/16395 blk 0
rmgr: Transaction len (rec/tot): 46/ 46, tx: 763, lsn: 0/01557840, prev 0/01557800, desc: COMMIT 2022-11-01 13:37:57.140198 CST
rmgr: Standby len (rec/tot): 50/ 50, tx: 0, lsn: 0/01557870, prev 0/01557840, desc: RUNNING_XACTS nextXid 764 latestCompletedXid 763 oldestRunningXid 764
rmgr: Heap len (rec/tot): 59/ 59, tx: 764, lsn: 0/015578A8, prev 0/01557870, desc: INSERT off 11 flags 0x08, blkref #0: rel 1663/5/16395 blk 0
rmgr: Transaction len (rec/tot): 46/ 46, tx: 764, lsn: 0/015578E8, prev 0/015578A8, desc: COMMIT 2022-11-01 13:37:59.974290 CST
rmgr: Heap len (rec/tot): 59/ 59, tx: 765, lsn: 0/01557918, prev 0/015578E8, desc: INSERT off 12 flags 0x08, blkref #0: rel 1663/5/16395 blk 0
rmgr: Transaction len (rec/tot): 46/ 46, tx: 765, lsn: 0/01557958, prev 0/01557918, desc: COMMIT 2022-11-01 13:38:02.687276 CST
rmgr: Standby len (rec/tot): 50/ 50, tx: 0, lsn: 0/01557988, prev 0/01557958, desc: RUNNING_XACTS nextXid 766 latestCompletedXid 765 oldestRunningXid 766
#订阅端:
2022-11-01 13:37:51.241 CST,,,6566,,6360b05a.19a6,2,,2022-11-01 13:36:26 CST,4/208,771,ERROR,23505,"duplicate key value violates unique constraint ""tb1_pkey""","Key (a)=(1) already exists.",,,,"processing remote data for replication origin ""pg_16394"" during message type ""INSERT"" for replication target relation ""public.tb1"" in transaction 761, finished at 0/1557760",,,,"","logical replication worker",,0
postgres=# select * from tb1 ;
a
---
1
(1 row)
#跳到提交10这个事物的LSN
postgres=# SELECT pg_replication_origin_advance('pg_16394', '0/01557800'::pg_lsn);
pg_replication_origin_advance
-------------------------------
(1 row)
#由下可见,插入9的事物也可以被跳过,而不仅仅插入1的事物被跳过,这样就不只是丢失冲突数据了。
postgres=# alter subscription sub enable ;
ALTER SUBSCRIPTION
postgres=# select * from tb1 ;
a
----
1
10
11
12
(4 rows)
所以正式因为以上原因,我们在实际生产中使用该函数要十分小心,因为它可以跳过与冲突无关的其他事务。接下来我们看一下postgresql15版本使用ALTER SUBSCRIPTION SKIP正确跳过冲突事物。
我们还按原来的表测试一下:
#发布端:
postgres=# truncate tb1;
TRUNCATE TABLE
postgres=# insert into tb1 values (1);
INSERT 0 1
postgres=# begin;
BEGIN
postgres=*# INSERT INTO tb1 VALUES (generate_series(2, 4));
INSERT 0 3
postgres=*# INSERT INTO tb1 VALUES (1);
INSERT 0 1
postgres=*# INSERT INTO tb1 VALUES (generate_series(6, 8));
INSERT 0 3
postgres=*# end ;
COMMIT
postgres=# begin;
BEGIN
postgres=*# INSERT INTO tb1 VALUES (9);
INSERT 0 1
postgres=*# end;
COMMIT
#订阅端
#只有1正常过来了,复制处于disable了
postgres=# select * from tb1;
a
---
1
(1 row)
#通过日志找到冲突事物LSN
2022-11-01 14:16:04.704 CST,,,9306,,6360b6d7.245a,2,,2022-11-01 14:04:07 CST,4/270,788,ERROR,23505,"duplicate key value violates unique constraint ""tb1_pkey""","Key (a)=(1) already exists.",,,,"processing remote data for replication origin ""pg_16394"" during message type ""INSERT"" for replication target relation ""public.tb1"" in transaction 773, finished at 0/1564018",,,,"","logical replication worker",,0
#通过新的特新命令跳过冲突LSN
postgres=# ALTER SUBSCRIPTION sub SKIP (lsn = '0/1564018');
ALTER SUBSCRIPTION
postgres=# alter subscription sub enable ;
ALTER SUBSCRIPTION
#查看数据,跳过了插入1冲突的事物,后面插入9的正常
postgres=# select * from tb1;
a
---
1
9
(2 rows)
#再看日志,也说明了成功跳过LSN为0/1564018的事物
2022-11-01 14:18:25.324 CST,,,10703,,6360ba31.29cf,1,,2022-11-01 14:18:25 CST,4/283,0,LOG,00000,"logical replication apply worker for subscription ""sub"" has started",,,,,,,,,"","logical replication worker",,0
2022-11-01 14:18:25.334 CST,,,10703,,6360ba31.29cf,2,,2022-11-01 14:18:25 CST,4/0,0,LOG,00000,"logical replication starts skipping transaction at LSN 0/1564018",,,,,"processing remote data for replication origin ""pg_16394"" during message type ""BEGIN"" in transaction 773, finished at 0/1564018",,,,"","logical replication worker",,0
2022-11-01 14:18:25.334 CST,,,10703,,6360ba31.29cf,3,,2022-11-01 14:18:25 CST,4/0,0,LOG,00000,"logical replication completed skipping transaction at LSN 0/1564018",,,,,"processing remote data for replication origin ""pg_16394"" during message type ""COMMIT"" in transaction 773, finished at 0/1564018",,,,"","logical replication worker",,0
那么我们再模拟一下跳过不冲突的事物看看,数据插入忽略,同上面的例子
#订阅端日志
2022-11-01 14:31:52.296 CST,,,11692,,6360bc8d.2dac,4,,2022-11-01 14:28:29 CST,4/330,815,ERROR,23505,"duplicate key value violates unique constraint ""tb1_pkey""","Key (a)=(1) already exists.",,,,"processing remote data for replication origin ""pg_16394"" during message type ""INSERT"" for replication target relation ""public.tb1"" in transaction 784, finished at 0/156C9D0",,,,"","logical replication worker",,0
#发布端找一个大于冲突事物的lsn,然后跳过
rmgr: Transaction len (rec/tot): 46/ 46, tx: 784, lsn: 0/0156C9D0, prev 0/0156C990, desc: COMMIT 2022-11-01 14:31:52.295665 CST
rmgr: Heap len (rec/tot): 59/ 59, tx: 785, lsn: 0/0156CA00, prev 0/0156C9D0, desc: INSERT off 9 flags 0x08, blkref #0: rel 1663/5/16399 blk 0
rmgr: Transaction len (rec/tot): 46/ 46, tx: 785, lsn: 0/0156CA40, prev 0/0156CA00, desc: COMMIT 2022-11-01 14:31:55.880887 CST
rmgr: Standby len (rec/tot): 50/ 50, tx: 0, lsn: 0/0156CA70, prev 0/0156CA40, desc: RUNNING_XACTS nextXid 786 latestCompletedXid 785 oldestRunningXid 786
rmgr: Standby len (rec/tot): 50/ 50, tx: 0, lsn: 0/0156CAA8, prev 0/0156CA70, desc: RUNNING_XACTS nextXid 786 latestCompletedXid 785 oldestRunningXid 786
rmgr: XLOG len (rec/tot): 114/ 114, tx: 0, lsn: 0/0156CAE0, prev 0/0156CAA8, desc: CHECKPOINT_ONLINE redo 0/156CAA8; tli 1; prev tli 1; fpw true; xid 0:786; oid 24576; multi 1; offset 0; oldest xid 716 in DB 1; oldest multi 1 in DB 1; oldest/newest commit timestamp xid: 0/0; oldest running xid 786; online
rmgr: Standby len (rec/tot): 50/ 50, tx: 0, lsn: 0/0156CB58, prev 0/0156CAE0, desc: RUNNING_XACTS nextXid 786 latestCompletedXid 785 oldestRunningXid 786
rmgr: Heap len (rec/tot): 64/ 448, tx: 786, lsn: 0/0156CB90, prev 0/0156CB58, desc: INSERT off 10 flags 0x08, blkref #0: rel 1663/5/16399 blk 0 FPW
rmgr: Transaction len (rec/tot): 46/ 46, tx: 786, lsn: 0/0156CD50, prev 0/0156CB90, desc: COMMIT 2022-11-01 14:32:56.031886 CST
rmgr: Heap len (rec/tot): 59/ 59, tx: 787, lsn: 0/0156CD80, prev 0/0156CD50, desc: INSERT off 11 flags 0x08, blkref #0: rel 1663/5/16399 blk 0
rmgr: Transaction len (rec/tot): 46/ 46, tx: 787, lsn: 0/0156CDC0, prev 0/0156CD80, desc: COMMIT 2022-11-01 14:32:57.692334 CST
rmgr: Heap len (rec/tot): 59/ 59, tx: 788, lsn: 0/0156CDF0, prev 0/0156CDC0, desc: INSERT off 12 flags 0x08, blkref #0: rel 1663/5/16399 blk 0
#订阅端跳过LSN为0/156CD50的事物,但是并不能成功,日志会重复报错
postgres=# ALTER SUBSCRIPTION sub SKIP (lsn = '0/156CD50');
ALTER SUBSCRIPTION
postgres=# alter subscription sub enable ;
ALTER SUBSCRIPTION
postgres=# select * from tb1;
a
---
1
(1 row)
#精确跳过冲突事物LSN,才能正常恢复
postgres=# ALTER SUBSCRIPTION sub SKIP (lsn = '0/156C9D0');
ALTER SUBSCRIPTION
postgres=# SELECT subname, subskiplsn, subenabled FROM pg_subscription;
subname | subskiplsn | subenabled
---------+------------+------------
sub | 0/156C9D0 | f
(1 row)
postgres=# alter subscription sub enable ;
ALTER SUBSCRIPTION
postgres=# SELECT subname, subskiplsn, subenabled FROM pg_subscription;
subname | subskiplsn | subenabled
---------+------------+------------
sub | 0/0 | t
(1 row)
postgres=# select * from tb1;
a
----
1
9
10
11
12
(5 rows)
由此可见,该功能内部做了安全检查,要正确跳过冲突事物才可以继续恢复逻辑复制,这样就避免了我们误跳非冲突事物。
参考:
https://www.postgresql.fastware.com/blog/addressing-replication-conflicts-using-alter-subscription-skip
https://www.postgresql.fastware.com/blog/how-to-handle-logical-replication-conflicts-in-postgresql
https://www.postgresql.org/docs/15/sql-altersubscription.html
https://www.postgresql.org/docs/15/logical-replication-conflicts.html