123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- #!/usr/bin/env python3
- import tempfile
- import os.path
- from telnetlib import Telnet
- from collections import defaultdict
- from datetime import timedelta
- import toml
- from flask import Flask, jsonify, render_template, request
- from flask_cors import CORS
- IDLE_TIMEOUT = timedelta(minutes=5)
- REFRESH_RATE = 60 # seconds
- app = Flask(__name__)
- CORS(app)
- with open("secret.toml") as infile:
- cfg = toml.load(infile)
- for k in ("host", "port", "user", "pass"):
- app.config[k] = cfg[k]
- def parse_ts_response(response):
- # entries separated by |'s
- entries = response.split("|")
- # entries contain key=value pairs separated by spaces
- pairs = [[pr.split("=", 1) for pr in ent.split()] for ent in entries]
- # rearrange these into maps for convenience
- return [{k: v for k, v in pr} for pr in pairs]
- def login(tn, username, password):
- login = f"login {username} {password}\n".encode("utf-8")
- print("connection")
- print(tn.read_until(b"\n").decode("utf-8"))
- print(tn.read_until(b"\n").decode("utf-8"))
- print("----")
- tn.write(login)
- print("after login")
- print(tn.read_until(b"\n").decode("utf-8"))
- print("----")
- tn.write(b"use 1 -virtual\n")
- print("after use")
- print(tn.read_until(b"\n").decode("utf-8"))
- print("----")
- def post_to_channel(host, port, username, password, message):
- message_santized = message.strip().replace(" ", r"\s")
- message_command = f"sendtextmessage targetmode=2 msg={message_santized}\n".encode("utf-8")
- print(message_command.decode("utf-8"))
- with Telnet(host, port, 5) as tn:
- login(tn, username, password)
- tn.write(message_command)
- tn.write(b"quit\n")
- def query_ts(host, port, username, password):
- with Telnet(host, port, 5) as tn:
- login(tn, username, password)
- tn.write(b"channellist\n")
- response = tn.read_until(b"\n").decode("utf-8")
- print("after channellist")
- print(response)
- print(tn.read_until(b"\n").decode("utf-8"))
- print("----")
- channel_maps = parse_ts_response(response)
- # rearrange the maps into one large channel lookup map
- channels = {info["cid"]: info["channel_name"].replace(r"\s", " ") for info in channel_maps}
- tn.write(b"clientlist\n")
- response = tn.read_until(b"\n").decode("utf-8")
- print("after clientlist")
- print(response)
- print(tn.read_until(b"\n").decode("utf-8"))
- print("----")
- entry_maps = parse_ts_response(response)
- # combine the maps into one large map, ignoring serveradmin query user
- client_info = {info["client_nickname"]: info for info in entry_maps if "serveradmin" not in info["client_nickname"]}
- for k, v in client_info.items():
- tn.write(f"clientinfo clid={v['clid']}\n".encode("utf-8"))
- response = tn.read_until(b"\n").decode("utf-8")
- print(f"after clientinfo for {k}")
- print(response)
- print(tn.read_until(b"\n").decode("utf-8"))
- print("----")
- # info is key=value pairs separated by spaces
- pairs = [ent.split("=", 1) for ent in response.split() if "=" in ent]
- # rearrange into a map and put in the client_info
- v["client_info"] = {k: v for k, v in pairs}
- tn.write(b"quit\n")
- print("after quit")
- print(tn.read_until(b"\n").decode("utf-8"))
- return client_info, channels
- def get_users():
- client_info, channels = query_ts(
- app.config["host"],
- app.config["port"],
- app.config["user"],
- app.config["pass"]
- )
- users = []
- channel_users = defaultdict(list)
- for name, info in client_info.items():
- user_text = name
- audio_status = []
- if info["client_info"].get("client_input_muted", "0") == "1":
- audio_status.append("Mic muted")
- if info["client_info"].get("client_output_muted", "0") == "1":
- audio_status.append("Sound muted")
- if len(audio_status) > 0:
- user_text += f" ({', '.join(audio_status)})"
- idle = timedelta(milliseconds=int(info["client_info"].get("client_idle_time", "0")))
- if idle >= IDLE_TIMEOUT:
- # strip out the sub-second resolution
- idle -= timedelta(microseconds=idle.microseconds)
- user_text += f" (Idle for {idle})"
- users.append(user_text)
- channel_users[channels[info["cid"]]].append(user_text)
- return sorted(users), {k: sorted(v) for k, v in channel_users.items()}
- @app.route("/")
- def get_status():
- return jsonify({"users": get_users()[0]})
- @app.route("/page")
- def get_status_page():
- return render_template("embedded_body.html", users=get_users()[1], refresh_rate=REFRESH_RATE)
- @app.route("/audiobot", methods=["POST"])
- def message_audiobot():
- temp_dir = tempfile.mkdtemp()
- temp_file = os.path.join(temp_dir, "received.mp3")
- with open(temp_file, "wb") as f:
- f.write(request.data)
- f.close()
- post_to_channel(
- app.config["host"],
- app.config["port"],
- app.config["user"],
- app.config["pass"],
- f"!play {temp_file}"
- )
- return "", 204
- if __name__ == "__main__":
- app.run("0.0.0.0", 5000, debug=True, threaded=True)
|