123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- #!/usr/bin/env python3
- import tempfile
- import os.path
- import os
- from telnetlib import Telnet
- from collections import defaultdict
- from datetime import timedelta
- import toml
- from flask import Flask, jsonify, request, render_template_string
- from flask_cors import CORS
# Users idle for at least this long get an "(Idle for ...)" suffix in listings.
IDLE_TIMEOUT = timedelta(minutes=5)
# NOTE(review): not referenced in the code visible here - confirm it is used.
REFRESH_RATE = 60  # seconds

app = Flask(__name__)
CORS(app)

# Connection settings are read from a TOML file whose path can be overridden
# with TS_SECRET_FILE. TOML is UTF-8 by specification, so decode explicitly
# rather than relying on the platform default encoding.
with open(os.environ.get("TS_SECRET_FILE", "secret.toml"), encoding="utf-8") as infile:
    cfg = toml.load(infile)

# Every setting can be overridden by a TS_SERVER_<NAME> environment variable.
# The file value is looked up only when no override exists, so a secret file
# that omits a key is acceptable as long as the environment supplies it.
for k in ("host", "port", "user", "pass"):
    override = os.environ.get(f"TS_SERVER_{k.upper()}")
    app.config[k] = cfg[k] if override is None else override
def parse_ts_response(response):
    """Parse a ServerQuery list response into one dict per entry.

    Entries are separated by ``|``. Each entry is a run of
    whitespace-separated ``key=value`` tokens; only the first ``=`` splits a
    token, so a value may itself contain ``=``. A bare token without ``=`` is
    kept as a key with an empty-string value instead of aborting the parse.

    Values are returned exactly as received; escape sequences are not decoded
    here.

    Args:
        response: One decoded response line.

    Returns:
        A list of ``{key: value}`` dicts, one per entry, in response order.
        An empty response yields ``[{}]``.
    """
    parsed = []
    for entry in response.split("|"):
        fields = {}
        for token in entry.split():
            # partition never fails: a token without "=" yields value "".
            key, _, value = token.partition("=")
            fields[key] = value
        parsed.append(fields)
    return parsed
def login(tn, username, password):
    """Log in on an open query connection and select virtual server 1.

    Reads two lines before sending anything (presumably the server greeting),
    then sends ``login`` followed by ``use 1 -virtual``, reading one reply
    line after each command. Every line read is echoed to stdout along with
    progress markers.

    Args:
        tn: Connected object offering ``read_until`` and ``write``.
        username: Query login name.
        password: Query login password.
    """
    steps = (
        (f"login {username} {password}\n".encode("utf-8"), "after login"),
        (b"use 1 -virtual\n", "after use"),
    )

    def echo_line():
        print(tn.read_until(b"\n").decode("utf-8"))

    print("connection")
    echo_line()
    echo_line()
    print("----")

    for command, marker in steps:
        tn.write(command)
        print(marker)
        echo_line()
        print("----")
def get_client_info(tn):
    """Fetch the connected clients, keyed by nickname.

    Sends ``clientlist``, parses the first reply line with
    ``parse_ts_response`` and echoes that line and the following one to
    stdout. Clients whose nickname contains ``serveradmin`` are left out.

    Args:
        tn: Logged-in connection offering ``read_until`` and ``write``.

    Returns:
        Dict mapping each ``client_nickname`` to that client's parsed fields.
    """
    tn.write(b"clientlist\n")
    listing = tn.read_until(b"\n").decode("utf-8")
    print("after clientlist")
    print(listing)
    print(tn.read_until(b"\n").decode("utf-8"))
    print("----")

    clients = {}
    for entry in parse_ts_response(listing):
        nickname = entry["client_nickname"]
        # ignore the serveradmin query user
        if "serveradmin" in nickname:
            continue
        clients[nickname] = entry
    return clients
# ServerQuery escape sequences for characters that carry protocol meaning.
# Applied in a single pass so already-escaped output is never re-escaped.
_TS_QUERY_ESCAPES = str.maketrans({
    "\\": r"\\",
    "/": r"\/",
    " ": r"\s",
    "|": r"\p",
    "\a": r"\a",
    "\b": r"\b",
    "\f": r"\f",
    "\n": r"\n",
    "\r": r"\r",
    "\t": r"\t",
    "\v": r"\v",
})


def _escape_query_value(text):
    """Return *text* escaped for use as a ServerQuery parameter value."""
    return text.translate(_TS_QUERY_ESCAPES)


def post_to_channel(host, port, username, password, message):
    """Send a text message to the client nicknamed ``AudioBot``.

    Opens a query connection (5 second timeout), logs in, looks up the
    ``clid`` of the client called ``AudioBot`` and issues
    ``sendtextmessage targetmode=1`` at it, then sends ``quit``.

    The message is stripped of surrounding whitespace and fully escaped, so
    pipes, slashes, backslashes and control characters cannot break or extend
    the command line.

    Args:
        host: Query host.
        port: Query port.
        username: Query login name.
        password: Query login password.
        message: Plain message text.

    Raises:
        KeyError: No connected client has the nickname ``AudioBot``.
    """
    escaped = _escape_query_value(message.strip())
    with Telnet(host, port, 5) as tn:
        login(tn, username, password)
        audiobot_id = get_client_info(tn)["AudioBot"]["clid"]
        command = f"sendtextmessage targetmode=1 target={audiobot_id} msg={escaped}\n"
        tn.write(command.encode("utf-8"))
        tn.write(b"quit\n")
def query_ts(host, port, username, password):
    """Collect channel names and per-client details in one query session.

    Connects (5 second timeout), logs in, reads ``channellist``, obtains the
    client list through ``get_client_info`` and then issues one
    ``clientinfo`` per client. All raw reply lines are echoed to stdout.

    Args:
        host: Query host.
        port: Query port.
        username: Query login name.
        password: Query login password.

    Returns:
        ``(client_info, channels)``. ``client_info`` maps nickname to that
        client's list fields plus a ``"client_info"`` dict holding the
        ``key=value`` fields of its ``clientinfo`` reply. ``channels`` maps
        ``cid`` to the channel name with encoded spaces turned back into
        spaces.
    """
    with Telnet(host, port, 5) as tn:

        def read_line():
            return tn.read_until(b"\n").decode("utf-8")

        login(tn, username, password)

        tn.write(b"channellist\n")
        channel_line = read_line()
        print("after channellist")
        print(channel_line)
        print(read_line())
        print("----")
        # build the cid -> readable channel name lookup
        channels = {}
        for channel in parse_ts_response(channel_line):
            channels[channel["cid"]] = channel["channel_name"].replace(r"\s", " ")

        client_info = get_client_info(tn)
        for nickname, record in client_info.items():
            tn.write(f"clientinfo clid={record['clid']}\n".encode("utf-8"))
            detail_line = read_line()
            print(f"after clientinfo for {nickname}")
            print(detail_line)
            print(read_line())
            print("----")
            # keep only key=value tokens; tokens without "=" are dropped
            details = {}
            for token in detail_line.split():
                if "=" in token:
                    field, value = token.split("=", 1)
                    details[field] = value
            record["client_info"] = details

        tn.write(b"quit\n")
        print("after quit")
        print(read_line())

    return client_info, channels
def get_users():
    """Build display labels for every connected user.

    Queries the server with the credentials in ``app.config``. Each label is
    the nickname, followed by a parenthesised mute status when the mic and/or
    sound is muted, followed by ``(Idle for H:MM:SS)`` when the user has been
    idle for at least ``IDLE_TIMEOUT``.

    Returns:
        ``(users, channel_users)``: the sorted list of all labels, and a dict
        mapping channel name to the sorted labels of the users in it.
    """
    client_info, channels = query_ts(
        app.config["host"],
        app.config["port"],
        app.config["user"],
        app.config["pass"],
    )

    mute_flags = (
        ("client_input_muted", "Mic muted"),
        ("client_output_muted", "Sound muted"),
    )

    labels = []
    by_channel = defaultdict(list)
    for nickname, record in client_info.items():
        label = nickname

        muted = [
            text
            for field, text in mute_flags
            if record["client_info"].get(field, "0") == "1"
        ]
        if muted:
            label = f"{label} ({', '.join(muted)})"

        idle_ms = int(record["client_info"].get("client_idle_time", "0"))
        idle = timedelta(milliseconds=idle_ms)
        if idle >= IDLE_TIMEOUT:
            # drop the sub-second part so the text reads H:MM:SS
            whole_seconds = idle - timedelta(microseconds=idle.microseconds)
            label = f"{label} (Idle for {whole_seconds})"

        labels.append(label)
        by_channel[channels[record["cid"]]].append(label)

    sorted_by_channel = {name: sorted(members) for name, members in by_channel.items()}
    return sorted(labels), sorted_by_channel
@app.route("/")
def get_status():
    """Return the current user labels as JSON under the ``users`` key."""
    labels, _by_channel = get_users()
    # nicknames arrive with encoded spaces; show real spaces
    readable = [label.replace(r"\s", " ") for label in labels]
    return jsonify({"users": readable})
@app.route("/page")
def get_page():
    """Render the current user labels as a small standalone HTML page.

    The page shows a refresh icon linking to a fixed URL next to an unordered
    list with one item per user.
    """
    # nicknames arrive with encoded spaces; show real spaces
    display_names = [label.replace(r"\s", " ") for label in get_users()[0]]

    page_template = """
    <!DOCTYPE html>
    <html lang="en">
    <head>
        <title>Teamspeak Status</title>
        <style>
            body {
                margin: 0;
                font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', 'Roboto', 'Oxygen',
                    'Ubuntu', 'Cantarell', 'Fira Sans', 'Droid Sans', 'Helvetica Neue',
                    sans-serif;
                -webkit-font-smoothing: antialiased;
                -moz-osx-font-smoothing: grayscale;
                margin: 0px 0px 0px 0px;
                padding: 0px 0px 0px 0px;
                color: #ddd;
                background-color: #111;
            }
            .page {
                display: flex;
                flex-flow: row nowrap;
                justify-content: flex-start;
                align-items: flex-start;
            }
            .refresh {
                width: 25px;
                height: 25px;
                padding-top: 4px;
                padding-left: 4px;
            }
            .users {
                list-style-type: none;
                padding: 0;
                margin: 0;
                margin-top: 0.4em;
            }
            .user {
                font-size: 0.6em;
                margin-left: 0.8em;
                padding-bottom: 0.2em;
            }
        </style>
    </head>
    <body>
        <div class="page">
            <a href="https://hiram.services/teamspeak/clean" class="refresh">
                <svg viewbox="0 0 16 16" width="25" height="25">
                    <g
                        inkscape:label="Layer 1"
                        inkscape:groupmode="layer"
                        id="layer1"
                        transform="translate(0,-1036.3622)">
                        <path
                            style="opacity:1;fill:#ffffff;fill-opacity:1;stroke:none;stroke-width:0.5;stroke-linejoin:round;stroke-miterlimit:4;stroke-dasharray:none;stroke-dashoffset:0;stroke-opacity:1"
                            d="M 8.0878906 2 A 6 6 0 0 0 3.7578125 3.7578125 A 6 6 0 0 0 3.7578125 12.242188 L 3.171875 12.828125 L 6 12.828125 L 6 10 L 5.28125 10.71875 A 3.84375 3.84375 0 0 1 5.28125 5.28125 A 3.84375 3.84375 0 0 1 9.8222656 4.6171875 L 9.8222656 3.171875 A 0.17817269 0.17817269 0 0 1 9.9824219 2.9941406 A 0.17817269 0.17817269 0 0 1 10 2.9941406 L 11.304688 2.9941406 A 6 6 0 0 0 8.0878906 2 z "
                            transform="translate(0,1036.3622)"
                            id="path8586" />
                        <use
                            x="0"
                            y="0"
                            xlink:href="#path8586"
                            id="use8607"
                            inkscape:transform-center-x="-1.3423979"
                            inkscape:transform-center-y="0.58552377"
                            width="100%"
                            height="100%"
                            preserveAspectRatio="none"
                            transform="matrix(-1,0,0,-1,15.99039,2088.723)" />
                    </g>
                </svg>
            </a>
            <ul class="users">
                {% for user in users %}
                <li class="user">{{ user }}</li>
                {% endfor %}
            </ul>
        </div>
    </body>
    </html>
    """
    return render_template_string(page_template, users=display_names)
@app.route("/audiobot", methods=["POST"])
def message_audiobot():
    """Store the uploaded request body and tell AudioBot to play it.

    The raw POST body is written to ``received.mp3`` inside a fresh temporary
    directory, and ``!play <path>`` is sent through ``post_to_channel``. If
    writing or sending fails, the temporary directory is removed before the
    error propagates.

    Returns:
        An empty body with HTTP status 204.
    """
    import shutil

    temp_dir = tempfile.mkdtemp()
    temp_file = os.path.join(temp_dir, "received.mp3")
    try:
        # the with-block closes the file; no explicit close() is needed
        with open(temp_file, "wb") as f:
            f.write(request.data)
        post_to_channel(
            app.config["host"],
            app.config["port"],
            app.config["user"],
            app.config["pass"],
            f"!play {temp_file}",
        )
    except Exception:
        # The request failed, so drop the upload instead of leaking one
        # directory per failed request. Cleanup is best-effort; the original
        # error is what the caller needs to see.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise
    # NOTE(review): on success the file stays on disk, presumably because the
    # bot reads it after this request returns - confirm who removes it.
    return "", 204
if __name__ == "__main__":
    # Debug mode enables an interactive console that can execute arbitrary
    # code. This server listens on every interface, so debug is opt-in
    # (TS_DEBUG=1) rather than always on.
    app.run("0.0.0.0", 5000, debug=os.environ.get("TS_DEBUG") == "1", threaded=True)
|