bot/libsignal.py
2026-02-16 12:16:32 +01:00

553 lines
19 KiB
Python

import asyncio
import logging
from json import loads
from pathlib import Path
from typing import Any, Callable, Literal
from uuid import uuid4

from aiohttp import ClientSession, ClientTimeout
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class User:
def __init__(
self,
id: str,
number: str | None = None,
name: str | None = None,
first_name: str | None = None,
last_name: str | None = None,
nickname: str | None = None,
first_nickname: str | None = None,
last_nickname: str | None = None,
note: str | None = None,
about: str | None = None,
emoji: str | None = None,
profile_first_name: str | None = None,
profile_last_name: str | None = None,
username: str | None = None,
):
self.id = id
self.number = number
self.name = name
self.first_name = first_name
self.last_name = last_name
self.nickname = nickname
self.first_nickname = first_nickname
self.last_nickname = last_nickname
self.note = note
self.about = about
self.emoji = emoji
self.profile_first_name = profile_first_name
self.profile_last_name = profile_last_name
self.username = username
@classmethod
def self(cls, number: str | None = None):
"""Create a User instance representing the current user."""
return cls("", number=number)
def __repr__(self) -> str:
return f'<User "{self.name}" at "{self.number}">'
class Group:
    """A Signal group together with its member, admin and banned user lists."""

    def __init__(
        self,
        id: str,
        name: str | None = None,
        description: str | None = None,
        members: list[User] | None = None,
        admins: list[User] | None = None,
        banned: list[User] | None = None,
    ):
        self.id, self.name, self.description = id, name, description
        # A falsy argument (None or an empty list) is replaced by a fresh list.
        self.members = members if members else []
        self.admins = admins if admins else []
        self.banned = banned if banned else []

    def __repr__(self) -> str:
        name, group_id = self.name, self.id
        return f'<Group "{name}" at "{group_id}">'
class Sticker:
    """Reference to a single sticker: the pack it belongs to and its id in that pack."""

    def __init__(self, pack_id: str, id: int):
        self.pack_id, self.id = pack_id, id
class MessageMention:
    """A mention of ``recipient`` covering ``length`` units of text from ``start``."""

    def __init__(self, start: int, length: int, recipient: User):
        self.start, self.length, self.recipient = start, length, recipient

    @classmethod
    def newMessage(cls, recipient: User):
        """Return a Message whose text is the placeholder "X" mentioning ``recipient``."""
        placeholder_mention = cls(0, 1, recipient)
        return Message("X", mentions=[placeholder_mention])

    def get(self) -> str:
        """Serialize as ``start:length:target``; target is the number, else the id."""
        target = self.recipient.number or self.recipient.id
        return f"{self.start}:{self.length}:{target}"
class MessageStyle:
    """A text style applied to ``length`` units of text beginning at ``start``."""

    def __init__(
        self,
        start: int,
        length: int,
        style: Literal["BOLD", "ITALIC", "SPOILER", "STRIKETHROUGH", "MONOSPACE"],
    ):
        self.start, self.length, self.style = start, length, style

    def get(self) -> str:
        """Serialize as ``start:length:STYLE``."""
        start, length, style = self.start, self.length, self.style
        return f"{start}:{length}:{style}"
class Message:
    """A Signal message: text plus optional attachments, sticker, mentions and styles.

    ``user``, ``group`` and ``timestamp`` describe where and when the message
    was sent. ``quote`` is a message or poll being replied to and ``edit`` is
    the earlier message this one replaces.
    """

    def __init__(
        self,
        text: str | None = None,
        user: User | None = None,
        group: Group | None = None,
        timestamp: int | None = None,
        attachments: list[Path] | None = None,
        view_once: bool = False,
        sticker: Sticker | None = None,
        mentions: list[MessageMention] | None = None,
        styles: list[MessageStyle] | None = None,
        quote: "Message | Poll | None" = None,
        edit: "Message | None" = None,
    ):
        self.text = text
        self.user = user
        self.group = group
        self.timestamp = timestamp
        self.attachments = attachments or []
        self.view_once = view_once
        self.sticker = sticker
        self.mentions = mentions or []
        self.styles = styles or []
        self.quote = quote
        self.edit = edit

    @staticmethod
    def _utf16_len(text: str) -> int:
        """Return the length of *text* in UTF-16 code units.

        signal-cli expresses mention and style positions in UTF-16 code units,
        so characters outside the BMP (most emoji) count as two units.
        """
        return len(text.encode("utf-16-le", "surrogatepass")) // 2

    def __add__(self, b) -> "Message":
        """Return a new Message made of this one followed by *b*.

        Everything except text, mentions and styles is taken from the left
        operand. The text of *b* is appended, and the mentions and styles of
        *b* are carried over with their ``start`` shifted by the length of the
        left-hand text. A right operand that is not a Message is ignored and a
        plain copy of this message is returned. Neither operand is modified.
        """
        combined = Message(
            text=self.text,
            user=self.user,
            group=self.group,
            timestamp=self.timestamp,
            attachments=self.attachments.copy(),
            view_once=self.view_once,
            sticker=self.sticker,
            mentions=self.mentions.copy(),
            styles=self.styles.copy(),
            quote=self.quote,
            edit=self.edit,
        )
        if not isinstance(b, Message):
            return combined

        combined.text = combined.text or ""
        # Positions are in UTF-16 code units, not Python code points.
        offset = self._utf16_len(combined.text)
        combined.mentions.extend(
            MessageMention(mention.start + offset, mention.length, mention.recipient)
            for mention in b.mentions
        )
        # Styles are positional just like mentions, so they are re-based too.
        combined.styles.extend(
            MessageStyle(style.start + offset, style.length, style.style)
            for style in b.styles
        )
        if b.text:
            combined.text += b.text
        return combined
class Poll:
    """A poll: a question, its answer options, and where/when it was sent.

    ``multiple`` states whether several options may be selected; it defaults
    to ``True``.
    """

    def __init__(
        self,
        question: str,
        options: list[str],
        user: User | None = None,
        group: Group | None = None,
        timestamp: int | None = None,
        multiple: bool = True,
    ):
        # Content of the poll.
        self.question, self.options = question, options
        # Sender and destination.
        self.user, self.group = user, group
        self.timestamp, self.multiple = timestamp, multiple
class Signal:
    """Asynchronous client for a Signal JSON-RPC HTTP endpoint.

    RPC calls are POSTed to ``<url>/api/v1/rpc``. Incoming events are streamed
    from ``<url>/api/v1/events`` by :meth:`loop` and handed to the handlers
    registered with :meth:`onMessage` and :meth:`onMessageRaw`.

    The instance can be used as an async context manager, which opens the HTTP
    session on entry and closes it on exit. Without it, a session is created
    lazily on the first request.
    """

    def __init__(self, host: str, port: int, self_number: str | None = None):
        """Store the endpoint ``host:port`` and the optional own phone number."""
        self.url = host.rstrip("/") + ":" + str(port)
        self._hooks: list[Callable] = []
        self._session: ClientSession | None = None
        self._self_number = self_number
        # The event loop only keeps weak references to tasks; holding them
        # here stops handler tasks from being garbage-collected mid-run.
        self._tasks: set[asyncio.Task] = set()

    async def __aenter__(self):
        self._session = ClientSession()
        return self

    async def __aexit__(self, *args):
        if self._session:
            await self._session.close()
            # Drop the closed session so a later call creates a fresh one.
            self._session = None

    async def _get_session(self) -> ClientSession:
        """Get or create a session for API calls."""
        if self._session is None or self._session.closed:
            self._session = ClientSession()
        return self._session

    async def _post(self, method: str, **params) -> dict[str, Any]:
        """Make a JSON-RPC POST request to the Signal API.

        Returns the decoded JSON response.

        Raises:
            ValueError: the response body contains an ``error`` member.
            ConnectionError: the server answered with a status other than 200.
        """
        session = await self._get_session()
        try:
            async with session.post(
                self.url + "/api/v1/rpc",
                json={
                    "jsonrpc": "2.0",
                    "method": method,
                    "params": params,
                    "id": str(uuid4()),
                },
            ) as response:
                if response.status == 200:
                    result = await response.json()
                    if "error" in result:
                        raise ValueError(f"API error: {result['error']}")
                    return result
                text = await response.text()
                raise ConnectionError(
                    f"Error while sending {method} to Server: HTTP {response.status}. {text}"
                )
        except Exception as e:
            # Log at this boundary, then let the caller decide what to do.
            logger.error(f"Failed to call {method}: {e}")
            raise

    @staticmethod
    def _recipient_params(recipient: User | Group) -> dict[str, Any]:
        """Return the RPC parameter that addresses *recipient*.

        A User maps to ``recipient`` and a Group to ``groupId``; anything else
        yields no addressing parameter.
        """
        if isinstance(recipient, User):
            return {"recipient": recipient.id}
        if isinstance(recipient, Group):
            return {"groupId": recipient.id}
        return {}

    async def sendMessage(self, message: Message, recipient: User | Group) -> dict[str, Any]:
        """Send a message to a user or group.

        On success, when the response carries a timestamp, ``message.timestamp``
        and ``message.user`` are updated in place.

        Raises:
            ValueError: the message has no text, attachments or sticker.
        """
        if not message.text and not message.attachments and not message.sticker:
            raise ValueError("Message must have text, attachments, or sticker")

        params: dict = self._recipient_params(recipient)
        if message.attachments:
            params["attachment"] = [
                str(file.absolute()) for file in message.attachments
            ]
        if message.view_once:
            params["viewOnce"] = message.view_once
        if message.text:
            params["message"] = message.text
        if message.mentions:
            params["mention"] = [mention.get() for mention in message.mentions]
        if message.styles:
            params["textStyle"] = [style.get() for style in message.styles]
        if message.sticker:
            params["sticker"] = message.sticker.pack_id + ":" + str(message.sticker.id)

        quote = message.quote
        if quote:
            if quote.timestamp:
                params["quoteTimestamp"] = quote.timestamp
            if quote.user:
                params["quoteAuthor"] = quote.user.id
            # Only a quoted Message (not a Poll) has mentions/styles/attachments.
            if isinstance(quote, Message):
                if quote.mentions:
                    params["quoteMention"] = [m.get() for m in quote.mentions]
                if quote.styles:
                    params["quoteTextStyle"] = [s.get() for s in quote.styles]
                if quote.attachments:
                    params["quoteAttachment"] = [
                        str(file.absolute()) for file in quote.attachments
                    ]
        if message.edit:
            params["editTimestamp"] = message.edit.timestamp

        result = await self._post("send", **params)
        timestamp = result.get("result", {}).get("timestamp")
        if timestamp:
            message.timestamp = timestamp
            message.user = User.self(self._self_number)
        return result

    async def sendReaction(self, emoji: str, message: Message, recipient: User | Group) -> dict[str, Any]:
        """Send a reaction emoji to a message."""
        params: dict = {"emoji": emoji, **self._recipient_params(recipient)}
        if message.user:
            params["targetAuthor"] = message.user.id
        if message.timestamp:
            params["targetTimestamp"] = message.timestamp
        return await self._post("sendReaction", **params)

    async def startTyping(self, recipient: User | Group) -> dict[str, Any]:
        """Start typing indicator for a recipient."""
        return await self._post("sendTyping", **self._recipient_params(recipient))

    async def stopTyping(self, recipient: User | Group) -> dict[str, Any]:
        """Stop typing indicator for a recipient."""
        params: dict = {"stop": True, **self._recipient_params(recipient)}
        return await self._post("sendTyping", **params)

    async def deleteMessage(self, message: Message, recipient: User | Group) -> dict[str, Any]:
        """Delete a message remotely.

        Raises:
            ValueError: the message has no timestamp.
        """
        if not message.timestamp:
            raise ValueError("Message must have a timestamp to be deleted")
        params: dict = self._recipient_params(recipient)
        params["targetTimestamp"] = message.timestamp
        return await self._post("remoteDelete", **params)

    async def sendPoll(self, poll: Poll, recipient: User | Group) -> dict[str, Any]:
        """Send a poll to a user or group.

        On success, when the response carries a timestamp, ``poll.timestamp``
        and ``poll.user`` are updated in place.
        """
        params: dict = self._recipient_params(recipient)
        params["question"] = poll.question
        params["options"] = poll.options
        if not poll.multiple:
            params["noMulti"] = not poll.multiple
        result = await self._post("sendPollCreate", **params)
        timestamp = result.get("result", {}).get("timestamp")
        if timestamp:
            poll.timestamp = timestamp
            poll.user = User.self(self._self_number)
        return result

    async def listGroups(self) -> list[Group]:
        """Return all groups, with member/admin/banned entries resolved to Users.

        Contacts are fetched once and indexed by id, rather than once per
        group member. Entries with no matching contact become a bare
        ``User(uuid)``.
        """
        raw_groups = (await self._post("listGroups"))["result"]
        if not raw_groups:
            return []

        known: dict[str, User] = {}
        for contact in await self.listUsers():
            # First occurrence wins, matching a front-to-back linear search.
            known.setdefault(contact.id, contact)

        def resolve(entries) -> list[User]:
            return [
                known.get(entry["uuid"]) or User(entry["uuid"])
                for entry in entries or []
            ]

        return [
            Group(
                id=raw["id"],
                name=raw.get("name"),
                description=raw.get("description"),
                members=resolve(raw.get("members")),
                admins=resolve(raw.get("admins")),
                banned=resolve(raw.get("banned")),
            )
            for raw in raw_groups
        ]

    async def listUsers(self) -> list[User]:
        """Return all contacts reported by ``listContacts`` as User objects.

        Only ``uuid`` is required in each entry; missing optional fields
        (including a missing or null ``profile``) become ``None``.
        """
        users: list[User] = []
        for raw in (await self._post("listContacts"))["result"]:
            profile = raw.get("profile") or {}
            users.append(
                User(
                    id=raw["uuid"],
                    number=raw.get("number"),
                    name=raw.get("name"),
                    first_name=raw.get("givenName"),
                    last_name=raw.get("familyName"),
                    nickname=raw.get("nickName"),
                    first_nickname=raw.get("nickGivenName"),
                    last_nickname=raw.get("nickFamilyName"),
                    note=raw.get("note"),
                    about=profile.get("about"),
                    emoji=profile.get("aboutEmoji"),
                    # givenName is the first name, familyName the last name.
                    profile_first_name=profile.get("givenName"),
                    profile_last_name=profile.get("familyName"),
                    username=raw.get("username"),
                )
            )
        return users

    async def getGroup(self, id: str) -> Group:
        """Return the group with this id, or a bare ``Group(id)`` if unknown."""
        for group in await self.listGroups():
            if group.id == id:
                return group
        return Group(id)

    async def getUser(self, id: str) -> User:
        """Return the contact with this id, or a bare ``User(id)`` if unknown."""
        for user in await self.listUsers():
            if user.id == id:
                return user
        return User(id)

    def onMessage(
        self,
        *,
        user: list[User] | User | None = None,
        group: list[Group] | Group | None = None,
    ) -> Callable:
        """Decorator to register a message handler with optional user/group filters.

        The handler is awaited with a Message when the event carries text or a
        sticker and either no filter is set or the sender matches ``user`` or
        the group matches ``group``. The decorated function itself is
        returned unchanged.
        """

        def accepts(msg_user: User, msg_group: Group | None) -> bool:
            # Filters are read on every message, so a list changed after
            # registration is honoured.
            if not user and not group:
                return True
            if isinstance(user, User):
                wanted_users = [user]
            elif isinstance(user, list):
                wanted_users = user
            else:
                wanted_users = []
            if isinstance(group, Group):
                wanted_groups = [group]
            elif isinstance(group, list):
                wanted_groups = group
            else:
                wanted_groups = []
            if any(candidate.id == msg_user.id for candidate in wanted_users):
                return True
            return msg_group is not None and any(
                candidate.id == msg_group.id for candidate in wanted_groups
            )

        def wrapper(func: Callable):
            async def callback(data: dict):
                try:
                    envelope = data.get("envelope", {})
                    if not envelope or not envelope.get("dataMessage"):
                        return None
                    dataMessage = envelope["dataMessage"]

                    text: str | None = dataMessage.get("message")
                    sticker: Sticker | None = None
                    if dataMessage.get("sticker"):
                        raw_sticker = dataMessage["sticker"]
                        sticker = Sticker(raw_sticker["packId"], raw_sticker["stickerId"])
                    # Attachments and contacts are not parsed.
                    if not (text or sticker):
                        # Nothing to deliver: skip the user/group lookups.
                        return None

                    groupId: str | None = None
                    if dataMessage.get("groupInfo"):
                        groupId = dataMessage["groupInfo"]["groupId"]
                    viewOnce: bool = dataMessage.get("viewOnce", False)
                    sourceUuid: str = envelope["sourceUuid"]
                    timestamp: int = envelope["timestamp"]

                    msg_user: User = await self.getUser(sourceUuid)
                    msg_group: Group | None = (
                        (await self.getGroup(groupId)) if groupId else None
                    )
                    msg = Message(
                        text=text,
                        user=msg_user,
                        group=msg_group,
                        timestamp=timestamp,
                        sticker=sticker,
                        view_once=viewOnce,
                    )

                    # The receipt is best-effort and is sent for every parsed
                    # message, whether or not the filters match.
                    try:
                        await self._post(
                            "sendReceipt",
                            recipient=msg_user.id,
                            targetTimestamp=timestamp,
                            type="viewed",
                        )
                    except Exception as e:
                        logger.warning(f"Failed to send receipt: {e}")

                    if accepts(msg_user, msg_group):
                        return await func(msg)
                except Exception as e:
                    # A failing handler must not take down the event loop.
                    logger.error(f"Error in message handler: {e}", exc_info=True)
                return None

            self._hooks.append(callback)
            return func

        return wrapper

    def onMessageRaw(self) -> Callable:
        """Decorator to register a raw message handler that receives unprocessed data."""

        def wrapper(func: Callable):
            async def callback(data: dict):
                try:
                    await func(data)
                except Exception as e:
                    logger.error(f"Error in raw message handler: {e}", exc_info=True)

            self._hooks.append(callback)
            return func

        return wrapper

    def _dispatch_event_line(self, raw_line: bytes) -> None:
        """Parse one line of the event stream and schedule every handler for it.

        Lines that do not start with ``data:`` are ignored. Lines that cannot
        be decoded or parsed as JSON are logged and skipped. Must be called
        from within a running event loop.
        """
        try:
            line = raw_line.decode().strip()
            if not line.startswith("data:"):
                return
            # removeprefix drops exactly the field name; lstrip would strip a
            # character set and could eat into the payload.
            data = loads(line.removeprefix("data:"))
        except ValueError as e:  # UnicodeDecodeError and JSONDecodeError
            logger.warning(f"Failed to parse message: {e}")
            return
        for callback in self._hooks:
            task = asyncio.create_task(callback(data))
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)

    async def loop(self, retry_delay: int = 5):
        """
        Start the event loop to receive messages from Signal.
        Automatically reconnects on connection loss.

        Runs until cancelled. After any disconnect, failed connection attempt
        or error it waits ``retry_delay`` seconds before reconnecting.
        """
        events_url = self.url + "/api/v1/events"
        while True:
            try:
                session = await self._get_session()
                logger.info(f"Connecting to Signal events at {events_url}")
                # The stream is long-lived: disable aiohttp's default total
                # timeout, which would otherwise abort it after five minutes.
                async with session.get(
                    events_url, timeout=ClientTimeout(total=None)
                ) as response:
                    if response.status == 200:
                        logger.info("Connected to Signal events")
                        async for raw_line in response.content:
                            self._dispatch_event_line(raw_line)
                        logger.warning(
                            f"Event stream closed. Retrying in {retry_delay}s..."
                        )
                    else:
                        logger.error(f"Failed to connect: HTTP {response.status}")
            except Exception as e:
                logger.error(f"Connection lost: {e}. Retrying in {retry_delay}s...")
            # Always pause before reconnecting, including after a clean end of
            # stream, so a server that keeps closing cannot cause a busy loop.
            await asyncio.sleep(retry_delay)