import inspect from ..messaging import RollbotMessage, RollbotFailure class ArgConverter: def __init__(self, conv): self.conv = conv Message = ArgConverter(lambda _, __, msg: msg) Database = ArgConverter(lambda _, db, __: db) Logger = ArgConverter(lambda cmd, _, __: cmd.logger) Bot = ArgConverter(lambda cmd, _, __: cmd.bot) ArgString = ArgConverter(lambda _, __, msg: msg.raw_args) ArgList = ArgConverter(lambda _, __, msg: msg.arg_list()) Subcommand = ArgConverter(lambda _, __, msg: RollbotMessage.from_subcommand(msg)) Subcommand.ArgString = ArgConverter(lambda _, __, msg: RollbotMessage.from_subcommand(msg).raw_args) Subcommand.ArgList = ArgConverter(lambda _, __, msg: RollbotMessage.from_subcommand(msg).arg_list()) class Arg(ArgConverter): def __init__(self, index, conversion=str, fail_msg=None): super().__init__(self._convert) self.index = index self.conversion = conversion self.fail_msg = fail_msg def _convert(self, cmd, db, msg): try: arg = msg.arg_list()[self.index] except IndexError: RollbotFailure.INVALID_ARGUMENTS.with_reason(f"Missing argument {self.index}").raise_exc() try: return self.conversion(arg) except ValueError: RollbotFailure.INVALID_ARGUMENTS.with_reason(self.fail_msg.format(arg)).raise_exc() class _SubcArg(Arg): def _convert(self, cmd, db, msg): return super()._convert(self, cmd, db, RollbotMessage.from_subcommand(msg)) Subcommand.Arg = _SubcArg class Config(ArgConverter): def __init__(self, key=None): if key is None: super().__init__(lambda cmd, _, __: cmd.bot.config) else: super().__init__(lambda cmd, _, __, key=key: cmd.bot.config.get(key)) def _run_converter_function(fn, cmd, db, msg): return fn(*[fn.__annotations__[param].conv(cmd, db, msg) for param in inspect.signature(fn).parameters]) class Singleton(ArgConverter): def __init__(self, model_cls): super().__init__(lambda _, db, msg, model_cls=model_cls: model_cls.get_or_create(db, msg)) self.model_cls = model_cls def by(self, fn): def do_get(cmd, db, msg): key = _run_converter_function(fn, cmd, db, msg) return db.query(self.model_cls).get(key) or self.model_cls.create_from_key(key) return ArgConverter(do_get) def by_all(self, fn): def do_get(cmd, db, msg): return [db.query(self.model_cls).get(x) or self.model_cls.create_from_key(x) for x in _run_converter_function(fn, cmd, db, msg)] return ArgConverter(do_get) class Query(ArgConverter): def __init__(self, model_cls): super().__init__(self._convert) self.model_cls = model_cls self.filters = [] def filter(self, fn): self.filters.append(fn) return self def _convert(self, cmd, db, msg): query = db.query(self.model_cls) for fn in self.filters: query = query.filter(_run_converter_function(fn, cmd, db, msg)) return query.all() class Lazy(ArgConverter): def __init__(self, child): super().__init__(self._convert) self.child = child self.result = None self.run = False def _convert(self, cmd, db, msg): self.cmd = cmd self.db = db self.msg = msg return self def get(self): if not self.run: self.result = self.child.conv(self.cmd, self.db, self.msg) return self.result class _Executor: def __init__(self, cmd, db, msg): self.cmd = cmd self.db = db self.msg = msg def run_with_deps(self, fn): return _run_converter_function(fn, cmd, db, msg) Injector = ArgConverter(_Executor)