• 如何实现MQTT的Java代码


    MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅]范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它需要一个消息中间件 。

    为rabbit开启mqtt

    1.在yml文件中添加一个mqtt的端口映射1883:1883

    1. "prettyprint hljs less" style="padding: 0.5em; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); border-radius: 4px; display: block; margin: 0px 0px 1.5em; font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; white-space: pre; background-color: rgb(246, 246, 246); border: none; overflow-x: auto; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">restart: always
    2. container_name: rabbitmq
    3. ports:
    4. - 5672:5672
    5. - 15672:15672
    6. - 1883:1883 #mqtt
    7. volumes:
    8. - ./data:/var/lib/rabbitmq
  • 2.进入rabbit的docker容器内部

    <pre class="hljs nginx" style="padding: 0.5em; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); border-radius: 4px; display: block; margin: 0px 0px 0.75em; font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; white-space: pre; background-color: rgb(246, 246, 246); border: none; overflow-x: auto; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">docker exec -it rabbitmq bashpre>
    

    3.rabbit内运行

    <pre class="hljs bash" style="padding: 0.5em; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); border-radius: 4px; display: block; margin: 0px 0px 0.75em; font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; white-space: pre; background-color: rgb(246, 246, 246); border: none; overflow-x: auto; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">rabbitmq-plugins enable rabbitmq_mqttpre>
    

    4.在网页视图中查看mqtt

    使用MQTT软件测试mqtt

    1.连接mqtt

    2.在MQTT软中添加订阅

    在RabbitMQ的队列中查看

    3.测试

    方法一、在Rabbitmq网页发送消息

    方法二、自己给自己发

    一、发送消息

    1. 创建springBoot项目,在xml中导入springBoot项目所需要配置以及相关依赖包
    1. <pre class="prettyprint hljs xml" style="padding: 0.5em; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); border-radius: 4px; display: block; margin: 0px 0px 1.5em; font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; white-space: pre; background-color: rgb(246, 246, 246); border: none; overflow-x: auto; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;"><parent>
    2. <groupId>org.springframework.bootgroupId>
    3. <artifactId>spring-boot-starter-parentartifactId>
    4. <version>2.6.8version>
    5. parent>
    6. <dependencies>
    7. <dependency>
    8. <groupId>org.springframework.bootgroupId>
    9. <artifactId>spring-boot-starter-webartifactId>
    10. dependency>
    11. <dependency>
    12. <groupId>org.springframework.bootgroupId>
    13. <artifactId>spring-boot-starter-integrationartifactId>
    14. dependency>
    15. <dependency>
    16. <groupId>org.springframework.integrationgroupId>
    17. <artifactId>spring-integration-mqttartifactId>
    18. dependency>
    19. <dependency>
    20. <groupId>org.springframework.integrationgroupId>
    21. <artifactId>spring-integration-streamartifactId>
    22. dependency>
    23. <dependency>
    24. <groupId>org.projectlombokgroupId>
    25. <artifactId>lombokartifactId>
    26. dependency>
    27. dependencies>pre>

    注意寻找依赖包:spring.io->projects->LEARN(右边)->2.6.10 GA Refence Doc.-> Documentation Overview ->7.Messaging-> Spring Integration: Auto-configuration for Spring Integration

    1. 创建配置类(2步骤可忽略,此地只为推导使用,配置类的正确使用方式见步骤6)
    1. class="prettyprint hljs java" style="padding: 0.5em; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); border-radius: 4px; display: block; margin: 0px 0px 1.5em; font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; white-space: pre; background-color: rgb(246, 246, 246); border: none; overflow-x: auto; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">//修正官网后的(这个配置文件还不可以使用,正确的使用方式见在后面--此地是视频课程讲的讲解中推导步骤,可以省略不看)
    2. @Configuration
    3. public class MqttConfig {
    4. @Bean
    5. public MessageChannel mqttInputChannel() {
    6. return new DirectChannel();
    7. }
    8. /**
    9. * 连接mqtt服务器的工厂
    10. * @return
    11. */
    12. @Bean
    13. public MqttPahoClientFactory mqttClientFactory() {
    14. DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    15. MqttConnectOptions options = new MqttConnectOptions();
    16. options.setServerURIs(new String[] {
    17. "tcp://10.9.48.165:1883" });
    18. options.setUserName("guest");
    19. options.setPassword("guest".toCharArray());
    20. factory.setConnectionOptions(options);
    21. return factory;
    22. }
    23. @Bean
    24. public MessageProducer inbound(MessageChannel mqttInputChannel,MqttPahoClientFactory mqttClientFactory) {
    25. MqttPahoMessageDrivenChannelAdapter adapter =
    26. new MqttPahoMessageDrivenChannelAdapter("springclient",mqttClientFactory,
    27. "zheshisha");
    28. adapter.setCompletionTimeout(5000);
    29. adapter.setConverter(new DefaultPahoMessageConverter());
    30. //设置一次需要应答
    31. adapter.setQos(1);
    32. //设置对外的通道
    33. adapter.setOutputChannel(mqttInputChannel);
    34. return adapter;
    35. }
    36. }
  1. class="prettyprint hljs java" style="padding: 0.5em; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); border-radius: 4px; display: block; margin: 0px 0px 1.5em; font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; white-space: pre; background-color: rgb(246, 246, 246); border: none; overflow-x: auto; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">//官方文档
  2. @Bean
  3. public MessageChannel mqttInputChannel() {
  4. return new DirectChannel();
  5. }
  6. @Bean
  7. public MessageProducer inbound() {
  8. MqttPahoMessageDrivenChannelAdapter adapter =
  9. new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient",
  10. "topic1", "topic2");
  11. adapter.setCompletionTimeout(5000);
  12. adapter.setConverter(new DefaultPahoMessageConverter());
  13. adapter.setQos(1);
  14. //问题所在:mqttInputChannel()这个是调用方法,而在这个方法上面加一一个注解@Bean相当于白加~~~
  15. adapter.setOutputChannel(mqttInputChannel());
  16. return adapter;
  17. }
  1. 创建接口
  1. class="prettyprint hljs kotlin" style="padding: 0.5em; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); border-radius: 4px; display: block; margin: 0px 0px 1.5em; font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; white-space: pre; background-color: rgb(246, 246, 246); border: none; overflow-x: auto; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">@MessagingGateway(defaultRequestChannel =  "mqttOutboundChannel")
  2. public interface MyGateway {
  3. void sendToMqtt(String data);
  4. }
  1. 启动类
  1. class="prettyprint hljs less" style="padding: 0.5em; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); border-radius: 4px; display: block; margin: 0px 0px 1.5em; font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; white-space: pre; background-color: rgb(246, 246, 246); border: none; overflow-x: auto; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">@SpringBootApplication
  2. //扫描整合的注解
  3. @IntegrationComponentScan
  4. public class MqttStartApp {
  5. public static void main(String[] args) {
  6. SpringApplication.run(MqttStartApp.class, args);
  7. }
  8. }
  1. 编写controller类测试
  1. class="prettyprint hljs kotlin" style="padding: 0.5em; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); border-radius: 4px; display: block; margin: 0px 0px 1.5em; font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; white-space: pre; background-color: rgb(246, 246, 246); border: none; overflow-x: auto; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">@RestController
  2. public class MqttController {
  3. private MyGateway myGateway;
  4. @Autowired
  5. public void setMyGateway(MyGateway myGateway) {
  6. this.myGateway = myGateway;
  7. }
  8. @PostMapping("/msg")
  9. public String sendMsg(String msg){
  10. myGateway.sendToMqtt(msg);
  11. return "success";
  12. }
  13. }
  1. 修改后的配置类
  1. class="prettyprint hljs java" style="padding: 0.5em; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); border-radius: 4px; display: block; margin: 0px 0px 1.5em; font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; white-space: pre; background-color: rgb(246, 246, 246); border: none; overflow-x: auto; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">@Configuration
  2. public class MqttConfig {
  3. /**
  4. * 连接mqtt服务器的工厂
  5. * @return
  6. */
  7. @Bean
  8. public MqttPahoClientFactory mqttClientFactory() {
  9. DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
  10. MqttConnectOptions options = new MqttConnectOptions();
  11. options.setServerURIs(new String[] {
  12. "tcp://10.9.48.165:1883" });
  13. options.setUserName("guest");
  14. options.setPassword("guest".toCharArray());
  15. factory.setConnectionOptions(options);
  16. return factory;
  17. }
  18. @Bean
  19. public MessageChannel mqttOutboundChannel() {
  20. return new DirectChannel();
  21. }
  22. @Bean
  23. @ServiceActivator(inputChannel = "mqttOutboundChannel") //inputChannel的名字必须和上面的MessageChannel的方法名保持一致
  24. public MessageHandler mqttOutbound() {
  25. MqttPahoMessageHandler messageHandler =
  26. new MqttPahoMessageHandler("testClient", mqttClientFactory());
  27. messageHandler.setAsync(true);
  28. messageHandler.setDefaultTopic("zheshisha");
  29. return messageHandler;
  30. }
  31. }

二、收消息

在配配置文件中加入

  1. class="prettyprint hljs java" style="padding: 0.5em; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); border-radius: 4px; display: block; margin: 0px 0px 1.5em; font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; white-space: pre; background-color: rgb(246, 246, 246); border: none; overflow-x: auto; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">/**
  2. * 收消息的通道,注意实际开发中和发的可能不在一起
  3. * @return
  4. */
  5. @Bean
  6. public MessageChannel mqttInputChannel() {
  7. return new DirectChannel();
  8. }
  9. @Bean
  10. public MessageProducer inbound() {
  11. MqttPahoMessageDrivenChannelAdapter adapter =
  12. new MqttPahoMessageDrivenChannelAdapter("tcp://10.9.48.165:1883", "testClient",
  13. "chixihua");
  14. adapter.setCompletionTimeout(5000);
  15. adapter.setConverter(new DefaultPahoMessageConverter());
  16. adapter.setQos(1);
  17. adapter.setOutputChannel(mqttInputChannel());
  18. return adapter;
  19. }
  20. /**
  21. * 收消息的处理器,用于如何处理消息
  22. * mqttInputChannel 代表的是收消息的通道对象的id
  23. * @return
  24. */
  25. @Bean
  26. @ServiceActivator(inputChannel = "mqttInputChannel")
  27. public MessageHandler handler() {
  28. return new MessageHandler() {
  29. @Override
  30. public void handleMessage(Message message) throws MessagingException {
  31. System.out.println(message.getPayload());
  32. }
  33. };
  34. }

三、SpringBoot整合MQTT

  1. 导入依赖包
  1. <pre class="prettyprint hljs xml" style="padding: 0.5em; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); border-radius: 4px; display: block; margin: 0px 0px 1.5em; font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; white-space: pre; background-color: rgb(246, 246, 246); border: none; overflow-x: auto; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">
  2. <dependency>
  3. <groupId>org.springframework.bootgroupId>
  4. <artifactId>spring-boot-starter-integrationartifactId>
  5. dependency>
  6. <dependency>
  7. <groupId>org.springframework.integrationgroupId>
  8. <artifactId>spring-integration-mqttartifactId>
  9. dependency>
  10. <dependency>
  11. <groupId>org.springframework.integrationgroupId>
  12. <artifactId>spring-integration-streamartifactId>
  13. dependency>pre>
  1. 创建配置类
  1. class="prettyprint hljs java" style="padding: 0.5em; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); border-radius: 4px; display: block; margin: 0px 0px 1.5em; font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; white-space: pre; background-color: rgb(246, 246, 246); border: none; overflow-x: auto; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">@Configuration
  2. public class MqttConfig {
  3. @Bean
  4. public MqttConnectOptions options(){
  5. MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
  6. mqttConnectOptions.setServerURIs(new String[] {
  7. "tcp://10.9.48.190:1883" });
  8. mqttConnectOptions.setUserName("dc3");
  9. mqttConnectOptions.setPassword("dc3".toCharArray());
  10. return mqttConnectOptions;
  11. }
  12. /**
  13. * 创建连接工厂
  14. * @param options
  15. * @return
  16. */
  17. @Bean
  18. public MqttPahoClientFactory mqttPahoClientFactory(MqttConnectOptions options){
  19. DefaultMqttPahoClientFactory defaultMqttPahoClientFactory=new DefaultMqttPahoClientFactory();
  20. defaultMqttPahoClientFactory.setConnectionOptions(options);
  21. return defaultMqttPahoClientFactory;
  22. }
  23. @Bean
  24. public MessageChannel messageInputChannel(){
  25. return new DirectChannel();
  26. }
  27. @Bean
  28. public MessageProducer mqttInbound(MessageChannel messageInputChannel, MqttPahoClientFactory mqttPahoClientFactory){
  29. MqttPahoMessageDrivenChannelAdapter adapter =
  30. new MqttPahoMessageDrivenChannelAdapter("testClient",mqttPahoClientFactory, "chixihua");
  31. adapter.setCompletionTimeout(5000);
  32. adapter.setConverter(new DefaultPahoMessageConverter());
  33. adapter.setQos(1);
  34. adapter.setOutputChannel(messageInputChannel);
  35. return adapter;
  36. }
  37. }
  1. 配置消息处理的类
  1. class="prettyprint hljs kotlin" style="padding: 0.5em; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); border-radius: 4px; display: block; margin: 0px 0px 1.5em; font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; white-space: pre; background-color: rgb(246, 246, 246); border: none; overflow-x: auto; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">@Configuration
  2. public class MessageReceiverHandler {
  3. /**
  4. * 收到设备发送来的上行数据的时候执行,具体怎么做取决于业务,比如这里面可能是设备发来的一些传感器数据,我们需要保存并发送到统计平台
  5. * @return
  6. */
  7. @Bean
  8. @ServiceActivator(inputChannel = "messageInputChannel")
  9. public MessageHandler messageHandler(){
  10. return message -> {
  11. //获取到消息正文
  12. Object payload = message.getPayload();
  13. System.err.println(payload);
  14. //处理消息
  15. System.err.println("等下就处理消息");
  16. };
  17. }
  18. }
  1. 在启动类添加注解
  1. class="prettyprint hljs less" style="padding: 0.5em; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); border-radius: 4px; display: block; margin: 0px 0px 1.5em; font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; white-space: pre; background-color: rgb(246, 246, 246); border: none; overflow-x: auto; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">@SpringBootApplication
  2. @IntegrationComponentScan
  3. public class MqttStartApp {
  4. public static void main(String[] args) {
  5. SpringApplication.run(MqttStartApp.class, args);
  6. }
  7. }
  • 相关阅读:
    Android案例手册 - 实现一个华容道拼图游戏
    lowbit()运算及原码、反码、补码
    Polygon zkEVM R1CS与Plonk电路转换
    如何发高质量的外链?
    ELK体系部署文档(elk+filebeat+redis)
    宣泰医药通过注册:拟募资6亿 联和投资是大股东
    快速生成力扣链表题的链表,实现快速调试
    js 中的错误类型及处理方式
    DPDK vhost库
    虚拟机安装openEuler系统
  • 原文地址:https://blog.csdn.net/Java_ttcd/article/details/126133907