"""FastAPI front-end that connects GroupMe bot callbacks to a Rollbot instance.

Configuration is read at import time from a TOML secrets file (path taken from
the ``SECRET_FILE`` environment variable, defaulting to ``secrets.toml``) and
logging is configured from ``logging.conf``.
"""

from datetime import datetime
import json
import traceback
import asyncio
import logging
# The ``logging.config`` submodule must be imported explicitly; ``import
# logging`` alone does not guarantee the attribute exists.
import logging.config
import os

import toml
from fastapi import FastAPI, BackgroundTasks, Response
from pydantic import BaseModel

import rollbot
from commands import config

logging.config.fileConfig("logging.conf", disable_existing_loggers=False)

with open(os.environ.get("SECRET_FILE", "secrets.toml"), "r") as sfile:
    secrets = toml.load(sfile)

# The DATABASE_FILE environment variable overrides the value in the secrets file.
database_file = os.environ.get("DATABASE_FILE", secrets["database_file"])
groupme_bots = secrets["groupme"]["bots"]
groupme_token = secrets["groupme"]["token"]
groupme_admins = secrets["admins"]["origin"]
group_admins = secrets["admins"]["channel"]

# Outgoing text longer than max_msg_len is split into several posts; every
# chunk except the last ends with split_text, so only msg_slice characters of
# real content fit in such a chunk.
# NOTE(review): 1000 is presumably GroupMe's per-message limit -- confirm.
max_msg_len = 1000
split_text = "\n..."
msg_slice = max_msg_len - len(split_text)


class GroupMeMessage(BaseModel):
    """Request body accepted by the ``POST /`` endpoint.

    NOTE(review): presumably mirrors GroupMe's bot callback payload -- confirm
    against the GroupMe API documentation.
    """

    sender_id: str
    group_id: str
    name: str
    text: str
    created_at: int  # passed to datetime.fromtimestamp() in GroupMeBot.parse
    attachments: list[dict[str, str]]


def _split_message(message: str) -> list[str]:
    """Split *message* into chunks of at most ``max_msg_len`` characters.

    Every chunk except the last carries ``split_text`` as a suffix. The result
    always has at least one element, even when *message* is empty.
    """
    chunks = []
    while len(message) > max_msg_len:
        chunks.append(message[:msg_slice] + split_text)
        message = message[msg_slice:]
    chunks.append(message)
    return chunks


class GroupMeBot(rollbot.Rollbot[GroupMeMessage]):
    """Rollbot specialisation that receives and answers GroupMe messages."""

    def __init__(self):
        super().__init__(
            config.extend(rollbot.CommandConfiguration(bangs=("!",))),
            database_file,
        )

    def read_config(self, key):
        """Look up a dotted *key* (e.g. ``"groupme.token"``) in the secrets.

        Returns the value found, or ``None`` when any path component is
        missing or when the path tries to descend into a non-table value.
        """
        cfg = secrets
        for part in key.split("."):
            # Guard against descending into a scalar (str/int/list), which has
            # no .get() and would otherwise raise AttributeError.
            if not isinstance(cfg, dict):
                return None
            cfg = cfg.get(part)
            if cfg is None:
                return None
        return cfg

    def parse(self, msg: GroupMeMessage):
        """Convert an incoming GroupMe payload into a ``rollbot.Message``."""
        return rollbot.Message(
            origin_id="GROUPME",
            channel_id=msg.group_id,
            sender_id=msg.sender_id,
            timestamp=datetime.fromtimestamp(msg.created_at),
            origin_admin=msg.sender_id in groupme_admins,
            channel_admin=msg.sender_id in group_admins.get(msg.group_id, ()),
            sender_name=msg.name,
            text=msg.text,
            # Each attachment keeps its "type" and the full dict as JSON text.
            attachments=[
                rollbot.Attachment(d["type"], json.dumps(d))
                for d in msg.attachments
            ],
        )

    async def upload_image(self, data: bytes):
        """Upload PNG bytes to the GroupMe image service and return the URL.

        Raises whatever ``raise_for_status()`` raises on a failed upload.
        """
        async with self.context.request.post(
            "https://image.groupme.com/pictures",
            headers={
                "Content-Type": "image/png",
                "X-Access-Token": groupme_token,
            },
            data=data,
        ) as upload:
            upload.raise_for_status()
            return (await upload.json())["payload"]["url"]

    async def post_message(
        self, bot_id: str, text: str, attachments: list[dict[str, str]]
    ):
        """Post *text* and *attachments* to GroupMe as the bot *bot_id*."""
        # Use the same ``async with`` form as upload_image so the response is
        # released when the block exits instead of being left dangling.
        async with self.context.request.post(
            "https://api.groupme.com/v3/bots/post",
            json={
                "bot_id": bot_id,
                "text": text,
                "attachments": attachments,
            },
            timeout=10,
        ):
            pass

    async def respond(self, res: rollbot.Response):
        """Deliver a Rollbot response to the GroupMe group it belongs to.

        Responses for other origins, or for groups with no configured bot,
        are logged and dropped. Image attachments given as bytes are uploaded
        first; an upload failure is reported in the message text rather than
        aborting the response. Long text is split across several posts.
        """
        if res.origin_id != "GROUPME":
            self.context.logger.error(f"Unable to respond to {res.origin_id}")
            return

        bot_id = groupme_bots.get(res.channel_id, None)
        if bot_id is None:
            self.context.logger.error(
                f"Unable to respond to group {res.channel_id} in GroupMe"
            )
            return

        message = ""
        attachments = []
        try:
            if res.attachments is not None:
                for att in res.attachments:
                    if att.name != "image":
                        continue
                    if isinstance(att.body, bytes):
                        url = await self.upload_image(att.body)
                    else:
                        # Non-bytes bodies are used directly as the image URL.
                        url = att.body
                    attachments.append({"type": "image", "url": url})
        except Exception:
            # Deliberately not a bare ``except``: task cancellation and
            # interpreter-exit signals must keep propagating.
            self.context.debugging = traceback.format_exc()
            message += "Failed to upload one or more attachments!\n"
            self.context.logger.exception("Failed to upload attachment")

        if res.text is not None:
            message += res.text

        first, *rest = _split_message(message)
        # Attachments go out with the first chunk only.
        await self.post_message(bot_id, first, attachments)
        for chunk in rest:
            # Short pause between consecutive posts.
            await asyncio.sleep(0.1)
            await self.post_message(bot_id, chunk, [])


app = FastAPI()
bot = GroupMeBot()


@app.on_event("startup")
async def startup():
    """Run the bot's startup hook when the application starts."""
    await bot.on_startup()


@app.on_event("shutdown")
async def shutdown():
    """Run the bot's shutdown hook when the application stops."""
    await bot.on_shutdown()


@app.post("/", status_code=204)
async def receive(message: GroupMeMessage, tasks: BackgroundTasks):
    """Accept a message and hand it to the bot as a background task.

    The 204 response is returned without waiting for the bot to finish.
    """
    tasks.add_task(bot.on_message, message)
    return Response(status_code=204)


@app.get("/health", status_code=204)
def health():
    """Liveness probe: always answers 204 No Content."""
    return Response(status_code=204)