app.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. #!/usr/bin/env python3
  2. import tempfile
  3. import os.path
  4. from telnetlib import Telnet
  5. from collections import defaultdict
  6. from datetime import timedelta
  7. import toml
  8. from flask import Flask, jsonify, render_template, request
  9. from flask_cors import CORS
  10. IDLE_TIMEOUT = timedelta(minutes=5)
  11. REFRESH_RATE = 60 # seconds
  12. app = Flask(__name__)
  13. CORS(app)
  14. with open("secret.toml") as infile:
  15. cfg = toml.load(infile)
  16. for k in ("host", "port", "user", "pass"):
  17. app.config[k] = cfg[k]
  18. def parse_ts_response(response):
  19. # entries separated by |'s
  20. entries = response.split("|")
  21. # entries contain key=value pairs separated by spaces
  22. pairs = [[pr.split("=", 1) for pr in ent.split()] for ent in entries]
  23. # rearrange these into maps for convenience
  24. return [{k: v for k, v in pr} for pr in pairs]
  25. def login(tn, username, password):
  26. login = f"login {username} {password}\n".encode("utf-8")
  27. print("connection")
  28. print(tn.read_until(b"\n").decode("utf-8"))
  29. print(tn.read_until(b"\n").decode("utf-8"))
  30. print("----")
  31. tn.write(login)
  32. print("after login")
  33. print(tn.read_until(b"\n").decode("utf-8"))
  34. print("----")
  35. tn.write(b"use 1 -virtual\n")
  36. print("after use")
  37. print(tn.read_until(b"\n").decode("utf-8"))
  38. print("----")
  39. def post_to_channel(host, port, username, password, message):
  40. message_santized = message.strip().replace(" ", r"\s")
  41. message_command = f"sendtextmessage targetmode=2 msg={message_santized}\n".encode("utf-8")
  42. print(message_command.decode("utf-8"))
  43. with Telnet(host, port, 5) as tn:
  44. login(tn, username, password)
  45. tn.write(message_command)
  46. tn.write(b"quit\n")
  47. def query_ts(host, port, username, password):
  48. with Telnet(host, port, 5) as tn:
  49. login(tn, username, password)
  50. tn.write(b"channellist\n")
  51. response = tn.read_until(b"\n").decode("utf-8")
  52. print("after channellist")
  53. print(response)
  54. print(tn.read_until(b"\n").decode("utf-8"))
  55. print("----")
  56. channel_maps = parse_ts_response(response)
  57. # rearrange the maps into one large channel lookup map
  58. channels = {info["cid"]: info["channel_name"].replace(r"\s", " ") for info in channel_maps}
  59. tn.write(b"clientlist\n")
  60. response = tn.read_until(b"\n").decode("utf-8")
  61. print("after clientlist")
  62. print(response)
  63. print(tn.read_until(b"\n").decode("utf-8"))
  64. print("----")
  65. entry_maps = parse_ts_response(response)
  66. # combine the maps into one large map, ignoring serveradmin query user
  67. client_info = {info["client_nickname"]: info for info in entry_maps if "serveradmin" not in info["client_nickname"]}
  68. for k, v in client_info.items():
  69. tn.write(f"clientinfo clid={v['clid']}\n".encode("utf-8"))
  70. response = tn.read_until(b"\n").decode("utf-8")
  71. print(f"after clientinfo for {k}")
  72. print(response)
  73. print(tn.read_until(b"\n").decode("utf-8"))
  74. print("----")
  75. # info is key=value pairs separated by spaces
  76. pairs = [ent.split("=", 1) for ent in response.split() if "=" in ent]
  77. # rearrange into a map and put in the client_info
  78. v["client_info"] = {k: v for k, v in pairs}
  79. tn.write(b"quit\n")
  80. print("after quit")
  81. print(tn.read_until(b"\n").decode("utf-8"))
  82. return client_info, channels
  83. def get_users():
  84. client_info, channels = query_ts(
  85. app.config["host"],
  86. app.config["port"],
  87. app.config["user"],
  88. app.config["pass"]
  89. )
  90. users = []
  91. channel_users = defaultdict(list)
  92. for name, info in client_info.items():
  93. user_text = name
  94. audio_status = []
  95. if info["client_info"].get("client_input_muted", "0") == "1":
  96. audio_status.append("Mic muted")
  97. if info["client_info"].get("client_output_muted", "0") == "1":
  98. audio_status.append("Sound muted")
  99. if len(audio_status) > 0:
  100. user_text += f" ({', '.join(audio_status)})"
  101. idle = timedelta(milliseconds=int(info["client_info"].get("client_idle_time", "0")))
  102. if idle >= IDLE_TIMEOUT:
  103. # strip out the sub-second resolution
  104. idle -= timedelta(microseconds=idle.microseconds)
  105. user_text += f" (Idle for {idle})"
  106. users.append(user_text)
  107. channel_users[channels[info["cid"]]].append(user_text)
  108. return sorted(users), {k: sorted(v) for k, v in channel_users.items()}
  109. @app.route("/")
  110. def get_status():
  111. return jsonify({"users": get_users()[0]})
  112. @app.route("/page")
  113. def get_status_page():
  114. return render_template("embedded_body.html", users=get_users()[1], refresh_rate=REFRESH_RATE)
  115. @app.route("/audiobot", methods=["POST"])
  116. def message_audiobot():
  117. temp_dir = tempfile.mkdtemp()
  118. temp_file = os.path.join(temp_dir, "received.mp3")
  119. with open(temp_file, "wb") as f:
  120. f.write(request.data)
  121. f.close()
  122. post_to_channel(
  123. app.config["host"],
  124. app.config["port"],
  125. app.config["user"],
  126. app.config["pass"],
  127. f"!play {temp_file}"
  128. )
  129. return "", 204
  130. if __name__ == "__main__":
  131. app.run("0.0.0.0", 5000, debug=True, threaded=True)