Coverage for melissa/scheduler/oar_hybrid.py: 15%
263 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-09-22 10:36 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-09-22 10:36 +0000
1#!/usr/bin/python3
3# Copyright (c) 2020-2022, Institut National de Recherche en Informatique et en Automatique (Inria)
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions are
8# met:
9#
10# * Redistributions of source code must retain the above copyright notice,
11# this list of conditions and the following disclaimer.
12#
13# * Redistributions in binary form must reproduce the above copyright
14# notice, this list of conditions and the following disclaimer in the
15# documentation and/or other materials provided with the distribution.
16#
17# * Neither the name of the copyright holder nor the names of its
18# contributors may be used to endorse or promote products derived from
19# this software without specific prior written permission.
20#
21# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
22# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
23# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
24# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25# HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
27# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33import json
34import logging
35import os
36import re
37import shutil
38import subprocess
39from typing import List, Tuple, Union, Optional, Dict
40from pathlib import Path
42from melissa.utility.process import ArgumentList, CompletedProcess, Environment
44from .job import Job, State
45from .scheduler import IndirectScheduler, Options
# Module-level logger, named after this module.
logger = logging.getLogger(name=__name__)
class OarJob(Job):
    """A job submitted through ``oarsub``.

    Stores the Melissa unique id, the OAR job id, the current state (initially
    ``State.WAITING``) and a queue label. The hybrid scheduler in this module
    uses the labels "default" and "best-effort".
    """

    def __init__(self, job_uid: Union[str, int], job_id: int, queue: str) -> None:
        super().__init__()
        self._uid = job_uid
        self._id = job_id
        self._state = State.WAITING
        self.queue = queue

    def id(self) -> int:
        """Return the OAR job id."""
        return self._id

    def unique_id(self) -> Union[str, int]:
        """Return the Melissa-side unique id of the job."""
        return self._uid

    def state(self) -> State:
        """Return the last known state of the job."""
        return self._state

    def __repr__(self) -> str:
        return "OarJob(uid={},id={},state={})".format(self._uid, self._id, self._state)

    def set_state(self, new_state: State) -> None:
        """Overwrite the job state (called when parsing ``oarstat`` output)."""
        self._state = new_state
class OarContainer:
    """Settings and handle of the OAR container job used by the hybrid scheduler.

    :param container_options: raw OAR options used to submit the container
        (split into ``-t``/``-p``/``-q`` submission options and a resource
        request by the scheduler).
    :param container_client_size: client capacity of the container; once this
        many clients are in it, further clients go to the best-effort queue
        (second phase).
    :param besteffort_alloc_freq: during the first phase, every client whose
        submission counter is a multiple of this value goes to the best-effort
        queue.
    """

    def __init__(self, container_options: List[str], container_client_size: int,
                 besteffort_alloc_freq: int) -> None:
        # handle of the container job; stays None until it has been submitted
        self.container_job: Optional[OarJob] = None
        # submission settings
        self.options: List[str] = container_options
        self.container_client_size: int = container_client_size
        self.besteffort_alloc_freq: int = besteffort_alloc_freq
class OarHybridScheduler(IndirectScheduler[OarJob]):
    """OAR scheduler combining a container job with the best-effort queue.

    On the first submission an OAR container job is created. The server is
    always submitted inside it (``-t inner=<container id>``). Clients are
    submitted either inside the container or to the best-effort queue
    (``-t besteffort``) following the policy of ``_submit_job_impl_client``.
    """

    # always compile regular expressions to enforce ASCII matching
    # allow matching things like version 1.2.3-rc4
    # there is a space in front of the colon in version 2.5.8
    _oarsub_version_regexp = r"OAR version\s*: (\d+)[.](\d+)[.](\d+\S*)"
    _oarsub_version_pattern = re.compile(_oarsub_version_regexp, re.ASCII)

    _oarsub_job_id_regexp = r"OAR_JOB_ID=(\d+)"
    _oarsub_job_id_pattern = re.compile(_oarsub_job_id_regexp, re.ASCII)

    @classmethod
    def _name_impl(cls) -> str:
        return "oar"

    @classmethod
    def _is_available_impl(cls) -> Tuple[bool, Union[str, Tuple[str, str]]]:
        """Check that ``oarsub`` can be found and executed.

        :returns: ``(False, reason)`` when ``oarsub`` is not on the PATH or
            ``oarsub --version`` exits with a non-zero status, otherwise
            ``(True, (oarsub_path, version_string))``.
        :raises RuntimeError: if the ``oarsub --version`` output does not match
            the expected format.
        """
        oarsub_path = shutil.which("oarsub")
        if oarsub_path is None:
            return False, "oarsub executable not found"

        oarsub = subprocess.run(
            [oarsub_path, "--version"],
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        if oarsub.returncode != 0:
            return False, "failed to execute %s: %s" % (oarsub_path, oarsub.stderr)

        # do not use pattern.fullmatch() because of the newline at
        # the end of the oarsub output and because the release name
        # will be printed after the version number.
        match = cls._oarsub_version_pattern.match(oarsub.stdout)
        if match is None:
            e = "oarsub output '{:s}' does not match expected format"
            raise RuntimeError(e.format(oarsub.stdout))

        version_str = oarsub.stdout[match.span(1)[0] : match.span(3)[1]]
        return True, (oarsub_path, version_str)

    def __init__(
        self,
        mpi_provider: str,
        container_options: List[str],
        container_client_size: int,
        besteffort_alloc_freq: int
    ) -> None:
        """Create the scheduler.

        :raises RuntimeError: if OAR is not available on this machine.
        :raises ValueError: if ``mpi_provider`` is not supported (only
            "openmpi" is).
        """
        is_available, info = self.is_available()
        if not is_available:
            raise RuntimeError("OAR unavailable: {:s}".format(info))

        if mpi_provider not in ["openmpi"]:
            raise ValueError("unknown MPI implementation '%s'" % mpi_provider)

        super().__init__()
        self.mpi_provider = mpi_provider

        # initialize attributes related to the HybridAllocation
        self.container = OarContainer(
            container_options,
            container_client_size,
            besteffort_alloc_freq
        )

        # list of client uids submitted in the container
        self.container_uid: List[int] = []
        # list of submitted and terminated clients
        self.submitted_client_uid: List[int] = []
        self.terminated_client_uid: List[int] = []
        # server uid and client counter
        self.server_uid: int = 0
        self.client_ctr: int = 0
        # queue label chosen at the latest submission of each client uid; a
        # resubmitted client may land in a different queue than its first run
        self._client_queue: Dict[int, str] = {}

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _split_oar_options(raw_arguments: List[str]) -> Tuple[str, List[str]]:
        """Split raw OAR options into a resource request and submission options.

        Options whose first two characters are ``-t``, ``-p`` or ``-q`` become
        a ``[flag, value]`` pair, the value being everything from the fourth
        character on (the third one is skipped as a separator). All other
        options are concatenated, without separator, into the resource
        request passed to ``oarsub -l``.
        """
        resource_cmd = ""
        submission_opt: List[str] = []
        for option in raw_arguments:
            if option[:2] in ("-t", "-p", "-q"):
                submission_opt += [option[:2], option[3:]]
            else:
                resource_cmd += option
        return resource_cmd, submission_opt

    def _build_mpirun_command(
        self, commands: List[ArgumentList], env: Environment, options: Options
    ) -> str:
        """Build the ``exec <sched_cmd> ...`` line of a server/client job script.

        Every command becomes one MPMD segment. It is prefixed by
        ``options.sched_cmd_opt``, ``--`` and an ``env VAR=value ...`` list
        made of ``env`` plus ``LD_LIBRARY_PATH``/``PATH`` from the current
        environment when ``env`` does not set them.
        """
        if self.mpi_provider == "openmpi":
            machinefile_arg = ["-machinefile", '"$OAR_NODE_FILE"']
        else:
            fmt = "OAR scheduler implementation for MPI provider '{:s}' missing"
            raise NotImplementedError(fmt.format(self.mpi_provider))

        # gather environment variables
        oar_env: Dict[str, str] = dict(env)
        for key in ["LD_LIBRARY_PATH", "PATH"]:
            if key not in oar_env and key in os.environ:
                oar_env[key] = os.environ[key]

        if oar_env:
            env_args = ["env"] + ["%s=%s" % (key, value) for key, value in oar_env.items()]
        else:
            env_args = []

        # assemble mpirun arguments
        mpirun_args = [
            " ".join(options.sched_cmd_opt + ["--"] + env_args + cmd) for cmd in commands
        ]

        return (
            "exec " + options.sched_cmd
            + " "
            + " ".join(machinefile_arg)
            + " \\\n "
            + " : \\\n ".join(mpirun_args)
        )

    @staticmethod
    def _write_job_script(uid: int, mpirun_command: str) -> str:
        """Write the executable OAR script of job *uid* and return its path."""
        Path('./oarsub').mkdir(parents=True, exist_ok=True)
        script_filename = "./oarsub/oarsub.%d.sh" % uid
        # oarsub cannot handle single quotes!
        with open(script_filename, "w") as f:
            print("#!/bin/sh", file=f)
            print("#OAR -O ./stdout/oar.%d.out" % uid, file=f)
            print("#OAR -E ./stdout/oar.%d.err" % uid, file=f)
            print(mpirun_command, file=f)
        os.chmod(script_filename, 0o744)
        return script_filename

    @staticmethod
    def _oarsub_command(
        job_types: List[str],
        submission_opt: List[str],
        resource_cmd: str,
        name: str,
        script_filename: str,
    ) -> List[str]:
        """Assemble an ``oarsub`` command line submitting *script_filename*.

        Each entry of *job_types* becomes a ``-t <type>`` option and
        ``--name`` is only passed when *name* is non-empty.
        """
        type_opt: List[str] = []
        for job_type in job_types:
            type_opt += ["-t", job_type]
        maybe_job_name = ["--name", name] if name else []
        return (
            [
                "oarsub",
                "--scanscript",
                "--directory={:s}".format(os.getcwd()),
            ]
            + type_opt
            + submission_opt
            + ["-l", resource_cmd]
            + maybe_job_name
            + ["--", os.path.abspath(script_filename)]
        )

    # --------------------------------------------------------------- submission

    def _submit_job_impl(
        self,
        commands: List[ArgumentList],
        env: Environment,
        options: Options,
        name: str,
        uid: int,
    ) -> Tuple[ArgumentList, Environment]:
        """Return the oarsub command for a server or client job.

        The container job is submitted first if it does not exist yet. A job
        named "melissa-server" is the server; every other job is a client.
        """
        if self.container.container_job is None:
            self._submit_job_impl_container()
        if name == "melissa-server":
            self.server_uid = uid
            return self._submit_job_impl_server(commands, env, options, name, uid)
        self.client_ctr += 1
        return self._submit_job_impl_client(commands, env, options, name, uid)

    def _submit_job_impl_container(self) -> None:
        """Submit the OAR container job and store it in ``self.container``.

        The container runs a script that prints some host information and
        then sleeps until 60 seconds before its walltime ends. It is submitted
        with the ``container`` and ``cosystem`` job types. Since container jobs
        are specific to this scheduler, ``oarsub`` is run directly here.

        :raises RuntimeError: if ``oarsub`` exits with a non-zero status.
        :raises ValueError: if no job id can be found in the ``oarsub`` output.
        """
        # get container resource and options
        contr_resource_cmd, contr_submission_opt = self._split_oar_options(
            self.container.options
        )

        script_filename = "dummy_script.sh"
        stdout_filename = "./stdout/oar.container.out"
        stderr_filename = "./stdout/oar.container.err"
        # the output files below are opened locally, so their directory must exist
        Path("./stdout").mkdir(parents=True, exist_ok=True)

        # create dummy sleeping script to keep the container alive
        # (oarsub cannot handle single quotes!)
        with open(script_filename, "w") as f:
            print("#!/bin/sh", file=f)
            print("#OAR -O %s" % stdout_filename, file=f)
            print("#OAR -E %s" % stderr_filename, file=f)
            print("echo \"DATE =$(date)\"", file=f)
            print("echo \"Hostname =$(hostname -s)\"", file=f)
            print("echo \"Working directory =$(pwd)\"", file=f)
            print("echo \"\"", file=f)
            print("sleep $(($OAR_JOB_WALLTIME_SECONDS-60))", file=f)
        os.chmod(script_filename, 0o744)

        oar_command = self._oarsub_command(
            ["container", "cosystem"],
            contr_submission_opt,
            contr_resource_cmd,
            "melissa-container",
            script_filename,
        )

        # since container jobs are specific to this scheduler it must
        # be launched manually from here
        logger.debug(f"launching {oar_command}")
        with open(stdout_filename, "w") as stdout_file, \
                open(stderr_filename, "w") as stderr_file:
            proc = subprocess.Popen(
                args=oar_command,
                env=os.environ,
                stdin=subprocess.DEVNULL,
                stdout=stdout_file,
                stderr=stderr_file,
                universal_newlines=True,
            )
            logger.debug("submitting job uid container")
            proc.wait()

        # the corresponding CompletedProcess and Job are made here too
        with open(stdout_filename, "r") as f:
            stdout = f.read()
        with open(stderr_filename, "r") as f:
            stderr = f.read()
        oarsub = CompletedProcess(exit_status=proc.returncode, stdout=stdout, stderr=stderr)
        self.container.container_job = self._make_job_impl(oarsub, "container")
        logger.debug(
            f"job submission succeeded (UID container ID {self.container.container_job.id()})"
        )

    def _submit_job_impl_server(
        self,
        commands: List[ArgumentList],
        env: Environment,
        options: Options,
        name: str,
        uid: int,
    ) -> Tuple[ArgumentList, Environment]:
        """Write the server script and return the oarsub command running it
        inside the container job."""
        mpirun_command = self._build_mpirun_command(commands, env, options)
        server_resource_cmd, server_submission_opt = self._split_oar_options(
            options.raw_arguments
        )
        script_filename = self._write_job_script(uid, mpirun_command)

        assert self.container.container_job
        oar_command = self._oarsub_command(
            ["inner=" + str(self.container.container_job.id())],
            server_submission_opt,
            server_resource_cmd,
            name,
            script_filename,
        )
        return oar_command, os.environ

    def _submit_job_impl_client(
        self,
        commands: List[ArgumentList],
        env: Environment,
        options: Options,
        name: str,
        uid: int,
    ) -> Tuple[ArgumentList, Environment]:
        """Write the client script and return the oarsub command submitting it
        either inside the container or to the best-effort queue."""
        mpirun_command = self._build_mpirun_command(commands, env, options)
        client_resource_cmd, client_submission_opt = self._split_oar_options(
            options.raw_arguments
        )
        script_filename = self._write_job_script(uid, mpirun_command)

        # Queue selection depends on the launching phase, inferred from the
        # terminated client list (empty => 1st phase, non-empty => 2nd phase):
        # -> 1st phase: a client goes to the best-effort queue when the
        #    submission counter is a multiple of the frequency parameter;
        # -> 2nd phase: a client goes to the best-effort queue when the
        #    container already holds at least `container_client_size` clients.
        # Otherwise the client is submitted inside the container.
        terminated = set(self.terminated_client_uid)
        fst_cond = (
            len(terminated) == 0
            and self.client_ctr % self.container.besteffort_alloc_freq == 0
        )
        # count distinct uids: a resubmitted client may appear several times
        # in `container_uid` but occupies at most one slot
        n_client_in_cont = len(set(self.container_uid) - terminated)
        snd_cond = (
            len(terminated) > 0
            and n_client_in_cont >= self.container.container_client_size
        )

        if fst_cond or snd_cond:
            logger.debug(f"first condition: {fst_cond}, second condition: {snd_cond}")
            job_types = ["besteffort"]
            self._client_queue[uid] = "best-effort"
        else:
            assert self.container.container_job
            self.container_uid.append(uid)
            job_types = ["inner=" + str(self.container.container_job.id())]
            self._client_queue[uid] = "default"

        oar_command = self._oarsub_command(
            job_types,
            client_submission_opt,
            client_resource_cmd,
            name,
            script_filename,
        )
        return oar_command, os.environ

    def _make_job_impl(self, oarsub: CompletedProcess, uid: Union[int, str]) -> OarJob:
        """Create the OarJob matching a completed ``oarsub`` call.

        :raises RuntimeError: if ``oarsub`` exited with a non-zero status.
        :raises ValueError: if no ``OAR_JOB_ID=<n>`` appears in its output.
        """
        if oarsub.stderr != "":
            logger.warning(f"oarsub (uid={uid}): {oarsub.stderr}")
        if oarsub.exit_status != 0:
            raise RuntimeError(f"oarsub failed (exit status {oarsub.exit_status})")

        # do not match because oarsub may print lines starting with
        # [ADMISSION RULE]
        match = self._oarsub_job_id_pattern.search(oarsub.stdout)
        if match is None:
            raise ValueError("no job id found in oarsub output: '{:s}'".format(oarsub.stdout))
        job_id = int(match.group(1))

        # the container (string uid) and the server always run in the container
        if isinstance(uid, str) or uid == self.server_uid:
            return OarJob(uid, job_id, "default")

        self.submitted_client_uid.append(int(uid))
        # use the queue chosen for this very submission; "ever in the
        # container" would mislabel a container client resubmitted as best-effort
        queue = self._client_queue.get(int(uid))
        if queue is None:
            queue = "default" if uid in self.container_uid else "best-effort"
        return OarJob(uid, job_id, queue)

    # ------------------------------------------------------------------ updates

    def _update_jobs_impl(self, jobs: List[OarJob]) -> Tuple[ArgumentList, Environment]:
        """Return the oarstat command querying *jobs* and the container job."""
        assert self.container.container_job
        job_args = ["--job={:d}".format(j.id()) for j in jobs + [self.container.container_job]]
        # `oarstat --state` does not distinguish between failed and successful
        # jobs
        return ["oarstat", "--full", "--json"] + job_args, os.environ

    def _parse_update_jobs_impl(self, jobs: List[OarJob], oarstat: CompletedProcess) -> None:
        """Update job states from the JSON output of ``oarstat --full --json``.

        The container job is updated too; jobs absent from the output are left
        untouched. Successfully terminated clients are recorded in
        ``terminated_client_uid``.

        :raises RuntimeError: on an OAR state missing from ``states_map``.
        """
        states_map = {
            "Error": State.ERROR,
            "Finishing": State.TERMINATED,
            "Hold": State.WAITING,
            "Launching": State.WAITING,
            "Running": State.RUNNING,
            "Terminated": State.TERMINATED,
            "toAckReservation": State.WAITING,
            "toLaunch": State.WAITING,
            "Waiting": State.WAITING,
        }

        info = json.loads(oarstat.stdout)

        assert self.container.container_job
        container_job = self.container.container_job
        for j in jobs + [container_job]:
            key = str(j.id())
            if key not in info:
                continue

            oar_state = info[key]["state"]
            new_state = states_map.get(oar_state)
            if new_state is None:
                fmt = "unknown OAR job state '{:s}'"
                raise RuntimeError(fmt.format(oar_state))

            if new_state == State.TERMINATED:
                assert "exit_code" in info[key]
                exit_code = info[key]["exit_code"]
                # when a BESTEFFORT_KILL event is triggered by OAR
                # the state becomes finishing with a null (i.e. None) exit_code
                if exit_code is not None and int(exit_code) == 0:
                    j.set_state(State.TERMINATED)
                    # only clients are recorded: the container uid is the
                    # string "container" (int() would raise) and the server
                    # is not a client
                    if j is not container_job and j.unique_id() != self.server_uid:
                        self.terminated_client_uid.append(int(j.unique_id()))
                else:
                    if exit_code is None:
                        logger.debug(f"best-effort job {j.id()} was killed")
                    j.set_state(State.FAILED)
            elif new_state == State.ERROR and j.queue == "best-effort":
                # State.Failed will result in a job resubmission request from the server
                logger.debug(f"best-effort job {j.id()} was killed")
                j.set_state(State.FAILED)
            else:
                j.set_state(new_state)

    # ------------------------------------------------------------- cancellation

    def _cancel_jobs_impl(self, jobs: List[OarJob]) -> Tuple[ArgumentList, Environment]:
        """Return the oardel command for *jobs*.

        When the server is among them, the container job is deleted as well
        and forgotten (``container.container_job`` is reset to None).
        """
        jobs_str = ["{:d}".format(job.id()) for job in jobs]
        if any(job.unique_id() == self.server_uid for job in jobs):
            assert self.container.container_job
            logger.debug("Cancelling container")
            jobs_str.append(str(self.container.container_job.id()))
            self.container.container_job = None
        return ["oardel"] + jobs_str, os.environ

    def _parse_cancel_jobs_impl(self, jobs: List[OarJob], proc: CompletedProcess) -> None:
        """Check the oardel exit status; 6 (job already killed) is accepted."""
        job_already_killed_code = 6
        if proc.exit_status not in [0, job_already_killed_code]:
            raise RuntimeError("oardel error: exit status {:d}".format(proc.exit_status))