• 数据同步、


    数据同步

    ​ 利用MQ实现mysql与elasticsearch数据数据同步,当酒店数据发生增,删,改,要求对elasticseartch中的数据也要完成相关操作

    导入amqp的依赖坐标

    
        org.springframework.boot
        spring-boot-starter-amqp
    
    
    • 1
    • 2
    • 3
    • 4

    配置相关文件

    spring:
     rabbitmq:
       host: 192.168.205.128
       port: 5672
       username: ylh
       password: 20020630
       virtual-host: /
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    同理:

    hotel-demo中也要配置

    在hotel-demo中:

    创建一个类保存交换机,队列,绑定关系

    public class MqConstans {
        //交换机名称
        public final static  String HOTEL_EXCHANGE="hotel.topic";
    
        //监听新增和修改的队列
        public final static  String HOTEL_INSERT_QUEUE="hotel.insert.queue";
        //删除的队列
        public final static  String HOTEL_DELETE_QUEUE="hotel.delete.queue";
    
        //新增或修改的RoutingKey
        public final static String HOTEL_INSERT_KEY="hotel.insert";
        //删除的RoutingKey
        public final static String HOTEL_DELETE_KEY="hotel.delete";
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    这里采用bean注入的方式声明交换机,队列,绑定关系(也可以使用注解的方式)

    @Configuration
    public class MqConfig {
        @Bean
        public TopicExchange topicExchange(){
            return  new TopicExchange(MqConstans.HOTEL_EXCHANGE,true,false);
        }
        @Bean
       public Queue insertQueue(){
            return new Queue(MqConstans.HOTEL_INSERT_QUEUE,true);
        }
        @Bean
        public Queue deleteQueues(){
            return new Queue(MqConstans.HOTEL_DELETE_QUEUE,true);
        }
        @Bean
        public Binding insertQueueBinding(){
            return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstans.HOTEL_INSERT_KEY);
    
        }
        @Bean
        public Binding deleteQueueBinding(){
            return BindingBuilder.bind(deleteQueues()).to(topicExchange()).with(MqConstans.HOTEL_DELETE_KEY);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    在hotel-admin中也创建一个类保存交换机,队列,绑定关系

    发送消息:

    当数据库内的数据发送修改,删除,新增的时候

    修改对应的es

    在hotel-admin中的controller中修改:

    1.PostMapping:(新增)

    加入

    rabbitTemplate.convertAndSend(MqConstans.HOTEL_EXCHANGE,MqConstans.HOTEL_INSERT_KEY,hotel.getId());
    }
    
    • 1
    • 2

    2.PutMapping中(修改)

    加入

    rabbitTemplate.convertAndSend(MqConstans.HOTEL_EXCHANGE,MqConstans.HOTEL_INSERT_KEY,hotel.getId());
    
    • 1

    3.deleteMapping中加入(删除)

    rabbitTemplate.convertAndSend(MqConstans.HOTEL_EXCHANGE,MqConstans.HOTEL_DELETE_KEY,id);
    
    • 1

    在hotel-demo中:

    创建监听队列

    定义一个类

    @Component
    public class hotelLister {
        @Autowired
        private IHotelService hotelService;
        //监听酒店新增或删除的业务
        @RabbitListener(queues = MqConstans.HOTEL_INSERT_QUEUE)
        public void listernHotelInsertOrUpdate(Long id){
            hotelService.insertById(id);
        }
    
        //监听酒店新增或删除的业务
        @RabbitListener(queues = MqConstans.HOTEL_DELETE_QUEUE)
        public void listernHotelDelete(Long id){
            hotelService.deleteById(id);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    创建insertById(id)和deleteById(id)方法

    @Override
    public void deleteById(Long id) {
        try {
            //准备request
            DeleteRequest request = new DeleteRequest("hotel", id.toString());
            //准备发送请求
            client.delete(request, RequestOptions.DEFAULT);
        }catch (Exception e){
            throw new RuntimeException();
        }
    }
    
    @Override
    public void insertById(Long id) {
        try {
            //根据id查询酒店数据
            Hotel hotel = getById(id);
            //转换对象
            HotelDoc hotelDoc = new HotelDoc(hotel);
    
            //准备request
            IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
    
            //准备JSON文档
            request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
    
            //发送请求
            client.index(request, RequestOptions.DEFAULT);
    
    
        }catch (Exception e){
            throw new RuntimeException(e);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
  • 相关阅读:
    使用token登录提交到github
    共享内存 - 多进程编程(三)
    并发编程学习笔记 之 常用并发容器的概念及使用方法
    dd命令创建指定大小的文件
    2022-08-16 数据库先验性记录
    咬文嚼图式的介绍二叉树、B树/B-树
    5 分钟教你搭建「视频动作分类」系统
    猫头虎分享已解决Bug || TypeError: Cannot read property ‘map‘ of undefined**
    2023年软件测试已经崩盘了吗,为什么都找不到工作了?
    win11自带矩形块截屏、录屏、视频编辑等功能
  • 原文地址:https://blog.csdn.net/qq_57907966/article/details/126567440