• 【服务的主从切换实现原理】


    主从架构介绍

    主从服务架构是一种常见的分布式系统设计模式,常用于提高系统的性能、可用性和扩展性。在这种架构中,系统中的节点被分为两类:主节点(Master)和从节点(Slave)。
    在这里插入图片描述

    zookeeper

    zookeeper有以下几个特点:
    1.集群部署:一般是3~5台机器组成一个集群,每台机器都在内存保存了zk的全部数据,机器之间互相通信同步数据,客户端连接任何一台机器都可以。
    2.顺序一致性:所有的写请求都是有序的;集群中只有leader机器可以写,所有机器都可以读,所有写请求都会分配一个zk集群全局的唯一递增编号:zxid,用来保证各种客户端发起的写请求都是有顺序的。
    3.原子性:要么全部机器成功,要么全部机器都不成功。
    4.数据一致性:无论客户端连接到哪台节点,读取到的数据都是一致的;leader收到了写请求之后都会同步给其他机器,保证数据的强一致,你连接到任何一台zk机器看到的数据都是一致的。
    5.高可用:如果某台机器宕机,会保证数据不丢失。集群中挂掉不超过一半的机器,都能保证集群可用。比如3台机器可以挂1台,5台机器可以挂2台。
    6.实时性:一旦数据发生变更,其他节点会实时感知到。
    7.高性能:每台zk机器都在内存维护数据,所以zk集群绝对是高并发高性能的,如果将zk部署在高配置物理机上,一个3台机器的zk集群抗下每秒几万请求是没有问题的。
    8.高并发:高性能决定的,主要是基于纯内存数据结构来处理,并发能力是很高的,只有一台机器进行写,但是高配置的物理机,比如16核32G,可以支撑几万的写入QPS。所有机器都可以读,选用3台高配机器的话,可以支撑十万+的QPS。

    利用ZK实现主从架构

    导入依赖

    <properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    		<java.version>1.8</java.version>
    		<zookeeper.version>3.4.14</zookeeper.version>
    		<curator-framework.version>2.12.0</curator-framework.version>
    		<curator-recipes.version>2.12.0</curator-recipes.version>
    		<ssdb.version>9.4</ssdb.version>
    		<jodatime.version>2.10</jodatime.version>
    		<binlog.version>0.21.0</binlog.version>
    		<disruptor.version>3.4.2</disruptor.version>
    	</properties>
    
    <dependency>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-web</artifactId>
    		<exclusions>
    			<!-- 去除旧log依赖 -->
    			<exclusion>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-starter-logging</artifactId>
    			</exclusion>
    		</exclusions>
    	</dependency>
    
    	<dependency>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-quartz</artifactId>
    	</dependency>
    
    	<dependency>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-test</artifactId>
    		<scope>test</scope>
    	</dependency>
    
    	<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-log4j2 -->
    	<dependency>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-log4j2</artifactId>
    	</dependency>
    
    	<!-- log4j2异步日志需要加载disruptor-3.0.0.jar或者更高的版本 -->
    	<dependency>
    		<groupId>com.lmax</groupId>
    		<artifactId>disruptor</artifactId>
    		<version>3.4.2</version>
    	</dependency>
    
    	<!-- zookeeper -->
    	<dependency>
    		<groupId>org.apache.zookeeper</groupId>
    		<artifactId>zookeeper</artifactId>
    		<version>${zookeeper.version}</version>
    	</dependency>
    	<dependency>
    		<groupId>org.apache.curator</groupId>
    		<artifactId>curator-framework</artifactId>
    		<version>${curator-framework.version}</version>
    	</dependency>
    	<dependency>
    		<groupId>org.apache.curator</groupId>
    		<artifactId>curator-recipes</artifactId>
    		<version>${curator-recipes.version}</version>
    	</dependency>
    

    zk客户端:

    public class ZookeeperClient {
    
        /**
         * 客户端
         */
        private CuratorFramework client;
    
        /**
         * Leader选举
         */
        private LeaderLatch leader;
    
        public ZookeeperClient(LeaderLatch leader,CuratorFramework client){
            this.client = client;
            this.leader = leader;
        }
    
        /**
         * 启动客户端
         * @throws Exception
         */
        public void startZKClient() throws Exception {
            client.start();
            leader.start();
        }
    
        /**
         * 关闭客户端
         * @throws Exception
         */
        public void closeZKClient() throws Exception {
            leader.close();
            client.close();
        }
    
        /**
         * 判断是否变为领导者
         * @return
         */
        public boolean hasLeadership(){
            return leader.hasLeadership();
        }
    
        public CuratorFramework getClient() {
            return client;
        }
    
        public void setClient(CuratorFramework client) {
            this.client = client;
        }
    
        public LeaderLatch getLeader() {
            return leader;
        }
    
        public void setLeader(LeaderLatch leader) {
            this.leader = leader;
        }
    
    
    public class ZookeeperClientInfo {
    
        /**
         * 是否是leader 默认为false
         */
        public static boolean isLeader = false;
    
        /**
         * 客户端ID
         */
        private String id;
    
        /**
         * 连接信息字符串
         */
        private String connectString;
    
        /**
         * 节点路径
         */
        private String path;
    
        /**
         * 连接超时时间
         */
        private Integer connectTimeOut;
    
        /**
         * 最大连接次数
         */
        private Integer maxRetries;
    
        /**
         * 重连休眠时间
         */
        private Integer retrySleepTime;
    
        public static boolean isLeader() {
            return isLeader;
        }
    
        public static void setIsLeader(boolean isLeader) {
            ZookeeperClientInfo.isLeader = isLeader;
        }
    
        public String getId() {
            return id;
        }
    
        public void setId(String id) {
            this.id = id;
        }
    
        public String getConnectString() {
            return connectString == null ? null : connectString.replaceAll("//s+", "");
        }
    
        public void setConnectString(String connectString) {
            this.connectString = connectString;
        }
    
        public String getPath() {
            return path;
        }
    
        public void setPath(String path) {
            this.path = path;
        }
    
        public Integer getConnectTimeOut() {
            return connectTimeOut;
        }
    
        public void setConnectTimeOut(Integer connectTimeOut) {
            this.connectTimeOut = connectTimeOut;
        }
    
        public Integer getMaxRetries() {
            return maxRetries;
        }
    
        public void setMaxRetries(Integer maxRetries) {
            this.maxRetries = maxRetries;
        }
    
        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("ZookeeperClientInfo{ ").append("id=").append(id)
                    .append(",isLeader=").append(isLeader).append(", connectString=").append(connectString)
                    .append(", path=").append(path).append(",connectTimeOut=").append(connectTimeOut)
                    .append(", maxRetries=").append(maxRetries).append(", retrySleepTime=").append(retrySleepTime).append(" }");
            return sb.toString();
        }
    }
    

    监听器,来监听主节点变更,实现主要是继承LeaderLatchListener类

    @Component
    public class ZookeeperClientListener implements LeaderLatchListener {
    
        private final static Logger log = LoggerFactory.getLogger(ZookeeperClientListener.class);
    
        @Autowired
        private ChangeLeaderService changeLeaderService;
    
        /**
         *  将本服务达成jar包,部署到2台服务器上。启动两个服务。
         *  1、第一台服务(机器1)启动后抢到leader,会进入到该方法中。另外一台服务(机器2)会进入到notLeader()中。
         *  2、当机器1宕机后,连接断开后zookeeper会删除临时节点。机器2根据选举会成为leader,成为leader后会进入到isLeader()中
         *     然后在changeLeaderService.taskExecut() 再次将定时任务做补偿处理。
         */
        @Override
        public void isLeader() {
            log.error("{},当前服务已变为leader,将从事业务消费======>>>>", JodaDateUtil.date2String(new Date()));
            ZookeeperClientInfo.isLeader = true;
    
            // 切换机器后,继续执行上一个机器未完成的定时任务。
            changeLeaderService.taskExecut();
        }
    
        @Override
        public void notLeader() {
            log.error("{},当前服务已退出leader,不再从事消费业务=====>>>", JodaDateUtil.date2String(new Date()));
            ZookeeperClientInfo.isLeader = false;
        }
    }
    

    当服务主从切换时的补偿措施:

    public interface ChangeLeaderService {
    
        /**
         * 主从服务切换时,手动触发分配到leader机器上的定时任务中的业务逻辑
         */
        void taskExecut();
    }
    
    @Service
    public class ChangeLeaderServiceImpl implements ChangeLeaderService {
    
        private final static Logger log = LoggerFactory.getLogger(ChangeLeaderServiceImpl.class);
    ;
        @Autowired
        private SSDB ssdb;
    
        @Override
        public void taskExecut() {
            //TODO 从ssdb 中查询出定时任务的标识,是否正常执行完成,未完成的话,在这里再触发执行。
            log.info("===ChangeLeaderServiceImpl===taskExecut()===");
            /*Response response = ssdb.get(Constant.TEST_STATE);
            if (response.ok() && response.datas.size() > 0) {
                int tenantState = byteArrayToInt(response.datas.get(0));
                if (Constant.TASK_EXECUTING == tenantState) { //当前任务未完成,接着完成
                    // service.do(); 伪代码
                    log.info("service.do();");
                    ssdb.set(Constant.TEST_STATE, Constant.TASK_END);
                }
            }*/
        }
    
        /**
         * byte数组转int
         * @param b
         * @return
         */
        private int byteArrayToInt(byte[] b){
            String str = byteArrayToString(b);
            return StringToInt(str);
        }
    
        /**
         * byte数组转string
         * @param b
         * @return
         */
        private String byteArrayToString(byte[] b) {
            if (null == b || b.length == 0) {
                return "";
            }
            return new String(b);
        }
    
        /**
         * string转int
         * @param str
         * @return
         */
        private int StringToInt(String str){
            if (StringUtils.isEmpty(str)){
                return 0;
            }
            return Integer.parseInt(str);
        }
    }
    

    zk的配置信息

    @Component
    public class ZookeeperConfig {
    
        /**
         * zk 地址
         */
        @Value("${spring.slaveof.zk.addr}")
        private String addr;
    
        /**
         * 重试策略----最大重试次数
         */
        @Value("${spring.slaveof.zk.max}")
        private int max;
    
        /**
         * 重试策略-----sleepTime
         */
        @Value("${spring.slaveof.zk.sleep}")
        private int sleepTime;
    
        /**
         * 连接超时时间
         */
        @Value("${spring.slaveof.zk.connection}")
        private int connectionTime;
    
        /**
         * 会话超时时间
         */
        @Value("${spring.slaveof.zk.session}")
        private int sessionTime;
    
        public String getAddr() {
            return addr;
        }
    
        public int getMax() {
            return max;
        }
    
        public int getSleepTime() {
            return sleepTime;
        }
    
        public int getConnectionTime() {
            return connectionTime;
        }
    
        public int getSessionTime() {
            return sessionTime;
        }
    }
    

    服务启动限制性的类,了解ApplicationRunner,SpringBoot项目启动时,若想在启动之后直接执行某一段代码,就可以用 ApplicationRunner这个接口,并实现接口里面的run(ApplicationArguments args)方法,方法中写上自己的代码逻辑。也就是:spring容器启动完成之后,就会紧接着执行这个接口实现类的run方法。

    @Component
    public class ZkDemoApplicationRunner implements ApplicationRunner {
    
        private final static Logger log = LoggerFactory.getLogger(ZkDemoApplicationRunner.class);
    
        @Autowired
        private ZookeeperClientListener zkClientListener;
    
        @Autowired
        private ZookeeperConfig zookeeperConfig;
    
        @Override
        public void run(ApplicationArguments applicationArguments) throws Exception {
            log.info("====================>>>>>>>>启动执行zk>>>>>>>>==================");
            log.error("===>>>>>>>>zookeeper: addr:{}, sleepTime:{}, max:{}, connectionTime:{}=====", zookeeperConfig.getAddr(), zookeeperConfig.getSleepTime(), zookeeperConfig.getMax(), zookeeperConfig.getConnectionTime());
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString(zookeeperConfig.getAddr())
                    .retryPolicy(new ExponentialBackoffRetry(zookeeperConfig.getSleepTime(), zookeeperConfig.getMax()))
                    .connectionTimeoutMs(zookeeperConfig.getConnectionTime()).build();
            LeaderLatch leaderLatch = new LeaderLatch(client, "/diaoliwei", "client1", LeaderLatch.CloseMode.NOTIFY_LEADER);
            if (zkClientListener == null) {
                log.error("==================>>>>>>>>>>>>>>>>zkClientListener is null=====>>>>>>>>>>>");
            }
            leaderLatch.addListener(zkClientListener);
            ZookeeperClient zkClient = new ZookeeperClient(leaderLatch, client);
            try {
                zkClient.startZKClient();
            } catch (Exception e) {
                log.error("======>>>>>>zk客户端连接失败<<<<<=====error:{}===", e);
                return;
            }
            CuratorFrameworkState state = client.getState();
            if (CuratorFrameworkState.STOPPED == state) {
                log.error("zk客户端已关闭");
                return;
            }
    
            /*while (true) {  // 测试日志用
                try {
                    if(!zkClient.hasLeadership()){
                        log.info("2当前服务不是leader");
                        Thread.sleep(2000);
                        log.error("error:::::::Test02 do it...>>>>>>> ");
                        continue;
                    }  else {
                        log.info("2当前服务是leader");
                    }
                    log.info("Test02 do it... ");
                    log.error("Test02 do it...>>>>>>> ");
                } catch (Exception e) {
                    log.error("Exception=====>>>>>>>>>>>>eeee:", e);
                }
            }*/
    
            //log.info("======>>>>>zk客户端连接成功<<<<<<=======");
        }
    }
    
    
  • 相关阅读:
    【web前端期末大作业】html网上在线书城大学生静态网页 大学生html当当书城仿站 网上书城购物网页作业HTML
    Maven IntelliJ
    CAS真的无锁吗
    FFmpeg进阶: 截取视频生成gif动图
    Go语言数据类型-函数
    颜值即正义,献礼就业季,打造多颜色多字体双飞翼布局技术简历模版(Resume)
    Horovod 实战练习(含源码和详细配置)
    java中对象的引用是什么?
    如何通过 Chainlink Price Feeds获得加密资产的历史价格
    springboot整合redis集群
  • 原文地址:https://blog.csdn.net/weixin_47068446/article/details/139510156