123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 |
- import logging
- import time
- from .messaging import RollbotResponse, RollbotFailure
- from .plugins import as_plugin
def lift_response(call, response):
    """Wrap a fixed text reply as a plugin for the command word ``call``.

    The handler ignores its database/session argument and always answers the
    incoming message with ``response`` as the text. The handler is passed
    through ``as_plugin(call)``; whatever that returns is handed back, and
    ``Rollbot`` instantiates it the same way as a regular plugin class.
    """
    # NOTE: the inner function keeps its original name and carries no
    # docstring on purpose, in case ``as_plugin`` inspects either of them.
    def response_func(db, msg):
        return RollbotResponse(msg, txt=response)

    make_plugin = as_plugin(call)
    return make_plugin(response_func)
class Rollbot:
    """Dispatch command messages to plugin instances and post their replies.

    Attributes:
        logger: Logger used for all diagnostics.
        session_factory: Zero-argument callable returning a context manager;
            the value it yields is passed to plugins as their session.
        post_callback: Callable ``(text, group_id)`` used to send output.
        commands: Mapping of command word (or alias) to plugin instance.
        to_start: Plugin instances whose class defines ``on_start``.
        to_stop: Plugin instances whose class defines ``on_shutdown``.
        sleep_time: Minimum number of seconds between receiving a command
            and posting its response.
    """

    def __init__(
        self,
        logger=logging.getLogger(__name__),
        plugin_classes=None,
        aliases=None,
        responses=None,
        sleep_time=0.0,
        session_factory=None,
        callback=None,
    ):
        """Instantiate plugins and register simple responses and aliases.

        Args:
            logger: Logger for the bot; also handed to every plugin.
            plugin_classes: Iterable of plugin classes. Each is called as
                ``plugin_class(bot, logger=logger)`` and must expose a
                ``command`` attribute.
            aliases: Mapping of alias word to an already registered command.
            responses: Mapping of command word to a fixed text reply.
            sleep_time: Minimum delay in seconds before a response is posted.
            session_factory: Zero-argument callable returning a context
                manager. When omitted, a no-op context yielding ``None`` is
                used.
            callback: Callable ``(text, group_id)`` that delivers output.
                When omitted, output is only logged.

        Raises:
            ValueError: If a command word or alias is registered twice, or an
                alias points at an unknown command.
        """
        # Local import keeps this block self-contained without touching the
        # module's import section.
        from contextlib import nullcontext

        self.logger = logger
        # The factory's result is always used in a ``with`` statement, so the
        # fallback must be a context manager; ``nullcontext()`` yields None.
        self.session_factory = session_factory or nullcontext
        self.post_callback = callback or (lambda txt, gid: self.logger.info(f"Responding to {gid} with {txt}"))
        self.commands = {}
        self.to_start = set()
        self.to_stop = set()

        self.logger.info("Loading command plugins")
        for plugin_class in plugin_classes or ():
            plugin_instance = plugin_class(self, logger=logger)
            self._require_unused(plugin_instance.command)
            self.commands[plugin_instance.command] = plugin_instance
            # Only hooks defined directly on the class body are registered;
            # the class ``__dict__`` does not include inherited attributes.
            if "on_start" in plugin_class.__dict__:
                self.to_start.add(plugin_instance)
            if "on_shutdown" in plugin_class.__dict__:
                self.to_stop.add(plugin_instance)
        self.logger.info(f"Finished loading plugins, {len(self.commands)} commands found")

        self.logger.info("Loading simple responses")
        for cmd, response in (responses or {}).items():
            self._require_unused(cmd)
            self.commands[cmd] = lift_response(cmd, response)(self, logger=logger)
        self.logger.info(f"Finished loading simple responses, {len(self.commands)} total commands available")

        self.logger.info("Loading aliases")
        for alias, cmd in (aliases or {}).items():
            if cmd not in self.commands:
                self.logger.error(f"Missing aliased command word '{cmd}'")
                raise ValueError(f"Missing aliased command word '{cmd}'")
            self._require_unused(alias)
            # An alias shares the very same plugin instance as its target.
            self.commands[alias] = self.commands[cmd]
        self.logger.info(f"Finished loading aliases, {len(self.commands)} total commands + aliases available")

        self.sleep_time = sleep_time

    def _require_unused(self, word):
        """Log and raise ``ValueError`` if ``word`` is already registered."""
        if word in self.commands:
            self.logger.error(f"Duplicate command word '{word}'")
            raise ValueError(f"Duplicate command word '{word}'")

    def start_plugins(self):
        """Call ``on_start(session)`` on every plugin in ``to_start``."""
        self.logger.info("Starting plugins")
        with self.session_factory() as session:
            for cmd in self.to_start:
                cmd.on_start(session)
        self.logger.info("Finished starting plugins")

    def shutdown_plugins(self):
        """Call ``on_shutdown(session)`` on every plugin in ``to_stop``."""
        self.logger.info("Shutting down plugins")
        with self.session_factory() as session:
            for cmd in self.to_stop:
                cmd.on_shutdown(session)
        self.logger.info("Finished shutting down plugins")

    def run_command(self, message):
        """Run the plugin registered for ``message.command``.

        Returns:
            The plugin's response. A non-command message yields an
            ``INTERNAL_ERROR`` failure response and an unregistered command
            word yields an ``INVALID_COMMAND`` failure response.
        """
        if not message.is_command:
            self.logger.warning(f"Tried to run non-command message {message.message_id}")
            return RollbotResponse(message, failure=RollbotFailure.INTERNAL_ERROR)

        plugin = self.commands.get(message.command)
        if plugin is None:
            self.logger.warning(f"Message {message.message_id} had a command {message.command} that could not be run.")
            return RollbotResponse(message, failure=RollbotFailure.INVALID_COMMAND)

        with self.session_factory() as session:
            response = plugin.on_command(session, message)

        if not response.is_success:
            self.logger.warning(f"Message {message.message_id} caused failure")
            self.logger.warning(response.info)
        return response

    def handle_command(self, message):
        """Run a command message and post its result through the callback.

        Non-command messages are ignored. Any exception raised while running
        the command is logged and converted into an ``INTERNAL_ERROR``
        response. Posting is delayed so that at least ``sleep_time`` seconds
        pass between the start of handling and the response.
        """
        if not message.is_command:
            self.logger.debug("Ignoring non-command message")
            return

        self.logger.info(f"Handling message {message.message_id}")
        t = time.time()
        try:
            response = self.run_command(message)
        except Exception:
            # Boundary handler: a broken plugin must not take the bot down.
            self.logger.exception(f"Exception during command execution for message {message.message_id}")
            response = RollbotResponse(message, failure=RollbotFailure.INTERNAL_ERROR)

        if not response.respond:
            self.logger.info(f"Skipping response to message {message.message_id}")
            return

        self.logger.info(f"Responding to message {message.message_id}")
        # Remaining part of the minimum response delay, if any.
        sleep = self.sleep_time - time.time() + t
        if sleep > 0:
            self.logger.info(f"Sleeping for {sleep:.3f}s before responding")
            time.sleep(sleep)

        if response.is_success:
            # Text and image are posted as separate callback invocations.
            if response.txt is not None:
                self.post_callback(response.txt, message.group_id)
            if response.img is not None:
                self.post_callback(response.img, message.group_id)
        else:
            self.post_callback(response.failure_msg, message.group_id)
            self.logger.warning(f"Failed command response: {response}")

        t = time.time() - t
        self.logger.info(f"Exiting command thread for {message.message_id} after {t:.3f}s")

    def manually_post_message(self, message_text, group_id):
        """Send ``message_text`` to ``group_id`` directly via the callback."""
        self.post_callback(message_text, group_id)
|