上一篇:RabbitMQ基础知识
这里无需指定版本号,让其跟着SpringBoot版本走。本示例使用SpringBoot版本号为2.7.10。
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-amqpartifactId>
- dependency>
-
- <dependency>
- <groupId>org.projectlombokgroupId>
- <artifactId>lombokartifactId>
- <optional>trueoptional>
- dependency>
创建两个SpringBoot应用,模拟消息生产者与消费者【publisher、consumer】。
编写配置文件,用户名和密码等自行修改 这里虚拟机的名称是上一篇文章中新建的。
- server.port=8082
- #rabbitmq服务器ip
- spring.rabbitmq.host=localhost
- #rabbitmq的端口
- spring.rabbitmq.port=5672
- #用户名
- spring.rabbitmq.username=用户名
- #密码
- spring.rabbitmq.password=密码
- #配置虚拟机
- spring.rabbitmq.virtual-host=demo
声明交换机、队列并绑定:
- @Configuration
- public class RabbitMqConfig {
-
- @Bean
- public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
- RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
- rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
- return rabbitTemplate;
- }
-
- @Bean
- public MessageConverter jackson2JsonMessageConverter() {
- return new Jackson2JsonMessageConverter();
- }
-
- @Bean
- public DirectExchange getExchange(){
- return new DirectExchange("directExchange",false,false);
- }
-
- @Bean
- public Queue getQueue(){
- return new Queue("publisher.addUser",true,false,false);
- }
-
- @Bean
- public Binding getBinding(DirectExchange exchange,Queue queue){
- return BindingBuilder.bind(queue).to(exchange).with("publisher.addUser");
- }
- }
新建User实体类
- @Data
- public class User {
- private Long id;
- private String name;
- private String desc;
- }
在方法中使用RabbitTemplate来发送消息:
- public interface PublisherService {
-
- /**
- * 添加用户
- * @param user 用户信息
- */
- void addUser(User user);
-
- }
- @RequiredArgsConstructor
- @Service
- public class PublisherServiceImpl implements PublisherService{
-
- private final RabbitTemplate rabbitTemplate;
-
- @Override
- public void addUser(User user) {
- rabbitTemplate.convertAndSend("directExchange","publisher.addUser",user);
- }
- }
以上需要注意的就是交换机的名称、队列名、routingKey。示例中使用的是直连交换机,routingKey需要和队列名保持一致。不懂的可以查看上一篇文章。
controller:
- @RequiredArgsConstructor
- @RestController
- @RequestMapping("/user")
- public class UserController {
-
-
- private final PublisherService publisherService;
-
- @PostMapping("/add")
- public void add(){
- User user = new User();
- user.setId(1000L);
- user.setName("黄忠");
- user.setDesc("老兵不死,只是逐渐凋零");
- publisherService.addUser(user);
- }
-
- }
消费者的配置和生产者一样,不赘述了,直接看代码:
- @Service
- @Slf4j
- public class ConsumerService {
-
-
- @RabbitListener(queues ="publisher.addUser")
- public void addUser(String userStr){
- User user = JSONObject.parseObject(userStr,User.class);
- log.info(user.toString());
- }
- }
@RabbitListener 注解是指定某方法作为消息消费的方法,指定队列名称。@RabbitListener 如果标注在类上,需配合 @RabbitHandler 注解一起使用,根据接受的参数类型进入具体的方法中。
消费端在启动时可能会报找不到交换机或队列,只需要让生产者发送一次消息,从控制台就可以看到相关的交换机和队列等信息了。

可以看到消费者成功消费了消息:


通过上述操作,我们已经会简单地使用RabbitMQ了,接下来了解一下它的整个流程。如此可以让我们掌握的更牢固。

生产者:
消费者: