engine端
# Initializer: stores the caller-supplied request_id and creates the
# asyncio.Queue that the put()/finish() methods below write to and that
# __anext__ reads from.
# NOTE(review): this listing is flattened (indentation stripped) and the
# enclosing class header is not visible; presumably this is
# AsyncStream.__init__ -- confirm against the full source.
def __init__(self, request_id: str) -> None:
self.request_id = request_id
self._queue = asyncio.Queue()
# Enqueue one item on self._queue without awaiting (put_nowait).
# The queue built in the visible __init__ has no maxsize, so this call is
# not expected to raise asyncio.QueueFull.
def put(self, item) -> None:
self._queue.put_nowait(item)
# Mark the end of the stream by enqueueing the StopIteration *class* itself
# as a sentinel; __anext__ recognises it with an identity check
# (`result is StopIteration`).
def finish(self) -> None:
self._queue.put_nowait(StopIteration)
# NOTE(review): no body is visible for this method in this listing (the next
# line is already another def). Annotated to return bool; presumably reports
# whether finish() has been called -- confirm against the full source.
def finished(self) -> bool:
# Async-iterator step: awaits the next entry from self._queue and then
# distinguishes the StopIteration sentinel (enqueued by finish()), an
# Exception instance, and anything else.
# NOTE(review): the bodies of both branches and the fall-through are missing
# from this listing; presumably they raise StopAsyncIteration, re-raise the
# queued exception, and return the item respectively -- confirm.
async def __anext__(self):
result = await self._queue.get()
if result is StopIteration:
elif isinstance(result, Exception):
# Engine state: a fixed list of four words (iterated by the server handler
# as engine.items) and the stream that generate() iterates.
# NOTE(review): the enclosing def/class lines are not visible; presumably
# this is the engine's __init__ -- confirm.
# NOTE(review): AsyncStream() is called with no arguments, while the
# __init__ visible earlier in this listing requires request_id. If that is
# AsyncStream.__init__, this call raises TypeError -- confirm.
self.items = ['my', 'name', 'is', 'lewis']
self.stream = AsyncStream()
# Async method that iterates self.stream with `async for`.
# NOTE(review): the loop body is missing from this listing. The server
# consumes engine.generate() with `async for`, so presumably each
# request_output is yielded here -- confirm.
async def generate(self):
async for request_output in self.stream:
server端
from typing import AsyncGenerator
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, Response, StreamingResponse
from api_engine import get_engine
# NOTE(review): the unit is not stated (presumably seconds) and no visible
# line reads this constant. The __main__ block further down references
# TIMEOUT_KEEP_ALIVE, which is not defined in the visible lines -- confirm
# whether a definition was dropped from this listing.
TIMEOUT_TO_PREVENT_DEADLOCK = 1
# Request handler: obtains the engine's async results generator and streams
# it back to the client. Each output is wrapped as {"text": ...},
# JSON-encoded, terminated with a NUL character ("\0") and UTF-8 encoded;
# the inner async generator is handed to StreamingResponse.
# NOTE(review): `engine` and `json` are not defined/imported in the visible
# lines (engine presumably comes from get_engine(), imported above), and no
# route decorator is visible -- confirm.
# NOTE(review): the body of `for item in engine.items:` is missing from this
# listing; presumably it feeds each item into the engine's stream -- confirm.
async def generate(request: Request) -> Response:
for item in engine.items:
results_generator = engine.generate()
async def stream_results() -> AsyncGenerator[bytes, None]:
async for request_output in results_generator:
ret = {"text": request_output}
yield (json.dumps(ret) + "\0").encode("utf-8")
return StreamingResponse(stream_results())
# Server entry point: parses the --port option (int, default 8000).
# NOTE(review): the last line is the tail of a call whose opening line is
# missing from this listing (presumably uvicorn.run(...)) -- confirm.
# NOTE(review): TIMEOUT_KEEP_ALIVE is not defined in the visible lines (only
# TIMEOUT_TO_PREVENT_DEADLOCK is), and `argparse` is not imported in view.
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--port", type=int, default=8000)
args = parser.parse_args()
timeout_keep_alive=TIMEOUT_KEEP_ALIVE)
client端
from typing import Iterable, List
# Walks the streamed HTTP response chunk by chunk (8192-byte reads) and
# JSON-decodes each chunk after UTF-8 decoding.
# NOTE(review): the iter_lines( call is cut off after chunk_size=8192; the
# remaining arguments (presumably a b"\0" delimiter matching the server's
# framing) and the closing parenthesis are missing -- confirm.
# NOTE(review): the rest of the loop body (what is taken from `data` and
# yielded) is also missing, and `json`/`requests` are not imported in view.
def get_streaming_response(response: requests.Response) -> Iterable[List[str]]:
for chunk in response.iter_lines(chunk_size=8192,
data = json.loads(chunk.decode("utf-8"))
# Sends a POST to api_url with a fixed User-Agent header; stream=True makes
# requests defer downloading the body so it can be consumed incrementally.
# NOTE(review): the def line and the return statement are missing from this
# listing; presumably this is the body of post_http_request(api_url), which
# the __main__ block calls -- confirm.
headers = {"User-Agent": "Test Client"}
response = requests.post(api_url, headers=headers, stream=True)
# Client entry point: posts to the local server's /generate endpoint and
# iterates over the streamed results.
# NOTE(review): the body of the final loop is missing from this listing
# (presumably it prints each result) -- confirm.
# NOTE(review): the f-string below has no placeholders, so the f prefix is
# redundant.
if __name__ == "__main__":
api_url = f"http://localhost:8000/generate"
response = post_http_request(api_url)
for h in get_streaming_response(response):