from __future__ import annotations import io import asyncio import logging.config import os import tomllib from typing import Any import discord import rollbot from commands import config logging.config.fileConfig("logging.conf", disable_existing_loggers=False) with open(os.environ.get("SECRET_FILE", "secrets.toml"), "rb") as sfile: secrets = tomllib.load(sfile) database_file = os.environ.get("DATABASE_FILE", secrets["database_file"]) config.bangs = tuple(t for t in secrets["discord"].get("bangs", "!/")) class DiscordBot(rollbot.Rollbot[discord.Message]): def __init__(self, client, loop): super().__init__(config, database_file, loop=loop) self.discord_client = client client.event(self.on_message) if secrets["discord"].get("enable_react_notifs", False): client.event(self.on_reaction_add) def read_config(self, key: str) -> Any: cfg = secrets for part in key.split("."): cfg = cfg.get(part, None) if cfg is None: return None return cfg async def on_command( self, raw: discord.Message, message: rollbot.Message, command: str ): async with raw.channel.typing(): return await super().on_command(raw, message, command) async def on_reaction_add( self, reaction: discord.Reaction, user: discord.Member | discord.User ): sender_id = getattr(reaction.message.author, "id", None) if ( str(sender_id) not in secrets["discord"].get("wants_react_notifs", []) or user.id == sender_id ): return channel_name = getattr(reaction.message.channel, "name", "UNKNOWN CHANNEL") content = reaction.message.content content = (content[:17] + "...") if len(content) > 17 else content react_name = ( reaction.emoji if isinstance(reaction.emoji, str) else f":{reaction.emoji.name}:" ) notif = f"{user.name} {react_name}'d your message '{content}' in {channel_name}" user = await self.discord_client.fetch_user(sender_id) await user.send(notif) async def parse(self, msg: discord.Message) -> rollbot.Message: # TODO might be nice to only read attachments lazily attachments = [ rollbot.Attachment( name=att.filename, body=await att.read(), ) for att in msg.attachments ] if msg.reference is not None and msg.reference.resolved is not None: channel = await self.discord_client.fetch_channel(msg.channel.id) attachments.append( rollbot.Attachment( name="reply", body=await channel.fetch_message(msg.reference.resolved.id), ) ) return rollbot.Message( origin_id="DISCORD", channel_id=str(msg.channel.id), sender_id=str(msg.author.id), message_id=str(msg.id), timestamp=msg.created_at, origin_admin="RollbotAdmin" in [r.name for r in getattr(msg.author, "roles", [])], channel_admin=False, # TODO - implement this if discord allows it sender_name=msg.author.name, text=msg.content, attachments=attachments, force_command=( isinstance(msg.channel, discord.DMChannel) and msg.author != msg.channel.me ), ) async def respond(self, response: rollbot.Response): if response.origin_id != "DISCORD": self.context.logger.error(f"Unable to respond to {response.origin_id}") return channel = await self.discord_client.fetch_channel(response.channel_id) args = {} args["content"] = response.text or "" attachments = [] files = [] reacts = [] pin = False if response.attachments is not None: for att in response.attachments: if att.name == "image": if isinstance(att.body, bytes): embed = discord.Embed() file = discord.File(io.BytesIO(att.body), filename="image.png") embed.set_image(url="attachment://image.png") args["embed"] = embed # TODO might eventually be nice to figure out a way of doing multiple embeds files.append(file) else: args["content"] += "\n" + att.body elif att.name == "reply": if att.body is None or not isinstance(att.body, str): raise ValueError("Invalid reply body type, must be message ID") args["reference"] = await channel.fetch_message(int(att.body)) elif att.name == "react": reacts.append(att.body) elif att.name == "pin": pin = True elif isinstance(att.body, discord.Attachment): attachments.append(att.body) if len(attachments) > 0: args["attachments"] = attachments if len(files) > 0: args["files"] = files # TODO add abilitly to disable silent? message = await channel.send(silent=True, **args) for react in reacts: await message.add_reaction(react) if pin: await message.pin() if __name__ == "__main__": loop = asyncio.get_event_loop() intents = discord.Intents.default() intents.message_content = True intents.reactions = True client = discord.Client(intents=intents, loop=loop) bot = DiscordBot(client, loop) try: loop.run_until_complete(bot.on_startup()) loop.run_until_complete(client.start(secrets["discord"]["token"])) except KeyboardInterrupt: loop.run_until_complete(client.close()) finally: loop.run_until_complete(bot.on_shutdown())