#!/usr/bin/env python3 import tempfile import os.path import os from telnetlib import Telnet from collections import defaultdict from datetime import timedelta, datetime import toml from flask import Flask, jsonify, request, render_template_string from flask_cors import CORS IDLE_TIMEOUT = timedelta(minutes=5) REFRESH_RATE = 60 # seconds app = Flask(__name__) CORS(app) with open(os.environ.get("TS_SECRET_FILE", "secret.toml")) as infile: cfg = toml.load(infile) for k in ("host", "port", "user", "pass"): app.config[k] = os.environ.get(f"TS_SERVER_{k.upper()}", cfg[k]) def parse_ts_response(response): # entries separated by |'s entries = response.split("|") # entries contain key=value pairs separated by spaces pairs = [[pr.split("=", 1) for pr in ent.split()] for ent in entries] # rearrange these into maps for convenience return [{k: v for k, v in pr} for pr in pairs] def login(tn, username, password): login = f"login {username} {password}\n".encode("utf-8") print("connection") print(tn.read_until(b"\n").decode("utf-8")) print(tn.read_until(b"\n").decode("utf-8")) print("----") tn.write(login) print("after login") print(tn.read_until(b"\n").decode("utf-8")) print("----") tn.write(b"use 1 -virtual\n") print("after use") print(tn.read_until(b"\n").decode("utf-8")) print("----") def get_client_info(tn): tn.write(b"clientlist\n") response = tn.read_until(b"\n").decode("utf-8") print("after clientlist") print(response) print(tn.read_until(b"\n").decode("utf-8")) print("----") entry_maps = parse_ts_response(response) # combine the maps into one large map, ignoring serveradmin query user return {info["client_nickname"]: info for info in entry_maps if "serveradmin" not in info["client_nickname"]} def post_to_channel(host, port, username, password, message): message_sanitized = message.strip().replace(" ", r"\s") with Telnet(host, port, 5) as tn: login(tn, username, password) client_info = get_client_info(tn) audiobot_id = client_info["AudioBot"]["clid"] tn.write(f"sendtextmessage targetmode=1 target={audiobot_id} msg={message_sanitized}\n".encode("utf-8")) # tn.write(f"sendtextmessage targetmode=1 target={audiobot_id} msg=!clear\n".encode("utf-8")) tn.write(b"quit\n") def query_ts(host, port, username, password): with Telnet(host, port, 5) as tn: login(tn, username, password) tn.write(b"channellist\n") response = tn.read_until(b"\n").decode("utf-8") print("after channellist") print(response) print(tn.read_until(b"\n").decode("utf-8")) print("----") channel_maps = parse_ts_response(response) # rearrange the maps into one large channel lookup map channels = {info["cid"]: info["channel_name"].replace(r"\s", " ") for info in channel_maps} client_info = get_client_info(tn) for k, v in client_info.items(): tn.write(f"clientinfo clid={v['clid']}\n".encode("utf-8")) response = tn.read_until(b"\n").decode("utf-8") print(f"after clientinfo for {k}") print(response) print(tn.read_until(b"\n").decode("utf-8")) print("----") # info is key=value pairs separated by spaces pairs = [ent.split("=", 1) for ent in response.split() if "=" in ent] # rearrange into a map and put in the client_info v["client_info"] = {k: v for k, v in pairs} tn.write(b"quit\n") print("after quit") print(tn.read_until(b"\n").decode("utf-8")) return client_info, channels def get_users(): client_info, channels = query_ts( app.config["host"], app.config["port"], app.config["user"], app.config["pass"] ) users = [] channel_users = defaultdict(list) for name, info in client_info.items(): user_text = name audio_status = [] if info["client_info"].get("client_input_muted", "0") == "1": audio_status.append("Mic muted") if info["client_info"].get("client_output_muted", "0") == "1": audio_status.append("Sound muted") if len(audio_status) > 0: user_text += f" ({', '.join(audio_status)})" idle = timedelta(milliseconds=int(info["client_info"].get("client_idle_time", "0"))) if idle >= IDLE_TIMEOUT: # strip out the sub-second resolution idle -= timedelta(microseconds=idle.microseconds) user_text += f" (Idle for {idle})" users.append(user_text) channel_users[channels[info["cid"]]].append(user_text) return sorted(users), {k: sorted(v) for k, v in channel_users.items()} @app.route("/") def get_status(): return jsonify({"users": [u.replace(r"\s", " ") for u in get_users()[0]]}) def default_to(hex_in, hex_def): if hex_in is None or len(hex_in) > 6: return hex_def try: int(hex_in, 16) return hex_in except ValueError: return hex_def @app.route("/page") def get_page(): bg = default_to(request.args.get("bg", None), "111") tc = default_to(request.args.get("tc", None), "ccc") timestamp = "" if request.args.get("ts", None) != "1" else f"Updated at {datetime.now().strftime('%I:%M %p')}" return render_template_string( """