Coverage for melissa/launcher/__main__.py: 17%

193 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-07-09 14:19 +0200

1#!/usr/bin/python3 

2 

3# Copyright (c) 2021-2022, Institut National de Recherche en Informatique et en Automatique (Inria) 

4# All rights reserved. 

5# 

6# Redistribution and use in source and binary forms, with or without 

7# modification, are permitted provided that the following conditions are met: 

8# 

9# * Redistributions of source code must retain the above copyright notice, this 

10# list of conditions and the following disclaimer. 

11# 

12# * Redistributions in binary form must reproduce the above copyright notice, 

13# this list of conditions and the following disclaimer in the documentation 

14# and/or other materials provided with the distribution. 

15# 

16# * Neither the name of the copyright holder nor the names of its 

17# contributors may be used to endorse or promote products derived from 

18# this software without specific prior written permission. 

19# 

20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 

21# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 

22# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 

23# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE 

24# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 

25# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR 

26# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 

27# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 

28# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 

29# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 

30 

31import argparse 

32import contextlib 

33import logging 

34import os 

35import shutil 

36import signal 

37import socket 

38import sys 

39import threading 

40from types import FrameType 

41from typing import Optional, Union, Dict, Any 

42 

43from melissa.utility import networking, time, timer 

44from melissa.scheduler.scheduler import Options as SchedulerOptions 

45 

46from datetime import datetime 

47# from jsonschema import Draft4Validator, validators 

48# from jsonschema.exceptions import ValidationError 

49from pathlib import Path 

50import rapidjson 

51from melissa.utility.logger import configure_logger, get_log_level_from_verbosity 

52from melissa.launcher.parser import (homogenize_arguments, 

53 check_protocol_choice, 

54 setup_scheduler, 

55 locate_executable) 

56 

57from . import event, state_machine, monitoring, processor 

58from . import __version__ as launcher_version 

59from .io import IoMaster 

60from .queue import Queue 

61from .monitoring import RestHttpServer 

62from melissa.utility.bcolors import TextColor 

63from melissa.launcher.schema import validate_config, CONFIG_PARSE_MODE, print_options 

64 

65logger = logging.getLogger(__name__) 

66 

67 

68def main() -> Union[int, str]: 

69 parser = argparse.ArgumentParser( 

70 prog="melissa-launcher", description="Melissa Launcher" 

71 ) 

72 

73 # CLI flags 

74 

75 parser.add_argument( 

76 "--config_name", 

77 "-c", 

78 help="User defined configuration file. Path can be " 

79 "relative or absolute.", 

80 type=str 

81 ) 

82 

83 parser.add_argument( 

84 "--version", 

85 "-v", 

86 action="version", 

87 help="show the Melissa version", 

88 version="%(prog)s {:s}".format(launcher_version) 

89 ) 

90 

91 parser.add_argument( 

92 "--print-options", 

93 help="Show the available configuration options", 

94 action="store_true", 

95 default=None 

96 ) 

97 print_head() 

98 

99 args = parser.parse_args() 

100 

101 if args.print_options: 

102 print_options() 

103 

104 conf_name, project_dir = homogenize_arguments(args.config_name) 

105 

106 with open(Path(project_dir) / f"{conf_name}.json") as json_file: 

107 config_dict = rapidjson.load(json_file, parse_mode=CONFIG_PARSE_MODE) 

108 

109 args, config_dict = validate_config(args, config_dict) 

110 lconfig = config_dict['launcher_config'] 

111 

112 # make the output directory path 

113 output_dir = make_and_configure_output_dir(config_dict, project_dir, conf_name) 

114 

115 log_level = get_log_level_from_verbosity(lconfig.get("verbosity", 3)) 

116 configure_logger(str(output_dir / "melissa_launcher.log"), log_level) 

117 

118 logger.info(f"melissa-launcher PID={os.getpid()}") 

119 

120 try: 

121 server_executable = locate_executable(output_dir / lconfig['server_executable']) 

122 except Exception as e: 

123 return str(e) 

124 

125 logger.debug(f"server executable {server_executable}") 

126 

127 protocol = check_protocol_choice(lconfig) 

128 

129 scheduler_impl, sched_client_cmd, sched_server_cmd = setup_scheduler(lconfig) 

130 

131 # set up options to pass directly to the scheduler command 

132 sched_client_cmd_opt = lconfig["scheduler_client_command_options"] 

133 sched_server_cmd_opt = lconfig["scheduler_server_command_options"] 

134 

135 scheduler_is_available, info = scheduler_impl.is_available() 

136 if scheduler_is_available: 

137 logger.info(f"found {info[0]} (version {info[1]}") 

138 if lconfig['scheduler'] == "oar": 

139 scheduler = scheduler_impl("openmpi") 

140 elif lconfig['scheduler'] == "oar-hybrid": 

141 container_options = lconfig['scheduler_arg_container'] 

142 container_client_size = lconfig['container_client_size'] 

143 besteffort_alloc_freq = lconfig['besteffort_allocation_frequency'] 

144 scheduler = scheduler_impl( 

145 "openmpi", 

146 container_options, 

147 container_client_size, 

148 besteffort_alloc_freq 

149 ) # type: ignore 

150 else: 

151 scheduler = scheduler_impl() # type: ignore 

152 else: 

153 logger.error(f"scheduler {lconfig['scheduler']} unusable: {info}") 

154 return 1 

155 

156 del scheduler_impl 

157 

158 client_options = SchedulerOptions( 

159 sched_client_cmd, 

160 sched_client_cmd_opt, 

161 lconfig['scheduler_arg'] + lconfig['scheduler_arg_client'] 

162 ) 

163 server_options = SchedulerOptions( 

164 sched_server_cmd, 

165 sched_server_cmd_opt, 

166 lconfig['scheduler_arg'] + lconfig['scheduler_arg_server'] 

167 ) 

168 

169 # set up std output files option 

170 std_output = lconfig["std_output"] 

171 logger.info(f"std outputs {std_output}") 

172 

173 # set up launcher scheduling constraints 

174 job_limit = lconfig["job_limit"] 

175 timer_delay = lconfig.get("timer_delay", 5) 

176 job_update_interval = lconfig.get("timer_delay", 30) 

177 

178 # CHANGE DIRECTORY 

179 # KEEP THIS IN MIND WHEN HANDLING RELATIVE USER-PROVIDED PATHS. 

180 os.chdir(output_dir) 

181 

182 # set up sockets 

183 signalfd_r, signalfd_w = networking.pipe() 

184 timerfd_0, timerfd_1 = networking.socketpair() 

185 eventfd_r, eventfd_w = networking.pipe() 

186 listenfd = networking.make_passive_socket(node=lconfig['bind'], protocol=protocol) 

187 

188 # do not limit the queue size because the thread reading from the queue 

189 # will also put items into it 

190 event_fifo = Queue(eventfd_w) 

191 event_fifo.put(event.Timeout()) 

192 

193 def handle_signal(signo: int, _: Optional[FrameType]) -> None: 

194 signame = signal.Signals(signo).name 

195 logger.info(f"received signal {signo} {signame}") 

196 bs = signo.to_bytes(1, byteorder=sys.byteorder) 

197 signalfd_w.send(bs) 

198 

199 with contextlib.ExitStack() as cm: 

200 # reset the signal handlers before closing the file descriptors 

201 signalfd_r = cm.enter_context(signalfd_r) 

202 signalfd_w = cm.enter_context(signalfd_w) 

203 timerfd_0 = cm.enter_context(timerfd_0) 

204 # timerfd_1 is closed manually after Timer thread stopped 

205 eventfd_r = cm.enter_context(eventfd_r) 

206 eventfd_w = cm.enter_context(eventfd_w) 

207 listenfd = cm.enter_context(listenfd) 

208 

209 signals = [signal.SIGINT, signal.SIGPIPE, signal.SIGTERM] 

210 for s in signals: 

211 signal.signal(s, handle_signal) 

212 # reset the signal handlers before closing the signal file 

213 # descriptors 

214 cm.callback(signal.signal, s, signal.SIG_DFL) 

215 

216 # set up state machine 

217 _, port = listenfd.getsockname() 

218 logger.info(f"fault-tolerance {bool(lconfig['fault_tolerance'])}") 

219 sm_config = state_machine.Configuration( 

220 cwd=os.getcwd(), 

221 launcher_host=socket.gethostname(), 

222 launcher_port=port, 

223 launcher_protocol=networking.protocol2str(protocol), 

224 server_executable=server_executable, 

225 no_fault_tolerance=not lconfig['fault_tolerance'], 

226 server_ping_interval=lconfig.get('server_timeout', 60.) / 2., 

227 job_update_interval=job_update_interval, 

228 ) 

229 initial_state = state_machine.State() 

230 

231 # set up web server, processor 

232 http_server_address = ( 

233 "" if lconfig['bind'] is None else lconfig['bind'], lconfig['http_port'] 

234 ) 

235 web_token = monitoring.generate_token( 

236 ) if not lconfig['http_token'] else lconfig['http_token'] 

237 logger.info(f"generated web token {web_token}") 

238 

239 print(f"{TextColor.BOLD}\nCurrent outputs directory: {output_dir}\n{TextColor.ENDC}") 

240 

241 print(f"{TextColor.UNDERLINE}Monitor melissa runs using the following command in " 

242 f"a different terminal{TextColor.ENDC}:\n\n" 

243 "melissa-monitor " 

244 f"--http_bind={lconfig['bind']} \\\n" 

245 f"\t--http_port={lconfig['http_port']} \\\n" 

246 f"\t--http_token={web_token} \\\n" 

247 f"\t--output_dir={output_dir}\n\n" 

248 ) 

249 

250 if config_dict.get("vscode_debugging", False): 

251 print("**`vscode_debugger` active in user config**\n" 

252 "Waiting for debugger attach, please start the " 

253 "debugger by navigating to debugger pane, " 

254 "ctrl+shift+d, and selecting:\n" 

255 "Python: Remote Attach") 

256 

257 print(f"{TextColor.YELLOW}Melissa study running...{TextColor.ENDC}\n") 

258 if lconfig["load_from_checkpoint"]: 

259 # look for checkpoint files in the current directory 

260 # set the MELISSA_RESTART environment variable accordingly 

261 check_checkpoint_metadata(sm_config) 

262 

263 web_server = RestHttpServer( 

264 http_server_address, web_token, initial_state 

265 ) 

266 event_processor = processor.DefaultProcessor( 

267 sm_config, initial_state, web_server 

268 ) 

269 webfd = socket.fromfd( 

270 web_server.fileno(), socket.AF_INET, socket.SOCK_STREAM 

271 ) 

272 

273 _, web_port = webfd.getsockname() 

274 logger.info(f"webserver listening on port {web_port}") 

275 

276 timer_thread = timer.Timer(timerfd_1, interval=time.Time(seconds=timer_delay)) 

277 t_timer = threading.Thread( 

278 target=lambda: timer_thread.run(), daemon=True 

279 ) 

280 io_master = IoMaster( 

281 listenfd, signalfd_r, timerfd_0, webfd, eventfd_r, event_fifo, 

282 scheduler, client_options, server_options, event_processor, protocol, 

283 job_limit=job_limit, std_output=std_output 

284 ) 

285 t_timer.start() 

286 status = io_master.run() 

287 

288 t_timer.join(timeout=1) 

289 if t_timer.is_alive(): 

290 logger.warning("timer thread did not terminate") 

291 else: 

292 timerfd_1.close() 

293 

294 if status > 0: 

295 print(f"{TextColor.FAIL}Error(s) encountered during Melissa study. " 

296 f"Check the log files.\n{TextColor.ENDC}") 

297 else: 

298 print(f"{TextColor.OKGREEN}Melissa study completed.\n{TextColor.ENDC}") 

299 return status 

300 

301 

302def make_and_configure_output_dir(config: Dict[str, Any], 

303 project_dir: str, conf_name: str) -> Path: 

304 """ 

305 Create the user defined output directory, copy the client/server templates to the directory, 

306 then 

307 modify the scripts to point to the proper executables. 

308 """ 

309 

310 default_out_dir = datetime.now().strftime('melissa-%Y%m%dT%H%M%S') 

311 output_dir = config.get('output_dir', default_out_dir) 

312 logger.info(f'Writing output to {str(output_dir)}') 

313 

314 # allow relative or absolute path passing for output dir 

315 proj_path = Path(project_dir) 

316 if output_dir.startswith('/'): 

317 out_path = Path(output_dir) 

318 else: 

319 out_path = Path(proj_path) / output_dir 

320 

321 out_path.mkdir(parents=True, exist_ok=True) 

322 melissa_src_path = Path(__file__).parent.parent.parent.resolve() 

323 melissa_server_bin_path = config['server_config'].get( 

324 'melissa_server_env', 

325 Path(__file__).parent.parent.parent.resolve() 

326 ) 

327 melissa_client_bin_path = config['client_config'].get( 

328 'melissa_client_env', 

329 Path(__file__).parent.parent.parent.resolve() 

330 ) 

331 bash_script_path = melissa_src_path / 'melissa/utility/bash_scripts' 

332 

333 # create output dir structure 

334 Path(out_path / 'client_scripts').mkdir(parents=True, exist_ok=True) 

335 Path(out_path / 'stdout').mkdir(parents=True, exist_ok=True) 

336 

337 # copy all the launch files to the output dir 

338 shutil.copy(proj_path / f"{conf_name}.json", out_path / f"{conf_name}.json") 

339 os.chmod(out_path / f"{conf_name}.json", 0o644) 

340 shutil.copy(bash_script_path / "server.sh", out_path / "server.sh") 

341 os.chmod(out_path / "server.sh", 0o765) 

342 shutil.copy(bash_script_path / "client.sh", out_path / "client.sh") 

343 os.chmod(out_path / "client.sh", 0o765) 

344 

345 with open(out_path / "client.sh", "r") as f: 

346 data = f.read() 

347 if os.path.exists(str(melissa_client_bin_path) + "/melissa_set_env.sh"): 

348 data = data.replace( 

349 "{melissa_set_env_file}", str(melissa_client_bin_path) + "/melissa_set_env.sh" 

350 ) 

351 else: 

352 data = data.replace(". {melissa_set_env_file}", "") 

353 data = data.replace("{executable_command}", config['client_config']['executable_command']) 

354 command_default_args = config['client_config'].get('command_default_args', []) 

355 command_default_args = " ".join(command_default_args) 

356 data = data.replace("{command_default_args}", command_default_args) 

357 

358 with open(out_path / "client.sh", "w") as f: 

359 f.write(data) 

360 

361 preproc_instr = config['client_config'].get("preprocessing_commands") 

362 set_user_defined_preprocessing_instructions(preproc_instr, out_path / "client.sh") 

363 

364 with open(out_path / "server.sh", "r") as f: 

365 data = f.read() 

366 if os.path.exists(str(melissa_server_bin_path) + "/melissa_set_env.sh"): 

367 data = data.replace( 

368 '{melissa_set_env_file}', str(melissa_server_bin_path) + '/melissa_set_env.sh' 

369 ) 

370 else: 

371 data = data.replace(". {melissa_set_env_file}", "") 

372 data = data.replace('{path_to_usecase}', project_dir) 

373 data = data.replace('{config_name}', conf_name) 

374 

375 with open(out_path / "server.sh", "w") as f: 

376 f.write(data) 

377 

378 preproc_instr = config['server_config'].get("preprocessing_commands") 

379 set_user_defined_preprocessing_instructions(preproc_instr, out_path / "server.sh") 

380 

381 return out_path 

382 

383 

384def set_user_defined_preprocessing_instructions(instructions: list, file_path: Path): 

385 lines = open(file_path, 'r').readlines() 

386 out = open(file_path, 'w') 

387 lines[4] = "\n".join(instructions) + "\n" 

388 out.writelines(lines) 

389 out.close() 

390 

391 

392def check_checkpoint_metadata(sm_config: state_machine.Configuration): 

393 """ 

394 Look for the restart metadata file and set the environment variable accordingly 

395 """ 

396 if not os.path.exists('checkpoints/restart_metadata.json'): 

397 raise FileNotFoundError("No restart metadata file found. Please deactivate " 

398 "`load_from_checkpoint` in your config.") 

399 

400 with open('checkpoints/restart_metadata.json', 'r') as f: 

401 metadata = rapidjson.load(f) 

402 

403 sm_config.server_submission = metadata["MELISSA_RESTART"] 

404 

405 return 

406 

407 

408def print_head(): 

409 header = \ 

410 f""" 

411 

412 {TextColor.HEADER}+----------------------------------------------------------+{TextColor.ENDC} 

413 {TextColor.HEADER}| |{TextColor.ENDC} 

414 {TextColor.HEADER}| ______ _____________________________________________ |{TextColor.ENDC} 

415 {TextColor.HEADER}| ___ |/ /__ ____/__ /____ _/_ ___/_ ___/__ | |{TextColor.ENDC} 

416 {TextColor.HEADER}| __ /|_/ /__ __/ __ / __ / _____ \\_____ \\__ /| | |{TextColor.ENDC} 

417 {TextColor.HEADER}| _ / / / _ /___ _ /____/ / ____/ /____/ /_ ___ | |{TextColor.ENDC} 

418 {TextColor.HEADER}| /_/ /_/ /_____/ /_____/___/ /____/ /____/ /_/ |_| |{TextColor.ENDC} 

419 {TextColor.HEADER}| |{TextColor.ENDC} 

420 {TextColor.HEADER}+----------------------------------------------------------+{TextColor.ENDC} 

421 

422 """ 

423 

424 print(header) 

425 

426 

427if __name__ == "__main__": 

428 sys.exit(main())