Genesis commit
This commit is contained in:
commit
0736d1fe08
4 changed files with 181 additions and 0 deletions
71
handler.py
Normal file
71
handler.py
Normal file
|
|
@ -0,0 +1,71 @@
|
|||
from typing import AsyncIterator, List
|
||||
from asyncio.subprocess import PIPE
|
||||
import asyncio
|
||||
|
||||
|
||||
class ServerHandler:
|
||||
def __init__(self, argv: List[str]):
|
||||
self.argv = argv
|
||||
self._proc: asyncio.subprocess.Process | None = None
|
||||
self._stdout_queue: asyncio.Queue[str | None] = asyncio.Queue()
|
||||
self._reader_task: asyncio.Task | None = None
|
||||
self._stderr_task: asyncio.Task | None = None
|
||||
self._watcher_task: asyncio.Task | None = None
|
||||
self._stop_event = asyncio.Event()
|
||||
|
||||
async def start(self) -> None:
|
||||
if self._proc and self._proc.returncode is None:
|
||||
return
|
||||
|
||||
self._stop_event = asyncio.Event()
|
||||
self._stdout_queue = asyncio.Queue()
|
||||
|
||||
self._proc = await asyncio.create_subprocess_exec(
|
||||
*self.argv,
|
||||
stdout=PIPE,
|
||||
stderr=PIPE,
|
||||
)
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
self._reader_task = loop.create_task(
|
||||
self._read_stream(self._proc.stdout))
|
||||
self._stderr_task = loop.create_task(
|
||||
self._read_stream(self._proc.stderr))
|
||||
self._watcher_task = loop.create_task(self._watch_process())
|
||||
|
||||
async def _read_stream(self, stream: asyncio.StreamReader | None) -> None:
|
||||
if stream is None:
|
||||
return
|
||||
try:
|
||||
while not stream.at_eof():
|
||||
line_bytes = await stream.readline()
|
||||
if not line_bytes:
|
||||
break
|
||||
|
||||
try:
|
||||
line = line_bytes.decode(errors="replace").rstrip("\n")
|
||||
except Exception:
|
||||
line = line_bytes.decode("utf-8", "replace").rstrip("\n")
|
||||
|
||||
print(line)
|
||||
await self._stdout_queue.put(line)
|
||||
except asyncio.CancelledError:
|
||||
...
|
||||
finally:
|
||||
await self._stdout_queue.put(None)
|
||||
|
||||
async def _watch_process(self) -> None:
|
||||
if self._proc is not None:
|
||||
await self._proc.wait()
|
||||
self._stop_event.set()
|
||||
for t in (self._reader_task, self._stderr_task):
|
||||
if t and not t.done():
|
||||
t.cancel()
|
||||
await self._stdout_queue.put(None)
|
||||
|
||||
async def reader(self) -> AsyncIterator[str]:
|
||||
while True:
|
||||
item = await self._stdout_queue.get()
|
||||
if item is None:
|
||||
break
|
||||
yield item
|
||||
Loading…
Add table
Add a link
Reference in a new issue