码农知识堂 - 1000bd
  •   Python
  •   PHP
  •   JS/TS
  •   JAVA
  •   C/C++
  •   C#
  •   GO
  •   Kotlin
  •   Swift
  • 【mq】从零开始实现 mq-07-负载均衡 load balance


    前景回顾

    【mq】从零开始实现 mq-01-生产者、消费者启动

    【mq】从零开始实现 mq-02-如何实现生产者调用消费者?

    【mq】从零开始实现 mq-03-引入 broker 中间人

    【mq】从零开始实现 mq-04-启动检测与实现优化

    【mq】从零开始实现 mq-05-实现优雅停机

    【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat

    【mq】从零开始实现 mq-07-负载均衡 load balance

    为什么需要负载均衡

    大家好,我是老马。

    这一节让我们看一下如何实现 MQ 的负载均衡。

    为什么需要负载均衡呢?

    作用

    负载均衡最核心的作用:

    (1)可以避免单点故障

    (2)可以让请求均分的分散到每一个节点

    实现思路

    负载均衡实现的方式比较多,最简单的就是随机选择一个。

    拓展阅读:

    从零手写实现负载均衡 http://houbb.github.io/2020/06/19/load-balance-03-hand-write

    负载均衡

    MQ 中用到负载均衡的地方

    生产者发送

    生产者发送消息时,可以发送给任一 broker。

    broker 推送给消费者

    broker 接收到消息以后,在推送给消费者时,也可以任一选择一个。

    消费者的消费 ACK

    消费者消费完,状态回执给 broker,可以选择任一一个。

    消息黏连

    有些消息比较特殊,比如需要保证消费的有序性,可以通过 shardingKey 的方式,在负载的时候固定到指定的片区。

    代码实现

    生产者发送

    统一调整获取 channel 的方法。

    @Override
    public Channel getChannel(String key) {
        // 等待启动完成
        while (!statusManager.status()) {
            log.debug("等待初始化完成...");
            DateUtil.sleep(100);
        }
        RpcChannelFuture rpcChannelFuture = RandomUtils.loadBalance(this.loadBalance,
                channelFutureList, key);
        return rpcChannelFuture.getChannelFuture().channel();
    }
    

    工具类实现为核心实现:

    /**
     * 负载均衡
     *
     * @param list 列表
     * @param key 分片键
     * @return 结果
     * @since 0.0.7
     */
    public static <T extends IServer> T loadBalance(final ILoadBalance<T> loadBalance,
                                                    final List<T> list, String key) {
        if(CollectionUtil.isEmpty(list)) {
            return null;
        }
    
        if(StringUtil.isEmpty(key)) {
            LoadBalanceContext<T> loadBalanceContext = LoadBalanceContext.<T>newInstance()
                    .servers(list);
            return loadBalance.select(loadBalanceContext);
        }
    
        // 获取 code
        int hashCode = Objects.hash(key);
        int index = hashCode % list.size();
        return list.get(index);
    }
    

    如果指定了 shardingKey,那么根据 shadringKey 进行 hash 判断。

    如果没有,则进行默认的负载均衡策略。

    Broker 消息推送给消费者

    消费者订阅列表的获取:

    @Override
    public List<Channel> getSubscribeList(MqMessage mqMessage) {
        final String topicName = mqMessage.getTopic();
        Set<ConsumerSubscribeBo> set = subscribeMap.get(topicName);
        if(CollectionUtil.isEmpty(set)) {
            return Collections.emptyList();
        }
    
        //2. 获取匹配的 tag 列表
        final List<String> tagNameList = mqMessage.getTags();
        Map<String, List<ConsumerSubscribeBo>> groupMap = new HashMap<>();
        for(ConsumerSubscribeBo bo : set) {
            String tagRegex = bo.getTagRegex();
            if(hasMatch(tagNameList, tagRegex)) {
                //TODO: 这种设置模式,统一添加处理 haven
                String groupName = bo.getGroupName();
                List<ConsumerSubscribeBo> list = groupMap.get(groupName);
                if(list == null) {
                    list = new ArrayList<>();
                }
                list.add(bo);
                groupMap.put(groupName, list);
            }
        }
    
        //3. 按照 groupName 分组之后,每一组只随机返回一个。最好应该调整为以 shardingkey 选择
        final String shardingKey = mqMessage.getShardingKey();
        List<Channel> channelList = new ArrayList<>();
        for(Map.Entry<String, List<ConsumerSubscribeBo>> entry : groupMap.entrySet()) {
            List<ConsumerSubscribeBo> list = entry.getValue();
            ConsumerSubscribeBo bo = RandomUtils.loadBalance(loadBalance, list, shardingKey);
            final String channelId = bo.getChannelId();
            BrokerServiceEntryChannel entryChannel = registerMap.get(channelId);
            if(entryChannel == null) {
                log.warn("channelId: {} 对应的通道信息为空", channelId);
                continue;
            }
            channelList.add(entryChannel.getChannel());
        }
        return channelList;
    }
    

    核心逻辑:RandomUtils.loadBalance(loadBalance, list, shardingKey); 获取,其他的保持不变。

    消费者 ACK

    消费者也是类似的,获取 channel 的方式调整如下:

    public Channel getChannel(String key) {
        // 等待启动完成
        while (!statusManager.status()) {
            log.debug("等待初始化完成...");
            DateUtil.sleep(100);
        }
    
        RpcChannelFuture rpcChannelFuture = RandomUtils.loadBalance(loadBalance,
                channelFutureList, key);
        return rpcChannelFuture.getChannelFuture().channel();
    }
    

    小结

    负载均衡在分布式服务中,是必备的特性之一。实现的原理并不算复杂。

    希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。

    我是老马,期待与你的下次重逢。

    开源地址

    The message queue in java.(java 简易版本 mq 实现) https://github.com/houbb/mq

    拓展阅读

    rpc-从零开始实现 rpc https://github.com/houbb/rpc

  • 相关阅读:
    在很多nlp数据集上超越tinybert 的新架构nlp神经网络模型
    带内全双工水声通信系统自干扰抵消技术研究框架与思路
    红黑树与AVL树
    Node当中的事件循环
    R语言glm函数使用频数数据构建二分类logistic回归模型,分析的输入数据为频数数据、将频数数据转化为正常样本数据(拆分、裂变为每个频数对应的样本个数)
    性能优化:记一次树的搜索接口优化思路
    JavaSE 第六章 面向对象基础-中(封装)
    基于对立非洲秃鹫优化算法求解单目标优化问题(OAVOA)含Matlab代码
    MySQL——EXPLAIN用法详解
    SpringCloud 学习笔记总结 (八)
  • 原文地址:https://www.cnblogs.com/houbbBlogs/p/16245351.html
  • 最新文章
  • 攻防演习之三天拿下官网站群
    数据安全治理学习——前期安全规划和安全管理体系建设
    企业安全 | 企业内一次钓鱼演练准备过程
    内网渗透测试 | Kerberos协议及其部分攻击手法
    0day的产生 | 不懂代码的"代码审计"
    安装scrcpy-client模块av模块异常,环境问题解决方案
    leetcode hot100【LeetCode 279. 完全平方数】java实现
    OpenWrt下安装Mosquitto
    AnatoMask论文汇总
    【AI日记】24.11.01 LangChain、openai api和github copilot
  • 热门文章
  • 十款代码表白小特效 一个比一个浪漫 赶紧收藏起来吧!!!
    奉劝各位学弟学妹们,该打造你的技术影响力了!
    五年了,我在 CSDN 的两个一百万。
    Java俄罗斯方块,老程序员花了一个周末,连接中学年代!
    面试官都震惊,你这网络基础可以啊!
    你真的会用百度吗?我不信 — 那些不为人知的搜索引擎语法
    心情不好的时候,用 Python 画棵樱花树送给自己吧
    通宵一晚做出来的一款类似CS的第一人称射击游戏Demo!原来做游戏也不是很难,连憨憨学妹都学会了!
    13 万字 C 语言从入门到精通保姆级教程2021 年版
    10行代码集2000张美女图,Python爬虫120例,再上征途
Copyright © 2022 侵权请联系2656653265@qq.com    京ICP备2022015340号-1
正则表达式工具 cron表达式工具 密码生成工具

京公网安备 11010502049817号