Coverage for melissa/scheduler/oar_hybrid.py: 15%

263 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-09-22 10:36 +0000

1#!/usr/bin/python3 

2 

3# Copyright (c) 2020-2022, Institut National de Recherche en Informatique et en Automatique (Inria) 

4# All rights reserved. 

5# 

6# Redistribution and use in source and binary forms, with or without 

7# modification, are permitted provided that the following conditions are 

8# met: 

9# 

10# * Redistributions of source code must retain the above copyright notice, 

11# this list of conditions and the following disclaimer. 

12# 

13# * Redistributions in binary form must reproduce the above copyright 

14# notice, this list of conditions and the following disclaimer in the 

15# documentation and/or other materials provided with the distribution. 

16# 

17# * Neither the name of the copyright holder nor the names of its 

18# contributors may be used to endorse or promote products derived from 

19# this software without specific prior written permission. 

20# 

21# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 

22# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 

23# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 

24# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 

25# HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 

26# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED 

27# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 

28# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 

29# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 

30# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 

31# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 

32 

33import json 

34import logging 

35import os 

36import re 

37import shutil 

38import subprocess 

39from typing import List, Tuple, Union, Optional, Dict 

40from pathlib import Path 

41 

42from melissa.utility.process import ArgumentList, CompletedProcess, Environment 

43 

44from .job import Job, State 

45from .scheduler import IndirectScheduler, Options 

46 

47logger = logging.getLogger(__name__) 

48 

49 

class OarJob(Job):
    """Book-keeping record for one job submitted through ``oarsub``.

    An instance stores the caller-supplied unique identifier, the numeric
    OAR job id, the name of the queue the job was assigned to and the last
    state recorded for the job (``State.WAITING`` right after creation).
    """

    def __init__(self, job_uid: Union[str, int], job_id: int, queue: str) -> None:
        """Create a job in the ``State.WAITING`` state.

        Args:
            job_uid: unique identifier chosen by the caller (integer or string).
            job_id: job id assigned by OAR.
            queue: label of the queue the job was submitted to.
        """
        super().__init__()
        self._uid = job_uid
        self._id = job_id
        self._state = State.WAITING
        self.queue = queue

    def id(self) -> int:
        """Return the OAR job id."""
        return self._id

    def unique_id(self) -> Union[str, int]:
        """Return the caller-supplied unique identifier."""
        return self._uid

    def state(self) -> State:
        """Return the most recently recorded state."""
        return self._state

    def __repr__(self) -> str:
        """Return a compact description with uid, id and state."""
        return f"OarJob(uid={self._uid},id={self._id},state={self._state})"

    def set_state(self, new_state: State) -> None:
        """Overwrite the recorded state with ``new_state``."""
        self._state = new_state

72 

73 

class OarContainer:
    """Parameters and job handle of the OAR container used by the hybrid scheduler."""

    def __init__(
        self,
        container_options: List[str],
        container_client_size: int,
        besteffort_alloc_freq: int
    ) -> None:
        """Store the container parameters; no container job exists yet.

        Args:
            container_options: raw option strings describing the container job.
            container_client_size: value compared against the number of
                clients currently placed in the container.
            besteffort_alloc_freq: modulus applied to the client submission
                counter when choosing between container and best-effort.
        """
        # filled in once the container job has been submitted
        self.container_job: Optional[OarJob] = None
        # user-provided settings, kept verbatim
        self.options: List[str] = container_options
        self.container_client_size: int = container_client_size
        self.besteffort_alloc_freq: int = besteffort_alloc_freq

86 

87 

88class OarHybridScheduler(IndirectScheduler[OarJob]): 

89 # always compile regular expressions to enforce ASCII matching 

90 # allow matching things like version 1.2.3-rc4 

91 # there is a space in front of the colon in version 2.5.8 

92 _oarsub_version_regexp = r"OAR version\s*: (\d+)[.](\d+)[.](\d+\S*)" 

93 _oarsub_version_pattern = re.compile(_oarsub_version_regexp, re.ASCII) 

94 

95 _oarsub_job_id_regexp = r"OAR_JOB_ID=(\d+)" 

96 _oarsub_job_id_pattern = re.compile(_oarsub_job_id_regexp, re.ASCII) 

97 

98 @classmethod 

99 def _name_impl(cls) -> str: 

100 return "oar" 

101 

102 @classmethod 

103 def _is_available_impl(cls) -> Tuple[bool, Union[str, Tuple[str, str]]]: 

104 oarsub_path = shutil.which("oarsub") 

105 if oarsub_path is None: 

106 return False, "oarsub executable not found" 

107 

108 oarsub = subprocess.run( 

109 [oarsub_path, "--version"], 

110 stdin=subprocess.DEVNULL, 

111 stdout=subprocess.PIPE, 

112 stderr=subprocess.PIPE, 

113 universal_newlines=True, 

114 ) 

115 if oarsub.returncode != 0: 

116 return False, "failed to execute %s: %s" % (oarsub_path, oarsub.stderr) 

117 

118 # do not use pattern.fullmatch() because of the newline at 

119 # the end of the oarsub output and because the release name 

120 # will be printed after the version number. 

121 match = cls._oarsub_version_pattern.match(oarsub.stdout) 

122 if match is None: 

123 e = "oarsub output '{:s}' does not match expected format" 

124 raise RuntimeError(e.format(oarsub.stdout)) 

125 

126 version_str = oarsub.stdout[match.span(1)[0] : match.span(3)[1]] 

127 return True, (oarsub_path, version_str) 

128 

def __init__(
    self,
    mpi_provider: str,
    container_options: List[str],
    container_client_size: int,
    besteffort_alloc_freq: int
) -> None:
    """Validate the environment and set up the hybrid-allocation state.

    Args:
        mpi_provider: name of the MPI implementation; only ``"openmpi"``
            is accepted.
        container_options: forwarded to :class:`OarContainer`.
        container_client_size: forwarded to :class:`OarContainer`.
        besteffort_alloc_freq: forwarded to :class:`OarContainer`.

    Raises:
        RuntimeError: if ``self.is_available()`` reports that OAR is unavailable.
        ValueError: if ``mpi_provider`` is not a known MPI implementation.
    """
    available, reason = self.is_available()
    if not available:
        raise RuntimeError("OAR unavailable: {:s}".format(reason))

    known_mpi_providers = ["openmpi"]
    if mpi_provider not in known_mpi_providers:
        raise ValueError("unknown MPI implementation '%s'" % mpi_provider)

    super().__init__()
    self.mpi_provider = mpi_provider

    # settings and (not yet submitted) job of the container
    self.container = OarContainer(
        container_options,
        container_client_size,
        besteffort_alloc_freq
    )

    # uids of the clients that were submitted inside the container
    self.container_uid: List[int] = []
    # uids of all submitted clients and of the terminated ones
    self.submitted_client_uid: List[int] = []
    self.terminated_client_uid: List[int] = []
    # uid of the server job and number of client submissions so far
    self.server_uid: int = 0
    self.client_ctr: int = 0

161 

def _submit_job_impl(
    self,
    commands: List[ArgumentList],
    env: Environment,
    options: Options,
    name: str,
    uid: int,
) -> Tuple[ArgumentList, Environment]:
    """Dispatch a submission request to the server or client implementation.

    The container job is submitted first if none is currently recorded.
    A job named ``"melissa-server"`` is handled as the server (its uid is
    remembered); every other job is counted and handled as a client.

    Returns:
        The ``oarsub`` command and environment produced by the selected
        implementation.
    """
    if self.container.container_job is None:
        self._submit_job_impl_container()

    if name != "melissa-server":
        self.client_ctr += 1
        return self._submit_job_impl_client(commands, env, options, name, uid)

    self.server_uid = uid
    return self._submit_job_impl_server(commands, env, options, name, uid)

178 

def _submit_job_impl_container(self):
    """Submit the container job with ``oarsub`` and record the resulting job.

    A placeholder script is written to ``dummy_script.sh``; it prints a few
    diagnostics and sleeps for the job walltime minus 60 seconds so that
    the container stays alive. The script is submitted with the job types
    ``container`` and ``cosystem``. The output of ``oarsub`` is captured in
    the files under ``./stdout``, read back, and turned into an
    :class:`OarJob` (uid ``"container"``) stored in
    ``self.container.container_job``.
    """
    def opt2cmd(raw_arguments: List[str]) -> Tuple[str, List[str]]:
        """
        Split the raw options into the resource request (concatenation of
        all options not starting with -t, -p or -q) and the list of
        submission options (flag followed by its value).
        """
        resource = ""
        submission: List[str] = []
        for option in raw_arguments:
            flag = option[:2]
            if flag in ("-t", "-p", "-q"):
                # the value starts after the one-character separator
                submission.extend([flag, option[3:]])
            else:
                resource += option
        return resource, submission

    resource_request, extra_submission_opt = opt2cmd(self.container.options)

    script_filename = "dummy_script.sh"
    stdout_filename = "./stdout/oar.container.out"
    stderr_filename = "./stdout/oar.container.err"

    # dummy sleeping script keeping the container alive
    # oarsub cannot handle single quotes!
    script_lines = [
        "#!/bin/sh",
        "#OAR -O %s" % stdout_filename,
        "#OAR -E %s" % stderr_filename,
        "echo \"DATE =$(date)\"",
        "echo \"Hostname =$(hostname -s)\"",
        "echo \"Working directory =$(pwd)\"",
        "echo \"\"",
        "sleep $(($OAR_JOB_WALLTIME_SECONDS-60))",
    ]
    with open(script_filename, "w") as script:
        for line in script_lines:
            print(line, file=script)
    os.chmod(script_filename, 0o744)

    oar_command = [
        "oarsub",
        "--scanscript",
        "--directory={:s}".format(os.getcwd()),
        "-t",
        "container",
        "-t",
        "cosystem",
        *extra_submission_opt,
        "-l",
        resource_request,
        "--name",
        "melissa-container",
        "--",
        os.path.abspath(script_filename),
    ]

    # container jobs are specific to this scheduler, hence the submission
    # is performed manually from here
    logger.debug(f"launching {oar_command}")
    with open(stdout_filename, "w") as out_file, open(stderr_filename, "w") as err_file:
        proc = subprocess.Popen(
            args=oar_command,
            env=os.environ,
            stdin=subprocess.DEVNULL,
            stdout=out_file,
            stderr=err_file,
            universal_newlines=True,
        )
        logger.debug("submitting job uid container")
        proc.wait()

    # build the CompletedProcess and the job from the captured output
    with open(stdout_filename, "r") as captured:
        submission_out = captured.read()
    with open(stderr_filename, "r") as captured:
        submission_err = captured.read()
    oarsub = CompletedProcess(
        exit_status=proc.returncode, stdout=submission_out, stderr=submission_err
    )
    self.container.container_job = self._make_job_impl(oarsub, "container")
    logger.debug(
        f"job submission succeeded (UID container ID {self.container.container_job.id()})"
    )

262 

def _submit_job_impl_server(
    self,
    commands: List[ArgumentList],
    env: Environment,
    options: Options,
    name: str,
    uid: int,
) -> Tuple[ArgumentList, Environment]:
    """Write the server launch script and build the matching ``oarsub`` command.

    The script ``./oarsub/oarsub.<uid>.sh`` executes ``options.sched_cmd``
    with the OAR node file as machine file; one ``:``-separated section is
    generated per entry of ``commands``. The job is always submitted inside
    the container (job type ``inner=<container job id>``).

    Returns:
        The ``oarsub`` argument list and ``os.environ``.

    Raises:
        NotImplementedError: if ``self.mpi_provider`` is not ``"openmpi"``.
    """
    def opt2cmd(raw_arguments: List[str]) -> Tuple[str, List[str]]:
        """
        Split the raw options into the resource request (concatenation of
        all options not starting with -t, -p or -q) and the list of
        submission options (flag followed by its value).
        """
        resource = ""
        submission: List[str] = []
        for option in raw_arguments:
            flag = option[:2]
            if flag in ("-t", "-p", "-q"):
                submission.extend([flag, option[3:]])
            else:
                resource += option
        return resource, submission

    sched_cmd = options.sched_cmd
    sched_cmd_opt = options.sched_cmd_opt

    if self.mpi_provider != "openmpi":
        fmt = "OAR scheduler implementation for MPI provider '{:s}' missing"
        raise NotImplementedError(fmt.format(self.mpi_provider))
    machinefile_arg = ["-machinefile", '"$OAR_NODE_FILE"']

    # environment handed to the job: the requested variables plus the
    # library and executable search paths of this process when not overridden
    oar_env: Dict = dict(env)
    for inherited in ("LD_LIBRARY_PATH", "PATH"):
        if inherited in os.environ:
            oar_env.setdefault(inherited, os.environ[inherited])

    env_args = ["env", *("%s=%s" % item for item in oar_env.items())] if oar_env else []

    # one mpirun section per command
    mpirun_args = [
        " ".join(sched_cmd_opt + ["--"] + env_args + cmd) for cmd in commands
    ]

    launcher = "exec " + sched_cmd + " " + " ".join(machinefile_arg)
    mpirun_command = launcher + " \\\n " + " : \\\n ".join(mpirun_args)

    server_resource_cmd, server_submission_opt = opt2cmd(options.raw_arguments)

    Path('./oarsub').mkdir(parents=True, exist_ok=True)
    script_filename = "./oarsub/oarsub.%d.sh" % uid
    # oarsub cannot handle single quotes!
    script_lines = [
        "#!/bin/sh",
        "#OAR -O ./stdout/oar.%d.out" % uid,
        "#OAR -E ./stdout/oar.%d.err" % uid,
        mpirun_command,
    ]
    with open(script_filename, "w") as script:
        for line in script_lines:
            print(line, file=script)
    os.chmod(script_filename, 0o744)

    maybe_job_name = ["--name", name] if name else []

    assert self.container.container_job

    oar_command = [
        "oarsub",
        "--scanscript",
        "--directory={:s}".format(os.getcwd()),
        "-t",
        "inner=" + str(self.container.container_job.id()),
        *server_submission_opt,
        "-l",
        server_resource_cmd,
        *maybe_job_name,
        "--",
        os.path.abspath(script_filename),
    ]

    return oar_command, os.environ

353 

def _submit_job_impl_client(
    self,
    commands: List[ArgumentList],
    env: Environment,
    options: Options,
    name: str,
    uid: int,
) -> Tuple[ArgumentList, Environment]:
    """Write a client launch script and build the matching ``oarsub`` command.

    The script ``./oarsub/oarsub.<uid>.sh`` is generated exactly like the
    server script. The client is then submitted either as a ``besteffort``
    job or inside the container (``inner=<container job id>``); in the
    latter case its uid is appended to ``self.container_uid``.

    Returns:
        The ``oarsub`` argument list and ``os.environ``.

    Raises:
        NotImplementedError: if ``self.mpi_provider`` is not ``"openmpi"``.
    """
    def opt2cmd(raw_arguments: List[str]) -> Tuple[str, List[str]]:
        """
        Split the raw options into the resource request (concatenation of
        all options not starting with -t, -p or -q) and the list of
        submission options (flag followed by its value).
        """
        resource = ""
        submission: List[str] = []
        for option in raw_arguments:
            flag = option[:2]
            if flag in ("-t", "-p", "-q"):
                submission.extend([flag, option[3:]])
            else:
                resource += option
        return resource, submission

    sched_cmd = options.sched_cmd
    sched_cmd_opt = options.sched_cmd_opt

    if self.mpi_provider != "openmpi":
        fmt = "OAR scheduler implementation for MPI provider '{:s}' missing"
        raise NotImplementedError(fmt.format(self.mpi_provider))
    machinefile_arg = ["-machinefile", '"$OAR_NODE_FILE"']

    # environment handed to the job: the requested variables plus the
    # library and executable search paths of this process when not overridden
    oar_env: Dict[str, str] = dict(env)
    for inherited in ("LD_LIBRARY_PATH", "PATH"):
        if inherited in os.environ:
            oar_env.setdefault(inherited, os.environ[inherited])

    env_args = ["env", *("%s=%s" % item for item in oar_env.items())] if oar_env else []

    # one mpirun section per command
    mpirun_args = [
        " ".join(sched_cmd_opt + ["--"] + env_args + cmd) for cmd in commands
    ]

    launcher = "exec " + sched_cmd + " " + " ".join(machinefile_arg)
    mpirun_command = launcher + " \\\n " + " : \\\n ".join(mpirun_args)

    client_resource_cmd, client_submission_opt = opt2cmd(options.raw_arguments)

    Path('./oarsub').mkdir(parents=True, exist_ok=True)
    script_filename = "./oarsub/oarsub.%d.sh" % uid
    # oarsub cannot handle single quotes!
    script_lines = [
        "#!/bin/sh",
        "#OAR -O ./stdout/oar.%d.out" % uid,
        "#OAR -E ./stdout/oar.%d.err" % uid,
        mpirun_command,
    ]
    with open(script_filename, "w") as script:
        for line in script_lines:
            print(line, file=script)
    os.chmod(script_filename, 0o744)

    maybe_job_name = ["--name", name] if name else []

    # The launching phase is inferred from the list of terminated clients:
    # empty => 1st phase, non-empty => 2nd phase.
    # -> 1st phase: best-effort when the submission counter is a multiple
    #    of the frequency parameter
    # -> 2nd phase: best-effort when the number of non-terminated clients
    #    in the container has reached the container client size
    first_phase = len(self.terminated_client_uid) == 0
    fst_cond = (
        first_phase
        and self.client_ctr % self.container.besteffort_alloc_freq == 0
    )
    n_client_in_cont = sum(
        1 for client in self.container_uid if client not in self.terminated_client_uid
    )
    snd_cond = (
        not first_phase
        and n_client_in_cont >= self.container.container_client_size
    )

    if fst_cond or snd_cond:
        logger.debug(f"first condition: {fst_cond}, second condition: {snd_cond}")
        job_type = "besteffort"
    else:
        assert self.container.container_job
        self.container_uid.append(uid)
        job_type = "inner=" + str(self.container.container_job.id())

    oar_command = [
        "oarsub",
        "--scanscript",
        "--directory={:s}".format(os.getcwd()),
        "-t",
        job_type,
        *client_submission_opt,
        "-l",
        client_resource_cmd,
        *maybe_job_name,
        "--",
        os.path.abspath(script_filename),
    ]

    return oar_command, os.environ

474 

def _make_job_impl(self, oarsub: CompletedProcess, uid: Union[int, str]) -> OarJob:
    """Create the :class:`OarJob` corresponding to a finished ``oarsub`` call.

    Args:
        oarsub: exit status and captured output of ``oarsub``.
        uid: unique identifier of the submitted job; a string uid (used for
            the container job) or the server uid is never counted as a client.

    Returns:
        The new job. Jobs with a string uid, the server job and clients
        placed in the container are labelled ``"default"``; all other
        clients are labelled ``"best-effort"``.

    Raises:
        RuntimeError: if ``oarsub`` exited with a non-zero status.
        ValueError: if no job id can be found in the ``oarsub`` output.
    """
    if oarsub.stderr != "":
        logger.warning(f"oarsub (uid={uid}): {oarsub.stderr}")
    if oarsub.exit_status != 0:
        # f-string so that the actual exit status ends up in the message
        raise RuntimeError(f"oarsub failed (exit status {oarsub.exit_status})")

    # search() instead of match(): oarsub may print lines starting with
    # [ADMISSION RULE] before the job id
    match = self._oarsub_job_id_pattern.search(oarsub.stdout)
    if match is None:
        raise ValueError("no job id found in oarsub output: '{:s}'".format(oarsub.stdout))

    job_id = int(match.group(1))
    if isinstance(uid, str) or uid == self.server_uid:
        return OarJob(uid, job_id, "default")

    self.submitted_client_uid.append(int(uid))
    queue = "default" if uid in self.container_uid else "best-effort"
    return OarJob(uid, job_id, queue)

496 

def _update_jobs_impl(self, jobs: List[OarJob]) -> Tuple[ArgumentList, Environment]:
    """Build the ``oarstat`` command querying ``jobs`` and the container job.

    Returns:
        The ``oarstat`` argument list and ``os.environ``.
    """
    assert self.container.container_job
    monitored = jobs + [self.container.container_job]
    job_args = ["--job={:d}".format(job.id()) for job in monitored]
    # full JSON output is requested because `oarstat --state` does not
    # distinguish between failed and successful jobs
    return ["oarstat", "--full", "--json", *job_args], os.environ

503 

def _parse_update_jobs_impl(self, jobs: List[OarJob], oarstat: CompletedProcess) -> None:
    """Update the state of ``jobs`` and of the container job from ``oarstat`` output.

    Args:
        jobs: jobs whose state shall be refreshed.
        oarstat: completed ``oarstat --full --json`` call; its standard
            output is parsed as JSON keyed by job id.

    Jobs missing from the output are left untouched. A terminated job with
    exit code zero becomes ``State.TERMINATED`` and, unless its uid is a
    string (the container job), its uid is recorded in
    ``self.terminated_client_uid``. A terminated job with a non-zero or
    null exit code, as well as a best-effort job in the OAR error state,
    becomes ``State.FAILED``.

    Raises:
        RuntimeError: if OAR reports a state that is not in the state map.
    """
    states_map = {
        "Error": State.ERROR,
        "Finishing": State.TERMINATED,
        "Hold": State.WAITING,
        "Launching": State.WAITING,
        "Running": State.RUNNING,
        "Terminated": State.TERMINATED,
        "toAckReservation": State.WAITING,
        "toLaunch": State.WAITING,
        "Waiting": State.WAITING,
    }

    info = json.loads(oarstat.stdout)

    assert self.container.container_job
    for j in jobs + [self.container.container_job]:
        key = str(j.id())
        if key not in info:
            continue

        job_info = info[key]
        oar_state = job_info["state"]
        new_state = states_map.get(oar_state)
        if new_state is None:
            fmt = "unknown OAR job state '{:s}'"
            raise RuntimeError(fmt.format(oar_state))

        if new_state == State.TERMINATED:
            assert "exit_code" in job_info
            exit_code = job_info["exit_code"]
            # when a BESTEFFORT_KILL event is triggered by OAR
            # the state becomes finishing with a null (i.e. None) exit_code
            if exit_code is not None and int(exit_code) == 0:
                j.set_state(State.TERMINATED)
                job_uid = j.unique_id()
                # the container job has the string uid "container", which
                # cannot be converted to an integer and is not a client
                if not isinstance(job_uid, str):
                    self.terminated_client_uid.append(int(job_uid))
            else:
                if exit_code is None:
                    logger.debug(f"best-effort job {j.id()} was killed")
                j.set_state(State.FAILED)
        elif new_state == State.ERROR and j.queue == "best-effort":
            # State.FAILED will result in a job resubmission request from the server
            logger.debug(f"best-effort job {j.id()} was killed")
            j.set_state(State.FAILED)
        else:
            j.set_state(new_state)

548 

def _cancel_jobs_impl(self, jobs: List[OarJob]) -> Tuple[ArgumentList, Environment]:
    """Build the ``oardel`` command cancelling ``jobs``.

    When the server job is among ``jobs``, the container job is cancelled
    as well and the reference to it is dropped.

    Returns:
        The ``oardel`` argument list and ``os.environ``.
    """
    job_ids = ["{:d}".format(job.id()) for job in jobs]

    server_cancelled = any(job.unique_id() == self.server_uid for job in jobs)
    if server_cancelled:
        assert self.container.container_job
        logger.debug("Cancelling container")
        job_ids.append(str(self.container.container_job.id()))
        self.container.container_job = None

    return ["oardel", *job_ids], os.environ

557 

def _parse_cancel_jobs_impl(self, jobs: List[OarJob], proc: CompletedProcess) -> None:
    """Check the exit status of a completed ``oardel`` call.

    Exit status 0 and the status denoting an already killed job (6) are
    accepted.

    Raises:
        RuntimeError: for any other exit status.
    """
    job_already_killed_code = 6
    accepted_statuses = (0, job_already_killed_code)
    if proc.exit_status in accepted_statuses:
        return
    raise RuntimeError("oardel error: exit status {:d}".format(proc.exit_status))