minecraftd/main.py

122 lines
3.4 KiB
Python

from asyncio import create_task
from typing import Annotated
from fastapi import FastAPI, Header
from fastapi.responses import StreamingResponse
from .classes import ProcessStatus
from .controllers import Controllers
from .models import Models
from .responses import Responses
from .util import check_password, stop_server
app = FastAPI()
@app.get("/start")
async def start() -> Responses.StartResponse:
"Starts the Server process."
if Controllers.process.status() == ProcessStatus.RUNNING:
return {"status": "running"}
Controllers.process.start()
return {"status": "started"}
@app.get("/status")
async def status() -> Responses.StatusResponse:
"Checks whether the Server is running and returns its information."
process_status = Controllers.process.status()
if process_status != ProcessStatus.RUNNING:
# Crashed
if process_status == ProcessStatus.CRASHED:
return {"status": "crashed"}
# Maintainance
if Controllers.maintainance.is_set():
return {"status": "maintainance", "reason": Controllers.maintainance.get()}
# Offline
return {"status": "offline"}
server_status = Controllers.server.status()
# Starting
if not server_status["online"]:
return {"status": "starting"}
# Online
return {
"status": "online",
"motd": server_status.get("motd", ""),
"icon": server_status.get("icon", None),
"players": server_status.get(
"players",
{
"online": 0,
"max": 20,
"list": [],
},
),
}
@app.get("/stop")
async def stop(data: Models.StopModel, authorization: Annotated[str, Header()]):
"Stops the Server."
check_password(authorization)
create_task(stop_server("STOPPING", data.countdown, data.reason, data.timeout))
@app.get("/restart")
async def restart(data: Models.RestartModel, authorization: Annotated[str, Header()]):
"Restarts the Server."
check_password(authorization)
create_task(
stop_server(
"RESTARTING",
data.countdown,
data.reason,
data.timeout,
Controllers.process.start,
)
)
@app.get("/maintainance")
async def maintainance(
data: Models.MaintainanceModel, authorization: Annotated[str, Header()]
):
"Stops the Server and sets it to maintainance status."
check_password(authorization)
create_task(
stop_server(
"STOPPING FOR MAINTAINANCE",
data.countdown,
data.reason,
data.timeout,
Controllers.maintainance.set(data.reason),
)
)
@app.get("/command")
async def command(
data: Models.CommandModel, authorization: Annotated[str, Header()]
) -> str:
"Runs a command on the Server and returns its output."
check_password(authorization)
return Controllers.server.command(data.command)
@app.get("/logs/stream")
async def logs_stream(authorization: Annotated[str, Header()]) -> StreamingResponse:
check_password(authorization)
return StreamingResponse(Controllers.logs.stream(), media_type="text/event-stream")
@app.get("/logs/tail")
async def logs_tail(
data: Models.LogsTailModel, authorization: Annotated[str, Header()]
) -> StreamingResponse:
check_password(authorization)
return StreamingResponse(Controllers.logs.tail(data.back))