import logging

from sqlalchemy import Column, Integer, String

from command_system import (
    RollbotResponse,
    RollbotFailure,
    RollbotPlugin,
    as_plugin,
    ModelBase,
    pop_arg,
)
from config import get_secret


# Shared user-facing messages, kept in one place so every command agrees.
_NO_TARGET_MSG = "Sorry! I need to know whose score you're trying to change!"
_NO_CLEAR_TARGET_MSG = "Sorry! I need to know whose records you're trying to clear!"
_NO_LOOKUP_TARGET_MSG = "Sorry! I need to know whose records you're trying to look up!"
_SUBCOMMAND_MSG = (
    "Sorry! Subcommands have to go on !curse for now - "
    "this will be fixed in the future!"
)
_BANNED_MSG = "Hey! You aren't allowed to affect that person's score!"


class CurseBlessScore(ModelBase):
    """Per-chat tally of how often a person has been cursed and blessed.

    Rows are keyed by the (group_id, person_id) pair.
    """

    __tablename__ = "curse_bless_score"

    group_id = Column(String, primary_key=True)
    # NOTE(review): declared Integer, but every caller in this module stores
    # and queries a lowercased name string here (and curse_top calls
    # .capitalize() on it). Left unchanged to avoid a schema migration -
    # confirm whether this should be String.
    person_id = Column(Integer, primary_key=True)
    curses = Column(Integer)
    blessings = Column(Integer)


def _person_key(name):
    """Return the normalized (stripped, lowercased) lookup key for *name*."""
    return name.strip().lower()


def get_score(db, group_id, person_id):
    """Fetch the score row for a person in a group, creating it if absent.

    A newly created row starts at zero curses and zero blessings and is
    added to the session *db*; this function does not commit.
    """
    score = db.query(CurseBlessScore).get((group_id, person_id))
    if not score:
        score = CurseBlessScore(
            group_id=group_id, person_id=person_id, curses=0, blessings=0
        )
        db.add(score)
    return score


def is_banned(msg, person_id):
    """Return True if the sender of *msg* may not change *person_id*'s score.

    BAN_LIST maps a sender id to a collection of person keys that sender is
    forbidden from affecting.
    """
    return msg.sender_id in BAN_LIST and person_id in BAN_LIST[msg.sender_id]


def fmt_times(n):
    """Format a count as "1 time" or "<n> times"."""
    return str(n) + (" time" if n == 1 else " times")


def get_response(msg, name, score):
    """Build the standard confirmation response showing *name*'s new totals."""
    return RollbotResponse(
        msg,
        txt=(
            f"It is done! {name} has been cursed {fmt_times(score.curses)} "
            f"and blessed {fmt_times(score.blessings)} in this chat."
        ),
    )


def curse_agg(db, msg, args):
    """Subcommand ``aggregate``: report a person's totals across all chats."""
    name, _ = pop_arg(args)
    # Guard against a missing target, as the clear subcommands already do;
    # without it name.strip() raises AttributeError.
    if not name:
        return RollbotResponse(msg, txt=_NO_LOOKUP_TARGET_MSG)
    person_id = _person_key(name)
    curses = 0
    blessings = 0
    for cbs in db.query(CurseBlessScore).filter(
        CurseBlessScore.person_id == person_id
    ):
        curses += cbs.curses
        blessings += cbs.blessings
    return RollbotResponse(
        msg,
        txt=(
            f"From my records, {name} has been cursed {fmt_times(curses)} "
            f"and blessed {fmt_times(blessings)} across all chats."
        ),
    )


def curse_clear(db, msg, args):
    """Subcommand ``clear``: zero a person's totals in this chat (admin only)."""
    name, _ = pop_arg(args)
    if not name:
        return RollbotResponse(msg, txt=_NO_CLEAR_TARGET_MSG)
    person_id = _person_key(name)
    if not msg.from_admin:
        return RollbotResponse(msg, failure=RollbotFailure.PERMISSIONS)
    cbs = db.query(CurseBlessScore).get((msg.group_id, person_id))
    if cbs is None:
        return RollbotResponse(msg, txt=f"Sorry! I don't know who {name} is!")
    # Remember the old totals so the reply can report what was wiped.
    old_curses = cbs.curses
    old_blessings = cbs.blessings
    cbs.curses = 0
    cbs.blessings = 0
    return RollbotResponse(
        msg,
        txt=(
            f"Done! I have cleared all records of {name} in this chat, "
            f"who was previously cursed {fmt_times(old_curses)} "
            f"and blessed {fmt_times(old_blessings)}!"
        ),
    )


def curse_clearall(db, msg, args):
    """Subcommand ``clearall``: zero a person's totals in every chat (admin only)."""
    name, _ = pop_arg(args)
    if not name:
        return RollbotResponse(msg, txt=_NO_CLEAR_TARGET_MSG)
    person_id = _person_key(name)
    if not msg.from_admin:
        return RollbotResponse(msg, failure=RollbotFailure.PERMISSIONS)
    row_count = 0
    old_curses = 0
    old_blessings = 0
    for cbs in db.query(CurseBlessScore).filter(
        CurseBlessScore.person_id == person_id
    ):
        old_curses += cbs.curses
        old_blessings += cbs.blessings
        cbs.curses = 0
        cbs.blessings = 0
        row_count += 1
    if row_count == 0:
        return RollbotResponse(msg, txt=f"Sorry! I don't know who {name} is!")
    return RollbotResponse(
        msg,
        txt=(
            f"Done! I have cleared all records of {name} in all chats, "
            f"who was previously cursed {fmt_times(old_curses)} "
            f"and blessed {fmt_times(old_blessings)} in total!"
        ),
    )


def curse_top(db, msg, args):
    """Subcommand ``top``: list up to ten most-cursed people in this chat.

    *args* is accepted only so every subcommand shares one signature.
    """
    top_rows = (
        db.query(CurseBlessScore)
        .filter(CurseBlessScore.group_id == msg.group_id)
        .filter(CurseBlessScore.curses != 0)
        .order_by(CurseBlessScore.curses.desc())
        .limit(10)
    )
    rankings = "\n".join(
        f"{i + 1}. {cbs.person_id.capitalize()} was cursed {fmt_times(cbs.curses)}"
        for i, cbs in enumerate(top_rows)
    )
    if not rankings:
        return RollbotResponse(
            msg,
            txt=(
                "Sorry! I don't have enough curse history in this chat "
                "to give statistics."
            ),
        )
    return RollbotResponse(msg, txt=f"The worst offenders of this chat:\n{rankings}")


# Maps a lowercased subcommand name to its handler(db, msg, args).
SUBC_MAP = {
    "aggregate": curse_agg,
    "clear": curse_clear,
    "clearall": curse_clearall,
    "top": curse_top,
}

BAN_LIST = get_secret("curse.banlist")


@as_plugin
def curse(db, msg):
    """Curse a person, or dispatch a ``!subcommand`` from SUBC_MAP.

    A first argument that does not start with "!" is treated as the name of
    the person to curse. Otherwise the text after "!" (or, if that is empty,
    the next argument) selects a subcommand.
    """
    name, args = pop_arg(msg.raw_args)
    # pop_arg appears to yield None when there is nothing to pop (the clear
    # subcommands check for it), so bail out before calling str methods.
    if not name:
        return RollbotResponse(msg, txt=_NO_TARGET_MSG)

    if not name.startswith("!"):
        person_id = _person_key(name)
        if is_banned(msg, person_id):
            return RollbotResponse(msg, txt=_BANNED_MSG)
        score = get_score(db, msg.group_id, person_id)
        # Note we do this instead of += b/c that can create race conditions
        # in sqlalchemy
        score.curses = score.curses + 1
        return get_response(msg, name, score)

    # strip off the '!'
    subc = name[1:].strip()
    if not subc:
        # handle the case of spaces between ! and subcommand
        subc, args = pop_arg(args)

    # A bare "!" with nothing after it leaves subc empty/None; treat that the
    # same as an unknown subcommand instead of crashing on .lower().
    handler = SUBC_MAP.get(subc.lower()) if subc else None
    if handler is None:
        return RollbotResponse(msg, failure=RollbotFailure.INVALID_SUBCOMMAND)
    return handler(db, msg, args)


class Bless(RollbotPlugin):
    """Plugin for the ``bless`` command: adds one blessing to a person.

    Subclasses override do_score_change to apply a different adjustment.
    """

    def __init__(self, bot, logger=logging.getLogger(__name__)):
        RollbotPlugin.__init__(self, "bless", bot, logger=logger)

    def do_score_change(self, score):
        """Apply this command's adjustment to *score* in place."""
        score.blessings = score.blessings + 1

    def on_command(self, db, msg):
        """Handle the command and return a RollbotResponse.

        If the sender is banned from affecting the target, a scolding is
        posted to the chat and the reply is a ``!curse`` aimed at the first
        entry of the sender's ban list.
        """
        name, _ = pop_arg(msg.raw_args)
        if not name:
            return RollbotResponse(msg, txt=_NO_TARGET_MSG)
        if name.startswith("!"):
            return RollbotResponse(msg, txt=_SUBCOMMAND_MSG)

        person_id = _person_key(name)
        if is_banned(msg, person_id):
            self.bot.manually_post_message(
                "Hey! You aren't allowed to affect that person's score! "
                "And cheaters never prosper!",
                msg.group_id,
            )
            return RollbotResponse(
                msg,
                txt=(
                    f"!curse {BAN_LIST[msg.sender_id][0]} "
                    "for trying to bless someone they can't!"
                ),
            )

        score = get_score(db, msg.group_id, person_id)
        self.do_score_change(score)
        return get_response(msg, name, score)


class Blurse(Bless):
    """Plugin for the ``blurse`` command: adds one blessing and one curse."""

    def __init__(self, bot, logger=logging.getLogger(__name__)):
        # Bypass Bless.__init__, which hard-codes the "bless" command name.
        RollbotPlugin.__init__(self, "blurse", bot, logger=logger)

    def do_score_change(self, score):
        """Increment both tallies on *score* in place."""
        score.blessings = score.blessings + 1
        score.curses = score.curses + 1


def _admin_decrement(db, msg, field):
    """Shared body of unbless/uncurse: subtract one from *field* (admin only).

    *field* is the CurseBlessScore attribute name to decrement, either
    "blessings" or "curses".
    """
    name, _ = pop_arg(msg.raw_args)
    if not name:
        return RollbotResponse(msg, txt=_NO_TARGET_MSG)
    if name.startswith("!"):
        return RollbotResponse(msg, txt=_SUBCOMMAND_MSG)
    if not msg.from_admin:
        return RollbotResponse(
            msg,
            failure=RollbotFailure.PERMISSIONS,
            debugging={
                "explain": "For now, only admins can unbless or uncurse people!"
            },
        )

    person_id = _person_key(name)
    if is_banned(msg, person_id):
        return RollbotResponse(msg, txt=_BANNED_MSG)

    score = get_score(db, msg.group_id, person_id)
    # Plain assignment rather than -=, matching the increment paths.
    setattr(score, field, getattr(score, field) - 1)
    return get_response(msg, name, score)


@as_plugin
def unbless(db, msg):
    """Remove one blessing from a person in this chat (admin only)."""
    return _admin_decrement(db, msg, "blessings")


@as_plugin
def uncurse(db, msg):
    """Remove one curse from a person in this chat (admin only)."""
    return _admin_decrement(db, msg, "curses")