app.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. #!/usr/bin/env python3
  2. import tempfile
  3. import os.path
  4. import os
  5. from telnetlib import Telnet
  6. from collections import defaultdict
  7. from datetime import timedelta
  8. import toml
  9. from flask import Flask, jsonify, request, render_template_string
  10. from flask_cors import CORS
  11. IDLE_TIMEOUT = timedelta(minutes=5)
  12. REFRESH_RATE = 60 # seconds
  13. app = Flask(__name__)
  14. CORS(app)
  15. with open(os.environ.get("TS_SECRET_FILE", "secret.toml")) as infile:
  16. cfg = toml.load(infile)
  17. for k in ("host", "port", "user", "pass"):
  18. app.config[k] = os.environ.get(f"TS_SERVER_{k.upper()}", cfg[k])
  19. def parse_ts_response(response):
  20. # entries separated by |'s
  21. entries = response.split("|")
  22. # entries contain key=value pairs separated by spaces
  23. pairs = [[pr.split("=", 1) for pr in ent.split()] for ent in entries]
  24. # rearrange these into maps for convenience
  25. return [{k: v for k, v in pr} for pr in pairs]
  26. def login(tn, username, password):
  27. login = f"login {username} {password}\n".encode("utf-8")
  28. print("connection")
  29. print(tn.read_until(b"\n").decode("utf-8"))
  30. print(tn.read_until(b"\n").decode("utf-8"))
  31. print("----")
  32. tn.write(login)
  33. print("after login")
  34. print(tn.read_until(b"\n").decode("utf-8"))
  35. print("----")
  36. tn.write(b"use 1 -virtual\n")
  37. print("after use")
  38. print(tn.read_until(b"\n").decode("utf-8"))
  39. print("----")
  40. def get_client_info(tn):
  41. tn.write(b"clientlist\n")
  42. response = tn.read_until(b"\n").decode("utf-8")
  43. print("after clientlist")
  44. print(response)
  45. print(tn.read_until(b"\n").decode("utf-8"))
  46. print("----")
  47. entry_maps = parse_ts_response(response)
  48. # combine the maps into one large map, ignoring serveradmin query user
  49. return {info["client_nickname"]: info for info in entry_maps if "serveradmin" not in info["client_nickname"]}
  50. def post_to_channel(host, port, username, password, message):
  51. message_sanitized = message.strip().replace(" ", r"\s")
  52. with Telnet(host, port, 5) as tn:
  53. login(tn, username, password)
  54. client_info = get_client_info(tn)
  55. audiobot_id = client_info["AudioBot"]["clid"]
  56. tn.write(f"sendtextmessage targetmode=1 target={audiobot_id} msg={message_sanitized}\n".encode("utf-8"))
  57. # tn.write(f"sendtextmessage targetmode=1 target={audiobot_id} msg=!clear\n".encode("utf-8"))
  58. tn.write(b"quit\n")
  59. def query_ts(host, port, username, password):
  60. with Telnet(host, port, 5) as tn:
  61. login(tn, username, password)
  62. tn.write(b"channellist\n")
  63. response = tn.read_until(b"\n").decode("utf-8")
  64. print("after channellist")
  65. print(response)
  66. print(tn.read_until(b"\n").decode("utf-8"))
  67. print("----")
  68. channel_maps = parse_ts_response(response)
  69. # rearrange the maps into one large channel lookup map
  70. channels = {info["cid"]: info["channel_name"].replace(r"\s", " ") for info in channel_maps}
  71. client_info = get_client_info(tn)
  72. for k, v in client_info.items():
  73. tn.write(f"clientinfo clid={v['clid']}\n".encode("utf-8"))
  74. response = tn.read_until(b"\n").decode("utf-8")
  75. print(f"after clientinfo for {k}")
  76. print(response)
  77. print(tn.read_until(b"\n").decode("utf-8"))
  78. print("----")
  79. # info is key=value pairs separated by spaces
  80. pairs = [ent.split("=", 1) for ent in response.split() if "=" in ent]
  81. # rearrange into a map and put in the client_info
  82. v["client_info"] = {k: v for k, v in pairs}
  83. tn.write(b"quit\n")
  84. print("after quit")
  85. print(tn.read_until(b"\n").decode("utf-8"))
  86. return client_info, channels
  87. def get_users():
  88. client_info, channels = query_ts(
  89. app.config["host"],
  90. app.config["port"],
  91. app.config["user"],
  92. app.config["pass"]
  93. )
  94. users = []
  95. channel_users = defaultdict(list)
  96. for name, info in client_info.items():
  97. user_text = name
  98. audio_status = []
  99. if info["client_info"].get("client_input_muted", "0") == "1":
  100. audio_status.append("Mic muted")
  101. if info["client_info"].get("client_output_muted", "0") == "1":
  102. audio_status.append("Sound muted")
  103. if len(audio_status) > 0:
  104. user_text += f" ({', '.join(audio_status)})"
  105. idle = timedelta(milliseconds=int(info["client_info"].get("client_idle_time", "0")))
  106. if idle >= IDLE_TIMEOUT:
  107. # strip out the sub-second resolution
  108. idle -= timedelta(microseconds=idle.microseconds)
  109. user_text += f" (Idle for {idle})"
  110. users.append(user_text)
  111. channel_users[channels[info["cid"]]].append(user_text)
  112. return sorted(users), {k: sorted(v) for k, v in channel_users.items()}
  113. @app.route("/")
  114. def get_status():
  115. return jsonify({"users": [u.replace(r"\s", " ") for u in get_users()[0]]})
  116. @app.route("/page")
  117. def get_page():
  118. print(get_users()[0])
  119. return render_template_string("""
  120. <!DOCTYPE html>
  121. <html lang="en">
  122. <head>
  123. <title>Teamspeak Status</title>
  124. <style>
  125. body {
  126. margin: 0;
  127. font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', 'Roboto', 'Oxygen',
  128. 'Ubuntu', 'Cantarell', 'Fira Sans', 'Droid Sans', 'Helvetica Neue',
  129. sans-serif;
  130. -webkit-font-smoothing: antialiased;
  131. -moz-osx-font-smoothing: grayscale;
  132. margin: 0px 0px 0px 0px;
  133. padding: 0px 0px 0px 0px;
  134. color: #ddd;
  135. background-color: #111;
  136. }
  137. .users {
  138. list-style-type: none;
  139. padding: 0;
  140. margin: 0;
  141. margin-top: 0.4em;
  142. }
  143. .user {
  144. font-size: 0.6em;
  145. margin-left: 0.8em;
  146. padding-bottom: 0.2em;
  147. }
  148. </style>
  149. </head>
  150. <body>
  151. <ul class="users">
  152. {% for user in users %}
  153. <li class="user">{{ user }}</li>
  154. {% endfor %}
  155. </ul>
  156. </body>
  157. </html>
  158. """, users=[u.replace(r"\s", " ") for u in get_users()[0]])
  159. @app.route("/audiobot", methods=["POST"])
  160. def message_audiobot():
  161. temp_dir = tempfile.mkdtemp()
  162. temp_file = os.path.join(temp_dir, "received.mp3")
  163. with open(temp_file, "wb") as f:
  164. f.write(request.data)
  165. f.close()
  166. post_to_channel(
  167. app.config["host"],
  168. app.config["port"],
  169. app.config["user"],
  170. app.config["pass"],
  171. f"!play {temp_file}"
  172. )
  173. return "", 204
  174. if __name__ == "__main__":
  175. app.run("0.0.0.0", 5000, debug=True, threaded=True)