from typing import AsyncIterator, List from asyncio.subprocess import PIPE import asyncio class ServerHandler: def __init__(self, argv: List[str]): self.argv = argv self._proc: asyncio.subprocess.Process | None = None self._stdout_queue: asyncio.Queue[str | None] = asyncio.Queue() self._reader_task: asyncio.Task | None = None self._stderr_task: asyncio.Task | None = None self._watcher_task: asyncio.Task | None = None self._stop_event = asyncio.Event() async def start(self) -> None: if self._proc and self._proc.returncode is None: return self._stop_event = asyncio.Event() self._stdout_queue = asyncio.Queue() self._proc = await asyncio.create_subprocess_exec( *self.argv, stdout=PIPE, stderr=PIPE, ) loop = asyncio.get_running_loop() self._reader_task = loop.create_task( self._read_stream(self._proc.stdout)) self._stderr_task = loop.create_task( self._read_stream(self._proc.stderr)) self._watcher_task = loop.create_task(self._watch_process()) async def _read_stream(self, stream: asyncio.StreamReader | None) -> None: if stream is None: return try: while not stream.at_eof(): line_bytes = await stream.readline() if not line_bytes: break try: line = line_bytes.decode(errors="replace").rstrip("\n") except Exception: line = line_bytes.decode("utf-8", "replace").rstrip("\n") print(line) await self._stdout_queue.put(line) except asyncio.CancelledError: ... finally: await self._stdout_queue.put(None) async def _watch_process(self) -> None: if self._proc is not None: await self._proc.wait() self._stop_event.set() for t in (self._reader_task, self._stderr_task): if t and not t.done(): t.cancel() await self._stdout_queue.put(None) async def reader(self) -> AsyncIterator[str]: while True: item = await self._stdout_queue.get() if item is None: break yield item