Coverage for melissa/launcher/__main__.py: 17%
193 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-07-09 14:19 +0200
1#!/usr/bin/python3
3# Copyright (c) 2021-2022, Institut National de Recherche en Informatique et en Automatique (Inria)
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions are met:
8#
9# * Redistributions of source code must retain the above copyright notice, this
10# list of conditions and the following disclaimer.
11#
12# * Redistributions in binary form must reproduce the above copyright notice,
13# this list of conditions and the following disclaimer in the documentation
14# and/or other materials provided with the distribution.
15#
16# * Neither the name of the copyright holder nor the names of its
17# contributors may be used to endorse or promote products derived from
18# this software without specific prior written permission.
19#
20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
24# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
26# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31import argparse
32import contextlib
33import logging
34import os
35import shutil
36import signal
37import socket
38import sys
39import threading
40from types import FrameType
41from typing import Optional, Union, Dict, Any
43from melissa.utility import networking, time, timer
44from melissa.scheduler.scheduler import Options as SchedulerOptions
46from datetime import datetime
47# from jsonschema import Draft4Validator, validators
48# from jsonschema.exceptions import ValidationError
49from pathlib import Path
50import rapidjson
51from melissa.utility.logger import configure_logger, get_log_level_from_verbosity
52from melissa.launcher.parser import (homogenize_arguments,
53 check_protocol_choice,
54 setup_scheduler,
55 locate_executable)
57from . import event, state_machine, monitoring, processor
58from . import __version__ as launcher_version
59from .io import IoMaster
60from .queue import Queue
61from .monitoring import RestHttpServer
62from melissa.utility.bcolors import TextColor
63from melissa.launcher.schema import validate_config, CONFIG_PARSE_MODE, print_options
# Module-level logger. main() calls configure_logger() with the
# "melissa_launcher.log" path once the output directory has been created.
logger = logging.getLogger(__name__)
def main() -> Union[int, str]:
    """
    Run the ``melissa-launcher`` command-line entry point.

    Parse the command line, load and validate the JSON configuration,
    prepare the output directory, instantiate the configured scheduler and
    drive the study through ``IoMaster.run()``.

    Returns:
        The status returned by ``IoMaster.run()`` (a positive value is
        reported to the user as an error), ``1`` when the configured
        scheduler is unusable, or an error message (``str``) when the server
        executable cannot be located. The value is meant for ``sys.exit``,
        which prints a string argument to stderr and exits with status 1.
    """
    parser = argparse.ArgumentParser(
        prog="melissa-launcher", description="Melissa Launcher"
    )

    # CLI flags
    parser.add_argument(
        "--config_name",
        "-c",
        help="User defined configuration file. Path can be "
        "relative or absolute.",
        type=str
    )
    parser.add_argument(
        "--version",
        "-v",
        action="version",
        help="show the Melissa version",
        version="%(prog)s {:s}".format(launcher_version)
    )
    parser.add_argument(
        "--print-options",
        help="Show the available configuration options",
        action="store_true",
        default=None
    )
    print_head()

    args = parser.parse_args()

    if args.print_options:
        print_options()

    conf_name, project_dir = homogenize_arguments(args.config_name)

    # JSON text is UTF-8 by specification; do not depend on the locale encoding.
    config_path = Path(project_dir) / f"{conf_name}.json"
    with open(config_path, encoding="utf-8") as json_file:
        config_dict = rapidjson.load(json_file, parse_mode=CONFIG_PARSE_MODE)

    args, config_dict = validate_config(args, config_dict)
    lconfig = config_dict['launcher_config']

    # make the output directory path
    output_dir = make_and_configure_output_dir(config_dict, project_dir, conf_name)

    log_level = get_log_level_from_verbosity(lconfig.get("verbosity", 3))
    configure_logger(str(output_dir / "melissa_launcher.log"), log_level)

    logger.info(f"melissa-launcher PID={os.getpid()}")

    try:
        server_executable = locate_executable(output_dir / lconfig['server_executable'])
    except Exception as e:
        # top-level boundary: the message is handed back to sys.exit()
        return str(e)
    logger.debug(f"server executable {server_executable}")

    protocol = check_protocol_choice(lconfig)

    scheduler_impl, sched_client_cmd, sched_server_cmd = setup_scheduler(lconfig)

    # set up options to pass directly to the scheduler command
    sched_client_cmd_opt = lconfig["scheduler_client_command_options"]
    sched_server_cmd_opt = lconfig["scheduler_server_command_options"]

    scheduler_is_available, info = scheduler_impl.is_available()
    if not scheduler_is_available:
        logger.error(f"scheduler {lconfig['scheduler']} unusable: {info}")
        return 1

    logger.info(f"found {info[0]} (version {info[1]})")
    if lconfig['scheduler'] == "oar":
        scheduler = scheduler_impl("openmpi")
    elif lconfig['scheduler'] == "oar-hybrid":
        scheduler = scheduler_impl(
            "openmpi",
            lconfig['scheduler_arg_container'],
            lconfig['container_client_size'],
            lconfig['besteffort_allocation_frequency']
        )  # type: ignore
    else:
        scheduler = scheduler_impl()  # type: ignore
    del scheduler_impl

    client_options = SchedulerOptions(
        sched_client_cmd,
        sched_client_cmd_opt,
        lconfig['scheduler_arg'] + lconfig['scheduler_arg_client']
    )
    server_options = SchedulerOptions(
        sched_server_cmd,
        sched_server_cmd_opt,
        lconfig['scheduler_arg'] + lconfig['scheduler_arg_server']
    )

    # set up std output files option
    std_output = lconfig["std_output"]
    logger.info(f"std outputs {std_output}")

    # set up launcher scheduling constraints
    job_limit = lconfig["job_limit"]
    timer_delay = lconfig.get("timer_delay", 5)
    # Use a dedicated "job_update_interval" key when the configuration provides
    # one; otherwise fall back to the "timer_delay" lookup (default 30), which
    # keeps existing configurations behaving exactly as before.
    # NOTE(review): confirm the key name against the launcher config schema.
    job_update_interval = lconfig.get(
        "job_update_interval", lconfig.get("timer_delay", 30)
    )

    # CHANGE DIRECTORY
    # KEEP THIS IN MIND WHEN HANDLING RELATIVE USER-PROVIDED PATHS.
    os.chdir(output_dir)

    # set up sockets
    signalfd_r, signalfd_w = networking.pipe()
    timerfd_0, timerfd_1 = networking.socketpair()
    eventfd_r, eventfd_w = networking.pipe()
    listenfd = networking.make_passive_socket(node=lconfig['bind'], protocol=protocol)

    # do not limit the queue size because the thread reading from the queue
    # will also put items into it
    event_fifo = Queue(eventfd_w)
    event_fifo.put(event.Timeout())

    def handle_signal(signo: int, _: Optional[FrameType]) -> None:
        # Only forward the signal number (one byte) through the signal pipe;
        # IoMaster, which is handed signalfd_r, presumably reacts to it.
        signame = signal.Signals(signo).name
        logger.info(f"received signal {signo} {signame}")
        bs = signo.to_bytes(1, byteorder=sys.byteorder)
        signalfd_w.send(bs)

    with contextlib.ExitStack() as cm:
        # ExitStack unwinds in LIFO order: callbacks registered later (signal
        # handler resets, web socket) run before these descriptors close.
        signalfd_r = cm.enter_context(signalfd_r)
        signalfd_w = cm.enter_context(signalfd_w)
        timerfd_0 = cm.enter_context(timerfd_0)
        # timerfd_1 is closed manually after Timer thread stopped
        eventfd_r = cm.enter_context(eventfd_r)
        eventfd_w = cm.enter_context(eventfd_w)
        listenfd = cm.enter_context(listenfd)

        signals = [signal.SIGINT, signal.SIGPIPE, signal.SIGTERM]
        for s in signals:
            signal.signal(s, handle_signal)
            # reset the signal handlers before closing the signal file
            # descriptors
            cm.callback(signal.signal, s, signal.SIG_DFL)

        # set up state machine
        # index instead of 2-tuple unpacking: AF_INET6 returns a 4-tuple
        port = listenfd.getsockname()[1]
        logger.info(f"fault-tolerance {bool(lconfig['fault_tolerance'])}")
        sm_config = state_machine.Configuration(
            cwd=os.getcwd(),
            launcher_host=socket.gethostname(),
            launcher_port=port,
            launcher_protocol=networking.protocol2str(protocol),
            server_executable=server_executable,
            no_fault_tolerance=not lconfig['fault_tolerance'],
            server_ping_interval=lconfig.get('server_timeout', 60.) / 2.,
            job_update_interval=job_update_interval,
        )
        initial_state = state_machine.State()

        # set up web server, processor
        http_server_address = (
            "" if lconfig['bind'] is None else lconfig['bind'], lconfig['http_port']
        )
        web_token = lconfig['http_token'] or monitoring.generate_token()
        logger.info(f"generated web token {web_token}")

        print(f"{TextColor.BOLD}\nCurrent outputs directory: {output_dir}\n{TextColor.ENDC}")

        print(f"{TextColor.UNDERLINE}Monitor melissa runs using the following command in "
              f"a different terminal{TextColor.ENDC}:\n\n"
              "melissa-monitor "
              f"--http_bind={lconfig['bind']} \\\n"
              f"\t--http_port={lconfig['http_port']} \\\n"
              f"\t--http_token={web_token} \\\n"
              f"\t--output_dir={output_dir}\n\n"
              )

        if config_dict.get("vscode_debugging", False):
            print("**`vscode_debugger` active in user config**\n"
                  "Waiting for debugger attach, please start the "
                  "debugger by navigating to debugger pane, "
                  "ctrl+shift+d, and selecting:\n"
                  "Python: Remote Attach")

        print(f"{TextColor.YELLOW}Melissa study running...{TextColor.ENDC}\n")
        if lconfig["load_from_checkpoint"]:
            # look for checkpoint metadata in the (new) current directory
            check_checkpoint_metadata(sm_config)

        web_server = RestHttpServer(
            http_server_address, web_token, initial_state
        )
        event_processor = processor.DefaultProcessor(
            sm_config, initial_state, web_server
        )
        # socket.fromfd() duplicates the descriptor; register the duplicate so
        # it is closed when the stack unwinds instead of leaking.
        webfd = cm.enter_context(socket.fromfd(
            web_server.fileno(), socket.AF_INET, socket.SOCK_STREAM
        ))

        web_port = webfd.getsockname()[1]
        logger.info(f"webserver listening on port {web_port}")

        timer_thread = timer.Timer(timerfd_1, interval=time.Time(seconds=timer_delay))
        t_timer = threading.Thread(target=timer_thread.run, daemon=True)
        io_master = IoMaster(
            listenfd, signalfd_r, timerfd_0, webfd, eventfd_r, event_fifo,
            scheduler, client_options, server_options, event_processor, protocol,
            job_limit=job_limit, std_output=std_output
        )
        t_timer.start()
        status = io_master.run()

        t_timer.join(timeout=1)
        if t_timer.is_alive():
            logger.warning("timer thread did not terminate")
        else:
            timerfd_1.close()

        if status > 0:
            print(f"{TextColor.FAIL}Error(s) encountered during Melissa study. "
                  f"Check the log files.\n{TextColor.ENDC}")
        else:
            print(f"{TextColor.OKGREEN}Melissa study completed.\n{TextColor.ENDC}")
        return status
def make_and_configure_output_dir(config: Dict[str, Any],
                                  project_dir: str, conf_name: str) -> Path:
    """
    Create the study output directory and populate it with the launch files.

    The directory is ``config["output_dir"]`` (default: a timestamped
    ``melissa-%Y%m%dT%H%M%S`` name), taken relative to ``project_dir`` unless
    it is absolute. The user configuration and the ``client.sh`` /
    ``server.sh`` templates are copied into it, then the template placeholders
    are filled so the scripts point to the proper executables.

    Args:
        config: user configuration dictionary; its ``client_config`` and
            ``server_config`` sections are read here.
        project_dir: directory holding ``<conf_name>.json``.
        conf_name: configuration file name without the ``.json`` suffix.

    Returns:
        Path of the output directory.
    """
    default_out_dir = datetime.now().strftime('melissa-%Y%m%dT%H%M%S')
    output_dir = config.get('output_dir', default_out_dir)
    logger.info(f'Writing output to {str(output_dir)}')

    # allow relative or absolute path passing for output dir: joining an
    # absolute path onto proj_path yields that absolute path unchanged
    proj_path = Path(project_dir)
    out_path = proj_path / output_dir
    out_path.mkdir(parents=True, exist_ok=True)

    # directory containing the ``melissa`` package (three levels above this file)
    melissa_src_path = Path(__file__).parent.parent.parent.resolve()
    melissa_server_bin_path = config['server_config'].get('melissa_server_env', melissa_src_path)
    melissa_client_bin_path = config['client_config'].get('melissa_client_env', melissa_src_path)
    bash_script_path = melissa_src_path / 'melissa/utility/bash_scripts'

    # create output dir structure
    for sub_dir in ('client_scripts', 'stdout'):
        (out_path / sub_dir).mkdir(parents=True, exist_ok=True)

    # copy all the launch files to the output dir
    config_file = f"{conf_name}.json"
    shutil.copy(proj_path / config_file, out_path / config_file)
    os.chmod(out_path / config_file, 0o644)
    # rwxrwxr-x: the owning group may execute the scripts just like "others"
    script_mode = 0o775
    for script in ("server.sh", "client.sh"):
        shutil.copy(bash_script_path / script, out_path / script)
        os.chmod(out_path / script, script_mode)

    client_config = config['client_config']
    _fill_launch_script(
        out_path / "client.sh",
        melissa_client_bin_path,
        {
            "{executable_command}": client_config['executable_command'],
            "{command_default_args}": " ".join(client_config.get('command_default_args', [])),
        },
    )
    set_user_defined_preprocessing_instructions(
        client_config.get("preprocessing_commands"), out_path / "client.sh"
    )

    server_config = config['server_config']
    _fill_launch_script(
        out_path / "server.sh",
        melissa_server_bin_path,
        {
            # str() so a Path-typed project_dir also works with str.replace
            "{path_to_usecase}": str(project_dir),
            "{config_name}": conf_name,
        },
    )
    set_user_defined_preprocessing_instructions(
        server_config.get("preprocessing_commands"), out_path / "server.sh"
    )

    return out_path


def _fill_launch_script(script: Path, env_dir: Union[str, Path],
                        replacements: Dict[str, str]) -> None:
    """
    Substitute the placeholders of a copied launch script, in place.

    ``{melissa_set_env_file}`` is pointed at ``<env_dir>/melissa_set_env.sh``
    when that file exists; otherwise the ``. {melissa_set_env_file}`` text that
    would source it is removed. Each ``replacements`` entry is then applied in
    insertion order.
    """
    data = script.read_text()
    set_env_file = Path(env_dir) / "melissa_set_env.sh"
    if set_env_file.exists():
        data = data.replace("{melissa_set_env_file}", str(set_env_file))
    else:
        data = data.replace(". {melissa_set_env_file}", "")
    for placeholder, value in replacements.items():
        data = data.replace(placeholder, value)
    script.write_text(data)
def set_user_defined_preprocessing_instructions(instructions: Optional[list], file_path: Path,
                                                line_index: int = 4):
    """
    Replace one line of a launch script with user preprocessing commands.

    The line at ``line_index`` (0-based, i.e. the fifth line by default,
    presumably the slot reserved by the ``client.sh``/``server.sh`` templates)
    is replaced by ``instructions`` joined with newlines. ``None`` or an empty
    list leaves an empty line in that slot.

    Args:
        instructions: shell commands to insert, or ``None`` when unset.
        file_path: script to rewrite in place.
        line_index: 0-based index of the line to replace.

    Raises:
        IndexError: if the file has no line at ``line_index``.
    """
    with open(file_path, 'r') as script:
        lines = script.readlines()
    # ``None`` (option absent from the config) behaves like an empty list
    lines[line_index] = "\n".join(instructions or []) + "\n"
    with open(file_path, 'w') as script:
        script.writelines(lines)
def check_checkpoint_metadata(sm_config: state_machine.Configuration):
    """
    Load the restart metadata of a previous run into ``sm_config``.

    Reads ``checkpoints/restart_metadata.json`` relative to the current
    working directory and stores its ``MELISSA_RESTART`` entry in
    ``sm_config.server_submission``.

    Raises:
        FileNotFoundError: if the metadata file does not exist.
        KeyError: if the decoded metadata (presumably a JSON object) has no
            ``MELISSA_RESTART`` entry.
    """
    metadata_path = 'checkpoints/restart_metadata.json'
    # EAFP: opening directly avoids the race between an exists() check and open()
    try:
        metadata_file = open(metadata_path, 'r')
    except FileNotFoundError as err:
        raise FileNotFoundError("No restart metadata file found. Please deactivate "
                                "`load_from_checkpoint` in your config.") from err
    with metadata_file:
        metadata = rapidjson.load(metadata_file)

    sm_config.server_submission = metadata["MELISSA_RESTART"]
def print_head():
    """Print the coloured MELISSA ASCII-art banner to standard output."""
    # NOTE(review): the banner's whitespace looks collapsed in this copy of the
    # file; confirm the literal against upstream before relying on its layout.
    header = \
        f"""
 {TextColor.HEADER}+----------------------------------------------------------+{TextColor.ENDC}
 {TextColor.HEADER}| |{TextColor.ENDC}
 {TextColor.HEADER}| ______ _____________________________________________ |{TextColor.ENDC}
 {TextColor.HEADER}| ___ |/ /__ ____/__ /____ _/_ ___/_ ___/__ | |{TextColor.ENDC}
 {TextColor.HEADER}| __ /|_/ /__ __/ __ / __ / _____ \\_____ \\__ /| | |{TextColor.ENDC}
 {TextColor.HEADER}| _ / / / _ /___ _ /____/ / ____/ /____/ /_ ___ | |{TextColor.ENDC}
 {TextColor.HEADER}| /_/ /_/ /_____/ /_____/___/ /____/ /____/ /_/ |_| |{TextColor.ENDC}
 {TextColor.HEADER}| |{TextColor.ENDC}
 {TextColor.HEADER}+----------------------------------------------------------+{TextColor.ENDC}
 """

    print(header)
if __name__ == "__main__":
    # main() returns either an int status or an error message; sys.exit
    # turns it into the process exit status.
    exit_status = main()
    sys.exit(exit_status)