123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- from __future__ import annotations
- import io
- import asyncio
- import logging.config
- import os
- import tomllib
- from typing import Any
- import discord
- import rollbot
- from commands import config
# Configure logging from the INI-style file in the working directory, keeping
# any loggers that were already created by imported modules enabled.
logging.config.fileConfig("logging.conf", disable_existing_loggers=False)

# Load the TOML secrets file; its path can be overridden with SECRET_FILE.
with open(os.environ.get("SECRET_FILE", "secrets.toml"), "rb") as sfile:
    secrets = tomllib.load(sfile)

# DATABASE_FILE in the environment takes precedence over the secrets file.
# The secrets entry is only looked up when the variable is absent, so a
# secrets file without "database_file" is fine as long as the variable is set.
if "DATABASE_FILE" in os.environ:
    database_file = os.environ["DATABASE_FILE"]
else:
    database_file = secrets["database_file"]

# Each character of the configured string (default "!/") is one command prefix.
bangs = tuple(secrets["discord"].get("bangs", "!/"))
class DiscordBot(rollbot.Rollbot[discord.Message]):
    """Rollbot front-end that receives and answers messages through a Discord client."""

    def __init__(self, client: discord.Client) -> None:
        """Build the bot and register its event handlers on ``client``.

        ``self.on_message`` (not defined in this class; presumably inherited
        from ``rollbot.Rollbot``) is always registered. ``on_reaction_add`` is
        only registered when ``discord.enable_react_notifs`` is truthy in the
        loaded secrets.
        """
        super().__init__(config.extend(rollbot.CommandConfiguration(bangs=bangs)), database_file)
        self.discord_client = client
        client.event(self.on_message)
        if secrets["discord"].get("enable_react_notifs", False):
            client.event(self.on_reaction_add)

    def read_config(self, key: str) -> Any:
        """Look up a dotted key (e.g. ``"discord.token"``) in the loaded secrets.

        Returns:
            The value found at that path, or ``None`` when any component is
            missing or the path tries to descend into a non-table value.
        """
        cfg: Any = secrets
        for part in key.split("."):
            # A scalar or list has no sub-keys; treat that like a missing key
            # instead of failing with AttributeError on ``.get``.
            if not isinstance(cfg, dict):
                return None
            cfg = cfg.get(part)
            if cfg is None:
                return None
        return cfg

    async def on_command(self, raw: discord.Message, message: rollbot.Message, command: str):
        """Run the base-class command handler while showing a typing indicator in the channel."""
        async with raw.channel.typing():
            return await super().on_command(raw, message, command)

    async def on_reaction_add(self, reaction: discord.Reaction, user: discord.Member | discord.User):
        """DM the author of a reacted-to message, if they opted in to notifications.

        Nothing is sent unless the author's ID (as a string) is listed under
        ``discord.wants_react_notifs`` in the secrets, and nothing is sent for
        reactions the author adds to their own message.
        """
        sender_id = getattr(reaction.message.author, "id", None)
        opted_in = secrets["discord"].get("wants_react_notifs", [])
        if str(sender_id) not in opted_in or user.id == sender_id:
            return
        channel_name = getattr(reaction.message.channel, "name", "UNKNOWN CHANNEL")
        content = reaction.message.content
        # Keep the quoted preview short: at most 17 characters plus an ellipsis.
        content = (content[:17] + '...') if len(content) > 17 else content
        # Unicode emoji arrive as plain strings; other emoji objects expose a name.
        react_name = reaction.emoji if isinstance(reaction.emoji, str) else f":{reaction.emoji.name}:"
        notif = f"{user.name} {react_name}'d your message '{content}' in {channel_name}"
        # Separate name for the DM target so the reacting ``user`` is not shadowed.
        recipient = await self.discord_client.fetch_user(sender_id)
        await recipient.send(notif)

    async def parse(self, msg: discord.Message) -> rollbot.Message:
        """Convert a Discord message into a ``rollbot.Message``.

        Every file attached to the message is downloaded eagerly. When the
        message references another message (e.g. it is a reply), that message
        is fetched and appended as an attachment named ``"reply"``.
        ``origin_admin`` is set when the author has a role named
        ``RollbotAdmin``.
        """
        # TODO might be nice to only read attachments lazily
        attachments = [rollbot.Attachment(
            name=att.filename,
            body=await att.read(),
        ) for att in msg.attachments]
        # Use the reference's own message_id: ``reference.resolved`` is None
        # whenever Discord did not resolve the referenced message, so reading
        # ``resolved.id`` could fail with AttributeError.
        reference = msg.reference
        if reference is not None and reference.message_id is not None:
            # NOTE(review): the lookup uses the current channel, as before;
            # a referenced message that was deleted will still make
            # fetch_message raise.
            channel = await self.discord_client.fetch_channel(msg.channel.id)
            attachments.append(rollbot.Attachment(
                name="reply",
                body=await channel.fetch_message(reference.message_id),
            ))
        return rollbot.Message(
            origin_id="DISCORD",
            channel_id=str(msg.channel.id),
            sender_id=str(msg.author.id),
            message_id=str(msg.id),
            timestamp=msg.created_at,
            origin_admin="RollbotAdmin" in [r.name for r in getattr(msg.author, "roles", [])],
            channel_admin=False,  # TODO - implement this if discord allows it
            sender_name=msg.author.name,
            text=msg.content,
            attachments=attachments,
        )

    async def respond(self, response: rollbot.Response):
        """Send ``response`` to the Discord channel it is addressed to.

        Responses whose ``origin_id`` is not ``"DISCORD"`` are logged as errors
        and dropped. Attachments are handled by name and body type:

        * ``"image"`` with a ``bytes`` body is uploaded as ``image.png`` and
          shown in an embed; any other body is appended to the text on a new
          line.
        * ``"reply"`` must carry the target message ID as a string; the sent
          message then references that message.
        * any other attachment whose body is a ``discord.Attachment`` is
          re-uploaded as a file.

        Raises:
            ValueError: if a ``"reply"`` attachment body is not a string.
        """
        if response.origin_id != "DISCORD":
            self.context.logger.error(f"Unable to respond to {response.origin_id}")
            return
        channel = await self.discord_client.fetch_channel(response.channel_id)
        args: dict[str, Any] = {"content": response.text or ""}
        files = []
        for att in response.attachments or []:
            if att.name == "image":
                if isinstance(att.body, bytes):
                    embed = discord.Embed(description="Embedded Image")
                    file = discord.File(io.BytesIO(att.body), filename="image.png")
                    embed.set_image(url="attachment://image.png")
                    args["embed"] = embed
                    # TODO might eventually be nice to figure out a way of doing multiple embeds
                    files.append(file)
                else:
                    args["content"] += "\n" + att.body
            elif att.name == "reply":
                if att.body is None or not isinstance(att.body, str):
                    raise ValueError("Invalid reply body type, must be message ID")
                args["reference"] = await channel.fetch_message(int(att.body))
            elif isinstance(att.body, discord.Attachment):
                # ``send()`` has no ``attachments`` parameter (that belongs to
                # message editing), so an existing attachment has to be
                # converted to a File and uploaded again.
                files.append(await att.body.to_file())
        if files:
            args["files"] = files
        await channel.send(**args)
if __name__ == "__main__":
    # Create and install the event loop explicitly: calling
    # asyncio.get_event_loop() with no current loop is deprecated and raises
    # RuntimeError on newer Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Default intents plus the ones this bot needs: message text for command
    # parsing and reaction events for the optional reaction notifications.
    intents = discord.Intents.default()
    intents.message_content = True
    intents.reactions = True

    client = discord.Client(intents=intents, loop=loop)
    bot = DiscordBot(client)
    try:
        loop.run_until_complete(bot.on_startup())
        # Blocks until the client stops or the process is interrupted.
        loop.run_until_complete(client.start(secrets["discord"]["token"]))
    except KeyboardInterrupt:
        # Ctrl-C: disconnect from Discord cleanly before shutting the bot down.
        loop.run_until_complete(client.close())
    finally:
        # Always give the bot a chance to release its resources.
        loop.run_until_complete(bot.on_shutdown())
|