"""Async client for a Signal JSON-RPC HTTP daemon.

The module talks to two endpoints below ``Signal.url``:

* ``POST /api/v1/rpc``   - JSON-RPC 2.0 calls (``send``, ``listContacts``, ...)
* ``GET  /api/v1/events`` - a line-based event stream whose payload lines
  start with ``data:``

NOTE(review): the endpoint and method names look like signal-cli's HTTP
daemon; confirm against the server actually deployed.
"""

import asyncio
import logging
from json import loads
from pathlib import Path
from typing import Any, Callable, Literal
from uuid import uuid4

from aiohttp import ClientSession, ClientTimeout

logger = logging.getLogger(__name__)


def _utf16_length(text: str) -> int:
    """Return the length of *text* in UTF-16 code units.

    Used for mention/style offsets. signal-cli documents the ``start`` and
    ``length`` units as UTF-16 code units, so characters outside the BMP
    (most emoji) count as two.
    """
    return len(text.encode("utf-16-le", errors="surrogatepass")) // 2


class User:
    """A Signal contact, identified by ``id`` (filled from the contact's
    ``uuid`` field by ``Signal.listUsers``)."""

    def __init__(
        self,
        id: str,
        number: str | None = None,
        name: str | None = None,
        first_name: str | None = None,
        last_name: str | None = None,
        nickname: str | None = None,
        first_nickname: str | None = None,
        last_nickname: str | None = None,
        note: str | None = None,
        about: str | None = None,
        emoji: str | None = None,
        profile_first_name: str | None = None,
        profile_last_name: str | None = None,
        username: str | None = None,
    ):
        self.id = id
        self.number = number
        self.name = name
        self.first_name = first_name
        self.last_name = last_name
        self.nickname = nickname
        self.first_nickname = first_nickname
        self.last_nickname = last_nickname
        self.note = note
        self.about = about
        self.emoji = emoji
        self.profile_first_name = profile_first_name
        self.profile_last_name = profile_last_name
        self.username = username

    @classmethod
    def self(cls, number: str | None = None):
        """Create a User instance representing the current user."""
        return cls("", number=number)

    def __repr__(self) -> str:
        return f"<User id={self.id!r} number={self.number!r} name={self.name!r}>"


class Group:
    """A Signal group with its member, admin and banned user lists."""

    def __init__(
        self,
        id: str,
        name: str | None = None,
        description: str | None = None,
        members: list[User] | None = None,
        admins: list[User] | None = None,
        banned: list[User] | None = None,
    ):
        self.id = id
        self.name = name
        self.description = description
        self.members = members or []
        self.admins = admins or []
        self.banned = banned or []

    def __repr__(self) -> str:
        return (
            f"<Group id={self.id!r} name={self.name!r} "
            f"members={len(self.members)}>"
        )


class Sticker:
    """Reference to a sticker: the pack id plus the sticker id in that pack."""

    def __init__(self, pack_id: str, id: int):
        self.pack_id = pack_id
        self.id = id


class MessageMention:
    """A mention of ``recipient`` covering ``length`` units from ``start``."""

    def __init__(self, start: int, length: int, recipient: User):
        self.start = start
        self.length = length
        self.recipient = recipient

    @classmethod
    def newMessage(cls, recipient: User):
        """Return a one-character placeholder Message that mentions *recipient*.

        The result is meant to be concatenated with other messages via
        ``Message.__add__``, which shifts the mention offset accordingly.
        """
        return Message("X", mentions=[cls(0, 1, recipient)])

    def get(self) -> str:
        """Serialize as ``start:length:recipient`` (number, else id)."""
        return (
            f"{self.start}:{self.length}:{self.recipient.number or self.recipient.id}"
        )


class MessageStyle:
    """A text style applied to ``length`` units starting at ``start``."""

    def __init__(
        self,
        start: int,
        length: int,
        style: Literal["BOLD", "ITALIC", "SPOILER", "STRIKETHROUGH", "MONOSPACE"],
    ):
        self.start = start
        self.length = length
        self.style = style

    def get(self) -> str:
        """Serialize as ``start:length:STYLE``."""
        return f"{self.start}:{self.length}:{self.style}"


class Message:
    """An incoming or outgoing message with optional rich content."""

    def __init__(
        self,
        text: str | None = None,
        user: User | None = None,
        group: Group | None = None,
        timestamp: int | None = None,
        attachments: list[Path] | None = None,
        view_once: bool = False,
        sticker: Sticker | None = None,
        mentions: list[MessageMention] | None = None,
        styles: list[MessageStyle] | None = None,
        quote: "Message | Poll | None" = None,
        edit: "Message | None" = None,
    ):
        self.text = text
        self.user = user
        self.group = group
        self.timestamp = timestamp
        self.attachments = attachments or []
        self.view_once = view_once
        self.sticker = sticker
        self.mentions = mentions or []
        self.styles = styles or []
        self.quote = quote
        self.edit = edit

    def __add__(self, b) -> "Message":
        """Return a new Message with *b*'s text appended to this one.

        *b* may be a ``Message`` or a plain ``str``. Mentions and styles of
        *b* are carried over with their ``start`` shifted by the length of
        this message's text, measured in UTF-16 code units. All other fields
        (user, group, attachments, ...) are taken from ``self``. Operands of
        any other type are ignored and a plain copy of ``self`` is returned.
        Neither operand is mutated.
        """
        a = Message(
            text=self.text,
            user=self.user,
            group=self.group,
            timestamp=self.timestamp,
            attachments=self.attachments.copy(),
            view_once=self.view_once,
            sticker=self.sticker,
            mentions=self.mentions.copy(),
            styles=self.styles.copy(),
            quote=self.quote,
            edit=self.edit,
        )
        if isinstance(b, str):
            b = Message(b)
        if isinstance(b, Message):
            a.text = a.text or ""
            offset = _utf16_length(a.text)
            a.mentions.extend(
                MessageMention(m.start + offset, m.length, m.recipient)
                for m in b.mentions
            )
            # Styles are positional too; without the shift they would be
            # lost or would point at the wrong text.
            a.styles.extend(
                MessageStyle(s.start + offset, s.length, s.style) for s in b.styles
            )
            if b.text:
                a.text += b.text
        return a


class Poll:
    """A poll with a question and a list of answer options."""

    def __init__(
        self,
        question: str,
        options: list[str],
        user: User | None = None,
        group: Group | None = None,
        timestamp: int | None = None,
        multiple: bool = True,
    ):
        self.question = question
        self.options = options
        self.user = user
        self.group = group
        self.timestamp = timestamp
        self.multiple = multiple


class Signal:
    """Client for the JSON-RPC and event-stream endpoints of a Signal daemon.

    Can be used as an async context manager, in which case the HTTP session
    is closed on exit. Without the context manager a session is created
    lazily on the first request.
    """

    def __init__(self, host: str, port: int, self_number: str | None = None):
        self.url = host.rstrip("/") + ":" + str(port)
        self._hooks: list[Callable] = []
        self._session: ClientSession | None = None
        self._self_number = self_number
        # Strong references to running handler tasks; the event loop itself
        # only keeps weak references, so unreferenced tasks may be collected.
        self._tasks: set[asyncio.Task] = set()

    async def __aenter__(self):
        await self._get_session()
        return self

    async def __aexit__(self, *args):
        if self._session:
            await self._session.close()
        # Drop the closed session so a later call creates a fresh one.
        self._session = None

    async def _get_session(self) -> ClientSession:
        """Get or create a session for API calls."""
        if self._session is None or self._session.closed:
            self._session = ClientSession()
        return self._session

    def _spawn(self, coro) -> asyncio.Task:
        """Schedule *coro* as a task and keep it referenced until it finishes."""
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task

    @staticmethod
    def _recipient_params(recipient: "User | Group") -> dict[str, Any]:
        """Return the RPC addressing parameter for *recipient*.

        ``{"recipient": id}`` for a User, ``{"groupId": id}`` for a Group,
        and an empty dict for anything else (the server then decides how to
        handle the unaddressed request).
        """
        if isinstance(recipient, User):
            return {"recipient": recipient.id}
        if isinstance(recipient, Group):
            return {"groupId": recipient.id}
        return {}

    async def _post(self, method: str, **params) -> dict[str, Any]:
        """Make a JSON-RPC POST request to the Signal API.

        Returns the decoded JSON-RPC response object.

        Raises:
            ValueError: the response contains an ``error`` member.
            ConnectionError: the HTTP status is not 200.
        Any failure is logged before being re-raised.
        """
        session = await self._get_session()
        try:
            async with session.post(
                self.url + "/api/v1/rpc",
                json={
                    "jsonrpc": "2.0",
                    "method": method,
                    "params": params,
                    "id": str(uuid4()),
                },
            ) as response:
                if response.status == 200:
                    result = await response.json()
                    if "error" in result:
                        raise ValueError(f"API error: {result['error']}")
                    return result
                text = await response.text()
                raise ConnectionError(
                    f"Error while sending {method} to Server: "
                    f"HTTP {response.status}. {text}"
                )
        except Exception as e:
            logger.error(f"Failed to call {method}: {e}")
            raise

    async def sendMessage(
        self, message: Message, recipient: User | Group
    ) -> dict[str, Any]:
        """Send a message to a user or group.

        On success, when the response carries a timestamp, it is written back
        to ``message.timestamp`` and ``message.user`` is set to the current
        user, so the same object can later be quoted, edited or deleted.

        Raises:
            ValueError: the message has no text, attachment or sticker, or
                ``message.edit`` is set but has no timestamp.
        """
        if not message.text and not message.attachments and not message.sticker:
            raise ValueError("Message must have text, attachments, or sticker")
        if message.edit and not message.edit.timestamp:
            # Without a target timestamp the request would not reference the
            # message being edited.
            raise ValueError("Edited message must have a timestamp")

        params: dict = self._recipient_params(recipient)
        if message.attachments:
            params["attachment"] = [
                str(file.absolute()) for file in message.attachments
            ]
        if message.view_once:
            params["viewOnce"] = message.view_once
        if message.text:
            params["message"] = message.text
        if message.mentions:
            params["mention"] = [mention.get() for mention in message.mentions]
        if message.styles:
            params["textStyle"] = [style.get() for style in message.styles]
        if message.sticker:
            params["sticker"] = message.sticker.pack_id + ":" + str(message.sticker.id)

        quote = message.quote
        if quote:
            if quote.timestamp:
                params["quoteTimestamp"] = quote.timestamp
            if quote.user:
                params["quoteAuthor"] = quote.user.id
            # Polls can be quoted too but carry no mentions/styles/attachments.
            if isinstance(quote, Message):
                if quote.mentions:
                    params["quoteMention"] = [m.get() for m in quote.mentions]
                if quote.styles:
                    params["quoteTextStyle"] = [s.get() for s in quote.styles]
                if quote.attachments:
                    params["quoteAttachment"] = [
                        str(file.absolute()) for file in quote.attachments
                    ]
        if message.edit:
            params["editTimestamp"] = message.edit.timestamp

        result = await self._post("send", **params)
        timestamp = result.get("result", {}).get("timestamp")
        if timestamp:
            message.timestamp = timestamp
            message.user = User.self(self._self_number)
        return result

    async def sendReaction(
        self, emoji: str, message: Message, recipient: User | Group
    ) -> dict[str, Any]:
        """Send a reaction emoji to a message."""
        params: dict = {"emoji": emoji, **self._recipient_params(recipient)}
        if message.user:
            params["targetAuthor"] = message.user.id
        if message.timestamp:
            params["targetTimestamp"] = message.timestamp
        return await self._post("sendReaction", **params)

    async def startTyping(self, recipient: User | Group) -> dict[str, Any]:
        """Start typing indicator for a recipient."""
        return await self._post("sendTyping", **self._recipient_params(recipient))

    async def stopTyping(self, recipient: User | Group) -> dict[str, Any]:
        """Stop typing indicator for a recipient."""
        params: dict = {"stop": True, **self._recipient_params(recipient)}
        return await self._post("sendTyping", **params)

    async def deleteMessage(
        self, message: Message, recipient: User | Group
    ) -> dict[str, Any]:
        """Delete a message remotely.

        Raises:
            ValueError: the message has no timestamp.
        """
        if not message.timestamp:
            raise ValueError("Message must have a timestamp to be deleted")
        params: dict = self._recipient_params(recipient)
        params["targetTimestamp"] = message.timestamp
        return await self._post("remoteDelete", **params)

    async def sendPoll(self, poll: Poll, recipient: User | Group) -> dict[str, Any]:
        """Send a poll to a user or group.

        On success, when the response carries a timestamp, it is written back
        to ``poll.timestamp`` and ``poll.user`` is set to the current user.
        """
        params: dict = self._recipient_params(recipient)
        params["question"] = poll.question
        params["options"] = poll.options
        if not poll.multiple:
            params["noMulti"] = True
        result = await self._post("sendPollCreate", **params)
        timestamp = result.get("result", {}).get("timestamp")
        if timestamp:
            poll.timestamp = timestamp
            poll.user = User.self(self._self_number)
        return result

    async def listGroups(self) -> list[Group]:
        """Return all groups with members/admins/banned resolved to Users.

        Contacts are fetched once and shared across all groups; an entry
        whose uuid is not among the contacts becomes a bare ``User(uuid)``.
        """
        raw_groups = (await self._post("listGroups")).get("result") or []
        if not raw_groups:
            return []

        # One listContacts call for the whole listing instead of one per
        # member. setdefault keeps the first contact for a duplicated id,
        # matching getUser's first-match behaviour.
        contacts: dict[str, User] = {}
        for contact in await self.listUsers():
            contacts.setdefault(contact.id, contact)

        def resolve(entries) -> list[User]:
            return [
                contacts.get(entry["uuid"]) or User(entry["uuid"])
                for entry in entries or []
            ]

        return [
            Group(
                id=group["id"],
                name=group.get("name"),
                description=group.get("description"),
                members=resolve(group.get("members")),
                admins=resolve(group.get("admins")),
                banned=resolve(group.get("banned")),
            )
            for group in raw_groups
        ]

    async def listUsers(self) -> list[User]:
        """Return all contacts reported by ``listContacts``.

        Only ``uuid`` is required on each entry; every other field, including
        the nested ``profile`` object, falls back to ``None`` when absent.
        """
        users: list[User] = []
        for user in (await self._post("listContacts")).get("result") or []:
            profile = user.get("profile") or {}
            users.append(
                User(
                    id=user["uuid"],
                    number=user.get("number"),
                    name=user.get("name"),
                    first_name=user.get("givenName"),
                    last_name=user.get("familyName"),
                    nickname=user.get("nickName"),
                    first_nickname=user.get("nickGivenName"),
                    last_nickname=user.get("nickFamilyName"),
                    note=user.get("note"),
                    about=profile.get("about"),
                    emoji=profile.get("aboutEmoji"),
                    # givenName is the first name, familyName the last name.
                    profile_first_name=profile.get("givenName"),
                    profile_last_name=profile.get("familyName"),
                    username=user.get("username"),
                )
            )
        return users

    async def getGroup(self, id: str) -> Group:
        """Return the group with this id, or a bare ``Group(id)`` if unknown."""
        for group in await self.listGroups():
            if group.id == id:
                return group
        return Group(id)

    async def getUser(self, id: str) -> User:
        """Return the contact with this id, or a bare ``User(id)`` if unknown."""
        for user in await self.listUsers():
            if user.id == id:
                return user
        return User(id)

    async def _build_message(self, data: dict) -> Message | None:
        """Convert a raw event into a Message.

        Returns ``None`` when the event has no ``dataMessage`` or when the
        data message carries neither text nor a sticker.
        """
        envelope = data.get("envelope", {})
        if not envelope or not envelope.get("dataMessage"):
            return None
        data_message = envelope["dataMessage"]

        text: str | None = data_message.get("message")
        sticker: Sticker | None = None
        if data_message.get("sticker"):
            raw_sticker = data_message["sticker"]
            sticker = Sticker(raw_sticker["packId"], raw_sticker["stickerId"])
        # TODO: attachments and contacts are not parsed yet.
        if not text and not sticker:
            # Decide before the lookups below so ignored events cost no RPCs.
            return None

        group_id: str | None = None
        if data_message.get("groupInfo"):
            group_id = data_message["groupInfo"]["groupId"]

        return Message(
            text=text,
            user=await self.getUser(envelope["sourceUuid"]),
            group=(await self.getGroup(group_id)) if group_id else None,
            timestamp=envelope["timestamp"],
            sticker=sticker,
            view_once=data_message.get("viewOnce", False),
        )

    @staticmethod
    def _passes_filters(msg: Message, user, group) -> bool:
        """Return True when *msg* satisfies the handler's user/group filters.

        With no (or empty) filters every message passes; otherwise the sender
        must match a user filter or the message's group a group filter.
        """
        if not user and not group:
            return True

        wanted_users = [user] if isinstance(user, User) else user
        if isinstance(wanted_users, list) and any(
            candidate.id == msg.user.id for candidate in wanted_users
        ):
            return True

        wanted_groups = [group] if isinstance(group, Group) else group
        return bool(
            msg.group
            and isinstance(wanted_groups, list)
            and any(candidate.id == msg.group.id for candidate in wanted_groups)
        )

    def onMessage(
        self,
        *,
        user: list[User] | User | None = None,
        group: list[Group] | Group | None = None,
    ) -> Callable:
        """Decorator to register a message handler with optional user/group filters.

        The handler receives a ``Message`` for every incoming data message
        that has text or a sticker and passes the filters. A ``viewed``
        receipt is sent for each such message before filtering; a failure to
        send it is logged and does not stop the handler. Exceptions raised
        while building the message or inside the handler are logged, not
        propagated. The decorated function is returned unchanged.
        """

        def wrapper(func: Callable):
            async def callback(data: dict):
                try:
                    msg = await self._build_message(data)
                    if msg is None:
                        return None
                    try:
                        await self._post(
                            "sendReceipt",
                            recipient=msg.user.id,
                            targetTimestamp=msg.timestamp,
                            type="viewed",
                        )
                    except Exception as e:
                        logger.warning(f"Failed to send receipt: {e}")
                    if self._passes_filters(msg, user, group):
                        return await func(msg)
                except Exception as e:
                    logger.error(f"Error in message handler: {e}", exc_info=True)

            self._hooks.append(callback)
            return func

        return wrapper

    def onMessageRaw(self) -> Callable:
        """Decorator to register a raw message handler that receives unprocessed data."""

        def wrapper(func: Callable):
            async def callback(data: dict):
                try:
                    await func(data)
                except Exception as e:
                    logger.error(f"Error in raw message handler: {e}", exc_info=True)

            self._hooks.append(callback)
            return func

        return wrapper

    async def loop(self, retry_delay: int = 5):
        """
        Start the event loop to receive messages from Signal.
        Automatically reconnects on connection loss.

        Each ``data:`` line of the event stream is JSON-decoded and handed to
        every registered hook as its own task. Lines that fail to decode are
        logged and skipped. This coroutine never returns on its own; cancel
        it to stop. ``retry_delay`` is the number of seconds to wait before
        reconnecting.
        """
        events_url = self.url + "/api/v1/events"
        # The stream is expected to stay open indefinitely, so the session's
        # default total timeout must not apply to this request.
        no_timeout = ClientTimeout(total=None)
        while True:
            try:
                session = await self._get_session()
                logger.info(f"Connecting to Signal events at {events_url}")
                async with session.get(events_url, timeout=no_timeout) as response:
                    if response.status != 200:
                        logger.error(f"Failed to connect: HTTP {response.status}")
                        await asyncio.sleep(retry_delay)
                        continue
                    logger.info("Connected to Signal events")
                    async for raw_line in response.content:
                        line = raw_line.decode().strip()
                        if not line.startswith("data:"):
                            continue
                        try:
                            # removeprefix, not lstrip: lstrip strips a
                            # character set and could eat payload characters.
                            data = loads(line.removeprefix("data:"))
                        except Exception as e:
                            logger.warning(f"Failed to parse message: {e}")
                            continue
                        for callback in self._hooks:
                            self._spawn(callback(data))
                # The server ended the stream without an error; back off
                # before reconnecting instead of spinning.
                logger.warning(
                    f"Event stream closed. Reconnecting in {retry_delay}s..."
                )
                await asyncio.sleep(retry_delay)
            except Exception as e:
                logger.error(f"Connection lost: {e}. Retrying in {retry_delay}s...")
                await asyncio.sleep(retry_delay)