import asyncio import atexit from json import loads from time import time from typing import Callable from uuid import uuid4 from aiohttp import ClientSession from sseclient import SSEClient class User: def __init__( self, name: str, number: str, id: str, first_nickname: str | None = None, last_nickname: str | None = None, note: str | None = None, ): """Creates a new User. Args: name: The User's username. number: The User's phone number. id: The User's Signal UUID. first_nickname: last_nickname: note: """ self.name = name self.number = number self.id = id self.first_nickname = first_nickname self.last_nickname = last_nickname self.note = note def __repr__(self) -> str: return f'' class Group: def __init__(self, name: str, id: str): """Creates a new Group. Args: name: the Group's name. id: the Group's Signal UUID. """ self.name = name self.id = id def __repr__(self) -> str: return f'' class ReceivedMessage: def __init__(self, message: str, user: User, group: Group | None): """Creates a new ReceivedMessage. Args: message: The Message's body. user: The User that sent the Message. group: The Group where the message was sent on, or None for a direct Message. """ self.message = message self.user = user self.group = group def __repr__(self) -> str: return f'' class Signal: def __init__(self, host: str, port: int): self.url = host.rstrip("/") + ":" + str(port) self.session = ClientSession() atexit.register(self.exit) self._hooks: dict[str, Callable] = {} self.usersCache = {} self.usersCacheTime = 0 def exit(self): asyncio.run(self.session.close()) async def _post(self, method: str, **params): async with self.session.post( self.url + "/api/v1/rpc", json={ "jsonrpc": "2.0", "method": method, "params": params, "id": str(uuid4()), }, ) as response: if response.status == 200: return await response.json() raise ConnectionError( f"Error while sending {method} to Server: HTTP code {response.status}." ) async def sendMessage(self, message: str, recipient: User | Group): if isinstance(recipient, User): return await self._post("send", message=message, recipient=recipient.id) if isinstance(recipient, Group): return await self._post("send", message=message, groupId=recipient.id) async def listGroups(self) -> list[Group]: return [ Group(group["name"], group["id"]) for group in (await self._post("listGroups"))["result"] ] async def listUsers(self) -> list[User]: return [ User( user["name"], user["number"], user["uuid"], user["nickGivenName"], user["nickFamilyName"], user["note"], ) for user in (await self._post("listContacts"))["result"] ] async def getUser( self, *, number: str | None = None, id: str | None = None, first_nickname: str | None = None, last_nickname: str | None = None, ) -> User | None: for user in await self.listUsers(): if number and user.number == number: return user if id and user.id == id: return user if first_nickname and user.first_nickname == first_nickname: return user if last_nickname and user.last_nickname == last_nickname: return user return None def onMessage(self, group: Group | None = None) -> Callable: def wrapper(func: Callable): async def callback(data: dict): envelope = data.get("envelope", {}) dataMessage = envelope.get("dataMessage", {}) groupInfo = dataMessage.get("groupInfo", {}) msg_user = User( envelope.get("sourceName"), envelope.get("sourceNumber"), envelope.get("sourceUuid"), ) msg_group = None if groupInfo: msg_group = Group( groupInfo.get("groupName"), groupInfo.get("groupId") ) msg_body = envelope.get("dataMessage", {}).get("message") if msg_body is not None: if group is None or (msg_group and msg_group.id == group.id): await func( ReceivedMessage( msg_body, (await self.getUser(number=msg_user.number)) or msg_user, msg_group, ) ) self._hooks[func.__name__] = callback return func return wrapper def onMessageRaw(self) -> Callable: def wrapper(func: Callable): async def callback(data: dict): await func(data) self._hooks[func.__name__] = callback return func return wrapper async def loop(self): for msg in SSEClient(self.url + "/api/v1/events"): try: data = loads(msg.data) except Exception: data = {} for callback in self._hooks.values(): await callback(data)