123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- import logging
- import time
- import traceback
- from dataclasses import dataclass
- from typing import List, Type, Dict, Callable, Any
- from .messaging import RollbotResponse, RollbotFailure
- from .plugins import as_plugin, RollbotPlugin
- from .database import init_db_at_url
def lift_response(call, response):
    """Wrap a fixed text reply as a plugin registered under ``call``.

    Args:
        call: command word, passed straight to the ``as_plugin`` decorator.
        response: text placed in the ``txt`` field of every reply.

    Returns:
        Whatever ``as_plugin(call)`` produces for the inner function.
        ``Rollbot.__init__`` calls the result as ``(bot, logger=...)`` to
        obtain the object it stores in its command table.
    """
    # NOTE(review): as_plugin is defined in .plugins and may inspect the
    # wrapped function (name, signature, docstring) -- confirm before
    # renaming anything or adding a docstring to response_func.
    @as_plugin(call)
    def response_func(db, msg):
        # ``db`` is unused here; the reply is the same canned text each time.
        return RollbotResponse(msg, txt=response)

    return response_func
@dataclass
class RollbotConfig:
    """Settings bundle handed to ``Rollbot`` at construction time.

    Field order is part of the constructor signature and must not change.
    """

    # Plugin classes; Rollbot instantiates each one as ``cls(bot, logger=...)``.
    plugins: List[Type[RollbotPlugin]]
    # Database URL forwarded to ``init_db_at_url`` by ``Rollbot.init_db``.
    db_url: str
    # Called as ``reply_callback(text, group_id)`` to deliver a reply.
    reply_callback: Callable[[str, str], None]
    # Maps an alias word to the command word it should resolve to.
    aliases: Dict[str, str]
    # Maps a command word to a canned text reply.
    responses: Dict[str, str]
    # Compared against elapsed handling time (seconds) before a reply is sent.
    sleep_time: float
    # Free-form nested settings, read through ``get``.
    other: Dict[str, Any]

    def get(self, key):
        """Return the value stored in ``other`` under a dotted path.

        Args:
            key: path such as ``"section.option"``; each dot-separated part
                indexes one level deeper into ``other``.

        Returns:
            The object found at the end of the path.

        Raises:
            KeyError: if a part of the path is absent from a dict level.
        """
        node = self.other
        for part in key.split("."):
            # Descend one level per path component.
            node = node[part]
        return node
class Rollbot:
    """Dispatch command messages to plugins and post their responses.

    The command table (``self.commands``) maps a command word to a plugin
    instance. It is filled from three config sources, in this order: plugin
    classes, canned text responses, and aliases.
    """

    def __init__(self, config, logger=logging.getLogger(__name__)):
        """Build the command table described by ``config``.

        Args:
            config: object exposing ``plugins``, ``responses``, ``aliases``,
                ``reply_callback``, ``sleep_time`` and ``db_url``.
            logger: logger used by the bot and passed to every plugin.

        Raises:
            ValueError: if a command word or alias is registered twice, or
                an alias points at a command word that does not exist.
        """
        self.logger = logger
        # Placeholder until init_db() installs the real factory. Its None
        # result is not a context manager, so start_plugins(),
        # shutdown_plugins() and run_command() need init_db() called first.
        self.session_manager_factory = lambda: None
        # Without a configured callback, replies are only logged.
        self.post_callback = config.reply_callback or (
            lambda txt, gid: self.logger.info(f"Responding to {gid} with {txt}")
        )
        self.commands = {}
        self.to_start = set()
        self.to_stop = set()
        self.sleep_time = config.sleep_time
        self.last_exception = None
        self.config = config

        self._load_plugins(config.plugins)
        self._load_responses(config.responses)
        self._load_aliases(config.aliases)

    def _ensure_unused(self, word):
        """Raise ValueError (after logging) if ``word`` is already registered."""
        if word in self.commands:
            self.logger.error(f"Duplicate command word '{word}'")
            raise ValueError(f"Duplicate command word '{word}'")

    def _load_plugins(self, plugin_classes):
        """Instantiate each plugin class and register it under its command word."""
        self.logger.info("Loading command plugins")
        for plugin_class in plugin_classes:
            plugin_instance = plugin_class(self, logger=self.logger)
            self._ensure_unused(plugin_instance.command)
            self.commands[plugin_instance.command] = plugin_instance
            # Only hooks defined directly on the plugin class count; a hook
            # that is merely inherited is not scheduled.
            if "on_start" in plugin_class.__dict__:
                self.to_start.add(plugin_instance)
            if "on_shutdown" in plugin_class.__dict__:
                self.to_stop.add(plugin_instance)
        self.logger.info(f"Finished loading plugins, {len(self.commands)} commands found")

    def _load_responses(self, responses):
        """Register every canned text response as a command."""
        self.logger.info("Loading simple responses")
        for cmd, response in responses.items():
            self._ensure_unused(cmd)
            self.commands[cmd] = lift_response(cmd, response)(self, logger=self.logger)
        self.logger.info(f"Finished loading simple responses, {len(self.commands)} total commands available")

    def _load_aliases(self, aliases):
        """Point each alias at the plugin instance of its target command."""
        self.logger.info("Loading aliases")
        for alias, cmd in aliases.items():
            if cmd not in self.commands:
                self.logger.error(f"Missing aliased command word '{cmd}'")
                raise ValueError(f"Missing aliased command word '{cmd}'")
            self._ensure_unused(alias)
            self.commands[alias] = self.commands[cmd]
        self.logger.info(f"Finished loading aliases, {len(self.commands)} total commands + aliases available")

    def init_db(self):
        """Install the session factory for ``config.db_url``."""
        self.session_manager_factory = init_db_at_url(self.config.db_url)

    def start_plugins(self):
        """Call ``on_start(session)`` on every plugin that defines it."""
        self.logger.info("Starting plugins")
        with self.session_manager_factory() as session:
            for cmd in self.to_start:
                cmd.on_start(session)
        self.logger.info("Finished starting plugins")

    def shutdown_plugins(self):
        """Call ``on_shutdown(session)`` on every plugin that defines it."""
        self.logger.info("Shutting down plugins")
        with self.session_manager_factory() as session:
            for cmd in self.to_stop:
                cmd.on_shutdown(session)
        self.logger.info("Finished shutting down plugins")

    def _help_response(self, message):
        """Answer the built-in ``help <command>`` request."""
        # A bare "help" has no topic; report bad arguments instead of letting
        # StopIteration escape and be reported as an internal error.
        topic = next(message.args(), None)
        if topic is None:
            return RollbotResponse(
                message,
                failure=RollbotFailure.INVALID_ARGUMENTS,
                debugging={"explain": "No command given to look up"},
            )
        targeted = self.commands.get(topic, None)
        if targeted is None:
            return RollbotResponse(
                message,
                failure=RollbotFailure.INVALID_ARGUMENTS,
                debugging={"explain": f"Could not find command {topic}"},
            )
        return RollbotResponse(message, txt=targeted.help_msg())

    def run_command(self, message):
        """Execute ``message`` and return the resulting response.

        Args:
            message: incoming message; must have ``is_command`` set.

        Returns:
            The plugin's response, or a failure response when the message is
            not a command, the command word is unknown, or ``help`` was given
            a missing or unknown topic.
        """
        if not message.is_command:
            self.logger.warning(f"Tried to run non-command message {message.message_id}")
            return RollbotResponse(message, failure=RollbotFailure.INTERNAL_ERROR)

        # "help" is handled before the table lookup, so a plugin registered
        # under that word is never dispatched.
        if message.command == "help":
            return self._help_response(message)

        plugin = self.commands.get(message.command, None)
        if plugin is None:
            self.logger.warning(f"Message {message.message_id} had a command {message.command} that could not be run.")
            return RollbotResponse(message, failure=RollbotFailure.INVALID_COMMAND)

        with self.session_manager_factory() as session:
            response = plugin.on_command(session, message)

        if not response.is_success:
            self.logger.warning(f"Message {message.message_id} caused failure")
            self.logger.warning(response.info)
        return response

    def handle_command(self, message):
        """Run ``message`` and post the outcome through ``post_callback``.

        Non-command messages are ignored. Any exception raised while running
        the command is logged, stored as a formatted traceback in
        ``last_exception`` and turned into an INTERNAL_ERROR response.
        """
        if not message.is_command:
            self.logger.debug("Ignoring non-command message")
            return

        self.logger.info(f"Handling message {message.message_id}")
        # Monotonic clock: immune to wall-clock adjustments while we wait.
        started = time.monotonic()
        try:
            response = self.run_command(message)
        except Exception:
            # Top-level boundary: a failing plugin must not kill the caller.
            self.logger.exception(f"Exception during command execution for message {message.message_id}")
            self.last_exception = traceback.format_exc()
            response = RollbotResponse(message, failure=RollbotFailure.INTERNAL_ERROR)

        if not response.respond:
            self.logger.info(f"Skipping response to message {message.message_id}")
            return

        self.logger.info(f"Responding to message {message.message_id}")
        # Wait out whatever is left of sleep_time since handling started.
        sleep = self.sleep_time - (time.monotonic() - started)
        if sleep > 0:
            self.logger.info(f"Sleeping for {sleep:.3f}s before responding")
            time.sleep(sleep)

        if response.is_success:
            # Text and image are posted as separate messages, text first.
            if response.txt is not None:
                self.post_callback(response.txt, message.group_id)
            if response.img is not None:
                self.post_callback(response.img, message.group_id)
        else:
            self.post_callback(response.failure_msg, message.group_id)
            self.logger.warning(f"Failed command response: {response}")

        elapsed = time.monotonic() - started
        self.logger.info(f"Exiting command thread for {message.message_id} after {elapsed:.3f}s")

    def manually_post_message(self, message_text, group_id):
        """Post ``message_text`` to ``group_id`` without running any command."""
        self.post_callback(message_text, group_id)
|