Coverage for melissa/scheduler/oar.py: 26%
134 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-10 22:25 +0100
1#!/usr/bin/python3
3# Copyright (c) 2020-2022, Institut National de Recherche en Informatique et en Automatique (Inria)
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions are
8# met:
9#
10# * Redistributions of source code must retain the above copyright notice,
11# this list of conditions and the following disclaimer.
12#
13# * Redistributions in binary form must reproduce the above copyright
14# notice, this list of conditions and the following disclaimer in the
15# documentation and/or other materials provided with the distribution.
16#
17# * Neither the name of the copyright holder nor the names of its
18# contributors may be used to endorse or promote products derived from
19# this software without specific prior written permission.
20#
21# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
22# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
23# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
24# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25# HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
27# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33import rapidjson
34import logging
35import os
36import re
37import shutil
38import subprocess
39from typing import List, Tuple, Union, Dict
40from pathlib import Path
42from melissa.utility.process import ArgumentList, CompletedProcess, Environment
44from .job import Job, State
45from .scheduler import IndirectScheduler, Options
# Module-level logger, named after this module (used e.g. to report oarsub
# stderr output).
logger = logging.getLogger(__name__)
class OarJob(Job):
    """A job handled by the OAR batch scheduler.

    The job is identified by the integer job id assigned by ``oarsub``; its
    state starts as ``State.WAITING`` and is updated by the scheduler.
    """

    def __init__(self, job_id: int) -> None:
        super().__init__()
        self._id = job_id
        # a newly created job is considered waiting until oarstat says otherwise
        self._state = State.WAITING

    def id(self) -> int:
        """Return the OAR job id."""
        return self._id

    def unique_id(self) -> Union[str, int]:
        """Return the identifier that uniquely names this job (its OAR id)."""
        return self._id

    def state(self) -> State:
        """Return the last known state of this job."""
        return self._state

    def __repr__(self) -> str:
        return f"OarJob(id={self._id:d},state={self._state!s})"

    def set_state(self, new_state: State) -> None:
        """Record ``new_state`` as the current state of this job."""
        self._state = new_state
class OarScheduler(IndirectScheduler[OarJob]):
    """Indirect scheduler driving the OAR batch system.

    Jobs are submitted with ``oarsub`` using a generated shell script, polled
    with ``oarstat --full --json`` and cancelled with ``oardel``. Only the
    OpenMPI provider is currently supported for launching the MPI commands.
    """

    # always compile regular expressions to enforce ASCII matching
    # allow matching things like version 1.2.3-rc4
    # there is a space in front of the colon in version 2.5.8
    _oarsub_version_regexp = r"OAR version\s*: (\d+)[.](\d+)[.](\d+\S*)"
    _oarsub_version_pattern = re.compile(_oarsub_version_regexp, re.ASCII)

    _oarsub_job_id_regexp = r"OAR_JOB_ID=(\d+)"
    _oarsub_job_id_pattern = re.compile(_oarsub_job_id_regexp, re.ASCII)

    # Mapping from the OAR job states reported by oarstat to Melissa states.
    # TERMINATED entries are refined into TERMINATED/FAILED using the job
    # exit code in `_parse_update_jobs_impl`.
    # NOTE(review): "Suspended"/"Resuming" jobs are not executing, so they are
    # treated as waiting; confirm this is the desired mapping.
    _states_map = {
        "Error": State.ERROR,
        "Finishing": State.TERMINATED,
        "Hold": State.WAITING,
        "Launching": State.WAITING,
        "Resuming": State.WAITING,
        "Running": State.RUNNING,
        "Suspended": State.WAITING,
        "Terminated": State.TERMINATED,
        "toAckReservation": State.WAITING,
        "toError": State.ERROR,
        "toLaunch": State.WAITING,
        "Waiting": State.WAITING,
    }

    @classmethod
    def _name_impl(cls) -> str:
        """Return the scheduler name used to select this implementation."""
        return "oar"

    @classmethod
    def _is_available_impl(cls) -> Tuple[bool, Union[str, Tuple[str, str]]]:
        """Check whether OAR can be used on this machine.

        Returns:
            ``(True, (oarsub_path, version))`` when ``oarsub`` is on the PATH
            and reports a parseable version, ``(False, reason)`` otherwise.
        """
        oarsub_path = shutil.which("oarsub")
        if oarsub_path is None:
            return False, "oarsub executable not found"

        oarsub = subprocess.run(
            [oarsub_path, "--version"],
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        if oarsub.returncode != 0:
            return False, "failed to execute %s: %s" % (oarsub_path, oarsub.stderr)

        # do not use pattern.fullmatch() because of the newline at
        # the end of the oarsub output and because the release name
        # will be printed after the version number.
        match = cls._oarsub_version_pattern.match(oarsub.stdout)
        if match is None:
            # Report this like every other failure mode instead of raising,
            # so that probing for available schedulers cannot crash.
            e = "oarsub output '{:s}' does not match expected format"
            return False, e.format(oarsub.stdout)

        # keep "major.minor.patch[suffix]" exactly as printed by oarsub
        version_str = oarsub.stdout[match.start(1) : match.end(3)]
        return True, (oarsub_path, version_str)

    def __init__(self, mpi_provider: str) -> None:
        """Create the scheduler.

        Args:
            mpi_provider: MPI implementation used to launch the jobs; only
                ``"openmpi"`` is supported by ``_submit_job_impl``.

        Raises:
            RuntimeError: if OAR is not available on this machine.
        """
        is_available, info = self.is_available()
        if not is_available:
            raise RuntimeError("OAR unavailable: {:s}".format(info))

        super().__init__()
        self.mpi_provider = mpi_provider

    @staticmethod
    def _split_raw_arguments(raw_arguments: List[str]) -> Tuple[str, List[str]]:
        """Derive the resource directive and oarsub options from raw arguments.

        Arguments starting with ``-t``, ``-p`` or ``-q`` become the pair
        ``[flag, value]``, where ``value`` is everything after the third
        character (the format ``"-t value"`` is expected). All other arguments
        are appended, without separator, to the ``#OAR --resource`` directive.

        Returns:
            The ``#OAR --resource ...`` script line and the list of extra
            ``oarsub`` command-line options.
        """
        oar_resource_cmd = "#OAR --resource "
        oar_submission_opt: List[str] = []
        for o in raw_arguments:
            if o[:2] in ("-t", "-p", "-q"):
                oar_submission_opt += [o[:2], o[3:]]
            else:
                oar_resource_cmd += o
        return oar_resource_cmd, oar_submission_opt

    def _submit_job_impl(
        self,
        commands: List[ArgumentList],
        env: Environment,
        options: Options,
        name: str,
        uid: int,
    ) -> Tuple[ArgumentList, Environment]:
        """Write the OAR batch script for ``commands`` and build the oarsub call.

        The script ``./oarsub/oarsub.<uid>.sh`` runs all commands as one MPMD
        ``mpirun`` invocation (sections separated by ``:``), each prefixed by
        ``env`` with the requested environment variables.

        Returns:
            The ``oarsub`` argument list and the environment to run it in.

        Raises:
            NotImplementedError: if the MPI provider is not supported.
        """
        sched_cmd = options.sched_cmd
        sched_cmd_opt = options.sched_cmd_opt

        if self.mpi_provider == "openmpi":
            machinefile_arg = ["-machinefile", '"$OAR_NODE_FILE"']
        else:
            fmt = "OAR scheduler implementation for MPI provider '{:s}' missing"
            raise NotImplementedError(fmt.format(self.mpi_provider))

        # gather environment variables; forward the submitter's search paths
        # unless the caller set them explicitly
        oar_env: Dict = dict(env)
        for key in ["LD_LIBRARY_PATH", "PATH"]:
            if key not in oar_env and key in os.environ:
                oar_env[key] = os.environ[key]

        if oar_env:
            env_args = ["env"] + ["%s=%s" % (key, value) for key, value in oar_env.items()]
        else:
            env_args = []

        # assemble one mpirun MPMD section per command
        mpirun_args = [" ".join(sched_cmd_opt + ["--"] + env_args + cmd) for cmd in commands]

        mpirun_command = (
            "exec " + sched_cmd
            + " "
            + " ".join(machinefile_arg)
            + " \\\n "
            + " : \\\n ".join(mpirun_args)
        )

        Path("./oarsub").mkdir(parents=True, exist_ok=True)
        script_filename = "./oarsub/oarsub.%d.sh" % uid
        # oarsub cannot handle single quotes!
        oar_resource_cmd, oar_submission_opt = self._split_raw_arguments(options.raw_arguments)
        with open(script_filename, "w") as f:
            print("#!/bin/sh", file=f)
            print("#OAR -O ./stdout/oar.%d.out" % uid, file=f)
            print("#OAR -E ./stdout/oar.%d.err" % uid, file=f)
            print(oar_resource_cmd, file=f)
            print(mpirun_command, file=f)
        os.chmod(script_filename, 0o744)

        maybe_job_name = ["--name", name] if name else []

        # --scanscript makes oarsub read the "#OAR" directives from the script
        oar_command = (
            [
                "oarsub",
                "--scanscript",
                "--directory={:s}".format(os.getcwd()),
            ]
            + oar_submission_opt
            + maybe_job_name
            + ["--", os.path.abspath(script_filename)]
        )

        return oar_command, os.environ

    def _make_job_impl(self, oarsub: CompletedProcess, uid: int) -> OarJob:
        """Create an ``OarJob`` from the output of a finished oarsub call.

        Raises:
            RuntimeError: if oarsub exited with a non-zero status.
            ValueError: if no ``OAR_JOB_ID=<n>`` is found in its output.
        """
        if oarsub.stderr != "":
            logger.warning("oarsub (uid=%d): %s", uid, oarsub.stderr)
        if oarsub.exit_status != 0:
            raise RuntimeError("oarsub failed (exit status {:d})".format(oarsub.exit_status))

        # do not match because oarsub may print lines starting with
        # [ADMISSION RULE]
        match = self._oarsub_job_id_pattern.search(oarsub.stdout)
        if match is None:
            raise ValueError("no job id found in oarsub output: '{:s}'".format(oarsub.stdout))

        return OarJob(int(match.group(1)))

    def _update_jobs_impl(self, jobs: List[OarJob]) -> Tuple[ArgumentList, Environment]:
        """Return the oarstat call querying the state of ``jobs``."""
        job_args = ["--job={:d}".format(j.id()) for j in jobs]
        # `oarstat --state` does not distinguish between failed and successful
        # jobs
        return ["oarstat", "--full", "--json"] + job_args, os.environ

    def _parse_update_jobs_impl(self, jobs: List[OarJob], oarstat: CompletedProcess) -> None:
        """Update the state of ``jobs`` from the JSON output of oarstat.

        Jobs absent from the output are left untouched. Finished jobs are
        marked TERMINATED on exit code 0 and FAILED otherwise; a "Finishing"
        job whose exit code is not recorded yet keeps its current state until
        a later poll.

        Raises:
            RuntimeError: on an unknown OAR state, or a "Terminated" job
                without an exit code.
        """
        if oarstat.exit_status != 0:
            logger.warning(
                "oarstat exited with status %d: %s", oarstat.exit_status, oarstat.stderr
            )

        info = rapidjson.loads(oarstat.stdout)

        for j in jobs:
            key = str(j.id())
            if key not in info:
                continue
            job_info = info[key]

            oar_state = job_info["state"]
            new_state = self._states_map.get(oar_state)
            if new_state is None:
                fmt = "unknown OAR job state '{:s}'"
                raise RuntimeError(fmt.format(oar_state))

            if new_state != State.TERMINATED:
                j.set_state(new_state)
                continue

            # explicit check instead of `assert`: this validates external input
            exit_code = job_info.get("exit_code")
            if exit_code is None:
                if oar_state == "Finishing":
                    # the exit code is not known yet; decide on a later poll
                    continue
                fmt = "OAR job {:d} in state '{:s}' has no exit code"
                raise RuntimeError(fmt.format(j.id(), oar_state))

            j.set_state(State.TERMINATED if int(exit_code) == 0 else State.FAILED)

    def _cancel_jobs_impl(self, jobs: List[OarJob]) -> Tuple[ArgumentList, Environment]:
        """Return the oardel call cancelling ``jobs``."""
        jobs_str = ["{:d}".format(job.id()) for job in jobs]
        return ["oardel"] + jobs_str, os.environ

    def _parse_cancel_jobs_impl(self, jobs: List[OarJob], proc: CompletedProcess) -> None:
        """Check the result of oardel.

        Exit status 6 (the code this implementation treats as "job already
        killed") is tolerated.

        Raises:
            RuntimeError: for any other non-zero exit status.
        """
        job_already_killed_code = 6
        if proc.exit_status not in [0, job_already_killed_code]:
            raise RuntimeError("oardel error: exit status {:d}".format(proc.exit_status))