• python使用websocket服务并在fastAPI中启动websocket服务


    1. 依赖
      pip install websockets-routes
    2. 代码
      1. import asyncio
      2. import json
      3. from typing import Union
      4. import websockets
      5. import websockets_routes
      6. from websockets.legacy.server import WebSocketServerProtocol
      7. from websockets_routes import RoutedPath
      8. # 初始化一个router对象
      9. router = websockets_routes.Router()
      10. # 连接句柄
      11. connections: dict[str, Union[None, WebSocketServerProtocol]] = {
      12. 'setting': None,
      13. 'fastapi': None,
      14. 'administrator': None
      15. }
      16. flags: dict[str, any] = {
      17. 'is_allow_query': True,
      18. 'opt_success': False
      19. }
      20. # 消息监听
      21. @router.route("/command/{identification}")
      22. async def light_status(websocket: WebSocketServerProtocol, path: RoutedPath):
      23. """
      24. 更新连接句柄
      25. """
      26. if path.params['identification'] == 'setting':
      27. connections['setting'] = websocket
      28. elif path.params["identification"] == 'fastapi':
      29. connections['fastapi'] = websocket
      30. elif path.params["identification"] == 'administrator':
      31. connections['administrator'] = websocket
      32. else:
      33. return
      34. """
      35. 处理消息
      36. """
      37. async for message in websocket:
      38. # 处理setting用户的业务
      39. if path.params['identification'] == 'setting':
      40. if connections['fastapi'] is None:
      41. await websocket.send("fastapi的websocket未连接")
      42. else:
      43. msg: dict = json.loads(message)
      44. if msg['cmd'] == 'is_allow_query':
      45. flags['is_allow_query'] = True if msg['data'] == 'true' else False
      46. # 处理fastapi用户的业务
      47. elif path.params["identification"] == 'fastapi':
      48. if connections['setting'] is None:
      49. await websocket.send("setting的websocket未连接")
      50. else:
      51. await connections['setting'].send(message)
      52. await websocket.send("发送成功")
      53. # 处理administrator用户的业务
      54. elif path.params["identification"] == 'administrator':
      55. await websocket.send("【administrator】你发给我的消息是:" + json.dumps(message))
      56. # 其他
      57. else:
      58. await websocket.send("身份错误")
      59. # 启动WebSocket服务器
      60. async def main():
      61. # 启动WebSocket服务
      62. async with websockets.serve(lambda x, y: router(x, y), "localhost", 8089):
      63. await asyncio.Future() # run forever
      64. def start():
      65. # 启动WebSocket服务
      66. asyncio.run(main())
    3. fastAPI中调用

       

    4. 连接服务

    5. 在fastAPI中启动websocket服务

      1. import asyncio
      2. import websockets
      3. import websockets_routes
      4. from websockets.legacy.server import WebSocketServerProtocol
      5. from websockets_routes import RoutedPath
      6. # 初始化一个router对象
      7. router = websockets_routes.Router()
      8. # 消息监听
      9. @router.route("/command/{identification}")
      10. async def light_status(websocket: WebSocketServerProtocol, path: RoutedPath):
      11. # 收到消息
      12. async for message in websocket:
      13. # 处理setting用户的业务
      14. if path.params['identification'] == 'setting':
      15. await websocket.send("你发给我的消息是:" + message)
      16. # 处理administrator用户的业务
      17. elif path.params["identification"] == 'administrator':
      18. await websocket.send("你发给我的消息是:" + message)
      19. else:
      20. await websocket.send("指令码错误")
      21. # 启动WebSocket服务器
      22. async def main():
      23. # 启动WebSocket服务
      24. async with websockets.serve(lambda x, y: router(x, y), "localhost", 8089):
      25. await asyncio.Future() # run forever
      26. def start():
      27. # 启动WebSocket服务
      28. asyncio.run(main())
      1. if __name__ == "__main__":
      2. # 开启一个线程去启动WebSocket服务器
      3. thread = Thread(target=start)
      4. thread.start()
      5. # 启动Web服务
      6. uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=False)

  • 相关阅读:
    第75步 时间序列建模实战:多步滚动预测 vol-3(以决策树回归为例)
    使用jib-maven-plugin插件构建镜像并推送至私服Harbor
    GoldenEye
    面试官:你会如何设计QQ中的网络协议?
    python:日期时间处理
    Linux 修改文件(文件夹)的权限 chown 与 chmod
    C++基类和派生类的内存分配,多态的实现
    leetcode 移除链表元素
    攻防演练-组织沙盘推演的4个阶段.
    线上多域名实战
  • 原文地址:https://blog.csdn.net/wenxingchen/article/details/128093267