利用MQ实现mysql与elasticsearch数据数据同步,当酒店数据发生增,删,改,要求对elasticseartch中的数据也要完成相关操作
导入amqp的依赖坐标
org.springframework.boot
spring-boot-starter-amqp
配置相关文件
spring:
rabbitmq:
host: 192.168.205.128
port: 5672
username: ylh
password: 20020630
virtual-host: /
同理:
hotel-demo中也要配置
在hotel-demo中:
创建一个类保存交换机,队列,绑定关系
public class MqConstans {
//交换机名称
public final static String HOTEL_EXCHANGE="hotel.topic";
//监听新增和修改的队列
public final static String HOTEL_INSERT_QUEUE="hotel.insert.queue";
//删除的队列
public final static String HOTEL_DELETE_QUEUE="hotel.delete.queue";
//新增或修改的RoutingKey
public final static String HOTEL_INSERT_KEY="hotel.insert";
//删除的RoutingKey
public final static String HOTEL_DELETE_KEY="hotel.delete";
}
这里采用bean注入的方式声明交换机,队列,绑定关系(也可以使用注解的方式)
@Configuration
public class MqConfig {
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(MqConstans.HOTEL_EXCHANGE,true,false);
}
@Bean
public Queue insertQueue(){
return new Queue(MqConstans.HOTEL_INSERT_QUEUE,true);
}
@Bean
public Queue deleteQueues(){
return new Queue(MqConstans.HOTEL_DELETE_QUEUE,true);
}
@Bean
public Binding insertQueueBinding(){
return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstans.HOTEL_INSERT_KEY);
}
@Bean
public Binding deleteQueueBinding(){
return BindingBuilder.bind(deleteQueues()).to(topicExchange()).with(MqConstans.HOTEL_DELETE_KEY);
}
}
在hotel-admin中也创建一个类保存交换机,队列,绑定关系
发送消息:
当数据库内的数据发送修改,删除,新增的时候
修改对应的es
在hotel-admin中的controller中修改:
1.PostMapping:(新增)
加入
rabbitTemplate.convertAndSend(MqConstans.HOTEL_EXCHANGE,MqConstans.HOTEL_INSERT_KEY,hotel.getId());
}
2.PutMapping中(修改)
加入
rabbitTemplate.convertAndSend(MqConstans.HOTEL_EXCHANGE,MqConstans.HOTEL_INSERT_KEY,hotel.getId());
3.deleteMapping中加入(删除)
rabbitTemplate.convertAndSend(MqConstans.HOTEL_EXCHANGE,MqConstans.HOTEL_DELETE_KEY,id);
在hotel-demo中:
创建监听队列
定义一个类
@Component
public class hotelLister {
@Autowired
private IHotelService hotelService;
//监听酒店新增或删除的业务
@RabbitListener(queues = MqConstans.HOTEL_INSERT_QUEUE)
public void listernHotelInsertOrUpdate(Long id){
hotelService.insertById(id);
}
//监听酒店新增或删除的业务
@RabbitListener(queues = MqConstans.HOTEL_DELETE_QUEUE)
public void listernHotelDelete(Long id){
hotelService.deleteById(id);
}
}
创建insertById(id)和deleteById(id)方法
@Override
public void deleteById(Long id) {
try {
//准备request
DeleteRequest request = new DeleteRequest("hotel", id.toString());
//准备发送请求
client.delete(request, RequestOptions.DEFAULT);
}catch (Exception e){
throw new RuntimeException();
}
}
@Override
public void insertById(Long id) {
try {
//根据id查询酒店数据
Hotel hotel = getById(id);
//转换对象
HotelDoc hotelDoc = new HotelDoc(hotel);
//准备request
IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
//准备JSON文档
request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
//发送请求
client.index(request, RequestOptions.DEFAULT);
}catch (Exception e){
throw new RuntimeException(e);
}
}