• Kafka+redis分布式锁结合使用心得总结


    #kafka部分

    @KafkaListener(topics = "#{'${vsmart_alert_detection_tms_send_message_topic}'.split(',')}", groupId = "${vsmart.alert.detection.consumer.group}")
    public void vsmartAlertDetectionTmsSendMessage(ConsumerRecord record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        doSendMessage(record,ack);
    }

    private void doSendMessage(ConsumerRecord record, Acknowledgment ack) {
        Optional message = Optional.ofNullable(record.value());
        String key = record.topic() + "-" + record.partition() + "-offset:" + record.offset();
        if (RedisUtils.isExistsKey(key)) {
            ack.acknowledge();
            return;
        }

        try {
            if (message.isPresent() && (record.timestamp() > (System.currentTimeMillis() - kafkaConsumerDelayTime))) {
                JSONObject msg = JSONObject.parseObject(record.value().toString());
                msg.put(VSMART_KAFKA_MSG_POSITION_INFO, key);

                //具体操作
            }
        }catch (Exception e){
            
        }finally {
            ack.acknowledge();
        }
    }    

    #redis部分 

    public Boolean handler(JSONObject msg) {
        //解析
        Boolean isOk = jsonToDetectionInfos(msg);

        if (!isOk) {
            return false;
        }

        //加锁 associatedKey()
        
        String lockKey = associatedKey();
        if (StrUtil.isEmpty(lockKey)) {
            return false;
        }
        RLock lock = SpringUtils.getBean(RedissonClient.class).getLock(lockKey);
        //锁的时间 根据业务需要进行调整
        try {
            boolean flag_2 = lock.tryLock(10, 300, TimeUnit.SECONDS);

            if (flag_2) {        
                //加锁后执行前判断是否已经处理过kafka中相同位置的信息了
                if (ObjectUtil.isNotNull(msg) &&
                        ObjectUtil.isNotNull(msg.get(VSMART_KAFKA_MSG_POSITION_INFO)) &&
                        RedisUtils.isExistsKey(msg.getString(VSMART_KAFKA_MSG_POSITION_INFO))) {            
                    return false;
                }

                //具体业务操作
                //...
                    
                return true;
            } else {
                detectionRuleBo.getLogText().append(StrUtil.format("{}-获取锁失败;", detectionRuleBo.getName())).append("
    ");
                return false;
            }

        } catch (Exception e) {
            
        } finally {
            ///释放锁
            if (null != lock && lock.isHeldByCurrentThread()) {        
                if (ObjectUtil.isNotNull(msg) &&
                        ObjectUtil.isNotNull(msg.get(VSMART_KAFKA_MSG_POSITION_INFO))) {
                    RedisUtils.setCacheStrExpire(msg.getString(VSMART_KAFKA_MSG_POSITION_INFO), msg.getString(VSMART_KAFKA_MSG_POSITION_INFO), 60 * 60);
                }
                //解锁
                lock.unlock();
            }
            return true;
        }
    }

  • 相关阅读:
    互联网云厂商大转向:在海外重燃新「战事」
    市场周刊杂志市场周刊杂志社市场周刊编辑部2022年第6期目录
    ubuntu安装 miniconda 同步时间
    C语言文件操作 | 文件分类、文件打开与关闭、文件的读写、文件状态、文件删除与重命名、文件缓冲区
    音频编辑软件Steinberg SpectraLayers Pro mac中文软件介绍
    循序渐进学Git(可复习)
    【仿牛客网笔记】 Spring Boot进阶,开发社区核心功能-发布帖子
    收藏:不能不刷的数字后端面试题,含解析
    『C++ - 模板』之模板进阶
    MySQL 日期函数大全(更新中.....)
  • 原文地址:https://blog.csdn.net/weixin_40976261/article/details/134282511