• 云e办(后端)——RabbitMQ生产可靠性及消费端幂等性(邮件发送)


    云e办(后端)——RabbitMQ生产可靠性及消费端幂等性(邮件发送)

    一、生产可靠性投递方案

    我们需要去思考两个问题:

    • 1.如何保证生产可靠性的投递?【如何确保生产端真正的投递到了队列、以及消费者消费了】
    • 2.消费者幂等性的保证?【如何确保消费者只消费一条消息】
      用RabbitMQ无可避免的会出现,一条消息重复多发的情况。其实说:如何确保只消费一条消息 ,而重复多发丢弃即可。

    正确的步骤:

    • 1、生产者如何保证消息正常发送。
    • 2、保证rabbitMQ的节点,队列能成功接收到消息
    • 3、保证发送端能够收到rabbit节点收到确认应答。(队列收到了消息,要给生产端发送确认应答)

    RabbitMQ本质是生产层、队列、消费者、

    • 生产者只负责:发送消息
    • 队列负责消息的中转
    • 消费者只负责消息的消费
    方法一: 消息落库,对消息状态进行打标:

    在这里插入图片描述

    BIZ.DB是业务数据库
    MSG.DB是消息数据库

    • 1.将业务数据入库,发送的消息入到消息数据库

    • 2.发送消息到RabbitMQ节点。

    • 3.Rabbit开启确认回调.。生产者会监听确认回调(Cinfirm Listener)

    • 4.如果监听成功,表示消息已经收到了。那么就会更新消息数据库:更改状态:发送成功。

    • 5.【分布式定时任务】查看消息数据库的消息还有没有正在发送中的消息

    • 6.如果有则:进行重发

    • 7.如果重复3次,监听者还是没有监听成功,就设为发送失败。

    总结:
    1.频发操作数据库:一次发送消息,最少需要3次操作数据库。

    方法二:消息延迟投递,做二次确认,回调检查

    在这里插入图片描述

    • 1.发送消息到rabbitMQ节点。
    • 2.延迟第二次发送【一条消息发送了两次,中间有间隔时间】。
    • 3.消费者接收消息
    • 4.消费者生成确认消息,发送给rabbitMQ里面
    • 5.【回调服务】:假设监听到第一次发送,监听消费者的发送确认。如果监听消费者的确认服务,就会把消息放到消息数据库里面。
      【毁掉服务】:假设监听到二次延迟投递的消息,那么就会去数据库检查,有没有数据。如果数据库不存在数据,就会发起RPC通信,要求从第一步,重新开始。

    总结:
    优点是:数据库操作减少了。

    二、生产端可靠性的投递

    在云e办项目中,我们使用第一种方法去确保生产可靠性。

    数据库表的说明:

    msgld:消息里面自动生成的id(uuid),是消息自动生成的唯一id。
    eid:员工id
    tryTime:重试时间。1分钟后再次查询有无查询成功。
    在这里插入图片描述

    1.定义消息状态常量 pojo/MailConstants.java
    package com.xxxx.server.pojo;
    
    public class MailConstants {
        //消息投递中
        public static final Integer DELIVERINg=0;
        //消息投递成功
        public static final Integer SUCCESS = 1;
        //消息投递失败
        public static final Integer FAILURE =2;
        //最大重试次数
        public static final Integer MAX_TRY_CONT = 3;
        //消息超时时间
        public static final Integer MSG_TIMEOUT =1;
        //队列
        public static final String MAIL_QUEUE_NAME="mail.queue";
        //交换机
        public static final String MAIL_EXCHANGE_NAME="mail.exchange";
        //路由键
        public static final String MAIL_ROUTING_KEY_NAME="mail.routing.key";
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    2.修改新增员工的方法 EmployeeServiceImpl.java

    发送消息时,将当前消息数据存入数据库,投递状态为消息投递中

    @Autowired
        private RabbitTemplate rabbitTemplate;
        @Autowired
        private MailLogMapper mailLogMapper;
        /**
         * 添加员工
         * @param employee
         * @return
         */
        @Override
        public RespBean insertEmployee(Employee employee) {
            //处理合同期限,保留2位小数
            //获取合同开始的时间
            LocalDate beginContract = employee.getBeginContract();
            //获取合同结束的时间
            LocalDate endContract = employee.getEndContract();
            //计算有多少天
            long days = beginContract.until(endContract, ChronoUnit.DAYS);
            // 将天数保留两位小数点
            DecimalFormat decimalFormat = new DecimalFormat("##.00");
            employee.setContractTerm(Double.parseDouble(decimalFormat.format(days/365.00)));
            if (1==employeeMapper.insert(employee)) {
                //1.获取员工对象
                Employee emp = employeeMapper.getEmployee(employee.getId()).get(0);
    
                //数据库记录发送的消息
                String msgId = UUID.randomUUID().toString();
                MailLog mailLog = new MailLog();
                mailLog.setMsgId(msgId);
                //员工id
                mailLog.setEid(employee.getId());
                mailLog.setStatus(0);
                mailLog.setRouteKey(MailConstants.MAIL_ROUTING_KEY_NAME);
                mailLog.setExchange(MailConstants.MAIL_EXCHANGE_NAME);
                mailLog.setCount(0);
                mailLog.setTryTime(LocalDateTime.now().plusMinutes(MailConstants.MSG_TIMEOUT));
                mailLog.setCreateTime(LocalDateTime.now());
                mailLog.setUpdateTime(LocalDateTime.now());
                mailLogMapper.insert(mailLog);
    
                //发送邮件:
                //2.通过RabbitMQ进行发送  converAndSend(路由key,员工对象)
                rabbitTemplate.convertAndSend( MailConstants.MAIL_EXCHANGE_NAME,MailConstants.MAIL_ROUTING_KEY_NAME,emp,
                        new CorrelationData(msgId));
                return RespBean.success("添加成功!");
            }
            return RespBean.error("添加失败!");
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    3.修改邮件服务 VoaMailApplication.java MailReceiver.java

    将队列名改为常量定义的队列名

    在这里插入图片描述

    4.开启消息确认回调以及消息失败回调 RabbitMQConfig.java
    package com.xxxx.server.config;
    
    import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
    import com.xxxx.server.pojo.MailConstants;
    import com.xxxx.server.pojo.MailLog;
    import com.xxxx.server.service.IMailLogService;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitMQConfig {
    
        public static final Logger Logger = LoggerFactory.getLogger(RabbitMQConfig.class);
        @Autowired
        private CachingConnectionFactory cachingConnectionFactory;
        @Autowired
        private IMailLogService mailLogService;
        @Bean
        public RabbitTemplate rabbitTemplate(){
            RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
            /**
             * 消息确认回调,确认消息是否到达broker
             * data:消息唯一标识
             * ack:确认结果
             * cause:失败原因
             */
            rabbitTemplate.setConfirmCallback((data,ack,cause)->{
                String msgId = data.getId();
                if (ack){
                    //消息确认成功
                    Logger.info("{}=====>消息发送成功",msgId);
                    //更新数据库中记录
                    mailLogService.update(new UpdateWrapper<MailLog>
                            ().set("status",1).eq("msgId",msgId));
                }else {
                    Logger.info("{}=====>消息发送失败",msgId);
                }
            });
            /**
             * 消息失败回调,比如router不到queue时回调
             * msg:消息主题
             * repCode:响应码
             * repText:响应描述
             * exchange:交换机
             * routingKey:路由键
             */
    
            rabbitTemplate.setReturnCallback((msg,repCode,repText,exchange,routingKey)->{
                Logger.info("{}=====>消息发送到queue时失败",msg.getBody());
            });
            return rabbitTemplate;
        }
    
             @Bean
        public Queue queue(){
            return new Queue(MailConstants.MAIL_QUEUE_NAME,true);
        }
        @Bean
        public DirectExchange directExchange(){
            return new DirectExchange(MailConstants.MAIL_EXCHANGE_NAME);
        }
        @Bean
        public Binding binding(){
            return BindingBuilder.bind(queue()).to(directExchange()).with(MailConstants.MAIL_ROUTING_KEY_NAME);
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    5.定时任务重发失败消息 MailTask.java

    重新投递失败的消息。重试超过3次,更新投递状态为投递失败

    package com.xxxx.server.task;
    import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
    import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
    import com.xxxx.server.pojo.Employee;
    import com.xxxx.server.pojo.MailConstants;
    import com.xxxx.server.pojo.MailLog;
    import com.xxxx.server.service.IEmployeeService;
    import com.xxxx.server.service.IMailLogService;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    import java.time.LocalDateTime;
    import java.util.List;
        /**
         * 邮件发送定时任务
         *
         * @author zhoubin
         * @since 1.0.0
         */
    
    @Component
    public class MailTask {
        @Autowired
        private IMailLogService mailLogService;
        @Autowired
        private RabbitTemplate rabbitTemplate;
        @Autowired
        private IEmployeeService employeeService;
        /**
         * 邮件发送定时任务
         * 10秒一次
         */
        @Scheduled(cron = "0/10 * * * * ?")
        public void mailTask() {
            //状态为0且重试时间小于当前时间的才需要重新发送
            List<MailLog> list = mailLogService.list(new QueryWrapper<MailLog>
                    ().eq("status", 0).lt("tryTime",
                    LocalDateTime.now()));
            list.forEach(mailLog -> {
                //重试次数超过3次,更新为投递失败,不再重试
                if (3 <= mailLog.getCount()) {
                    mailLogService.update(new UpdateWrapper<MailLog>().set("status",
                            2).eq("msgId", mailLog.getMsgId()));
                }
                //更新重试次数,更新时间,重试时间
                mailLogService.update(new UpdateWrapper<MailLog>()
                        .set("count",mailLog.getCount()+1)
                        .set("updateTime",LocalDateTime.now())
                        .set("tryTime",LocalDateTime.now()
                                .plusMinutes(MailConstants.MSG_TIMEOUT))
                        .eq("msgId", mailLog.getMsgId()));
                Employee emp = employeeService.getEmployee(mailLog.getEid()).get(0);
                //发送消息
                rabbitTemplate.convertAndSend(MailConstants.MAIL_EXCHANGE_NAME,
                        MailConstants.MAIL_ROUTING_KEY_NAME, emp,
                        new CorrelationData(mailLog.getMsgId()));
            });
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    在这里插入图片描述

    三、消费端消息接收主要考虑幂等性的问题,这边使用 Redis 处理

    介绍幂等性问题

    有可能投递多次id一样的消息。
    例如上述的定时重发,例如发送了第一条消息,处于投递中。刚发完还没有收到监听回调,还没有变更状态时,定时任务抓到了该消息,会第二次重发消息。从而会发送两次一样的消息。消费端如何应对呢?

    市面上主流的幂等性操作有两种:
    • 1.唯一的Id+指纹码。 唯一id可以是自增id,指纹码用msgid(也就是uuid)
      消息过来的时候,消费之前,根据id进行数据查询有没有数据,如果有数据说明消费过了,没有数据说明,会进行消费。同时将数据放到数据库里面。
      总结:简单。缺点:高并发数据库会有瓶颈
    • 2.redis原子性。
      问题:业务数据要不要入库,考虑数据库和redis做缓存。
      如果不考虑入库,那么都存在内存里面,考虑定时同步策略。
    本次用的策略是:

    用的是第一种,但是用redis,redis也不考虑原子性。那么就是:msgid+id存到redis里面去,每次进行消费之前,先去redis查询有无该id,有该id说明已经消费过了,不在消费了。如果redis没有该id,说明没有消费过,那么就正常的消费,并且将数据存到reids里面

    修改配置

    除了添加 Redis 相应配置,还要开启 RabbitMQ 的手动确认机制

    server:
      # 端口
      port: 8082
    spring:
      # 邮件配置
      mail:
        # 邮件服务器地址
        host: smtp.qq.com
        # 协议
        protocol: smtp
        # 编码格式
        default-encoding: utf-8
        # 授权码(在邮箱开通服务时获取)
        password: meqxqfgkoowhbagd
        # 发送者邮箱地址
        username: huyelinlin@qq.com
         
        # 端口(不同邮箱端口号不同)
    #    port: 465
        properties:
          mail:
            stmp:
    #          port: 465
              auth: true #设置是否需要验证,如果为true,那么用户名和密码是必须的,如果是false,可以不设置用户名和密码,这也得看对接平台是否支持无密码登陆
              starttls: #是对纯文本通信协议的扩展,它提供一种方式将纯文本连接升级为加密连接(TLS或SSL),而不是另外使用一个端口作加密通道
                enable: true
                required: true
              ssl:
                enable: true
                required: true
              socketFactory:
                class: javax.net.ssl.SSLSocketFactory
                port: 465
          # rabbitmq配置
      rabbitmq:
        # 用户名
        username: yeb
        # 密码
        password: yeb
        # 服务器地址
        host: 192.168.75.100
        # 端口
        port: 5672
        listener:
          simple:
            # 手动确认
            acknowledge-mode: manual
      # Redis配置
      redis:
        timeout: 10000ms                        # 连接超时时间
        host: 192.168.                    # Redis服务器地址
        port: 6381                     # Redis服务器端口
        database: 1                    # 选择哪个库,默认0库
        password: root
        lettuce:
          pool:
            max-active: 1024                    # 最大连接数,默认 8
            max-wait: 10000ms                   # 最大连接阻塞等待时间,单位毫秒,默认 -1
            max-idle: 200                       # 最大空闲连接,默认 8
            min-idle: 5                         # 最小空闲连接,默认 0
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    修改邮件发送服务

    首先去 Redis 查看当前消息id是否存在,如果存在说明已经消费,直接返回。如果不存在,正常发送消息,并将消息id存入 Reids 。需要手动确认消息。

    package com.xxxx.mail;
    
    import com.rabbitmq.client.Channel;
    import com.xxxx.server.pojo.Employee;
    import com.xxxx.server.pojo.MailConstants;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.autoconfigure.mail.MailProperties;
    import org.springframework.data.redis.core.HashOperations;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.mail.javamail.JavaMailSender;
    import org.springframework.mail.javamail.MimeMessageHelper;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageHeaders;
    import org.springframework.stereotype.Component;
    import org.thymeleaf.TemplateEngine;
    import org.thymeleaf.context.Context;
    import javax.mail.internet.MimeMessage;
    import java.io.IOException;
    import java.util.Date;
    /**
     * 消息接收者
     *
     * @author zhoubin
     * @since 1.0.0
     */
    @Component
    public class MailReceiver {
        private static final Logger logger = LoggerFactory.getLogger(MailReceiver.class);
        @Autowired
        private JavaMailSender javaMailSender;
        @Autowired
        private MailProperties mailProperties;
        @Autowired
        private TemplateEngine templateEngine;
        @Autowired
        private RedisTemplate redisTemplate;
    
        /**
         * 邮件发送
         */
        @RabbitListener(queues = MailConstants.MAIL_QUEUE_NAME)
        public void handler(Message message, Channel channel) {
            Employee employee = (Employee) message.getPayload();
    
            MessageHeaders headers = message.getHeaders();
            //消息序号
            long tag = (long) headers.get(AmqpHeaders.DELIVERY_TAG);
            String msgId = (String)headers.get("spring_returned_message_correlation");
            HashOperations hashOperations = redisTemplate.opsForHash();
            try {
                if (hashOperations.entries("mail_log").containsKey(msgId)) {
                    //redis中包含key,说明消息已经被消费
                    logger.info("消息已经被消费=====>{}", msgId);
                    /**
                     * 手动确认消息
                     * tag:消息序号
                     * multiple:是否多条
                     */
                    channel.basicAck(tag, false);
                    return;
                }
                MimeMessage msg = javaMailSender.createMimeMessage();
                MimeMessageHelper helper = new MimeMessageHelper(msg);
                //发件人
                helper.setFrom(mailProperties.getUsername());
                //收件人
                helper.setTo(employee.getEmail());
                //主题
                helper.setSubject("入职欢迎邮件");
                //发送日期
                helper.setSentDate(new Date());
                //邮件内容
                Context context = new Context();
                context.setVariable("name", employee.getName());
                context.setVariable("posName", employee.getPosition().getName());
                context.setVariable("joblevelName", employee.getJoblevel().getName());
                context.setVariable("departmentName", employee.getDepartment().getName());
                String mail = templateEngine.process("mail", context);
                helper.setText(mail, true);
                //发送邮件
                javaMailSender.send(msg);
                logger.info("邮件发送成功");
                //将消息id存入redis
                hashOperations.put("mail_log",msgId,"OK");
                //手动确认消息
                channel.basicAck(tag,false);
            } catch (Exception e) {
                try {
                    /**
                     * 手动确认消息
                     * tag:消息序号
                     * multiple:是否多条
                     * requeue:是否回退到队列
                     */
                    channel.basicNack(tag,false,true);
                } catch (IOException ex) {
                    logger.error("消息确认失败=====>{}", ex.getMessage());
                }
                logger.error("邮件发送失败=====>{}", e.getMessage());
            }
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
  • 相关阅读:
    LeetCode程序员面试金典(第 6 版)上
    电力系统|基于分布式高斯-牛顿方法结合置信传播 (BP) 的概率推理方法的非线性状态估计 (SE) 模型(Matlab代码实现)
    【python数学建模】Matplotlib库
    [附源码]计算机毕业设计在线招聘网站Springboot程序
    Spring系列-细说bean标签的parent属性使用
    初见QT,控件的基本应用,实现简单登录窗口
    易基因|性别分化:DNA甲基化表观遗传调控性别表型可塑性,诱导斑马鱼性别比例失调
    Pandas多级索引数据处理及fillna()填充方式
    Vue中如何进行滚动加载与无限滚动
    Nacos源码系列—订阅机制的前因后果(下)
  • 原文地址:https://blog.csdn.net/xiangqian0721/article/details/127749389