PostgreSQL High Availability with Patroni: An Analysis of the Startup Flow


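    What is Patroni
    In many production environments, distributed databases are widely used for their high availability, data distribution, and load balancing. Patroni is a high-availability solution designed specifically for PostgreSQL: an HA architecture template implemented in Python. It relies on an external shared store (kubernetes, etcd, etcd3, zookeeper, aws, and so on) to give a PostgreSQL cluster automatic failure recovery, automatic failover, automatic backup, and similar capabilities.
    Key features:
    1. Automatic failure detection and recovery: Patroni monitors the health of the PostgreSQL cluster. Once it detects that the primary has failed, it automatically recovers by promoting one of the replicas to primary.
    2. Automatic failover: once Patroni has designated a new primary, it coordinates all replicas and clients so that they switch to the new primary correctly, which gives a fast, seamless failover.
    3. Consistency and data integrity: Patroni pays close attention to data consistency and integrity. During a failover it works to ensure that data is not lost or corrupted before the new primary takes over.
    4. External shared configuration store: Patroni uses an external key-value store (such as ZooKeeper, etcd, or Consul) to hold configuration and cluster state. This keeps the configuration consistent and accessible, and lets multiple Patroni instances cooperate.
    5. Support for multiple cloud environments and physical hardware: Patroni runs in cloud environments and can also be deployed on physical hardware, which gives a wide range of deployment options.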
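    Patroni architecture

    ● DCS (Distributed Configuration Store): where the distributed configuration is stored. Supported backends include kubernetes, etcd, etcd3, zookeeper, aws, and others; Patroni reads and writes the distributed configuration there.
    ● Patroni core: writes the distributed configuration to the DCS, sets the role and the PostgreSQL configuration of each PostgreSQL node, and manages the PostgreSQL lifecycle.
    ● PostgreSQL nodes: based on the PostgreSQL configuration set by Patroni, the nodes form a primary/replica chain and synchronize data through streaming replication, which finally yields a PostgreSQL cluster.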
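    Source analysis of Patroni high availability
    Patroni HA startup flow

    Flow description:
    ● Load cluster information: fetch the cluster information through the API that the DCS supports. The main contents are:
    ○ config: records the pg cluster ID and configuration (pg parameters, various timeout settings, and so on); used for cluster validation, node rebuilds, etc.;
    ○ leader: records the leader's election time, heartbeat time, election period, latest LSN, and so on; written once a node has won the leader race;
    ○ sync: records the primary and the synchronous standbys; written by the primary and used to validate synchronous standbys during switchover and failover;
    ○ failover: records the time of the last failover.
    ● Cluster state detection: validates the cluster configuration, checks the overall cluster state and the node state, and decides how PostgreSQL should be started;
    ● Start PostgreSQL: initializes the PostgreSQL data directory, applies the PostgreSQL configuration derived from the cluster information, and starts the instance;
    ● Form the PostgreSQL cluster: assigns primary and replica roles to the started PostgreSQL nodes and links the nodes of different roles together into a complete cluster.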
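    The four records listed above can be pictured as one in-memory snapshot. The sketch below is only an illustration: the class name ClusterSnapshot, its fields, and the has_leader helper are assumptions made for this example and are not Patroni's own types.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ClusterSnapshot:
    """Hypothetical container for the DCS records described above."""
    initialize: Optional[str] = None                  # cluster ID kept with the config record
    config: dict = field(default_factory=dict)        # dynamic configuration (timeouts, pg parameters)
    leader: Optional[str] = None                      # member currently holding the leader lock
    sync_standby: list = field(default_factory=list)  # synchronous standby names
    failover: Optional[dict] = None                   # failover record, if any

    def has_leader(self) -> bool:
        # A snapshot without a leader corresponds to an "unlocked" cluster.
        return self.leader is not None


snapshot = ClusterSnapshot(initialize="7289263672843878470",
                           config={"ttl": 30, "loop_wait": 10},
                           leader="pg142-1013-postgresql-0")
```

    An empty ClusterSnapshot() models the situation before any node has initialized the cluster.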
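    Analysis of the Patroni HA startup flow
    Loading cluster information
    Loading the cluster information is the first step of the HA startup flow, and it provides the most critical input for forming the PostgreSQL cluster.

    Step 1: load the cluster information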

    try:
        self.load_cluster_from_dcs()
        self.state_handler.reset_cluster_info_state(self.cluster, self.patroni.nofailover)
    except Exception:
        self.state_handler.reset_cluster_info_state(None, self.patroni.nofailover)
        raise
    

    Loading cluster information through the DCS interface

    def load_cluster_from_dcs(self):
        cluster = self.dcs.get_cluster()

        # We want to keep the state of cluster when it was healthy
        if not cluster.is_unlocked() or not self.old_cluster:
            self.old_cluster = cluster
        self.cluster = cluster

        if not self.has_lock(False):
            self.set_is_leader(False)

        self._leader_timeline = None if cluster.is_unlocked() else cluster.leader.timeline
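    The "keep the last healthy state" rule in load_cluster_from_dcs can be tried in isolation. The following is a minimal sketch with stand-in classes; FakeCluster and SnapshotKeeper are names invented for this example and only imitate the two attributes involved.

```python
class FakeCluster:
    """Stand-in for the cluster object; it only models whether a leader exists."""

    def __init__(self, leader=None):
        self.leader = leader

    def is_unlocked(self):
        # "Unlocked" means nobody holds the leader lock.
        return self.leader is None


class SnapshotKeeper:
    """Keeps the current view plus the last view that still had a leader."""

    def __init__(self):
        self.cluster = None
        self.old_cluster = None

    def update(self, cluster):
        # Remember the snapshot if it has a leader, or if nothing was remembered yet.
        if not cluster.is_unlocked() or not self.old_cluster:
            self.old_cluster = cluster
        self.cluster = cluster


keeper = SnapshotKeeper()
healthy = FakeCluster(leader="node-0")
keeper.update(healthy)
keeper.update(FakeCluster())  # leader lost: the current view changes, old_cluster stays
```

    After the second update, keeper.cluster reflects the leaderless state while keeper.old_cluster still points at the healthy snapshot.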

    The cluster interface

    def get_cluster(self, force=False):
        if force:
            self._bypass_caches()
        try:
            cluster = self._load_cluster()
        except Exception:
            self.reset_cluster()
            raise

        self._last_seen = int(time.time())

        with self._cluster_thread_lock:
            self._cluster = cluster
            self._cluster_valid_till = time.time() + self.ttl
            return cluster

    @abc.abstractmethod
    def _load_cluster(self):
        """Internally this method should build  `Cluster` object which
           represents current state and topology of the cluster in DCS.
           this method supposed to be called only by `get_cluster` method.

           raise `~DCSError` in case of communication or other problems with DCS.
           If the current node was running as a master and exception raised,
           instance would be demoted."""

    Taking Kubernetes as the DCS as an example

    def _load_cluster(self):
        stop_time = time.time() + self._retry.deadline
        self._api.refresh_api_servers_cache()
        try:
            with self._condition:
                self._wait_caches(stop_time)

                members = [self.member(pod) for pod in self._pods.copy().values()]
                nodes = self._kinds.copy()

            config = nodes.get(self.config_path)
            metadata = config and config.metadata
            annotations = metadata and metadata.annotations or {}

            # get initialize flag
            initialize = annotations.get(self._INITIALIZE)

            # get global dynamic configuration
            config = ClusterConfig.from_node(metadata and metadata.resource_version,
                                             annotations.get(self._CONFIG) or '{}',
                                             metadata.resource_version if self._CONFIG in annotations else 0)

            # get timeline history
            history = TimelineHistory.from_node(metadata and metadata.resource_version,
                                                annotations.get(self._HISTORY) or '[]')

            leader = nodes.get(self.leader_path)
            metadata = leader and leader.metadata
            self._leader_resource_version = metadata.resource_version if metadata else None
            annotations = metadata and metadata.annotations or {}

            # get last known leader lsn
            last_lsn = annotations.get(self._OPTIME)
            try:
                last_lsn = 0 if last_lsn is None else int(last_lsn)
            except Exception:
                last_lsn = 0

            # get permanent slots state (confirmed_flush_lsn)
            slots = annotations.get('slots')
            try:
                slots = slots and json.loads(slots)
            except Exception:
                slots = None

            # get leader
            leader_record = {n: annotations.get(n) for n in (self._LEADER, 'acquireTime',
                             'ttl', 'renewTime', 'transitions') if n in annotations}
            if (leader_record or self._leader_observed_record) and leader_record != self._leader_observed_record:
                self._leader_observed_record = leader_record
                self._leader_observed_time = time.time()

            leader = leader_record.get(self._LEADER)
            try:
                ttl = int(leader_record.get('ttl')) or self._ttl
            except (TypeError, ValueError):
                ttl = self._ttl

            if not metadata or not self._leader_observed_time or self._leader_observed_time + ttl < time.time():
                leader = None

            if metadata:
                member = Member(-1, leader, None, {})
                member = ([m for m in members if m.name == leader] or [member])[0]
                leader = Leader(metadata.resource_version, None, member)

            # failover key
            failover = nodes.get(self.failover_path)
            metadata = failover and failover.metadata
            failover = Failover.from_node(metadata and metadata.resource_version,
                                          metadata and (metadata.annotations or {}).copy())

            # get synchronization state
            sync = nodes.get(self.sync_path)
            metadata = sync and sync.metadata
            sync = SyncState.from_node(metadata and metadata.resource_version,  metadata and metadata.annotations)

            return Cluster(initialize, config, leader, last_lsn, members, failover, sync, history, slots)
        except Exception:
            logger.exception('get_cluster')
            raise KubernetesError('Kubernetes API is not responding properly')
    

    In the cluster information above, the configuration is held mainly in xxx-config, xxx-leader, xxx-failover, and xxx-sync. Their contents are as follows:
    ●xxx-config

    % kubectl get cm pg142-1013-postgresql-config -oyaml
    apiVersion: v1
    kind: ConfigMap
    metadata:
      annotations:
        config: '{"loop_wait":10,"maximum_lag_on_failover":33554432,"postgresql":{"parameters":{"archive_command":"/bin/true","archive_mode":"on","archive_timeout":"1800s","autovacuum":"on","autovacuum_analyze_scale_factor":0.02,"autovacuum_max_workers":"3","autovacuum_naptime":"5min","autovacuum_vacuum_cost_delay":"2ms","autovacuum_vacuum_cost_limit":"-1","autovacuum_vacuum_scale_factor":0.05,"autovacuum_work_mem":"128MB","backend_flush_after":"0","bgwriter_delay":"200ms","bgwriter_flush_after":"256","bgwriter_lru_maxpages":"100","bgwriter_lru_multiplier":"2","checkpoint_completion_target":"0.9","checkpoint_flush_after":"256kB","checkpoint_timeout":"5min","commit_delay":"0","constraint_exclusion":"partition","datestyle":"iso,
          mdy","deadlock_timeout":"1s","default_text_search_config":"pg_catalog.english","dynamic_shared_memory_type":"posix","effective_cache_size":"32768","fsync":"on","full_page_writes":"on","hot_standby":"on","hot_standby_feedback":"off","huge_pages":"off","idle_in_transaction_session_timeout":"600000","lc_messages":"en_US.UTF-8","lc_monetary":"en_US.UTF-8","lc_numeric":"en_US.UTF-8","lc_time":"en_US.UTF-8","listen_addresses":"*","log_autovacuum_min_duration":"0","log_checkpoints":"on","log_connections":"off","log_disconnections":"off","log_error_verbosity":"default","log_line_prefix":"%t
          [%p]: [%l-1] %c %x %d %u %a %h","log_lock_waits":"on","log_min_duration_statement":"500","log_rotation_size":"0","log_statement":"none","log_temp_files":0,"log_timezone":"Asia/Shanghai","maintenance_work_mem":"32768","max_connections":"170","max_parallel_maintenance_workers":"2","max_parallel_workers":"2","max_parallel_workers_per_gather":"2","max_replication_slots":"10","max_standby_archive_delay":"30s","max_standby_streaming_delay":"30s","max_wal_senders":"10","max_wal_size":"2048","max_worker_processes":"8","old_snapshot_threshold":"-1","pg_stat_statements.max":"10000","pg_stat_statements.save":"on","pg_stat_statements.track":"all","pgaudit.log":"NONE","pgaudit.log_catalog":"on","pgaudit.log_client":"off","pgaudit.log_level":"log","pgaudit.log_parameter":"off","pgaudit.log_relation":"off","pgaudit.log_rows":"off","pgaudit.log_statement":"on","pgaudit.log_statement_once":"off","pgaudit.role":"","random_page_cost":"4","restart_after_crash":"on","synchronous_commit":"on","tcp_keepalives_count":"0","tcp_keepalives_idle":"900","tcp_keepalives_interval":"100","temp_buffers":"8MB","timezone":"Asia/Shanghai","track_activity_query_size":"1kB","track_functions":"all","track_io_timing":"off","unix_socket_directories":"/var/run/postgresql","vacuum_cost_delay":"0ms","vacuum_cost_limit":"200","wal_buffers":"2048","wal_compression":"on","wal_keep_segments":"128","wal_keep_size":"2048MB","wal_level":"replica","wal_log_hints":"on","wal_receiver_status_interval":"10s","wal_sender_timeout":"1min","wal_writer_delay":"200ms","wal_writer_flush_after":"1MB","work_mem":"4MB"},"use_pg_rewind":true,"use_slots":true},"retry_timeout":10,"synchronous_mode":true,"ttl":30}'
        initialize: "7289263672843878470"
      creationTimestamp: "2023-10-13T02:25:51Z"
      labels:
        application: spilo
        cluster-name: pg142-1013-postgresql
      name: pg142-1013-postgresql-config
      namespace: default
      resourceVersion: "22858249"
      uid: dfa64d28-e939-4bdd-8db1-a3485fa09637
    

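    In the example above, annotations holds two parameters, config and initialize:
    1. config defines the overall cluster configuration, which covers both PostgreSQL parameters and cluster parameters (election wait time, maximum WAL lag allowed on failover, whether synchronous mode is enabled, and so on);
    2. initialize defines the cluster ID. This value matches the Database system identifier reported by the pg_controldata command, so every PostgreSQL node in the cluster has the same sys_id.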

    root@pg142-1013-postgresql-1:/home/postgres# pg_controldata | grep "Database system identifier"
    Database system identifier:           7289263672843878470
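    To make the split between cluster-level settings and PostgreSQL parameters concrete, the config annotation can be parsed as ordinary JSON. This is a sketch only: the annotations dict below is a hand-shortened copy of the ConfigMap shown earlier (most PostgreSQL parameters are omitted), and the variable names are assumptions for illustration.

```python
import json

# Shortened copy of the annotations from the xxx-config ConfigMap above.
annotations = {
    "config": '{"loop_wait":10,"maximum_lag_on_failover":33554432,'
              '"postgresql":{"parameters":{"wal_level":"replica"},'
              '"use_pg_rewind":true,"use_slots":true},'
              '"retry_timeout":10,"synchronous_mode":true,"ttl":30}',
    "initialize": "7289263672843878470",
}

config = json.loads(annotations["config"])

# Everything outside the "postgresql" section is a cluster-level setting.
cluster_settings = {k: v for k, v in config.items() if k != "postgresql"}
pg_parameters = config["postgresql"]["parameters"]

# The cluster ID is compared with the node's own system identifier.
data_sysid = "7289263672843878470"  # value reported by pg_controldata in the example
same_cluster = annotations["initialize"] == data_sysid
```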
    ●xxx-leader
    % kubectl get cm pg142-1013-postgresql-leader -oyaml
    apiVersion: v1
    kind: ConfigMap
    metadata:
      annotations:
        acquireTime: "2023-10-13T02:26:06.973552+00:00"
        leader: pg142-1013-postgresql-0
        optime: "67109192"
        renewTime: "2023-10-16T07:02:57.418940+00:00"
        transitions: "0"
        ttl: "30"
      creationTimestamp: "2023-10-13T02:26:07Z"
      labels:
        application: spilo
        cluster-name: pg142-1013-postgresql
      name: pg142-1013-postgresql-leader
      namespace: default
      resourceVersion: "23286847"
      uid: cb235c85-6a21-454d-8320-222205eaa77f
    

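    The parameters under annotations above have the following meanings:
    1. acquireTime: when the cluster leader lock was acquired;
    2. leader: the owner of the cluster leader lock, here the name of a PostgreSQL node;
    3. optime: the latest LSN of the cluster leader, as a decimal number;
    4. renewTime: the heartbeat time of the leader lock owner; the heartbeat interval corresponds to loop_wait in xxx-config;
    5. transitions: how many times the leader lock has changed hands; it is incremented on each switchover or failover;
    6. ttl: the election timeout before failover; if renewTime is not updated within the TTL, an election is triggered and a new node takes the leader lock.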
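    Two of these fields lend themselves to a short worked example: converting the decimal optime into PostgreSQL's usual X/Y LSN notation, and the TTL expiry comparison that the Kubernetes _load_cluster code performs. The function names format_lsn and leader_lock_expired are hypothetical helpers written for this sketch, not Patroni functions.

```python
def format_lsn(optime: int) -> str:
    """Render a decimal WAL position in PostgreSQL's X/Y hexadecimal notation."""
    return "{0:X}/{1:X}".format(optime >> 32, optime & 0xFFFFFFFF)


def leader_lock_expired(last_observed_change: float, ttl: int, now: float) -> bool:
    """True when the leader record has not changed for longer than ttl seconds."""
    return last_observed_change + ttl < now


lsn_text = format_lsn(67109192)  # optime from the xxx-leader example
expired = leader_lock_expired(last_observed_change=100.0, ttl=30, now=131.0)
```

    With the optime from the example, lsn_text is 0/4000148; a record last seen changing at t=100 with ttl=30 counts as expired at t=131.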
    ●xxx-sync

    % kubectl get cm pg142-1013-postgresql-sync -oyaml
    apiVersion: v1
    kind: ConfigMap
    metadata:
      annotations:
        leader: pg142-1013-postgresql-1
        sync_standby: pg142-1013-postgresql-0
      creationTimestamp: "2023-10-16T06:54:39Z"
      labels:
        application: spilo
        cluster-name: pg142-1013-postgresql
      name: pg142-1013-postgresql-sync
      namespace: default
      resourceVersion: "23288352"
      uid: 1c46e63b-8b90-4fc6-9596-8e2f71fba2ab
    

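    The content above records two pieces of information:
    1. leader: the name of the leader node;
    2. sync_standby: the name of the synchronous standby; multiple synchronous standbys are separated by commas.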
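    Since sync_standby is a comma-separated string, reading it back takes one split. A minimal sketch, assuming the annotations arrive as a plain dict; parse_sync_state is a name made up for this example.

```python
def parse_sync_state(annotations):
    """Return (leader, [synchronous standby names]) from xxx-sync annotations."""
    leader = annotations.get("leader")
    raw = annotations.get("sync_standby") or ""
    # Split on commas and drop empty entries and surrounding whitespace.
    standbys = [name.strip() for name in raw.split(",") if name.strip()]
    return leader, standbys


leader, standbys = parse_sync_state({
    "leader": "pg142-1013-postgresql-1",
    "sync_standby": "pg142-1013-postgresql-0",
})
```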

    ●xxx-failover

    
    % kubectl get cm pg142-1013-postgresql-failover -oyaml
    apiVersion: v1
    kind: ConfigMap
    metadata:
      creationTimestamp: "2023-10-16T07:16:03Z"
      labels:
        application: spilo
        cluster-name: pg142-1013-postgresql
      managedFields:
      - apiVersion: v1
        fieldsType: FieldsV1
        fieldsV1:
          f:metadata:
            f:labels:
              .: {}
              f:application: {}
              f:cluster-name: {}
        manager: Patroni
        operation: Update
        time: "2023-10-16T07:36:56Z"
      name: pg142-1013-postgresql-failover
      namespace: default
      resourceVersion: "23290596"
      uid: 72d50c58-bc65-4b77-8870-93d0b8f8b7a2
    The content above mainly records when the last failover took place.
    Cluster state detection
      if self.is_paused():
          self.watchdog.disable()
          self._was_paused = True
      else:
          if self._was_paused:
              self.state_handler.schedule_sanity_checks_after_pause()
          self._was_paused = False
      
      if not self.cluster.has_member(self.state_handler.name):
          self.touch_member()
      
      # cluster has leader key but not initialize key
      if not (self.cluster.is_unlocked() or self.sysid_valid(self.cluster.initialize)) and self.has_lock():
          self.dcs.initialize(create_new=(self.cluster.initialize is None), sysid=self.state_handler.sysid)
      
      if not (self.cluster.is_unlocked() or self.cluster.config and self.cluster.config.data) and self.has_lock():
          self.dcs.set_config_value(json.dumps(self.patroni.config.dynamic_configuration, separators=(',', ':')))
          self.cluster = self.dcs.get_cluster()
      
      if self._async_executor.busy:
          return self.handle_long_action_in_progress()
      
      msg = self.handle_starting_instance()
      if msg is not None:
          return msg
      
      # we've got here, so any async action has finished.
      if self.state_handler.bootstrapping:
          return self.post_bootstrap()
      
      if self.recovering:
          self.recovering = False

          if not self._rewind.is_needed:
              # Check if we tried to recover from postgres crash and failed
              msg = self.post_recover()
              if msg is not None:
                  return msg

          # Reset some states after postgres successfully started up
          self._crash_recovery_executed = False
          if self._rewind.executed and not self._rewind.failed:
              self._rewind.reset_state()

          # The Raft cluster without a quorum takes a bit of time to stabilize.
          # Therefore we want to postpone the leader race if we just started up.
          if self.cluster.is_unlocked() and self.dcs.__class__.__name__ == 'Raft':
              return 'started as a secondary'

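    Checking whether the cluster is paused
    A paused cluster is one whose PostgreSQL nodes are no longer managed by Patroni: when the cluster misbehaves, failover and similar measures are no longer triggered.
    A pause is normally triggered deliberately by the user, for example to carry out maintenance on a single PostgreSQL node. It is triggered as follows: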

    root@pg142-1013-postgresql-0:/home/postgres# patronictl list
    + Cluster: pg142-1013-postgresql (7289263672843878470) ---+---------+----+-----------+
    | Member                  | Host           | Role         | State   | TL | Lag in MB |
    +-------------------------+----------------+--------------+---------+----+-----------+
    | pg142-1013-postgresql-0 | 10.244.117.143 | Leader       | running |  3 |           |
    | pg142-1013-postgresql-1 | 10.244.165.220 | Sync Standby | running |  3 |         0 |
    +-------------------------+----------------+--------------+---------+----+-----------+
    root@pg142-1013-postgresql-0:/home/postgres# patronictl pause
    Success: cluster management is paused
    root@pg142-1013-postgresql-0:/home/postgres# patronictl list
    + Cluster: pg142-1013-postgresql (7289263672843878470) ---+---------+----+-----------+
    | Member                  | Host           | Role         | State   | TL | Lag in MB |
    +-------------------------+----------------+--------------+---------+----+-----------+
    | pg142-1013-postgresql-0 | 10.244.117.143 | Leader       | running |  3 |           |
    | pg142-1013-postgresql-1 | 10.244.165.220 | Sync Standby | running |  3 |         0 |
    +-------------------------+----------------+--------------+---------+----+-----------+
     Maintenance mode: on
    

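    The Maintenance mode: on line above shows that the cluster is now paused. The PostgreSQL processes are still alive, but if one of them fails, the user has to start it manually.
    To resume a paused cluster: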

    root@pg142-1013-postgresql-0:/home/postgres# patronictl list
    + Cluster: pg142-1013-postgresql (7289263672843878470) ---+---------+----+-----------+
    | Member                  | Host           | Role         | State   | TL | Lag in MB |
    +-------------------------+----------------+--------------+---------+----+-----------+
    | pg142-1013-postgresql-0 | 10.244.117.143 | Leader       | running |  3 |           |
    | pg142-1013-postgresql-1 | 10.244.165.220 | Sync Standby | running |  3 |         0 |
    +-------------------------+----------------+--------------+---------+----+-----------+
     Maintenance mode: on
    root@pg142-1013-postgresql-0:/home/postgres# patronictl resume
    Success: cluster management is resumed
    root@pg142-1013-postgresql-0:/home/postgres# patronictl list
    + Cluster: pg142-1013-postgresql (7289263672843878470) ---+---------+----+-----------+
    | Member                  | Host           | Role         | State   | TL | Lag in MB |
    +-------------------------+----------------+--------------+---------+----+-----------+
    | pg142-1013-postgresql-0 | 10.244.117.143 | Leader       | running |  3 |           |
    | pg142-1013-postgresql-1 | 10.244.165.220 | Sync Standby | running |  3 |         0 |
    +-------------------------+----------------+--------------+---------+----+-----------+
    

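    The patronictl resume command puts the cluster back under Patroni's management.
    After the cluster is resumed, the PostgreSQL nodes in it need some handling:
    1. reconfigure the PostgreSQL parameters;
    2. based on the primary and synchronous standby names last recorded in xxx-sync, set up the synchronous replication slot information on the primary;
    3. check whether the system ID of the resumed PostgreSQL node has changed, that is, whether it still matches the initialize value last stored in xxx-config; if it does not, the cluster cannot be resumed.
    Cluster initialization check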

    # cluster has leader key but not initialize key
    if not (self.cluster.is_unlocked() or self.sysid_valid(self.cluster.initialize)) and self.has_lock():
        self.dcs.initialize(create_new=(self.cluster.initialize is None), sysid=self.state_handler.sysid)
    
    if not (self.cluster.is_unlocked() or self.cluster.config and self.cluster.config.data) and self.has_lock():
        self.dcs.set_config_value(json.dumps(self.patroni.config.dynamic_configuration, separators=(',', ':')))
        self.cluster = self.dcs.get_cluster()
    

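    The cluster initialization check covers two situations:
    ● the cluster currently has a leader, but the initialize key in xxx-config does not exist; in that case the sysid of PostgreSQL on the leader has to be written to xxx-config;
    ● the cluster currently has a leader, but the xxx-config information could not be obtained; the configuration and the sysid of the leader both have to be written to xxx-config, and the cluster information is then fetched again.
    The purpose of this step is to guard against the xxx-config resource being deleted, which would make replicas fail to load the cluster information.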
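    The two conditions are compact boolean expressions, so it can help to evaluate them as pure functions. This is a sketch under assumptions: sysid_valid here is a simplified stand-in (non-empty string of digits) rather than Patroni's own check, and the function names are invented for the example.

```python
def sysid_valid(sysid):
    # Assumption for this sketch: a usable system ID is a non-empty string of digits.
    return bool(sysid) and str(sysid).isdigit()


def needs_initialize_key(cluster_unlocked, initialize, has_lock):
    """Leader key exists, initialize key is missing or invalid, and this node holds the lock."""
    return (not (cluster_unlocked or sysid_valid(initialize))) and has_lock


def needs_config_value(cluster_unlocked, config_data, has_lock):
    """Leader key exists, dynamic configuration is empty, and this node holds the lock."""
    return (not (cluster_unlocked or bool(config_data))) and has_lock


# Leader present, initialize key deleted, evaluated on the leader itself.
repair_initialize = needs_initialize_key(False, None, True)
# Same situation evaluated on a replica: only the lock holder repairs the key.
replica_repairs = needs_initialize_key(False, None, False)
```

    Only the node holding the leader lock rewrites the missing keys; a replica evaluating the same state does nothing.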
    Node state detection

    Checking which phase the PostgreSQL startup has reached

    if self._async_executor.busy:
        return self.handle_long_action_in_progress()
    

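    Checking whether the starting PostgreSQL instance has hit an error

    msg = self.handle_starting_instance()
    if msg is not None:
        return msg

    Node state detection inspects the current running state of the PostgreSQL node to decide whether a specific action is needed. There are two ways to detect it:
    1. from the running state of PostgreSQL itself;
    2. by watching the asynchronous executor (_async_executor) to learn which phase the node is currently in.

    Basic operations after the node check passes

    # we've got here, so any async action has finished.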
    
    if self.state_handler.bootstrapping:
        return self.post_bootstrap()
    
    if self.recovering:
        self.recovering = False

        if not self._rewind.is_needed:
            # Check if we tried to recover from postgres crash and failed
            msg = self.post_recover()
            if msg is not None:
                return msg

        # Reset some states after postgres successfully started up
        self._crash_recovery_executed = False
        if self._rewind.executed and not self._rewind.failed:
            self._rewind.reset_state()

        # The Raft cluster without a quorum takes a bit of time to stabilize.
        # Therefore we want to postpone the leader race if we just started up.
        if self.cluster.is_unlocked() and self.dcs.__class__.__name__ == 'Raft':
            return 'started as a secondary'

    Once the node state check passes, PostgreSQL needs further handling:
    1. Operations after PostgreSQL has started

    def post_bootstrap(self):
        with self._async_response:
            result = self._async_response.result
        # bootstrap has failed if postgres is not running
        if not self.state_handler.is_running() or result is False:
            self.cancel_initialization()

        if result is None:
            if not self.state_handler.is_leader():
                return 'waiting for end of recovery after bootstrap'

            self.state_handler.set_role('master')
            ret = self._async_executor.try_run_async('post_bootstrap', self.state_handler.bootstrap.post_bootstrap,
                                                     args=(self.patroni.config['bootstrap'], self._async_response))
            return ret or 'running post_bootstrap'

        self.state_handler.bootstrapping = False
        if not self.watchdog.activate():
            logger.error('Cancelling bootstrap because watchdog activation failed')
            self.cancel_initialization()
        self._rewind.ensure_checkpoint_after_promote(self.wakeup)
        self.dcs.initialize(create_new=(self.cluster.initialize is None), sysid=self.state_handler.sysid)
        self.dcs.set_config_value(json.dumps(self.patroni.config.dynamic_configuration, separators=(',', ':')))
        self.dcs.take_leader()
        self.set_is_leader(True)
        self.state_handler.call_nowait(ACTION_ON_START)
        self.load_cluster_from_dcs()

        return 'initialized a new cluster'

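    The operations above include the post-promotion checkpoint check that pg_rewind relies on, initializing the xxx-config resource in the DCS, creating the xxx-leader resource, and loading the cluster information.
    2. For a PostgreSQL instance in recovery, checking whether pg_rewind needs to run

    if self.recovering:
        self.recovering = False

        if not self._rewind.is_needed:
            # Check if we tried to recover from postgres crash and failed
            msg = self.post_recover()
            if msg is not None:
                return msg

        # Reset some states after postgres successfully started up
        self._crash_recovery_executed = False
        if self._rewind.executed and not self._rewind.failed:
            self._rewind.reset_state()

    The pg_rewind command brings a node's data directory back in line with the primary after their WAL histories have diverged, typically when a former primary rejoins the cluster as a replica after a failover.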
    Starting PostgreSQL

    # is data directory empty?
    if self.state_handler.data_directory_empty():
        self.state_handler.set_role('uninitialized')
        self.state_handler.stop('immediate', stop_timeout=self.patroni.config['retry_timeout'])
        # In case datadir went away while we were master.
        self.watchdog.disable()

        # is this instance the leader?
        if self.has_lock():
            self.release_leader_key_voluntarily()
            return 'released leader key voluntarily as data dir empty and currently leader'

        if self.is_paused():
            return 'running with empty data directory'
        return self.bootstrap()  # new node
    else:
        # check if we are allowed to join
        data_sysid = self.state_handler.sysid
        if not self.sysid_valid(data_sysid):
            # data directory is not empty, but no valid sysid, cluster must be broken, suggest reinit
            return ("data dir for the cluster is not empty, "
                    "but system ID is invalid; consider doing reinitialize")

        if self.sysid_valid(self.cluster.initialize):
            if self.cluster.initialize != data_sysid:
                if self.is_paused():
                    logger.warning('system ID has changed while in paused mode. Patroni will exit when resuming'
                                   ' unless system ID is reset: %s != %s', self.cluster.initialize, data_sysid)
                    if self.has_lock():
                        self.release_leader_key_voluntarily()
                        return 'released leader key voluntarily due to the system ID mismatch'
                else:
                    logger.fatal('system ID mismatch, node %s belongs to a different cluster: %s != %s',
                                 self.state_handler.name, self.cluster.initialize, data_sysid)
                    sys.exit(1)
        elif self.cluster.is_unlocked() and not self.is_paused():
            # "bootstrap", but data directory is not empty
            if not self.state_handler.cb_called and self.state_handler.is_running() \
                    and not self.state_handler.is_leader():
                self._join_aborted = True
                logger.error('No initialize key in DCS and PostgreSQL is running as replica, aborting start')
                logger.error('Please first start Patroni on the node running as master')
                sys.exit(1)
            self.dcs.initialize(create_new=(self.cluster.initialize is None), sysid=data_sysid)

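    Starting without a data directory
    Startup without a data directory is the flow triggered when the data directory is empty, for example after a failed directory initialization, a failed node recovery, or a failed WAL alignment:
    1. set the role (to uninitialized) so that the cluster can be reinitialized later;
    2. stop the current PostgreSQL process immediately;
    3. check whether the current node is the primary and, if so, release the leader lock voluntarily;
    4. perform the bootstrap.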

    def bootstrap(self):
      if not self.cluster.is_unlocked():  # cluster already has leader
          clone_member = self.cluster.get_clone_member(self.state_handler.name)
          member_role = 'leader' if clone_member == self.cluster.leader else 'replica'
          msg = "from {0} '{1}'".format(member_role, clone_member.name)
          ret = self._async_executor.try_run_async('bootstrap {0}'.format(msg), self.clone, args=(clone_member, msg))
          return ret or 'trying to bootstrap {0}'.format(msg)
      
      # no initialize key and node is allowed to be master and has 'bootstrap' section in a configuration file
      elif self.cluster.initialize is None and not self.patroni.nofailover and 'bootstrap' in self.patroni.config:
          if self.dcs.initialize(create_new=True):  # race for initialization
              self.state_handler.bootstrapping = True
              with self._async_response:
                  self._async_response.reset()
      
              if self.is_standby_cluster():
                  ret = self._async_executor.try_run_async('bootstrap_standby_leader', self.bootstrap_standby_leader)
                  return ret or 'trying to bootstrap a new standby leader'
              else:
                  ret = self._async_executor.try_run_async('bootstrap', self.state_handler.bootstrap.bootstrap,
                                                           args=(self.patroni.config['bootstrap'],))
                  return ret or 'trying to bootstrap a new cluster'
          else:
              return 'failed to acquire initialize lock'
      else:
          create_replica_methods = self.get_standby_cluster_config().get('create_replica_methods', []) \
                                   if self.is_standby_cluster() else None
          if self.state_handler.can_create_replica_without_replication_connection(create_replica_methods):
              msg = 'bootstrap (without leader)'
              return self._async_executor.try_run_async(msg, self.clone) or 'trying to ' + msg
          return 'waiting for {0}leader to bootstrap'.format('standby_' if self.is_standby_cluster() else '')
    

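    The code above shows the different ways a node can bootstrap:
    1. the cluster already has a leader: the current PostgreSQL starts as a replica by cloning data from an existing member (the leader or another replica);
    2. the cluster has no leader and no initialize key, and the node is allowed to become primary: the current PostgreSQL starts as the primary; in a standby cluster it starts as the standby leader;
    3. the cluster is a standby cluster and has no leader: the replica is created through the configured create_replica_methods, typically by streaming data from the primary cluster; otherwise the node waits for a leader to bootstrap.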
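    The branching in bootstrap() can be condensed into a small decision function. This is a simplified sketch, not the actual method: choose_bootstrap_path and its boolean parameters are assumptions that replace the real object state (cluster, DCS race result, configuration) with plain arguments.

```python
def choose_bootstrap_path(has_leader, initialize, nofailover, has_bootstrap_section,
                          won_initialize_race, standby_cluster, can_clone_without_leader):
    """Return a label for the branch that bootstrap() would take."""
    if has_leader:
        # Cluster already has a leader: clone from an existing member.
        return "clone from an existing member"
    if initialize is None and not nofailover and has_bootstrap_section:
        # No initialize key yet: nodes race to create it.
        if not won_initialize_race:
            return "failed to acquire initialize lock"
        return "bootstrap standby leader" if standby_cluster else "bootstrap new cluster"
    if can_clone_without_leader:
        return "bootstrap without leader"
    return "wait for leader to bootstrap"


first_node = choose_bootstrap_path(
    has_leader=False, initialize=None, nofailover=False, has_bootstrap_section=True,
    won_initialize_race=True, standby_cluster=False, can_clone_without_leader=False)
```

    For the very first node of a fresh cluster that wins the initialize race, first_node is "bootstrap new cluster".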
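    Starting with an existing data directory
    Startup with an existing data directory mainly verifies that the cluster ID and the sysid of the PostgreSQL node agree. The main steps are:
    1. check whether the sysid of the PostgreSQL node is valid; if it is not, the PostgreSQL instance is broken and should be reinitialized;
    2. check whether the cluster ID and the sysid of the PostgreSQL node match; on a mismatch the node cannot join the cluster and Patroni exits, unless the cluster is paused, in which case a warning is logged and the leader lock is released if the node holds it;
    3. if the cluster has no valid initialize key and no leader (and is not paused), the current node reinitializes the cluster and uses its own sysid as the new cluster ID.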
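    The three checks can likewise be written as one decision function. Again this is a sketch under stated assumptions: check_existing_data_dir, its return labels, and the simplified digit-only validity test are invented for illustration, and the extra "running as replica" abort from the real code is left out.

```python
def check_existing_data_dir(data_sysid, cluster_initialize, cluster_unlocked, paused):
    """Return a label describing what happens to a node with a non-empty data directory."""

    def valid(sysid):
        # Simplified assumption: a valid system ID is a non-empty string of digits.
        return bool(sysid) and str(sysid).isdigit()

    if not valid(data_sysid):
        return "invalid system ID, consider reinitialize"
    if valid(cluster_initialize):
        if cluster_initialize != data_sysid:
            # In paused mode the mismatch is only reported; otherwise the node must not join.
            return "warn and keep running" if paused else "exit: node belongs to a different cluster"
        return "ok"
    if cluster_unlocked and not paused:
        return "write data_sysid as the initialize key"
    return "ok"


matching = check_existing_data_dir("7289263672843878470", "7289263672843878470", False, False)
foreign = check_existing_data_dir("111", "222", False, False)
```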
    Forming the PostgreSQL cluster

    try:
        if self.cluster.is_unlocked():
            ret = self.process_unhealthy_cluster()
        else:
            msg = self.process_healthy_cluster()
            ret = self.evaluate_scheduled_restart() or msg
    finally:
        # we might not have a valid PostgreSQL connection here if another thread
        # stops PostgreSQL, therefore, we only reload replication slots if no
        # asynchronous processes are running (should be always the case for the master)
        if not self._async_executor.busy and not self.state_handler.is_starting():
            create_slots = self.state_handler.slots_handler.sync_replication_slots(self.cluster,
                                                                                   self.patroni.nofailover)
            if not self.state_handler.cb_called:
                if not self.state_handler.is_leader():
                    self._rewind.trigger_check_diverged_lsn()
                self.state_handler.call_nowait(ACTION_ON_START)
            if create_slots and self.cluster.leader:
                err = self._async_executor.try_run_async('copy_logical_slots',
                                                         self.state_handler.slots_handler.copy_logical_slots,
                                                         args=(self.cluster.leader, create_slots))
                if not err:
                    ret = 'Copying logical slots {0} from the primary'.format(create_slots)
    

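    Forming the PostgreSQL cluster depends mainly on whether the cluster currently has a leader: if the leader key is unlocked, the node follows the unhealthy cluster flow; otherwise it follows the healthy cluster flow.
    Unhealthy cluster flow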

    def process_unhealthy_cluster(self):
      """Cluster has no leader key"""
    
      if self.is_healthiest_node():
          if self.acquire_lock():
              failover = self.cluster.failover
              if failover:
                  if self.is_paused() and failover.leader and failover.candidate:
                      logger.info('Updating failover key after acquiring leader lock...')
                      self.dcs.manual_failover('', failover.candidate, failover.scheduled_at, failover.index)
                  else:
                      logger.info('Cleaning up failover key after acquiring leader lock...')
                      self.dcs.manual_failover('', '')
              self.load_cluster_from_dcs()
    
              if self.is_standby_cluster():
                  # standby leader disappeared, and this is the healthiest
                  # replica, so it should become a new standby leader.
                  # This implies we need to start following a remote master
                  msg = 'promoted self to a standby leader by acquiring session lock'
                  return self.enforce_follow_remote_master(msg)
              else:
                  return self.enforce_master_role(
                      'acquired session lock as a leader',
                      'promoted self to leader by acquiring session lock'
                  )
          else:
              return self.follow('demoted self after trying and failing to obtain lock',
                                 'following new leader after trying and failing to obtain lock')
    
      else:
          # when we are doing manual failover there is no guaranty that new leader is ahead of any other node
          # node tagged as nofailover can be ahead of the new leader either, but it is always excluded from elections
          if bool(self.cluster.failover) or self.patroni.nofailover:
              self._rewind.trigger_check_diverged_lsn()
              time.sleep(2)  # Give a time to somebody to take the leader lock
    
          if self.patroni.nofailover:
              return self.follow('demoting self because I am not allowed to become master',
                                 'following a different leader because I am not allowed to promote')
          return self.follow('demoting self because i am not the healthiest node',
                             'following a different leader because i am not the healthiest node')
    

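    The unhealthy cluster flow determines the candidates for the leader role. A node must first be judged healthy, which mainly requires the following conditions:
    1. The PostgreSQL cluster is not paused.
    2. The PostgreSQL node is not still starting up.
    3. The PostgreSQL node is allowed to fail over (it does not carry the nofailover tag).
    4. The lag between the node's WAL position and the last LSN reported by the leader, as cached in the cluster state, is within the allowed range.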
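    As a rough illustration of these four conditions, the sketch below combines them into one predicate. The function names and parameters are hypothetical; the default threshold is modeled on Patroni's `maximum_lag_on_failover` setting (1048576 bytes), and the real method additionally handles manual failover, the watchdog, synchronous mode and timelines.

```python
def is_lagging(my_wal_position: int,
               last_leader_lsn: int,
               maximum_lag_on_failover: int) -> bool:
    """Return True when this node is too far behind the last LSN reported by the leader."""
    lag = last_leader_lsn - my_wal_position
    return lag > maximum_lag_on_failover


def may_join_leader_race(paused: bool,
                         starting: bool,
                         nofailover: bool,
                         my_wal_position: int,
                         last_leader_lsn: int,
                         maximum_lag_on_failover: int = 1048576) -> bool:
    """Apply conditions 1 to 4 in order; any failed condition disqualifies the node."""
    if paused or starting or nofailover:
        return False
    return not is_lagging(my_wal_position, last_leader_lsn, maximum_lag_on_failover)
```

    A node that is 100 bytes behind with a limit of 200 stays in the race, while the same node 900 bytes behind is excluded.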

    def is_healthiest_node(self):
        if time.time() - self._released_leader_key_timestamp < self.dcs.ttl:
            logger.info('backoff: skip leader race after pre_promote script failure and releasing the lock voluntarily')
            return False
    
        if self.is_paused() and not self.patroni.nofailover and \
                self.cluster.failover and not self.cluster.failover.scheduled_at:
            ret = self.manual_failover_process_no_leader()
            if ret is not None:  # continue if we just deleted the stale failover key as a master
                return ret
    
        if self.state_handler.is_starting():  # postgresql still starting up is unhealthy
            return False
    
        if self.state_handler.is_leader():
            # in pause leader is the healthiest only when no initialize or sysid matches with initialize!
            return not self.is_paused() or not self.cluster.initialize\
                    or self.state_handler.sysid == self.cluster.initialize
    
        if self.is_paused():
            return False
    
        if self.patroni.nofailover:  # nofailover tag makes node always unhealthy
            return False
    
        if self.cluster.failover:
            # When doing a switchover in synchronous mode only synchronous nodes and former leader are allowed to race
            if self.is_synchronous_mode() and self.cluster.failover.leader and \
                    self.cluster.failover.candidate and not self.cluster.sync.matches(self.state_handler.name):
                return False
            return self.manual_failover_process_no_leader()
    
        if not self.watchdog.is_healthy:
            logger.warning('Watchdog device is not usable')
            return False
    
        # When in sync mode, only last known master and sync standby are allowed to promote automatically.
        all_known_members = self.cluster.members + self.old_cluster.members
        if self.is_synchronous_mode() and self.cluster.sync and self.cluster.sync.leader:
            if not self.cluster.sync.matches(self.state_handler.name):
                return False
            # pick between synchronous candidates so we minimize unnecessary failovers/demotions
            members = {m.name: m for m in all_known_members if self.cluster.sync.matches(m.name)}
        else:
            # run usual health check
            members = {m.name: m for m in all_known_members}
    
        return self._is_healthiest_node(members.values())
    

    def _is_healthiest_node(self, members, check_replication_lag=True):
        """This method tries to determine whether I am healthy enough to became a new leader candidate or not."""
    
        my_wal_position = self.state_handler.last_operation()
        if check_replication_lag and self.is_lagging(my_wal_position):
            logger.info('My wal position exceeds maximum replication lag')
            return False  # Too far behind last reported wal position on master
    
        if not self.is_standby_cluster() and self.check_timeline():
            cluster_timeline = self.cluster.timeline
            my_timeline = self.state_handler.replica_cached_timeline(cluster_timeline)
            if my_timeline < cluster_timeline:
                logger.info('My timeline %s is behind last known cluster timeline %s', my_timeline, cluster_timeline)
                return False
    
        # Prepare list of nodes to run check against
        members = [m for m in members if m.name != self.state_handler.name and not m.nofailover and m.api_url]
    
        if members:
            for st in self.fetch_nodes_statuses(members):
                if st.failover_limitation() is None:
                    if not st.in_recovery:
                        logger.warning('Master (%s) is still alive', st.member.name)
                        return False
                    if my_wal_position < st.wal_position:
                        logger.info('Wal position of %s is ahead of my wal position', st.member.name)
                        # In synchronous mode the former leader might be still accessible and even be ahead of us.
                        # We should not disqualify himself from the leader race in such a situation.
                        if not self.is_synchronous_mode() or st.member.name != self.cluster.sync.leader:
                            return False
                        logger.info('Ignoring the former leader being ahead of us')
        return True
    

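    If the current node is healthy, it has to race for the leader lock, because the cluster has no leader. If it fails to acquire the lock, it joins the cluster as a replica.
    If the current node is unhealthy, it keeps following and takes part in the election only once its PostgreSQL node is healthy again.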
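    The lock race can be pictured with a toy in-memory store. This is a sketch under simplifying assumptions: `InMemoryDCS` and `run_election` are hypothetical stand-ins, whereas a real DCS such as etcd provides the atomic create-if-absent operation over the network.

```python
import threading
from typing import Optional


class InMemoryDCS:
    """Toy stand-in for a DCS that supports an atomic create-if-absent leader key."""

    def __init__(self) -> None:
        self._mutex = threading.Lock()
        self.leader: Optional[str] = None

    def acquire_leader(self, name: str) -> bool:
        # Only the first caller succeeds; everyone else sees the key already taken.
        with self._mutex:
            if self.leader is None:
                self.leader = name
                return True
            return False


def run_election(dcs: InMemoryDCS, node: str, healthy: bool) -> str:
    """A healthy node races for the lock; losers and unhealthy nodes become replicas."""
    if healthy and dcs.acquire_leader(node):
        return 'leader'
    return 'replica'
```

    With two healthy nodes calling `run_election` on the same store, the first call returns 'leader' and the second returns 'replica'.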
    Healthy cluster flow

    def process_healthy_cluster(self):
      if self.has_lock():
          if self.is_paused() and not self.state_handler.is_leader():
              if self.cluster.failover and self.cluster.failover.candidate == self.state_handler.name:
                  return 'waiting to become master after promote...'
    
              if not self.is_standby_cluster():
                  self._delete_leader()
                  return 'removed leader lock because postgres is not running as master'
    
          if self.update_lock(True):
              msg = self.process_manual_failover_from_leader()
              if msg is not None:
                  return msg
    
              # check if the node is ready to be used by pg_rewind
              self._rewind.ensure_checkpoint_after_promote(self.wakeup)
    
              if self.is_standby_cluster():
                  # in case of standby cluster we don't really need to
                  # enforce anything, since the leader is not a master.
                  # So just remind the role.
                  msg = 'no action. I am ({0}), the standby leader with the lock'.format(self.state_handler.name) \
                        if self.state_handler.role == 'standby_leader' else \
                        'promoted self to a standby leader because i had the session lock'
                  return self.enforce_follow_remote_master(msg)
              else:
                  return self.enforce_master_role(
                      'no action. I am ({0}), the leader with the lock'.format(self.state_handler.name),
                      'promoted self to leader because I had the session lock'
                  )
          else:
              # Either there is no connection to DCS or someone else acquired the lock
              logger.error('failed to update leader lock')
              if self.state_handler.is_leader():
                  if self.is_paused():
                      return 'continue to run as master after failing to update leader lock in DCS'
                  self.demote('immediate-nolock')
                  return 'demoted self because failed to update leader lock in DCS'
              else:
                  return 'not promoting because failed to update leader lock in DCS'
      else:
          logger.debug('does not have lock')
      lock_owner = self.cluster.leader and self.cluster.leader.name
      if self.is_standby_cluster():
          return self.follow('cannot be a real primary in a standby cluster',
                             'no action. I am ({0}), a secondary, and following a standby leader ({1})'.format(
                                  self.state_handler.name, lock_owner), refresh=False)
      return self.follow('demoting self because I do not have the lock and I was a leader',
                         'no action. I am ({0}), a secondary, and following a leader ({1})'.format(
                              self.state_handler.name, lock_owner), refresh=False)
    

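    The healthy cluster flow applies when the cluster already has a leader. It is handled in two directions:
    1. If the current node holds the leader lock, it updates the lock to keep the leader heartbeat alive, so that replicas do not compete for it. If the update fails, the node demotes itself immediately (unless the cluster is paused), leaving the lock for other nodes to acquire.
    2. If the current node does not hold the leader lock, it joins the cluster as a replica and follows the leader.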
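    The heartbeat behaviour in the first case can be sketched with a leader key that carries a time-to-live. `LeaderKey` and `leader_cycle` are hypothetical names used for illustration only, and time is passed in explicitly so the example stays deterministic.

```python
from typing import Optional


class LeaderKey:
    """Toy leader key with a time-to-live; stands in for the real DCS entry."""

    def __init__(self, ttl: float) -> None:
        self.ttl = ttl
        self.owner: Optional[str] = None
        self.expires_at = 0.0

    def acquire(self, name: str, now: float) -> bool:
        # The key can be taken only if nobody holds it or the previous lease expired.
        if self.owner is None or now >= self.expires_at:
            self.owner, self.expires_at = name, now + self.ttl
            return True
        return False

    def update(self, name: str, now: float) -> bool:
        # Renewal succeeds only for the current owner and only before expiry.
        if self.owner == name and now < self.expires_at:
            self.expires_at = now + self.ttl
            return True
        return False


def leader_cycle(key: LeaderKey, name: str, now: float, paused: bool = False) -> str:
    """One cycle for a node that believes it is the leader."""
    if key.update(name, now):
        return 'keep running as leader'
    if paused:
        # In pause mode the node keeps running as primary despite the failed update.
        return 'continue to run as primary'
    return 'demote'
```

    With a TTL of 30, a leader that renews at time 10 keeps its role, while one that only wakes up at time 100 has lost the lease and demotes itself.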

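    Summary
    Patroni is a high-availability (HA) management tool for PostgreSQL clusters, designed to keep the database continuously available through node failures and maintenance operations.
    In the startup flow analyzed above, Patroni loads the cluster state from the DCS, decides how to start PostgreSQL, and then uses the leader lock to assign primary and replica roles. Combined with automatic failover and data synchronization, this lets applications keep accessing their data even when a node fails or is taken down for maintenance.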

    References
    Patroni configuration parameters: https://patroni.readthedocs.io/en/latest/patroni_configuration.html
    Patroni source code (this analysis is based on v2.1.5): https://github.com/zalando/patroni/tree/v2.1.5

  • Original article: https://blog.csdn.net/HarmonyCloud_/article/details/134077654