Coverage for melissa/scheduler/slurm_global.py: 28%
138 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-09-22 10:36 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-09-22 10:36 +0000
1#!/usr/bin/python3
3# Copyright (c) 2020-2022, Institut National de Recherche en Informatique et en Automatique (Inria)
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions are met:
8#
9# * Redistributions of source code must retain the above copyright notice, this
10# list of conditions and the following disclaimer.
11#
12# * Redistributions in binary form must reproduce the above copyright notice,
13# this list of conditions and the following disclaimer in the documentation
14# and/or other materials provided with the distribution.
15#
16# * Neither the name of the copyright holder nor the names of its
17# contributors may be used to endorse or promote products derived from
18# this software without specific prior written permission.
19#
20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
24# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
26# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31import logging
32import os
33import re
34import shutil
35import subprocess
36from types import ModuleType
37from typing import cast, List, Tuple, Union
39from melissa.utility import time
40from melissa.utility.process import ArgumentList, Environment, Process
42from .job import Id, Job, State
43from .scheduler import DirectScheduler, Options
# Rebind the module-level name ``logging`` to this module's logger so the rest
# of the file can call ``logging.debug(...)`` etc. directly. The cast only
# changes what static type checkers see; at runtime ``logging`` is a
# ``logging.Logger`` instance from this line on, not the ``logging`` module.
logging = cast(ModuleType, logging.getLogger(__name__))
class SrunJob(Job):
    """Job handle backed by a child process.

    The handle stores the caller-supplied unique identifier, the process
    object, and a cached state that starts out as ``State.RUNNING``.
    """

    def __init__(self, uid: Id, process: "subprocess.Popen[str]") -> None:
        super().__init__()
        self._process = process
        self._uid = uid
        # A freshly created job is assumed to be running until updated.
        self._state = State.RUNNING

    def id(self) -> Id:
        """Return the PID of the wrapped process."""
        pid = self._process.pid
        return pid

    def unique_id(self) -> Union[str, int]:
        """Return the identifier that was passed to the constructor."""
        return self._uid

    def state(self) -> State:
        """Return the cached job state (the process is not polled here)."""
        return self._state

    def __repr__(self) -> str:
        template = "SrunJob(id={:d},state={:s})"
        return template.format(self.id(), str(self._state))
class SlurmGlobalScheduler(DirectScheduler[SrunJob]):
    """Direct scheduler that runs each job as a child process.

    When ``options.sched_cmd`` is set, the job command is wrapped in that
    launcher command together with ``--output``/``--error`` options;
    otherwise the job command is executed as is. The availability check
    requires an ``srun`` executable reporting Slurm 17.11 or newer.
    """

    # always compile regular expressions to enforce ASCII matching
    # allow matching things like version 1.2.3-rc4
    _srun_version_regexp = r"slurm (\d+)[.](\d+)[.](\d+\S*)"
    _srun_version_pattern = re.compile(_srun_version_regexp, re.ASCII)
    _sbatch_job_id_regexp = r"(\d+)"
    _sbatch_job_id_pattern = re.compile(_sbatch_job_id_regexp, re.ASCII)
    _sacct_line_regexp = r"(\d+)([+]0[.]batch)?[|](\w+)"
    _sacct_line_pattern = re.compile(_sacct_line_regexp, re.ASCII)
    # set by _is_available_impl(): True when the detected major version is >= 20
    _use_het_prefix: bool = False

    @classmethod
    def _name_impl(cls) -> str:
        """Return the scheduler name."""
        return "slurm"

    @classmethod
    def _is_available_impl(cls) -> Tuple[bool, Union[str, Tuple[str, str]]]:
        """Check whether a usable ``srun`` executable is present.

        Returns:
            ``(True, (srun_path, version_string))`` on success, otherwise
            ``(False, reason)``.

        Raises:
            ValueError: if the output of ``srun --version`` does not match
                the expected ``slurm X.Y.Z`` format.
        """
        srun_path = shutil.which("srun")
        if srun_path is None:
            return False, "srun executable not found"

        srun = subprocess.run(
            [srun_path, "--version"],
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        if srun.returncode != 0:
            return False, "failed to execute %s: %s" % (srun_path, srun.stderr)

        # do not use pattern.fullmatch() because of the newline at the end of
        # the output
        match = cls._srun_version_pattern.match(srun.stdout)
        if match is None:
            e = "srun output '{:s}' does not match expected format"
            raise ValueError(e.format(srun.stdout))

        version_major = int(match.group(1))
        # the minor version might be of the form `05` but the Python int()
        # function handles this correctly
        version_minor = int(match.group(2))
        # The patch level is deliberately NOT converted to int: the regular
        # expression accepts suffixes such as `3-rc4`, on which int() raises.
        # The version is only ever reported as text, exactly as srun printed it.
        version_str = srun.stdout[match.span(1)[0] : match.span(3)[1]]

        if version_major < 19 or (version_major == 19 and version_minor < 5):
            # `logging` is this module's Logger instance; Logger.warning()
            # takes the message followed by %-format arguments only.
            logging.warning(
                "Melissa has not been tested with Slurm versions older than 19.05.5"
            )

        if version_major < 17 or (version_major == 17 and version_minor < 11):
            return (
                False,
                (
                    "Expected at least Slurm 17.11, got "
                    f"{version_str} "
                    "which does not support heterogeneous jobs"
                ),
            )

        cls._use_het_prefix = version_major >= 20

        return True, (srun_path, version_str)

    def __init__(self) -> None:
        """Verify that Slurm is usable.

        Raises:
            RuntimeError: if the availability check reports failure.
        """
        is_available, info = self.is_available()
        if not is_available:
            raise RuntimeError("Slurm unavailable: %s" % (info,))

        assert self._use_het_prefix is not None

    def _sanity_check_impl(self, options: Options) -> List[str]:
        """Validate the raw scheduler arguments with a trial ``srun`` call.

        Returns:
            A list of human-readable error messages; empty if no problem was
            detected.
        """
        args = options.raw_arguments
        errors = []

        for a in args:
            # startswith() instead of a[0] so that an empty argument is
            # reported as an error instead of raising IndexError
            if not a.startswith("-"):
                errors.append("non-option argument '{:s}' detected".format(a))
            elif a in ["-n", "--ntasks", "--test-only"]:
                errors.append("remove '{:s}' argument".format(a))

        # `--test-only` and `--ntasks=1` are supplied by the check itself
        command = ["srun", "--test-only", "--ntasks=1"] + args + ["--", "true"]
        srun = subprocess.run(
            command,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        if srun.returncode != 0:
            e = "srun error on trial execution: {:s}".format(srun.stderr)
            errors.append(e)

        return errors

    def _submit_job_impl(
        self,
        commands: List[ArgumentList],
        env: Environment,
        options: Options,
        name: str,
        unique_id: int,
    ) -> Tuple[ArgumentList, Environment]:
        """Build the argument list and environment for launching one job.

        Args:
            commands: the commands of the job group; exactly one is supported.
            env: environment variables added on top of the current environment.
            options: provides ``raw_arguments``, ``sched_cmd`` and
                ``sched_cmd_opt``.
            name: job name, used in the output file names.
            unique_id: job identifier, used in the output file names.

        Returns:
            ``(argument_list, environment)``.
        """
        # Approach to environment variables:
        # By default all environment variables are propagated
        srun_env = os.environ.copy()
        srun_env.update(env)

        output_filename = "./stdout/job.{:d}.{:s}.out".format(unique_id, name)
        error_filename = "./stdout/job.{:d}.{:s}.err".format(unique_id, name)

        # build propagated options of the srun command line
        propagated_options = [
            "--output={:s}".format(output_filename),
            "--error={:s}".format(error_filename),
        ] + options.raw_arguments

        sched_cmd = options.sched_cmd
        sched_cmd_opt = options.sched_cmd_opt

        assert len(commands) == 1, "non-unit groups are not supported in this configuration"

        # Every argument of the job command stays a separate list element.
        # Joining them into one string would merge the executable and its
        # arguments into a single argv entry.
        command = list(commands[0])
        logging.debug("srun_arguments: %s", command)

        # write srun/job execution call
        if not sched_cmd:
            # no launcher configured: run the command directly
            srun_call = command
        else:
            # `--` separates the launcher options from the job command
            srun_call = (
                [sched_cmd] + sched_cmd_opt + propagated_options + ["--"] + command
            )
        logging.debug("srun_call: %s", srun_call)

        return srun_call, srun_env

    def _make_job_impl(self, proc: "Process[str]", unique_id: int) -> SrunJob:
        """Wrap a launched process in an ``SrunJob``."""
        return SrunJob(unique_id, proc)

    @classmethod
    def _update_jobs_impl(cls, jobs: List[SrunJob]) -> None:
        """Refresh the cached state of every job by polling its process.

        A process that has not exited is ``RUNNING``, exit code 0 maps to
        ``TERMINATED`` and any other exit code to ``FAILED``.
        """
        for j in jobs:
            returncode = j._process.poll()
            if returncode is None:
                state = State.RUNNING
            elif returncode == 0:
                state = State.TERMINATED
            else:
                state = State.FAILED
            j._state = state

    @classmethod
    def _cancel_jobs_impl(cls, jobs: List[SrunJob]) -> None:
        """Stop the processes of the given jobs and mark the jobs as failed.

        Processes still alive after a grace period are sent ``terminate()``;
        those still alive after a second grace period are sent ``kill()``.
        """
        # when the user presses ctrl+c, the shell will send all processes in
        # the same process group SIGINT. some programs respond intelligently to
        # signals by freeing resources and exiting. these programs may also
        # exit _immediately_ if they receive a second signal within a short
        # time frame (e.g., srun or mpirun which won't terminate its child
        # processes in this case). for this reason, we wait before terminating
        # jobs here.
        max_wait_time = time.Time(seconds=5)

        # wait at most max_wait_time overall
        def compute_timeout(t_start: time.Time) -> time.Time:
            t_waited = time.monotonic() - t_start
            if t_waited < max_wait_time:
                return max_wait_time - t_waited
            return time.Time(seconds=0)

        # terminate processes
        t_0 = time.monotonic()
        for j in jobs:
            try:
                timeout = compute_timeout(t_0)
                j._process.wait(timeout.total_seconds())
            except subprocess.TimeoutExpired:
                logging.debug("Slurm srun scheduler terminating process %d", j.id())
                j._process.terminate()

        # kill processes if necessary
        t_1 = time.monotonic()
        for j in jobs:
            try:
                timeout = compute_timeout(t_1)
                j._process.wait(timeout.total_seconds())
            except subprocess.TimeoutExpired:
                logging.debug("Slurm srun scheduler killing process %d", j.id())
                j._process.kill()

            # every cancelled job is reported as failed, however it exited
            j._state = State.FAILED