Coverage for melissa/launcher/monitoring/terminal_monitor.py: 0%
98 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-09-22 10:36 +0000
1import requests
2import argparse
3import plotext as plt
4from typing import Dict, Any
5import time
6import os
7import sys
8from subprocess import check_output
9from pathlib import Path
"""
Helper script for tracking job status live in the terminal.
Dependencies:
pip3 install requests plotext
"""
def update_job_dict(full_job_dict: Dict[int, Dict[str, Any]],
                    response: dict,
                    args: argparse.Namespace, header: dict) -> Dict[str, int]:
    """
    Scan REST API for jobs, aggregate state information.

    Fetches the detail record of every job id listed in ``response['jobs']``
    from the launcher REST API, caches it in ``full_job_dict`` keyed by the
    job's ``unique_id``, and returns a count of jobs per state.

    Parameters:
        full_job_dict: cache of job records, updated in place.
        response: JSON body of ``GET /jobs`` (must contain a 'jobs' list).
        args: parsed CLI args providing ``http_bind`` and ``http_port``.
        header: HTTP headers (auth token) for the REST API requests.

    Returns:
        Mapping of state name -> job count. The four canonical states are
        always present (possibly 0); any unexpected state reported by the
        API is added as an extra key instead of raising ``KeyError``.
    """
    for job in response['jobs']:
        job_dict = requests.get(
            f'http://{args.http_bind}:{args.http_port}/jobs/{job}',
            headers=header).json()
        full_job_dict[job_dict['unique_id']] = job_dict

    # aggregate the job statuses; tolerate unknown states via .get()
    state_dict = {'RUNNING': 0, 'TERMINATED': 0, 'WAITING': 0, 'FAILED': 0}
    for job_dict in full_job_dict.values():
        state = job_dict['state']
        state_dict[state] = state_dict.get(state, 0) + 1

    return state_dict
def plot_state_dict(state_dict: Dict[str, int]):
    """
    Plot state_dict to terminal.

    Resets the previous plotext figure, clears the terminal screen, and
    renders the aggregated job-state counts as a simple bar chart.
    """
    plt.clear_figure()
    os.system('clear')
    print('-------------------------`melissa-monitor`-------------------------\n')
    labels = list(state_dict.keys())
    counts = list(state_dict.values())
    plt.simple_bar(labels, counts, color="green")
    plt.show()
def print_tail(fname, lines: int = 5) -> None:
    """
    Print the last ``lines`` lines of the file at ``fname``.

    Log files can become large in melissa, so we take extra precaution to
    only ever load the tail of the log files to memory: the file is scanned
    backwards in fixed-size blocks until enough newlines have been seen.

    Parameters:
        fname: path of the (text) file to tail.
        lines: number of trailing lines to print.
    """
    block_size = 4096
    with open(fname, "rb") as f:
        f.seek(0, 2)  # jump to end of file
        bytes_in_file = f.tell()
        lines_found, total_bytes_scanned = 0, 0
        # lines + 1 newlines guarantee `lines` complete lines are covered
        while (lines + 1 > lines_found
               and bytes_in_file > total_bytes_scanned):
            byte_block = min(
                block_size,
                bytes_in_file - total_bytes_scanned)
            f.seek(-(byte_block + total_bytes_scanned), 2)
            total_bytes_scanned += byte_block
            # read exactly the new block: a fixed read(4096) would re-count
            # newlines of already-scanned blocks when byte_block < 4096
            lines_found += f.read(byte_block).count(b'\n')
        f.seek(-total_bytes_scanned, 2)
        print_list = [li.decode("utf-8").strip() for li in f.readlines()]
    print(*print_list[-lines:], sep='\n')
def get_server_launcher_logs(args: argparse.Namespace):
    """
    Print the tails of the server and launcher log files.

    Both log files are looked up under ``args.output_dir`` and the last
    six lines of each are printed below a banner.
    """
    out_dir = Path(args.output_dir)
    tails = [
        ('\n-------------------------Server log tail-------------------------\n',
         out_dir / 'melissa_server_0.log'),
        ('\n------------------------Launcher log tail------------------------\n',
         out_dir / 'melissa_launcher.log'),
    ]
    for banner, log_path in tails:
        print(banner)
        print_tail(log_path, 6)
def get_eacct_ear_output(jobs: dict):
    """
    Summarize EAR accounting metrics for the tracked jobs.

    Runs ``eacct -n <len(jobs)>`` and, for every output row whose job id is
    a key of ``jobs``, accumulates the energy (column 10) and the client
    time (column 6), then prints the totals.

    Parameters:
        jobs: mapping of job id -> job record; only the keys are used to
            filter the eacct rows.

    Raises:
        Whatever ``check_output`` raises when ``eacct`` is unavailable or
        fails (the caller handles this best-effort).
    """
    # ask eacct for (at least) as many recent jobs as we are tracking
    out = check_output(["eacct", "-n", str(len(jobs))]).decode()

    total_energy = 0.0
    total_time = 0.0
    total_jobs = 0
    for line in out.splitlines()[1:]:  # skip the header row
        if line == "":
            continue
        data = line.split()
        # rows start with "<job_id>-<step_id>"; keep only the job id part
        job_id = data[0].split("-")[0]
        if int(job_id) in jobs:
            total_jobs += 1
            try:
                total_energy += float(data[10])
                total_time += float(data[6])
            except Exception as e:
                # best-effort: report a malformed row but keep going
                print(f"exception encountered {e}.")

    print('\n---------------------------EAR Metrics---------------------------\n')
    # guard against division by zero when no tracked job appeared in eacct
    avg_time = total_time / total_jobs if total_jobs else 0.0
    print(f"Total energy consumed: {total_energy:.2f} (J), "
          f"Average client time: {avg_time:.2f} (s)")
def get_parsed_args() -> argparse.Namespace:
    """
    Parse CLI args and return them.

    Returns:
        Namespace with ``http_bind``, ``http_port``, ``http_token``,
        ``output_dir`` (strings) and ``report_eacct_metrics`` (bool flag).
    """
    parser = argparse.ArgumentParser(
        prog="melissa-monitor",
        description="A helper tool for monitoring melissa-launcher job status"
    )

    parser.add_argument(
        "--http_bind",
        help="Host address of the melissa-launcher http server "
             "defined in `launcher_config` as 'http_bind'",
        default="frontend"
    )

    parser.add_argument(
        "--http_port",
        help="Port on host to access REST API "
             "defined in `launcher_config` as 'http_port'",
        default="8888"
    )

    parser.add_argument(
        "--http_token",
        help="Token set in `launcher_config` to access "
             "REST API",
        default="study1324"
    )

    parser.add_argument(
        "--output_dir",
        help="Output dir for current simulation",
        default=""
    )

    # boolean flag: main() only tests truthiness, and without store_true
    # the option could not be passed without a dummy value
    parser.add_argument(
        "--report_eacct_metrics",
        action="store_true",
        help="Report eacct metrics if available."
    )

    return parser.parse_args()
def main():
    """
    Helper script for plotting job status to terminal.

    Polls the launcher REST API every five seconds and redraws the
    terminal page until the API stops responding.
    """
    args = get_parsed_args()
    auth_header = {'token': args.http_token}
    tracked_jobs: Dict[int, Dict[str, Any]] = {}
    base_url = f'http://{args.http_bind}:{args.http_port}'

    while True:

        # Ping the launcher REST API to get the jobs list; any failure is
        # taken to mean the launcher (and hence the study) has shut down
        try:
            response = requests.get(f'{base_url}/jobs',
                                    headers=auth_header).json()
        except Exception:
            print('Melissa study completed.')
            break

        # Refresh the cached job records and aggregate their states
        state_dict = update_job_dict(tracked_jobs, response, args, auth_header)

        # Create the terminal page:
        # plot the job states as a bar chart
        plot_state_dict(state_dict)
        # parse and summarize ear info if available
        if args.report_eacct_metrics:
            try:
                get_eacct_ear_output(tracked_jobs)
            except Exception as e:
                print(f'Unable to get eacct output {e}.')
        # print the tail of the launcher/server logs
        get_server_launcher_logs(args)

        time.sleep(5)
# Script entry point: exit status comes from main() (returns None -> 0)
if __name__ == "__main__":
    sys.exit(main())