Coverage for melissa/scheduler/slurm_openmpi.py: 20%

54 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-09-22 10:36 +0000

1#!/usr/bin/python3 

2 

3# Copyright (c) 2020-2022, Institut National de Recherche en Informatique et en Automatique (Inria) 

4# All rights reserved. 

5# 

6# Redistribution and use in source and binary forms, with or without 

7# modification, are permitted provided that the following conditions are met: 

8# 

9# * Redistributions of source code must retain the above copyright notice, this 

10# list of conditions and the following disclaimer. 

11# 

12# * Redistributions in binary form must reproduce the above copyright notice, 

13# this list of conditions and the following disclaimer in the documentation 

14# and/or other materials provided with the distribution. 

15# 

16# * Neither the name of the copyright holder nor the names of its 

17# contributors may be used to endorse or promote products derived from 

18# this software without specific prior written permission. 

19# 

20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 

21# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 

22# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 

23# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE 

24# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 

25# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR 

26# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 

27# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 

28# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 

29# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 

30 

31import logging 

32import os 

33from typing import List, Tuple 

34from pathlib import Path 

35 

36from melissa.utility.process import ArgumentList, Environment 

37 

38from .scheduler import Options 

39from .slurm import SlurmScheduler 

40 

41logger = logging.getLogger(__name__) 

42 

43 

class OpenmpiSlurmScheduler(SlurmScheduler):
    """Slurm scheduler that runs all commands of a job through one launcher call.

    Each job is written to an sbatch script whose payload is a single
    ``exec <options.sched_cmd> ...`` line using the colon-separated MPMD
    syntax, and that script is what gets handed to ``sbatch``.
    """

    def _sanity_check_impl(self, options: Options) -> List[str]:
        """Return one error message per rejected entry of ``options.raw_arguments``.

        An entry is rejected when it contains the substring ``do-not-launch``
        or is exactly one of ``-N``, ``-c``, ``-n``, ``--n``, ``-np``.
        An empty list means that nothing was rejected.
        """
        # NOTE(review): presumably these flags are rejected because the
        # task/node counts are meant to be given elsewhere -- confirm.
        rejected_flags = ("-N", "-c", "-n", "--n", "-np")
        return [
            f"remove `{arg}` argument"
            for arg in options.raw_arguments
            if "do-not-launch" in arg or arg in rejected_flags
        ]

    def _submit_job_impl(
        self, commands: List[ArgumentList],
        env: Environment, options: Options,
        name: str,
        uid: int
    ) -> Tuple[ArgumentList, Environment]:
        """Write the sbatch script for a job and return the call that submits it.

        The script ``./sbatch/sbatch.<uid>.sh`` is (over)written relative to
        the current working directory; ``./sbatch`` is created if missing.
        The ``./stdout`` directory named in the ``--output``/``--error``
        directives is not created here.

        Args:
            commands: Argument lists to run together; they are chained with
                ``:`` into a single MPMD launcher invocation.
            env: Variables layered on top of ``os.environ`` for the returned
                environment; each key is also exported to the launched
                processes by name with ``-x``.
            options: ``raw_arguments`` become ``#SBATCH`` directives,
                ``sched_cmd`` is the launcher executable and
                ``sched_cmd_opt`` its options, repeated for every command.
            name: Job name, used for ``--job-name`` and in the log file names.
            uid: Job identifier, used in the script and log file names.

        Returns:
            The ``sbatch`` argument list and the environment to run it in.

        Raises:
            ValueError: If ``options.sched_cmd`` is empty.
        """
        # job submission with mpirun MPMD syntax
        # by mixing slurm and openmpi submit functions
        sbatch_env = os.environ.copy()
        sbatch_env.update(env)

        # sbatch options (from the raw scheduler arguments) are JOB values;
        # the log file locations are always set by this scheduler.
        sbatch_options = [
            f"--output=./stdout/job.{uid}.{name}.out",
            f"--error=./stdout/job.{uid}.{name}.err",
        ] + options.raw_arguments
        sbatch_directives = ["#SBATCH " + opt for opt in sbatch_options]

        # Only the variable names are passed to the launcher: the values are
        # taken from the environment the script runs in (see sbatch_env).
        env_args: List[str] = []
        for key in sorted(env.keys()):
            env_args.extend(["-x", key])

        # launcher options (e.g. -n NTASKS, --oversubscribe) are per-command
        # SIMULATION values, so they are repeated in front of every command.
        ompi_options = options.sched_cmd_opt

        ompi_commands: List[str] = []
        last_index = len(commands) - 1
        for index, cmd in enumerate(commands):
            ompi_commands.extend(ompi_options + env_args + ["--"] + cmd)
            if index < last_index:
                # MPMD separator between two consecutive commands
                ompi_commands.append(":")

        sched_cmd = options.sched_cmd
        if not sched_cmd:
            raise ValueError("the executable command cannot be empty with this scheduler")

        # write the launcher call to the sbatch script
        Path('./sbatch').mkdir(parents=True, exist_ok=True)
        sbatch_script_filename = f"./sbatch/sbatch.{uid}.sh"
        # NOTE(review): the arguments are joined with plain spaces and are
        # not shell-quoted, so an argument containing whitespace or shell
        # metacharacters is re-interpreted by /bin/sh.
        sbatch_script = [
            "#!/bin/sh",
            f"# sbatch script for job {name}",
            *sbatch_directives,
            "",
            # trailing backslash: the arguments continue on the next line
            f"exec {sched_cmd} \\",
            " ".join(ompi_commands),
        ]

        with open(sbatch_script_filename, "w") as f:
            # print() supplies the final end-of-line
            print("\n".join(sbatch_script), file=f)

        sbatch_call = [
            "sbatch",
            "--parsable",
            f"--job-name={name:s}",
            sbatch_script_filename,
        ]

        return sbatch_call, sbatch_env

143 ) 

144 

145 return sbatch_call, sbatch_env