Coverage for melissa/scheduler/slurm.py: 40%
172 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-09-22 10:36 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-09-22 10:36 +0000
1#!/usr/bin/python3
3# Copyright (c) 2020-2022, Institut National de Recherche en Informatique et en Automatique (Inria)
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions are met:
8#
9# * Redistributions of source code must retain the above copyright notice, this
10# list of conditions and the following disclaimer.
11#
12# * Redistributions in binary form must reproduce the above copyright notice,
13# this list of conditions and the following disclaimer in the documentation
14# and/or other materials provided with the distribution.
15#
16# * Neither the name of the copyright holder nor the names of its
17# contributors may be used to endorse or promote products derived from
18# this software without specific prior written permission.
19#
20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
24# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
26# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31import logging
32import os
33import re
34import shutil
35import subprocess
36from typing import Dict, List, Optional, Tuple, Union
37from pathlib import Path
39from melissa.utility.process import ArgumentList, CompletedProcess, Environment
41from .job import Id, Job, State
42from .scheduler import IndirectScheduler, Options
# module-level logger, named after this module
logger: logging.Logger = logging.getLogger(__name__)
class SlurmJob(Job):
    """A Slurm job identified by the job ID that ``sbatch`` reported.

    A freshly created job is in ``State.WAITING``. The scheduler in this
    module updates ``_state`` directly while parsing ``sacct`` output.
    """

    def __init__(self, job_id: Id) -> None:
        self._state = State.WAITING
        self._id = job_id

    def id(self) -> Id:
        """Return the Slurm job ID."""
        return self._id

    def unique_id(self) -> Union[str, int]:
        """Return the unique identifier of this job (the Slurm job ID)."""
        return self._id

    def state(self) -> State:
        """Return the most recently recorded state of this job."""
        return self._state

    def __repr__(self) -> str:
        # `{!s}` yields exactly `str(state)`, matching a `{:s}` applied to it
        return f"SlurmJob(id={self.id():d},state={self.state()!s})"
class SlurmScheduler(IndirectScheduler[SlurmJob]):
    """Scheduler backend driving Slurm through `sbatch`, `sacct` and `scancel`.

    The ``_*_impl`` methods only build command lines (plus environments) and
    parse the results of running them. Presumably the ``IndirectScheduler``
    base class executes the commands; verify against the base class.
    """

    # always compile regular expressions to enforce ASCII matching
    # allow matching things like version 1.2.3-rc4
    _srun_version_regexp = r"slurm (\d+)[.](\d+)[.](\d+\S*)"
    _srun_version_pattern = re.compile(_srun_version_regexp, re.ASCII)
    _sbatch_job_id_regexp = r"(\d+)"
    _sbatch_job_id_pattern = re.compile(_sbatch_job_id_regexp, re.ASCII)
    # matches `<id>|<STATE>` and `<id>+0.batch|<STATE>` lines of
    # `sacct --parsable2 --format=JobID,State`; sacct prints cancelled jobs as
    # `CANCELLED by <uid>`, so an optional ` by <uid>` suffix is accepted
    # (non-capturing, so group numbers are unchanged)
    _sacct_line_regexp = r"(\d+)([+]0[.]batch)?[|](\w+)(?: by \d+)?"
    _sacct_line_pattern = re.compile(_sacct_line_regexp, re.ASCII)

    # set by `_is_available_impl`: True selects the `hetjob`/`het-group`
    # keywords (Slurm >= 20), False selects `packjob`/`pack-group`
    _use_het_prefix: Optional[bool] = None

    @classmethod
    def _name_impl(cls) -> str:
        """Return the scheduler name."""
        return "slurm"

    @classmethod
    def _is_available_impl(cls) -> Tuple[bool, Union[str, Tuple[str, str]]]:
        """Check that `srun` is usable and that the Slurm version is supported.

        On success this also sets ``cls._use_het_prefix`` from the major
        version.

        Returns:
            ``(True, (srun_path, version_string))`` if Slurm is usable,
            otherwise ``(False, reason)``.

        Raises:
            ValueError: if the output of ``srun --version`` cannot be parsed.
        """
        srun_path = shutil.which("srun")
        if srun_path is None:
            return False, "srun executable not found"

        srun = subprocess.run(
            [srun_path, "--version"],
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        if srun.returncode != 0:
            return False, "failed to execute %s: %s" % (srun_path, srun.stderr)

        # do not use pattern.fullmatch() because of the newline at the end of
        # the output
        match = cls._srun_version_pattern.match(srun.stdout)
        if match is None:
            e = "srun output '{:s}' does not match expected format"
            raise ValueError(e.format(srun.stdout))

        version_major = int(match.group(1))
        # the minor version might be of the form `05` but the Python int()
        # function handles this correctly
        version_minor = int(match.group(2))
        version_patch = match.group(3)

        if version_major < 19 or (version_major == 19 and version_minor < 5):
            # the message has no %-placeholders, so no extra arguments may be
            # passed to the logging call
            logger.warning("Melissa has not been tested with Slurm versions older than 19.05.5")

        if version_major < 17 or (version_major == 17 and version_minor < 11):
            return (
                False,
                "expected at least Slurm 17.11, "
                f"got {version_major}.{version_minor}.{version_patch} "
                "which does not support heterogeneous jobs",
            )

        # Slurm 20 renamed packjob/pack-group to hetjob/het-group
        cls._use_het_prefix = version_major >= 20

        version_str = srun.stdout[match.span(1)[0] : match.span(3)[1]]
        return True, (srun_path, version_str)

    def __init__(self) -> None:
        """Create the scheduler.

        Raises:
            RuntimeError: if Slurm is not available.
        """
        is_available, info = self.is_available()
        if not is_available:
            raise RuntimeError("Slurm unavailable: %s" % (info,))

        assert self._use_het_prefix is not None

    def _sanity_check_impl(self, options: Options) -> List[str]:
        """Validate the user-provided scheduler arguments.

        The method rejects non-option arguments and options that Melissa sets
        itself. It then runs ``srun --test-only`` to let Slurm check the
        remaining options.

        Returns:
            A list of error messages, which is empty if no problem was found.
        """
        args = options.raw_arguments
        errors = []

        for a in args:
            # `startswith` (unlike `a[0]`) also copes with empty arguments
            if not a.startswith("-"):
                errors.append("non-option argument '{:s}' detected".format(a))
            # compare only the option name so `--ntasks=N` is caught as well
            elif a.split("=", 1)[0] in ["-n", "--ntasks", "--test-only"]:
                errors.append("remove '{:s}' argument".format(a))

        command = ["srun", "--test-only", "--ntasks=1"] + args + ["--", "true"]
        srun = subprocess.run(
            command,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        if srun.returncode != 0:
            e = "srun error on trial execution: {:s}".format(srun.stderr)
            errors.append(e)

        return errors

    def _submit_job_impl(
        self, commands: List[ArgumentList], env: Environment, options: Options, name: str, uid: int
    ) -> Tuple[ArgumentList, Environment]:
        """Write an sbatch script running ``commands`` and return the sbatch call.

        Each entry of ``commands`` becomes one heterogeneous-job component,
        and each component receives ``options.raw_arguments``. The script is
        written to ``./sbatch/sbatch.<uid>.sh``. Job output goes to
        ``./stdout/job.<uid>.<name>.out`` and ``.err``.

        Returns:
            The ``sbatch`` command line and its environment, which is the
            current process environment updated with ``env``.
        """
        sbatch_env = os.environ.copy()
        sbatch_env.update(env)

        output_filename = "./stdout/job.{:d}.{:s}.out".format(uid, name)
        error_filename = "./stdout/job.{:d}.{:s}.err".format(uid, name)

        # sbatch Script and srun Arguments Assembly
        #
        # Command arguments are assembled initially as lists of lists of
        # strings (`List[List[str]]`) to avoid ambiguities, to allow sanity
        # checks, and to allow sanitizing the text later. The inner lists are
        # * lines in sbatch scripts
        # * hetgroup arguments on the srun command line.
        #
        # The actual strings are created in two steps:
        # * The inner lists are turned into strings with `#SBATCH`, `:`
        #   prefixes, back-slashes, and so on *without* newlines at the end.
        # * The list of strings is turned into a string and the missing
        #   newlines are added.
        #
        # The convention of leaving away newlines means that an empty string
        # will be replaced by a newline.

        # assemble sbatch options
        #
        # the options below need to be specified only once; these are so-called
        # "propagated options" in the Slurm documentation
        # user-provided options are assumed to be non-propagated
        propagated_options = [
            "--output={:s}".format(output_filename),
            "--error={:s}".format(error_filename),
        ]

        # repeat the user options for every component, separated by the
        # version-specific heterogeneous-job keyword
        hetjob_keyword = "hetjob" if self._use_het_prefix else "packjob"
        hetjob_options: List[str] = []
        for i, _ in enumerate(commands):
            hetjob_options.extend(options.raw_arguments)
            if i + 1 < len(commands):
                hetjob_options.append(hetjob_keyword)

        sbatch_options = propagated_options + hetjob_options

        # serialize sbatch options
        sbatch_options_str = ["#SBATCH " + option for option in sbatch_options]

        # assemble srun arguments
        hetgroup_keyword = "het-group" if self._use_het_prefix else "pack-group"

        sched_cmd = options.sched_cmd
        sched_cmd_opt = options.sched_cmd_opt

        srun_arguments: List[List[str]] = []
        if not sched_cmd:
            assert len(commands) == 1, "non-unit groups are not supported in this configuration"
            srun_arguments = [commands[0]]
        else:
            if len(commands) == 1:
                srun_arguments = [sched_cmd_opt + ["--"] + commands[0]]
            else:
                for i, cmd in enumerate(commands):
                    args = [" ".join(sched_cmd_opt), f"--{hetgroup_keyword}={i}", "--"] + cmd
                    srun_arguments.append(args)

        # serialize srun arguments: components after the first are separated
        # by `:` and every line but the last ends with a line continuation
        def args2str(hetgroup: int, args: List[str]) -> str:
            assert hetgroup >= 0
            assert hetgroup < len(commands)

            prefix = ": " if hetgroup > 0 else ""
            suffix = " \\" if hetgroup + 1 < len(commands) else ""
            return "    " + prefix + " ".join(args) + suffix

        srun_arguments_str = [args2str(i, args) for i, args in enumerate(srun_arguments)]

        # write srun calls to file
        Path('./sbatch').mkdir(parents=True, exist_ok=True)
        sbatch_script_filename = "./sbatch/sbatch.{:d}.sh".format(uid)
        sbatch_script = (
            ["#!/bin/sh"]
            + ["# sbatch script for job {:s}".format(name)]
            + sbatch_options_str
            + [""]
            + (["exec \\"] if not sched_cmd else [f"exec {options.sched_cmd} \\"])
            + srun_arguments_str
        )

        # POSIX requires files to end with a newline; missing newlines at the
        # end of a file may break scripts that append text.
        # this string won't contain a newline at the end; it must be added
        # manually or by using a function that adds it, e.g., `print`
        sbatch_script_str_noeol = "\n".join(sbatch_script)

        with open(sbatch_script_filename, "w") as f:
            print(sbatch_script_str_noeol, file=f)

        sbatch_call = (
            ["sbatch"]
            + ["--parsable"]
            + ["--job-name={:s}".format(name)]
            + [sbatch_script_filename]
        )

        return sbatch_call, sbatch_env

    def _make_job_impl(self, sbatch: CompletedProcess, uid: int) -> SlurmJob:
        """Create a ``SlurmJob`` from the result of the ``sbatch --parsable`` call.

        Any stderr output is logged as a warning.

        Raises:
            RuntimeError: if sbatch exited with a non-zero status.
            ValueError: if stdout is not a bare numeric job ID.
        """
        if sbatch.stderr != "":
            logger.warning(f"sbatch (uid={uid}): {sbatch.stderr}")

        if sbatch.exit_status != 0:
            raise RuntimeError("sbatch failed (exit status {:d})".format(sbatch.exit_status))

        match = self._sbatch_job_id_pattern.fullmatch(sbatch.stdout.strip())
        if match is None:
            e = "no job ID found in sbatch output: '{:s}'"
            raise ValueError(e.format(sbatch.stdout))

        return SlurmJob(int(match.group(1)))

    def _update_jobs_impl(self, jobs: List[SlurmJob]) -> Tuple[ArgumentList, Environment]:
        """Return the ``sacct`` command line querying the state of ``jobs``."""
        job_list = ["--job={:d}".format(j.id()) for j in jobs]
        sacct_command = ["sacct", "--parsable2", "--format=JobID,State"] + job_list
        return sacct_command, os.environ

    def _parse_update_jobs_impl(self, jobs: List[SlurmJob], sacct: CompletedProcess) -> None:
        """Update the states of ``jobs`` from the stdout of the ``sacct`` call."""
        job_map = {j.id(): j for j in jobs}
        for line in sacct.stdout.strip().split("\n"):
            self._update_jobs_impl_pure(job_map, line)

    @classmethod
    def _update_jobs_impl_pure(cls, job_map: Dict[int, SlurmJob], sacct_line: str) -> None:
        """Apply one line of ``sacct --parsable2`` output to the matching job.

        Lines that do not match ``_sacct_line_pattern`` are ignored. This
        covers the header line and step lines other than ``+0.batch``.

        Raises:
            NotImplementedError: for the ``REVOKED`` (sibling job) state.
            RuntimeError: for an unrecognized Slurm state.
        """
        slurm_states_waiting = ["PENDING", "REQUEUED", "SUSPENDED", "RESIZING"]
        slurm_states_failure = [
            "BOOT_FAIL",
            "CANCELLED",
            "DEADLINE",
            "FAILED",
            "NODE_FAIL",
            "OUT_OF_MEMORY",
            "PREEMPTED",
            "TIMEOUT",
        ]

        match = cls._sacct_line_pattern.fullmatch(sacct_line)
        if match is None:
            return

        job_id = int(match.group(1))
        slurm_state = match.group(3)

        assert job_id in job_map

        j = job_map[job_id]

        if slurm_state == "REVOKED":
            e = "cannot handle Slurm siblings like job {:d}"
            raise NotImplementedError(e.format(job_id))

        # the assertions encode the permitted state transitions
        if slurm_state in slurm_states_waiting:
            assert j.state() in [State.WAITING, State.RUNNING]
            j._state = State.WAITING
        elif slurm_state in ["RUNNING"]:
            assert j.state() in [State.WAITING, State.RUNNING]
            j._state = State.RUNNING
        elif slurm_state in ["COMPLETED"]:
            assert j.state() in [State.WAITING, State.RUNNING, State.TERMINATED]
            j._state = State.TERMINATED
        elif slurm_state in slurm_states_failure:
            assert j.state() in [State.WAITING, State.RUNNING, State.FAILED]
            j._state = State.FAILED
        else:
            e = "unknown Slurm job state '{:s}'"
            raise RuntimeError(e.format(slurm_state))

    def _cancel_jobs_impl(self, jobs: List[SlurmJob]) -> Tuple[ArgumentList, Environment]:
        """Return the ``scancel`` command line cancelling the batch step of ``jobs``."""
        job_list = [str(j.id()) for j in jobs]
        scancel_command = ["scancel", "--batch", "--quiet"] + job_list
        return scancel_command, os.environ

    def _parse_cancel_jobs_impl(self, jobs: List[SlurmJob], proc: CompletedProcess) -> None:
        """Check the result of the ``scancel`` call.

        Raises:
            RuntimeError: if scancel exited with a status other than 0 or 1.
        """
        # scancel exits with status 1 if at least one job had already been
        # terminated
        if proc.exit_status not in [0, 1]:
            raise RuntimeError("scancel error: exit status {:d}".format(proc.exit_status))