Genesis commit

This commit is contained in:
Malasaur 2026-03-15 21:16:04 +01:00
commit a2a2bcb206
No known key found for this signature in database
6 changed files with 4167 additions and 0 deletions

3
.gitignore vendored Normal file
View file

@ -0,0 +1,3 @@
.env
venv
__pycache__

9
requirements.txt Normal file
View file

@ -0,0 +1,9 @@
aiohappyeyeballs==2.6.1
aiohttp==3.13.3
aiosignal==1.4.0
attrs==25.4.0
frozenlist==1.8.0
idna==3.11
multidict==6.7.1
propcache==0.4.1
yarl==1.23.0

3761
src/api.json Normal file

File diff suppressed because it is too large Load diff

242
src/main copy.py Normal file
View file

@ -0,0 +1,242 @@
import asyncio
import base64
import binascii
import json
import time
from re import compile
from typing import Any, AsyncGenerator, Callable
import aiohttp
class Minecraft:
def __init__(
self,
host: str,
client_id: str,
client_secret: str,
*,
server_id: str | None = None,
server_name: str | None = None,
session: aiohttp.ClientSession | None = None,
poll_interval: float = 1.0,
timeout: float = 15.0,
verify_ssl: bool = True,
):
if not host:
raise ValueError("host is required")
if not client_id or not client_secret:
raise ValueError("client_id and client_secret are required")
if not server_id and not server_name:
raise ValueError("server_id or server_name is required")
self._base_url = host.rstrip("/")
self._client_id = client_id
self._client_secret = client_secret
self._server_id = server_id
self._server_name = server_name
self._poll_interval = poll_interval
self._ssl = None if verify_ssl else False
self._session = session or aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=timeout)
)
self._own_session = session is None
self._token: str | None = None
self._token_expires_at = 0.0
self._token_lock = asyncio.Lock()
self._hooks: list[Callable] = []
async def close(self):
if self._own_session and not self._session.closed:
await self._session.close()
async def command(self, command: str) -> str:
"Runs a command on the Server console."
if not command:
raise ValueError("command is required")
await self._ensure_server_id()
await self._request(
"POST",
f"/api/servers/{self._server_id}/console",
json_body=command,
)
return ""
async def logs_stream(self) -> AsyncGenerator[str, Any]:
"Streams the Server logs as a generator of strings."
await self._ensure_server_id()
# Start from "now" to avoid dumping the full backlog.
last_epoch = int(time.time() * 1_000_000)
while True:
epoch, lines = await self._fetch_logs(time_param=last_epoch)
if epoch is not None and epoch > last_epoch:
last_epoch = epoch
for line in lines:
yield line
await asyncio.sleep(self._poll_interval)
async def logs_tail(self, back: int = 10) -> AsyncGenerator[str, Any]:
"Returns the last `back` lines of the Server logs as a generator of strings."
if back <= 0:
return
yield # pragma: no cover - keeps this as an async generator
await self._ensure_server_id()
_, lines = await self._fetch_logs(time_param=0)
for line in lines[-back:]:
yield line
async def logs_tails(self, back: int = 10) -> AsyncGenerator[str, Any]:
"Alias for logs_tail (kept for backward compatibility)."
async for line in self.logs_tail(back=back):
yield line
def onConsoleLog(self, pattern: str | None = None) -> Callable:
"Registers a callback function to be called when a line matching the given pattern is logged."
def wrapper(func: Callable):
compiled = compile(pattern) if pattern else None
async def callback(line: str):
if compiled is None:
func(line)
return
match = compiled.fullmatch(line)
if match is None:
return
groups = match.groups()
if groups:
await func(*groups)
else:
await func(line)
self._hooks.append(callback)
return func
return wrapper
async def loop(self):
async for line in self.logs_stream():
for hook in self._hooks:
await hook(line)
async def _ensure_token(self) -> None:
async with self._token_lock:
if self._token and time.monotonic() < (self._token_expires_at - 30):
return
url = f"{self._base_url}/oauth2/token"
payload = {
"grant_type": "client_credentials",
"client_id": self._client_id,
"client_secret": self._client_secret,
}
async with self._session.post(url, data=payload, ssl=self._ssl) as resp:
data = await self._read_json(resp)
token = data.get("access_token")
if not token:
raise RuntimeError("Failed to obtain access token from PufferPanel")
expires_in = int(data.get("expires_in", 3600))
self._token = token
self._token_expires_at = time.monotonic() + max(expires_in, 0)
async def _ensure_server_id(self) -> None:
if self._server_id:
return
if not self._server_name:
raise RuntimeError("server_id is not set and server_name is missing")
data = await self._request_json("GET", "/api/servers")
servers = data.get("servers", [])
for server in servers:
if server.get("name") == self._server_name:
self._server_id = server.get("id")
break
if not self._server_id:
raise RuntimeError(f"Server named {self._server_name} was not found")
async def _request(
self,
method: str,
path: str,
*,
params: dict[str, Any] | None = None,
json_body: Any | None = None,
) -> None:
await self._ensure_token()
url = f"{self._base_url}{path}"
headers = {"Authorization": f"Bearer {self._token}"}
async with self._session.request(
method,
url,
params=params,
json=json_body,
headers=headers,
ssl=self._ssl,
) as resp:
if resp.status >= 400:
body = await resp.text()
raise RuntimeError(
f"PufferPanel API error {resp.status} for {method} {path}: {body}"
)
async def _request_json(
self,
method: str,
path: str,
*,
params: dict[str, Any] | None = None,
json_body: Any | None = None,
) -> dict[str, Any]:
await self._ensure_token()
url = f"{self._base_url}{path}"
headers = {"Authorization": f"Bearer {self._token}"}
async with self._session.request(
method,
url,
params=params,
json=json_body,
headers=headers,
ssl=self._ssl,
) as resp:
return await self._read_json(resp)
async def _read_json(self, resp: aiohttp.ClientResponse) -> dict[str, Any]:
text = await resp.text()
if resp.status >= 400:
raise RuntimeError(
f"PufferPanel API error {resp.status} for {resp.request_info.method} "
f"{resp.request_info.url}: {text}"
)
if not text:
return {}
try:
return json.loads(text)
except json.JSONDecodeError as exc:
raise RuntimeError(f"Invalid JSON response: {text}") from exc
async def _fetch_logs(self, *, time_param: int) -> tuple[int | None, list[str]]:
data = await self._request_json(
"GET",
f"/api/servers/{self._server_id}/console",
params={"time": time_param},
)
epoch = data.get("epoch")
raw_logs = data.get("logs") or ""
decoded = self._decode_logs(raw_logs)
lines = decoded.splitlines() if decoded else []
return epoch, lines
def _decode_logs(self, raw_logs: str) -> str:
if not raw_logs:
return ""
try:
decoded = base64.b64decode(raw_logs)
except (binascii.Error, ValueError):
return raw_logs
try:
return decoded.decode("utf-8", errors="replace")
except Exception:
return decoded.decode(errors="replace")

90
src/main.py Normal file
View file

@ -0,0 +1,90 @@
import binascii
from asyncio import Lock
from base64 import b64decode
from time import monotonic, time
from typing import Any, Callable, Literal
from aiohttp import ClientSession
class Minecraft:
def __init__(self, host: str, client_id: str, client_secret: str, server_id: str):
self.host = host.rstrip("/")
self.client_id = client_id
self.client_secret = client_secret
self.server_id = server_id
self.session = ClientSession()
self.token: str | None = None
self.token_expiry: float = 0.0
self.token_lock = Lock()
self.hooks: list[Callable] = []
async def close(self):
if not self.session.closed:
await self.session.close()
async def _request(
self,
method: Literal["GET", "POST"],
path: str,
params: dict[str, Any] | None = None,
body: Any | None = None,
) -> Any:
async with self.token_lock:
if self.token and monotonic() < self.token_expiry - 30:
return
url = f"{self.host}/oauth2/token"
payload = {
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret,
}
async with self.session.post(url, data=payload) as response:
if response.status == 200:
data = await response.json()
else:
raise RuntimeError("Error")
token = data.get("access_token")
if not token:
raise RuntimeError("Error")
expires_in = int(data.get("expires_in", 3600))
self.token = token
self.token_expiry = monotonic() + expires_in
url = f"{self.host}{path}"
headers = {"Authorization": f"Bearer {self.token}"}
async with self.session.request(
method, url, params=params, json=body, headers=headers
) as response:
return await response.json()
async def _fetch_logs(self, last_epoch: int) -> AsyncGenerator[str, None]:
data = await self._request(
"GET", f"/api/servers/{self.server_id}/console", params={"time": last_epoch}
)
epoch = data.get("epoch")
raw_logs = data.get("logs", "")
try:
...
except (binascii.Error, ValueError):
decoded = raw_logs
async def command(self, command: str) -> str:
await self._request(
"POST", f"/api/servers/{self.server_id}/console", body=command
)
return "" # TODO: return command output
async def logs_stream(self) -> AsyncGenerator[str, None]:
last_epoch = int(time() * 1_000_000)
while True:
...

62
test.py Normal file
View file

@ -0,0 +1,62 @@
import asyncio
import os
import sys
from pathlib import Path
def load_env_file(path: Path) -> None:
if not path.exists():
return
for raw in path.read_text(encoding="utf-8").splitlines():
line = raw.strip()
if not line or line.startswith("#"):
continue
if "=" not in line:
continue
key, value = line.split("=", 1)
key = key.strip()
value = value.strip().strip('"').strip("'")
os.environ.setdefault(key, value)
async def main() -> None:
root = Path(__file__).resolve().parent
load_env_file(root / ".env")
sys.path.insert(0, str(root / "src"))
from src.main import Minecraft
host = os.environ.get("SERVER_HOST")
client_id = os.environ.get("PUFFERPANEL_CLIENT_ID")
client_secret = os.environ.get("PUFFERPANEL_CLIENT_SECRET")
if not host or not client_id or not client_secret:
raise RuntimeError(
"Missing SERVER_HOST / PUFFERPANEL_CLIENT_ID / PUFFERPANEL_CLIENT_SECRET"
)
mc = Minecraft(
host=host,
client_id=client_id,
client_secret=client_secret,
server_name="Retards Server",
poll_interval=0.5,
)
try:
await mc.command("say hi from test.py")
tail = [line async for line in mc.logs_tail(5)]
print("tail (last 5 lines):")
for line in tail:
print(line)
print("streaming next log line...")
async for line in mc.logs_stream():
print("stream:", line)
break
finally:
await mc.close()
if __name__ == "__main__":
asyncio.run(main())