• 消息通讯-MQTT WebHook&SpringBoot案例


    一、EMQX WebHook介绍

    1、EMQX WebHook 是由 emqx_web_hook (opens new window)插件提供的将EMQX中的钩子事件通知到某个Web服务的功能。
    2、WebHook 的内部实现是基于钩子,借助 Webhook 可以完成设备在线、上下线记录,订阅与消息存储、消息送达确认等诸多业务。

    3、它通过在钩子上的挂载回调函数,获取到 EMQX 中的各种事件,并转发至 emqx_web_hook 中配置的 Web 服务器。

    以 客户端成功接入(client.connected) 事件为例,其事件的传递流程如下:

    1. Client | EMQX | emqx_web_hook | HTTP +------------+
    2. =============>| - - - - - - -> - - - - - - - ->===========> | Web Server |
    3. | Broker | | Request +------------+

    注意:WebHook 对于事件的处理是单向的,它仅支持将 EMQX 中的事件推送给 Web 服务,并不关心 Web 服务的返回。

    二、EMQX WebHook配置项

    Webhook 的配置文件位于 etc/plugins/emqx_web_hook.conf,配置项的详细说明可以查看官方文档配置

    在默认etc/plugins/emqx_web_hook.conf中,官方为我们提供了如下的配置触发规则:

    1. ##====================================================================
    2. ## WebHook
    3. ##====================================================================
    4. web.hook.api.url = http://127.0.0.1:80
    5. ## Format:
    6. ## web.hook.rule.<HookName>.<No> = <Spec>
    7. #web.hook.rule.client.connect.1 = {"action": "on_client_connect"}
    8. #web.hook.rule.client.connack.1 = {"action": "on_client_connack"}
    9. #web.hook.rule.client.connected.1 = {"action": "on_client_connected"}
    10. #web.hook.rule.client.disconnected.1 = {"action": "on_client_disconnected"}
    11. #web.hook.rule.client.subscribe.1 = {"action": "on_client_subscribe"}
    12. #web.hook.rule.client.unsubscribe.1 = {"action": "on_client_unsubscribe"}
    13. #web.hook.rule.session.subscribed.1 = {"action": "on_session_subscribed"}
    14. #web.hook.rule.session.unsubscribed.1 = {"action": "on_session_unsubscribed"}
    15. #web.hook.rule.session.terminated.1 = {"action": "on_session_terminated"}
    16. #web.hook.rule.message.publish.1 = {"action": "on_message_publish"}
    17. #web.hook.rule.message.delivered.1 = {"action": "on_message_delivered"}
    18. #web.hook.rule.message.acked.1 = {"action": "on_message_acked"}

    其中webhook支持的事件如下:

    三、如何使用EMQX WebHook配置

    手动修改Webhook的配置文件位于:etc/plugins/emqx_web_hook.conf.
    修改配置非常简单,打开指定文件后,官方已经写好这些了,如上面所示,
    只需要修改web.hook.url,和在你需要的钩子事件前的#号去掉即可。

    emqx 启用插件 webhook插件(emqx有web控制台,在插件里启用就可以)

    刷新后emqx_web_hook插件运行中即启动成功

    然后就是自行搭建一个web接口服务,当客户端有指定事件后会主动推送,
    接口地址即为Webhook的配置文件中配置的web.hook.api.url

    WebHook配置事件推送参数详解

    1、client.connect(处理连接报文)

     

    2、client.connack(下发连接应答)

    3、client.connected(成功接入)

    4、client.disconnected(连接断开)

    5、client.subscribe(订阅主题)

    6、client.unsubscribe(取消订阅)

    session.subscribed(会话订阅主题)
    同 client.subscribe,action 为 session_subscribed

    session.unsubscribed(会话取消订阅)
    同 client.unsubscribe,action 为 session_unsubscribe

    1、message.publish(消息发布)
    2、message.delivered(消息投递)

    3、message.acked(消息回执)

    四、SpringBoot集成Webhook实现客户端断连监控

    通过使用Emqx的Webhook插件实现监控所有客户端的连接状态,并做出客户端上线和下线后的逻辑处理

    1. 实现前提

    修改 etc/plugins/emqx_web_hook.conf 文件,设置事件转发的url和地址和触发规则
     

    1. # 事件需要转发的目的服务器地址
    2. web.hook.api.url = http://127.0.0.1:8000/api/mqtt/webhook
    3. # 打开下面两个触发规则
    4. web.hook.rule.client.connected.1 = {"action": "on_client_connected"}
    5. web.hook.rule.client.disconnected.1 = {"action": "on_client_disconnected"}
    2. 代码实现接口

    通过EMQX 的webhook将客户端的连接断开等事件通知到我们自建的服务上,通过事件类型获取客户端的连接状态,然后将客户端的连接状态进行存储,并且提供HTTP API供后台系统查询所有客户端的状态。

    1. package com.team.modules.mqtt.controller;
    2. import com.google.gson.Gson;
    3. import com.team.modules.mqtt.client.MyMQTTClient;
    4. import com.team.modules.mqtt.service.SenderService;
    5. import com.team.modules.system.domain.vo.ResultVO;
    6. import lombok.extern.slf4j.Slf4j;
    7. import org.springframework.beans.factory.annotation.Autowired;
    8. import org.springframework.stereotype.Controller;
    9. import org.springframework.web.bind.annotation.*;
    10. import java.util.HashMap;
    11. import java.util.Map;
    12. @RestController
    13. @Slf4j
    14. @RequestMapping("/api/mqtt")
    15. public class SenderController {
    16. @Autowired
    17. private MyMQTTClient myMQTTClient;
    18. @PostMapping("/webhook")
    19. public void webhook(@RequestBody() Map message) {
    20. // 监听连接的上线和下线
    21. System.out.println("收到的消息:" + message);
    22. String action = (String) message.get("action");
    23. String clientid = (String) message.get("clientid");
    24. if (action.equals("client_connected")) {
    25. log.info("client:{} 上线", clientid);
    26. }
    27. if (action.equals("client_disconnected")) {
    28. log.info("client:{} 下线", clientid);
    29. }
    30. }
    31. }
    3. 监听结果

    启动服务后,模拟客户端连接emqx,查看服务记录的日志:

    五、总结

    由此看出客户端建立连接和断开连接后,Emqx服务都会给配置的接口web.hook.api.url = http://127.0.0.1:8000/api/mqtt/webhook发送一条数据,该数据字段可以参考上述的:WebHook配置事件推送参数详解。使用此方法可以将mqtt应用的更加灵活。其他的监听事件配置也类似,遇到合适场景可以自行发挥。
     

  • 相关阅读:
    理解make/makefile/cmake/qmake和Makefile编写规则
    CDR插件开发之Addon插件004 - VS2022开发环境简介及个性化配置
    LINUX之进程管理
    计算机网络:数据报与虚电路
    什么耳机戴着舒服耳朵不疼?不塞耳道的骨传导耳机
    VS Code插件 — Settings Sync : 设置和同步用户配置
    lxparse:解析列表页链接和详情页内容
    java List截取
    阿里云服务器上部署Flask
    【AGC】崩溃服务之常见问题
  • 原文地址:https://blog.csdn.net/Knight_Key/article/details/134454522