import asyncio
from json import loads
from typing import Callable
from uuid import uuid4

from aiohttp import ClientSession
from sseclient import SSEClient

from config import Config


class User:
    """Plain record describing a single user.

    Only ``id`` is required; every other field is optional and defaults to
    ``None``. The constructor stores each argument unchanged on the instance
    under the same name.
    """

    def __init__(
        self,
        id: str,
        number: str | None = None,
        name: str | None = None,
        first_name: str | None = None,
        last_name: str | None = None,
        nickname: str | None = None,
        first_nickname: str | None = None,
        last_nickname: str | None = None,
        note: str | None = None,
        about: str | None = None,
        emoji: str | None = None,
        profile_first_name: str | None = None,
        profile_last_name: str | None = None,
        username: str | None = None,
    ):
        self.id = id
        self.number = number
        self.name = name
        self.first_name = first_name
        self.last_name = last_name
        self.nickname = nickname
        self.first_nickname = first_nickname
        self.last_nickname = last_nickname
        self.note = note
        self.about = about
        self.emoji = emoji
        self.profile_first_name = profile_first_name
        self.profile_last_name = profile_last_name
        self.username = username

    def __repr__(self) -> str:
        # Fall back to the id when no display name is known.
        return f'<User "{self.name or self.id}">'


class Group:
    """Plain record describing a group and its member lists.

    Args:
        id: Identifier of the group.
        name: Optional display name.
        description: Optional description text.
        members: Users in the group; a fresh empty list when omitted.
        admins: Users with admin rights; a fresh empty list when omitted.
        banned: Users banned from the group; a fresh empty list when omitted.
    """

    def __init__(
        self,
        id: str,
        name: str | None = None,
        description: str | None = None,
        members: list[User] | None = None,
        admins: list[User] | None = None,
        banned: list[User] | None = None,
    ):
        self.id = id
        self.name = name
        self.description = description
        # Each instance gets its own list: a literal ``[]`` default would be
        # created once and shared by every Group built without that argument.
        self.members = [] if members is None else members
        self.admins = [] if admins is None else admins
        self.banned = [] if banned is None else banned

    def __repr__(self) -> str:
        # Fall back to the id when the group has no name.
        return f'<Group "{self.name or self.id}">'


class ReceivedMessage:
    def __init__(self, message: str, user: User, group: Group | None):
        """Creates a new ReceivedMessage.

        Args:
            message: The Message's body.
            user: The User that sent the Message.
            group: The Group where the message was sent on, or None for a
                direct Message.
        """
        self.message = message
        self.user = user
        self.group = group

    def __repr__(self) -> str:
        # The ' on "<group>"' suffix is only present for group messages.
        group = f' on "{self.group.name or self.group.id}"' if self.group else ""
        sender = self.user.name or self.user.id
        return f'<Message "{self.message}" from "{sender}"{group}>'


class MessageMention:
    """Placeholder type for a mention inside a message; not implemented yet."""


class MessageStyles:
    """Placeholder type for message text styling; not implemented yet."""
class Message:
    """Placeholder for an outgoing message; the constructor does nothing yet."""

    def __init__(
        self,
        recipient: User | Group,
        sticker: str | None = None,
        # ``None`` instead of a literal ``[]`` so that no list object is
        # shared between calls once this constructor gets a real body.
        mentions: list[MessageMention] | None = None,
        styles: list[MessageStyles] | None = None,
        view_once: bool = False,
    ):
        ...


def _filter_ids(target) -> set:
    """Return the ids named by a filter: nothing, one object, or several."""
    if not target:
        return set()
    if isinstance(target, (User, Group)):
        return {target.id}
    return {item.id for item in target}


async def _invoke(handler: Callable, payload):
    """Call ``handler(payload)`` and await the result when it is awaitable."""
    outcome = handler(payload)
    if asyncio.iscoroutine(outcome) or asyncio.isfuture(outcome):
        outcome = await outcome
    return outcome


class Signal:
    """Small HTTP client that sends JSON-RPC calls and dispatches events.

    Requests are POSTed to ``<url>/api/v1/rpc``; events are read from
    ``<url>/api/v1/events`` by :meth:`loop` and passed to every hook
    registered through :meth:`onMessage` or :meth:`onMessageRaw`.
    """

    def __init__(self, host: str, port: int):
        """Store the base URL built from *host* (trailing "/" removed) and *port*."""
        self.url = host.rstrip("/") + ":" + str(port)
        # Hooks are keyed by the handler's function name, so registering a
        # second handler with the same name replaces the first one.
        self._hooks: dict[str, Callable] = {}

    async def _post(self, method: str, **params):
        """Send one JSON-RPC 2.0 request and return the decoded JSON reply.

        Args:
            method: Name of the remote method.
            **params: Sent as the request's ``params`` object.

        Raises:
            ConnectionError: If the HTTP status is anything other than 200.
        """
        payload = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params,
            "id": str(uuid4()),
        }
        async with ClientSession() as session:
            async with session.post(self.url + "/api/v1/rpc", json=payload) as response:
                if response.status == 200:
                    return await response.json()
                raise ConnectionError(
                    f"Error while sending {method} to Server: HTTP code {response.status}."
                )

    async def sendMessage(self, message: str, recipient: User | Group):
        """Send *message* to a user or to a group.

        Returns:
            The decoded JSON reply of the ``send`` call.

        Raises:
            TypeError: If *recipient* is neither a User nor a Group.
            ConnectionError: If the server does not answer with HTTP 200.
        """
        if isinstance(recipient, User):
            return await self._post("send", message=message, recipient=recipient.id)
        if isinstance(recipient, Group):
            return await self._post("send", message=message, groupId=recipient.id)
        # Previously an unsupported recipient was dropped silently and the
        # caller got ``None`` back as if the message had been sent.
        raise TypeError(
            f"recipient must be a User or a Group, not {type(recipient).__name__}"
        )

    async def listGroups(self) -> list[Group]:
        """Return one Group per entry in the ``listGroups`` result.

        Only ``id``, ``name`` and ``description`` are filled in.
        """
        # TODO: populate members/admins/banned; that needs a way to turn the
        # ids in group["members"], group["admins"] and group["banned"] into
        # User objects.
        reply = await self._post("listGroups")
        return [
            Group(
                id=group["id"],
                name=group["name"],
                description=group["description"],
            )
            for group in reply["result"]
        ]

    async def listUsers(self) -> list[User]:
        """Return one User per entry in the ``listContacts`` result."""
        reply = await self._post("listContacts")
        return [
            User(
                id=user["uuid"],
                number=user["number"],
                name=user["name"],
                first_name=user["givenName"],
                last_name=user["familyName"],
                nickname=user["nickName"],
                first_nickname=user["nickGivenName"],
                last_nickname=user["nickFamilyName"],
                note=user["note"],
                about=user["profile"]["about"],
                emoji=user["profile"]["aboutEmoji"],
                # given name -> first name, family name -> last name, the same
                # mapping as first_name/last_name above (these were swapped).
                profile_first_name=user["profile"]["givenName"],
                profile_last_name=user["profile"]["familyName"],
                username=user["username"],
            )
            for user in reply["result"]
        ]

    def onMessage(
        self,
        *,
        user: list[User] | User | None = None,
        group: list[Group] | Group | None = None,
    ) -> Callable:
        """Decorator registering a handler for incoming messages.

        The handler is called with a ReceivedMessage. It may be a plain
        function or an ``async def``; awaitable results are awaited. Events
        that carry no message body never reach the handler.

        Args:
            user: Only deliver messages sent by this user / one of these users.
            group: Only deliver messages sent in this group / one of these
                groups.

        With neither filter set every message is delivered. With filters set
        a message is delivered when it matches the user filter or the group
        filter.

        Returns:
            A decorator that registers the function and returns it unchanged.
        """

        def wrapper(func: Callable):
            async def callback(data: dict):
                # ``or {}`` also covers keys that are present but null.
                envelope = data.get("envelope") or {}
                data_message = envelope.get("dataMessage") or {}

                body = data_message.get("message")
                if body is None:
                    # Nothing to deliver: the event has no message text.
                    return None

                # Keyword arguments keep the id in ``id``; passed positionally
                # the source name landed in ``id`` and the filters below could
                # never match ids coming from listUsers()/listGroups().
                sender = User(
                    id=envelope.get("sourceUuid"),
                    number=envelope.get("sourceNumber"),
                    name=envelope.get("sourceName"),
                )
                group_info = data_message.get("groupInfo") or {}
                chat = None
                if group_info:
                    chat = Group(
                        id=group_info.get("groupId"),
                        name=group_info.get("groupName"),
                    )
                received = ReceivedMessage(body, sender, chat)

                # Evaluated per event so that later changes to a filter list
                # passed to the decorator are still honoured.
                user_ids = _filter_ids(user)
                group_ids = _filter_ids(group)
                unfiltered = not user_ids and not group_ids
                from_wanted_user = sender.id in user_ids
                in_wanted_group = chat is not None and chat.id in group_ids

                if unfiltered or from_wanted_user or in_wanted_group:
                    return await _invoke(func, received)
                return None

            self._hooks[func.__name__] = callback
            return func

        return wrapper

    def onMessageRaw(self) -> Callable:
        """Decorator registering a handler that receives every raw event dict.

        The handler may be a plain function or an ``async def``.
        """

        def wrapper(func: Callable):
            async def callback(data: dict):
                await _invoke(func, data)

            self._hooks[func.__name__] = callback
            return func

        return wrapper

    async def loop(self):
        """Read events from ``/api/v1/events`` and pass each to every hook.

        An event whose data cannot be decoded into a JSON object is
        dispatched as an empty dict.
        """
        # NOTE(review): SSEClient is consumed with a plain ``for``, so this
        # coroutine presumably blocks the event loop while waiting for the
        # next event - confirm before running other tasks alongside it.
        for msg in SSEClient(self.url + "/api/v1/events"):
            try:
                # Turn the text back into bytes via latin1 and decode those
                # as UTF-8; presumably this repairs a stream that was decoded
                # with the wrong charset - TODO confirm.
                data = loads(msg.data.encode("latin1").decode())
            except ValueError:
                # Covers invalid JSON as well as Unicode encode/decode errors.
                data = {}
            if not isinstance(data, dict):
                # Hooks call ``data.get``; valid JSON that is not an object
                # (a list, a number, ...) is treated like an undecodable event.
                data = {}
            # Snapshot, so a handler registering another hook during dispatch
            # cannot change the dict while it is being iterated.
            for callback in tuple(self._hooks.values()):
                await callback(data)


signal = Signal(Config.SIGNAL_HOST, Config.SIGNAL_PORT)

if __name__ == "__main__":
    # Debug aid: print the attributes of the second group the server returns.
    # Kept out of import time so importing this module does no network I/O.
    groups = asyncio.run(signal.listGroups())
    print(groups[1].__dict__)