• 手写RPC框架-注册中心实现


    使用到的中间件

    1. redis:作为服务的储存体
    2. redisTemplate:连接redis
    3. spring的两个监听事件,容器刷新和容器关闭事件

    注册中心的主要方法

    public interface IRegister {
    
        /**
         * 注册-把服务注册到注册中心中,使得本身的服务能被发现、被消费。并发布一个事件,让其他服务第一时间拉取到服务
         */
        void doRegister();
    
        /**
         * 刷新服务-把注册中心的服务信息拉渠道本地
         */
        void refreshServer();
    
        /**
         * 获取可用服务-在消费者消费的时候,获取其中一个服务
         */
        String getServer(String serverName);
    
        /**
         * 注销-当消费者关闭的时候,把本服务的信息在注册中心删除。并发布一个事件,让其他服务第一时间删除服务
         */
        void unRegister();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    Redis注册中心的实现

    redis注册中心维护一个Map,通过服务名进行分组,保存多个服务。

    public class RedisRegister implements IRegister{
    	/**
         * 负载均衡实现的算法
         */
        private final Load load;
    
        /**
         * 当前服务的信息
         */
        private final ServerInfo serverInfo;
    
        /**
         * 服务储存体
         */
        private Map<String, HashSet<String>> server = new HashMap();
        
    	private final RedisTemplate redisTemplate;
    
        @Override
        public void doRegister() {
            redisTemplate.afterPropertiesSet();
            ValueOperations valueOperations = redisTemplate.opsForValue();
            String key = SystemConstant.REDIS_SERVICE_NAME_PREFIX + serverInfo.getServerName() + "_" +serverInfo.getLocalIp() + ":" + serverInfo.getLocalPort();
            valueOperations.set(key, serverInfo.getLocalIp() + ":" + serverInfo.getLocalPort());
            redisTemplate.expire(key, SystemConstant.REDIS_REGISTER_TIME_OUT, TimeUnit.MILLISECONDS);
            // 发布一个新注册事件
            redisTemplate.convertAndSend(SystemConstant.REDIS_SERVICE_NAME_PREFIX, "register".toCharArray());
        }
    
        @Override
        public void refreshServer() {
            Map<String, HashSet<String>> newServer = new HashMap();
            Set keys = redisTemplate.keys(SystemConstant.REDIS_SERVICE_NAME_PREFIX + "*");
            if (!CollectionUtils.isEmpty(keys)) {
                ValueOperations valueOperations = redisTemplate.opsForValue();
                for (Object key : keys) {
                    String value = (String) valueOperations.get(key);
                    String strKey = (String) key;
                    strKey = strKey.split("_")[0];
                    strKey = strKey.replace(SystemConstant.REDIS_SERVICE_NAME_PREFIX, "");
                    if (newServer.containsKey(strKey)) {
                        newServer.get(strKey).add(value);
                    } else {
                        HashSet<String> set = new HashSet<>();
                        set.add(value);
                        newServer.put(strKey, set);
                    }
                }
            }
            server = newServer;
        }
    
        @Override
        public String getServer(String serverName) {
            HashSet<String> strings = server.get(serverName);
            if (CollectionUtils.isEmpty(strings)) {
                throw new RpcException(serverName + "不存在提供者, 请检查提供者启动或者配置是否正确。");
            }
            // 通过负载均衡组件获取一个服务,
            return this.load.load(new ArrayList<>(strings));
        }
    
        @Override
        public void unRegister() {
            String key = SystemConstant.REDIS_SERVICE_NAME_PREFIX + serverInfo.getServerName() + "_" +serverInfo.getLocalIp() + ":" + serverInfo.getLocalPort();
            redisTemplate.delete(key);
            // 发布一个事件
            redisTemplate.convertAndSend(SystemConstant.REDIS_SERVICE_NAME_PREFIX, "unregister".toCharArray());
        }
    
        public RedisRegister(RedisTemplate redisTemplate, ServerInfo serverInfo, Load load) {
            this.redisTemplate = redisTemplate;
            this.serverInfo = serverInfo;
            this.load = load;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76

    主要流程

    流程图

    ![在这里插入图片描述](https://img-blog.csdnimg.cn/917e8f136b574d4099ad9df1d82b3dbe.png

    1. 初始化一些必要的组件

        /**
         * redis连接
         * @param wbProperties
         * @return
         */
        @ConditionalOnProperty(
                prefix = "wb",
                name="register-type",
                havingValue = "redis"
        )
        @Bean
        public JedisConnectionFactory jedisConnectionFactory(WbProperties wbProperties) {
            WbProperties.RedisRegister redisRegister = wbProperties.getRedisRegister();
            RedisStandaloneConfiguration config = new RedisStandaloneConfiguration(redisRegister.getHost(), redisRegister.getPort());
            if (StringUtils.isNotBlank(redisRegister.getPassword())) {
                config.setPassword(redisRegister.getPassword());
            }
    
            return new JedisConnectionFactory(config);
        }
    
        /**
         * 注册器
         * @param environment
         *   环境变量
         * @param jedisConnectionFactory
         *  redis连接池
         * @param load
         *  负载均衡
         * @return
         * @throws UnknownHostException
         */
        @ConditionalOnBean(JedisConnectionFactory.class)
        @Bean
        public RedisRegister redisRegister(Environment environment, JedisConnectionFactory jedisConnectionFactory, Load load) throws UnknownHostException {
            // 创建JSON序列化工具
            GenericJackson2JsonRedisSerializer jsonRedisSerializer = new GenericJackson2JsonRedisSerializer();
            RedisTemplate redisTemplate = new RedisTemplate();
            // 设置key的序列化
            redisTemplate.setKeySerializer(RedisSerializer.string());
            redisTemplate.setHashKeySerializer(RedisSerializer.string());
            // 设置value的序列化
            redisTemplate.setValueSerializer(jsonRedisSerializer);
            redisTemplate.setHashValueSerializer(jsonRedisSerializer);
            redisTemplate.setConnectionFactory(jedisConnectionFactory);
            // 解析处本服务的基本信息
            ServerInfo serverInfo = new ServerInfo(environment);
            return new RedisRegister(redisTemplate, serverInfo, load);
        }
    
        /**
         * redis监听器
         * @param redisRegister
         *  redis注册中心
         * @param jedisConnectionFactory
         *  redis连接池
         * @return
         */
        @ConditionalOnBean(JedisConnectionFactory.class)
        @Bean
        RedisMessageListenerContainer container(RedisRegister redisRegister, JedisConnectionFactory jedisConnectionFactory) {
            MessageListenerAdapter redisSubscriber = new RedisSubscriber(redisRegister);
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(jedisConnectionFactory);
            container.addMessageListener(redisSubscriber, new PatternTopic("wbrpc"));
            return container;
        }
    
        /**
         * 服务启动后的监听器
         * @return
         */
        @Bean
        public WbRegisterListener wbRegisterListener() {
            return new WbRegisterListener();
        }
    
        /**
         * 服务停止的监听器
         * @return
         */
        @Bean
        public WbUnRegisterListener wbUnRegisterListener() {
            return new WbUnRegisterListener();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85

    2. 注册本服务到注册中心

    通过Spring的默认监听器(ContextRefreshedEvent)实现,当容器刷新的时候将服务注册到注册中心

    public class WbRegisterListener implements ApplicationListener<ContextRefreshedEvent> {
        @Autowired
        private IRegister register;
    
        @Override
        public void onApplicationEvent(ContextRefreshedEvent event) {
            // 注册服务到注册中心 过期时间为60秒
            register.doRegister();
            // 每隔30秒重新注册一次
            repeatRegister();
            // 每隔15秒拉取一次注册中中心的服务到本地
            repeatRefresh();
        }
    
        /**
         * 每隔过期时间的一半时间,去注册一次
         */
        public void repeatRegister() {
            ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, Executors.defaultThreadFactory());
            scheduledExecutorService.scheduleWithFixedDelay(() ->{
                try {
                    register.doRegister();
                } catch (Throwable t) {
                    System.err.println("发生意外异常, 原因:  " + t.getMessage());
                }
            }, SystemConstant.REDIS_REGISTER_TIME_OUT / 2, SystemConstant.REDIS_REGISTER_TIME_OUT / 2, TimeUnit.MILLISECONDS);
        }
    
        /**
         * 每隔过期时间的四分之一时间去拉取注册中心的服务保存到本地
         */
        public void repeatRefresh() {
            //
            ScheduledExecutorService rederScheduledExecutorService = new ScheduledThreadPoolExecutor(1, Executors.defaultThreadFactory());
            rederScheduledExecutorService.scheduleWithFixedDelay(() ->{
                try {
                    register.refreshServer();
                } catch (Throwable t) {
                    System.err.println("发生意外异常, 原因:  " + t.getMessage());
                }
            }, SystemConstant.REDIS_REGISTER_TIME_OUT / 4, SystemConstant.REDIS_REGISTER_TIME_OUT / 4, TimeUnit.MILLISECONDS);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    3. 删除本服务在注册中心

    通过Spring的默认监听器(ContextClosedEvent)实现,当容器关闭的时候将服务在注册中心删除

    public class WbUnRegisterListener implements ApplicationListener<ContextClosedEvent> {
    
        @Autowired
        private IRegister register;
        @Override
        public void onApplicationEvent(ContextClosedEvent event) {
            register.unRegister();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    自定义负载均衡

    在自己的消费项目中,自定义一个负载均衡类,实现com.wb.spring.boot.autoconfigure.load.Load接口,并把类注入到spring容器中即可。

    public class MyLoad implements Load {
        @Override
        public String load(List<String> servers) {
            return servers.get(0);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    @Configuration
    public class MyConfig {
    
        @Bean
        public Load load1() {
            Load load = new MyLoad();
            return load;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    项目地址

    https://gitee.com/xu–wenbin/wb-spring-boot-project

  • 相关阅读:
    重装系统全流程
    Dubbo服务控制台Dubbo Admin配置
    【元宇宙欧米说】从GameFi的视角讨论Web2到Web3的利弊
    react+ts之router管理
    Redis之Lua的应用(四)
    拼多多季报图解:营收355亿同比增65% 研发投入达27亿
    一次性生物反应器袋充气压力更安全、精密和快速的控制方法
    CSS躬行记(11)——管理后台响应式改造
    STM32 GPIO工作原理
    MATLAB : Plot函数及其用法
  • 原文地址:https://blog.csdn.net/weixin_42060779/article/details/126856747