app.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. #!/usr/bin/env python3
  2. import tempfile
  3. import os.path
  4. from telnetlib import Telnet
  5. from collections import defaultdict
  6. from datetime import timedelta
  7. import toml
  8. from flask import Flask, jsonify, render_template, request
  9. from flask_cors import CORS
  10. IDLE_TIMEOUT = timedelta(minutes=5)
  11. REFRESH_RATE = 60 # seconds
  12. app = Flask(__name__)
  13. CORS(app)
  14. def parse_ts_response(response):
  15. # entries separated by |'s
  16. entries = response.split("|")
  17. # entries contain key=value pairs separated by spaces
  18. pairs = [[pr.split("=", 1) for pr in ent.split()] for ent in entries]
  19. # rearrange these into maps for convenience
  20. return [{k: v for k, v in pr} for pr in pairs]
  21. def login(tn, username, password):
  22. login = f"login {username} {password}\n".encode("utf-8")
  23. print("connection")
  24. print(tn.read_until(b"\n").decode("utf-8"))
  25. print(tn.read_until(b"\n").decode("utf-8"))
  26. print("----")
  27. tn.write(login)
  28. print("after login")
  29. print(tn.read_until(b"\n").decode("utf-8"))
  30. print("----")
  31. tn.write(b"use 1 -virtual\n")
  32. print("after use")
  33. print(tn.read_until(b"\n").decode("utf-8"))
  34. print("----")
  35. def post_to_channel(host, port, username, password, message):
  36. message_santized = message.strip().replace(" ", r"\s")
  37. message_command = f"sendtextmessage targetmode=2 msg={message_santized}\n".encode("utf-8")
  38. print(message_command.decode("utf-8"))
  39. with Telnet(host, port, 5) as tn:
  40. login(tn, username, password)
  41. tn.write(message_command)
  42. tn.write(b"quit\n")
  43. def query_ts(host, port, username, password):
  44. with Telnet(host, port, 5) as tn:
  45. login(tn, username, password)
  46. tn.write(b"channellist\n")
  47. response = tn.read_until(b"\n").decode("utf-8")
  48. print("after channellist")
  49. print(response)
  50. print(tn.read_until(b"\n").decode("utf-8"))
  51. print("----")
  52. channel_maps = parse_ts_response(response)
  53. # rearrange the maps into one large channel lookup map
  54. channels = {info["cid"]: info["channel_name"].replace(r"\s", " ") for info in channel_maps}
  55. tn.write(b"clientlist\n")
  56. response = tn.read_until(b"\n").decode("utf-8")
  57. print("after clientlist")
  58. print(response)
  59. print(tn.read_until(b"\n").decode("utf-8"))
  60. print("----")
  61. entry_maps = parse_ts_response(response)
  62. # combine the maps into one large map, ignoring serveradmin query user
  63. client_info = {info["client_nickname"]: info for info in entry_maps if "serveradmin" not in info["client_nickname"]}
  64. for k, v in client_info.items():
  65. tn.write(f"clientinfo clid={v['clid']}\n".encode("utf-8"))
  66. response = tn.read_until(b"\n").decode("utf-8")
  67. print(f"after clientinfo for {k}")
  68. print(response)
  69. print(tn.read_until(b"\n").decode("utf-8"))
  70. print("----")
  71. # info is key=value pairs separated by spaces
  72. pairs = [ent.split("=", 1) for ent in response.split() if "=" in ent]
  73. # rearrange into a map and put in the client_info
  74. v["client_info"] = {k: v for k, v in pairs}
  75. tn.write(b"quit\n")
  76. print("after quit")
  77. print(tn.read_until(b"\n").decode("utf-8"))
  78. return client_info, channels
  79. def get_users():
  80. client_info, channels = query_ts(
  81. app.config["host"],
  82. app.config["port"],
  83. app.config["user"],
  84. app.config["pass"]
  85. )
  86. users = []
  87. channel_users = defaultdict(list)
  88. for name, info in client_info.items():
  89. user_text = name
  90. audio_status = []
  91. if info["client_info"].get("client_input_muted", "0") == "1":
  92. audio_status.append("Mic muted")
  93. if info["client_info"].get("client_output_muted", "0") == "1":
  94. audio_status.append("Sound muted")
  95. if len(audio_status) > 0:
  96. user_text += f" ({', '.join(audio_status)})"
  97. idle = timedelta(milliseconds=int(info["client_info"].get("client_idle_time", "0")))
  98. if idle >= IDLE_TIMEOUT:
  99. # strip out the sub-second resolution
  100. idle -= timedelta(microseconds=idle.microseconds)
  101. user_text += f" (Idle for {idle})"
  102. users.append(user_text)
  103. channel_users[channels[info["cid"]]].append(user_text)
  104. return sorted(users), {k: sorted(v) for k, v in channel_users.items()}
  105. @app.route("/")
  106. def get_status():
  107. return jsonify({"users": get_users()[0]})
  108. @app.route("/page")
  109. def get_status_page():
  110. return render_template("embedded_body.html", users=get_users()[1], refresh_rate=REFRESH_RATE)
  111. @app.route("/audiobot", methods=["POST"])
  112. def message_audiobot():
  113. temp_dir = tempfile.mkdtemp()
  114. temp_file = os.path.join(temp_dir, "received.mp3")
  115. with open(temp_file, "wb") as f:
  116. f.write(request.data)
  117. f.close()
  118. post_to_channel(
  119. app.config["host"],
  120. app.config["port"],
  121. app.config["user"],
  122. app.config["pass"],
  123. f"!play {temp_file}"
  124. )
  125. return "", 204
  126. if __name__ == "__main__":
  127. with open("secret.toml") as infile:
  128. cfg = toml.load(infile)
  129. for k in ("host", "port", "user", "pass"):
  130. app.config[k] = cfg[k]
  131. app.run("0.0.0.0", 5000, debug=True, threaded=True)