• 0704~springboot整合ES&RabbitMQ


    主要做课程上线功能:

     点击课程上架,操作步骤: 整合ES与RabbitMQ

    1.把选中的id传入后台,修改上线状态;

    2.把修改状态的对象存入ES索引库;

    3.通过rabbitMQ给用户推送营销消息;

     1.把选中的id传入后台,修改上线状态;第一步昨天已总结;

    2.把修改状态的对象存入ES索引库;

    思路:

    1.单独建立一个微服务用来做es查询保存操作,导入依赖,配置yml;

    2.编写一个接口,交给spring管理,继承ElasticsearchRepository接口,泛型写doc文档实体类和Long;

     3.编写controller,注入自己编写的接口CourseRepository,调用方法;

    4.编写feign,把接口暴露给课程服务使用;

    5.课程微服务直接es微服务的feign接口使用 即可;

    注意:doc文档实体类上加注解

    @Document(indexName= "hrm", type = "course") indexname为es索引库名,type为es索引表,类似于mysql的建库建表;

     1.单独建立一个微服务用来做es查询保存操作,导入依赖,配置yml;

    <!--ES的包-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    </dependency>
    spring:
      data:
        elasticsearch:
          cluster-name: elasticsearch #集群名称
          cluster-nodes: 127.0.0.1:9300 #9200是图形界面端,9300代码你的端口
      application:
        name: service-es #服务名
      #配置数据库链接信息

    2.编写一个接口,交给spring管理,继承ElasticsearchRepository接口,泛型写doc文档实体类和Long;

    @Component
    public interface CourseRepository extends ElasticsearchRepository<CourseDoc,Long> {
    }

     3.编写controller,注入自己编写的接口CourseRepository,调用方法;

     

    @RestController
    @RequestMapping("/es")
    public class CourseEsController {
    
        @Autowired
        private CourseRepository courseRepository;
    
        @PostMapping("/course")
        public JSONResult CourseEs(@RequestBody List<CourseDoc>docList){
            courseRepository.saveAll(docList);
            return JSONResult.success();
        }

    4.编写feign,把接口暴露给课程服务使用;

     

    @FeignClient(value = "service-es",fallbackFactory = FallbackFactoryCourse.class)
    public interface CourseFeign {
        @PostMapping("/es/course")
        public JSONResult CourseEs(@RequestBody List<CourseDoc> docList);

    5.课程微服务直接es微服务的feign接口使用 即可;

    5.1创建一个list来存放前端选中的数据;

    5.2.通过前端选中的id找出课程对象与课程市场对象,用BeanUtils.copyProperties(起始数据,目标数据)方法给doc文档类赋值;

    5.3返回list即可;

    @Override
    public JSONResult onLineCourse(CourseQuery query) {
        //上下线
        //思路
        //1.通过前端传过来的id拿到对象;
        //2.拿到对象给对象设置状态;
        //3.修改返回即可;
        //4.把上线的课程存入es索引库
        //5.通过rabbitMQ发送消息给用户
        Long[] ids = query.getIds();
        List<CourseDoc> list = new ArrayList<>();
        CourseDoc courseDoc = new CourseDoc();
        for (Long id : ids) {
            Course course = courseMapper.selectById(id);
            if(course.getStatus()!=0){
                throw new MyException("当前课程已经上线啦~");
            }
            course.setStatus(1);
            courseMapper.updateById(course);
            CourseMarket courseMarket = courseMarketMapper.selectById(course.getId());
            System.out.println(courseMarket);
            BeanUtils.copyProperties(courseMarket,courseDoc);
            BeanUtils.copyProperties(course,courseDoc);
            list.add(courseDoc);
            courseFeign.CourseEs(list);
        }

    3.通过rabbitMQ给用户推送营销消息;

    springboot整合RabbitMQ思路:

    1.导入依赖,添加yml配置;

    <!--spirngboot集成rabbitmq-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    #RabbitMQ相关 放在第二层
    rabbitmq:
      host: 127.0.0.1
      port: 5672
      username: guest
      password: guest
      virtualHost: /
      publisher-confirms: true #消息发送到交换机后的回调
      publisher-returns: true  #消息由交换机发到队列失败后的回调
      template:
        mandatory: true # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃

    2.rabbitmq配置类,配置队列名与交换机机名,还有队列 与交换机的绑定; 以及初始化RabbitAdmin对象

    @Component
    public class ConfigRabbitMQ {
    
    
        //创建交换机
        @Bean
        public Exchange getExchange(){
            return ExchangeBuilder.topicExchange("课程交换机名").build();
        }
        //创建队列
        @Bean
        public Queue getQueue(){
            return new Queue("课程队列名",true,false,false);
        }
        //绑定交换机与队列
        @Bean
        public Binding getBinding(){
            return BindingBuilder.bind(getQueue()).to(getExchange()).with("课程路由键").noargs();
        }
    
        //初始化RabbitAdmin对象
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
            rabbitAdmin.setAutoStartup(true);
    
            //下面设置目的:项目启动时,就创建交换机和队列
            //创建交换机
            rabbitAdmin.declareExchange(getExchange());
            //创建对列
            rabbitAdmin.declareQueue(getQueue());
    
            return rabbitAdmin;
        }
    }

     第二个配置类,配置rabbitmq的序列化配置,以及两个回调函数(confirms,returns);

    /**
     * @description: 做序列化
     */
    @Configuration
    public class RabbitMQConverterConfig implements RabbitListenerConfigurer{
    
        //以下配置RabbitMQ消息服务
        @Autowired
        public ConnectionFactory connectionFactory;
    
        @Bean
        public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
            DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
            // 这里的转换器设置实现了 通过 @Payload 注解 自动反序列化message body
            factory.setMessageConverter(new MappingJackson2MessageConverter());
            return factory;
        }
    
        @Override
        public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
            registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
        }
    
        @Bean
        public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setConcurrentConsumers(3);
            factory.setMaxConcurrentConsumers(10);
            //设置手动签收
            factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            return factory;
        }
    
        @Bean
        public MessageConverter jsonMessageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate() {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            // 这里的转换器设置实现了发送消息时自动序列化消息对象为message body
            template.setMessageConverter(jsonMessageConverter());
            template.setMandatory(true);
            return template;
        }
    }

    3.使用RabbitTemplate发送队列消息;

     

    rabbitTemplate.convertAndSend("课程交换机名","课程路由键","这是新上线的队列");

     高频面试题;如何保持RabbitMQ消息不丢失?

    1、需要设置Confirm和Return回调方法进行处理

    2、然后搞一张消息发送的记录表,里面包含如下字段:交换机名称、routingkey、消息内容、消息状态、重试次数等字段

    3、发送消息的时候,将状态置为发送中,重试次数给个默认值(可以从配置表中取),如果Confirm回调里的ack是false,那么我们就需要将状态更新为发送失败,否则更新为发送成功(0:发送中;1:生产者到交换机失败;2:交换机到队列失败;3:成功)

    4、Return回调方法只要被触发,说明消息肯定发送失败了,直接将状态改为发送失败

    5、搞一个定时任务,定时去扫描该表的所有状态为发送失败的记录,重试次数大于0的消息,重新进行消息发送,每发送一次,重试次数字段减一,直到0为止

    6、当重试次数等于0时,说明发送了很多次还是失败,此时需要发短信或邮件告知运维人员,进行人工干预了

     原理理解图:

     

     

     
    

     

     

     

  • 相关阅读:
    spring 绑定数据,国际化处理,上传下载文件
    【小程序入门】App函数注册小程序实例
    离子液体(IonicLiquid)用于
    利用函数指针数组写计算器(转移表)
    【最详细】最新最全Java基础面试题(52道)
    ROS2——动作(十一)
    PCL 生成球形点云
    React Hooks的理解
    链表OJ题
    1.django部署(案例)
  • 原文地址:https://blog.csdn.net/m0_67574688/article/details/125609534