• SpringBoot 集成 IBMMQ 代码示例教程



    前言

    SpringBoot 集成 IBMMQ,实现两个服务间的消息通信。IBMMQ 版本 7.5.0.2,兼容其他版本。实现容器 Listener 事件执行 MQ 初始化及销毁,观察者模式消息通知策略,IBMMQ 断线自动重连策略。

    由于整体结构内容较多,以下仅针对主要的类进行了解析。

    了解更多:

    demo资源地址:https://download.csdn.net/download/demo_yo/88417742


    项目结构如下:
    在这里插入图片描述
    mq 模块结构如下:
    在这里插入图片描述

    一、集成 IBMMQ

    1. pom 依赖

    <dependency>
        <groupId>com.ibm.mqgroupId>
        <artifactId>com.ibm.mq.allclientartifactId>
        <version>9.1.1.0version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2. yml 配置

    # esb-mq配置
    esb:
      mq:
        enabled: true                       # 是否开启
        host: 127.0.0.1                     # 服务地址
        port: 1414                          # 服务端口
        queue-manager: QUEUE_MANAGER        # 队列管理器
        ccs-id: 1208                        # 编码
        channel: CHANNEL                    # 连接通道
        in-queue: IN_QUEUE                  # 发送队列
        out-queue: OUT_QUEUE                # 接收队列
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    esb 为自己对接的项目名。

    3. Properties 配置类

    package com.example.demo.config;
    
    import lombok.Data;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Configuration;
    
    @Data
    @Configuration
    @ConfigurationProperties(prefix = "esb.mq")
    public class EsbMqProperties {
    
        /**
         * 是否开启
         */
        private boolean enabled;
    
        /**
         * 服务器地址
         */
        private String host;
    
        /**
         * 端口
         */
        public int port;
    
        /**
         * 队列管理器名
         */
        private String queueManager;
    
        /**
         * 服务器MQ服务使用的编1381代表GBK,1208代表UTF(Coded Character Set Identifier:CCSID)
         */
        private int ccsId = 1208;
    
        /**
         * 连接通道
         */
        private String channel;
    
        /**
         * 输入队列名(用于发送消息)
         */
        private String inQueue;
    
        /**
         * 输出队列名(用于获取消息)
         */
        private String outQueue;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    4. Listener 初始化事件监听器

    作用:

    1. 注册消息观察者。
    2. 容器启动时,初始化mq队列管理器连接。
    3. 容器销毁时,销毁mq队列管理器连接。

    继承ServletContextListener类,实现spring容器启动时自动进行mq初始化,以及spring容器销毁时自动进行mq销毁。采用Runnable子线程方式来完成mq初始化,不影响主线程启动。

    package com.example.demo.listener;
    
    import com.example.demo.config.EsbMqProperties;
    import com.example.demo.esb.EsbObserverService;
    import com.example.demo.esb.mq.EsbMqFactory;
    import com.example.demo.esb.mq.EsbMqMsgGet;
    import com.example.demo.esb.mq.thread.EsbMqGetMsgRunnable;
    import com.example.demo.esb.observer.IEsbObserver;
    import com.example.demo.esb.observer.annotation.EsbObserver;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.web.context.WebApplicationContext;
    import org.springframework.web.context.support.WebApplicationContextUtils;
    
    import javax.servlet.ServletContextEvent;
    import javax.servlet.ServletContextListener;
    import javax.servlet.annotation.WebListener;
    import java.util.Map;
    
    /**
     * ESB-MQ连接-事件处理器
     *
     * @author Baien
     * @date 2023/9/16 13:41:37
     */
    @Slf4j
    @WebListener
    public class EsbMqListener implements ServletContextListener {
    
        /**
         * mq初始化
         * 

    * 当Servlet容器启动Web应用时调用该方法。 * 在调用完该方法之后,容器再对Filter初始化,并且对那些在Web应用启动时就需要被初始化的Servlet进行初始化。 * * @param sce Information about the ServletContext that was initialized */ @Override public void contextInitialized(ServletContextEvent sce) { WebApplicationContext appContext = WebApplicationContextUtils.getWebApplicationContext(sce.getServletContext()); EsbMqProperties esbMqProperties = appContext.getBean(EsbMqProperties.class); // 如果mq未开启,则返回 if (!esbMqProperties.isEnabled()) { log.info("esb-mq配置未开启"); return; } log.info("注册消息观察者..."); // 注册消息观察者 EsbObserverService esbObserverService = appContext.getBean(EsbObserverService.class); Map<String, Object> observerBeans = appContext.getBeansWithAnnotation(EsbObserver.class); for (Object observerBean : observerBeans.values()) { // 注册消息观察者 esbObserverService.registerObserver((IEsbObserver) observerBean); } // 开启线程:mq连接初始化 + 分发mq消息 EsbMqFactory esbMqFactory = appContext.getBean(EsbMqFactory.class); EsbMqMsgGet esbMqMsgGet = appContext.getBean(EsbMqMsgGet.class); EsbMqGetMsgRunnable esbMqGetMsgRunnable = new EsbMqGetMsgRunnable(esbMqFactory, esbMqMsgGet); Thread thread = new Thread(esbMqGetMsgRunnable); thread.start(); } /** * mq销毁 *

    * 当Servlet容器终止Web应用时调用该方法。在调用该方法之前,容器会先销毁所有的Servlet和Filter过滤器。 * * @param sce Information about the ServletContext that was destroyed */ @Override public void contextDestroyed(ServletContextEvent sce) { log.info("进入mq销毁程序"); WebApplicationContext appContext = WebApplicationContextUtils.getWebApplicationContext(sce.getServletContext()); EsbMqFactory esbMqFactory = appContext.getBean(EsbMqFactory.class); esbMqFactory.disConnect(esbMqFactory.getMqQueueManager()); } }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79

    5. Runnable 初始化与获取消息线程

    package com.example.demo.esb.mq.thread;
    
    import com.example.demo.esb.mq.EsbMqFactory;
    import com.example.demo.esb.mq.EsbMqMsgGet;
    import com.example.demo.esb.mq.model.EsbMqStatus;
    import lombok.extern.slf4j.Slf4j;
    
    /**
     * esb-mq获取消息runnable
     *
     * @author Baien
     * @date 2023/11/30 11:13:59
     */
    @Slf4j
    public class EsbMqGetMsgRunnable implements Runnable {
    
        private final EsbMqFactory esbMqFactory;
    
        private final EsbMqMsgGet esbMqMsgGet;
    
        public EsbMqGetMsgRunnable(EsbMqFactory esbMqFactory, EsbMqMsgGet esbMqMsgGet) {
            this.esbMqFactory = esbMqFactory;
            this.esbMqMsgGet = esbMqMsgGet;
        }
    
        @Override
        public void run() {
            /*--------------------1.mq连接初始化--------------------*/
            log.info("=============== mq连接初始化开始 ===============");
            log.info("初始化mq连接...");
            // 初始化队列管理器
            esbMqFactory.initMqQueueManager();
            log.info("=============== mq连接初始化结束 ===============");
    
            /*--------------------2.分发mq消息--------------------*/
            log.info("开始获取mq消息...");
            // 循环检测队列消息进行分发
            while (true) {
                try {
                    EsbMqStatus esbMqStatus = new EsbMqStatus(true); // mq连接状态对象
                    // 获取消息
                    esbMqMsgGet.getMessage(esbMqStatus);
    
                    // 如果mq连接异常,则进入延时(10s)(防止队列管理器连接失效时频繁访问队列,不能设置过长避免队列管理器连接重连后等待时间过长)
                    if (!esbMqStatus.getConnected()) {
                        Thread.sleep(10000);
                    }
                } catch (Exception e) {
                    log.error("循环分发消息异常!{}", e.getMessage());
                }
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    6. Factory 连接工厂类

    mq连接管理工厂,用于管理队列管理器。

    使用ConcurrentHashMap实现线程间共享及并发。创建mq管理器时,利用map的put方法替换掉之前的管理器实例,保证共用全局唯一有效的mq管理器实例,其他类获取mq管理器实例只能通过EsbMqFactory工厂类提供的get方法,避免mq管理器实例的频繁创建,节约资源。初始化需要访问的消息队列,保存打开消息队列的配置信息,放入ConcurrentHashMap,避免队列打开方式的重复构建,方便后续的队列连接。

    package com.example.demo.esb.mq;
    
    import cn.hutool.core.util.ObjectUtil;
    import com.alibaba.fastjson.JSONObject;
    import com.example.demo.config.EsbMqProperties;
    import com.example.demo.esb.constant.EsbConstants;
    import com.example.demo.esb.mq.queue.AbsTractEsbMqQueue;
    import com.example.demo.esb.mq.queue.EsbMqOutQueue;
    import com.ibm.mq.MQException;
    import com.ibm.mq.MQQueueManager;
    import com.ibm.mq.constants.MQConstants;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;
    
    import java.util.Hashtable;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * mq连接工厂类
     *
     * @author Baien
     * @date 2023/9/16 13:41:37
     */
    @Slf4j
    @Component
    @RequiredArgsConstructor
    public class EsbMqFactory {
    
        private static ConcurrentHashMap<String, MQQueueManager> mqQueueManagerMap = new ConcurrentHashMap<>(); // mq队列管理器map
    
        private static ConcurrentHashMap<String, AbsTractEsbMqQueue> abstractEsbMqQueueMap = new ConcurrentHashMap<>(); // mq队列map
    
        private final EsbMqProperties esbMqProperties;
    
        /**
         * 初始化队列管理器
         *
         * @author Baien
         * @date 2023/9/16 13:41:37
         */
        public void initMqQueueManager() {
            // 输出配置信息
            log.info("配置信息:{}", JSONObject.toJSONString(esbMqProperties));
            // 初始化队列
            initEsbMqQueue();
            // 创建队列管理器
            createMqQueueManager();
        }
    
        /**
         * 获取队列
         *
         * @param queueName 队列名称
         * @return 队列
         * @author Baien
         * @date 2023/9/16 13:41:37
         */
        public AbsTractEsbMqQueue getEsbMqQueueMap(String queueName) {
            return abstractEsbMqQueueMap.get(queueName);
        }
    
        /**
         * 获取队列管理器
         *
         * @return 队列管理器
         * @author Baien
         * @date 2023/9/16 13:41:37
         */
        public MQQueueManager getMqQueueManager() {
            return mqQueueManagerMap.get(esbMqProperties.getQueueManager());
        }
    
        /**
         * 创建队列管理器
         *
         * @return 队列管理器
         * @author Baien
         * @date 2023/9/16 13:41:37
         */
        public MQQueueManager createMqQueueManager() {
            // 配置队列管理器参数
            Hashtable<String, Object> properties = new Hashtable<>();
            properties.put(MQConstants.HOST_NAME_PROPERTY, esbMqProperties.getHost());        // 服务器地址
            properties.put(MQConstants.PORT_PROPERTY, esbMqProperties.getPort());             // 端口号
            properties.put(MQConstants.CHANNEL_PROPERTY, esbMqProperties.getChannel());       // 连接通道
            properties.put(MQConstants.CCSID_PROPERTY, esbMqProperties.getCcsId());           // MQ服务使用的编码(CCSID):1381代表GBK,1208代表UTF
            properties.put(MQConstants.TRANSPORT_PROPERTY, MQConstants.TRANSPORT_MQSERIES); // 传输类型
    
            MQQueueManager mqQueueManager;
            try {
                // 创建队列管理器连接
                mqQueueManager = new MQQueueManager(esbMqProperties.getQueueManager(), properties);
            } catch (MQException e) {
                if (ObjectUtil.equals(e.getReason(), EsbConstants.MqReasonCode.MQRC_Q_MGR_NOT_AVAILABLE.getCode())) {
                    log.error("队列管理器创建失败!队列管理器连接不可用");
                } else {
                    log.error("队列管理器创建失败!{}", e.getMessage());
                }
                return null;
            }
    
            // 销毁之前的队列管理器连接(新连接创建成功后再销毁失效连接,始终保证持有一个队列管理器连接实例,即使它是连接失效的)
            disConnect(mqQueueManagerMap.get(esbMqProperties.getQueueManager()));
            // 放入map
            mqQueueManagerMap.put(esbMqProperties.getQueueManager(), mqQueueManager);
            log.info("队列管理器创建成功");
    
            // 检测队列连接状态
            checkQueueConnected(mqQueueManager);
    
            return mqQueueManager;
        }
    
        /**
         * 初始化队列
         *
         * @author Baien
         * @date 2023/9/16 13:41:37
         */
        private void initEsbMqQueue() {
            // 输入队列(未启用)
    //        EsbMqInQueue inQueue = new EsbMqInQueue(esbProperties.getInQueueName());
    //        abstractEsbMqQueueMap.put(inQueue.getQueueName(), inQueue);
    
            // 输出队列
            EsbMqOutQueue outQueue = new EsbMqOutQueue(esbMqProperties.getOutQueue());
            abstractEsbMqQueueMap.put(outQueue.getQueueName(), outQueue);
    
            log.info("队列初始化列表:{}", JSONObject.toJSONString(abstractEsbMqQueueMap.keySet()));
        }
    
        /**
         * 检测队列连接状态
         *
         * @param mqQueueManager 队列管理器
         * @return 所有队列连接状态
         */
        public boolean checkQueueConnected(MQQueueManager mqQueueManager) {
            boolean isAllConnected = true; // 所有队列连接状态
            // 检测mq队列连接
            for (AbsTractEsbMqQueue absTractEsbMqQueue : abstractEsbMqQueueMap.values()) {
                boolean isConnected = absTractEsbMqQueue.isConnected(mqQueueManager); // 队列连接状态
                if (isAllConnected && !isConnected) {
                    isAllConnected = false;
                }
            }
            return isAllConnected;
        }
    
        /**
         * 断开队列管理器连接
         *
         * @param mqQueueManager 队列管理器
         * @author Baien
         * @date 2023/9/16 13:41:37
         */
        public void disConnect(MQQueueManager mqQueueManager) {
            if (mqQueueManager == null) {
                return;
            }
            try {
                // 断开连接
                mqQueueManager.disconnect();
                // 从map中移除
                mqQueueManagerMap.remove(mqQueueManager.getName());
                log.info("队列管理器连接销毁成功");
            } catch (MQException e) {
                try {
                    // 第二次断开连接(防止队列管理器连接失效时第一次断开连接产生异常,执行二次销毁确认。)
                    // (队列管理器连接失效时第一次主动断开连接,会销毁所有连接资源,在主动close关闭连接时产生连接失效异常,将连接状态设置为false)
                    mqQueueManager.disconnect();
                    // 从map中移除
                    mqQueueManagerMap.remove(mqQueueManager.getName());
                    log.info("队列管理器连接销毁成功");
                } catch (MQException ex) {
                    log.error("队列管理器连接销毁异常!{}", ex.getMessage());
                }
            }
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183

    7. MQ 消息发送服务类

    package com.example.demo.esb.mq;
    
    import cn.hutool.core.util.StrUtil;
    import com.alibaba.fastjson.JSONObject;
    import com.example.demo.config.EsbMqProperties;
    import com.example.demo.esb.EsbObserverService;
    import com.example.demo.esb.constant.EsbConstants;
    import com.example.demo.esb.mq.model.EsbMqStatus;
    import com.example.demo.esb.mq.queue.AbsTractEsbMqQueue;
    import com.ibm.mq.*;
    import com.ibm.mq.constants.MQConstants;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Service;
    
    /**
     * esb-mq获取消息服务
     *
     * @author Baien
     * @date 2023/9/16 13:41:37
     */
    @Slf4j
    @Service
    @RequiredArgsConstructor
    public class EsbMqMsgGet {
    
        private final EsbMqProperties esbMqProperties;
    
        private final EsbMqFactory esbMqFactory;
    
        private final EsbMqService esbMqService;
    
        private final EsbObserverService esbObserverService;
    
        /**
         * 获取消息
         * 

    * 获取消息,并分发给目标服务。 * * @param esbMqStatus mq连接状态对象 * @author Baien * @date 2023/9/16 13:41:37 */ public void getMessage(EsbMqStatus esbMqStatus) { // 获取队列管理器 MQQueueManager mqQueueManager = esbMqFactory.getMqQueueManager(); if (mqQueueManager == null || !mqQueueManager.isConnected()) { log.error("获取消息失败!队列管理器连接失效"); esbMqStatus.setConnected(false); // 设置mq连接状态 return; } // 获取输出队列 AbsTractEsbMqQueue outQueue = esbMqFactory.getEsbMqQueueMap(esbMqProperties.getOutQueue()); if (outQueue == null) { log.error("获取消息失败!未查询到目标队列:{}", esbMqProperties.getOutQueue()); return; } // 获取mq队列消息 MQMessage mqMessage = getMqMessage(mqQueueManager, outQueue, esbMqStatus); if (mqMessage == null) { return; } log.info("=============== mq分发消息开始 ==============="); log.info("{} 队列收到消息,进行分发...", outQueue.getQueueName()); // 消息分发 try { // 解析mq消息内容 JSONObject esbHeader; String message; try { esbHeader = JSONObject.parseObject(mqMessage.getStringProperty(EsbConstants.ESB_MESSAGE_HEADER)); message = mqMessage.readStringOfByteLength(mqMessage.getDataLength()); } catch (Exception e) { log.error("消息解析异常!{}", e.getMessage()); log.error("消息内容:{}", JSONObject.toJSONString(mqMessage)); mqQueueManager.commit(); // 提交事务 log.info("=============== mq分发消息结束 ==============="); return; } log.info("消息头:{}", JSONObject.toJSONString(esbHeader)); log.info("消息内容:{}", message); // 解析消息服务名 String serviceName = esbHeader.getString("ServiceName"); // 校验消息合法性 if (StrUtil.isBlank(serviceName) || StrUtil.isBlank(message)) { log.error("消息不合规,服务名称或消息内容为空"); mqQueueManager.commit(); // 提交事务 log.info("=============== mq分发消息结束 ==============="); return; } // 提交事务 mqQueueManager.commit(); // 提交事务(先提交事务后消费。允许消息丢失,防止重复消费,防止异常消息无法消费阻塞后续消息的消费。) try { // 分发信息 esbObserverService.notifyObservers(serviceName, message); } catch (Exception e) { log.error("消息处理异常!{}", e.getMessage()); } log.info("消息分发成功"); } catch (Exception e) { log.error("分发消息异常!{}", e.getMessage()); try { mqQueueManager.backout(); // 回滚事务 } catch (MQException ex) { log.error("回滚事务异常!{}", ex.getMessage()); } } log.info("=============== mq分发消息结束 ==============="); } /** * 获取mq队列消息 * * @param mqQueueManager mq队列管理器 * @param absTractEsbMqQueue 队列 * @param esbMqStatus mq连接状态对象 * @return mq队列消息列表 * @author Baien * @date 2023/9/16 13:41:37 */ private MQMessage getMqMessage(MQQueueManager mqQueueManager, AbsTractEsbMqQueue absTractEsbMqQueue, EsbMqStatus esbMqStatus) { MQQueue queue = null; MQMessage mqMessage = new MQMessage(); try { try { // 打开队列 queue = esbMqService.accessQueue(mqQueueManager, absTractEsbMqQueue); } catch (Exception e) { esbMqStatus.setConnected(false); // 设置mq连接状态 throw e; } // 获取队列当前深度 int currentDepth = queue.getCurrentDepth(); if (currentDepth < 1) { return null; // 消息为空返回 } // 设置取消息参数属性 MQGetMessageOptions gmo = new MQGetMessageOptions(); gmo.options = MQConstants.MQGMO_SYNCPOINT | MQConstants.MQGMO_WAIT; // 开启获取消息事务+开启获取消息等待 gmo.waitInterval = 1000; // 单位毫秒(设置等待时间,-1为无限等待)。没有获取到消息时,会等待1秒。 // 从队列中获取消息 queue.get(mqMessage, gmo); // 获取消息 return mqMessage; } catch (MQException e) { if (e.getReason() == EsbConstants.MqReasonCode.MQRC_NO_MSG_AVAILABLE.getCode()) { log.info("队列消息已取完!"); } else { log.error("消息获取异常!{}", e.getMessage()); } return null; } catch (Exception e) { log.error("消息获取异常!{}", e.getMessage()); return null; } finally { try { if (queue != null) { queue.close(); // 关闭队列 } } catch (MQException e) { log.error("关闭队列异常!{}", e.getMessage()); } } } }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173

    8. MQ 消息获取服务类

    package com.example.demo.esb.mq;
    
    import com.alibaba.fastjson.JSONObject;
    import com.example.demo.config.EsbMqProperties;
    import com.example.demo.esb.mq.queue.AbsTractEsbMqQueue;
    import com.ibm.mq.*;
    import com.ibm.mq.constants.MQConstants;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Service;
    
    import java.util.UUID;
    
    /**
     * esb-mq发送消息服务
     *
     * @author Baien
     * @date 2023/9/16 13:41:37
     */
    @Slf4j
    @Service
    @RequiredArgsConstructor
    public class EsbMqMsgSend {
    
        private final EsbMqProperties esbMqProperties;
    
        private final EsbMqFactory esbMqFactory;
    
        private final EsbMqService esbMqService;
    
        /**
         * 发送消息
         *
         * @param serviceName 服务名
         * @param message     消息内容
         * @throws Exception 异常
         * @author Baien
         * @date 2023/9/16 13:41:37
         */
        public void sendMessage(String serviceName, String message) throws Exception {
            // 获取队列管理器
            MQQueueManager mqQueueManager = esbMqFactory.getMqQueueManager();
            if (mqQueueManager == null || !mqQueueManager.isConnected()) {
                throw new Exception("队列管理器连接异常!请稍后重试");
            }
            // 获取队列配置
            AbsTractEsbMqQueue inQueue = esbMqFactory.getEsbMqQueueMap(esbMqProperties.getInQueue());
            if (inQueue == null) {
                throw new Exception("未查询到目标队列:" + esbMqProperties.getInQueue());
            }
    
            log.info("=============== mq发送消息开始 ===============");
            String esbHeader = encodeEsbHeader(serviceName);
            log.info("目标队列:{}", inQueue.getQueueName());
            log.info("消息头:{}", esbHeader);
            log.info("消息内容:{}", message);
    
            MQQueue queue = null;
            try {
                // 打开队列
                queue = esbMqService.accessQueue(mqQueueManager, inQueue);
                // 设置mq消息
                MQMessage mqMessage = new MQMessage();
                mqMessage.format = MQConstants.MQFMT_STRING;
                mqMessage.setStringProperty("ESBHeader", esbHeader);
                mqMessage.characterSet = esbMqProperties.getCcsId();
                mqMessage.writeString(message);
                // 设置发送消息参数
                MQPutMessageOptions pmo = new MQPutMessageOptions();
                pmo.options = MQConstants.MQPMO_SYNCPOINT; // 放入消息事务开启
                // 将消息放入队列
                queue.put(mqMessage, pmo);
                // 提交事务
                mqQueueManager.commit();
                log.info("消息发送成功");
            } catch (Exception e) {
                try {
                    // 事务回滚
                    mqQueueManager.backout();
                } catch (MQException ex) {
                    log.error("回滚事务异常!{}", ex.getMessage());
                }
                throw new Exception("发送消息异常!" + e.getMessage());
            } finally {
                if (queue != null) {
                    try {
                        // 关闭队列
                        queue.close();
                    } catch (MQException e) {
                        log.error("队列关闭异常!{}", e.getMessage());
                    }
                }
                log.info("=============== mq发送消息结束 ===============");
            }
        }
    
        /**
         * 设置mq消息header
         *
         * @param serviceName 服务名
         * @return mq消息header
         * @author Baien
         * @date 2023/9/16 13:41:37
         */
        private String encodeEsbHeader(String serviceName) {
            JSONObject header = new JSONObject();
            header.put("RequestID", UUID.randomUUID());
            header.put("ServiceName", serviceName);
            return JSONObject.toJSONString(header);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112

    9. MQ 公共服务类

    mq服务类,用于提供mq服务的一些方法。

    package com.example.demo.esb.mq;
    
    import com.example.demo.esb.constant.EsbConstants;
    import com.example.demo.esb.mq.queue.AbsTractEsbMqQueue;
    import com.ibm.mq.MQException;
    import com.ibm.mq.MQQueue;
    import com.ibm.mq.MQQueueManager;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Service;
    
    /**
     * esb-mq基础服务
     *
     * @author Baien
     * @date 2023/9/16 13:41:37
     */
    @Slf4j
    @Service
    @RequiredArgsConstructor
    public class EsbMqService {
    
        private final EsbMqFactory esbMqFactory;
    
        /**
         * mq重连检测
         * 

    * 检测mq连接状态,连接断开则进行重连。 * * @author Baien * @date 2023/9/16 13:41:37 */ public void checkReconnect() { /*--------------------1.检测mq管理器连接--------------------*/ MQQueueManager mqQueueManager = esbMqFactory.getMqQueueManager(); // 如果mq管理器未连接,则进行重连 if (mqQueueManager == null || !mqQueueManager.isConnected()) { log.info("=============== mq重连开始 ==============="); log.info("队列管理器连接异常!进行重连..."); esbMqFactory.createMqQueueManager(); log.info("=============== mq重连结束 ==============="); return; } /*--------------------2.检测mq队列连接--------------------*/ boolean isConnected = esbMqFactory.checkQueueConnected(mqQueueManager); // 如果连接失效,则进行重连 if (!isConnected) { log.info("=============== mq重连开始 ==============="); log.info("队列服务器通道连接异常!进行重连..."); esbMqFactory.createMqQueueManager(); log.info("=============== mq重连结束 ==============="); } } /** * 获取打开目标队列 * * @param mqQueueManager 队列管理器 * @param absTractEsbMqQueue 队列 * @return 队列 * @author Baien * @date 2023/9/16 13:41:37 */ public MQQueue accessQueue(MQQueueManager mqQueueManager, AbsTractEsbMqQueue absTractEsbMqQueue) throws Exception { String queueName = absTractEsbMqQueue.getQueueName(); // 队列名称 try { // 打开队列 return mqQueueManager.accessQueue(absTractEsbMqQueue.getQueueName(), absTractEsbMqQueue.getOpenOptions()); } catch (MQException e) { if (e.getReason() == EsbConstants.MqReasonCode.MQRC_CONNECTION_BROKEN.getCode()) { throw new Exception(queueName + " 队列连接失效!"); } else if (e.getReason() == EsbConstants.MqReasonCode.MQRC_UNKNOWN_OBJECT_NAME.getCode()) { throw new Exception(queueName + " 未知队列!"); } else { throw new Exception(queueName + " 队列打开异常!" + e.getMessage()); } } } }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81

    10. MQ 重连检测定时任务

    mq定时任务,任务如下:

    1. 定时检测mq连接状态,进行重连。
    • 采用定时任务的形式,检测mq连接状态,进行重连,单一控制mq的重新创建过程,避免大量mq连接创建的资源占用,避免mq连接失效时引起大量的并发创建问题。
    package com.example.demo.scheduled;
    
    import com.example.demo.esb.mq.EsbMqService;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    /**
     * esb-mq定时任务
     *
     * @author Baien
     * @date 2023/9/18 18:08:42
     */
    @Slf4j
    @Component
    @RequiredArgsConstructor
    public class EsbMqScheduled {
    
        private final EsbMqService esbMqService;
    
        /**
         * mq重连检测定时任务
         */
        @Scheduled(cron = "0/30 * * * * ?")
        public void checkReconnectScheduled() {
            // mq重连检测
            esbMqService.checkReconnect();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    二、开发连接调试

    进行连接测试时,可参考此处。

    队列管理器不存在连接者时默认连接计数显示为21,当我们创建一个队列管理器连接时,该连接计数加1。

    在这里插入图片描述

    右键对应通道-》状态-》通道状态,可查看该通道当前对话数,不存在连接对话时为0,当我们创建一个队列管理器连接使用该通道时,当前对话数加1。

    在这里插入图片描述

    三、MQ 连接断开重连说明

    IBMMQ连接断开存在两种情况:

    1. 队列管理器连接失效:mq服务端异常,mq服务地址无法访问或者mq服务端队列管理器服务异常,导致本地服务已连接的mq队列管理器连接失效。(mq服务器停止、与mq服务器网络断开、注册在mq服务端的队列管理器停止。)
    2. 队列管理器服务器通道连接失效:mq服务端队列管理器连接正常,而队列管理器中的服务器通道连接异常,导致本地服务已连接的队列管理器服务器通道连接失效。(队列管理器中对应的服务器通道停止。)

    当队列管理器连接成功创建时,队列管理器和队列管理器中对应的服务器通道会持有该连接的句柄。无论哪种方式导致mq连接失效时,原连接所在判断IBMMQ连接是否断开时,需要检测mq队列管理器的连接状态以及mq队列管理器中各消息队列的连接状态。

    当mq连接失效时,及时调用 MQQueueManager.disconnect() 方法将mq连接销毁。否则该连接会一直占用资源。

    四、MQ 连接断开重连测试

    需要测试两种场景:

    1. 成功连接后,停止队列管理器,打印连接异常日志后,启动队列管理器,测试是否重连成功。
    2. 成功连接后,停止服务器通道,打印连接异常日志后,启动服务器通道,测试是否重连成功。
  • 相关阅读:
    jxTMS+职教:SaaS模式的低门槛开发实训
    PyTorch搭建LSTM神经网络实现文本情感分析实战(附源码和数据集)
    浏览器运行机制
    模拟堆的实现
    zookeeper/HA集群配置
    设计模式——组合模式(Composite Pattern)+ Spring相关源码
    Python深度学习进阶与应用丨注意力(Attention)机制、Transformer模型、生成式模型、目标检测算法、图神经网络、强化学习详解等
    Diffusion Models视频生成-博客汇总
    【YOLO】YOLO简介
    设计模式-组合模式
  • 原文地址:https://blog.csdn.net/demo_yo/article/details/133854975