• Postgresql中如何处理逻辑复制冲突


    Postgresql中,随着逻辑复制的广发使用,在逻辑复制中会出现各种各样的问题,今天介绍一下,如果逻辑复制出现冲突,我们该如何解决。

    复制冲突:

    在PG15版本中,逻辑复制的订阅端如果设置disable_on_error为true,订阅端由于违反约束导致逻辑复制错误,从而会终止逻辑复制。这种情况必须认为干预,逻辑复制才可以恢复正常。

    参数简要说明:

    新增了disable_on_error参数,订阅端在从发布端复制数据期间,如果检测到错误,是否应自动禁用该订阅。默认值为false,不会禁用订阅,会一直循环错误。如果设置为true,就会打破循环,终止报错,禁用该逻辑复制。

    在这里插入图片描述

    冲突错误消息包含2条重要信息:

    finish LSN。通常,LSN是一个指向WAL中某个位置的指针。在逻辑复制中,finish LSN表示已提交的事务commit_lsn,和二阶段事物中,已准备的事务所表示的prepare_lsn。
    Replication origin 名字,包含了跟踪复制进度的复制源的名称。在逻辑复制中,随着创建订阅,会自动创建相应的复制源。
    因为在使用pg_replication_origin_advance函数跳过事物的时候,需要上面的信息。

    通过跳过失败的事物,解决冲突

    常规操作建立一个发布和订阅

    #发布端:
    #创建一个简单测试表,插入一条数据1
    postgres=# create table tb1 (a int);
    CREATE TABLE
    postgres=# create publication pub for table tb1;
    CREATE PUBLICATION
    postgres=# INSERT INTO tb1 VALUES (1);
    INSERT 0 1
    
    #订阅端:
    CREATE TABLE tb1 (a int primary key);
    CREATE SUBSCRIPTION sub CONNECTION 'dbname=postgres host=10.4.9.166 port=1919 user=postgres password=postgres' PUBLICATION pub WITH (disable_on_error = true);
    
    postgres=# select * from tb1;
     a 
    ---
     1
    (1 row)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    模拟数据插入,通过跳过冲突的事物,恢复逻辑复制

    #发布端开启两个事物分别插入数据:
    postgres=# begin;  --事物1
    BEGIN
    postgres=*# INSERT INTO tb1 VALUES (generate_series(2, 4));
    INSERT 0 3
    postgres=*# INSERT INTO tb1 VALUES (1);  --插入重复数据
    INSERT 0 1
    postgres=*# INSERT INTO tb1 VALUES (generate_series(6, 8));
    INSERT 0 3
    postgres=*# commit ;
    COMMIT
    postgres=# begin;   -事物2
    BEGIN
    postgres=*# INSERT INTO tb1 VALUES (9);
    INSERT 0 1
    postgres=*# commit ;
    COMMIT
    postgres=# select * from tb1;
     a 
    ---
     1
     2
     3
     4
     1
     6
     7
     8
     9
    (9 rows)
    
    #订阅端:
    #查看数据字典发现有相关错误,因为设置disable_on_error为true,sub已经disable了
    postgres=# SELECT * FROM pg_stat_subscription_stats;
     subid | subname | apply_error_count | sync_error_count | stats_reset 
    -------+---------+-------------------+------------------+-------------
     16394 | sub     |                 1 |                0 | 
    (1 row)
    
    postgres=# SELECT oid, subname, subenabled, subdisableonerr FROM pg_subscription;
      oid  | subname | subenabled | subdisableonerr 
    -------+---------+------------+-----------------
     16394 | sub     | f          | t
    (1 row)
    
    #查看数据库日志,看到相关信息
    2022-11-01 11:12:47.576 CST,,,22328,,636088c7.5738,2,,2022-11-01 10:47:35 CST,4/104,743,ERROR,23505,"duplicate key value violates unique constraint ""tb1_pkey""","Key (a)=(1) already exists.",,,,"processing remote data for replication origin ""pg_16394"" during message type ""INSERT"" for replication target relation ""public.tb1"" in transaction 740, finished at 0/154E278",,,,"","logical replication worker",,0
    2022-11-01 11:12:47.576 CST,,,22328,,636088c7.5738,3,,2022-11-01 10:47:35 CST,4/0,0,LOG,00000,"subscription ""sub"" has been disabled because of an error",,,,,,,,,"","logical replication worker",,0
    
    #使用pg_replication_origin_advance跳过该冲突事物,为了跳过该事物我们在0/154E278的基础上加1使用0/154E279。
    postgres=# SELECT pg_replication_origin_advance('pg_16394', '0/154E279'::pg_lsn);
     pg_replication_origin_advance 
    -------------------------------
     
    (1 row)
    #跳过后,启用该订阅
    postgres=# alter subscription sub enable ;
    ALTER SUBSCRIPTION
    
    #可以看到事物1的数据全部被跳过,因为里面三条插入语句是一个事物。事物2的数据因为没有冲突,所以启用后,正常复制过来了。
    postgres=# select * from tb1;
     a 
    ---
     1
     9
    (2 rows)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    使用pg_replication_origin_advance也有弊端,如果跳过了正常的lsn,那么数据也会不一致。所以在使用的时候要非常小心

    #发布端删除数据重新测试一遍:
    postgres=# truncate tb1;
    TRUNCATE TABLE
    postgres=# insert into tb1 values (1);
    INSERT 0 1
    postgres=# begin;
    BEGIN
    postgres=*# INSERT INTO tb1 VALUES (generate_series(2, 4));
    INSERT 0 3
    postgres=*# INSERT INTO tb1 VALUES (1);
    INSERT 0 1
    postgres=*# INSERT INTO tb1 VALUES (generate_series(6, 8));
    INSERT 0 3
    postgres=*# end ;
    COMMIT
    postgres=# begin;
    BEGIN
    postgres=*# INSERT INTO tb1 VALUES (9);
    INSERT 0 1
    postgres=*# end;
    COMMIT
    postgres=# INSERT INTO tb1 VALUES (10);
    INSERT 0 1
    postgres=# INSERT INTO tb1 VALUES (11);
    INSERT 0 1
    postgres=# INSERT INTO tb1 VALUES (12);
    INSERT 0 1
    #找出大于9,等于10的LSN为0/01557800
    /opt/pgsql15/bin/pg_waldump 000000010000000000000001
    
    rmgr: Heap        len (rec/tot):     59/    59, tx:        762, lsn: 0/01557790, prev 0/01557760, desc: INSERT off 9 flags 0x08, blkref #0: rel 1663/5/16395 blk 0
    rmgr: Transaction len (rec/tot):     46/    46, tx:        762, lsn: 0/015577D0, prev 0/01557790, desc: COMMIT 2022-11-01 13:37:54.421163 CST
    rmgr: Heap        len (rec/tot):     59/    59, tx:        763, lsn: 0/01557800, prev 0/015577D0, desc: INSERT off 10 flags 0x08, blkref #0: rel 1663/5/16395 blk 0
    rmgr: Transaction len (rec/tot):     46/    46, tx:        763, lsn: 0/01557840, prev 0/01557800, desc: COMMIT 2022-11-01 13:37:57.140198 CST
    rmgr: Standby     len (rec/tot):     50/    50, tx:          0, lsn: 0/01557870, prev 0/01557840, desc: RUNNING_XACTS nextXid 764 latestCompletedXid 763 oldestRunningXid 764
    rmgr: Heap        len (rec/tot):     59/    59, tx:        764, lsn: 0/015578A8, prev 0/01557870, desc: INSERT off 11 flags 0x08, blkref #0: rel 1663/5/16395 blk 0
    rmgr: Transaction len (rec/tot):     46/    46, tx:        764, lsn: 0/015578E8, prev 0/015578A8, desc: COMMIT 2022-11-01 13:37:59.974290 CST
    rmgr: Heap        len (rec/tot):     59/    59, tx:        765, lsn: 0/01557918, prev 0/015578E8, desc: INSERT off 12 flags 0x08, blkref #0: rel 1663/5/16395 blk 0
    rmgr: Transaction len (rec/tot):     46/    46, tx:        765, lsn: 0/01557958, prev 0/01557918, desc: COMMIT 2022-11-01 13:38:02.687276 CST
    rmgr: Standby     len (rec/tot):     50/    50, tx:          0, lsn: 0/01557988, prev 0/01557958, desc: RUNNING_XACTS nextXid 766 latestCompletedXid 765 oldestRunningXid 766
    #订阅端:
    2022-11-01 13:37:51.241 CST,,,6566,,6360b05a.19a6,2,,2022-11-01 13:36:26 CST,4/208,771,ERROR,23505,"duplicate key value violates unique constraint ""tb1_pkey""","Key (a)=(1) already exists.",,,,"processing remote data for replication origin ""pg_16394"" during message type ""INSERT"" for replication target relation ""public.tb1"" in transaction 761, finished at 0/1557760",,,,"","logical replication worker",,0
    postgres=# select * from tb1 ;
     a 
    ---
     1
    (1 row)
    #跳到提交10这个事物的LSN
    postgres=# SELECT pg_replication_origin_advance('pg_16394', '0/01557800'::pg_lsn);
     pg_replication_origin_advance 
    -------------------------------
     
    (1 row)
    
    #由下可见,插入9的事物也可以被跳过,而不仅仅插入1的事物被跳过,这样就不只是丢失冲突数据了。
    postgres=# alter subscription sub enable ;
    ALTER SUBSCRIPTION
    postgres=#  select * from tb1 ;
     a  
    ----
      1
     10
     11
     12
    (4 rows)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    所以正式因为以上原因,我们在实际生产中使用该函数要十分小心,因为它可以跳过与冲突无关的其他事务。接下来我们看一下postgresql15版本使用ALTER SUBSCRIPTION SKIP正确跳过冲突事物。

    我们还按原来的表测试一下:

    #发布端:
    postgres=# truncate tb1;
    TRUNCATE TABLE
    postgres=# insert into tb1 values (1);
    INSERT 0 1
    postgres=# begin;
    BEGIN
    postgres=*# INSERT INTO tb1 VALUES (generate_series(2, 4));
    INSERT 0 3
    postgres=*# INSERT INTO tb1 VALUES (1);
    INSERT 0 1
    postgres=*# INSERT INTO tb1 VALUES (generate_series(6, 8));
    INSERT 0 3
    postgres=*# end ;
    COMMIT
    postgres=# begin;
    BEGIN
    postgres=*# INSERT INTO tb1 VALUES (9);
    INSERT 0 1
    postgres=*# end;
    COMMIT
    
    #订阅端
    #只有1正常过来了,复制处于disable了
    postgres=#  select * from tb1;
     a 
    ---
     1
    (1 row)
    #通过日志找到冲突事物LSN
    2022-11-01 14:16:04.704 CST,,,9306,,6360b6d7.245a,2,,2022-11-01 14:04:07 CST,4/270,788,ERROR,23505,"duplicate key value violates unique constraint ""tb1_pkey""","Key (a)=(1) already exists.",,,,"processing remote data for replication origin ""pg_16394"" during message type ""INSERT"" for replication target relation ""public.tb1"" in transaction 773, finished at 0/1564018",,,,"","logical replication worker",,0
    
    #通过新的特新命令跳过冲突LSN
    postgres=# ALTER SUBSCRIPTION sub SKIP (lsn = '0/1564018');
    ALTER SUBSCRIPTION
    postgres=# alter subscription sub enable ;
    ALTER SUBSCRIPTION
    #查看数据,跳过了插入1冲突的事物,后面插入9的正常
    postgres=# select * from tb1;
     a 
    ---
     1
     9
    (2 rows)
    
    #再看日志,也说明了成功跳过LSN为0/1564018的事物
    2022-11-01 14:18:25.324 CST,,,10703,,6360ba31.29cf,1,,2022-11-01 14:18:25 CST,4/283,0,LOG,00000,"logical replication apply worker for subscription ""sub"" has started",,,,,,,,,"","logical replication worker",,0
    2022-11-01 14:18:25.334 CST,,,10703,,6360ba31.29cf,2,,2022-11-01 14:18:25 CST,4/0,0,LOG,00000,"logical replication starts skipping transaction at LSN 0/1564018",,,,,"processing remote data for replication origin ""pg_16394"" during message type ""BEGIN"" in transaction 773, finished at 0/1564018",,,,"","logical replication worker",,0
    2022-11-01 14:18:25.334 CST,,,10703,,6360ba31.29cf,3,,2022-11-01 14:18:25 CST,4/0,0,LOG,00000,"logical replication completed skipping transaction at LSN 0/1564018",,,,,"processing remote data for replication origin ""pg_16394"" during message type ""COMMIT"" in transaction 773, finished at 0/1564018",,,,"","logical replication worker",,0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    那么我们再模拟一下跳过不冲突的事物看看,数据插入忽略,同上面的例子

    #订阅端日志
    2022-11-01 14:31:52.296 CST,,,11692,,6360bc8d.2dac,4,,2022-11-01 14:28:29 CST,4/330,815,ERROR,23505,"duplicate key value violates unique constraint ""tb1_pkey""","Key (a)=(1) already exists.",,,,"processing remote data for replication origin ""pg_16394"" during message type ""INSERT"" for replication target relation ""public.tb1"" in transaction 784, finished at 0/156C9D0",,,,"","logical replication worker",,0
    
    #发布端找一个大于冲突事物的lsn,然后跳过
    rmgr: Transaction len (rec/tot):     46/    46, tx:        784, lsn: 0/0156C9D0, prev 0/0156C990, desc: COMMIT 2022-11-01 14:31:52.295665 CST
    rmgr: Heap        len (rec/tot):     59/    59, tx:        785, lsn: 0/0156CA00, prev 0/0156C9D0, desc: INSERT off 9 flags 0x08, blkref #0: rel 1663/5/16399 blk 0
    rmgr: Transaction len (rec/tot):     46/    46, tx:        785, lsn: 0/0156CA40, prev 0/0156CA00, desc: COMMIT 2022-11-01 14:31:55.880887 CST
    rmgr: Standby     len (rec/tot):     50/    50, tx:          0, lsn: 0/0156CA70, prev 0/0156CA40, desc: RUNNING_XACTS nextXid 786 latestCompletedXid 785 oldestRunningXid 786
    rmgr: Standby     len (rec/tot):     50/    50, tx:          0, lsn: 0/0156CAA8, prev 0/0156CA70, desc: RUNNING_XACTS nextXid 786 latestCompletedXid 785 oldestRunningXid 786
    rmgr: XLOG        len (rec/tot):    114/   114, tx:          0, lsn: 0/0156CAE0, prev 0/0156CAA8, desc: CHECKPOINT_ONLINE redo 0/156CAA8; tli 1; prev tli 1; fpw true; xid 0:786; oid 24576; multi 1; offset 0; oldest xid 716 in DB 1; oldest multi 1 in DB 1; oldest/newest commit timestamp xid: 0/0; oldest running xid 786; online
    rmgr: Standby     len (rec/tot):     50/    50, tx:          0, lsn: 0/0156CB58, prev 0/0156CAE0, desc: RUNNING_XACTS nextXid 786 latestCompletedXid 785 oldestRunningXid 786
    rmgr: Heap        len (rec/tot):     64/   448, tx:        786, lsn: 0/0156CB90, prev 0/0156CB58, desc: INSERT off 10 flags 0x08, blkref #0: rel 1663/5/16399 blk 0 FPW
    rmgr: Transaction len (rec/tot):     46/    46, tx:        786, lsn: 0/0156CD50, prev 0/0156CB90, desc: COMMIT 2022-11-01 14:32:56.031886 CST
    rmgr: Heap        len (rec/tot):     59/    59, tx:        787, lsn: 0/0156CD80, prev 0/0156CD50, desc: INSERT off 11 flags 0x08, blkref #0: rel 1663/5/16399 blk 0
    rmgr: Transaction len (rec/tot):     46/    46, tx:        787, lsn: 0/0156CDC0, prev 0/0156CD80, desc: COMMIT 2022-11-01 14:32:57.692334 CST
    rmgr: Heap        len (rec/tot):     59/    59, tx:        788, lsn: 0/0156CDF0, prev 0/0156CDC0, desc: INSERT off 12 flags 0x08, blkref #0: rel 1663/5/16399 blk 0
    
    #订阅端跳过LSN为0/156CD50的事物,但是并不能成功,日志会重复报错
    postgres=#  ALTER SUBSCRIPTION sub SKIP (lsn = '0/156CD50');
    ALTER SUBSCRIPTION
    postgres=# alter subscription sub enable ;
    ALTER SUBSCRIPTION
    postgres=#  select * from tb1;
     a 
    ---
     1
    (1 row)
    
    #精确跳过冲突事物LSN,才能正常恢复
    postgres=#  ALTER SUBSCRIPTION sub SKIP (lsn = '0/156C9D0');
    ALTER SUBSCRIPTION
    postgres=# SELECT subname, subskiplsn, subenabled FROM pg_subscription;
     subname | subskiplsn | subenabled 
    ---------+------------+------------
     sub     | 0/156C9D0  | f
    (1 row)
    
    postgres=# alter subscription sub enable ;
    ALTER SUBSCRIPTION
    postgres=# SELECT subname, subskiplsn, subenabled FROM pg_subscription;
     subname | subskiplsn | subenabled 
    ---------+------------+------------
     sub     | 0/0        | t
    (1 row)
    
    postgres=#  select * from tb1;
     a  
    ----
      1
      9
     10
     11
     12
    (5 rows)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    由此可见,该功能内部做了安全检查,要正确跳过冲突事物才可以继续恢复逻辑复制,这样就避免了我们误跳非冲突事物。

    参考:
    https://www.postgresql.fastware.com/blog/addressing-replication-conflicts-using-alter-subscription-skip
    https://www.postgresql.fastware.com/blog/how-to-handle-logical-replication-conflicts-in-postgresql
    https://www.postgresql.org/docs/15/sql-altersubscription.html
    https://www.postgresql.org/docs/15/logical-replication-conflicts.html

  • 相关阅读:
    HTML+CSS+JS简易计算器
    API简介,如何运用API接口获取商品数据(淘宝/天猫、1688、拼多多、京东等二十多个海内外电商平台)
    【Excel】WPS单元格快速转换表格字母大小写
    [附源码]计算机毕业设计springboot-大学生健康档案管理
    2022年最新最详细的安装Node.js以及cnpm(详细图解过程、绝对成功)
    如何远程控制别人电脑进行技术支持?
    企业级数据安全,天翼云是这样理解的
    Spring Security JWT Authentication and Authorisation(一)
    6G显卡显存不足出现CUDA Error:out of memory解决办法
    从零学算法(剑指 Offer 61)
  • 原文地址:https://blog.csdn.net/dazuiba008/article/details/127575527