import inspect

from ...messaging import RollbotMessage


class ArgConverter:
    """Wrap a callable that produces the value for one command argument.

    ``conv`` is invoked as ``conv(cmd, db, msg)``; whatever it returns is the
    value supplied for the parameter the converter is attached to.
    """

    def __init__(self, conv):
        self.conv = conv


# Ready-made converters for the values most commands ask for.
Message = ArgConverter(lambda _, __, msg: msg)
Database = ArgConverter(lambda _, db, __: db)
Logger = ArgConverter(lambda cmd, _, __: cmd.logger)
Bot = ArgConverter(lambda cmd, _, __: cmd.bot)
ArgList = ArgConverter(lambda _, __, msg: msg.arg_list())
Subcommand = ArgConverter(lambda _, __, msg: RollbotMessage.from_subcommand(msg))


class Config(ArgConverter):
    """Converter yielding the bot config, or a single entry of it.

    With no ``key`` the converter returns ``cmd.bot.config`` itself; with a
    ``key`` it returns ``cmd.bot.config.get(key)``.
    """

    def __init__(self, key=None):
        if key is None:
            super().__init__(lambda cmd, _, __: cmd.bot.config)
        else:
            # Bind ``key`` as a default so the lambda keeps this exact value.
            super().__init__(lambda cmd, _, __, key=key: cmd.bot.config.get(key))


def run_converter_function(fn, cmd, db, msg):
    """Call ``fn`` with every parameter resolved through its annotation.

    Each parameter of ``fn`` must be annotated with an ``ArgConverter``; the
    converter is run with ``(cmd, db, msg)`` and the results are passed to
    ``fn`` positionally, in signature order.

    Raises:
        KeyError: if a parameter of ``fn`` has no annotation.
    """
    annotations = fn.__annotations__
    resolved = [
        annotations[param].conv(cmd, db, msg)
        for param in inspect.signature(fn).parameters
    ]
    return fn(*resolved)


class Singleton(ArgConverter):
    """Converter that resolves to an instance of ``model_cls``.

    Used directly, it calls ``model_cls.get_or_create(db, msg)``. ``by`` and
    ``by_all`` build converters that look instances up by key(s) computed from
    a converter function instead.
    """

    def __init__(self, model_cls):
        super().__init__(
            lambda _, db, msg, model_cls=model_cls: model_cls.get_or_create(db, msg)
        )
        self.model_cls = model_cls

    def by(self, fn):
        """Return a converter that looks up one instance by ``fn``'s result.

        ``fn`` is run via ``run_converter_function`` to obtain the key. If the
        lookup returns a falsy value, ``model_cls.create_from_key(key)`` is
        used instead.
        """

        def do_get(cmd, db, msg):
            key = run_converter_function(fn, cmd, db, msg)
            return db.query(self.model_cls).get(key) or self.model_cls.create_from_key(key)

        return ArgConverter(do_get)

    def by_all(self, fn):
        """Return a converter that looks up one instance per key from ``fn``.

        ``fn`` is run via ``run_converter_function`` and must return an
        iterable of keys; the converter returns a list with one instance per
        key, falling back to ``model_cls.create_from_key`` for missing ones.
        """

        def do_get(cmd, db, msg):
            keys = run_converter_function(fn, cmd, db, msg)
            return [
                db.query(self.model_cls).get(key) or self.model_cls.create_from_key(key)
                for key in keys
            ]

        return ArgConverter(do_get)


class Query(ArgConverter):
    """Converter that runs ``db.query(model_cls)`` with optional filters.

    Filters are converter functions (see ``run_converter_function``) whose
    return values are handed to ``query.filter``. The converter returns
    ``query.all()``.
    """

    def __init__(self, model_cls):
        super().__init__(self._convert)
        self.model_cls = model_cls
        self.filters = []

    def filter(self, fn):
        """Register a filter function and return ``self`` for chaining."""
        self.filters.append(fn)
        return self

    def _convert(self, cmd, db, msg):
        query = db.query(self.model_cls)
        for fn in self.filters:
            query = query.filter(run_converter_function(fn, cmd, db, msg))
        return query.all()


class Lazy(ArgConverter):
    """Defer running ``child`` until the command actually asks for the value.

    The converter resolves to this ``Lazy`` object itself; the command calls
    ``get()`` to run the child converter. The child is evaluated at most once
    per conversion and its result is reused by later ``get()`` calls.
    """

    def __init__(self, child):
        super().__init__(self._convert)
        self.child = child
        self.result = None
        self.run = False

    def _convert(self, cmd, db, msg):
        self.cmd = cmd
        self.db = db
        self.msg = msg
        # The same Lazy instance is handed out for every conversion, so drop
        # any value cached for a previous (cmd, db, msg) before reuse.
        self.result = None
        self.run = False
        return self

    def get(self):
        """Return the child converter's value, evaluating it on first use."""
        if not self.run:
            self.result = self.child.conv(self.cmd, self.db, self.msg)
            # Only mark as evaluated once the child has succeeded, so a
            # failed evaluation can be retried by a later call.
            self.run = True
        return self.result


# Parameter names that get a converter without needing an annotation.
_CONVERTERS_BY_NAME = {
    "msg": Message.conv,
    "message": Message.conv,
    "_msg": Message.conv,
    "db": Database.conv,
    "database": Database.conv,
    "log": Logger.conv,
    "logger": Logger.conv,
    "bot": Bot.conv,
    "rollbot": Bot.conv,
    "args": ArgList.conv,
    "arg_list": ArgList.conv,
    "subc": Subcommand.conv,
    "subcommand": Subcommand.conv,
    "cfg": Config().conv,
    "config": Config().conv,
}


def get_converters(parameters, annotations):
    """Build the list of converter callables for a command's parameters.

    For each name in ``parameters`` the converter is chosen as follows:

    1. an ``ArgConverter`` annotation always wins;
    2. otherwise a well-known parameter name (``msg``, ``db``, ``logger``,
       ``bot``, ``args``, ``subcommand``, ``config``, ...) selects the
       matching built-in converter;
    3. a name starting or ending with ``cfg`` reads one config entry, keyed
       by the annotation if present (and truthy), else by the name itself;
    4. a name starting or ending with ``data`` calls
       ``annotation.get_or_create(db, msg)``.

    Args:
        parameters: iterable of parameter names.
        annotations: mapping from parameter name to its annotation.

    Returns:
        A list of callables taking ``(cmd, db, msg)``, one per parameter.

    Raises:
        ValueError: if no rule matches a parameter name.
    """
    converters = []
    for p in parameters:
        annot = annotations.get(p)
        if isinstance(annot, ArgConverter):
            converters.append(annot.conv)
        elif p in _CONVERTERS_BY_NAME:
            converters.append(_CONVERTERS_BY_NAME[p])
        elif p.startswith("cfg") or p.endswith("cfg"):
            key = annot or p
            converters.append(lambda cmd, db, msg, key=key: cmd.bot.config.get(key))
        elif p.startswith("data") or p.endswith("data"):
            # NOTE: with no annotation, sing_cls is None and the converter
            # fails with AttributeError when it is eventually called.
            converters.append(
                lambda cmd, db, msg, sing_cls=annot: sing_cls.get_or_create(db, msg)
            )
        else:
            raise ValueError(p)
    return converters