Coverage for melissa/scheduler/unreliable_scheduler.py: 0%
51 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-09-22 10:36 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-09-22 10:36 +0000
1#!/usr/bin/python3
3# Copyright (c) 2022, Institut National de Recherche en Informatique et en Automatique (Inria)
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions are met:
8#
9# * Redistributions of source code must retain the above copyright notice, this
10# list of conditions and the following disclaimer.
11#
12# * Redistributions in binary form must reproduce the above copyright notice,
13# this list of conditions and the following disclaimer in the documentation
14# and/or other materials provided with the distribution.
15#
16# * Neither the name of the copyright holder nor the names of its
17# contributors may be used to endorse or promote products derived from
18# this software without specific prior written permission.
19#
20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
24# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
26# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31from random import Random
32from typing import List, Optional, Tuple, Union
34from .scheduler import IndirectScheduler, Options
35from .slurm import SlurmJob, SlurmScheduler
36from melissa.utility.time import Time
37from melissa.utility.process import ArgumentList, CompletedProcess, Environment
class UnreliableScheduler(IndirectScheduler[SlurmJob]):
    """
    Slurm-backed indirect scheduler whose commands are slow and flaky.

    Every command line produced by the wrapped ``SlurmScheduler`` is rewritten
    into an ``sh -c`` script that sleeps for a random time before and after
    the real command. With probability ``fail_probability`` the real command
    is replaced by ``exit <status>`` with a random status between 2 and 127.
    This is meant for testing only (e.g., asynchronicity and error handling
    of the code driving the scheduler).
    """

    # Bounds for the total artificial delay added to one command; each of the
    # two sleeps is drawn from [min_delay / 2, max_delay / 2].
    min_delay = Time(seconds=1)
    max_delay = Time(seconds=5)
    # Probability (0 <= p <= 1) that a command is replaced by a failing one.
    fail_probability = 0.1

    @classmethod
    def _name_impl(cls) -> str:
        """Return the identifier of this scheduler."""
        return "unreliable-scheduler"

    @classmethod
    def _is_available_impl(cls) -> Tuple[bool, Union[str, Tuple[str, str]]]:
        """Report availability exactly as the Slurm scheduler does."""
        return SlurmScheduler.is_available()

    def __init__(self, seed: Optional[int] = None) -> None:
        """
        Create the scheduler.

        :param seed: seed for the private random number generator; pass an
            integer to make delays and failures reproducible. ``None`` lets
            ``random.Random`` pick its own seed.
        """
        self._rng = Random(seed)
        self._impl = SlurmScheduler()

    def _sanity_check_impl(self, options: Options) -> List[str]:
        """Delegate the option sanity check to the wrapped Slurm scheduler."""
        return self._impl.sanity_check(options)

    def _modify_args(self, args: ArgumentList) -> ArgumentList:
        """
        Wrap a command line in random delays and, sometimes, make it fail.

        :param args: the command line to execute, one list entry per argument.
        :return: an ``["sh", "-c", script]`` command line where ``script`` is
            ``sleep <t0> && <command> && sleep <t1>``. Because the parts are
            chained with ``&&``, a failing command skips the trailing sleep
            and its exit status becomes the exit status of the script.
        """
        # Imported locally so that this block does not depend on a change to
        # the module-level import list.
        import shlex

        assert 0 <= self.fail_probability <= 1

        # Split the delay budget evenly between the leading and the trailing
        # sleep so that the total delay lies in [min_delay, max_delay].
        half_min_s = self.min_delay.total_seconds() / 2
        half_max_s = self.max_delay.total_seconds() / 2
        sleep_before = self._rng.uniform(half_min_s, half_max_s)
        sleep_after = self._rng.uniform(half_min_s, half_max_s)

        # random() yields values in [0.0, 1.0); the strict comparison makes
        # fail_probability == 0 mean "never" and == 1 mean "always".
        if self._rng.random() < self.fail_probability:
            ret = self._rng.randint(2, 127)
            args = ["exit", str(ret)]

        # Quote every argument: the command is re-parsed by the shell, so an
        # unquoted argument containing whitespace or shell metacharacters
        # would be split or interpreted instead of being passed verbatim.
        command = " ".join(shlex.quote(arg) for arg in args)
        script = f"sleep {sleep_before} && {command} && sleep {sleep_after}"

        return ["sh", "-c", script]

    def _submit_job_impl(
        self, commands: List[ArgumentList], env: Environment, options: Options, name: str, uid: int
    ) -> Tuple[ArgumentList, Environment]:
        """Build the Slurm submission command and make it unreliable."""
        # NOTE(review): this calls the wrapped scheduler's private
        # `_submit_job_impl` whereas the other hooks use its public methods;
        # confirm that this is intended.
        args, env = self._impl._submit_job_impl(commands, env, options, name, uid)
        return self._modify_args(args), env

    def _make_job_impl(self, proc: CompletedProcess, uid: int) -> SlurmJob:
        """Let the wrapped Slurm scheduler create the job object."""
        return self._impl.make_job(proc, uid)

    def _update_jobs_impl(self, jobs: List[SlurmJob]) -> Tuple[ArgumentList, Environment]:
        """Build the Slurm job update command and make it unreliable."""
        args, env = self._impl.update_jobs(jobs)
        return self._modify_args(args), env

    def _parse_update_jobs_impl(self, jobs: List[SlurmJob], proc: CompletedProcess) -> None:
        """Let the wrapped Slurm scheduler parse the job update output."""
        self._impl.parse_update_jobs(jobs, proc)

    def _cancel_jobs_impl(self, jobs: List[SlurmJob]) -> Tuple[ArgumentList, Environment]:
        """Build the Slurm job cancellation command and make it unreliable."""
        args, env = self._impl.cancel_jobs(jobs)
        return self._modify_args(args), env

    def _parse_cancel_jobs_impl(self, jobs: List[SlurmJob], proc: CompletedProcess) -> None:
        """Let the wrapped Slurm scheduler parse the job cancellation output."""
        self._impl.parse_cancel_jobs(jobs, proc)