bot/libsignal.py
2025-12-14 16:44:45 +01:00

203 lines
6.1 KiB
Python

import asyncio
import atexit
from json import loads
from time import time
from typing import Callable
from uuid import uuid4
from aiohttp import ClientSession
from sseclient import SSEClient
class User:
def __init__(
self,
name: str,
number: str,
id: str,
first_nickname: str | None = None,
last_nickname: str | None = None,
note: str | None = None,
):
"""Creates a new User.
Args:
name: The User's username.
number: The User's phone number.
id: The User's Signal UUID.
first_nickname:
last_nickname:
note:
"""
self.name = name
self.number = number
self.id = id
self.first_nickname = first_nickname
self.last_nickname = last_nickname
self.note = note
def __repr__(self) -> str:
return f'<User "{self.name}" at "{self.number}">'
class Group:
def __init__(self, name: str, id: str):
"""Creates a new Group.
Args:
name: the Group's name.
id: the Group's Signal UUID.
"""
self.name = name
self.id = id
def __repr__(self) -> str:
return f'<Group "{self.name}" at "{self.id}">'
class ReceivedMessage:
def __init__(self, message: str, user: User, group: Group | None):
"""Creates a new ReceivedMessage.
Args:
message: The Message's body.
user: The User that sent the Message.
group: The Group where the message was sent on, or None for a direct Message.
"""
self.message = message
self.user = user
self.group = group
def __repr__(self) -> str:
group = ' on "' + self.group.name + '"' if self.group else ""
return f'<Message "{self.message}" by "{self.user.name}"{group}>'
class Signal:
def __init__(self, host: str, port: int):
self.url = host.rstrip("/") + ":" + str(port)
self.session = ClientSession()
atexit.register(self.exit)
self._hooks: dict[str, Callable] = {}
self.usersCache = {}
self.usersCacheTime = 0
def exit(self):
asyncio.run(self.session.close())
async def _post(self, method: str, **params):
async with self.session.post(
self.url + "/api/v1/rpc",
json={
"jsonrpc": "2.0",
"method": method,
"params": params,
"id": str(uuid4()),
},
) as response:
if response.status == 200:
return await response.json()
raise ConnectionError(
f"Error while sending {method} to Server: HTTP code {response.status}."
)
async def sendMessage(self, message: str, recipient: User | Group):
if isinstance(recipient, User):
return await self._post("send", message=message, recipient=recipient.id)
if isinstance(recipient, Group):
return await self._post("send", message=message, groupId=recipient.id)
async def listGroups(self) -> list[Group]:
return [
Group(group["name"], group["id"])
for group in (await self._post("listGroups"))["result"]
]
async def listUsers(self) -> list[User]:
return [
User(
user["name"],
user["number"],
user["uuid"],
user["nickGivenName"],
user["nickFamilyName"],
user["note"],
)
for user in (await self._post("listContacts"))["result"]
]
async def getUser(
self,
*,
number: str | None = None,
id: str | None = None,
first_nickname: str | None = None,
last_nickname: str | None = None,
) -> User | None:
for user in await self.listUsers():
if number and user.number == number:
return user
if id and user.id == id:
return user
if first_nickname and user.first_nickname == first_nickname:
return user
if last_nickname and user.last_nickname == last_nickname:
return user
return None
def onMessage(self, group: Group | None = None) -> Callable:
def wrapper(func: Callable):
async def callback(data: dict):
envelope = data.get("envelope", {})
dataMessage = envelope.get("dataMessage", {})
groupInfo = dataMessage.get("groupInfo", {})
msg_user = User(
envelope.get("sourceName"),
envelope.get("sourceNumber"),
envelope.get("sourceUuid"),
)
msg_group = None
if groupInfo:
msg_group = Group(
groupInfo.get("groupName"), groupInfo.get("groupId")
)
msg_body = envelope.get("dataMessage", {}).get("message")
if msg_body is not None:
if group is None or (msg_group and msg_group.id == group.id):
await func(
ReceivedMessage(
msg_body,
(await self.getUser(number=msg_user.number))
or msg_user,
msg_group,
)
)
self._hooks[func.__name__] = callback
return func
return wrapper
def onMessageRaw(self) -> Callable:
def wrapper(func: Callable):
async def callback(data: dict):
await func(data)
self._hooks[func.__name__] = callback
return func
return wrapper
async def loop(self):
for msg in SSEClient(self.url + "/api/v1/events"):
try:
data = loads(msg.data)
except Exception:
data = {}
for callback in self._hooks.values():
await callback(data)