Coverage for melissa/launcher/__main__.py: 16%

193 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-09-22 10:36 +0000

1#!/usr/bin/python3 

2 

3# Copyright (c) 2021-2022, Institut National de Recherche en Informatique et en Automatique (Inria) 

4# All rights reserved. 

5# 

6# Redistribution and use in source and binary forms, with or without 

7# modification, are permitted provided that the following conditions are met: 

8# 

9# * Redistributions of source code must retain the above copyright notice, this 

10# list of conditions and the following disclaimer. 

11# 

12# * Redistributions in binary form must reproduce the above copyright notice, 

13# this list of conditions and the following disclaimer in the documentation 

14# and/or other materials provided with the distribution. 

15# 

16# * Neither the name of the copyright holder nor the names of its 

17# contributors may be used to endorse or promote products derived from 

18# this software without specific prior written permission. 

19# 

20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 

21# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 

22# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 

23# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE 

24# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 

25# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR 

26# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 

27# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 

28# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 

29# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 

30 

31import argparse 

32import contextlib 

33import logging 

34import os 

35import shutil 

36import signal 

37import socket 

38import sys 

39import threading 

40from types import FrameType 

41from typing import Optional, Union, Dict, Any 

42 

43from melissa.utility import networking, time, timer 

44from melissa.scheduler.scheduler import Options as SchedulerOptions 

45 

46from datetime import datetime 

47# from jsonschema import Draft4Validator, validators 

48# from jsonschema.exceptions import ValidationError 

49from pathlib import Path 

50import rapidjson 

51from melissa.utility.logger import configure_logger, get_log_level_from_verbosity 

52from melissa.launcher.parser import (homogenize_arguments, 

53 check_protocol_choice, 

54 setup_scheduler, 

55 locate_executable) 

56 

57from . import event, state_machine, monitoring, processor 

58from . import __version__ as launcher_version 

59from .io import IoMaster 

60from .queue import Queue 

61from .monitoring import RestHttpServer 

62from melissa.launcher.schema import validate_config, CONFIG_PARSE_MODE, print_options 

63 

# Module-level logger named after this module; used by every function below.
logger = logging.getLogger(__name__)

65 

66 

def main() -> Union[int, str]:
    """Entry point of the ``melissa-launcher`` command.

    Parses the command line, loads and validates the JSON configuration,
    prepares the output directory and the logger, instantiates the scheduler,
    opens the sockets/pipes used by the event loop and finally hands control
    to ``IoMaster.run()``.

    Returns:
        The value to pass to ``sys.exit``:

        * a string (the exception text) when the server executable cannot be
          located,
        * ``1`` when the configured scheduler reports itself unavailable,
        * otherwise whatever ``IoMaster.run()`` returned.
    """
    parser = argparse.ArgumentParser(
        prog="melissa-launcher", description="Melissa Launcher"
    )

    # CLI flags

    parser.add_argument(
        "--config_name",
        "-c",
        help="User defined configuration file. Path can be "
        "relative or absolute.",
        type=str
    )

    parser.add_argument(
        "--version",
        "-v",
        action="version",
        help="show the Melissa version",
        version="%(prog)s {:s}".format(launcher_version)
    )

    parser.add_argument(
        "--print-options",
        help="Show the available configuration options",
        action="store_true",
        default=None
    )

    print_head()

    args = parser.parse_args()

    if args.print_options:
        # NOTE(review): execution continues after this call unless
        # print_options() terminates the process itself -- confirm.
        print_options()

    conf_name, project_dir = homogenize_arguments(args.config_name)

    with open(Path(project_dir) / f"{conf_name}.json") as json_file:
        config_dict = rapidjson.load(json_file, parse_mode=CONFIG_PARSE_MODE)

    args, config_dict = validate_config(args, config_dict)
    lconfig = config_dict['launcher_config']

    # make the output directory path
    output_dir = make_and_configure_output_dir(config_dict, project_dir, conf_name)

    log_level = get_log_level_from_verbosity(lconfig.get("verbosity", 3))
    configure_logger(str(output_dir / "melissa_launcher.log"), log_level)

    logger.info(f"melissa-launcher PID={os.getpid()}")

    try:
        server_executable = locate_executable(output_dir / lconfig['server_executable'])
    except Exception as e:
        # A string return value is printed by sys.exit() and yields exit code 1.
        return str(e)

    logger.debug(f"server executable {server_executable}")

    protocol = check_protocol_choice(lconfig)

    scheduler_impl, sched_client_cmd, sched_server_cmd = setup_scheduler(lconfig)

    # set up options to pass directly to the scheduler command
    sched_client_cmd_opt = lconfig["scheduler_client_command_options"]
    sched_server_cmd_opt = lconfig["scheduler_server_command_options"]

    scheduler_is_available, info = scheduler_impl.is_available()
    if not scheduler_is_available:
        logger.error(f"scheduler {lconfig['scheduler']} unusable: {info}")
        return 1

    # Fixed: the closing parenthesis was missing from this message.
    logger.info(f"found {info[0]} (version {info[1]})")
    if lconfig['scheduler'] == "oar":
        scheduler = scheduler_impl("openmpi")
    elif lconfig['scheduler'] == "oar-hybrid":
        container_options = lconfig['scheduler_arg_container']
        container_client_size = lconfig['container_client_size']
        besteffort_alloc_freq = lconfig['besteffort_allocation_frequency']
        scheduler = scheduler_impl(
            "openmpi",
            container_options,
            container_client_size,
            besteffort_alloc_freq
        )  # type: ignore
    else:
        scheduler = scheduler_impl()  # type: ignore

    del scheduler_impl

    client_options = SchedulerOptions(
        sched_client_cmd,
        sched_client_cmd_opt,
        lconfig['scheduler_arg'] + lconfig['scheduler_arg_client']
    )
    server_options = SchedulerOptions(
        sched_server_cmd,
        sched_server_cmd_opt,
        lconfig['scheduler_arg'] + lconfig['scheduler_arg_server']
    )

    # set up std output files option
    std_output = lconfig["std_output"]
    logger.info(f"std outputs {std_output}")

    # set up launcher scheduling constraints
    job_limit = lconfig["job_limit"]
    timer_delay = lconfig.get("timer_delay", 5)
    # NOTE(review): this reads the "timer_delay" key a second time (only the
    # default differs). It looks like a copy-paste slip for a dedicated
    # job-update-interval option; behaviour is kept as-is until the intended
    # configuration key is confirmed against the schema.
    job_update_interval = lconfig.get("timer_delay", 30)

    # CHANGE DIRECTORY
    # KEEP THIS IN MIND WHEN HANDLING RELATIVE USER-PROVIDED PATHS.
    os.chdir(output_dir)

    # set up sockets
    signalfd_r, signalfd_w = networking.pipe()
    timerfd_0, timerfd_1 = networking.socketpair()
    eventfd_r, eventfd_w = networking.pipe()
    listenfd = networking.make_passive_socket(node=lconfig['bind'], protocol=protocol)

    # do not limit the queue size because the thread reading from the queue
    # will also put items into it
    event_fifo = Queue(eventfd_w)
    event_fifo.put(event.Timeout())

    def handle_signal(signo: int, _: Optional[FrameType]) -> None:
        # Forward the signal number as a single byte through the signal pipe
        # so that it is handled from the event loop, not the signal context.
        signame = signal.Signals(signo).name
        logger.info(f"received signal {signo} {signame}")
        bs = signo.to_bytes(1, byteorder=sys.byteorder)
        signalfd_w.send(bs)

    with contextlib.ExitStack() as cm:
        # ExitStack unwinds in LIFO order: the signal-handler resets that are
        # registered further down run before these descriptors are closed.
        signalfd_r = cm.enter_context(signalfd_r)
        signalfd_w = cm.enter_context(signalfd_w)
        timerfd_0 = cm.enter_context(timerfd_0)
        # timerfd_1 is closed manually after Timer thread stopped
        eventfd_r = cm.enter_context(eventfd_r)
        eventfd_w = cm.enter_context(eventfd_w)
        listenfd = cm.enter_context(listenfd)

        signals = [signal.SIGINT, signal.SIGPIPE, signal.SIGTERM]
        for s in signals:
            signal.signal(s, handle_signal)
            # reset the signal handlers before closing the signal file
            # descriptors
            cm.callback(signal.signal, s, signal.SIG_DFL)

        # set up state machine
        _, port = listenfd.getsockname()
        logger.info(f"fault-tolerance {bool(lconfig['fault_tolerance'])}")
        sm_config = state_machine.Configuration(
            cwd=os.getcwd(),
            launcher_host=socket.gethostname(),
            launcher_port=port,
            launcher_protocol=networking.protocol2str(protocol),
            server_executable=server_executable,
            no_fault_tolerance=not lconfig['fault_tolerance'],
            server_ping_interval=lconfig.get('server_timeout', 60.) / 2.,
            job_update_interval=job_update_interval,
        )
        initial_state = state_machine.State()

        # set up web server, processor
        http_server_address = (
            "" if lconfig['bind'] is None else lconfig['bind'], lconfig['http_port']
        )
        # Use the token from the configuration when one is given, otherwise
        # generate a fresh one.
        web_token = lconfig['http_token'] or monitoring.generate_token()
        logger.info(f"generated web token {web_token}")

        print("Access the terminal-based Melissa monitor by opening a new "
              "terminal and executing:\n\n"
              "melissa-monitor "
              f"--http_bind={lconfig['bind']} "
              f"--http_port={lconfig['http_port']} "
              f"--http_token={web_token} "
              f"--output_dir={output_dir} \n"
              )

        print(f"All output for current run will be written to {output_dir}\n")
        if config_dict.get("vscode_debugging", False):
            print("**`vscode_debugger` active in user config**\n"
                  "Waiting for debugger attach, please start the "
                  "debugger by navigating to debugger pane, "
                  "ctrl+shift+d, and selecting:\n"
                  "Python: Remote Attach")

        if lconfig["load_from_checkpoint"]:
            # look for the checkpoint metadata in the current directory (the
            # output directory after the chdir above) and record the restart
            # information on the state-machine configuration
            check_checkpoint_metadata(sm_config)

        web_server = RestHttpServer(
            http_server_address, web_token, initial_state
        )
        event_processor = processor.DefaultProcessor(
            sm_config, initial_state, web_server
        )
        webfd = socket.fromfd(
            web_server.fileno(), socket.AF_INET, socket.SOCK_STREAM
        )

        _, web_port = webfd.getsockname()
        logger.info(f"webserver listening on port {web_port}")

        timer_thread = timer.Timer(timerfd_1, interval=time.Time(seconds=timer_delay))
        t_timer = threading.Thread(target=timer_thread.run, daemon=True)
        io_master = IoMaster(
            listenfd, signalfd_r, timerfd_0, webfd, eventfd_r, event_fifo,
            scheduler, client_options, server_options, event_processor, protocol,
            job_limit=job_limit, std_output=std_output
        )
        t_timer.start()
        status = io_master.run()

    # NOTE(review): presumably the timer thread ends once its peer socket
    # (timerfd_0) has been closed by the ExitStack above -- confirm.
    t_timer.join(timeout=1)
    if t_timer.is_alive():
        # Leave timerfd_1 open: the still-running thread may be using it.
        logger.warning("timer thread did not terminate")
    else:
        timerfd_1.close()

    return status

293 

294 

def make_and_configure_output_dir(config: Dict[str, Any],
                                  project_dir: str, conf_name: str) -> Path:
    """
    Build the run's output directory and populate it with launch scripts.

    The directory named by ``config['output_dir']`` (default: a timestamped
    ``melissa-YYYYmmddTHHMMSS`` name) is created, relative to ``project_dir``
    unless it starts with ``/``. The configuration file and the ``server.sh``
    / ``client.sh`` templates are copied into it, the template placeholders
    are substituted, and the user's preprocessing commands are inserted.

    Returns the path of the output directory.
    """

    def _fill_template(script: Path, env_root: Any, substitutions: Dict[str, str]) -> None:
        # Rewrite `script` in place: point it at the environment file when that
        # file exists (otherwise drop the sourcing line), then apply the
        # remaining placeholder substitutions in the given order.
        env_file = str(env_root) + "/melissa_set_env.sh"
        with open(script, "r") as handle:
            text = handle.read()
        if os.path.exists(env_file):
            text = text.replace("{melissa_set_env_file}", env_file)
        else:
            text = text.replace(". {melissa_set_env_file}", "")
        for placeholder, value in substitutions.items():
            text = text.replace(placeholder, value)
        with open(script, "w") as handle:
            handle.write(text)

    fallback_name = datetime.now().strftime('melissa-%Y%m%dT%H%M%S')
    output_dir = config.get('output_dir', fallback_name)
    logger.info(f'Writing output to {str(output_dir)}')

    # the output dir may be given as an absolute or a project-relative path
    proj_path = Path(project_dir)
    out_path = Path(output_dir) if output_dir.startswith('/') else proj_path / output_dir
    out_path.mkdir(parents=True, exist_ok=True)

    # the repository root (three levels above this file) doubles as the
    # default location of the environment file for both server and client
    melissa_src_path = Path(__file__).parent.parent.parent.resolve()
    server_env_root = config['server_config'].get('melissa_server_env', melissa_src_path)
    client_env_root = config['client_config'].get('melissa_client_env', melissa_src_path)
    templates = melissa_src_path / 'melissa/utility/bash_scripts'

    # create output dir structure
    for subdir in ('client_scripts', 'stdout'):
        Path(out_path / subdir).mkdir(parents=True, exist_ok=True)

    server_script = out_path / "server.sh"
    client_script = out_path / "client.sh"

    # copy all the launch files to the output dir
    shutil.copy(proj_path / f"{conf_name}.json", out_path / f"{conf_name}.json")
    shutil.copy(templates / "server.sh", server_script)
    os.chmod(server_script, 0o765)
    shutil.copy(templates / "client.sh", client_script)
    os.chmod(client_script, 0o765)

    # client script: environment file, executable command, preprocessing
    _fill_template(
        client_script,
        client_env_root,
        {"{executable_command}": config['client_config']['executable_command']},
    )
    set_user_defined_preprocessing_instructions(
        config['client_config'].get("preprocessing_commands"), client_script
    )

    # server script: environment file, use-case location, preprocessing
    _fill_template(
        server_script,
        server_env_root,
        {'{path_to_usecase}': project_dir, '{config_name}': conf_name},
    )
    set_user_defined_preprocessing_instructions(
        config['server_config'].get("preprocessing_commands"), server_script
    )

    return out_path

371 

372 

def set_user_defined_preprocessing_instructions(instructions: Optional[list],
                                                file_path: Path) -> None:
    """
    Replace the fifth line of a launch script with the user's commands.

    Args:
        instructions: Shell commands to insert, one per entry. ``None`` is
            treated like an empty list, which leaves a blank line in place
            of the fifth line.
        file_path: Script to rewrite in place.

    Raises:
        IndexError: If the script has fewer than five lines. The file is
            left untouched in that case.
    """
    with open(file_path, 'r') as script:
        lines = script.readlines()

    # Validate and build the replacement *before* reopening the file for
    # writing, so that a failure cannot leave a truncated script behind.
    if len(lines) < 5:
        raise IndexError(
            f"{file_path} has {len(lines)} line(s); at least 5 are needed "
            "to insert the preprocessing instructions"
        )
    lines[4] = "\n".join(instructions or []) + "\n"

    with open(file_path, 'w') as script:
        script.writelines(lines)

379 

380 

def check_checkpoint_metadata(sm_config: state_machine.Configuration):
    """
    Load the restart metadata and record it on the launcher configuration.

    Reads ``checkpoints/restart_metadata.json`` (relative to the current
    working directory) and stores its ``MELISSA_RESTART`` entry in
    ``sm_config.server_submission``. Raises ``FileNotFoundError`` when the
    metadata file does not exist.
    """
    metadata_file = 'checkpoints/restart_metadata.json'

    if os.path.exists(metadata_file):
        with open(metadata_file, 'r') as handle:
            restart_info = rapidjson.load(handle)
        sm_config.server_submission = restart_info["MELISSA_RESTART"]
        return

    raise FileNotFoundError("No restart metadata file found. Please deactivate "
                            "`load_from_checkpoint` in your config.")

395 

396 

def print_head():
    """Print the Melissa ASCII-art banner, framed by two rules, to stdout."""
    rule = "$! " + "-" * 46 + " $!"

    # Raw strings keep the backslashes of the artwork literal. The previous
    # non-raw literals relied on invalid escape sequences (``\/``, ``\_``,
    # ``\ ``), which emit a SyntaxWarning on recent Python versions. The
    # printed text is unchanged.
    # NOTE(review): the spacing inside the banner rows is reproduced exactly
    # as found; verify the alignment against the rendered output.
    banner = (
        r" __ __ ______ _ _____ _____ _____ ",
        r" | \/ | ____| | |_ _|/ ____/ ____| /\ ",
        r" | \ / | |__ | | | | | (___| (___ / \ ",
        r" | |\/| | __| | | | | \___ \\___ \ / /\ \ ",
        r" | | | | |____| |____ _| |_ ____) |___) / ____ \ ",
        r" |_| |_|______|______|_____|_____/_____/_/ \_\ ",
    )

    print("")
    print(rule)
    for row in banner:
        print(row)
    print("")
    print(rule)
    print("")

421 

422 

# Script entry point: propagate main()'s return value as the exit status
# (an int is used as the exit code, a string is printed to stderr).
if __name__ == "__main__":
    raise SystemExit(main())