123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188 |
- import logging
- from sqlalchemy import Column, Integer, String
- from command_system import RollbotResponse, RollbotFailure, RollbotPlugin, as_plugin, ModelBase, pop_arg
- from config import get_secret
- class CurseBlessScore(ModelBase):
- __tablename__ = "curse_bless_score"
- group_id = Column(String, primary_key=True)
- person_id = Column(Integer, primary_key=True)
- curses = Column(Integer)
- blessings = Column(Integer)
- def get_score(db, group_id, person_id):
- score = db.query(CurseBlessScore).get((group_id, person_id))
- if not score:
- score = CurseBlessScore(group_id=group_id, person_id=person_id, curses=0, blessings=0)
- db.add(score)
- return score
- def is_banned(msg, person_id):
- return msg.sender_id in BAN_LIST and person_id in BAN_LIST[msg.sender_id]
- def fmt_times(n):
- return str(n) + (" time" if n == 1 else " times")
- def get_response(msg, name, score):
- return RollbotResponse(msg, txt=f"It is done! {name} has been cursed {fmt_times(score.curses)} and blessed {fmt_times(score.blessings)} in this chat.")
- def curse_agg(db, msg, args):
- name, _ = pop_arg(args)
- person_id = name.strip().lower()
- curses = 0
- blessings = 0
- for cbs in db.query(CurseBlessScore).filter(CurseBlessScore.person_id == person_id):
- curses += cbs.curses
- blessings += cbs.blessings
- return RollbotResponse(msg, txt=f"From my records, {name} has been cursed {fmt_times(curses)} and blessed {fmt_times(blessings)} across all chats.")
- def curse_clear(db, msg, args):
- name, _ = pop_arg(args)
- if name is None:
- return RollbotResponse(msg, txt=f"Sorry! I need to know who's records you're trying to clear!")
- person_id = name.strip().lower()
- if not msg.from_admin:
- return RollbotResponse(msg, failure=RollbotFailure.PERMISSIONS)
- cbs = db.query(CurseBlessScore).get((msg.group_id, person_id))
- if cbs is None:
- return RollbotResponse(msg, txt=f"Sorry! I don't know who {name} is!")
- oc = cbs.curses
- ob = cbs.blessings
- cbs.curses = 0
- cbs.blessings = 0
- return RollbotResponse(msg, txt=f"Done! I have cleared all records of {name} in this chat, who was previously cursed {fmt_times(oc)} and blessed {fmt_times(ob)}!")
- def curse_clearall(db, msg, args):
- name, _ = pop_arg(args)
- if name is None:
- return RollbotResponse(msg, txt=f"Sorry! I need to know who's records you're trying to clear!")
- person_id = name.strip().lower()
- if not msg.from_admin:
- return RollbotResponse(msg, failure=RollbotFailure.PERMISSIONS)
- cnt = 0
- oc = 0
- ob = 0
- for cbs in db.query(CurseBlessScore).filter(CurseBlessScore.person_id == person_id):
- oc += cbs.curses
- ob += cbs.blessings
- cbs.curses = 0
- cbs.blessings = 0
- cnt += 1
- if cnt == 0:
- return RollbotResponse(msg, txt=f"Sorry! I don't know who {name} is!")
- return RollbotResponse(msg, txt=f"Done! I have cleared all records of {name} in all chats, who was previously cursed {fmt_times(oc)} and blessed {fmt_times(ob)} in total!")
- def curse_top(db, msg, args):
- rankings = "\n".join(f"{i + 1}. {cbs.person_id.capitalize()} was cursed {fmt_times(cbs.curses)}"
- for i, cbs in enumerate(db.query(CurseBlessScore
- ).filter(CurseBlessScore.group_id == msg.group_id
- ).filter(CurseBlessScore.curses != 0
- ).order_by(CurseBlessScore.curses.desc()
- ).limit(10)))
- if len(rankings) == 0:
- return RollbotResponse(msg, txt="Sorry! I don't have enough curse history in this chat to give statistics.")
- return RollbotResponse(msg, txt=f"The worst offenders of this chat:\n{rankings}")
- SUBC_MAP = {
- "aggregate": curse_agg,
- "clear": curse_clear,
- "clearall": curse_clearall,
- "top": curse_top,
- }
- BAN_LIST = get_secret("curse.banlist")
- @as_plugin
- def curse(db, msg):
- # TODO might be nice to add subcommands to this later
- name, args = pop_arg(msg.raw_args)
- if not name.startswith("!"):
- person_id = name.strip().lower()
- if is_banned(msg, person_id):
- return RollbotResponse(msg, txt="Hey! You aren't allowed to affect that person's score!")
- score = get_score(db, msg.group_id, person_id)
- # Note we do this instead of += b/c that can create race conditions in sqlalchemy
- score.curses = score.curses + 1
- return get_response(msg, name, score)
- # strip off the '!'
- subc = name[1:].strip()
- if len(subc) == 0:
- # handle the case of spaces between ! and subcommand
- subc, args = pop_arg(args)
- subc = subc.lower()
- return SUBC_MAP.get(subc, lambda *a: RollbotResponse(msg, failure=RollbotFailure.INVALID_SUBCOMMAND))(db, msg, args)
- class Bless(RollbotPlugin):
- def __init__(self, bot, logger=logging.getLogger(__name__)):
- RollbotPlugin.__init__(self, "bless", bot, logger=logger)
- def do_score_change(self, score):
- score.blessings = score.blessings + 1
- def on_command(self, db, msg):
- name, _ = pop_arg(msg.raw_args)
- if name.startswith("!"):
- return "Sorry! Subcommands have to go on !curse for now - this will be fixed in the future!"
- person_id = name.strip().lower()
- if is_banned(msg, person_id):
- self.bot.manually_post_message("Hey! You aren't allowed to affect that person's score! And cheaters never propser!", msg.group_id)
- return RollbotResponse(msg, txt=f"!curse {BAN_LIST[msg.sender_id][0]} for trying to bless someone they can't!")
- score = get_score(db, msg.group_id, person_id)
- self.do_score_change(score)
- return get_response(msg, name, score)
- class Blurse(Bless):
- def __init__(self, bot, logger=logging.getLogger(__name__)):
- RollbotPlugin.__init__(self, "blurse", bot, logger=logger)
- def do_score_change(self, score):
- score.blessings = score.blessings + 1
- score.curses = score.curses + 1
- @as_plugin
- def unbless(db, msg):
- name, _ = pop_arg(msg.raw_args)
- if name.startswith("!"):
- return "Sorry! Subcommands have to go on !curse for now - this will be fixed in the future!"
- if not msg.from_admin:
- return RollbotResponse(msg, failure=RollbotFailure.PERMISSIONS, debugging={ "explain": "For now, only admins can unbless or uncurse people!" })
- person_id = name.strip().lower()
- if is_banned(msg, person_id):
- return "Hey! You aren't allowed to affect that person's score!"
- score = get_score(db, msg.group_id, person_id)
- score.blessings = score.blessings - 1
- return get_response(msg, name, score)
- @as_plugin
- def uncurse(db, msg):
- name, _ = pop_arg(msg.raw_args)
- if name.startswith("!"):
- return "Sorry! Subcommands have to go on !curse for now - this will be fixed in the future!"
- if not msg.from_admin:
- return RollbotResponse(msg, failure=RollbotFailure.PERMISSIONS, debugging={ "explain": "For now, only admins can unbless or uncurse people!" })
- person_id = name.strip().lower()
- if is_banned(msg, person_id):
- return "Hey! You aren't allowed to affect that person's score!"
- score = get_score(db, msg.group_id, person_id)
- score.curses = score.curses - 1
- return get_response(msg, name, score)
|