Coverage for melissa/scheduler/oar.py: 26%

134 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-09-22 10:36 +0000

1#!/usr/bin/python3 

2 

3# Copyright (c) 2020-2022, Institut National de Recherche en Informatique et en Automatique (Inria) 

4# All rights reserved. 

5# 

6# Redistribution and use in source and binary forms, with or without 

7# modification, are permitted provided that the following conditions are 

8# met: 

9# 

10# * Redistributions of source code must retain the above copyright notice, 

11# this list of conditions and the following disclaimer. 

12# 

13# * Redistributions in binary form must reproduce the above copyright 

14# notice, this list of conditions and the following disclaimer in the 

15# documentation and/or other materials provided with the distribution. 

16# 

17# * Neither the name of the copyright holder nor the names of its 

18# contributors may be used to endorse or promote products derived from 

19# this software without specific prior written permission. 

20# 

21# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 

22# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 

23# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 

24# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 

25# HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 

26# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED 

27# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 

28# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 

29# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 

30# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 

31# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 

32 

33import rapidjson 

34import logging 

35import os 

36import re 

37import shutil 

38import subprocess 

39from typing import List, Tuple, Union, Dict 

40from pathlib import Path 

41 

42from melissa.utility.process import ArgumentList, CompletedProcess, Environment 

43 

44from .job import Job, State 

45from .scheduler import IndirectScheduler, Options 

46 

# Module-level logger, named after this module.
logger: logging.Logger = logging.getLogger(name=__name__)

48 

49 

class OarJob(Job):
    """A job submitted through the OAR batch scheduler.

    The job is identified by the integer job id that ``oarsub`` reported.
    Its state starts as ``State.WAITING`` and is only changed through
    :meth:`set_state`.
    """

    def __init__(self, job_id: int) -> None:
        super().__init__()
        self._id = job_id
        self._state = State.WAITING

    def id(self) -> int:
        """Return the OAR job id."""
        return self._id

    def unique_id(self) -> Union[str, int]:
        """Return the unique identifier of the job (the OAR job id itself)."""
        return self._id

    def state(self) -> State:
        """Return the last state recorded for this job."""
        return self._state

    def __repr__(self) -> str:
        return f"OarJob(id={self._id:d},state={self._state!s})"

    def set_state(self, new_state: State) -> None:
        """Record ``new_state`` as the current job state."""
        self._state = new_state

70 

71 

class OarScheduler(IndirectScheduler[OarJob]):
    """Indirect scheduler that drives the OAR batch system via its CLI tools.

    Jobs are submitted by writing a shell script to ``./oarsub/`` and
    passing it to ``oarsub --scanscript``. Job states are polled with
    ``oarstat --full --json``, and jobs are cancelled with ``oardel``.
    Only the ``openmpi`` MPI provider is currently implemented.
    """

    # always compile regular expressions to enforce ASCII matching
    # allow matching things like version 1.2.3-rc4
    # there is a space in front of the colon in version 2.5.8
    _oarsub_version_regexp = r"OAR version\s*: (\d+)[.](\d+)[.](\d+\S*)"
    _oarsub_version_pattern = re.compile(_oarsub_version_regexp, re.ASCII)

    _oarsub_job_id_regexp = r"OAR_JOB_ID=(\d+)"
    _oarsub_job_id_pattern = re.compile(_oarsub_job_id_regexp, re.ASCII)

    # Maps OAR job states (the "state" field of `oarstat --json`) to launcher
    # states. States that lead towards execution (queued, held, launching,
    # resuming) map to WAITING. "toError" is the transitional state before
    # "Error". TERMINATED is refined into TERMINATED/FAILED using the job's
    # exit code, see _parse_update_jobs_impl().
    # NOTE(review): mapping "Suspended" to WAITING mirrors "Hold"; confirm this
    # is the desired launcher behaviour for suspended jobs.
    _oar_states_map: Dict[str, State] = {
        "Error": State.ERROR,
        "Finishing": State.TERMINATED,
        "Hold": State.WAITING,
        "Launching": State.WAITING,
        "Resuming": State.WAITING,
        "Running": State.RUNNING,
        "Suspended": State.WAITING,
        "Terminated": State.TERMINATED,
        "toAckReservation": State.WAITING,
        "toError": State.ERROR,
        "toLaunch": State.WAITING,
        "Waiting": State.WAITING,
    }

    @classmethod
    def _name_impl(cls) -> str:
        return "oar"

    @classmethod
    def _is_available_impl(cls) -> Tuple[bool, Union[str, Tuple[str, str]]]:
        """Check that `oarsub` is on PATH and reports a parsable version.

        Returns ``(True, (oarsub_path, version))`` on success and
        ``(False, reason)`` on any failure.
        """
        oarsub_path = shutil.which("oarsub")
        if oarsub_path is None:
            return False, "oarsub executable not found"

        oarsub = subprocess.run(
            [oarsub_path, "--version"],
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        if oarsub.returncode != 0:
            return False, "failed to execute %s: %s" % (oarsub_path, oarsub.stderr)

        # do not use pattern.fullmatch() because of the newline at
        # the end of the oarsub output and because the release name
        # will be printed after the version number.
        match = cls._oarsub_version_pattern.match(oarsub.stdout)
        if match is None:
            # An availability probe reports failure instead of raising,
            # like every other failure path above.
            e = "oarsub output '{:s}' does not match expected format"
            return False, e.format(oarsub.stdout)

        # major.minor.patch[suffix], i.e. groups 1 through 3 inclusive
        version_str = oarsub.stdout[match.start(1) : match.end(3)]
        return True, (oarsub_path, version_str)

    def __init__(self, mpi_provider: str) -> None:
        """Create an OAR scheduler for the given MPI provider.

        Raises:
            RuntimeError: if `is_available()` reports OAR as unavailable.
        """
        is_available, info = self.is_available()
        if not is_available:
            raise RuntimeError("OAR unavailable: {:s}".format(info))

        super().__init__()
        self.mpi_provider = mpi_provider

    @staticmethod
    def _opt2cmd(raw_arguments: List[str]) -> Tuple[str, List[str]]:
        """Split raw scheduler arguments into a resource directive and options.

        Arguments whose first two characters are ``-t``, ``-p`` or ``-q``
        become the oarsub option pair ``[flag, value]``. ``value`` is
        everything after the third character, which assumes a
        one-character separator such as ``"-t besteffort"``. All other
        arguments are appended to the ``#OAR --resource`` directive line.
        """
        oar_resource_cmd = "#OAR --resource "
        oar_submission_opt: List[str] = []
        for o in raw_arguments:
            if o[:2] in ("-t", "-p", "-q"):
                oar_submission_opt += [o[:2], o[3:]]
            else:
                # NOTE(review): several resource arguments are concatenated
                # without a separator; confirm callers pass a single spec
                # or include their own separators.
                oar_resource_cmd += o
        return oar_resource_cmd, oar_submission_opt

    def _submit_job_impl(
        self,
        commands: List[ArgumentList],
        env: Environment,
        options: Options,
        name: str,
        uid: int,
    ) -> Tuple[ArgumentList, Environment]:
        """Write the OAR batch script for ``commands`` and build the oarsub call.

        Side effects: creates ``./oarsub/`` if needed and writes the
        executable script ``./oarsub/oarsub.<uid>.sh``.

        Returns the ``oarsub`` argument list and the environment to run it in.

        Raises:
            NotImplementedError: if the MPI provider is not ``openmpi``.
        """
        sched_cmd = options.sched_cmd
        sched_cmd_opt = options.sched_cmd_opt

        if self.mpi_provider == "openmpi":
            # double quotes so that the shell running the script expands it
            machinefile_arg = ["-machinefile", '"$OAR_NODE_FILE"']
        else:
            fmt = "OAR scheduler implementation for MPI provider '{:s}' missing"
            raise NotImplementedError(fmt.format(self.mpi_provider))

        # Environment for the commands: the caller's variables, plus the
        # current LD_LIBRARY_PATH and PATH unless the caller sets them.
        oar_env = dict(env)
        for key in ("LD_LIBRARY_PATH", "PATH"):
            if key not in oar_env and key in os.environ:
                oar_env[key] = os.environ[key]

        if oar_env:
            env_args = ["env"] + ["%s=%s" % (key, value) for key, value in oar_env.items()]
        else:
            env_args = []

        # One mpirun section per command, each prefixed by `env K=V ...`.
        mpirun_args = [" ".join(sched_cmd_opt + ["--"] + env_args + cmd) for cmd in commands]

        # The sections are separated by " : " (MPMD syntax) with line
        # continuations to keep the script readable.
        mpirun_command = (
            "exec " + sched_cmd
            + " "
            + " ".join(machinefile_arg)
            + " \\\n "
            + " : \\\n ".join(mpirun_args)
        )

        Path('./oarsub').mkdir(parents=True, exist_ok=True)
        script_filename = "./oarsub/oarsub.%d.sh" % uid
        # oarsub cannot handle single quotes!
        oar_resource_cmd, oar_submission_opt = self._opt2cmd(options.raw_arguments)
        with open(script_filename, "w") as f:
            print("#!/bin/sh", file=f)
            # NOTE(review): ./stdout is not created here; presumably the
            # launcher creates it beforehand.
            print("#OAR -O ./stdout/oar.%d.out" % uid, file=f)
            print("#OAR -E ./stdout/oar.%d.err" % uid, file=f)
            print(oar_resource_cmd, file=f)
            print(mpirun_command, file=f)
        os.chmod(script_filename, 0o744)

        maybe_job_name = ["--name", name] if name else []

        # --scanscript makes oarsub read the #OAR directives from the script
        oar_command = (
            [
                "oarsub",
                "--scanscript",
                "--directory={:s}".format(os.getcwd()),
            ]
            + oar_submission_opt
            + maybe_job_name
            + ["--", os.path.abspath(script_filename)]
        )

        return oar_command, os.environ

    def _make_job_impl(self, oarsub: CompletedProcess, uid: int) -> OarJob:
        """Create an OarJob from the output of a finished `oarsub` call.

        Raises:
            RuntimeError: if oarsub exited with a non-zero status.
            ValueError: if no ``OAR_JOB_ID=<n>`` appears in its stdout.
        """
        if oarsub.stderr != "":
            logger.warning("oarsub (uid=%d): %s", uid, oarsub.stderr)
        if oarsub.exit_status != 0:
            raise RuntimeError("oarsub failed (exit status {:d})".format(oarsub.exit_status))

        # use search() rather than match(): oarsub may print lines starting
        # with [ADMISSION RULE] before the job id line
        match = self._oarsub_job_id_pattern.search(oarsub.stdout)
        if match is None:
            raise ValueError("no job id found in oarsub output: '{:s}'".format(oarsub.stdout))

        return OarJob(int(match.group(1)))

    def _update_jobs_impl(self, jobs: List[OarJob]) -> Tuple[ArgumentList, Environment]:
        """Build the oarstat command querying the given jobs as JSON."""
        job_args = ["--job={:d}".format(j.id()) for j in jobs]
        # `oarstat --state` does not distinguish between failed and successful
        # jobs
        return ["oarstat", "--full", "--json"] + job_args, os.environ

    def _parse_update_jobs_impl(self, jobs: List[OarJob], oarstat: CompletedProcess) -> None:
        """Update the state of ``jobs`` from `oarstat --full --json` output.

        Jobs absent from the output are left unchanged. A terminated job is
        TERMINATED when its exit code is zero and FAILED otherwise.

        Raises:
            RuntimeError: on an unknown OAR state, or when a terminated job
                has no ``exit_code`` field at all.
        """
        info = rapidjson.loads(oarstat.stdout)

        for j in jobs:
            key = str(j.id())
            if key not in info:
                continue

            job_info = info[key]
            oar_state = job_info["state"]
            new_state = self._oar_states_map.get(oar_state)
            if new_state is None:
                fmt = "unknown OAR job state '{:s}'"
                raise RuntimeError(fmt.format(oar_state))

            if new_state != State.TERMINATED:
                j.set_state(new_state)
                continue

            # Explicit check instead of `assert`, which is stripped under -O.
            if "exit_code" not in job_info:
                fmt = "no exit code reported by oarstat for OAR job {:s} in state '{:s}'"
                raise RuntimeError(fmt.format(key, oar_state))

            exit_code = job_info["exit_code"]
            if exit_code is None:
                if oar_state == "Finishing":
                    # exit code not recorded yet; check again on the next poll
                    continue
                logger.warning(
                    "OAR job %s is in state '%s' without exit code, marking it failed",
                    key,
                    oar_state,
                )
                j.set_state(State.FAILED)
                continue

            j.set_state(State.TERMINATED if int(exit_code) == 0 else State.FAILED)

    def _cancel_jobs_impl(self, jobs: List[OarJob]) -> Tuple[ArgumentList, Environment]:
        """Build the oardel command cancelling the given jobs."""
        jobs_str = ["{:d}".format(job.id()) for job in jobs]
        return ["oardel"] + jobs_str, os.environ

    def _parse_cancel_jobs_impl(self, jobs: List[OarJob], proc: CompletedProcess) -> None:
        """Check the oardel exit status, tolerating already-killed jobs.

        Raises:
            RuntimeError: if oardel exits with a status other than 0 or 6.
        """
        # oardel exit status for jobs that were already killed
        job_already_killed_code = 6
        if proc.exit_status not in (0, job_already_killed_code):
            raise RuntimeError("oardel error: exit status {:d}".format(proc.exit_status))