最近在研究WebSocket功能,本来想接入抖音和快手的弹幕功能,以及短消息功能。
在了解的过程中,也开发了一些测试项目。
这不,就把CSDN的短消息项目给弄出来了。
直接上代码:
- #!/usr/bin/env python3
- # -*- encoding=utf-8 -*-
- # Description :
- # Author :
- # @Email :
- # @File : csdn.py
- # @Time : 2023-09-05 15:18:58
- # @Project : live_core
-
- import requests
- import websocket
- import threading
- import time
- import json
- import logging
- import re
-
-
class CSDN(object):
    """Minimal client for the CSDN private-message WebSocket channel.

    Typical use: call ``init_wws(cookie)`` with the Cookie header of a
    logged-in browser session, then ``wss_start()`` to connect and log every
    incoming frame until the connection ends.
    """

    # Values extracted from the cookie by init_wws(); empty until then.
    user_name = ''
    device_id = ''
    cookie = ''

    # Seconds to wait for the dispatch HTTP call before giving up.
    _HTTP_TIMEOUT = 10
    # Seconds between heartbeat frames.
    _HEARTBEAT_INTERVAL = 5

    @staticmethod
    def _cookie_value(cookie, key):
        """Return the value of ``key`` inside a ``k1=v1; k2=v2`` cookie string.

        The key must start a cookie pair (start of string or right after a
        ``;``), and the value runs up to the next ``;`` or the end of the
        string, so the last pair needs no trailing semicolon.

        Raises:
            ValueError: if ``key`` is not present.
            TypeError: if ``cookie`` is not a string.
        """
        found = re.search(r'(?:^|;)\s*' + re.escape(key) + r'=([^;]*)', cookie)
        if found is None:
            raise ValueError('cookie has no {!r} entry'.format(key))
        return found.group(1)

    def init_wws(self, cookie):
        """Store ``cookie`` and pull ``UserName`` / ``uuid_tt_dd`` out of it.

        A malformed cookie is logged and otherwise ignored (the attributes that
        could not be parsed keep their previous values); nothing is raised.
        """
        self.cookie = cookie
        try:
            self.user_name = self._cookie_value(cookie, 'UserName')
            self.device_id = self._cookie_value(cookie, 'uuid_tt_dd')
        except (TypeError, ValueError) as exc:
            # The original format string had no placeholder, so the reason
            # for the failure was silently dropped from the log.
            logging.error('错误: {}'.format(exc))

    def _connect_data(self):
        """POST to the CSDN dispatch endpoint and return its decoded JSON body.

        Sends the stored cookie plus the user/device ids. ``_on_open`` reads
        ``data.imToken`` from the returned mapping.
        """
        _url = 'https://bizapi.csdn.net/im-manage/v1.0/dispatch/do'
        _h = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
            'Cookie': self.cookie
        }

        _d = {
            'appId': "CSDN-PC",
            'channelType': "privateMsg",
            'deviceId': self.device_id,
            'groupId': "CSDN-private-MSG",
            'linkType': 1,
            'token': "",
            'userId': self.user_name
        }

        # Without a timeout requests may block forever on a stalled server.
        _req = requests.post(_url, headers=_h, json=_d, timeout=self._HTTP_TIMEOUT)
        return _req.json()

    def _on_open(self, ws):
        """Send the login frame (cmdId 2) and start the heartbeat thread.

        If the dispatch response carries no ``imToken`` the socket is closed
        instead of sending a login frame that cannot succeed.
        """
        _body = self._connect_data()
        # 'data' can be absent/None (e.g. expired cookie); avoid None.get(...).
        _body_data = _body.get('data') or {}
        _im_token = _body_data.get('imToken')
        if not _im_token:
            logging.error('dispatch response has no imToken: %s', _body)
            ws.close()
            return

        _data = {
            'body': {
                'userId': self.user_name,
                'appId': 'CSDN-PC',
                'imToken': _im_token,
                'groupId': 'CSDN-private-MSG'
            },
            'cmdId': 2,
            'isZip': 0,
            'ver': '1.0'
        }
        ws.send(json.dumps(_data))
        threading.Thread(target=self._keep_heart_beat, args=(ws,), daemon=True).start()

    def _keep_heart_beat(self, ws: "websocket.WebSocketApp"):
        """Send a ``cmdId: 1`` frame every few seconds until the socket dies.

        Runs on a daemon thread started by ``_on_open``. The loop ends quietly
        once sending fails, rather than crashing the thread with a traceback.
        """
        _payload = json.dumps({'cmdId': 1})  # constant, so serialize once
        while True:
            time.sleep(self._HEARTBEAT_INTERVAL)
            try:
                ws.send(_payload)
            except (websocket.WebSocketException, OSError) as exc:
                logging.info('heartbeat stopped: %s', exc)
                return

    def _on_message(self, ws: "websocket.WebSocketApp", message):
        """Log every frame received from the server."""
        logging.info(message)

    def _on_error(self, ws: "websocket.WebSocketApp", error):
        """Log any error reported by the WebSocket client."""
        logging.error(error)

    def _on_close(self, ws, *args):
        """Log that the connection closed.

        ``*args`` absorbs the close status code and message that
        websocket-client >= 1.0 passes in addition to ``ws``; older releases
        pass only ``ws``.
        """
        logging.info('close')

    def wss_start(self):
        """Connect to the CSDN IM link server and block until the socket ends."""
        websocket.enableTrace(False)
        ws = websocket.WebSocketApp(
            'wss://im-linkserver-66.csdn.net/',
            on_message=self._on_message,
            on_error=self._on_error,
            on_close=self._on_close,
            on_open=self._on_open
        )
        ws.run_forever()
-
-
if __name__ == '__main__':
    # Emit DEBUG and above, each record stamped with time and level.
    logging.basicConfig(
        format="%(asctime)s - %(levelname)s - %(message)s",
        level=logging.DEBUG,
    )

    # Paste the Cookie header of a logged-in CSDN browser session here.
    cookie_text = ''

    client = CSDN()
    client.init_wws(cookie_text)
    client.wss_start()
运行结果如下:
