Coverage for melissa/launcher/monitoring/terminal_monitor.py: 0%

98 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-09-22 10:36 +0000

1import requests 

2import argparse 

3import plotext as plt 

4from typing import Dict, Any 

5import time 

6import os 

7import sys 

8from subprocess import check_output 

9from pathlib import Path 

10 

"""
Helper script for tracking job status live in the terminal.
Dependencies:
pip3 install plotext requests
(the earlier gnuplot / termplotlib / termgraph requirement is obsolete;
plotting is now done with plotext)
"""

17 

18 

def update_job_dict(full_job_dict: Dict[int, Dict[str, Any]],
                    response: dict,
                    args: argparse.Namespace, header: dict) -> Dict[str, int]:
    """
    Scan the launcher REST API for jobs and aggregate state information.

    Fetches the detail record of every job id listed in ``response`` and
    caches it in ``full_job_dict`` (mutated in place, keyed by the job's
    ``unique_id``).

    Args:
        full_job_dict: cache of job records, updated in place.
        response: JSON payload of ``GET /jobs``; must contain a 'jobs' list.
        args: parsed CLI args providing ``http_bind`` and ``http_port``.
        header: HTTP headers carrying the API token.

    Returns:
        Mapping from job state name to the number of jobs in that state.
        The four known states are always present; any unexpected state
        reported by the API is counted too instead of raising KeyError.
    """
    for job in response['jobs']:
        job_dict = requests.get(
            f'http://{args.http_bind}:{args.http_port}/jobs/{job}', headers=header).json()
        full_job_dict[job_dict['unique_id']] = job_dict

    # aggregate the job statuses; seed the known states so the bar chart
    # always shows all four categories, but tolerate unknown states
    # instead of crashing with a KeyError
    state_dict = {'RUNNING': 0, 'TERMINATED': 0, 'WAITING': 0, 'FAILED': 0}
    for job_dict in full_job_dict.values():
        state = job_dict['state']
        state_dict[state] = state_dict.get(state, 0) + 1

    return state_dict

39 

40 

def plot_state_dict(state_dict: Dict[str, int]):
    """
    Render the aggregated job states as a bar chart in the terminal.

    The previous figure and the terminal screen are cleared first so the
    chart redraws in place on every refresh cycle.
    """
    plt.clear_figure()
    os.system('clear')
    banner = '-------------------------`melissa-monitor`-------------------------\n'
    print(banner)
    labels = state_dict.keys()
    counts = state_dict.values()
    plt.simple_bar(labels, counts, color="green")
    plt.show()

50 

51 

def print_tail(fname, lines: int = 5):
    """
    Print the last ``lines`` lines of the file at ``fname``.

    Log files can become large in melissa, so we take extra precaution to
    only ever load the tail of the log file into memory: the file is
    scanned backwards in 4096-byte blocks until enough newlines have been
    seen (or the start of the file is reached).

    Args:
        fname: path of the file to read (str or Path).
        lines: number of trailing lines to print.
    """
    with open(fname, "rb") as f:
        f.seek(0, 2)  # jump to end-of-file to learn the file size
        bytes_in_file = f.tell()
        lines_found, total_bytes_scanned = 0, 0
        # scan backwards until we have seen one newline more than the
        # number of lines requested, so the oldest line is complete
        while (lines + 1 > lines_found
               and bytes_in_file > total_bytes_scanned):
            byte_block = min(
                4096,
                bytes_in_file - total_bytes_scanned)
            f.seek(-(byte_block + total_bytes_scanned), 2)
            total_bytes_scanned += byte_block
            # read exactly the block we positioned on; a fixed read(4096)
            # would run into already-scanned bytes and count their
            # newlines twice
            lines_found += f.read(byte_block).count(b'\n')
        f.seek(-total_bytes_scanned, 2)
        line_list = list(f.readlines())
    print_list = [li.decode("utf-8").strip() for li in line_list]
    print(*print_list[-lines:], sep='\n')

74 

75 

def get_server_launcher_logs(args: argparse.Namespace):
    """
    Print the tails of the server and launcher log files.

    Both logs are looked up inside ``args.output_dir``.
    """
    out_dir = Path(args.output_dir)
    sections = [
        ('\n-------------------------Server log tail-------------------------\n',
         out_dir / 'melissa_server_0.log'),
        ('\n------------------------Launcher log tail------------------------\n',
         out_dir / 'melissa_launcher.log'),
    ]
    for banner, log_path in sections:
        print(banner)
        print_tail(log_path, 6)

83 

84 

def get_eacct_ear_output(jobs: dict):
    """
    Summarize EAR (eacct) accounting output for the given jobs.

    Runs ``eacct`` over the most recent ``len(jobs)`` records, sums the
    energy and elapsed-time columns for rows whose job id appears in
    ``jobs`` and prints the totals.

    Args:
        jobs: job dict keyed by integer job id (as built by
            ``update_job_dict``).

    Raises:
        Whatever ``check_output`` raises when ``eacct`` is missing or
        fails; the caller catches and reports these.
    """
    out = check_output(["eacct", "-n", str(len(jobs))]).decode()

    total_energy = 0.
    total_time = 0.
    total_jobs = 0
    # skip the header row; each data row starts with "<job_id>-<step>"
    for line in out.splitlines()[1:]:
        if not line:
            continue
        data = line.split()
        job_id = data[0].split("-")[0]
        if int(job_id) in jobs:
            total_jobs += 1
            try:
                # data[10] feeds the energy total (J) and data[6] the
                # time total (s) — per the eacct column layout; confirm
                # against the installed EAR version
                total_energy += float(data[10])
                total_time += float(data[6])
            except (IndexError, ValueError) as e:
                # malformed row: report it but keep aggregating the rest
                print(f"exception encountered {e}.")

    print('\n---------------------------EAR Metrics---------------------------\n')
    # guard against division by zero when no eacct row matched our jobs
    avg_time = total_time / total_jobs if total_jobs else 0.
    print(f"Total energy consumed: {total_energy:.2f} (J), "
          f"Average client time: {avg_time:.2f} (s)")

110 

111 

def get_parsed_args() -> argparse.Namespace:
    """
    Parse CLI args and return them.

    Returns:
        Namespace with ``http_bind``, ``http_port``, ``http_token``,
        ``output_dir`` and ``report_eacct_metrics`` attributes.
    """
    parser = argparse.ArgumentParser(
        prog="melissa-monitor",
        description="A helper tool for monitoring melissa-launcher job status"
    )

    parser.add_argument(
        "--http_bind",
        help="Host address of the melissa-launcher http server "
             "defined in `launcher_config` as 'http_bind'",
        default="frontend"
    )

    parser.add_argument(
        "--http_port",
        help="Port on host to access REST API "
             "defined in `launcher_config` as 'http_port'",
        default="8888"
    )

    parser.add_argument(
        "--http_token",
        help="Token set in `launcher_config` to access "
             "REST API",
        default="study1324"
    )

    parser.add_argument(
        "--output_dir",
        help="Output dir for current simulation",
        default=""
    )

    parser.add_argument(
        "--report_eacct_metrics",
        # main() only tests this for truthiness, so declare it as a real
        # boolean switch instead of an option that expects a value
        action="store_true",
        help="Report eacct metrics if available."
    )

    return parser.parse_args()

154 

155 

def main():
    """
    Poll the launcher REST API every five seconds and render a live
    status page: a job-state bar chart, optional EAR energy metrics, and
    the tails of the server/launcher logs. Stops when the API no longer
    answers, which signals that the study has completed.
    """
    args = get_parsed_args()
    auth_header = {'token': args.http_token}
    full_job_dict = {}
    jobs_url = f'http://{args.http_bind}:{args.http_port}/jobs'

    while True:
        # Ping the launcher REST API to get the jobs list; any failure
        # (connection refused, bad JSON) means the launcher has gone away
        try:
            response = requests.get(jobs_url, headers=auth_header).json()
        except Exception:
            print('Melissa study completed.')
            break

        # refresh the job cache and aggregate the per-state counts
        state_dict = update_job_dict(full_job_dict, response, args, auth_header)

        # redraw the terminal page: bar chart of job states first
        plot_state_dict(state_dict)

        # parse and summarize ear info if available
        if args.report_eacct_metrics:
            try:
                get_eacct_ear_output(full_job_dict)
            except Exception as e:
                print(f'Unable to get eacct output {e}.')

        # finish the page with the launcher/server log tails
        get_server_launcher_logs(args)

        time.sleep(5)

191 

192 

if __name__ == "__main__":
    # raising SystemExit is equivalent to sys.exit(main())
    raise SystemExit(main())