# Viewer metadata: Python source, 71 lines, 2.3 KiB.
from typing import AsyncIterator, List
|
|
from asyncio.subprocess import PIPE
|
|
import asyncio
|
|
|
|
|
|
class ServerHandler:
|
|
def __init__(self, argv: List[str]):
|
|
self.argv = argv
|
|
self._proc: asyncio.subprocess.Process | None = None
|
|
self._stdout_queue: asyncio.Queue[str | None] = asyncio.Queue()
|
|
self._reader_task: asyncio.Task | None = None
|
|
self._stderr_task: asyncio.Task | None = None
|
|
self._watcher_task: asyncio.Task | None = None
|
|
self._stop_event = asyncio.Event()
|
|
|
|
async def start(self) -> None:
|
|
if self._proc and self._proc.returncode is None:
|
|
return
|
|
|
|
self._stop_event = asyncio.Event()
|
|
self._stdout_queue = asyncio.Queue()
|
|
|
|
self._proc = await asyncio.create_subprocess_exec(
|
|
*self.argv,
|
|
stdout=PIPE,
|
|
stderr=PIPE,
|
|
)
|
|
|
|
loop = asyncio.get_running_loop()
|
|
self._reader_task = loop.create_task(
|
|
self._read_stream(self._proc.stdout))
|
|
self._stderr_task = loop.create_task(
|
|
self._read_stream(self._proc.stderr))
|
|
self._watcher_task = loop.create_task(self._watch_process())
|
|
|
|
async def _read_stream(self, stream: asyncio.StreamReader | None) -> None:
|
|
if stream is None:
|
|
return
|
|
try:
|
|
while not stream.at_eof():
|
|
line_bytes = await stream.readline()
|
|
if not line_bytes:
|
|
break
|
|
|
|
try:
|
|
line = line_bytes.decode(errors="replace").rstrip("\n")
|
|
except Exception:
|
|
line = line_bytes.decode("utf-8", "replace").rstrip("\n")
|
|
|
|
print(line)
|
|
await self._stdout_queue.put(line)
|
|
except asyncio.CancelledError:
|
|
...
|
|
finally:
|
|
await self._stdout_queue.put(None)
|
|
|
|
async def _watch_process(self) -> None:
|
|
if self._proc is not None:
|
|
await self._proc.wait()
|
|
self._stop_event.set()
|
|
for t in (self._reader_task, self._stderr_task):
|
|
if t and not t.done():
|
|
t.cancel()
|
|
await self._stdout_queue.put(None)
|
|
|
|
async def reader(self) -> AsyncIterator[str]:
|
|
while True:
|
|
item = await self._stdout_queue.get()
|
|
if item is None:
|
|
break
|
|
yield item
|