Coverage for melissa/scheduler/unreliable_scheduler.py: 0%

51 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-09-22 10:36 +0000

1#!/usr/bin/python3 

2 

3# Copyright (c) 2022, Institut National de Recherche en Informatique et en Automatique (Inria) 

4# All rights reserved. 

5# 

6# Redistribution and use in source and binary forms, with or without 

7# modification, are permitted provided that the following conditions are met: 

8# 

9# * Redistributions of source code must retain the above copyright notice, this 

10# list of conditions and the following disclaimer. 

11# 

12# * Redistributions in binary form must reproduce the above copyright notice, 

13# this list of conditions and the following disclaimer in the documentation 

14# and/or other materials provided with the distribution. 

15# 

16# * Neither the name of the copyright holder nor the names of its 

17# contributors may be used to endorse or promote products derived from 

18# this software without specific prior written permission. 

19# 

20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 

21# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 

22# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 

23# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE 

24# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 

25# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR 

26# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 

27# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 

28# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 

29# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 

30 

31from random import Random 

32from typing import List, Optional, Tuple, Union 

33 

34from .scheduler import IndirectScheduler, Options 

35from .slurm import SlurmJob, SlurmScheduler 

36from melissa.utility.time import Time 

37from melissa.utility.process import ArgumentList, CompletedProcess, Environment 

38 

39 

class UnreliableScheduler(IndirectScheduler[SlurmJob]):
    """
    Wrap a ``SlurmScheduler`` and make its commands slow and unreliable.

    Every command line produced by the wrapped scheduler for job submission,
    job updates, and job cancellation is rewritten into a ``sh -c`` script
    that sleeps for a random time, runs the command, and sleeps again. With
    probability ``fail_probability`` the command is replaced by ``exit <code>``
    with a random exit code. This class exists for testing purposes (i.e.,
    for testing asynchronicity).
    """

    # Bounds for the total artificial delay per command. Half of the delay is
    # drawn for the sleep before the command, half for the sleep after it.
    min_delay = Time(seconds=1)
    max_delay = Time(seconds=5)
    # Probability in [0, 1] that a command is replaced by a failing `exit`.
    fail_probability = 0.1

    @classmethod
    def _name_impl(cls) -> str:
        """Return the name of this scheduler."""
        return "unreliable-scheduler"

    @classmethod
    def _is_available_impl(cls) -> Tuple[bool, Union[str, Tuple[str, str]]]:
        """Report availability by delegating to ``SlurmScheduler.is_available``."""
        return SlurmScheduler.is_available()

    def __init__(self, seed: Optional[int] = None) -> None:
        """
        Create the private random number generator and the wrapped scheduler.

        :param seed: seed for the random number generator; ``None`` lets
            ``random.Random`` pick its default seed.
        """
        # NOTE(review): the base-class initializer is not invoked here --
        # confirm that IndirectScheduler does not need any initialization.
        self._rng = Random(seed)
        self._impl = SlurmScheduler()

    def _sanity_check_impl(self, options: Options) -> List[str]:
        """Forward the sanity check to the wrapped scheduler."""
        return self._impl.sanity_check(options)

    def _modify_args(self, args: ArgumentList) -> ArgumentList:
        """
        Wrap a command line in random delays and, possibly, a random failure.

        :param args: the command line to execute.
        :return: a ``sh -c`` command line of the form
            ``sleep <t0> && <command> && sleep <t1>``, where ``<command>`` is
            either ``args`` (shell-quoted) or, with probability
            ``fail_probability``, ``exit <code>`` with ``2 <= code <= 127``.
        :raises ValueError: if ``fail_probability`` is not in ``[0, 1]``.
        """
        import shlex

        # An explicit exception (instead of `assert`) keeps the check alive
        # when Python runs with optimizations enabled.
        if not 0 <= self.fail_probability <= 1:
            raise ValueError(
                f"fail_probability must be in [0, 1], got {self.fail_probability}"
            )

        half_min_s = self.min_delay.total_seconds() / 2
        half_max_s = self.max_delay.total_seconds() / 2
        sleep_before = self._rng.uniform(half_min_s, half_max_s)
        sleep_after = self._rng.uniform(half_min_s, half_max_s)

        # `random()` returns values in [0.0, 1.0). The strict comparison
        # guarantees "never fail" for probability 0 and "always fail" for
        # probability 1.
        if self._rng.random() < self.fail_probability:
            command = "exit " + str(self._rng.randint(2, 127))
        else:
            # Quote every argument so that the shell hands the command the
            # same argument vector it was given here, even if an argument
            # contains whitespace or shell metacharacters.
            command = " ".join(shlex.quote(arg) for arg in args)

        script = " && ".join(
            ["sleep " + str(sleep_before), command, "sleep " + str(sleep_after)]
        )
        return ["sh", "-c", script]

    def _submit_job_impl(
        self, commands: List[ArgumentList], env: Environment, options: Options, name: str, uid: int
    ) -> Tuple[ArgumentList, Environment]:
        """
        Build the submission command with the wrapped scheduler and make it
        unreliable.

        :return: the modified command line and the environment returned by
            the wrapped scheduler.
        """
        args, env = self._impl._submit_job_impl(commands, env, options, name, uid)
        return self._modify_args(args), env

    def _make_job_impl(self, proc: CompletedProcess, uid: int) -> SlurmJob:
        """Let the wrapped scheduler create the job from the finished process."""
        return self._impl.make_job(proc, uid)

    def _update_jobs_impl(self, jobs: List[SlurmJob]) -> Tuple[ArgumentList, Environment]:
        """
        Build the job update command with the wrapped scheduler and make it
        unreliable.
        """
        args, env = self._impl.update_jobs(jobs)
        return self._modify_args(args), env

    def _parse_update_jobs_impl(self, jobs: List[SlurmJob], proc: CompletedProcess) -> None:
        """Forward the output of the job update command to the wrapped scheduler."""
        self._impl.parse_update_jobs(jobs, proc)

    def _cancel_jobs_impl(self, jobs: List[SlurmJob]) -> Tuple[ArgumentList, Environment]:
        """
        Build the job cancellation command with the wrapped scheduler and make
        it unreliable.
        """
        args, env = self._impl.cancel_jobs(jobs)
        return self._modify_args(args), env

    def _parse_cancel_jobs_impl(self, jobs: List[SlurmJob], proc: CompletedProcess) -> None:
        """Forward the output of the job cancellation command to the wrapped scheduler."""
        self._impl.parse_cancel_jobs(jobs, proc)