bot/libsignal.py
2025-12-17 00:27:01 +01:00

485 lines
15 KiB
Python

import asyncio
from json import loads
from pathlib import Path
from typing import Callable, Literal
from uuid import uuid4

from aiohttp import ClientSession, ClientTimeout
class User:
def __init__(
self,
id: str,
number: str | None = None,
name: str | None = None,
first_name: str | None = None,
last_name: str | None = None,
nickname: str | None = None,
first_nickname: str | None = None,
last_nickname: str | None = None,
note: str | None = None,
about: str | None = None,
emoji: str | None = None,
profile_first_name: str | None = None,
profile_last_name: str | None = None,
username: str | None = None,
):
self.id = id
self.number = number
self.name = name
self.first_name = first_name
self.last_name = last_name
self.nickname = nickname
self.first_nickname = first_nickname
self.last_nickname = last_nickname
self.note = note
self.about = about
self.emoji = emoji
self.profile_first_name = profile_first_name
self.profile_last_name = profile_last_name
self.username = username
@classmethod
def self(cls):
return cls("", number="+393406838100")
def __repr__(self) -> str:
return f'<User "{self.name}" at "{self.number}">'
class Group:
    """A group chat, identified by its group id.

    Args:
        id: Group identifier.
        name: Display name, if known.
        description: Group description, if known.
        members: Users that belong to the group.
        admins: Users with admin rights.
        banned: Users banned from the group.

    A list passed by the caller is stored as-is (not copied).  When a list
    argument is omitted the instance gets its own fresh empty list, so
    mutating one group's ``members`` can no longer leak into other groups
    through a shared default object.
    """

    def __init__(
        self,
        id: str,
        name: str | None = None,
        description: str | None = None,
        members: list[User] | None = None,
        admins: list[User] | None = None,
        banned: list[User] | None = None,
    ):
        self.id = id
        self.name = name
        self.description = description
        # `is None` (not `or`) so that a caller-supplied empty list keeps
        # its identity.
        self.members = [] if members is None else members
        self.admins = [] if admins is None else admins
        self.banned = [] if banned is None else banned

    def __repr__(self) -> str:
        return f'<Group "{self.name}" at "{self.id}">'
class Sticker:
    """Reference to one sticker: the pack it lives in plus its id in that pack."""

    def __init__(self, pack_id: str, id: int):
        # Stored verbatim; no validation is performed.
        self.pack_id, self.id = pack_id, id
class MessageMention:
    """A mention of ``recipient`` covering ``length`` units of text from ``start``."""

    def __init__(self, start: int, length: int, recipient: User):
        self.start, self.length = start, length
        self.recipient = recipient

    @classmethod
    def newMessage(cls, recipient: User):
        """Build a one-character message whose whole text mentions ``recipient``.

        The text is the placeholder ``"X"`` and the single mention spans it
        (start 0, length 1).
        """
        placeholder_mention = cls(0, 1, recipient)
        return Message("X", mentions=[placeholder_mention])

    def get(self) -> str:
        """Return the mention as ``start:length:recipient``.

        The recipient part is the user's number when it is set (truthy),
        otherwise the user's id.
        """
        return (
            f"{self.start}:{self.length}:{self.recipient.number or self.recipient.id}"
        )
class MessageStyle:
    """A text style applied to ``length`` units of text beginning at ``start``."""

    def __init__(
        self,
        start: int,
        length: int,
        style: Literal["BOLD", "ITALIC", "SPOILER", "STRIKETHROUGH", "MONOSPACE"],
    ):
        self.start, self.length = start, length
        self.style = style

    def get(self) -> str:
        """Return the style as ``start:length:STYLE``."""
        return f"{self.start}:{self.length}:{self.style}"
class Message:
    """A chat message, either received or about to be sent.

    Args:
        text: Message body.
        user: Author of the message.
        group: Group the message belongs to, if any.
        timestamp: Message timestamp.
        attachments: Files attached to the message.
        view_once: Whether the message is flagged as view-once.
        sticker: Sticker carried by the message.
        mentions: Mentions inside ``text``.
        styles: Text styles applied to ``text``.
        quote: Message or poll this message quotes.
        edit: Earlier message this one edits.

    A list passed by the caller is stored as-is.  When a list argument is
    omitted the instance gets its own fresh empty list instead of sharing
    one default object between all messages.
    """

    def __init__(
        self,
        text: str | None = None,
        user: User | None = None,
        group: Group | None = None,
        timestamp: int | None = None,
        attachments: list[Path] | None = None,
        view_once: bool = False,  # TODO: fix
        sticker: Sticker | None = None,
        mentions: list[MessageMention] | None = None,
        styles: list[MessageStyle] | None = None,
        quote: "Message | Poll | None" = None,
        edit: "Message | None" = None,
    ):
        self.text = text
        self.user = user
        self.group = group
        self.timestamp = timestamp
        self.attachments = [] if attachments is None else attachments
        self.view_once = view_once
        self.sticker = sticker
        self.mentions = [] if mentions is None else mentions
        self.styles = [] if styles is None else styles
        self.quote = quote
        self.edit = edit

    def __add__(self, b) -> "Message":
        """Return a new message made of this one followed by ``b``.

        The result copies every field of ``self``.  When ``b`` is a
        ``Message`` its text is appended and its mentions and styles are
        carried over with their ``start`` shifted past the existing text;
        ``b``'s other fields are ignored.  When ``b`` is anything else the
        result is just the copy of ``self``.  Neither operand is modified.
        """
        a = Message(
            text=self.text,
            user=self.user,
            group=self.group,
            timestamp=self.timestamp,
            # Copy the lists: the result must not alias the left operand's
            # lists, otherwise extending them below would also change `self`.
            attachments=list(self.attachments),
            view_once=self.view_once,
            sticker=self.sticker,
            mentions=list(self.mentions),
            styles=list(self.styles),
            quote=self.quote,
            edit=self.edit,
        )
        if isinstance(b, Message):
            a.text = a.text or ""
            # signal-cli documents mention/text-style offsets in UTF-16 code
            # units, not code points, so measure the prefix that way
            # (characters outside the BMP count as two units).
            offset = len(a.text.encode("utf-16-le")) // 2
            # Only the start moves; the length of a range does not depend
            # on what precedes it.
            a.mentions.extend(
                MessageMention(
                    mention.start + offset, mention.length, mention.recipient
                )
                for mention in b.mentions
            )
            a.styles.extend(
                MessageStyle(style.start + offset, style.length, style.style)
                for style in b.styles
            )
            if b.text:
                a.text += b.text
        return a
class Poll:
    """A poll: one question, its answer options and optional origin metadata.

    ``multiple`` defaults to ``True``; ``user``, ``group`` and ``timestamp``
    default to ``None``.
    """

    def __init__(
        self,
        question: str,
        options: list[str],
        user: User | None = None,
        group: Group | None = None,
        timestamp: int | None = None,
        multiple: bool = True,
    ):
        # Poll content.
        self.question, self.options = question, options
        self.multiple = multiple
        # Where and when the poll was posted.
        self.user, self.group = user, group
        self.timestamp = timestamp
class Signal:
    """Asynchronous client for a JSON-RPC messaging daemon reached over HTTP.

    Requests are POSTed to ``<url>/api/v1/rpc`` and incoming events are read
    from ``<url>/api/v1/events``.  The RPC method names used here ("send",
    "listContacts", ...) look like signal-cli's JSON-RPC API -- confirm
    against the daemon actually deployed.
    """

    def __init__(self, host: str, port: int):
        """Store the base URL ``host:port`` (trailing slashes of host removed)."""
        self.url = host.rstrip("/") + ":" + str(port)
        # Callbacks invoked with every decoded event (see `loop`).
        self._hooks: list[Callable] = []
        # Strong references to in-flight callback tasks; the event loop only
        # keeps weak ones, so unreferenced tasks may vanish mid-execution.
        self._tasks: set[asyncio.Task] = set()

    async def _post(self, method: str, **params):
        """Send one JSON-RPC 2.0 request and return the decoded JSON response.

        Raises:
            ConnectionError: If the HTTP status is not 200.
        """
        async with ClientSession() as session:
            async with session.post(
                self.url + "/api/v1/rpc",
                json={
                    "jsonrpc": "2.0",
                    "method": method,
                    "params": params,
                    "id": str(uuid4()),
                },
            ) as response:
                if response.status == 200:
                    return await response.json()
                raise ConnectionError(
                    f"Error while sending {method} to Server: HTTP code {response.status}."
                )

    @staticmethod
    def _target(recipient: User | Group) -> dict:
        """Return the RPC params addressing ``recipient`` (user or group)."""
        params: dict = {}
        if isinstance(recipient, User):
            params["recipient"] = recipient.id
        if isinstance(recipient, Group):
            params["groupId"] = recipient.id
        return params

    @staticmethod
    def _contact_fields(contact: dict) -> dict:
        """Map one raw ``listContacts`` entry to ``User`` keyword arguments.

        Only ``uuid`` is required; any other missing key yields ``None``.
        """
        profile = contact.get("profile") or {}
        return {
            "id": contact["uuid"],
            "number": contact.get("number"),
            "name": contact.get("name"),
            "first_name": contact.get("givenName"),
            "last_name": contact.get("familyName"),
            "nickname": contact.get("nickName"),
            "first_nickname": contact.get("nickGivenName"),
            "last_nickname": contact.get("nickFamilyName"),
            "note": contact.get("note"),
            "about": profile.get("about"),
            "emoji": profile.get("aboutEmoji"),
            # given name -> first name, family name -> last name (these two
            # used to be swapped).
            "profile_first_name": profile.get("givenName"),
            "profile_last_name": profile.get("familyName"),
            "username": contact.get("username"),
        }

    @staticmethod
    def _parse_event(line: bytes) -> dict | None:
        """Decode one line of the event stream.

        Returns the JSON object carried by a ``data:`` line, or ``None`` for
        any other line and for payloads that are not a valid JSON object.
        """
        text = line.decode().strip()
        if not text.startswith("data:"):
            return None
        try:
            # removeprefix, not lstrip: lstrip("data:") strips a *set* of
            # characters and could eat the beginning of the payload.
            payload = loads(text.removeprefix("data:"))
        except ValueError:  # json.JSONDecodeError is a ValueError
            return None
        return payload if isinstance(payload, dict) else None

    @staticmethod
    def _matches(user, group, msg_user: User, msg_group: Group | None) -> bool:
        """Tell whether a message passes the ``onMessage`` user/group filter.

        With no filter at all everything matches; otherwise the author must
        be one of the wanted users or the group one of the wanted groups.
        """
        if not user and not group:
            return True
        if isinstance(user, User):
            wanted_users = [user]
        else:
            wanted_users = user if isinstance(user, list) else []
        if isinstance(group, Group):
            wanted_groups = [group]
        else:
            wanted_groups = group if isinstance(group, list) else []
        if any(candidate.id == msg_user.id for candidate in wanted_users):
            return True
        return msg_group is not None and any(
            candidate.id == msg_group.id for candidate in wanted_groups
        )

    async def sendMessage(self, message: Message, recipient: User | Group):
        """Send ``message`` to ``recipient`` and return the raw RPC response.

        Only the fields of ``message`` that are set (truthy) are transmitted.
        When the response carries ``result.timestamp``, ``message.timestamp``
        is updated with it and ``message.user`` is set to ``User.self()``.
        """
        params = self._target(recipient)
        if message.attachments:
            params["attachment"] = [
                str(file.absolute()) for file in message.attachments
            ]
        if message.view_once:
            params["viewOnce"] = message.view_once
        if message.text:
            params["message"] = message.text
        if message.mentions:
            params["mention"] = [mention.get() for mention in message.mentions]
        if message.styles:
            params["textStyle"] = [style.get() for style in message.styles]
        if message.sticker:
            params["sticker"] = message.sticker.pack_id + ":" + str(message.sticker.id)
        quote = message.quote
        if quote:
            if quote.timestamp:
                params["quoteTimestamp"] = quote.timestamp
            if quote.user:
                params["quoteAuthor"] = quote.user.id
            # Polls have no mentions/styles/attachments to forward.
            if isinstance(quote, Message):
                if quote.mentions:
                    params["quoteMention"] = [
                        mention.get() for mention in quote.mentions
                    ]
                if quote.styles:
                    params["quoteTextStyle"] = [
                        style.get() for style in quote.styles
                    ]
                if quote.attachments:
                    params["quoteAttachment"] = [
                        str(file.absolute()) for file in quote.attachments
                    ]
        if message.edit:
            params["editTimestamp"] = message.edit.timestamp
        result = await self._post("send", **params)
        timestamp = result.get("result", {}).get("timestamp")
        if timestamp:
            message.timestamp = timestamp
            message.user = User.self()
        return result

    async def sendReaction(self, emoji: str, message: Message, recipient: User | Group):
        """React with ``emoji`` to ``message`` in the chat with ``recipient``."""
        params: dict = {"emoji": emoji, **self._target(recipient)}
        if message.user:
            params["targetAuthor"] = message.user.id
        if message.timestamp:
            params["targetTimestamp"] = message.timestamp
        return await self._post("sendReaction", **params)

    async def startTyping(self, recipient: User | Group):
        """Send a typing notification to ``recipient``."""
        return await self._post("sendTyping", **self._target(recipient))

    async def stopTyping(self, recipient: User | Group):
        """Send a typing notification with ``stop`` set to ``recipient``."""
        params: dict = {"stop": True, **self._target(recipient)}
        return await self._post("sendTyping", **params)

    async def deleteMessage(self, message: Message, recipient: User | Group):
        """Request remote deletion of ``message`` (addressed by its timestamp)."""
        params = self._target(recipient)
        params["targetTimestamp"] = message.timestamp
        return await self._post("remoteDelete", **params)

    async def sendPoll(self, poll: Poll, recipient: User | Group):
        """Create ``poll`` in the chat with ``recipient``; return the raw response.

        When the response carries ``result.timestamp``, ``poll.timestamp`` is
        updated with it and ``poll.user`` is set to ``User.self()``.
        """
        params = self._target(recipient)
        params["question"] = poll.question
        params["options"] = poll.options
        if not poll.multiple:
            params["noMulti"] = not poll.multiple
        result = await self._post("sendPollCreate", **params)
        timestamp = result.get("result", {}).get("timestamp")
        if timestamp:
            poll.timestamp = timestamp
            poll.user = User.self()
        return result

    async def listGroups(self) -> list[Group]:
        """Return all groups, with member/admin/banned entries as ``User`` objects.

        Contacts are fetched once and indexed by id instead of issuing one
        ``listContacts`` request per group member.  Ids without a matching
        contact become a bare ``User(id)``.
        """
        raw_groups = (await self._post("listGroups"))["result"]
        known: dict[str, User] = {}
        if raw_groups:
            for contact in await self.listUsers():
                # setdefault keeps the first contact for a duplicated id.
                known.setdefault(contact.id, contact)

        def resolve(entries) -> list[User]:
            return [
                known.get(entry["uuid"]) or User(entry["uuid"])
                for entry in entries or []
            ]

        return [
            Group(
                id=group["id"],
                name=group.get("name"),
                description=group.get("description"),
                members=resolve(group.get("members")),
                admins=resolve(group.get("admins")),
                banned=resolve(group.get("banned")),
            )
            for group in raw_groups
        ]

    async def listUsers(self) -> list[User]:
        """Return every contact reported by ``listContacts`` as a ``User``."""
        contacts = (await self._post("listContacts"))["result"]
        return [User(**self._contact_fields(contact)) for contact in contacts]

    async def getGroup(self, id: str) -> Group:
        """Return the group with this id, or a bare ``Group(id)`` if unknown."""
        for group in await self.listGroups():
            if group.id == id:
                return group
        return Group(id)

    async def getUser(self, id: str) -> User:
        """Return the contact with this id, or a bare ``User(id)`` if unknown."""
        for user in await self.listUsers():
            if user.id == id:
                return user
        return User(id)

    def onMessage(
        self,
        *,
        user: list[User] | User | None = None,
        group: list[Group] | Group | None = None,
    ) -> Callable:
        """Decorator registering ``func`` for incoming text/sticker messages.

        Args:
            user: Only deliver messages written by this user / these users.
            group: Only deliver messages posted in this group / these groups.

        With neither filter every message is delivered; with both, matching
        either one is enough.  The decorated function is returned unchanged
        and is awaited with a ``Message`` for each matching event.
        """

        def wrapper(func: Callable):
            async def callback(data: dict):
                envelope = data.get("envelope", {})
                payload = envelope.get("dataMessage") if envelope else None
                if not payload:
                    return None
                text: str | None = payload.get("message")
                sticker: Sticker | None = None
                if payload.get("sticker"):
                    raw_sticker = payload["sticker"]
                    sticker = Sticker(raw_sticker["packId"], raw_sticker["stickerId"])
                # Attachments and contacts are not parsed yet.
                if not (text or sticker):
                    # Nothing to deliver: skip the contact/group lookups.
                    return None
                group_info = payload.get("groupInfo")
                group_id: str | None = group_info["groupId"] if group_info else None
                timestamp: int = envelope["timestamp"]
                msg_user: User = await self.getUser(envelope["sourceUuid"])
                msg_group: Group | None = (
                    (await self.getGroup(group_id)) if group_id else None
                )
                msg = Message(
                    text=text,
                    user=msg_user,
                    group=msg_group,
                    timestamp=timestamp,
                    sticker=sticker,
                    view_once=payload.get("viewOnce", False),
                )
                # NOTE(review): the receipt is sent before filtering, i.e.
                # also for messages this hook ends up ignoring, and once per
                # registered hook -- confirm that this is intended.
                await self._post(
                    "sendReceipt",
                    recipient=msg_user.id,
                    targetTimestamp=timestamp,
                    type="viewed",
                )
                if self._matches(user, group, msg_user, msg_group):
                    return await func(msg)
                return None

            self._hooks.append(callback)
            return func

        return wrapper

    def onMessageRaw(self) -> Callable:
        """Decorator registering ``func`` to be awaited with every raw event dict."""

        def wrapper(func: Callable):
            async def callback(data: dict):
                await func(data)

            self._hooks.append(callback)
            return func

        return wrapper

    async def loop(self):
        """Read the event stream and dispatch each event to all hooks.

        Every ``data:`` line holding a JSON object spawns one task per
        registered hook; other lines, including malformed payloads, are
        skipped.  Returns when the stream ends, and returns immediately
        (without raising) when the HTTP status is not 200.
        """
        # The stream is meant to stay open indefinitely, so aiohttp's default
        # total timeout (5 minutes) must be disabled for this session.
        async with ClientSession(timeout=ClientTimeout(total=None)) as session:
            async with session.get(self.url + "/api/v1/events") as response:
                if response.status != 200:
                    return
                async for line in response.content:
                    data = self._parse_event(line)
                    if data is None:
                        continue
                    for callback in self._hooks:
                        task = asyncio.create_task(callback(data))
                        # Hold a reference until the task is done.
                        self._tasks.add(task)
                        task.add_done_callback(self._tasks.discard)