app.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. #!/usr/bin/env python3
  2. from telnetlib import Telnet
  3. from collections import defaultdict
  4. from datetime import timedelta
  5. import toml
  6. from flask import Flask, jsonify, render_template
  7. IDLE_TIMEOUT = timedelta(minutes=5)
  8. REFRESH_RATE = 60 # seconds
  9. app = Flask(__name__)
  10. def parse_ts_response(response):
  11. # entries separated by |'s
  12. entries = response.split("|")
  13. # entries contain key=value pairs separated by spaces
  14. pairs = [[pr.split("=", 1) for pr in ent.split()] for ent in entries]
  15. # rearrange these into maps for convenience
  16. return [{k: v for k, v in pr} for pr in pairs]
  17. def query_ts(host, port, username, password):
  18. login = f"login {username} {password}\n".encode("utf-8")
  19. with Telnet(host, port, 5) as tn:
  20. print("connection")
  21. print(tn.read_until(b"\n").decode("utf-8"))
  22. print(tn.read_until(b"\n").decode("utf-8"))
  23. print("----")
  24. tn.write(login)
  25. print("after login")
  26. print(tn.read_until(b"\n").decode("utf-8"))
  27. print("----")
  28. tn.write(b"use 1 -virtual\n")
  29. print("after use")
  30. print(tn.read_until(b"\n").decode("utf-8"))
  31. print("----")
  32. tn.write(b"channellist\n")
  33. response = tn.read_until(b"\n").decode("utf-8")
  34. print("after channellist")
  35. print(response)
  36. print(tn.read_until(b"\n").decode("utf-8"))
  37. print("----")
  38. channel_maps = parse_ts_response(response)
  39. # rearrange the maps into one large channel lookup map
  40. channels = {info["cid"]: info["channel_name"].replace(r"\s", " ") for info in channel_maps}
  41. tn.write(b"clientlist\n")
  42. response = tn.read_until(b"\n").decode("utf-8")
  43. print("after clientlist")
  44. print(response)
  45. print(tn.read_until(b"\n").decode("utf-8"))
  46. print("----")
  47. entry_maps = parse_ts_response(response)
  48. # combine the maps into one large map, ignoring serveradmin query user
  49. client_info = {info["client_nickname"]: info for info in entry_maps if "serveradmin" not in info["client_nickname"]}
  50. for k, v in client_info.items():
  51. tn.write(f"clientinfo clid={v['clid']}\n".encode("utf-8"))
  52. response = tn.read_until(b"\n").decode("utf-8")
  53. print(f"after clientinfo for {k}")
  54. print(response)
  55. print(tn.read_until(b"\n").decode("utf-8"))
  56. print("----")
  57. # info is key=value pairs separated by spaces
  58. pairs = [ent.split("=", 1) for ent in response.split() if "=" in ent]
  59. # rearrange into a map and put in the client_info
  60. v["client_info"] = {k: v for k, v in pairs}
  61. tn.write(b"quit\n")
  62. print("after quit")
  63. print(tn.read_until(b"\n").decode("utf-8"))
  64. return client_info, channels
  65. def get_users():
  66. with open("secret.toml") as infile:
  67. cfg = toml.load(infile)
  68. client_info, channels = query_ts(cfg["host"], cfg["port"], cfg["user"], cfg["pass"])
  69. users = []
  70. channel_users = defaultdict(list)
  71. for name, info in client_info.items():
  72. user_text = name
  73. audio_status = []
  74. if info["client_info"].get("client_input_muted", "0") == "1":
  75. audio_status.append("Mic muted")
  76. if info["client_info"].get("client_output_muted", "0") == "1":
  77. audio_status.append("Sound muted")
  78. if len(audio_status) > 0:
  79. user_text += f" ({', '.join(audio_status)})"
  80. idle = timedelta(milliseconds=int(info["client_info"].get("client_idle_time", "0")))
  81. if idle >= IDLE_TIMEOUT:
  82. # strip out the sub-second resolution
  83. idle -= timedelta(microseconds=idle.microseconds)
  84. user_text += f" (Idle for {idle})"
  85. users.append(user_text)
  86. channel_users[channels[info["cid"]]].append(user_text)
  87. return sorted(users), {k: sorted(v) for k, v in channel_users.items()}
  88. @app.route("/")
  89. def get_status():
  90. return jsonify({"users": get_users()[0]})
  91. @app.route("/page")
  92. def get_status_page():
  93. return render_template("main_page.html")
  94. @app.route("/page_body")
  95. def get_page_body():
  96. return render_template("embedded_body.html", users=get_users()[1], refresh_rate=REFRESH_RATE)
  97. if __name__ == "__main__":
  98. app.run("0.0.0.0", 5000, debug=True, threaded=True)