curse.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. import logging
  2. from sqlalchemy import Column, Integer, String
  3. from command_system import RollbotResponse, RollbotFailure, RollbotPlugin, as_plugin, ModelBase, pop_arg
  4. from config import get_secret
  5. class CurseBlessScore(ModelBase):
  6. __tablename__ = "curse_bless_score"
  7. group_id = Column(String, primary_key=True)
  8. person_id = Column(Integer, primary_key=True)
  9. curses = Column(Integer)
  10. blessings = Column(Integer)
  11. def get_score(db, group_id, person_id):
  12. score = db.query(CurseBlessScore).get((group_id, person_id))
  13. if not score:
  14. score = CurseBlessScore(group_id=group_id, person_id=person_id, curses=0, blessings=0)
  15. db.add(score)
  16. return score
  17. def is_banned(msg, person_id):
  18. return msg.sender_id in BAN_LIST and person_id in BAN_LIST[msg.sender_id]
  19. def fmt_times(n):
  20. return str(n) + (" time" if n == 1 else " times")
  21. def get_response(msg, name, score):
  22. return RollbotResponse(msg, txt=f"It is done! {name} has been cursed {fmt_times(score.curses)} and blessed {fmt_times(score.blessings)} in this chat.")
  23. def curse_agg(db, msg, args):
  24. name, _ = pop_arg(args)
  25. person_id = name.strip().lower()
  26. curses = 0
  27. blessings = 0
  28. for cbs in db.query(CurseBlessScore).filter(CurseBlessScore.person_id == person_id):
  29. curses += cbs.curses
  30. blessings += cbs.blessings
  31. return RollbotResponse(msg, txt=f"From my records, {name} has been cursed {fmt_times(curses)} and blessed {fmt_times(blessings)} across all chats.")
  32. def curse_clear(db, msg, args):
  33. name, _ = pop_arg(args)
  34. if name is None:
  35. return RollbotResponse(msg, txt=f"Sorry! I need to know who's records you're trying to clear!")
  36. person_id = name.strip().lower()
  37. if not msg.from_admin:
  38. return RollbotResponse(msg, failure=RollbotFailure.PERMISSIONS)
  39. cbs = db.query(CurseBlessScore).get((msg.group_id, person_id))
  40. if cbs is None:
  41. return RollbotResponse(msg, txt=f"Sorry! I don't know who {name} is!")
  42. oc = cbs.curses
  43. ob = cbs.blessings
  44. cbs.curses = 0
  45. cbs.blessings = 0
  46. return RollbotResponse(msg, txt=f"Done! I have cleared all records of {name} in this chat, who was previously cursed {fmt_times(oc)} and blessed {fmt_times(ob)}!")
  47. def curse_clearall(db, msg, args):
  48. name, _ = pop_arg(args)
  49. if name is None:
  50. return RollbotResponse(msg, txt=f"Sorry! I need to know who's records you're trying to clear!")
  51. person_id = name.strip().lower()
  52. if not msg.from_admin:
  53. return RollbotResponse(msg, failure=RollbotFailure.PERMISSIONS)
  54. cnt = 0
  55. oc = 0
  56. ob = 0
  57. for cbs in db.query(CurseBlessScore).filter(CurseBlessScore.person_id == person_id):
  58. oc += cbs.curses
  59. ob += cbs.blessings
  60. cbs.curses = 0
  61. cbs.blessings = 0
  62. cnt += 1
  63. if cnt == 0:
  64. return RollbotResponse(msg, txt=f"Sorry! I don't know who {name} is!")
  65. return RollbotResponse(msg, txt=f"Done! I have cleared all records of {name} in all chats, who was previously cursed {fmt_times(oc)} and blessed {fmt_times(ob)} in total!")
  66. def curse_top(db, msg, args):
  67. rankings = "\n".join(f"{i + 1}. {cbs.person_id.capitalize()} was cursed {fmt_times(cbs.curses)}"
  68. for i, cbs in enumerate(db.query(CurseBlessScore
  69. ).filter(CurseBlessScore.group_id == msg.group_id
  70. ).filter(CurseBlessScore.curses != 0
  71. ).order_by(CurseBlessScore.curses.desc()
  72. ).limit(10)))
  73. if len(rankings) == 0:
  74. return RollbotResponse(msg, txt="Sorry! I don't have enough curse history in this chat to give statistics.")
  75. return RollbotResponse(msg, txt=f"The worst offenders of this chat:\n{rankings}")
  76. SUBC_MAP = {
  77. "aggregate": curse_agg,
  78. "clear": curse_clear,
  79. "clearall": curse_clearall,
  80. "top": curse_top,
  81. }
  82. BAN_LIST = get_secret("curse.banlist")
  83. @as_plugin
  84. def curse(db, msg):
  85. # TODO might be nice to add subcommands to this later
  86. name, args = pop_arg(msg.raw_args)
  87. if not name.startswith("!"):
  88. person_id = name.strip().lower()
  89. if is_banned(msg, person_id):
  90. return RollbotResponse(msg, txt="Hey! You aren't allowed to affect that person's score!")
  91. score = get_score(db, msg.group_id, person_id)
  92. # Note we do this instead of += b/c that can create race conditions in sqlalchemy
  93. score.curses = score.curses + 1
  94. return get_response(msg, name, score)
  95. # strip off the '!'
  96. subc = name[1:].strip()
  97. if len(subc) == 0:
  98. # handle the case of spaces between ! and subcommand
  99. subc, args = pop_arg(args)
  100. subc = subc.lower()
  101. return SUBC_MAP.get(subc, lambda *a: RollbotResponse(msg, failure=RollbotFailure.INVALID_SUBCOMMAND))(db, msg, args)
  102. class Bless(RollbotPlugin):
  103. def __init__(self, bot, logger=logging.getLogger(__name__)):
  104. RollbotPlugin.__init__(self, "bless", bot, logger=logger)
  105. def do_score_change(self, score):
  106. score.blessings = score.blessings + 1
  107. def on_command(self, db, msg):
  108. name, _ = pop_arg(msg.raw_args)
  109. if name.startswith("!"):
  110. return "Sorry! Subcommands have to go on !curse for now - this will be fixed in the future!"
  111. person_id = name.strip().lower()
  112. if is_banned(msg, person_id):
  113. self.bot.manually_post_message("Hey! You aren't allowed to affect that person's score! And cheaters never propser!", msg.group_id)
  114. return RollbotResponse(msg, txt=f"!curse {BAN_LIST[msg.sender_id][0]} for trying to bless someone they can't!")
  115. score = get_score(db, msg.group_id, person_id)
  116. self.do_score_change(score)
  117. return get_response(msg, name, score)
  118. class Blurse(Bless):
  119. def __init__(self, bot, logger=logging.getLogger(__name__)):
  120. RollbotPlugin.__init__(self, "blurse", bot, logger=logger)
  121. def do_score_change(self, score):
  122. score.blessings = score.blessings + 1
  123. score.curses = score.curses + 1
  124. @as_plugin
  125. def unbless(db, msg):
  126. name, _ = pop_arg(msg.raw_args)
  127. if name.startswith("!"):
  128. return "Sorry! Subcommands have to go on !curse for now - this will be fixed in the future!"
  129. if not msg.from_admin:
  130. return RollbotResponse(msg, failure=RollbotFailure.PERMISSIONS, debugging={ "explain": "For now, only admins can unbless or uncurse people!" })
  131. person_id = name.strip().lower()
  132. if is_banned(msg, person_id):
  133. return "Hey! You aren't allowed to affect that person's score!"
  134. score = get_score(db, msg.group_id, person_id)
  135. score.blessings = score.blessings - 1
  136. return get_response(msg, name, score)
  137. @as_plugin
  138. def uncurse(db, msg):
  139. name, _ = pop_arg(msg.raw_args)
  140. if name.startswith("!"):
  141. return "Sorry! Subcommands have to go on !curse for now - this will be fixed in the future!"
  142. if not msg.from_admin:
  143. return RollbotResponse(msg, failure=RollbotFailure.PERMISSIONS, debugging={ "explain": "For now, only admins can unbless or uncurse people!" })
  144. person_id = name.strip().lower()
  145. if is_banned(msg, person_id):
  146. return "Hey! You aren't allowed to affect that person's score!"
  147. score = get_score(db, msg.group_id, person_id)
  148. score.curses = score.curses - 1
  149. return get_response(msg, name, score)