• GlobalWebsoket.js 封装配置分析



    前言

    由于项目业务逻辑需要,此次前端界面需要接收后端服务器 WebSoket 实时传输的数据,并在页面当中显示实时数据
    项目中已经用 js 封装好了能用的 GlobalWebsoket.js


    一、 封装好的 GlobalWebsoket.js

    代码如下

    1. GlobalWebsoket.js

    // GlobalWebsoket.js
    import store from '@/store/index.js';
    import Config from '@/core/config' // 引入 cofig ,Config.js 当中配置的是 url 地址
    import {
    	Observable
    } from "rxjs";
    
    // 后端api地址
    const wsHost = Config.get('wsUrl')
    let ws;
    let count = 0;
    var subs = {};
    let timer = {};
    const MAX_RETRIES = 2000;
    let trySendCount = 0;
    let tempQueue = [];
    let socketOpen = false;
    const initWebSocket = () => {
    	let token = store.state.token ? store.state.token : store.getters.token;
    	const wsUrl = `${wsHost}/messaging/${token}?:X_Access_Token=${token}`;
    	try {
    		//微信websocket最大并发不能超过5个
    		//https://developers.weixin.qq.com/miniprogram/dev/framework/ability/network.html
    
    		if (count > 0) {
    			return ws;
    		}
    		clearInterval(timer);
    		ws = uni.connectSocket({
    			url: wsUrl,
    			complete: () => {}
    		});
    		count += 1;
    		uni.onSocketClose(function() {
    			socketOpen = false;
    			ws = undefined;
    			setTimeout(initWebSocket, 5000 * count);
    		});
    		uni.onSocketOpen(function() {
    			socketOpen = true;
    		});
    		uni.onSocketMessage(function(msg) {
    			var data = JSON.parse(msg.data);
    			if (data.type === 'error') {
    				uni.showToast({
    					title: data.message,
    					icon: "none",
    					duration: 3500
    				})
    			}
    			if (subs[data.requestId]) {
    				if (data.type === 'complete') {
    					subs[data.requestId].forEach(function(element) {
    						element.complete();
    					});;
    				} else if (data.type === 'result') {
    					subs[data.requestId].forEach(function(element) {
    						element.next(data);
    					});;
    				}
    			}
    		});
    	} catch (error) {
    		setTimeout(initWebSocket, 5000 * count);
    	}
    
    	timer = setInterval(function() {
    		try {
    			ws && ws.readyState === 1 ? sendSocketMessage(JSON.stringify({
    				"type": "ping"
    			})) : 0;
    		} catch (error) {
    			console.error(error, '发送心跳错误');
    		}
    		//同时判断
    		if (tempQueue.length > 0 && ws && ws.readyState === 1) {
    			sendSocketMessage(tempQueue[0], 1);
    		}
    	}, 2000);
    	return ws;
    };
    
    //flag,是否处理tempQueue中的数据,如果发送失败,则不会重新加入,发送成功,则去除
    function sendSocketMessage(msg, flag) {
    	if (socketOpen) {
    		uni.sendSocketMessage({
    			data: msg
    		});
    		if (flag === 1) {
    			tempQueue.splice(0, 1);
    		}
    	} else {
    		if (flag != 1) {
    			tempQueue.push(msg);
    		}
    	}
    }
    
    
    const getWebsocket = (id, topic, parameter) => {
    	return Observable.create(function(observer) {
    		if (!subs[id]) {
    			subs[id] = [];
    		}
    		subs[id].push({
    			next: function(val) {
    				observer.next(val);
    			},
    			complete: function() {
    				observer.complete();
    			}
    		});
    		var msg = JSON.stringify({
    			id: id,
    			topic: topic,
    			parameter: parameter,
    			type: 'sub'
    		});
    		var thisWs = initWebSocket();
    		if (thisWs) {
    			try {
    				sendSocketMessage(msg);
    			} catch (error) {
    				initWebSocket();
    				uni.showToast({
    					title: 'websocket服务连接失败',
    					icon: "none",
    					duration: 3500
    				})
    			}
    		} else {
    			tempQueue.push(msg);
    			ws = undefined
    			count = 0
    			initWebSocket();
    		}
    		return function() {
    			var unsub = JSON.stringify({
    				id: id,
    				type: "unsub"
    			});
    			delete subs[id];
    			if (thisWs) {
    				sendSocketMessage(unsub)
    			}
    		};
    	});
    };
    exports.getWebsocket = getWebsocket;
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150

    二、GlobalWebsoket.js 代码分析

    1.GlobalWebsoket.js import 分析

    import store from '@/store/index.js'; // vueX 做状态管理的
    import Config from '@/core/config' // 引入 cofig ,Config.js 当中配置的是 url 地址
    import {
    	Observable
    } from "rxjs";
    
    • 1
    • 2
    • 3
    • 4
    • 5

    Config 的地址是从 config.js 中来的
    在这里插入图片描述

    RxJS 是一个库,它通过使用 observable 序列来编写异步和基于事件的程序。它提供了一个核心类型 Observable,附属类型 (Observer、 Schedulers、 Subjects) 和受 [Array#extras] 启发的操作符 (map、filter、reduce、every, 等等),这些数组操作符可以把异步事件作为集合来处理。 可以把 RxJS 当做是用来处理事件的 Lodash 。

    2.GlobalWebsoket.js 整体分析

    GlobalWebsoket.js 文件导出了getWebsocket,这是一个函数

    import store from '@/store/index.js';
    import Config from '@/core/config'
    import {
    	Observable
    } from "rxjs";
    
    // 后端api地址
    // 这里通过 Config 获取需要连接 websoket 的地址
    const wsHost = Config.get('wsUrl')
    let ws;
    let count = 0;
    var subs = {};
    let timer = {};
    const MAX_RETRIES = 2000;
    let trySendCount = 0;
    let tempQueue = [];
    let socketOpen = false;
    
    
    exports.getWebsocket = getWebsocket;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    3. initWebSoket()

    const initWebSocket = () => {
    	// 先获取 token 
    	let token = store.state.token ? store.state.token : store.getters.token;
    	// 这里是 websoket 的 url 地址
    	// ${wsHost} 是 通过 Config 获取需要连接 websoket 的地址
    	// ${token} 是 token 信息
    	const wsUrl = `${wsHost}/messaging/${token}?:X_Access_Token=${token}`;
    	try {
    		//微信websocket最大并发不能超过5个
    		//https://developers.weixin.qq.com/miniprogram/dev/framework/ability/network.html
    
    		// 如果连接数量大于 0 
    		if (count > 0) {
    			// 这里就返回当前连接
    			return ws;
    		}
    		// 执行到这里说明连接数为 0 ,以下代码为创建新的 websoket 的连接
    		clearInterval(timer);
    		
    		// 调用 uni.connectSocket 来创建连接
    		ws = uni.connectSocket({
    			url: wsUrl,
    			complete: () => {}
    		});
    		count += 1;
    		
    		// 关闭连接的回调函数
    		uni.onSocketClose(function() {
    			socketOpen = false;
    			ws = undefined;
    			setTimeout(initWebSocket, 5000 * count);
    		});
    		
    		// 连接打开的回调函数
    		uni.onSocketOpen(function() {
    			socketOpen = true;
    		});
    		
    		// 当向 websoket 发送信息时的回调函数
    		uni.onSocketMessage(function(msg) {
    			var data = JSON.parse(msg.data);
    			if (data.type === 'error') {
    				uni.showToast({
    					title: data.message,
    					icon: "none",
    					duration: 3500
    				})
    			}
    			if (subs[data.requestId]) {
    				if (data.type === 'complete') {
    					subs[data.requestId].forEach(function(element) {
    						element.complete();
    					});;
    				} else if (data.type === 'result') {
    					subs[data.requestId].forEach(function(element) {
    						element.next(data);
    					});;
    				}
    			}
    		});
    	} catch (error) {
    		setTimeout(initWebSocket, 5000 * count);
    	}
    
    	// 设置定时器,每 2 秒执行一次,发送一次 'ping'
    	timer = setInterval(function() {
    		try {
    			ws && ws.readyState === 1 ? sendSocketMessage(JSON.stringify({
    				"type": "ping"
    			})) : 0;
    		} catch (error) {
    			console.error(error, '发送心跳错误');
    		}
    		//同时判断
    		if (tempQueue.length > 0 && ws && ws.readyState === 1) {
    			sendSocketMessage(tempQueue[0], 1);
    		}
    	}, 2000);
    	// 返回新建的 ws
    	return ws;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81

    3. getWebsoket

    const getWebsocket = (id, topic, parameter) => { 
    	// 根据传递的 id, 处理 id, 获取需要监听的内容
    	return Observable.create(function(observer) {
    		if (!subs[id]) {
    			subs[id] = [];
    		}
    		subs[id].push({ // 所有需要监听的内容 push 到 subs[] 数组当中
    			next: function(val) {
    				observer.next(val);
    			},
    			complete: function() {
    				observer.complete();
    			}
    		});
    	// 根据传参的 id,topic,parameter ,讲需要发送的监听的内容封装到 msg 对象中
    		var msg = JSON.stringify({
    			id: id,
    			topic: topic,
    			parameter: parameter,
    			type: 'sub'
    		});
    		// 调用 initWebSoket,初始化 websocket,在 initWebSoket 当中发起连接
    		var thisWs = initWebSocket();
    		if (thisWs) { // 如果连接成功
    			try {
    				sendSocketMessage(msg); // 发送需要 websoket 绑定监听的 msg(上面封装好了)
    			} catch (error) { // 如果发送失败,再次发起连接
    				initWebSocket();
    				uni.showToast({
    					title: 'websocket服务连接失败',
    					icon: "none",
    					duration: 3500
    				})
    			}
    		} else { // 如果没有连接成功
    			tempQueue.push(msg); // 临时队列中先把 msg 存起来
    			ws = undefined // 断掉当前的连接
    			count = 0 // 并把连接数设为 0
    			initWebSocket(); // 再次初始化 websoket
    		}
    		return function() {  // 这里是解绑的时候会执行的 (remove)
    			var unsub = JSON.stringify({  
    				id: id,
    				type: "unsub"
    			});
    			delete subs[id];
    			if (thisWs) {
    				sendSocketMessage(unsub)
    			}
    		};
    	});
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    4. sendSocketMessage

    
    // 发送 soket 信息
    //flag,是否处理tempQueue中的数据,如果发送失败,则不会重新加入,发送成功,则去除
    function sendSocketMessage(msg, flag) {
    	// 如果当前的 websoket 是打开的
    	if (socketOpen) {
    		// 向 websoket 发送消息
    		uni.sendSocketMessage({
    			data: msg
    		});
    		if (flag === 1) {
    			tempQueue.splice(0, 1);
    		}
    	} else {
    		if (flag != 1) {
    			tempQueue.push(msg);
    		}
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    三、GlobalWebsoket.js 使用分析

  • 相关阅读:
    深度学习——day18 卷积神经网络第四周——人脸识别和神经风格迁移
    【3】CH347应用--USB转JTAG进行FPGA调试下载
    <stack和queue>——《C++初阶》
    SpringMVC项目整合SSM统一结果封装
    一个程序员的职业生涯到底该怎么规划?
    快速上手Django(六) -Django之Django drf 序列化器Serializer类
    Go语言之channel实现原理
    Spring七大组件
    .NET现代应用的产品设计 - DDD实践
    微信小程序(五)--- Vant组件库,API Promise化,MboX全局数据共享,分包相关
  • 原文地址:https://blog.csdn.net/YZRHANYU/article/details/127987182