#!/usr/bin/env python3
"""Small Flask front-end for a TeamSpeak 3 server.

The app talks to the server's ServerQuery interface over telnet and exposes:

* ``GET /``          - JSON list of connected users
* ``GET /page``      - HTML fragment listing users per channel
* ``POST /audiobot`` - store the request body as an mp3 and ask the client
  nicknamed ``AudioBot`` to play it

Connection settings (``host``, ``port``, ``user``, ``pass``) are read from
``secret.toml`` in the working directory at import time.
"""
import os.path
import re
import tempfile
from collections import defaultdict
from datetime import timedelta

# NOTE: telnetlib is deprecated since Python 3.11 and removed in 3.13; this
# module needs an interpreter (or backport) that still provides it.
from telnetlib import Telnet

import toml
from flask import Flask, jsonify, render_template, request
from flask_cors import CORS

IDLE_TIMEOUT = timedelta(minutes=5)
REFRESH_RATE = 60  # seconds

app = Flask(__name__)
CORS(app)

with open("secret.toml") as infile:
    cfg = toml.load(infile)
for k in ("host", "port", "user", "pass"):
    app.config[k] = cfg[k]

# ServerQuery escape sequences: raw character -> escaped form.
_TS_ESCAPES = {
    "\\": "\\\\",
    "/": "\\/",
    " ": "\\s",
    "|": "\\p",
    "\a": "\\a",
    "\b": "\\b",
    "\f": "\\f",
    "\n": "\\n",
    "\r": "\\r",
    "\t": "\\t",
    "\v": "\\v",
}
_TS_ESCAPE_TABLE = str.maketrans(_TS_ESCAPES)
# Reverse lookup keyed by the character that follows the backslash.
_TS_UNESCAPES = {escaped[1]: raw for raw, escaped in _TS_ESCAPES.items()}
_TS_ESCAPED_CHAR = re.compile(r"\\(.)", re.DOTALL)


def _ts_escape(text):
    """Return *text* with ServerQuery special characters escaped."""
    return text.translate(_TS_ESCAPE_TABLE)


def _ts_unescape(text):
    """Return *text* with ServerQuery escape sequences decoded.

    Decoding is done in a single pass so that an escaped backslash followed
    by ``s`` is not mistaken for an escaped space. Unknown sequences are left
    untouched.
    """
    return _TS_ESCAPED_CHAR.sub(
        lambda match: _TS_UNESCAPES.get(match.group(1), match.group(0)), text
    )


def _read_line(tn):
    """Read one newline-terminated line from *tn* and decode it as UTF-8."""
    return tn.read_until(b"\n").decode("utf-8")


def parse_ts_response(response):
    """Parse a ServerQuery response line into a list of dicts.

    Entries are separated by ``|``; each entry holds whitespace-separated
    ``key=value`` tokens. A token without ``=`` is stored with an empty
    string as its value instead of raising. Values are returned as received
    (still escaped).
    """
    parsed = []
    for entry in response.split("|"):
        fields = {}
        for token in entry.split():
            # partition() never fails, unlike unpacking split("=", 1) on a
            # token that carries no value.
            key, _, value = token.partition("=")
            fields[key] = value
        parsed.append(fields)
    return parsed


def login(tn, username, password):
    """Authenticate on an open connection and select virtual server 1.

    Reads the two lines the server sends on connect, sends ``login`` and
    ``use 1 -virtual``, and prints every line received.
    """
    command = f"login {username} {password}\n".encode("utf-8")
    print("connection")
    print(_read_line(tn))
    print(_read_line(tn))
    print("----")
    tn.write(command)
    print("after login")
    print(_read_line(tn))
    print("----")
    tn.write(b"use 1 -virtual\n")
    print("after use")
    print(_read_line(tn))
    print("----")


def get_client_info(tn):
    """Run ``clientlist`` and return ``{client_nickname: info_dict}``.

    Clients whose nickname contains ``serveradmin`` (the query user) are
    left out. Nicknames are kept exactly as sent by the server.
    """
    tn.write(b"clientlist\n")
    response = _read_line(tn)
    print("after clientlist")
    print(response)
    print(_read_line(tn))
    print("----")
    clients = {}
    for info in parse_ts_response(response):
        nickname = info["client_nickname"]
        if "serveradmin" in nickname:
            continue
        clients[nickname] = info
    return clients


def post_to_channel(host, port, username, password, message):
    """Send *message* as a private text message to the ``AudioBot`` client.

    The message is stripped and fully escaped before being sent, so spaces,
    pipes, backslashes and embedded newlines cannot break the command line.

    Raises:
        KeyError: if no client nicknamed ``AudioBot`` is connected.
    """
    message_sanitized = _ts_escape(message.strip())
    with Telnet(host, port, 5) as tn:
        login(tn, username, password)
        client_info = get_client_info(tn)
        audiobot_id = client_info["AudioBot"]["clid"]
        tn.write(
            f"sendtextmessage targetmode=1 target={audiobot_id} "
            f"msg={message_sanitized}\n".encode("utf-8")
        )
        # NOTE(review): a follow-up "!clear" message appears to have been
        # disabled in the original source; kept for reference - confirm.
        # tn.write(f"sendtextmessage targetmode=1 target={audiobot_id} msg=!clear\n".encode("utf-8"))
        tn.write(b"quit\n")


def query_ts(host, port, username, password):
    """Collect channel names and per-client details from the server.

    Returns:
        A ``(client_info, channels)`` tuple. ``client_info`` is the result of
        :func:`get_client_info` where every entry additionally carries a
        ``"client_info"`` dict built from that client's ``clientinfo``
        response. ``channels`` maps channel id to the unescaped channel name.
    """
    with Telnet(host, port, 5) as tn:
        login(tn, username, password)

        tn.write(b"channellist\n")
        response = _read_line(tn)
        print("after channellist")
        print(response)
        print(_read_line(tn))
        print("----")
        channels = {
            info["cid"]: _ts_unescape(info["channel_name"])
            for info in parse_ts_response(response)
        }

        client_info = get_client_info(tn)
        for nickname, client in client_info.items():
            tn.write(f"clientinfo clid={client['clid']}\n".encode("utf-8"))
            response = _read_line(tn)
            print(f"after clientinfo for {nickname}")
            print(response)
            print(_read_line(tn))
            print("----")
            # Tokens without "=" are dropped here, so lookups with defaults
            # downstream never see an empty value for them.
            client["client_info"] = dict(
                token.split("=", 1) for token in response.split() if "=" in token
            )

        tn.write(b"quit\n")
        print("after quit")
        print(_read_line(tn))
    return client_info, channels


def get_users():
    """Build display strings for all connected users.

    Each string is the unescaped nickname, optionally followed by a mute
    status and, once the client has been idle for ``IDLE_TIMEOUT`` or longer,
    the idle duration.

    Returns:
        A tuple ``(users, channel_users)``: the sorted list of all display
        strings, and a dict mapping channel name to the sorted display
        strings of the users in it.
    """
    client_info, channels = query_ts(
        app.config["host"], app.config["port"], app.config["user"], app.config["pass"]
    )
    users = []
    channel_users = defaultdict(list)
    for name, info in client_info.items():
        details = info["client_info"]
        user_text = _ts_unescape(name)

        audio_status = []
        if details.get("client_input_muted", "0") == "1":
            audio_status.append("Mic muted")
        if details.get("client_output_muted", "0") == "1":
            audio_status.append("Sound muted")
        if audio_status:
            user_text += f" ({', '.join(audio_status)})"

        idle = timedelta(milliseconds=int(details.get("client_idle_time", "0")))
        if idle >= IDLE_TIMEOUT:
            # strip out the sub-second resolution
            idle -= timedelta(microseconds=idle.microseconds)
            user_text += f" (Idle for {idle})"

        users.append(user_text)
        channel_users[channels[info["cid"]]].append(user_text)
    return sorted(users), {k: sorted(v) for k, v in channel_users.items()}


@app.route("/")
def get_status():
    """Return the sorted user list as JSON under the ``users`` key."""
    return jsonify({"users": get_users()[0]})


@app.route("/page")
def get_status_page():
    """Render ``embedded_body.html`` with the per-channel user lists."""
    return render_template("embedded_body.html", users=get_users()[1], refresh_rate=REFRESH_RATE)


@app.route("/audiobot", methods=["POST"])
def message_audiobot():
    """Store the request body as an mp3 and tell AudioBot to play it.

    Responds with ``204 No Content`` on success.
    """
    # NOTE: a new temporary directory is created per request and never
    # removed; the file has to outlive this request so it can be played.
    temp_dir = tempfile.mkdtemp()
    temp_file = os.path.join(temp_dir, "received.mp3")
    with open(temp_file, "wb") as f:
        f.write(request.data)
    post_to_channel(
        app.config["host"],
        app.config["port"],
        app.config["user"],
        app.config["pass"],
        f"!play {temp_file}",
    )
    return "", 204


if __name__ == "__main__":
    # WARNING: debug=True enables the interactive Werkzeug debugger, and
    # binding to 0.0.0.0 exposes it on every interface. Use only on a
    # trusted network; run behind a production WSGI server otherwise.
    app.run("0.0.0.0", 5000, debug=True, threaded=True)