|
@@ -1,279 +0,0 @@
|
|
|
-import logging
|
|
|
-from dataclasses import dataclass
|
|
|
-from enum import Enum, auto
|
|
|
-import inspect
|
|
|
-import functools
|
|
|
-import pickle
|
|
|
-import datetime
|
|
|
-
|
|
|
-from sqlalchemy import Column, DateTime, Binary, String, Float, Integer
|
|
|
-from sqlalchemy.ext.declarative import declarative_base
|
|
|
-
|
|
|
-
|
|
|
-BANGS = ('!',)
|
|
|
-
|
|
|
-ModelBase = declarative_base()
|
|
|
-
|
|
|
-
|
|
|
-class GroupBasedSingleton(ModelBase):
|
|
|
- __tablename__ = "group_based_singleton"
|
|
|
- group_id = Column(String, primary_key=True)
|
|
|
- command_name = Column(String, primary_key=True)
|
|
|
- subpart_name = Column(String, primary_key=True)
|
|
|
- integer_data = Column(Integer)
|
|
|
- float_data = Column(Float)
|
|
|
- string_data = Column(String)
|
|
|
- binary_data = Column(Binary)
|
|
|
- datetime_data = Column(DateTime)
|
|
|
-
|
|
|
- @staticmethod
|
|
|
- def get_or_create(db, group_id, command_name, subpart_name):
|
|
|
- sing = db.query(GroupBasedSingleton).get((group_id, command_name, subpart_name))
|
|
|
- if sing is None:
|
|
|
- sing = GroupBasedSingleton(
|
|
|
- group_id=group_id,
|
|
|
- command_name=command_name,
|
|
|
- subpart_name=subpart_name,
|
|
|
- integer_data=None,
|
|
|
- float_data=None,
|
|
|
- string_data=None,
|
|
|
- binary_data=None,
|
|
|
- datetime_data=None
|
|
|
- )
|
|
|
- db.add(sing)
|
|
|
- return sing
|
|
|
-
|
|
|
- def set_binary(self, obj):
|
|
|
- self.binary_data = pickle.dumps(obj)
|
|
|
-
|
|
|
- def get_binary(self):
|
|
|
- if self.binary_data is None:
|
|
|
- return None
|
|
|
- return pickle.loads(self.binary_data)
|
|
|
-
|
|
|
-
|
|
|
-def as_group_singleton(cls):
|
|
|
- columns = {}
|
|
|
- for name, typ in cls.__annotations__.items():
|
|
|
- if name == "group_id":
|
|
|
- raise ValueError(f"Cannot have column named group_id in as_group_singleton class {cls.__name__}")
|
|
|
- if typ == int:
|
|
|
- columns[name] = Column(Integer)
|
|
|
- elif typ == float:
|
|
|
- columns[name] = Column(Float)
|
|
|
- elif typ == str:
|
|
|
- columns[name] = Column(String)
|
|
|
- elif typ in (object, "binary"):
|
|
|
- columns[name] = Column(Binary)
|
|
|
- elif typ == datetime.datetime:
|
|
|
- columns[name] = Column(DateTime)
|
|
|
- else:
|
|
|
- raise TypeError(f"Unsupported annotation {typ} for {name} in {cls.__name__}")
|
|
|
-
|
|
|
- cons_params = {k: getattr(cls, k, None) for k in columns}
|
|
|
-
|
|
|
- def get_or_create_standin(cls, db, group_id):
|
|
|
- sing = db.query(cls).get(group_id)
|
|
|
- if sing is None:
|
|
|
- sing = cls(group_id=group_id, **cons_params)
|
|
|
- db.add(sing)
|
|
|
- return sing
|
|
|
-
|
|
|
- columns["__tablename__"] = "".join(("_" + c.lower()) if "A" <= c <= "Z" else c for c in cls.__name__).strip("_")
|
|
|
- columns["group_id"] = Column(String, primary_key=True)
|
|
|
-
|
|
|
- return type(
|
|
|
- cls.__name__,
|
|
|
- (ModelBase,),
|
|
|
- dict(**columns, get_or_create=classmethod(get_or_create_standin))
|
|
|
- )
|
|
|
-
|
|
|
-
|
|
|
-def pop_arg(text):
|
|
|
- if text is None:
|
|
|
- return None, None
|
|
|
- parts = text.split(maxsplit=1)
|
|
|
- if len(parts) == 1:
|
|
|
- return parts[0], None
|
|
|
- return parts[0], parts[1].strip()
|
|
|
-
|
|
|
-
|
|
|
-@dataclass
|
|
|
-class RollbotMessage:
|
|
|
- src: str
|
|
|
- name: str
|
|
|
- sender_id: str
|
|
|
- group_id: str
|
|
|
- message_id: str
|
|
|
- message_txt: str
|
|
|
- from_admin: bool
|
|
|
-
|
|
|
- def __post_init__(self):
|
|
|
- self.is_command = False
|
|
|
- if len(self.message_txt) > 0 and self.message_txt[0] in BANGS:
|
|
|
- cmd, raw = pop_arg(self.message_txt[1:].strip())
|
|
|
- if cmd is not None:
|
|
|
- self.is_command = True
|
|
|
- self.command = cmd.lower()
|
|
|
- self.raw_args = raw
|
|
|
-
|
|
|
- @staticmethod
|
|
|
- def from_groupme(msg, global_admins=(), group_admins={}):
|
|
|
- sender_id = msg["sender_id"]
|
|
|
- group_id = msg["group_id"]
|
|
|
- return RollbotMessage(
|
|
|
- "GROUPME",
|
|
|
- msg["name"],
|
|
|
- sender_id,
|
|
|
- group_id,
|
|
|
- msg["id"],
|
|
|
- msg["text"].strip(),
|
|
|
- sender_id in global_admins or (
|
|
|
- group_id in group_admins and
|
|
|
- sender_id in group_admins[group_id])
|
|
|
- )
|
|
|
-
|
|
|
- @staticmethod
|
|
|
- def from_discord(msg, global_admins=(), group_admins={}):
|
|
|
- sender_id = str(msg.author.id)
|
|
|
- group_id = str(msg.channel.id)
|
|
|
- return RollbotMessage(
|
|
|
- "DISCORD",
|
|
|
- msg.author.name,
|
|
|
- sender_id,
|
|
|
- group_id,
|
|
|
- msg.id,
|
|
|
- msg.content.strip(),
|
|
|
- sender_id in global_admins or (
|
|
|
- group_id in group_admins and
|
|
|
- sender_id in group_admins[group_id])
|
|
|
- )
|
|
|
-
|
|
|
- def args(self, normalize=True):
|
|
|
- arg, rest = pop_arg(self.raw_args)
|
|
|
- while arg is not None:
|
|
|
- yield arg.lower() if normalize else arg
|
|
|
- arg, rest = pop_arg(rest)
|
|
|
-
|
|
|
-
|
|
|
-class RollbotFailure(Enum):
|
|
|
- INVALID_COMMAND = auto()
|
|
|
- MISSING_SUBCOMMAND = auto()
|
|
|
- INVALID_SUBCOMMAND = auto()
|
|
|
- INVALID_ARGUMENTS = auto()
|
|
|
- SERVICE_DOWN = auto()
|
|
|
- PERMISSIONS = auto()
|
|
|
- INTERNAL_ERROR = auto()
|
|
|
-
|
|
|
-
|
|
|
-_RESPONSE_TEMPLATE = """Response{
|
|
|
- Original Message: %s,
|
|
|
- Text Response: %s,
|
|
|
- Image Response: %s,
|
|
|
- Respond: %s,
|
|
|
- Failure Reason: %s,
|
|
|
- Failure Notes: %s
|
|
|
-}"""
|
|
|
-
|
|
|
-
|
|
|
-@dataclass
|
|
|
-class RollbotResponse:
|
|
|
- msg: RollbotMessage
|
|
|
- txt: str = None
|
|
|
- img: str = None
|
|
|
- respond: bool = True
|
|
|
- failure: RollbotFailure = None
|
|
|
- debugging: dict = None
|
|
|
-
|
|
|
- def __post_init__(self):
|
|
|
- self.info = _RESPONSE_TEMPLATE % (self.msg, self.txt, self.img, self.respond, self.failure, self.debugging)
|
|
|
- self.is_success = self.failure is None
|
|
|
- if self.failure is None:
|
|
|
- self.failure_msg = None
|
|
|
- elif self.failure == RollbotFailure.INVALID_COMMAND:
|
|
|
- self.failure_msg = "Sorry - I don't think I understand the command '!%s'... " % self.msg.command \
|
|
|
- + "I'll try to figure it out and get back to you!"
|
|
|
- elif self.failure == RollbotFailure.MISSING_SUBCOMMAND:
|
|
|
- self.failure_msg = "Sorry - !%s requires a sub-command." % self.msg.command
|
|
|
- elif self.failure == RollbotFailure.INVALID_SUBCOMMAND:
|
|
|
- self.failure_msg = "Sorry - the sub-command you used for %s was not valid." % self.msg.command
|
|
|
- elif self.failure == RollbotFailure.INVALID_ARGUMENTS:
|
|
|
- self.failure_msg = "Sorry - %s cannot use those arguments!" % self.msg.command
|
|
|
- elif self.failure == RollbotFailure.SERVICE_DOWN:
|
|
|
- self.failure_msg = "Sorry - %s relies on a service I couldn't reach!" % self.msg.command
|
|
|
- elif self.failure == RollbotFailure.PERMISSIONS:
|
|
|
- self.failure_msg = "Sorry - you don't have permission to use that command or sub-command in this chat!"
|
|
|
- elif self.failure == RollbotFailure.INTERNAL_ERROR:
|
|
|
- self.failure_msg = "Sorry - I encountered an unrecoverable error, please review internal logs."
|
|
|
-
|
|
|
- if self.debugging is not None and "explain" in self.debugging:
|
|
|
- self.failure_msg += " " + self.debugging["explain"]
|
|
|
-
|
|
|
-
|
|
|
-class RollbotPlugin:
|
|
|
- def __init__(self, command, bot, logger=logging.getLogger(__name__)):
|
|
|
- self.command = command
|
|
|
- self.bot = bot
|
|
|
- self.logger = logger
|
|
|
- self.logger.info(f"Intializing {type(self).__name__} matching {command}")
|
|
|
-
|
|
|
- def on_start(self, db):
|
|
|
- self.logger.info(f"No on_start initialization of {type(self).__name__}")
|
|
|
-
|
|
|
- def on_shutdown(self, db):
|
|
|
- self.logger.info(f"No on_shutdown de-initialization of {type(self).__name__}")
|
|
|
-
|
|
|
- def on_command(self, db, message):
|
|
|
- raise NotImplementedError
|
|
|
-
|
|
|
-
|
|
|
-def as_plugin(command):
|
|
|
- if isinstance(command, str):
|
|
|
- command_name = command
|
|
|
- else:
|
|
|
- command_name = command.__name__
|
|
|
-
|
|
|
- def init_standin(self, bot, logger=logging.getLogger(__name__)):
|
|
|
- RollbotPlugin.__init__(self, command_name, bot, logger=logger)
|
|
|
-
|
|
|
- def decorator(fn):
|
|
|
- sig = inspect.signature(fn)
|
|
|
- converters = []
|
|
|
- for p in sig.parameters:
|
|
|
- if p in ("msg", "message", "_msg"):
|
|
|
- converters.append(lambda cmd, db, msg: msg)
|
|
|
- elif p in ("db", "database"):
|
|
|
- converters.append(lambda cmd, db, msg: db)
|
|
|
- elif p in ("log", "logger"):
|
|
|
- converters.append(lambda cmd, db, msg: cmd.logger)
|
|
|
- elif p in ("bot", "rollbot"):
|
|
|
- converters.append(lambda cmd, db, msg: cmd.bot)
|
|
|
- elif p.startswith("data") or p.endswith("data") or p in ("group_singleton", "singleton"):
|
|
|
- annot = fn.__annotations__.get(p, p)
|
|
|
- if isinstance(annot, str):
|
|
|
- converters.append(lambda cmd, db, msg, subp=annot: GroupBasedSingleton.get_or_create(db, msg.group_id, cmd.command, subp))
|
|
|
- else:
|
|
|
- converters.append(lambda cmd, db, msg, sing_cls=annot: sing_cls.get_or_create(db, msg.group_id))
|
|
|
- else:
|
|
|
- raise ValueError(f"Illegal argument name {p} in decorated plugin {command_name}")
|
|
|
-
|
|
|
- def on_command_standin(self, db, msg):
|
|
|
- res = fn(*[c(self, db, msg) for c in converters])
|
|
|
- if isinstance(res, RollbotResponse):
|
|
|
- return res
|
|
|
- else:
|
|
|
- return RollbotResponse(msg, txt=str(res))
|
|
|
-
|
|
|
- return type(
|
|
|
- f"AutoGenerated`{command_name}`Command",
|
|
|
- (RollbotPlugin,),
|
|
|
- dict(
|
|
|
- __init__=init_standin,
|
|
|
- on_command=on_command_standin,
|
|
|
- )
|
|
|
- )
|
|
|
-
|
|
|
- if isinstance(command, str):
|
|
|
- return decorator
|
|
|
- else:
|
|
|
- return decorator(command)
|