Coverage for melissa/scheduler/slurm_global.py: 28%

138 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-09-22 10:36 +0000

1#!/usr/bin/python3 

2 

3# Copyright (c) 2020-2022, Institut National de Recherche en Informatique et en Automatique (Inria) 

4# All rights reserved. 

5# 

6# Redistribution and use in source and binary forms, with or without 

7# modification, are permitted provided that the following conditions are met: 

8# 

9# * Redistributions of source code must retain the above copyright notice, this 

10# list of conditions and the following disclaimer. 

11# 

12# * Redistributions in binary form must reproduce the above copyright notice, 

13# this list of conditions and the following disclaimer in the documentation 

14# and/or other materials provided with the distribution. 

15# 

16# * Neither the name of the copyright holder nor the names of its 

17# contributors may be used to endorse or promote products derived from 

18# this software without specific prior written permission. 

19# 

20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 

21# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 

22# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 

23# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE 

24# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 

25# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR 

26# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 

27# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 

28# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 

29# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 

30 

31import logging 

32import os 

33import re 

34import shutil 

35import subprocess 

36from types import ModuleType 

37from typing import cast, List, Tuple, Union 

38 

39from melissa.utility import time 

40from melissa.utility.process import ArgumentList, Environment, Process 

41 

42from .job import Id, Job, State 

43from .scheduler import DirectScheduler, Options 

44 

# NOTE: the name ``logging`` is deliberately rebound from the stdlib module to
# this module's Logger, so later ``logging.debug(...)`` calls emit records under
# this module's name. ``cast`` only placates the type checker; at runtime the
# object is a ``logging.Logger``, not a module.
logging = cast(ModuleType, logging.getLogger(name=__name__))

46 

47 

class SrunJob(Job):
    """A job backed by a single local process.

    The job id is the PID of the wrapped process. The unique id is the value
    supplied at construction. A new job starts in ``State.RUNNING``.
    """

    def __init__(self, uid: Id, process: "subprocess.Popen[str]") -> None:
        super().__init__()
        self._process = process
        self._uid = uid
        self._state = State.RUNNING

    def id(self) -> Id:
        """Return the PID of the wrapped process."""
        pid = self._process.pid
        return pid

    def unique_id(self) -> Union[str, int]:
        """Return the unique id given at construction."""
        uid = self._uid
        return uid

    def state(self) -> State:
        """Return the last recorded state of this job."""
        current = self._state
        return current

    def __repr__(self) -> str:
        state_text = str(self._state)
        return f"SrunJob(id={self.id():d},state={state_text:s})"

67 

68 

class SlurmGlobalScheduler(DirectScheduler[SrunJob]):
    """Scheduler whose jobs are local processes, optionally wrapped in ``srun``.

    :meth:`_submit_job_impl` builds the argument list and environment for a
    job. :meth:`_make_job_impl` wraps the resulting process in a
    :class:`SrunJob`. Job states are derived from process exit codes.
    """

    # always compile regular expressions to enforce ASCII matching
    # allow matching things like version 1.2.3-rc4
    _srun_version_regexp = r"slurm (\d+)[.](\d+)[.](\d+\S*)"
    _srun_version_pattern = re.compile(_srun_version_regexp, re.ASCII)
    # leading decimal digits of a version component, e.g. "3" in "3-rc4"
    _leading_digits_pattern = re.compile(r"\d+", re.ASCII)
    _sbatch_job_id_regexp = r"(\d+)"
    _sbatch_job_id_pattern = re.compile(_sbatch_job_id_regexp, re.ASCII)
    _sacct_line_regexp = r"(\d+)([+]0[.]batch)?[|](\w+)"
    _sacct_line_pattern = re.compile(_sacct_line_regexp, re.ASCII)
    # set by _is_available_impl: True when the detected Slurm major version >= 20
    _use_het_prefix: bool = False

    @classmethod
    def _name_impl(cls) -> str:
        return "slurm"

    @classmethod
    def _is_available_impl(cls) -> Tuple[bool, Union[str, Tuple[str, str]]]:
        """Check that a usable ``srun`` executable is on ``PATH``.

        Returns:
            ``(True, (srun_path, version_string))`` when ``srun`` is found and
            reports a version of at least 17.11. Otherwise returns
            ``(False, reason)``.

        Raises:
            ValueError: if the ``srun --version`` output does not start with
                ``slurm X.Y.Z``.
        """
        srun_path = shutil.which("srun")
        if srun_path is None:
            return False, "srun executable not found"

        srun = subprocess.run(
            [srun_path, "--version"],
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        if srun.returncode != 0:
            return False, "failed to execute %s: %s" % (srun_path, srun.stderr)

        # do not use pattern.fullmatch() because of the newline at the end of
        # the output
        match = cls._srun_version_pattern.match(srun.stdout)
        if match is None:
            e = "srun output '{:s}' does not match expected format"
            raise ValueError(e.format(srun.stdout))

        version_major = int(match.group(1))
        # the minor version might be of the form `05` but the Python int()
        # function handles this correctly
        version_minor = int(match.group(2))
        # the patch group may carry a suffix (e.g. `3-rc4`) that int() rejects;
        # keep only its leading digits (the regex guarantees at least one)
        patch_digits = cls._leading_digits_pattern.match(match.group(3))
        assert patch_digits is not None
        version_patch = int(patch_digits.group(0))
        version = (version_major, version_minor, version_patch)

        if version[:2] < (17, 11):
            return (
                False,
                (
                    "Expected at least Slurm 17.11, got "
                    f"{version_major}.{version_minor}.{version_patch} "
                    "which does not support heterogeneous jobs"
                ),
            )

        if version < (19, 5, 5):
            # plain log message: passing RuntimeWarning here (as for
            # warnings.warn) would be treated as a %-format argument
            logging.warning(
                "Melissa has not been tested with Slurm versions older than 19.05.5"
            )

        cls._use_het_prefix = version_major >= 20

        version_str = srun.stdout[match.start(1) : match.end(3)]
        return True, (srun_path, version_str)

    def __init__(self) -> None:
        is_available, info = self.is_available()
        if not is_available:
            raise RuntimeError("Slurm unavailable: %s" % (info,))

        assert self._use_het_prefix is not None

    def _sanity_check_impl(self, options: Options) -> List[str]:
        """Validate the user-supplied raw srun arguments.

        Every argument must be an option (start with ``-``). The options
        ``-n``, ``--ntasks`` and ``--test-only`` are rejected, including their
        ``--option=value`` spelling. The arguments are then tried with
        ``srun --test-only --ntasks=1 ... -- true``.

        Returns:
            Human-readable error messages; an empty list if all checks pass.
        """
        args = options.raw_arguments
        errors: List[str] = []
        forbidden = {"-n", "--ntasks", "--test-only"}

        for a in args:
            # startswith() also copes with an empty argument, unlike a[0]
            if not a.startswith("-"):
                errors.append("non-option argument '{:s}' detected".format(a))
            elif a.split("=", 1)[0] in forbidden:
                errors.append("remove '{:s}' argument".format(a))

        command = ["srun", "--test-only", "--ntasks=1"] + args + ["--", "true"]
        srun = subprocess.run(
            command,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        if srun.returncode != 0:
            e = "srun error on trial execution: {:s}".format(srun.stderr)
            errors.append(e)

        return errors

    def _submit_job_impl(
        self,
        commands: List[ArgumentList],
        env: Environment,
        options: Options,
        name: str,
        unique_id: int,
    ) -> Tuple[ArgumentList, Environment]:
        """Build the argument list and environment for a single-command job.

        If ``options.sched_cmd`` is empty, the job's command is returned as is.
        Otherwise the result is ``[sched_cmd, *sched_cmd_opt,
        --output=..., --error=..., *raw_arguments, "--", *command]``.

        Returns:
            ``(argv, environment)``. The environment is a copy of the current
            process environment updated with ``env``.
        """
        # Approach to environment variables:
        # By default all environment variables are propagated
        srun_env = os.environ.copy()
        srun_env.update(env)

        output_filename = "./stdout/job.{:d}.{:s}.out".format(unique_id, name)
        error_filename = "./stdout/job.{:d}.{:s}.err".format(unique_id, name)

        # build propagated options of the srun command line
        propagated_options = [
            "--output={:s}".format(output_filename),
            "--error={:s}".format(error_filename),
        ] + options.raw_arguments

        sched_cmd = options.sched_cmd
        sched_cmd_opt = options.sched_cmd_opt

        assert len(commands) == 1, "non-unit groups are not supported in this configuration"
        # keep every command token as its own argv entry; joining the tokens
        # would fuse e.g. ["python3", "x.py"] into the single word "python3x.py"
        job_command = list(commands[0])

        # write srun/job execution call
        if not sched_cmd:
            srun_call = job_command
        else:
            srun_call = (
                [sched_cmd] + sched_cmd_opt + propagated_options + ["--"] + job_command
            )
        logging.debug("srun call: %s", srun_call)

        return srun_call, srun_env

    def _make_job_impl(self, proc: "Process[str]", unique_id: int) -> SrunJob:
        return SrunJob(unique_id, proc)

    @classmethod
    def _update_jobs_impl(cls, jobs: List[SrunJob]) -> None:
        """Refresh each job's state from its process exit code (non-blocking poll)."""
        for j in jobs:
            returncode = j._process.poll()
            if returncode is None:
                state = State.RUNNING
            elif returncode == 0:
                state = State.TERMINATED
            else:
                state = State.FAILED
            j._state = state

    @classmethod
    def _cancel_jobs_impl(cls, jobs: List[SrunJob]) -> None:
        """Stop the given jobs: wait, then terminate, then kill; mark them FAILED."""
        # when the user presses ctrl+c, the shell will send all processes in
        # the same process group SIGINT. some programs respond intelligently to
        # signals by freeing resources and exiting. these programs may also
        # exit _immediately_ if they receive a second signal within a short
        # time frame (e.g., srun or mpirun which won't terminate its child
        # processes in this case). for this reason, we wait before terminating
        # jobs here.
        max_wait_time = time.Time(seconds=5)

        # wait at most max_wait_time overall
        def compute_timeout(t_start: time.Time) -> time.Time:
            t_waited = time.monotonic() - t_start
            if t_waited < max_wait_time:
                return max_wait_time - t_waited
            return time.Time(seconds=0)

        # terminate processes
        t_0 = time.monotonic()
        for j in jobs:
            try:
                timeout = compute_timeout(t_0)
                j._process.wait(timeout.total_seconds())
            except subprocess.TimeoutExpired:
                logging.debug("Slurm srun scheduler terminating process %d", j.id())
                j._process.terminate()

        # kill processes if necessary
        t_1 = time.monotonic()
        for j in jobs:
            try:
                timeout = compute_timeout(t_1)
                j._process.wait(timeout.total_seconds())
            except subprocess.TimeoutExpired:
                logging.debug("Slurm srun scheduler killing process %d", j.id())
                j._process.kill()

            j._state = State.FAILED

263 j._state = State.FAILED