import asyncio from json import loads from pathlib import Path from typing import Callable, Literal from uuid import uuid4 from aiohttp import ClientSession from config import Config class User: def __init__( self, id: str, number: str | None = None, name: str | None = None, first_name: str | None = None, last_name: str | None = None, nickname: str | None = None, first_nickname: str | None = None, last_nickname: str | None = None, note: str | None = None, about: str | None = None, emoji: str | None = None, profile_first_name: str | None = None, profile_last_name: str | None = None, username: str | None = None, ): self.id = id self.number = number self.name = name self.first_name = first_name self.last_name = last_name self.nickname = nickname self.first_nickname = first_nickname self.last_nickname = last_nickname self.note = note self.about = about self.emoji = emoji self.profile_first_name = profile_first_name self.profile_last_name = profile_last_name self.username = username @classmethod def self(cls): return cls("", number="+393406838100") def __repr__(self) -> str: return f'' class Group: def __init__( self, id: str, name: str | None = None, description: str | None = None, members: list[User] = [], admins: list[User] = [], banned: list[User] = [], ): self.id = id self.name = name self.description = description self.members = members self.admins = admins self.banned = banned def __repr__(self) -> str: return f'' class Sticker: def __init__(self, pack_id: str, id: int): self.pack_id = pack_id self.id = id class MessageMention: def __init__(self, start: int, length: int, recipient: User): self.start = start self.length = length self.recipient = recipient def get(self) -> str: return ( f"{self.start}:{self.length}:{self.recipient.number or self.recipient.id}" ) class MessageStyle: def __init__( self, start: int, length: int, style: Literal["BOLD", "ITALIC", "SPOILER", "STRIKETHROUGH", "MONOSPACE"], ): self.start = start self.length = length self.style = style def get(self) -> str: return f"{self.start}:{self.length}:{self.style}" class Message: def __init__( self, text: str | None = None, user: User | None = None, group: Group | None = None, timestamp: int | None = None, attachments: list[Path] = [], view_once: bool = False, # TODO: fix sticker: Sticker | None = None, mentions: list[MessageMention] = [], styles: list[MessageStyle] = [], quote: "Message | Poll | None" = None, edit: "Message | None" = None, ): self.text = text self.user = user self.group = group self.timestamp = timestamp self.attachments = attachments self.view_once = view_once self.sticker = sticker self.mentions = mentions self.styles = styles self.quote = quote self.edit = edit class Poll: def __init__( self, question: str, options: list[str], user: User | None = None, group: Group | None = None, timestamp: int | None = None, multiple: bool = True, ): self.question = question self.options = options self.user = user self.group = group self.timestamp = timestamp self.multiple = multiple class Signal: def __init__(self, host: str, port: int): self.url = host.rstrip("/") + ":" + str(port) self._hooks: list[Callable] = [] async def _post(self, method: str, **params): async with ClientSession() as session: async with session.post( self.url + "/api/v1/rpc", json={ "jsonrpc": "2.0", "method": method, "params": params, "id": str(uuid4()), }, ) as response: if response.status == 200: return await response.json() raise ConnectionError( f"Error while sending {method} to Server: HTTP code {response.status}." ) async def sendMessage(self, message: Message, recipient: User | Group): params: dict = {} if isinstance(recipient, User): params["recipient"] = recipient.id if isinstance(recipient, Group): params["groupId"] = recipient.id if message.attachments: params["attachment"] = [ str(file.absolute()) for file in message.attachments ] if message.view_once: params["viewOnce"] = message.view_once if message.text: params["message"] = message.text if message.mentions: params["mention"] = [mention.get() for mention in message.mentions] if message.styles: params["textStyle"] = [style.get() for style in message.styles] if message.sticker: params["sticker"] = message.sticker.pack_id + ":" + str(message.sticker.id) if message.quote: if message.quote.timestamp: params["quoteTimestamp"] = message.quote.timestamp if message.quote.user: params["quoteAuthor"] = message.quote.user.id if isinstance(message.quote, Message): if message.quote.mentions: params["quoteMention"] = [ mention.get() for mention in message.quote.mentions ] if message.quote.styles: params["quoteTextStyle"] = [ style.get() for style in message.quote.styles ] if message.quote.attachments: params["quoteAttachment"] = [ str(file.absolute()) for file in message.quote.attachments ] if message.edit: params["editTimestamp"] = message.edit.timestamp result = await self._post("send", **params) timestamp = result.get("result", {}).get("timestamp") if timestamp: message.timestamp = timestamp message.user = User.self() return result async def sendReaction(self, emoji: str, message: Message, recipient: User | Group): params: dict = {"emoji": emoji} if isinstance(recipient, User): params["recipient"] = recipient.id if isinstance(recipient, Group): params["groupId"] = recipient.id if message.user: params["targetAuthor"] = message.user.id if message.timestamp: params["targetTimestamp"] = message.timestamp return await self._post("sendReaction", **params) async def startTyping(self, recipient: User | Group): params: dict = {} if isinstance(recipient, User): params["recipient"] = recipient.id if isinstance(recipient, Group): params["groupId"] = recipient.id return await self._post("sendTyping", **params) async def stopTyping(self, recipient: User | Group): params: dict = {"stop": True} if isinstance(recipient, User): params["recipient"] = recipient.id if isinstance(recipient, Group): params["groupId"] = recipient.id return await self._post("sendTyping", **params) async def deleteMessage(self, message: Message, recipient: User | Group): params: dict = {} if isinstance(recipient, User): params["recipient"] = recipient.id if isinstance(recipient, Group): params["groupId"] = recipient.id params["targetTimestamp"] = message.timestamp return await self._post("remoteDelete", **params) # TODO: implement polls """ async def sendPoll(self, poll: Poll, recipient: User | Group): params: dict = {} if isinstance(recipient, User): params["recipient"] = recipient.id if isinstance(recipient, Group): params["groupId"] = recipient.id params["question"] = poll.question params["options"] = poll.options if not poll.multiple: params["noMulti"] = not poll.multiple result = await self._post("sendPollCreate", **params) timestamp = result.get("result", {}).get("timestamp") if timestamp: poll.timestamp = timestamp poll.user = User.self() return result """ async def listGroups(self) -> list[Group]: return [ Group( id=group["id"], name=group["name"], description=group["description"], members=[ await self.getUser(member["uuid"]) for member in group["members"] ], admins=[await self.getUser(admin["uuid"]) for admin in group["admins"]], banned=[await self.getUser(ban["uuid"]) for ban in group["banned"]], ) for group in (await self._post("listGroups"))["result"] ] async def listUsers(self) -> list[User]: return [ User( id=user["uuid"], number=user["number"], name=user["name"], first_name=user["givenName"], last_name=user["familyName"], nickname=user["nickName"], first_nickname=user["nickGivenName"], last_nickname=user["nickFamilyName"], note=user["note"], about=user["profile"]["about"], emoji=user["profile"]["aboutEmoji"], profile_first_name=user["profile"]["familyName"], profile_last_name=user["profile"]["givenName"], username=user["username"], ) for user in (await self._post("listContacts"))["result"] ] async def getGroup(self, id: str) -> Group: for group in await self.listGroups(): if group.id == id: return group return Group(id) async def getUser(self, id: str) -> User: for user in await self.listUsers(): if user.id == id: return user return User(id) def onMessage( self, *, user: list[User] | User | None = None, group: list[Group] | Group | None = None, ) -> Callable: def wrapper(func: Callable): async def callback(data: dict): envelope = data.get("envelope", {}) if envelope and envelope.get("dataMessage"): dataMessage = envelope["dataMessage"] message: str | None = dataMessage["message"] groupId: str | None = None if dataMessage.get("groupInfo"): groupId = dataMessage["groupInfo"]["groupId"] sticker: Sticker | None = None if dataMessage.get("sticker"): s = dataMessage["sticker"] sticker = Sticker(s["packId"], s["stickerId"]) # attachments # contacts viewOnce: bool = dataMessage["viewOnce"] sourceUuid: str = envelope["sourceUuid"] timestamp: int = envelope["timestamp"] msg_user: User = await self.getUser(sourceUuid) msg_group: Group | None = ( (await self.getGroup(groupId)) if groupId else None ) msg = Message( text=message, user=msg_user, group=msg_group, timestamp=timestamp, sticker=sticker, view_once=viewOnce, ) await self._post( "sendReceipt", recipient=msg_user.id, targetTimestamp=timestamp, type="viewed", ) if not user and not group: return await func(msg) if isinstance(user, User) and user.id == msg_user.id: return await func(msg) if ( isinstance(group, Group) and msg_group and group.id == msg_group.id ): return await func(msg) if isinstance(user, list): for usr in user: if usr.id == msg_user.id: return await func(msg) if isinstance(group, list) and msg_group: for grp in group: if grp.id == msg_group.id: return await func(msg) self._hooks.append(callback) return func return wrapper def onMessageRaw(self) -> Callable: def wrapper(func: Callable): async def callback(data: dict): await func(data) self._hooks.append(callback) return func return wrapper async def loop(self): async with ClientSession() as session: async with session.get(self.url + "/api/v1/events") as response: if response.status == 200: async for msg in response.content: msg_data = msg.decode().strip() if msg_data.startswith("data"): try: data = loads(msg_data.lstrip("data:")) except Exception: data = {} for callback in self._hooks: asyncio.create_task(callback(data))