• 接入Websocket,自动接收CSDN短消息


    最近在研究Websocket功能,本来想接入抖音和快手的弹幕功能,以及短消息功能。

    在了解的过程中,也开发了一些测试项目。

    这不是,就把CSDN的短消息项目给弄出来了。

    直接上代码:

    1. # !/usr/bin python3
    2. # -*- encoding=utf-8 -*-
    3. # Description :
    4. # Author :
    5. # @Email :
    6. # @File : csdn.py
    7. # @Time : 2023-09-05 15:18:58
    8. # @Project : live_core
    9. import requests
    10. import websocket
    11. import threading
    12. import time
    13. import json
    14. import logging
    15. import re
    16. class CSDN(object):
    17. user_name = ''
    18. device_id = ''
    19. cookie = ''
    20. def init_wws(self, cookie):
    21. try:
    22. self.cookie = cookie
    23. self.user_name = re.search('UserName=(.*?);', cookie).group(1)
    24. self.device_id = re.search('uuid_tt_dd=(.*?);', cookie).group(1)
    25. except Exception as e:
    26. logging.error('错误'.format(e.args))
    27. def _connect_data(self):
    28. _url = 'https://bizapi.csdn.net/im-manage/v1.0/dispatch/do'
    29. _h = {
    30. 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
    31. 'Cookie': self.cookie
    32. }
    33. _d = {
    34. 'appId': "CSDN-PC",
    35. 'channelType': "privateMsg",
    36. 'deviceId': self.device_id,
    37. 'groupId': "CSDN-private-MSG",
    38. 'linkType': 1,
    39. 'token': "",
    40. 'userId': self.user_name
    41. }
    42. _req = requests.post(_url, headers=_h, json=_d)
    43. return _req.json()
    44. def _on_open(self, ws):
    45. _body = self._connect_data()
    46. # print(_body.get('data'))
    47. _body_data = _body.get('data')
    48. _data = {
    49. 'body': {
    50. 'userId': self.user_name,
    51. 'appId': 'CSDN-PC',
    52. 'imToken': _body_data.get('imToken'),
    53. 'groupId': 'CSDN-private-MSG'
    54. },
    55. 'cmdId': 2,
    56. 'isZip': 0,
    57. 'ver': '1.0'
    58. }
    59. ws.send(json.dumps(_data))
    60. threading.Thread(target=self._keep_heart_beat, args=(ws,), daemon=True).start()
    61. def _keep_heart_beat(self, ws: websocket.WebSocketApp):
    62. while True:
    63. time.sleep(5)
    64. _payload = {
    65. 'cmdId': 1
    66. }
    67. ws.send(json.dumps(_payload))
    68. def _on_message(self, ws: websocket.WebSocketApp, message):
    69. logging.info(message)
    70. def _on_error(self, ws: websocket.WebSocketApp, error):
    71. logging.error(error)
    72. def _on_close(self, ws):
    73. logging.info('close')
    74. def wss_start(self):
    75. websocket.enableTrace(False)
    76. ws = websocket.WebSocketApp(
    77. 'wss://im-linkserver-66.csdn.net/', on_message=self._on_message, on_error=self._on_error, on_close=self._on_close,
    78. on_open=self._on_open
    79. )
    80. ws.run_forever()
    81. if __name__ == '__main__':
    82. LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
    83. logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)
    84. # cookies 这里填CSDN的Cookies
    85. _c = ''
    86. _csdn = CSDN()
    87. _csdn.init_wws(_c)
    88. _csdn.wss_start()

     运行结果如下:

  • 相关阅读:
    Windows 10 布置IP安全策略
    品牌百度百科词条创建方法是什么?品牌百科怎么创建?
    速盾:cdn加速防ddos
    位置编码器
    以数据为中心的安全市场快速增长
    【C++】string类的使用,小试牛刀
    C++:中的继承关系:单继承,多继承,菱形继承的详细介绍
    wget编译升级故障解决
    国产操作系统银河麒麟v10 (SP1) Kylin-server 安装Elasticsearch7.6.1
    尚硅谷最新Node.js 学习笔记(一)
  • 原文地址:https://blog.csdn.net/John_Lenon/article/details/132701151