123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
- from __future__ import annotations
- import io
- import asyncio
- import logging.config
- import os
- import tomllib
- from typing import Any
- import discord
- import rollbot
- from commands import config
# Configure logging from "logging.conf"; loggers created before this call are
# left enabled.
logging.config.fileConfig("logging.conf", disable_existing_loggers=False)

# Load the TOML secrets. The SECRET_FILE environment variable overrides the
# default path "secrets.toml".
with open(os.environ.get("SECRET_FILE", "secrets.toml"), "rb") as sfile:
    secrets = tomllib.load(sfile)

# DATABASE_FILE takes precedence. The secrets are consulted only when the
# variable is unset, so a secrets file without a "database_file" key is not an
# error when the environment already supplies the path.
database_file = os.environ.get("DATABASE_FILE")
if database_file is None:
    database_file = secrets["database_file"]

# One entry per element of the configured "bangs" value; the default "!/"
# yields ("!", "/").
config.bangs = tuple(secrets["discord"].get("bangs", "!/"))
class DiscordBot(rollbot.Rollbot[discord.Message]):
    """Rollbot front-end driven by a discord client.

    Registers its event handlers on the supplied client, converts incoming
    ``discord.Message`` objects into ``rollbot.Message`` objects, and sends
    ``rollbot.Response`` objects back to Discord channels.
    """

    def __init__(self, client: discord.Client, loop: asyncio.AbstractEventLoop):
        """Initialise the bot and hook its handlers into ``client``.

        Args:
            client: The discord client used to receive events and to fetch
                channels and users.
            loop: Event loop forwarded to the ``rollbot.Rollbot`` constructor.
        """
        super().__init__(config, database_file, loop=loop)
        self.discord_client = client
        # on_message is not defined in this class; presumably it is inherited
        # from rollbot.Rollbot -- verify against the base class.
        client.event(self.on_message)
        # Reaction notifications are opt-in via the secrets file.
        if secrets["discord"].get("enable_react_notifs", False):
            client.event(self.on_reaction_add)

    def read_config(self, key: str) -> Any:
        """Look up a dotted key (e.g. ``"discord.token"``) in the secrets.

        Args:
            key: Path components separated by ``"."``.

        Returns:
            The value found, or ``None`` when a component is missing or an
            intermediate value is not a table that can be descended into.
        """
        cfg: Any = secrets
        for part in key.split("."):
            # A scalar in the middle of the path has no .get(); treat the key
            # as absent rather than raising AttributeError.
            if not isinstance(cfg, dict):
                return None
            cfg = cfg.get(part)
            if cfg is None:
                return None
        return cfg

    async def on_command(
        self, raw: discord.Message, message: rollbot.Message, command: str
    ):
        """Run the base class's ``on_command`` inside the channel's typing context."""
        async with raw.channel.typing():
            return await super().on_command(raw, message, command)

    async def on_reaction_add(
        self, reaction: discord.Reaction, user: discord.Member | discord.User
    ):
        """DM a message's author when someone else reacts to their message.

        Only authors whose ID (as a string) is listed under
        ``discord.wants_react_notifs`` in the secrets are notified, and a
        reaction added by the author to their own message is ignored.

        Args:
            reaction: The reaction that was added.
            user: The user who added the reaction.
        """
        sender_id = getattr(reaction.message.author, "id", None)
        opted_in = secrets["discord"].get("wants_react_notifs", [])
        if str(sender_id) not in opted_in or user.id == sender_id:
            return

        channel_name = getattr(reaction.message.channel, "name", "UNKNOWN CHANNEL")

        # Keep the quoted message short: at most 17 characters plus an ellipsis.
        content = reaction.message.content
        if len(content) > 17:
            content = content[:17] + "..."

        # Unicode emoji arrive as plain strings; other emoji objects are shown
        # by name.
        if isinstance(reaction.emoji, str):
            react_name = reaction.emoji
        else:
            react_name = f":{reaction.emoji.name}:"

        notif = f"{user.name} {react_name}'d your message '{content}' in {channel_name}"
        # Use a separate name for the recipient so the `user` parameter (the
        # reactor) is not rebound to a different person.
        recipient = await self.discord_client.fetch_user(sender_id)
        await recipient.send(notif)

    async def parse(self, msg: discord.Message) -> rollbot.Message:
        """Convert a ``discord.Message`` into a ``rollbot.Message``.

        Every attachment on the message is read eagerly. When the message
        replies to another message that is still available, that message is
        fetched and added as an attachment named ``"reply"``.

        Args:
            msg: The incoming Discord message.

        Returns:
            The equivalent ``rollbot.Message`` with ``origin_id`` set to
            ``"DISCORD"``.
        """
        # TODO might be nice to only read attachments lazily
        attachments = [
            rollbot.Attachment(name=att.filename, body=await att.read())
            for att in msg.attachments
        ]

        reference = msg.reference
        # `resolved` can also be a placeholder for a deleted message; trying
        # to fetch that would fail, so only real messages are followed.
        if reference is not None and isinstance(reference.resolved, discord.Message):
            channel = await self.discord_client.fetch_channel(msg.channel.id)
            replied_to = await channel.fetch_message(reference.resolved.id)
            attachments.append(rollbot.Attachment(name="reply", body=replied_to))

        role_names = [r.name for r in getattr(msg.author, "roles", [])]
        # In a DM channel, a message from anyone but the bot itself is marked
        # force_command.
        is_dm_from_other = (
            isinstance(msg.channel, discord.DMChannel)
            and msg.author != msg.channel.me
        )
        return rollbot.Message(
            origin_id="DISCORD",
            channel_id=str(msg.channel.id),
            sender_id=str(msg.author.id),
            message_id=str(msg.id),
            timestamp=msg.created_at,
            origin_admin="RollbotAdmin" in role_names,
            channel_admin=False,  # TODO - implement this if discord allows it
            sender_name=msg.author.name,
            text=msg.content,
            attachments=attachments,
            force_command=is_dm_from_other,
        )

    async def respond(self, response: rollbot.Response):
        """Send a ``rollbot.Response`` to its Discord channel.

        Attachments are interpreted by name:

        * ``"image"``: bytes are uploaded as ``image.png`` and shown in an
          embed; any other body is appended to the message text on a new line.
        * ``"reply"``: the body is a message ID (string) to reply to.
        * ``"react"``: the body is added as a reaction to the sent message.
        * ``"pin"``: the sent message is pinned.
        * any other name whose body is a ``discord.Attachment``: the
          attachment is re-uploaded with the message.

        Responses whose ``origin_id`` is not ``"DISCORD"`` are logged as an
        error and dropped.

        Raises:
            ValueError: If a ``"reply"`` attachment body is not a string.
        """
        if response.origin_id != "DISCORD":
            self.context.logger.error(f"Unable to respond to {response.origin_id}")
            return

        channel = await self.discord_client.fetch_channel(response.channel_id)
        args: dict[str, Any] = {"content": response.text or ""}
        files = []
        reacts = []
        pin = False

        for att in response.attachments or ():
            if att.name == "image":
                if isinstance(att.body, bytes):
                    embed = discord.Embed()
                    embed.set_image(url="attachment://image.png")
                    args["embed"] = embed
                    # TODO might eventually be nice to figure out a way of doing multiple embeds
                    files.append(
                        discord.File(io.BytesIO(att.body), filename="image.png")
                    )
                else:
                    args["content"] += "\n" + att.body
            elif att.name == "reply":
                if att.body is None or not isinstance(att.body, str):
                    raise ValueError("Invalid reply body type, must be message ID")
                args["reference"] = await channel.fetch_message(int(att.body))
            elif att.name == "react":
                reacts.append(att.body)
            elif att.name == "pin":
                pin = True
            elif isinstance(att.body, discord.Attachment):
                # send() has no `attachments` parameter, so an existing
                # attachment is converted to a File and uploaded with the
                # other files.
                files.append(await att.body.to_file())

        if files:
            args["files"] = files

        # TODO add ability to disable silent?
        message = await channel.send(silent=True, **args)
        for react in reacts:
            await message.add_reaction(react)
        if pin:
            await message.pin()
if __name__ == "__main__":
    # Create and install the loop explicitly: asyncio.get_event_loop() is
    # deprecated when no loop is current and raises RuntimeError on newer
    # Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Message content and reaction events must be requested explicitly.
    intents = discord.Intents.default()
    intents.message_content = True
    intents.reactions = True

    client = discord.Client(intents=intents, loop=loop)
    bot = DiscordBot(client, loop)
    try:
        loop.run_until_complete(bot.on_startup())
        loop.run_until_complete(client.start(secrets["discord"]["token"]))
    except KeyboardInterrupt:
        # Ctrl-C: close the Discord connection before shutting the bot down.
        loop.run_until_complete(client.close())
    finally:
        # Always run the bot's shutdown hook, whatever ended the run.
        loop.run_until_complete(bot.on_shutdown())
|