import logging from dataclasses import dataclass from enum import Enum, auto import inspect import functools from sqlalchemy import Column, DateTime, Binary, String, Float, Integer from sqlalchemy.ext.declarative import declarative_base BANGS = ('!',) ModelBase = declarative_base() class GroupBasedSingleton(ModelBase): __tablename__ = "group_based_singleton" group_id = Column(String, primary_key=True) command_name = Column(String, primary_key=True) subpart_name = Column(String, primary_key=True) integer_data = Column(Integer) float_data = Column(Float) string_data = Column(String) binary_data = Column(Binary) datetime_data = Column(DateTime) @staticmethod def get_or_create(db, group_id, command_name, subpart_name): sing = db.query(GroupBasedSingleton).get((group_id, command_name, subpart_name)) if sing is None: sing = GroupBasedSingleton( group_id=group_id, command_name=command_name, subpart_name=subpart_name, integer_data=None, float_data=None, string_data=None, binary_data=None, datetime_data=None ) db.add(sing) return sing def pop_arg(text): if text is None: return None, None parts = text.split(maxsplit=1) if len(parts) == 1: return parts[0], None return parts[0], parts[1].strip() @dataclass class RollbotMessage: src: str name: str sender_id: str group_id: str message_id: str message_txt: str from_admin: bool def __post_init__(self): self.is_command = False if len(self.message_txt) > 0 and self.message_txt[0] in BANGS: cmd, raw = pop_arg(self.message_txt[1:].strip()) if cmd is not None: self.is_command = True self.command = cmd.lower() self.raw_args = raw @staticmethod def from_groupme(msg, global_admins=(), group_admins={}): sender_id = msg["sender_id"] group_id = msg["group_id"] return RollbotMessage( "GROUPME", msg["name"], sender_id, group_id, msg["id"], msg["text"].strip(), sender_id in global_admins or ( group_id in group_admins and sender_id in group_admins[group_id]) ) @staticmethod def from_discord(msg, global_admins=(), group_admins={}): sender_id = str(msg.author.id) group_id = str(msg.channel.id) return RollbotMessage( "DISCORD", msg.author.name, sender_id, group_id, msg.id, msg.content.strip(), sender_id in global_admins or ( group_id in group_admins and sender_id in group_admins[group_id]) ) def args(self, normalize=True): arg, rest = pop_arg(self.raw_args) while arg is not None: yield arg.lower() if normalize else arg arg, rest = pop_arg(rest) class RollbotFailure(Enum): INVALID_COMMAND = auto() MISSING_SUBCOMMAND = auto() INVALID_SUBCOMMAND = auto() INVALID_ARGUMENTS = auto() SERVICE_DOWN = auto() PERMISSIONS = auto() INTERNAL_ERROR = auto() _RESPONSE_TEMPLATE = """Response{ Original Message: %s, Text Response: %s, Image Response: %s, Respond: %s, Failure Reason: %s, Failure Notes: %s }""" @dataclass class RollbotResponse: msg: RollbotMessage txt: str = None img: str = None respond: bool = True failure: RollbotFailure = None debugging: dict = None def __post_init__(self): self.info = _RESPONSE_TEMPLATE % (self.msg, self.txt, self.img, self.respond, self.failure, self.debugging) self.is_success = self.failure is None if self.failure is None: self.failure_msg = None elif self.failure == RollbotFailure.INVALID_COMMAND: self.failure_msg = "Sorry - I don't think I understand the command '!%s'... " % self.msg.command \ + "I'll try to figure it out and get back to you!" elif self.failure == RollbotFailure.MISSING_SUBCOMMAND: self.failure_msg = "Sorry - !%s requires a sub-command." % self.msg.command elif self.failure == RollbotFailure.INVALID_SUBCOMMAND: self.failure_msg = "Sorry - the sub-command you used for %s was not valid." % self.msg.command elif self.failure == RollbotFailure.INVALID_ARGUMENTS: self.failure_msg = "Sorry - %s cannot use those arguments!" % self.msg.command elif self.failure == RollbotFailure.SERVICE_DOWN: self.failure_msg = "Sorry - %s relies on a service I couldn't reach!" % self.msg.command elif self.failure == RollbotFailure.PERMISSIONS: self.failure_msg = "Sorry - you don't have permission to use that command or sub-command in this chat!" elif self.failure == RollbotFailure.INTERNAL_ERROR: self.failure_msg = "Sorry - I encountered an unrecoverable error, please review internal logs." if self.debugging is not None and "explain" in self.debugging: self.failure_msg += " " + self.debugging["explain"] class RollbotPlugin: def __init__(self, command, bot, logger=logging.getLogger(__name__)): self.command = command self.bot = bot self.logger = logger self.logger.info(f"Intializing {type(self).__name__} matching {command}") def on_start(self, db): self.logger.info(f"No on_start initialization of {type(self).__name__}") def on_shutdown(self, db): self.logger.info(f"No on_shutdown de-initialization of {type(self).__name__}") def on_command(self, db, message): raise NotImplementedError def as_plugin(command): if isinstance(command, str): command_name = command else: command_name = command.__name__ def init_standin(self, bot, logger=logging.getLogger(__name__)): RollbotPlugin.__init__(self, command_name, bot, logger=logger) def decorator(fn): sig = inspect.signature(fn) converters = [] for p in sig.parameters: if p in ("msg", "message", "_msg"): converters.append(lambda self, db, msg: msg) elif p in ("db", "database"): converters.append(lambda self, db, msg: db) elif p in ("log", "logger"): converters.append(lambda self, db, msg: self.logger) elif p in ("bot", "rollbot"): converters.append(lambda self, db, msg: self.bot) elif p.startswith("data") or p.endswith("data") or p in ("group_singleton", "singleton"): subp = fn.__annotations__.get(p, "") converters.append(lambda self, db, msg, subp=subp: GroupBasedSingleton.get_or_create(db, msg.group_id, self.command, subp)) else: raise ValueError(f"Illegal argument name {p} in decorated plugin {command_name}") def on_command_standin(self, db, msg): res = fn(*[c(self, db, msg) for c in converters]) if isinstance(res, RollbotResponse): return res else: return RollbotResponse(msg, txt=str(res)) return type( f"AutoGenerated`{command_name}`Command", (RollbotPlugin,), dict( __init__=init_standin, on_command=on_command_standin, ) ) if isinstance(command, str): return decorator else: return decorator(command)