123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229 |
- import logging
- from dataclasses import dataclass
- from enum import Enum, auto
- import inspect
- import functools
- from sqlalchemy import Column, DateTime, Binary, String, Float, Integer
- from sqlalchemy.ext.declarative import declarative_base
- BANGS = ('!',)
- ModelBase = declarative_base()
- class GroupBasedSingleton(ModelBase):
- __tablename__ = "group_based_singleton"
- group_id = Column(String, primary_key=True)
- command_name = Column(String, primary_key=True)
- subpart_name = Column(String, primary_key=True)
- integer_data = Column(Integer)
- float_data = Column(Float)
- string_data = Column(String)
- binary_data = Column(Binary)
- datetime_data = Column(DateTime)
- @staticmethod
- def get_or_create(db, group_id, command_name, subpart_name):
- sing = db.query(GroupBasedSingleton).get((group_id, command_name, subpart_name))
- if sing is None:
- sing = GroupBasedSingleton(
- group_id=group_id,
- command_name=command_name,
- subpart_name=subpart_name,
- integer_data=None,
- float_data=None,
- string_data=None,
- binary_data=None,
- datetime_data=None
- )
- db.add(sing)
- return sing
- def pop_arg(text):
- if text is None:
- return None, None
- parts = text.split(maxsplit=1)
- if len(parts) == 1:
- return parts[0], None
- return parts[0], parts[1].strip()
- @dataclass
- class RollbotMessage:
- src: str
- name: str
- sender_id: str
- group_id: str
- message_id: str
- message_txt: str
- from_admin: bool
- def __post_init__(self):
- self.is_command = False
- if len(self.message_txt) > 0 and self.message_txt[0] in BANGS:
- cmd, raw = pop_arg(self.message_txt[1:].strip())
- if cmd is not None:
- self.is_command = True
- self.command = cmd.lower()
- self.raw_args = raw
- @staticmethod
- def from_groupme(msg, global_admins=(), group_admins={}):
- sender_id = msg["sender_id"]
- group_id = msg["group_id"]
- return RollbotMessage(
- "GROUPME",
- msg["name"],
- sender_id,
- group_id,
- msg["id"],
- msg["text"].strip(),
- sender_id in global_admins or (
- group_id in group_admins and
- sender_id in group_admins[group_id])
- )
- @staticmethod
- def from_discord(msg, global_admins=(), group_admins={}):
- sender_id = str(msg.author.id)
- group_id = str(msg.channel.id)
- return RollbotMessage(
- "DISCORD",
- msg.author.name,
- sender_id,
- group_id,
- msg.id,
- msg.content.strip(),
- sender_id in global_admins or (
- group_id in group_admins and
- sender_id in group_admins[group_id])
- )
- def args(self, normalize=True):
- arg, rest = pop_arg(self.raw_args)
- while arg is not None:
- yield arg.lower() if normalize else arg
- arg, rest = pop_arg(rest)
- class RollbotFailure(Enum):
- INVALID_COMMAND = auto()
- MISSING_SUBCOMMAND = auto()
- INVALID_SUBCOMMAND = auto()
- INVALID_ARGUMENTS = auto()
- SERVICE_DOWN = auto()
- PERMISSIONS = auto()
- INTERNAL_ERROR = auto()
- _RESPONSE_TEMPLATE = """Response{
- Original Message: %s,
- Text Response: %s,
- Image Response: %s,
- Respond: %s,
- Failure Reason: %s,
- Failure Notes: %s
- }"""
- @dataclass
- class RollbotResponse:
- msg: RollbotMessage
- txt: str = None
- img: str = None
- respond: bool = True
- failure: RollbotFailure = None
- debugging: dict = None
- def __post_init__(self):
- self.info = _RESPONSE_TEMPLATE % (self.msg, self.txt, self.img, self.respond, self.failure, self.debugging)
- self.is_success = self.failure is None
- if self.failure is None:
- self.failure_msg = None
- elif self.failure == RollbotFailure.INVALID_COMMAND:
- self.failure_msg = "Sorry - I don't think I understand the command '!%s'... " % self.msg.command \
- + "I'll try to figure it out and get back to you!"
- elif self.failure == RollbotFailure.MISSING_SUBCOMMAND:
- self.failure_msg = "Sorry - !%s requires a sub-command." % self.msg.command
- elif self.failure == RollbotFailure.INVALID_SUBCOMMAND:
- self.failure_msg = "Sorry - the sub-command you used for %s was not valid." % self.msg.command
- elif self.failure == RollbotFailure.INVALID_ARGUMENTS:
- self.failure_msg = "Sorry - %s cannot use those arguments!" % self.msg.command
- elif self.failure == RollbotFailure.SERVICE_DOWN:
- self.failure_msg = "Sorry - %s relies on a service I couldn't reach!" % self.msg.command
- elif self.failure == RollbotFailure.PERMISSIONS:
- self.failure_msg = "Sorry - you don't have permission to use that command or sub-command in this chat!"
- elif self.failure == RollbotFailure.INTERNAL_ERROR:
- self.failure_msg = "Sorry - I encountered an unrecoverable error, please review internal logs."
- if self.debugging is not None and "explain" in self.debugging:
- self.failure_msg += " " + self.debugging["explain"]
- class RollbotPlugin:
- def __init__(self, command, bot, logger=logging.getLogger(__name__)):
- self.command = command
- self.bot = bot
- self.logger = logger
- self.logger.info(f"Intializing {type(self).__name__} matching {command}")
- def on_start(self, db):
- self.logger.info(f"No on_start initialization of {type(self).__name__}")
- def on_shutdown(self, db):
- self.logger.info(f"No on_shutdown de-initialization of {type(self).__name__}")
- def on_command(self, db, message):
- raise NotImplementedError
- def as_plugin(command):
- if isinstance(command, str):
- command_name = command
- else:
- command_name = command.__name__
- def init_standin(self, bot, logger=logging.getLogger(__name__)):
- RollbotPlugin.__init__(self, command_name, bot, logger=logger)
- def decorator(fn):
- sig = inspect.signature(fn)
- converters = []
- for p in sig.parameters:
- if p in ("msg", "message", "_msg"):
- converters.append(lambda self, db, msg: msg)
- elif p in ("db", "database"):
- converters.append(lambda self, db, msg: db)
- elif p in ("log", "logger"):
- converters.append(lambda self, db, msg: self.logger)
- elif p in ("bot", "rollbot"):
- converters.append(lambda self, db, msg: self.bot)
- elif p.startswith("data") or p.endswith("data") or p in ("group_singleton", "singleton"):
- subp = fn.__annotations__.get(p, "")
- converters.append(lambda self, db, msg, subp=subp: GroupBasedSingleton.get_or_create(db, msg.group_id, self.command, subp))
- else:
- raise ValueError(f"Illegal argument name {p} in decorated plugin {command_name}")
- def on_command_standin(self, db, msg):
- res = fn(*[c(self, db, msg) for c in converters])
- if isinstance(res, RollbotResponse):
- return res
- else:
- return RollbotResponse(msg, txt=str(res))
- return type(
- f"AutoGenerated`{command_name}`Command",
- (RollbotPlugin,),
- dict(
- __init__=init_standin,
- on_command=on_command_standin,
- )
- )
- if isinstance(command, str):
- return decorator
- else:
- return decorator(command)
|