app.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. #!/usr/bin/env python3
import os
from collections import defaultdict
from datetime import timedelta
from telnetlib import Telnet

import toml
from flask import Flask, jsonify, render_template
from flask_cors import CORS
  8. IDLE_TIMEOUT = timedelta(minutes=5)
  9. REFRESH_RATE = 60 # seconds
  10. app = Flask(__name__)
  11. CORS(app)
  12. def parse_ts_response(response):
  13. # entries separated by |'s
  14. entries = response.split("|")
  15. # entries contain key=value pairs separated by spaces
  16. pairs = [[pr.split("=", 1) for pr in ent.split()] for ent in entries]
  17. # rearrange these into maps for convenience
  18. return [{k: v for k, v in pr} for pr in pairs]
  19. def login(tn, username, password):
  20. login = f"login {username} {password}\n".encode("utf-8")
  21. print("connection")
  22. print(tn.read_until(b"\n").decode("utf-8"))
  23. print(tn.read_until(b"\n").decode("utf-8"))
  24. print("----")
  25. tn.write(login)
  26. print("after login")
  27. print(tn.read_until(b"\n").decode("utf-8"))
  28. print("----")
  29. tn.write(b"use 1 -virtual\n")
  30. print("after use")
  31. print(tn.read_until(b"\n").decode("utf-8"))
  32. print("----")
  33. def post_to_channel(host, port, username, password, message):
  34. message_santized = message.strip().replace(" ", r"\s")
  35. message_command = f"sendtextmessage targetmode=2 msg={message_santized}\n".encode("utf-8")
  36. print(message_command.decode("utf-8"))
  37. with Telnet(host, port, 5) as tn:
  38. login(tn, username, password)
  39. tn.write(message_command)
  40. response = tn.read_until(b"\n").decode("utf-8")
  41. print("after sendtextmessage")
  42. print(response)
  43. print(tn.read_until(b"\n").decode("utf-8"))
  44. print("----")
  45. tn.write(b"quit\n")
  46. print("after quit")
  47. print(tn.read_until(b"\n").decode("utf-8"))
  48. def query_ts(host, port, username, password):
  49. with Telnet(host, port, 5) as tn:
  50. login(tn, username, password)
  51. tn.write(b"channellist\n")
  52. response = tn.read_until(b"\n").decode("utf-8")
  53. print("after channellist")
  54. print(response)
  55. print(tn.read_until(b"\n").decode("utf-8"))
  56. print("----")
  57. channel_maps = parse_ts_response(response)
  58. # rearrange the maps into one large channel lookup map
  59. channels = {info["cid"]: info["channel_name"].replace(r"\s", " ") for info in channel_maps}
  60. tn.write(b"clientlist\n")
  61. response = tn.read_until(b"\n").decode("utf-8")
  62. print("after clientlist")
  63. print(response)
  64. print(tn.read_until(b"\n").decode("utf-8"))
  65. print("----")
  66. entry_maps = parse_ts_response(response)
  67. # combine the maps into one large map, ignoring serveradmin query user
  68. client_info = {info["client_nickname"]: info for info in entry_maps if "serveradmin" not in info["client_nickname"]}
  69. for k, v in client_info.items():
  70. tn.write(f"clientinfo clid={v['clid']}\n".encode("utf-8"))
  71. response = tn.read_until(b"\n").decode("utf-8")
  72. print(f"after clientinfo for {k}")
  73. print(response)
  74. print(tn.read_until(b"\n").decode("utf-8"))
  75. print("----")
  76. # info is key=value pairs separated by spaces
  77. pairs = [ent.split("=", 1) for ent in response.split() if "=" in ent]
  78. # rearrange into a map and put in the client_info
  79. v["client_info"] = {k: v for k, v in pairs}
  80. tn.write(b"quit\n")
  81. print("after quit")
  82. print(tn.read_until(b"\n").decode("utf-8"))
  83. return client_info, channels
  84. def get_users():
  85. client_info, channels = query_ts(
  86. app.config["host"],
  87. app.config["port"],
  88. app.config["user"],
  89. app.config["pass"]
  90. )
  91. users = []
  92. channel_users = defaultdict(list)
  93. for name, info in client_info.items():
  94. user_text = name
  95. audio_status = []
  96. if info["client_info"].get("client_input_muted", "0") == "1":
  97. audio_status.append("Mic muted")
  98. if info["client_info"].get("client_output_muted", "0") == "1":
  99. audio_status.append("Sound muted")
  100. if len(audio_status) > 0:
  101. user_text += f" ({', '.join(audio_status)})"
  102. idle = timedelta(milliseconds=int(info["client_info"].get("client_idle_time", "0")))
  103. if idle >= IDLE_TIMEOUT:
  104. # strip out the sub-second resolution
  105. idle -= timedelta(microseconds=idle.microseconds)
  106. user_text += f" (Idle for {idle})"
  107. users.append(user_text)
  108. channel_users[channels[info["cid"]]].append(user_text)
  109. return sorted(users), {k: sorted(v) for k, v in channel_users.items()}
  110. @app.route("/")
  111. def get_status():
  112. return jsonify({"users": get_users()[0]})
  113. @app.route("/page")
  114. def get_status_page():
  115. return render_template("embedded_body.html", users=get_users()[1], refresh_rate=REFRESH_RATE)
  116. @app.route("/audiobot")
  117. def message_audiobot():
  118. post_to_channel(
  119. app.config["host"],
  120. app.config["port"],
  121. app.config["user"],
  122. app.config["pass"],
  123. "Hello, world!"
  124. )
  125. return "", 204
  126. if __name__ == "__main__":
  127. with open("secret.toml") as infile:
  128. cfg = toml.load(infile)
  129. for k in ("host", "port", "user", "pass"):
  130. app.config[k] = cfg[k]
  131. app.run("0.0.0.0", 5000, debug=True, threaded=True)