app.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. #!/usr/bin/env python3
  2. from telnetlib import Telnet
  3. from collections import defaultdict
  4. from datetime import timedelta
  5. import toml
  6. from flask import Flask, jsonify, render_template
  7. from flask_cors import CORS
  8. IDLE_TIMEOUT = timedelta(minutes=5)
  9. REFRESH_RATE = 60 # seconds
  10. app = Flask(__name__)
  11. CORS(app)
  12. def parse_ts_response(response):
  13. # entries separated by |'s
  14. entries = response.split("|")
  15. # entries contain key=value pairs separated by spaces
  16. pairs = [[pr.split("=", 1) for pr in ent.split()] for ent in entries]
  17. # rearrange these into maps for convenience
  18. return [{k: v for k, v in pr} for pr in pairs]
  19. def query_ts(host, port, username, password):
  20. login = f"login {username} {password}\n".encode("utf-8")
  21. with Telnet(host, port, 5) as tn:
  22. print("connection")
  23. print(tn.read_until(b"\n").decode("utf-8"))
  24. print(tn.read_until(b"\n").decode("utf-8"))
  25. print("----")
  26. tn.write(login)
  27. print("after login")
  28. print(tn.read_until(b"\n").decode("utf-8"))
  29. print("----")
  30. tn.write(b"use 1 -virtual\n")
  31. print("after use")
  32. print(tn.read_until(b"\n").decode("utf-8"))
  33. print("----")
  34. tn.write(b"channellist\n")
  35. response = tn.read_until(b"\n").decode("utf-8")
  36. print("after channellist")
  37. print(response)
  38. print(tn.read_until(b"\n").decode("utf-8"))
  39. print("----")
  40. channel_maps = parse_ts_response(response)
  41. # rearrange the maps into one large channel lookup map
  42. channels = {info["cid"]: info["channel_name"].replace(r"\s", " ") for info in channel_maps}
  43. tn.write(b"clientlist\n")
  44. response = tn.read_until(b"\n").decode("utf-8")
  45. print("after clientlist")
  46. print(response)
  47. print(tn.read_until(b"\n").decode("utf-8"))
  48. print("----")
  49. entry_maps = parse_ts_response(response)
  50. # combine the maps into one large map, ignoring serveradmin query user
  51. client_info = {info["client_nickname"]: info for info in entry_maps if "serveradmin" not in info["client_nickname"]}
  52. for k, v in client_info.items():
  53. tn.write(f"clientinfo clid={v['clid']}\n".encode("utf-8"))
  54. response = tn.read_until(b"\n").decode("utf-8")
  55. print(f"after clientinfo for {k}")
  56. print(response)
  57. print(tn.read_until(b"\n").decode("utf-8"))
  58. print("----")
  59. # info is key=value pairs separated by spaces
  60. pairs = [ent.split("=", 1) for ent in response.split() if "=" in ent]
  61. # rearrange into a map and put in the client_info
  62. v["client_info"] = {k: v for k, v in pairs}
  63. tn.write(b"quit\n")
  64. print("after quit")
  65. print(tn.read_until(b"\n").decode("utf-8"))
  66. return client_info, channels
  67. def get_users():
  68. with open("secret.toml") as infile:
  69. cfg = toml.load(infile)
  70. client_info, channels = query_ts(cfg["host"], cfg["port"], cfg["user"], cfg["pass"])
  71. users = []
  72. channel_users = defaultdict(list)
  73. for name, info in client_info.items():
  74. user_text = name
  75. audio_status = []
  76. if info["client_info"].get("client_input_muted", "0") == "1":
  77. audio_status.append("Mic muted")
  78. if info["client_info"].get("client_output_muted", "0") == "1":
  79. audio_status.append("Sound muted")
  80. if len(audio_status) > 0:
  81. user_text += f" ({', '.join(audio_status)})"
  82. idle = timedelta(milliseconds=int(info["client_info"].get("client_idle_time", "0")))
  83. if idle >= IDLE_TIMEOUT:
  84. # strip out the sub-second resolution
  85. idle -= timedelta(microseconds=idle.microseconds)
  86. user_text += f" (Idle for {idle})"
  87. users.append(user_text)
  88. channel_users[channels[info["cid"]]].append(user_text)
  89. return sorted(users), {k: sorted(v) for k, v in channel_users.items()}
  90. @app.route("/")
  91. def get_status():
  92. return jsonify({"users": get_users()[0]})
  93. @app.route("/page")
  94. def get_status_page():
  95. return render_template("embedded_body.html", users=get_users()[1], refresh_rate=REFRESH_RATE)
  96. if __name__ == "__main__":
  97. app.run("0.0.0.0", 5000, debug=True, threaded=True)