Coverage for melissa/scheduler/slurm.py: 40%

172 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-11-03 09:52 +0100

1#!/usr/bin/python3 

2 

3# Copyright (c) 2020-2022, Institut National de Recherche en Informatique et en Automatique (Inria) 

4# All rights reserved. 

5# 

6# Redistribution and use in source and binary forms, with or without 

7# modification, are permitted provided that the following conditions are met: 

8# 

9# * Redistributions of source code must retain the above copyright notice, this 

10# list of conditions and the following disclaimer. 

11# 

12# * Redistributions in binary form must reproduce the above copyright notice, 

13# this list of conditions and the following disclaimer in the documentation 

14# and/or other materials provided with the distribution. 

15# 

16# * Neither the name of the copyright holder nor the names of its 

17# contributors may be used to endorse or promote products derived from 

18# this software without specific prior written permission. 

19# 

20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 

21# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 

22# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 

23# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE 

24# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 

25# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR 

26# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 

27# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 

28# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 

29# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 

30 

31import logging 

32import os 

33import re 

34import shutil 

35import subprocess 

36from typing import Dict, List, Optional, Tuple, Union 

37from pathlib import Path 

38 

39from melissa.utility.process import ArgumentList, CompletedProcess, Environment 

40 

41from .job import Id, Job, State 

42from .scheduler import IndirectScheduler, Options 

43 

# Module-level logger for the Slurm scheduler backend, named after this module.
logger = logging.getLogger(__name__)

45 

46 

class SlurmJob(Job):
    """A job handed to Slurm, identified by its Slurm job ID.

    A freshly created job is in ``State.WAITING``; ``SlurmScheduler`` later
    overwrites ``_state`` while parsing ``sacct`` output.
    """

    def __init__(self, job_id: Id) -> None:
        self._id = job_id
        self._state = State.WAITING

    def id(self) -> Id:
        """Return the Slurm job ID."""
        return self._id

    def unique_id(self) -> Union[str, int]:
        """Return the Slurm job ID, which doubles as the unique identifier."""
        return self._id

    def state(self) -> State:
        """Return the last known state of this job."""
        return self._state

    def __repr__(self) -> str:
        return f"SlurmJob(id={self.id():d},state={self.state()!s})"

64 

65 

class SlurmScheduler(IndirectScheduler[SlurmJob]):
    """Scheduler backend submitting Melissa job groups through Slurm.

    The ``*_impl`` methods build command lines (``sbatch``, ``sacct``,
    ``scancel``) together with their environment and parse the results of
    those commands; running the commands is presumably done by the
    ``IndirectScheduler`` base class.
    """

    # always compile regular expressions to enforce ASCII matching
    # allow matching things like version 1.2.3-rc4; some distribution
    # packages print the program name as `slurm-wlm` instead of `slurm`
    _srun_version_regexp = r"slurm(?:-wlm)? (\d+)[.](\d+)[.](\d+\S*)"
    _srun_version_pattern = re.compile(_srun_version_regexp, re.ASCII)
    # `sbatch --parsable` prints the job ID, optionally followed by
    # `;<cluster name>` on multi-cluster installations
    _sbatch_job_id_regexp = r"(\d+)(?:;\S+)?"
    _sbatch_job_id_pattern = re.compile(_sbatch_job_id_regexp, re.ASCII)
    # one line of `sacct --parsable2 --format=JobID,State` output: either
    # `<id>|<STATE>` or `<id>+0.batch|<STATE>` (heterogeneous jobs); sacct
    # reports cancelled jobs as e.g. `CANCELLED by 1000`
    _sacct_line_regexp = r"(\d+)([+]0[.]batch)?[|](\w+)(?: by \d+)?"
    _sacct_line_pattern = re.compile(_sacct_line_regexp, re.ASCII)

    # set by `_is_available_impl`: True for Slurm >= 20, which uses the
    # `hetjob`/`het-group` spelling instead of `packjob`/`pack-group`
    _use_het_prefix: Optional[bool] = None

    @classmethod
    def _name_impl(cls) -> str:
        return "slurm"

    @classmethod
    def _is_available_impl(cls) -> Tuple[bool, Union[str, Tuple[str, str]]]:
        """Check that a usable `srun` executable is installed.

        Returns:
            ``(True, (srun_path, version_string))`` when Slurm is usable,
            ``(False, reason)`` otherwise. On success ``cls._use_het_prefix``
            is set according to the detected Slurm version.

        Raises:
            ValueError: if the output of ``srun --version`` cannot be parsed.
        """
        srun_path = shutil.which("srun")
        if srun_path is None:
            return False, "srun executable not found"

        srun = subprocess.run(
            [srun_path, "--version"],
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        if srun.returncode != 0:
            return False, "failed to execute %s: %s" % (srun_path, srun.stderr)

        # do not use pattern.fullmatch() because of the newline at the end of
        # the output
        match = cls._srun_version_pattern.match(srun.stdout)
        if match is None:
            e = "srun output '{:s}' does not match expected format"
            raise ValueError(e.format(srun.stdout))

        version_major = int(match.group(1))
        # the minor version might be of the form `05` but the Python int()
        # function handles this correctly
        version_minor = int(match.group(2))
        # the version exactly as printed by srun, e.g. `19.05.5`
        version_str = srun.stdout[match.span(1)[0] : match.span(3)[1]]

        if version_major < 19 or (version_major == 19 and version_minor < 5):
            # extra positional arguments to logger calls are %-format
            # arguments, so none must be passed for a placeholder-free message
            logger.warning("Melissa has not been tested with Slurm versions older than 19.05.5")

        if version_major < 17 or (version_major == 17 and version_minor < 11):
            return (
                False,
                "expected at least Slurm 17.11, "
                f"got {version_str}, "
                "which does not support heterogeneous jobs",
            )

        cls._use_het_prefix = version_major >= 20

        return True, (srun_path, version_str)

    def __init__(self) -> None:
        """Raise RuntimeError if Slurm is not available on this machine."""
        is_available, info = self.is_available()
        if not is_available:
            raise RuntimeError("Slurm unavailable: %s" % (info,))

        assert self._use_het_prefix is not None

    def _sanity_check_impl(self, options: Options) -> List[str]:
        """Validate the user-provided Slurm arguments.

        Every argument must be an option (``-x`` or ``--name=value``), the
        task count and ``--test-only`` are reserved, and ``srun --test-only``
        must accept the arguments.

        Returns:
            A list of human-readable error messages (empty when all is fine).
        """
        args = options.raw_arguments
        errors = []  # type: List[str]

        for a in args:
            # options are given as `--name=value`; compare the name only so
            # that `--ntasks=4` is caught like `--ntasks`
            option_name = a.split("=", 1)[0]
            if not a.startswith("-"):
                errors.append("non-option argument '{:s}' detected".format(a))
            elif option_name in ("--ntasks", "--test-only") or a.startswith("-n"):
                # `a.startswith("-n")` matches `-n`, `-n4` and `-n=4` but not
                # long options, which start with `--`
                errors.append("remove '{:s}' argument".format(a))

        command = ["srun", "--test-only", "--ntasks=1"] + args + ["--", "true"]
        srun = subprocess.run(
            command,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        if srun.returncode != 0:
            e = "srun error on trial execution: {:s}".format(srun.stderr)
            errors.append(e)

        return errors

    def _submit_job_impl(
        self, commands: List[ArgumentList], env: Environment, options: Options, name: str, uid: int
    ) -> Tuple[ArgumentList, Environment]:
        """Write the sbatch script for a job group and return the sbatch call.

        Each entry of ``commands`` becomes one component of a heterogeneous
        job (separated by ``#SBATCH hetjob``/``packjob``) and one background
        job step in the script, which then waits for all steps.

        Args:
            commands: one argument list per job component.
            env: variables added on top of ``os.environ`` for sbatch.
            options: ``raw_arguments`` are repeated as ``#SBATCH`` lines for
                every component; ``sched_cmd`` and ``sched_cmd_opt``
                (presumably ``srun`` and its options) prefix every job step.
            name: job name, used for ``--job-name`` and output file names.
            uid: number used in the script and output file names.

        Returns:
            The ``sbatch`` command line and the environment to run it with.
        """
        sbatch_env = os.environ.copy()
        sbatch_env.update(env)

        # Slurm does not create the parent directory of --output/--error
        # files, so it must exist before the job starts
        Path("./stdout").mkdir(parents=True, exist_ok=True)
        output_filename = "./stdout/job.{:d}.{:s}.out".format(uid, name)
        error_filename = "./stdout/job.{:d}.{:s}.err".format(uid, name)

        # sbatch Script and srun Arguments Assembly
        #
        # Arguments are assembled as lists of strings first to avoid
        # ambiguities; the lines of the script are then created *without*
        # trailing newlines and joined at the end. An empty string therefore
        # becomes an empty line.
        hetjob_keyword = "hetjob" if self._use_het_prefix else "packjob"
        hetgroup_keyword = "het-group" if self._use_het_prefix else "pack-group"

        # the two options below need to be specified only once; these are
        # so-called "propagated options" in the Slurm documentation.
        # User-provided options are assumed to be non-propagated and are
        # repeated for every component.
        sbatch_options = [
            "--output={:s}".format(output_filename),
            "--error={:s}".format(error_filename),
        ]
        for i in range(len(commands)):
            if i > 0:
                sbatch_options.append(hetjob_keyword)
            sbatch_options.extend(options.raw_arguments)

        sbatch_options_str = ["#SBATCH " + o for o in sbatch_options]

        # assemble job step command lines
        sched_cmd = options.sched_cmd
        sched_cmd_opt = list(options.sched_cmd_opt)

        srun_arguments: List[List[str]] = []
        if not sched_cmd:
            assert len(commands) == 1, "non-unit groups are not supported in this configuration"
            srun_arguments = [commands[0]]
        elif len(commands) == 1:
            srun_arguments = [sched_cmd_opt + ["--"] + commands[0]]
        else:
            srun_arguments = [
                sched_cmd_opt + [f"--{hetgroup_keyword}={i}", "--"] + cmd
                for i, cmd in enumerate(commands)
            ]

        # Every step is started in the background; the trailing backslash
        # joins all steps and the final `wait` into one shell command line.
        # NOTE(review): arguments are joined with spaces without shell
        # quoting, so arguments containing spaces or shell metacharacters are
        # interpreted by /bin/sh.
        prefix = f"exec {sched_cmd} " if sched_cmd else "exec "
        srun_arguments_str = [prefix + " ".join(args) + " & \\" for args in srun_arguments]

        # write the sbatch script
        Path("./sbatch").mkdir(parents=True, exist_ok=True)
        sbatch_script_filename = "./sbatch/sbatch.{:d}.sh".format(uid)
        sbatch_script = (
            ["#!/bin/sh", "# sbatch script for job {:s}".format(name)]
            + sbatch_options_str
            + [""]
            + srun_arguments_str
            + ["wait"]
        )

        # POSIX requires text files to end with a newline; the joined string
        # lacks it and `print` adds it
        with open(sbatch_script_filename, "w", encoding="utf-8") as f:
            print("\n".join(sbatch_script), file=f)

        sbatch_call = [
            "sbatch",
            "--parsable",
            "--job-name={:s}".format(name),
            sbatch_script_filename,
        ]

        return sbatch_call, sbatch_env

    def _make_job_impl(self, sbatch: CompletedProcess, uid: int) -> SlurmJob:
        """Create a SlurmJob from the result of the sbatch call.

        Raises:
            RuntimeError: if sbatch exited with a non-zero status.
            ValueError: if sbatch's output contains no job ID.
        """
        if sbatch.stderr != "":
            logger.warning("sbatch (uid=%s): %s", uid, sbatch.stderr)

        if sbatch.exit_status != 0:
            raise RuntimeError("sbatch failed (exit status {:d})".format(sbatch.exit_status))

        match = self._sbatch_job_id_pattern.fullmatch(sbatch.stdout.strip())
        if match is None:
            e = "no job ID found in sbatch output: '{:s}'"
            raise ValueError(e.format(sbatch.stdout))

        return SlurmJob(int(match.group(1)))

    def _update_jobs_impl(self, jobs: List[SlurmJob]) -> Tuple[ArgumentList, Environment]:
        """Return the sacct command querying the state of ``jobs``."""
        sacct_command = ["sacct", "--parsable2", "--format=JobID,State"]
        if jobs:
            # `--jobs` accepts a comma-separated list of job IDs; without it
            # sacct lists other jobs too, whose rows the parser ignores
            sacct_command.append("--jobs=" + ",".join("{:d}".format(j.id()) for j in jobs))
        return sacct_command, os.environ.copy()

    def _parse_update_jobs_impl(self, jobs: List[SlurmJob], sacct: CompletedProcess) -> None:
        """Update the state of ``jobs`` from the sacct output."""
        if sacct.exit_status != 0:
            logger.warning("sacct exited with status %s: %s", sacct.exit_status, sacct.stderr)

        job_map = {j.id(): j for j in jobs}
        for line in sacct.stdout.splitlines():
            self._update_jobs_impl_pure(job_map, line.strip())

    @classmethod
    def _update_jobs_impl_pure(cls, job_map: Dict[int, SlurmJob], sacct_line: str) -> None:
        """Apply one line of sacct output to the matching job in ``job_map``.

        Lines that do not match the expected format (headers, other job
        steps) and lines about jobs missing from ``job_map`` are ignored.

        Raises:
            NotImplementedError: for revoked (sibling) jobs.
            RuntimeError: for an unknown Slurm job state.
        """
        slurm_states_waiting = ("PENDING", "REQUEUED", "SUSPENDED", "RESIZING")
        slurm_states_failure = (
            "BOOT_FAIL",
            "CANCELLED",
            "DEADLINE",
            "FAILED",
            "NODE_FAIL",
            "OUT_OF_MEMORY",
            "PREEMPTED",
            "TIMEOUT",
        )

        match = cls._sacct_line_pattern.fullmatch(sacct_line)
        if match is None:
            return

        job_id = int(match.group(1))
        slurm_state = match.group(3)

        j = job_map.get(job_id)
        if j is None:
            return

        if slurm_state == "REVOKED":
            e = "cannot handle Slurm siblings like job {:d}"
            raise NotImplementedError(e.format(job_id))

        if slurm_state in slurm_states_waiting:
            assert j.state() in [State.WAITING, State.RUNNING]
            j._state = State.WAITING
        elif slurm_state == "RUNNING":
            assert j.state() in [State.WAITING, State.RUNNING]
            j._state = State.RUNNING
        elif slurm_state == "COMPLETED":
            assert j.state() in [State.WAITING, State.RUNNING, State.TERMINATED]
            j._state = State.TERMINATED
        elif slurm_state in slurm_states_failure:
            assert j.state() in [State.WAITING, State.RUNNING, State.FAILED]
            j._state = State.FAILED
        else:
            e = "unknown Slurm job state '{:s}'"
            raise RuntimeError(e.format(slurm_state))

    def _cancel_jobs_impl(self, jobs: List[SlurmJob]) -> Tuple[ArgumentList, Environment]:
        """Return the scancel command for ``jobs``."""
        job_list = [str(j.id()) for j in jobs]
        scancel_command = ["scancel", "--batch", "--quiet"] + job_list
        return scancel_command, os.environ.copy()

    def _parse_cancel_jobs_impl(self, jobs: List[SlurmJob], proc: CompletedProcess) -> None:
        """Raise RuntimeError if scancel failed."""
        # scancel exits with status 1 if at least one job had already been
        # terminated
        if proc.exit_status not in [0, 1]:
            raise RuntimeError(
                "scancel error: exit status {:d}: {:s}".format(proc.exit_status, proc.stderr)
            )