Coverage for melissa/launcher/__main__.py: 16%
193 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-09-22 10:36 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-09-22 10:36 +0000
1#!/usr/bin/python3
3# Copyright (c) 2021-2022, Institut National de Recherche en Informatique et en Automatique (Inria)
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions are met:
8#
9# * Redistributions of source code must retain the above copyright notice, this
10# list of conditions and the following disclaimer.
11#
12# * Redistributions in binary form must reproduce the above copyright notice,
13# this list of conditions and the following disclaimer in the documentation
14# and/or other materials provided with the distribution.
15#
16# * Neither the name of the copyright holder nor the names of its
17# contributors may be used to endorse or promote products derived from
18# this software without specific prior written permission.
19#
20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
24# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
26# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31import argparse
32import contextlib
33import logging
34import os
35import shutil
36import signal
37import socket
38import sys
39import threading
40from types import FrameType
41from typing import Optional, Union, Dict, Any
43from melissa.utility import networking, time, timer
44from melissa.scheduler.scheduler import Options as SchedulerOptions
46from datetime import datetime
47# from jsonschema import Draft4Validator, validators
48# from jsonschema.exceptions import ValidationError
49from pathlib import Path
50import rapidjson
51from melissa.utility.logger import configure_logger, get_log_level_from_verbosity
52from melissa.launcher.parser import (homogenize_arguments,
53 check_protocol_choice,
54 setup_scheduler,
55 locate_executable)
57from . import event, state_machine, monitoring, processor
58from . import __version__ as launcher_version
59from .io import IoMaster
60from .queue import Queue
61from .monitoring import RestHttpServer
62from melissa.launcher.schema import validate_config, CONFIG_PARSE_MODE, print_options
# Module-level logger; main() calls configure_logger() to set up the
# melissa_launcher.log output once the output directory exists.
logger = logging.getLogger(__name__)
def main() -> Union[int, str]:
    """Run the ``melissa-launcher`` command.

    Parses the command line, loads and validates the JSON configuration,
    creates the output directory, sets up the scheduler, the sockets, the
    signal handlers and the monitoring web server, then hands control to the
    ``IoMaster`` event loop.

    Returns:
        The status returned by ``IoMaster.run()``; ``0`` after
        ``--print-options`` (if ``print_options`` returns at all); ``1`` when
        the configured scheduler is unusable; or an error message (``str``)
        when the server executable cannot be located. The value is meant to
        be passed to ``sys.exit``.
    """
    parser = _make_argument_parser()

    print_head()

    args = parser.parse_args()

    if args.print_options:
        print_options()
        # ``--print-options`` is a standalone query: do not go on and load a
        # configuration file that may not have been given.
        return 0

    conf_name, project_dir = homogenize_arguments(args.config_name)

    with open(Path(project_dir) / f"{conf_name}.json") as json_file:
        config_dict = rapidjson.load(json_file, parse_mode=CONFIG_PARSE_MODE)

    args, config_dict = validate_config(args, config_dict)
    lconfig = config_dict['launcher_config']

    # make the output directory path
    output_dir = make_and_configure_output_dir(config_dict, project_dir, conf_name)

    log_level = get_log_level_from_verbosity(lconfig.get("verbosity", 3))
    configure_logger(str(output_dir / "melissa_launcher.log"), log_level)

    logger.info(f"melissa-launcher PID={os.getpid()}")

    try:
        server_executable = locate_executable(output_dir / lconfig['server_executable'])
    except Exception as e:
        # Top-level boundary: log the problem and report it as exit message.
        logger.error(f"cannot locate server executable: {e}")
        return str(e)

    logger.debug(f"server executable {server_executable}")

    protocol = check_protocol_choice(lconfig)

    scheduler_impl, sched_client_cmd, sched_server_cmd = setup_scheduler(lconfig)

    # set up options to pass directly to the scheduler command
    sched_client_cmd_opt = lconfig["scheduler_client_command_options"]
    sched_server_cmd_opt = lconfig["scheduler_server_command_options"]

    scheduler_is_available, info = scheduler_impl.is_available()
    if not scheduler_is_available:
        logger.error(f"scheduler {lconfig['scheduler']} unusable: {info}")
        return 1
    logger.info(f"found {info[0]} (version {info[1]})")
    scheduler = _instantiate_scheduler(scheduler_impl, lconfig)

    del scheduler_impl

    client_options = SchedulerOptions(
        sched_client_cmd,
        sched_client_cmd_opt,
        lconfig['scheduler_arg'] + lconfig['scheduler_arg_client']
    )
    server_options = SchedulerOptions(
        sched_server_cmd,
        sched_server_cmd_opt,
        lconfig['scheduler_arg'] + lconfig['scheduler_arg_server']
    )

    # set up std output files option
    std_output = lconfig["std_output"]
    logger.info(f"std outputs {std_output}")

    # set up launcher scheduling constraints
    job_limit = lconfig["job_limit"]
    timer_delay = lconfig.get("timer_delay", 5)
    # The job update interval has its own key; it must not be tied to the
    # timer delay (note the different defaults).
    job_update_interval = lconfig.get("job_update_interval", 30)

    # CHANGE DIRECTORY
    # KEEP THIS IN MIND WHEN HANDLING RELATIVE USER-PROVIDED PATHS.
    os.chdir(output_dir)

    # set up sockets
    signalfd_r, signalfd_w = networking.pipe()
    timerfd_0, timerfd_1 = networking.socketpair()
    eventfd_r, eventfd_w = networking.pipe()
    listenfd = networking.make_passive_socket(node=lconfig['bind'], protocol=protocol)

    # do not limit the queue size because the thread reading from the queue
    # will also put items into it
    event_fifo = Queue(eventfd_w)
    event_fifo.put(event.Timeout())

    def handle_signal(signo: int, _: Optional[FrameType]) -> None:
        # Forward the signal number through the signal pipe so the I/O loop
        # can handle it outside of the signal handler.
        signame = signal.Signals(signo).name
        logger.info(f"received signal {signo} {signame}")
        bs = signo.to_bytes(1, byteorder=sys.byteorder)
        signalfd_w.send(bs)

    with contextlib.ExitStack() as cm:
        # reset the signal handlers before closing the file descriptors
        signalfd_r = cm.enter_context(signalfd_r)
        signalfd_w = cm.enter_context(signalfd_w)
        timerfd_0 = cm.enter_context(timerfd_0)
        # timerfd_1 is closed manually after Timer thread stopped
        eventfd_r = cm.enter_context(eventfd_r)
        eventfd_w = cm.enter_context(eventfd_w)
        listenfd = cm.enter_context(listenfd)

        signals = [signal.SIGINT, signal.SIGPIPE, signal.SIGTERM]
        for s in signals:
            signal.signal(s, handle_signal)
            # reset the signal handlers before closing the signal file
            # descriptors
            cm.callback(signal.signal, s, signal.SIG_DFL)

        # set up state machine
        _, port = listenfd.getsockname()
        logger.info(f"fault-tolerance {bool(lconfig['fault_tolerance'])}")
        sm_config = state_machine.Configuration(
            cwd=os.getcwd(),
            launcher_host=socket.gethostname(),
            launcher_port=port,
            launcher_protocol=networking.protocol2str(protocol),
            server_executable=server_executable,
            no_fault_tolerance=not lconfig['fault_tolerance'],
            server_ping_interval=lconfig.get('server_timeout', 60.) / 2.,
            job_update_interval=job_update_interval,
        )
        initial_state = state_machine.State()

        # set up web server, processor
        http_server_address = (
            "" if lconfig['bind'] is None else lconfig['bind'], lconfig['http_port']
        )
        web_token = lconfig['http_token'] or monitoring.generate_token()
        logger.info(f"generated web token {web_token}")

        print("Access the terminal-based Melissa monitor by opening a new "
              "terminal and executing:\n\n"
              "melissa-monitor "
              f"--http_bind={lconfig['bind']} "
              f"--http_port={lconfig['http_port']} "
              f"--http_token={web_token} "
              f"--output_dir={output_dir} \n"
              )

        print(f"All output for current run will be written to {output_dir}\n")
        if config_dict.get("vscode_debugging", False):
            print("**`vscode_debugging` active in user config**\n"
                  "Waiting for debugger attach, please start the "
                  "debugger by navigating to debugger pane, "
                  "ctrl+shift+d, and selecting:\n"
                  "Python: Remote Attach")

        if lconfig["load_from_checkpoint"]:
            # look for checkpoint files in the current directory and hand the
            # restart metadata to the state machine configuration
            check_checkpoint_metadata(sm_config)

        web_server = RestHttpServer(
            http_server_address, web_token, initial_state
        )
        event_processor = processor.DefaultProcessor(
            sm_config, initial_state, web_server
        )
        # socket.fromfd duplicates the descriptor, so this socket object owns
        # its own copy and must be closed on its own.
        webfd = cm.enter_context(socket.fromfd(
            web_server.fileno(), socket.AF_INET, socket.SOCK_STREAM
        ))

        _, web_port = webfd.getsockname()
        logger.info(f"webserver listening on port {web_port}")

        timer_thread = timer.Timer(timerfd_1, interval=time.Time(seconds=timer_delay))
        t_timer = threading.Thread(target=timer_thread.run, daemon=True)
        io_master = IoMaster(
            listenfd, signalfd_r, timerfd_0, webfd, eventfd_r, event_fifo,
            scheduler, client_options, server_options, event_processor, protocol,
            job_limit=job_limit, std_output=std_output
        )
        t_timer.start()
        status = io_master.run()

        t_timer.join(timeout=1)
        if t_timer.is_alive():
            logger.warning("timer thread did not terminate")
        else:
            timerfd_1.close()

    return status


def _make_argument_parser() -> argparse.ArgumentParser:
    """Return the command-line parser of ``melissa-launcher``."""
    parser = argparse.ArgumentParser(
        prog="melissa-launcher", description="Melissa Launcher"
    )

    # CLI flags

    parser.add_argument(
        "--config_name",
        "-c",
        help="User defined configuration file. Path can be "
        "relative or absolute.",
        type=str
    )

    parser.add_argument(
        "--version",
        "-v",
        action="version",
        help="show the Melissa version",
        version="%(prog)s {:s}".format(launcher_version)
    )

    parser.add_argument(
        "--print-options",
        help="Show the available configuration options",
        action="store_true",
        default=None
    )
    return parser


def _instantiate_scheduler(scheduler_impl: Any, lconfig: Dict[str, Any]) -> Any:
    """Create the scheduler object selected by ``lconfig['scheduler']``.

    ``"oar"`` and ``"oar-hybrid"`` receive the string ``"openmpi"`` as first
    constructor argument; ``"oar-hybrid"`` additionally receives its container
    options, container client size and best-effort allocation frequency from
    the launcher configuration. Any other scheduler is built without arguments.
    """
    if lconfig['scheduler'] == "oar":
        return scheduler_impl("openmpi")
    if lconfig['scheduler'] == "oar-hybrid":
        container_options = lconfig['scheduler_arg_container']
        container_client_size = lconfig['container_client_size']
        besteffort_alloc_freq = lconfig['besteffort_allocation_frequency']
        return scheduler_impl(
            "openmpi",
            container_options,
            container_client_size,
            besteffort_alloc_freq
        )  # type: ignore
    return scheduler_impl()  # type: ignore
def make_and_configure_output_dir(config: Dict[str, Any],
                                  project_dir: str, conf_name: str) -> Path:
    """
    Create the output directory and fill it with ready-to-run launch scripts.

    The directory is ``config['output_dir']`` (default
    ``melissa-<YYYYmmddTHHMMSS>``), resolved against ``project_dir`` when it is
    relative. It receives ``client_scripts/`` and ``stdout/`` sub-directories,
    a copy of ``<conf_name>.json`` and the ``client.sh`` / ``server.sh``
    templates from ``melissa/utility/bash_scripts``, whose placeholders are then
    replaced with values taken from the configuration.

    :param config: validated user configuration
    :param project_dir: directory containing ``<conf_name>.json``
    :param conf_name: configuration file name without the ``.json`` suffix
    :return: path of the output directory
    """
    default_out_dir = datetime.now().strftime('melissa-%Y%m%dT%H%M%S')
    output_dir = config.get('output_dir', default_out_dir)

    # Joining keeps ``output_dir`` unchanged when it is absolute, so relative
    # and absolute paths are both handled here.
    proj_path = Path(project_dir)
    out_path = proj_path / output_dir
    logger.info(f'Writing output to {out_path}')

    out_path.mkdir(parents=True, exist_ok=True)
    # Root of the source tree containing the ``melissa`` package; it is also
    # the default location searched for ``melissa_set_env.sh``.
    melissa_src_path = Path(__file__).parent.parent.parent.resolve()
    melissa_server_bin_path = config['server_config'].get(
        'melissa_server_env', melissa_src_path
    )
    melissa_client_bin_path = config['client_config'].get(
        'melissa_client_env', melissa_src_path
    )
    bash_script_path = melissa_src_path / 'melissa/utility/bash_scripts'

    # create output dir structure
    (out_path / 'client_scripts').mkdir(parents=True, exist_ok=True)
    (out_path / 'stdout').mkdir(parents=True, exist_ok=True)

    # copy all the launch files to the output dir
    shutil.copy(proj_path / f"{conf_name}.json", out_path / f"{conf_name}.json")
    for script_name in ("server.sh", "client.sh"):
        shutil.copy(bash_script_path / script_name, out_path / script_name)
        # NOTE(review): 0o765 leaves the script group-writable; 0o755 may have
        # been intended — confirm before changing.
        os.chmod(out_path / script_name, 0o765)

    client_script = out_path / "client.sh"
    _fill_script_placeholders(client_script, melissa_client_bin_path, {
        "{executable_command}": config['client_config']['executable_command'],
    })
    set_user_defined_preprocessing_instructions(
        config['client_config'].get("preprocessing_commands"), client_script
    )

    server_script = out_path / "server.sh"
    _fill_script_placeholders(server_script, melissa_server_bin_path, {
        "{path_to_usecase}": str(project_dir),
        "{config_name}": conf_name,
    })
    set_user_defined_preprocessing_instructions(
        config['server_config'].get("preprocessing_commands"), server_script
    )

    return out_path


def _fill_script_placeholders(script: Path, env_dir: Any,
                              replacements: Dict[str, str]) -> None:
    """Rewrite ``script`` in place, substituting its template placeholders.

    ``{melissa_set_env_file}`` becomes ``<env_dir>/melissa_set_env.sh`` when
    that file exists; otherwise the whole ``. {melissa_set_env_file}`` sourcing
    command is removed. Each key of ``replacements`` is then replaced by its
    value, in insertion order.
    """
    with open(script, "r") as f:
        data = f.read()

    set_env_file = str(env_dir) + "/melissa_set_env.sh"
    if os.path.exists(set_env_file):
        data = data.replace("{melissa_set_env_file}", set_env_file)
    else:
        data = data.replace(". {melissa_set_env_file}", "")

    for placeholder, value in replacements.items():
        data = data.replace(placeholder, value)

    with open(script, "w") as f:
        f.write(data)
def set_user_defined_preprocessing_instructions(instructions: Optional[list],
                                                file_path: Path,
                                                line_index: int = 4) -> None:
    """
    Replace one line of a launch script with the user's preprocessing commands.

    Line ``line_index`` (0-based, default 4: the placeholder line of the
    ``client.sh``/``server.sh`` templates) is replaced by the commands joined
    with newlines. ``None`` (no ``preprocessing_commands`` in the config) is
    treated like an empty list, which leaves an empty line.

    :param instructions: shell commands to insert, or ``None``
    :param file_path: script to rewrite in place
    :param line_index: index of the line to replace
    :raises IndexError: if the script has no line ``line_index``; the file is
        left untouched in that case
    """
    with open(file_path, 'r') as f:
        lines = f.readlines()

    lines[line_index] = "\n".join(instructions or []) + "\n"

    # Only open (and thus truncate) the file once the new content is built,
    # so a failure above cannot wipe the script.
    with open(file_path, 'w') as f:
        f.writelines(lines)
def check_checkpoint_metadata(sm_config: state_machine.Configuration):
    """
    Load the checkpoint restart metadata into the state-machine configuration.

    The file ``checkpoints/restart_metadata.json`` is looked up relative to the
    current working directory; its ``"MELISSA_RESTART"`` entry is stored in
    ``sm_config.server_submission``.

    :raises FileNotFoundError: if the restart metadata file does not exist
    """
    metadata_file = 'checkpoints/restart_metadata.json'

    if os.path.exists(metadata_file):
        with open(metadata_file, 'r') as stream:
            restart_info = rapidjson.load(stream)
        sm_config.server_submission = restart_info["MELISSA_RESTART"]
        return

    raise FileNotFoundError("No restart metadata file found. Please deactivate "
                            "`load_from_checkpoint` in your config.")
def print_head():
    """
    Print the Melissa ASCII-art banner, framed by separator lines.

    The art lines are raw strings so that every backslash is kept literally
    without relying on invalid escape sequences, which Python 3.12+ reports
    with a SyntaxWarning.
    """
    separator = "$! " + "-" * 46 + " $!"
    banner = (
        r" __ __ ______ _ _____ _____ _____ ",
        r" | \/ | ____| | |_ _|/ ____/ ____| /\ ",
        r" | \ / | |__ | | | | | (___| (___ / \ ",
        r" | |\/| | __| | | | | \___ \\___ \ / /\ \ ",
        r" | | | | |____| |____ _| |_ ____) |___) / ____ \ ",
        r" |_| |_|______|______|_____|_____/_____/_/ \_\ ",
    )

    print("")
    print(separator)
    for art_line in banner:
        print(art_line)
    print("")
    print(separator)
    print("")
if __name__ == "__main__":
    # main() returns an int status, or a str that is printed to stderr with
    # exit status 1; SystemExit is exactly what sys.exit() raises.
    raise SystemExit(main())