import logging import time from .messaging import RollbotResponse, RollbotFailure from .plugins import as_plugin def lift_response(call, response): @as_plugin(call) def response_func(db, msg): return RollbotResponse(msg, txt=response) return response_func class Rollbot: def __init__(self, logger=logging.getLogger(__name__), plugin_classes={}, aliases={}, responses={}, sleep_time=0.0, session_factory=None, callback=None): self.logger = logger self.session_factory = session_factory or (lambda: None) self.post_callback = callback or (lambda txt, gid: self.logger.info(f"Responding to {gid} with {txt}")) self.commands = {} self.to_start = set() self.to_stop = set() self.logger.info("Loading command plugins") for plugin_class in plugin_classes: plugin_instance = plugin_class(self, logger=logger) if plugin_instance.command in self.commands: self.logger.error(f"Duplicate command word '{plugin_instance.command}'") raise ValueError(f"Duplicate command word '{plugin_instance.command}'") self.commands[plugin_instance.command] = plugin_instance if "on_start" in plugin_class.__dict__: self.to_start.add(plugin_instance) if "on_shutdown" in plugin_class.__dict__: self.to_stop.add(plugin_instance) self.logger.info(f"Finished loading plugins, {len(self.commands)} commands found") self.logger.info("Loading simple responses") for cmd, response in responses.items(): if cmd in self.commands: self.logger.error(f"Duplicate command word '{cmd}'") raise ValueError(f"Duplicate command word '{cmd}'") self.commands[cmd] = lift_response(cmd, response)(self, logger=logger) self.logger.info(f"Finished loading simple responses, {len(self.commands)} total commands available") self.logger.info("Loading aliases") for alias, cmd in aliases.items(): if cmd not in self.commands: self.logger.error(f"Missing aliased command word '{cmd}'") raise ValueError(f"Missing aliased command word '{cmd}'") if alias in self.commands: self.logger.error(f"Duplicate command word '{alias}'") raise ValueError(f"Duplicate command word '{alias}'") self.commands[alias] = self.commands[cmd] self.logger.info(f"Finished loading aliases, {len(self.commands)} total commands + aliases available") self.sleep_time = sleep_time def start_plugins(self): self.logger.info("Starting plugins") with self.session_factory() as session: for cmd in self.to_start: cmd.on_start(session) self.logger.info("Finished starting plugins") def shutdown_plugins(self): self.logger.info("Shutting down plugins") with self.session_factory() as session: for cmd in self.to_stop: cmd.on_shutdown(session) self.logger.info("Finished shutting down plugins") def run_command(self, message): if not message.is_command: self.logger.warn(f"Tried to run non-command message {message.message_id}") return RollbotResponse(message, failure=RollbotFailure.INTERNAL_ERROR) plugin = self.commands.get(message.command, None) if plugin is None: self.logger.warn(f"Message {message.message_id} had a command {message.command} that could not be run.") return RollbotResponse(message, failure=RollbotFailure.INVALID_COMMAND) with self.session_factory() as session: response = plugin.on_command(session, message) if not response.is_success: self.logger.warn(f"Message {message.message_id} caused failure") self.logger.warn(response.info) return response def handle_command(self, message): if not message.is_command: self.logger.debug("Ignoring non-command message") return self.logger.info(f"Handling message {message.message_id}") t = time.time() try: response = self.run_command(message) except Exception as e: self.logger.exception(f"Exception during command execution for message {message.message_id}") response = RollbotResponse(message, failure=RollbotFailure.INTERNAL_ERROR) if not response.respond: self.logger.info(f"Skipping response to message {message.message_id}") return self.logger.info(f"Responding to message {message.message_id}") sleep = self.sleep_time - time.time() + t if sleep > 0: self.logger.info(f"Sleeping for {sleep:.3f}s before responding") time.sleep(sleep) if response.is_success: if response.txt is not None: self.post_callback(response.txt, message.group_id) if response.img is not None: self.post_callback(response.img, message.group_id) else: self.post_callback(response.failure_msg, message.group_id) self.logger.warning(f"Failed command response: {response}") t = time.time() - t self.logger.info(f"Exiting command thread for {message.message_id} after {t:.3f}s") def manually_post_message(self, message_text, group_id): self.post_callback(message_text, group_id)