Coverage for melissa/scheduler/slurm.py: 40%

172 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-09-22 10:36 +0000

1#!/usr/bin/python3 

2 

3# Copyright (c) 2020-2022, Institut National de Recherche en Informatique et en Automatique (Inria) 

4# All rights reserved. 

5# 

6# Redistribution and use in source and binary forms, with or without 

7# modification, are permitted provided that the following conditions are met: 

8# 

9# * Redistributions of source code must retain the above copyright notice, this 

10# list of conditions and the following disclaimer. 

11# 

12# * Redistributions in binary form must reproduce the above copyright notice, 

13# this list of conditions and the following disclaimer in the documentation 

14# and/or other materials provided with the distribution. 

15# 

16# * Neither the name of the copyright holder nor the names of its 

17# contributors may be used to endorse or promote products derived from 

18# this software without specific prior written permission. 

19# 

20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 

21# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 

22# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 

23# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE 

24# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 

25# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR 

26# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 

27# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 

28# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 

29# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 

30 

31import logging 

32import os 

33import re 

34import shutil 

35import subprocess 

36from typing import Dict, List, Optional, Tuple, Union 

37from pathlib import Path 

38 

39from melissa.utility.process import ArgumentList, CompletedProcess, Environment 

40 

41from .job import Id, Job, State 

42from .scheduler import IndirectScheduler, Options 

43 

44logger = logging.getLogger(__name__) 

45 

46 

class SlurmJob(Job):
    """Handle for one job submitted to Slurm.

    Stores the job ID passed to the constructor and a state that starts out
    as `State.WAITING`.
    """

    def __init__(self, job_id: Id) -> None:
        """Remember `job_id` and mark the job as waiting."""
        self._state = State.WAITING
        self._id = job_id

    def id(self) -> Id:
        """Return the job ID given at construction time."""
        return self._id

    def unique_id(self) -> Union[str, int]:
        """Return the same value as `id()`."""
        return self._id

    def state(self) -> State:
        """Return the currently stored job state."""
        return self._state

    def __repr__(self) -> str:
        """Return a debugging representation with the ID and the state."""
        job_id = self.id()
        state_text = str(self.state())
        return f"SlurmJob(id={job_id:d},state={state_text:s})"

64 

65 

class SlurmScheduler(IndirectScheduler[SlurmJob]):
    """Scheduler back end that talks to Slurm through its command-line tools.

    Most methods only assemble command lines (`sbatch`, `sacct`, `scancel`)
    together with an environment, or parse the result of a finished process.
    `srun` is executed directly only by the availability check and by the
    sanity check.
    """

    # always compile regular expressions to enforce ASCII matching
    # allow matching things like version 1.2.3-rc4
    _srun_version_regexp = r"slurm (\d+)[.](\d+)[.](\d+\S*)"
    _srun_version_pattern = re.compile(_srun_version_regexp, re.ASCII)
    _sbatch_job_id_regexp = r"(\d+)"
    _sbatch_job_id_pattern = re.compile(_sbatch_job_id_regexp, re.ASCII)
    # matches `<id>|<STATE>` and `<id>+0.batch|<STATE>`; every other line of
    # the sacct output is ignored by `_update_jobs_impl_pure`
    # NOTE(review): a state containing blanks (sacct may print
    # "CANCELLED by <uid>") does not match and would be ignored silently --
    # confirm whether this is intended
    _sacct_line_regexp = r"(\d+)([+]0[.]batch)?[|](\w+)"
    _sacct_line_pattern = re.compile(_sacct_line_regexp, re.ASCII)

    # True selects the "hetjob"/"het-group" keywords (Slurm major version
    # >= 20), False the "packjob"/"pack-group" keywords; None until
    # `_is_available_impl` has completed successfully
    _use_het_prefix: Optional[bool] = None

    @classmethod
    def _name_impl(cls) -> str:
        """Return the name of this scheduler."""
        return "slurm"

    @classmethod
    def _is_available_impl(cls) -> Tuple[bool, Union[str, Tuple[str, str]]]:
        """Check whether a usable `srun` is installed.

        Runs `srun --version` and parses the version number. On success the
        class attribute `_use_het_prefix` is set as a side effect.

        Returns:
            `(False, reason)` if `srun` is missing, fails to run, or is older
            than Slurm 17.11; otherwise `(True, (srun_path, version))`.

        Raises:
            ValueError: if the output of `srun --version` does not start
                with the expected `slurm X.Y.Z` text.
        """
        srun_path = shutil.which("srun")
        if srun_path is None:
            return False, "srun executable not found"

        srun = subprocess.run(
            [srun_path, "--version"],
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        if srun.returncode != 0:
            return False, "failed to execute %s: %s" % (srun_path, srun.stderr)

        # do not use pattern.fullmatch() because of the newline at the end of
        # the output
        match = cls._srun_version_pattern.match(srun.stdout)
        if match is None:
            e = "srun output '{:s}' does not match expected format"
            raise ValueError(e.format(srun.stdout))

        version_major = int(match.group(1))
        # the minor version might be of the form `05` but the Python int()
        # function handles this correctly
        version_minor = int(match.group(2))
        version_patch = match.group(3)
        version = (version_major, version_minor)

        if version < (19, 5):
            # use `warning` (`warn` is deprecated) and pass no extra
            # positional arguments: they would be treated as %-format
            # arguments of a message that has no placeholders
            logger.warning(
                "Melissa has not been tested with Slurm versions older than 19.05.5"
            )

        if version < (17, 11):
            return (
                False,
                "expected at least Slurm 17.11, "
                f"got {version_major}.{version_minor}.{version_patch} "
                "which does not support heterogeneous jobs",
            )

        cls._use_het_prefix = version_major >= 20

        # the version text exactly as printed by srun, e.g. `19.05.5`
        version_str = srun.stdout[match.start(1) : match.end(3)]
        return True, (srun_path, version_str)

    def __init__(self) -> None:
        """Verify that Slurm is available.

        Raises:
            RuntimeError: if `is_available()` reports that Slurm cannot be
                used.
        """
        is_available, info = self.is_available()
        if not is_available:
            raise RuntimeError("Slurm unavailable: %s" % (info,))

        assert self._use_het_prefix is not None

    def _sanity_check_impl(self, options: Options) -> List[str]:
        """Check the user-provided scheduler arguments.

        Every raw argument must look like an option, and must not be one of
        the options this class passes itself in the trial run (`-n`,
        `--ntasks`, `--test-only`). Afterwards the arguments are handed to
        `srun --test-only`.

        Returns:
            A list of error messages; empty if no problem was detected.
        """
        args = options.raw_arguments
        errors: List[str] = []

        for a in args:
            # startswith() also copes with empty strings, a[0] would raise
            # an IndexError
            if not a.startswith("-"):
                errors.append("non-option argument '{:s}' detected".format(a))
            elif a in ["-n", "--ntasks", "--test-only"]:
                errors.append("remove '{:s}' argument".format(a))

        command = ["srun", "--test-only", "--ntasks=1"] + args + ["--", "true"]
        srun = subprocess.run(
            command,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        if srun.returncode != 0:
            e = "srun error on trial execution: {:s}".format(srun.stderr)
            errors.append(e)

        return errors

    def _submit_job_impl(
        self, commands: List[ArgumentList], env: Environment, options: Options, name: str, uid: int
    ) -> Tuple[ArgumentList, Environment]:
        """Write an sbatch script for `commands` and return the sbatch call.

        The script is written to `./sbatch/sbatch.<uid>.sh` (the directory is
        created if necessary). With more than one command, each command
        becomes one component of a heterogeneous job.

        Args:
            commands: the command lines to execute, one per job component.
            env: environment variables added on top of a copy of the current
                process environment.
            options: provides `raw_arguments` (sbatch options repeated for
                every component), `sched_cmd` (the launcher placed after
                `exec`), and `sched_cmd_opt` (the launcher options).
            name: used as job name and in the output file names.
            uid: used in the script and output file names.

        Returns:
            The `sbatch` command line and the environment to run it with.
        """
        sbatch_env = os.environ.copy()
        sbatch_env.update(env)

        output_filename = "./stdout/job.{:d}.{:s}.out".format(uid, name)
        error_filename = "./stdout/job.{:d}.{:s}.err".format(uid, name)

        # sbatch Script and srun Arguments Assembly
        #
        # Command arguments are assembled initially as lists of lists of
        # strings (`List[List[str]]`) to avoid ambiguities, to allow sanity
        # checks, and to allow sanitizing the text later. The inner lists are
        # * lines in sbatch scripts
        # * hetgroup arguments on the srun command line.
        #
        # The actual strings are created in two steps:
        # * The inner lists are turned into strings with `#SBATCH`, `:`
        #   prefixes, back-slashes, and so on *without* newlines at the end.
        # * The list of strings is turned into a string and the missing
        #   newlines are added.
        #
        # The convention of leaving away newlines means that an empty string
        # will be replaced by a newline.

        # assemble sbatch options
        #
        # the options below need to be specified only once; these are so-called
        # "propagated options" in the Slurm documentation
        # user-provided options are assumed to be non-propagated
        propagated_options = [
            "--output={:s}".format(output_filename),
            "--error={:s}".format(error_filename),
        ]

        # the user-provided options are repeated for every component; the
        # keyword on a line of its own separates two components
        hetjob_keyword = "hetjob" if self._use_het_prefix else "packjob"
        hetjob_options = []  # type: List[str]
        for i, _ in enumerate(commands):
            hetjob_options.extend(options.raw_arguments)
            if i + 1 < len(commands):
                hetjob_options.append(hetjob_keyword)

        sbatch_options = propagated_options + hetjob_options

        # serialize sbatch options
        def options2str(option: str) -> str:
            return "#SBATCH " + option

        sbatch_options_str = [options2str(o) for o in sbatch_options]

        # assemble srun arguments
        hetgroup_keyword = "het-group" if self._use_het_prefix else "pack-group"

        sched_cmd = options.sched_cmd
        sched_cmd_opt = options.sched_cmd_opt

        srun_arguments: List[List[str]] = []
        if not sched_cmd:
            # without a launcher the single command is executed directly
            assert len(commands) == 1, "non-unit groups are not supported in this configuration"
            srun_arguments = [commands[0]]
        elif len(commands) == 1:
            srun_arguments = [sched_cmd_opt + ["--"] + commands[0]]
        else:
            for i, cmd in enumerate(commands):
                # keep the launcher options as separate list items exactly
                # like in the single-command case
                args = sched_cmd_opt + [f"--{hetgroup_keyword}={i}", "--"] + cmd
                srun_arguments.append(args)

        # serialize srun arguments
        def args2str(hetgroup: int, args: List[str]) -> str:
            assert hetgroup >= 0
            assert hetgroup < len(commands)

            # components are separated by ` : `; every line except the last
            # one ends with a line continuation
            prefix = ": " if hetgroup > 0 else ""
            suffix = " \\" if hetgroup + 1 < len(commands) else ""
            return " " + prefix + " ".join(args) + suffix

        srun_arguments_str = [args2str(i, args) for i, args in enumerate(srun_arguments)]

        # write srun calls to file
        Path('./sbatch').mkdir(parents=True, exist_ok=True)
        sbatch_script_filename = "./sbatch/sbatch.{:d}.sh".format(uid)
        sbatch_script = (
            ["#!/bin/sh"]
            + ["# sbatch script for job {:s}".format(name)]
            + sbatch_options_str
            + [""]
            + (["exec \\"] if not sched_cmd else [f"exec {sched_cmd} \\"])
            + srun_arguments_str
        )

        # POSIX requires files to end with a newline; missing newlines at the
        # end of a file may break scripts that append text.
        # this string won't contain a newline at the end; it must be added
        # manually or by using a function that adds it, e.g., `print`
        sbatch_script_str_noeol = "\n".join(sbatch_script)

        with open(sbatch_script_filename, "w") as f:
            print(sbatch_script_str_noeol, file=f)

        sbatch_call = (
            ["sbatch"]
            + ["--parsable"]
            + ["--job-name={:s}".format(name)]
            + [sbatch_script_filename]
        )

        return sbatch_call, sbatch_env

    def _make_job_impl(self, sbatch: CompletedProcess, uid: int) -> SlurmJob:
        """Create a job object from the result of a finished `sbatch` call.

        Args:
            sbatch: the finished process; its `stdout`, `stderr`, and
                `exit_status` are inspected.
            uid: only used in the log message.

        Raises:
            RuntimeError: if `sbatch` exited with a non-zero status.
            ValueError: if no job ID can be found in the output.
        """
        if sbatch.stderr != "":
            logger.warning(f"sbatch (uid={uid}): {sbatch.stderr}")

        if sbatch.exit_status != 0:
            raise RuntimeError("sbatch failed (exit status {:d})".format(sbatch.exit_status))

        # `sbatch --parsable` prints `<job id>` or `<job id>;<cluster name>`;
        # only the part before the first semicolon is the job ID
        job_id_text = sbatch.stdout.strip().split(";", 1)[0]
        match = self._sbatch_job_id_pattern.fullmatch(job_id_text)
        if match is None:
            e = "no job ID found in sbatch output: '{:s}'"
            raise ValueError(e.format(sbatch.stdout))

        return SlurmJob(int(match.group(1)))

    def _update_jobs_impl(self, jobs: List[SlurmJob]) -> Tuple[ArgumentList, Environment]:
        """Return the `sacct` call that queries the states of `jobs`.

        The requested output has the two columns `JobID` and `State`.
        """
        job_list = ["--job={:d}".format(j.id()) for j in jobs]
        sacct_command = ["sacct", "--parsable2", "--format=JobID,State"] + job_list
        return sacct_command, os.environ

    def _parse_update_jobs_impl(self, jobs: List[SlurmJob], sacct: CompletedProcess) -> None:
        """Update the states of `jobs` from the output of a finished `sacct`.

        Every output line is passed to `_update_jobs_impl_pure`.
        """
        job_map = {j.id(): j for j in jobs}
        for line in sacct.stdout.strip().split("\n"):
            self._update_jobs_impl_pure(job_map, line)

    @classmethod
    def _update_jobs_impl_pure(cls, job_map: Dict[int, SlurmJob], sacct_line: str) -> None:
        """Apply one line of `sacct` output to the matching job.

        Lines that do not match `_sacct_line_pattern` are ignored.

        Args:
            job_map: maps job IDs to the job objects to be updated.
            sacct_line: one output line without the newline at the end.

        Raises:
            NotImplementedError: if the job state is `REVOKED`.
            RuntimeError: if the job state is unknown.
        """
        slurm_states_waiting = ["PENDING", "REQUEUED", "SUSPENDED", "RESIZING"]
        slurm_states_failure = [
            "BOOT_FAIL",
            "CANCELLED",
            "DEADLINE",
            "FAILED",
            "NODE_FAIL",
            "OUT_OF_MEMORY",
            "PREEMPTED",
            "TIMEOUT",
        ]

        match = cls._sacct_line_pattern.fullmatch(sacct_line)
        if match is None:
            return

        job_id = int(match.group(1))
        slurm_state = match.group(3)

        assert job_id in job_map

        j = job_map[job_id]

        if slurm_state == "REVOKED":
            e = "cannot handle Slurm siblings like job {:d}"
            raise NotImplementedError(e.format(job_id))

        # the assertions below reject transitions away from a final state
        if slurm_state in slurm_states_waiting:
            assert j.state() in [State.WAITING, State.RUNNING]
            j._state = State.WAITING
        elif slurm_state in ["RUNNING"]:
            assert j.state() in [State.WAITING, State.RUNNING]
            j._state = State.RUNNING
        elif slurm_state in ["COMPLETED"]:
            assert j.state() in [State.WAITING, State.RUNNING, State.TERMINATED]
            j._state = State.TERMINATED
        elif slurm_state in slurm_states_failure:
            assert j.state() in [State.WAITING, State.RUNNING, State.FAILED]
            j._state = State.FAILED
        else:
            e = "unknown Slurm job state '{:s}'"
            raise RuntimeError(e.format(slurm_state))

    def _cancel_jobs_impl(self, jobs: List[SlurmJob]) -> Tuple[ArgumentList, Environment]:
        """Return the `scancel` call that cancels `jobs`."""
        job_list = [str(j.id()) for j in jobs]
        scancel_command = ["scancel", "--batch", "--quiet"] + job_list
        return scancel_command, os.environ

    def _parse_cancel_jobs_impl(self, jobs: List[SlurmJob], proc: CompletedProcess) -> None:
        """Check the exit status of a finished `scancel` call.

        Raises:
            RuntimeError: if the exit status is neither 0 nor 1.
        """
        # scancel exits with status 1 if at least one job had already been
        # terminated
        if proc.exit_status not in [0, 1]:
            raise RuntimeError("scancel error: exit status {:d}".format(proc.exit_status))