minecraftd/main.py
2025-11-30 15:47:45 +01:00

120 lines
3.2 KiB
Python

from asyncio import create_task
from typing import Annotated
from fastapi import FastAPI, Header
from fastapi.responses import StreamingResponse
from .controllers import Controllers
from .models import Models
from .responses import Responses
from .util import check_password, stop_server
app = FastAPI()
@app.get("/start")
async def start() -> Responses.StartResponse:
"Starts the Server process."
if Controllers.process.is_started():
return {"status": "running"}
Controllers.process.start()
return {"status": "started"}
@app.get("/status")
async def status() -> Responses.StatusResponse:
"Checks whether the Server is running and returns its information."
if not Controllers.process.is_started():
# Crashed
if Controllers.process.is_crashed():
return {"status": "crashed"}
# Maintainance
if Controllers.maintainance.is_set():
return {"status": "maintainance", "reason": Controllers.maintainance.get()}
# Offline
return {"status": "offline"}
status = Controllers.server.status()
# Starting
if not status["online"]:
return {"status": "starting"}
# Online
return {
"status": "online",
"motd": status.get("motd", ""),
"icon": status.get("icon", None),
"players": status.get(
"players",
{
"online": 0,
"max": 20,
"list": [],
},
),
}
@app.get("/stop")
async def stop(data: Models.StopModel, authorization: Annotated[str, Header()]):
"Stops the Server."
check_password(authorization)
create_task(stop_server("STOPPING", data.countdown, data.reason, data.timeout))
@app.get("/restart")
async def restart(data: Models.RestartModel, authorization: Annotated[str, Header()]):
"Restarts the Server."
check_password(authorization)
create_task(
stop_server(
"RESTARTING",
data.countdown,
data.reason,
data.timeout,
Controllers.process.start,
)
)
@app.get("/maintainance")
async def maintainance(
data: Models.MaintainanceModel, authorization: Annotated[str, Header()]
):
"Stops the Server and sets it to maintainance status."
check_password(authorization)
create_task(
stop_server(
"STOPPING FOR MAINTAINANCE",
data.countdown,
data.reason,
data.timeout,
Controllers.maintainance.set(data.reason),
)
)
@app.get("/command")
async def command(
data: Models.CommandModel, authorization: Annotated[str, Header()]
) -> str:
"Runs a command on the Server and returns its output."
check_password(authorization)
return Controllers.server.command(data.command)
@app.get("/logs/stream")
async def logs_stream(authorization: Annotated[str, Header()]) -> StreamingResponse:
check_password(authorization)
return StreamingResponse(Controllers.logs.stream(), media_type="text/event-stream")
@app.get("/logs/tail")
async def logs_tail(
data: Models.LogsTailModel, authorization: Annotated[str, Header()]
) -> StreamingResponse:
check_password(authorization)
return StreamingResponse(Controllers.logs.tail(data.back))