• 实现 HTTP 流式输出的最小实践


    engine端(保存为 api_engine.py,server端通过 from api_engine import get_engine 导入)

    1. import asyncio
    class AsyncStream:
        """Asynchronous stream of results for a single request.

        The class implements ``__aiter__`` and ``__anext__``, which makes it an
        asynchronous iterator rather than a synchronous one: its items cannot
        be read with a plain ``for`` loop, only with ``async for``.
        """

        def __init__(self, request_id: str) -> None:
            self.request_id = request_id
            self._queue = asyncio.Queue()
            self._finished = False

        def put(self, item) -> None:
            """Enqueue ``item``; the call is ignored once the stream is finished."""
            if self._finished:
                return
            self._queue.put_nowait(item)

        def finish(self) -> None:
            """Mark the stream as finished and enqueue the end-of-stream sentinel.

            Calling it again is a no-op, so only one sentinel is ever queued.
            """
            if self._finished:
                return
            # The StopIteration class itself (not an instance) is the sentinel.
            self._queue.put_nowait(StopIteration)
            self._finished = True

        @property
        def finished(self) -> bool:
            """Whether ``finish()`` has been called."""
            return self._finished

        def __aiter__(self):
            return self

        async def __anext__(self):
            """Return the next queued item.

            Raises:
                StopAsyncIteration: when the end-of-stream sentinel is reached,
                    or when the stream is finished and nothing is left to read.
                Exception: any exception instance that was queued via ``put``
                    is re-raised to the consumer.
            """
            # A finished stream whose queue is already drained would otherwise
            # wait on ``get()`` forever; keep reporting exhaustion instead.
            if self._finished and self._queue.empty():
                raise StopAsyncIteration
            result = await self._queue.get()
            if result is StopIteration:
                raise StopAsyncIteration
            elif isinstance(result, Exception):
                raise result
            return result
    class Engine():
        """Toy engine that exposes a fixed word list and one output stream."""

        def __init__(self):
            # Words the server pushes into the stream on each request.
            self.items = ['my', 'name', 'is', 'lewis']
            # AsyncStream.__init__ requires a request_id; calling it with no
            # argument raises TypeError, so pass a fixed id for this demo.
            self.stream = AsyncStream(request_id="demo")

        async def generate(self):
            """Asynchronous generator that relays every item read from the stream.

            Each item is printed and then yielded; iteration ends when the
            underlying stream stops.
            """
            async for request_output in self.stream:
                print(request_output)
                yield request_output
    def get_engine():
        """Create a new ``Engine`` and return it."""
        engine = Engine()
        return engine

    server端

    1. import argparse
    2. import json
    3. from typing import AsyncGenerator
    4. from fastapi import FastAPI, Request
    5. from fastapi.responses import JSONResponse, Response, StreamingResponse
    6. import uvicorn
    7. from api_engine import get_engine
    # Seconds; passed to uvicorn.run() as timeout_keep_alive in the entry point.
    TIMEOUT_KEEP_ALIVE = 5
    # Seconds; not referenced anywhere else in this listing.
    TIMEOUT_TO_PREVENT_DEADLOCK = 1

    # Module-level singletons used by the request handler.
    app = FastAPI()
    engine = get_engine()
    @app.post("/generate")
    async def generate(request: Request) -> Response:
        """Stream every entry of ``engine.items`` back to the caller.

        Each item is sent as a UTF-8 encoded JSON object ``{"text": <item>}``
        followed by a NUL byte, which the client uses as the chunk delimiter.
        """
        for item in engine.items:
            print('push:', item)
            engine.stream.put(item)
        # Close the stream once everything is queued. Without this the async
        # generator below never stops, so the response (and the client reading
        # it) would hang after the last item.
        # NOTE: the engine owns a single shared stream and put() ignores items
        # after finish(), so this demo serves one complete request per process.
        engine.stream.finish()
        results_generator = engine.generate()

        # Streaming case
        async def stream_results() -> AsyncGenerator[bytes, None]:
            async for request_output in results_generator:
                ret = {"text": request_output}
                yield (json.dumps(ret) + "\0").encode("utf-8")

        return StreamingResponse(stream_results())
    if __name__ == "__main__":
        # Only the listening port is configurable from the command line.
        cli = argparse.ArgumentParser()
        cli.add_argument("--port", type=int, default=8000)
        options = cli.parse_args()

        uvicorn.run(
            app,
            host='localhost',
            port=options.port,
            log_level="debug",
            timeout_keep_alive=TIMEOUT_KEEP_ALIVE,
        )

    client端

    1. import requests
    2. import json
    3. from typing import Iterable, List
    def get_streaming_response(response: requests.Response) -> Iterable[str]:
        """Yield the ``"text"`` field of every chunk in a streamed response.

        The body is split on NUL bytes; each non-empty chunk is decoded as
        UTF-8 and parsed as JSON.

        Args:
            response: a response obtained with ``stream=True``.

        Yields:
            The value stored under ``"text"`` in each chunk (a string for the
            payloads produced by the server listing).
        """
        for chunk in response.iter_lines(chunk_size=8192,
                                         decode_unicode=False,
                                         delimiter=b"\0"):
            # Splitting on the delimiter can produce empty chunks; skip them.
            if chunk:
                data = json.loads(chunk.decode("utf-8"))
                output = data["text"]
                yield output
    def post_http_request(
        api_url: str,
    ) -> requests.Response:
        """POST to ``api_url`` with streaming enabled and return the response."""
        return requests.post(
            api_url,
            headers={"User-Agent": "Test Client"},
            stream=True,
        )
    if __name__ == "__main__":
        # Port 8000 matches the server listing's default --port value.
        api_url = "http://localhost:8000/generate"
        response = post_http_request(api_url)
        # Print each streamed item as soon as it arrives.
        for h in get_streaming_response(response):
            print(h)

  • 相关阅读:
    Nginx Note02——事件驱动模型
    在Linux环境下编写第一个c程序
    html网页多个div鼠标移动自动排列实例
    经典算法-----八皇后问题
    C语言 统计汉字的个数
    opencv基础篇 ——(七)边缘检测和图像锐化
    详解企业财务数字化转型路径|推荐收藏
    智慧能源三维可视化平台实时动态呈现运维状态
    基于STM32+OneNet设计的GPS定位器(ESP8266)
    LQ0153 字串排序【排序】
  • 原文地址:https://blog.csdn.net/zhuikefeng/article/details/134052626