Coverage for melissa/scheduler/scheduler.py: 67%
135 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-10 22:25 +0100
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-10 22:25 +0100
1#!/usr/bin/python3
3# Copyright (c) 2020-2022, Institut National de Recherche en Informatique et en Automatique (Inria)
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions are met:
8#
9# * Redistributions of source code must retain the above copyright notice, this
10# list of conditions and the following disclaimer.
11#
12# * Redistributions in binary form must reproduce the above copyright notice,
13# this list of conditions and the following disclaimer in the documentation
14# and/or other materials provided with the distribution.
15#
16# * Neither the name of the copyright holder nor the names of its
17# contributors may be used to endorse or promote products derived from
18# this software without specific prior written permission.
19#
20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
24# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
26# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31__all__ = ["DirectScheduler", "IndirectScheduler", "HybridScheduler", "Options", "Scheduler"]
33import re
34from typing import cast, Dict, Generic, List, Optional, Tuple, TypeVar, Union
35from melissa.utility.process import ArgumentList, CompletedProcess, Environment, Process
37from .options import Options
JobT = TypeVar("JobT")


class Scheduler(Generic[JobT]):
    """
    Common base class of the scheduler interfaces, generic over the job type.

    Every public method delegates to a ``_*_impl`` hook. In this base class
    the hooks ``_name_impl``, ``_is_available_impl`` and ``_submit_job_impl``
    raise ``NotImplementedError``, as do ``is_direct`` and ``is_hybrid``;
    ``_sanity_check_impl`` returns an empty list.
    """

    # accepted names: one or more ASCII letters, digits, "_" or "-"
    _valid_name_pattern = re.compile("[A-Za-z0-9_-]+", re.ASCII)

    @classmethod
    def name(cls) -> str:
        """
        Returns a scheduler name for use by programs.

        The name must consist solely of ASCII letters, digits, underscores,
        and hyphens (in particular, it must not contain whitespace).

        Raises RuntimeError if the name returned by ``_name_impl`` does not
        satisfy this rule.
        """
        name = cls._name_impl()
        if cls._valid_name_pattern.fullmatch(name) is None:
            raise RuntimeError("invalid scheduler name: '%s'" % name)
        return name

    @classmethod
    def is_direct(cls) -> bool:
        """Not implemented in the base class; always raises NotImplementedError."""
        raise NotImplementedError("Scheduler.is_direct not implemented")

    @classmethod
    def is_hybrid(cls) -> bool:
        """Not implemented in the base class; always raises NotImplementedError."""
        raise NotImplementedError("Scheduler.is_hybrid not implemented")

    @classmethod
    def is_available(cls) -> Tuple[bool, Union[str, Tuple[str, str]]]:
        """
        Reports whether the scheduler can be used.

        Going by the annotated return type, the result is a pair: True
        together with a (scheduler path, scheduler version) pair if the
        scheduler is available; False together with a reason otherwise.
        The value itself is produced by ``_is_available_impl``.
        """
        return cls._is_available_impl()

    def sanity_check(self, options: Options) -> List[str]:
        """
        Returns the list of strings produced by ``_sanity_check_impl`` for the
        given options (presumably problem descriptions -- confirm against the
        implementations). The base implementation returns an empty list.
        """
        return self._sanity_check_impl(options)

    def submit_job(
        self,
        commands: List[ArgumentList],
        env: Dict[str, Optional[str]],
        options: Options,
        name: str,
        unique_id: int,
    ) -> Tuple[ArgumentList, Environment]:
        """
        Validates the arguments and forwards them to ``_submit_job_impl``.

        Entries of ``env`` whose value is None are dropped before forwarding.

        Returns whatever ``_submit_job_impl`` returns; the annotation is an
        (argument list, environment) pair.

        Raises ValueError if ``commands`` is empty.
        """
        # truthiness also rejects empty sequences that are not lists
        if not commands:
            raise ValueError("expected at least one command, got none")

        # allow None as dictionary value so that callers can pass, for example,
        #   env = { "PYTHONPATH": os.getenv("PYTHONPATH") }
        # without having to check if the environment variable was set
        env_clean = {key: cast(str, env[key]) for key in env if env[key] is not None}

        return self._submit_job_impl(commands, env_clean, options, name, unique_id)

    @classmethod
    def _name_impl(cls) -> str:
        """Hook behind ``name``; raises NotImplementedError in the base class."""
        raise NotImplementedError("BUG missing implementation")

    @classmethod
    def _is_available_impl(cls) -> Tuple[bool, Union[str, Tuple[str, str]]]:
        """Hook behind ``is_available``; raises NotImplementedError in the base class."""
        raise NotImplementedError("BUG missing implementation")

    def _sanity_check_impl(self, options: Options) -> List[str]:
        """Hook behind ``sanity_check``; the default returns an empty list."""
        return []

    def _submit_job_impl(
        self,
        commands: List[ArgumentList],
        env: Environment,
        options: Options,
        name: str,
        unique_id: int,
    ) -> Tuple[ArgumentList, Environment]:
        """
        Hook behind ``submit_job``; raises NotImplementedError in the base class.

        ``submit_job`` calls it with a non-empty ``commands`` and with an
        ``env`` that contains no None values.
        """
        raise NotImplementedError("BUG missing implementation")
class DirectScheduler(Generic[JobT], Scheduler[JobT]):
    """
    Scheduler flavour for which ``is_direct`` is True and ``is_hybrid`` is
    False.

    Jobs are created from a ``Process[str]``; status updates and
    cancellations are classmethods that return nothing.
    """

    @classmethod
    def is_direct(cls) -> bool:
        """Always True for direct schedulers."""
        return True

    @classmethod
    def is_hybrid(cls) -> bool:
        """Always False for direct schedulers."""
        return False

    def make_job(self, proc: "Process[str]", unique_id: int) -> JobT:
        """
        Returns the job object that ``_make_job_impl`` builds from the given
        process and unique ID.
        """
        return self._make_job_impl(proc, unique_id)

    @classmethod
    def update_jobs(cls, jobs: List[JobT]) -> None:
        """
        This method updates the status of the given jobs.

        The method is a classmethod because it is not supposed to use any
        state. It might be executed concurrently with other scheduler methods.

        Raises ValueError if ``jobs`` is empty.
        """
        # truthiness also rejects empty sequences that are not lists
        if not jobs:
            raise ValueError("the list of jobs must not be empty")
        return cls._update_jobs_impl(jobs)

    @classmethod
    def cancel_jobs(cls, jobs: List[JobT]) -> None:
        """
        This method cancels the given jobs.

        The method is a classmethod because it is not supposed to use any
        state. It might be executed concurrently with other scheduler methods.

        Raises ValueError if ``jobs`` is empty.
        """
        # truthiness also rejects empty sequences that are not lists
        if not jobs:
            raise ValueError("the list of jobs must not be empty")
        return cls._cancel_jobs_impl(jobs)

    def _make_job_impl(self, proc: "Process[str]", unique_id: int) -> JobT:
        """Hook behind ``make_job``; raises NotImplementedError here."""
        raise NotImplementedError("BUG missing implementation")

    @classmethod
    def _update_jobs_impl(cls, jobs: List[JobT]) -> None:
        """
        Hook behind ``update_jobs``, which calls it with a non-empty ``jobs``;
        raises NotImplementedError here.
        """
        raise NotImplementedError("BUG missing implementation")

    @classmethod
    def _cancel_jobs_impl(cls, jobs: List[JobT]) -> None:
        """
        Hook behind ``cancel_jobs``, which calls it with a non-empty ``jobs``;
        raises NotImplementedError here.
        """
        raise NotImplementedError("BUG missing implementation")
class IndirectScheduler(Generic[JobT], Scheduler[JobT]):
    """
    Scheduler flavour for which both ``is_direct`` and ``is_hybrid`` are
    False.

    Jobs are created from a ``CompletedProcess``. ``update_jobs`` and
    ``cancel_jobs`` return an (argument list, environment) pair rather than
    acting themselves, and ``parse_update_jobs`` / ``parse_cancel_jobs``
    receive a ``CompletedProcess`` together with the jobs (presumably the
    result of running the returned command -- confirm against the callers).
    """

    @classmethod
    def is_direct(cls) -> bool:
        """Always False for indirect schedulers."""
        return False

    @classmethod
    def is_hybrid(cls) -> bool:
        """Always False for indirect schedulers."""
        return False

    def make_job(self, proc: CompletedProcess, unique_id: int) -> JobT:
        """
        Returns the job object that ``_make_job_impl`` builds from the given
        completed process and unique ID.
        """
        return self._make_job_impl(proc, unique_id)

    def update_jobs(self, jobs: List[JobT]) -> Tuple[ArgumentList, Environment]:
        """
        Returns the (argument list, environment) pair that
        ``_update_jobs_impl`` produces for the given jobs.
        """
        return self._update_jobs_impl(jobs)

    def cancel_jobs(self, jobs: List[JobT]) -> Tuple[ArgumentList, Environment]:
        """
        Returns the (argument list, environment) pair that
        ``_cancel_jobs_impl`` produces for the given jobs.
        """
        return self._cancel_jobs_impl(jobs)

    def parse_update_jobs(self, jobs: List[JobT], proc: CompletedProcess) -> None:
        """Forwards the jobs and the completed process to ``_parse_update_jobs_impl``."""
        return self._parse_update_jobs_impl(jobs, proc)

    def parse_cancel_jobs(self, jobs: List[JobT], proc: CompletedProcess) -> None:
        """Forwards the jobs and the completed process to ``_parse_cancel_jobs_impl``."""
        return self._parse_cancel_jobs_impl(jobs, proc)

    def _make_job_impl(self, proc: CompletedProcess, unique_id: int) -> JobT:
        """Hook behind ``make_job``; raises NotImplementedError here."""
        raise NotImplementedError("BUG missing implementation")

    def _update_jobs_impl(self, jobs: List[JobT]) -> Tuple[ArgumentList, Environment]:
        """Hook behind ``update_jobs``; raises NotImplementedError here."""
        raise NotImplementedError("BUG missing implementation")

    def _cancel_jobs_impl(self, jobs: List[JobT]) -> Tuple[ArgumentList, Environment]:
        """Hook behind ``cancel_jobs``; raises NotImplementedError here."""
        raise NotImplementedError("BUG missing implementation")

    def _parse_update_jobs_impl(self, jobs: List[JobT], proc: CompletedProcess) -> None:
        """Hook behind ``parse_update_jobs``; raises NotImplementedError here."""
        raise NotImplementedError("BUG missing implementation")

    def _parse_cancel_jobs_impl(self, jobs: List[JobT], proc: CompletedProcess) -> None:
        """Hook behind ``parse_cancel_jobs``; raises NotImplementedError here."""
        raise NotImplementedError("BUG missing implementation")
class HybridScheduler(Generic[JobT], Scheduler[JobT]):
    """
    This class mixes both direct and indirect schedulers. The purpose is to enable
    scheduling the server in an indirect fashion while the clients are submitted
    directly, i.e. client and server jobs are viewed as direct and indirect,
    respectively.

    It follows the direct scheduler methods except for the job cancellation which
    treats the server job the way an indirect scheduler would.
    """

    @classmethod
    def is_direct(cls) -> bool:
        """Always True for hybrid schedulers."""
        return True

    @classmethod
    def is_hybrid(cls) -> bool:
        """Always True for hybrid schedulers."""
        return True

    def make_job(self, proc: "Process[str]", unique_id: int) -> JobT:
        """
        Returns the job object that ``_make_job_impl`` builds from the given
        process and unique ID.
        """
        return self._make_job_impl(proc, unique_id)

    @classmethod
    def update_jobs(cls, jobs: List[JobT]) -> None:
        """
        This method updates the status of the given jobs.

        The method is a classmethod because it is not supposed to use any
        state. It might be executed concurrently with other scheduler methods.

        Raises ValueError if ``jobs`` is empty.
        """
        # truthiness also rejects empty sequences that are not lists
        if not jobs:
            raise ValueError("the list of jobs must not be empty")
        return cls._update_jobs_impl(jobs)

    @classmethod
    def cancel_client_jobs(cls, jobs: List[JobT]) -> None:
        """
        This method cancels the given client jobs.

        The method is a classmethod because it is not supposed to use any
        state. It might be executed concurrently with other scheduler methods.

        Raises ValueError if ``jobs`` is empty.
        """
        # truthiness also rejects empty sequences that are not lists
        if not jobs:
            raise ValueError("the list of jobs must not be empty")
        return cls._cancel_client_jobs_impl(jobs)

    @classmethod
    def cancel_server_job(cls, jobs: List[JobT]) -> Optional[Tuple[ArgumentList, Environment]]:
        """
        This method handles the cancellation of the given server job(s).

        Returns whatever ``_cancel_server_job_impl`` returns; the annotation
        is an optional (argument list, environment) pair.

        The method is a classmethod because it is not supposed to use any
        state. It might be executed concurrently with other scheduler methods.

        Raises ValueError if ``jobs`` is empty.
        """
        # truthiness also rejects empty sequences that are not lists
        if not jobs:
            raise ValueError("the list of jobs must not be empty")
        return cls._cancel_server_job_impl(jobs)

    def parse_update_jobs(self, jobs: List[JobT], proc: CompletedProcess) -> None:
        """
        Always raises NotImplementedError; unlike ``parse_cancel_jobs`` there
        is no implementation hook behind this method.
        """
        raise NotImplementedError("BUG missing implementation")

    def parse_cancel_jobs(self, jobs: List[JobT], proc: CompletedProcess) -> None:
        """Forwards the jobs and the completed process to ``_parse_cancel_jobs_impl``."""
        return self._parse_cancel_jobs_impl(jobs, proc)

    def _make_job_impl(self, proc: "Process[str]", unique_id: int) -> JobT:
        """Hook behind ``make_job``; raises NotImplementedError here."""
        raise NotImplementedError("BUG missing implementation")

    @classmethod
    def _update_jobs_impl(cls, jobs: List[JobT]) -> None:
        """
        Hook behind ``update_jobs``, which calls it with a non-empty ``jobs``;
        raises NotImplementedError here.
        """
        raise NotImplementedError("BUG missing implementation")

    @classmethod
    def _cancel_client_jobs_impl(cls, jobs: List[JobT]) -> None:
        """
        Hook behind ``cancel_client_jobs``, which calls it with a non-empty
        ``jobs``; raises NotImplementedError here.
        """
        raise NotImplementedError("BUG missing implementation")

    @classmethod
    def _cancel_server_job_impl(
        cls, jobs: List[JobT]
    ) -> Optional[Tuple[ArgumentList, Environment]]:
        """
        Hook behind ``cancel_server_job``, which calls it with a non-empty
        ``jobs``; raises NotImplementedError here.
        """
        raise NotImplementedError("BUG missing implementation")

    def _parse_cancel_jobs_impl(self, jobs: List[JobT], proc: CompletedProcess) -> None:
        """Hook behind ``parse_cancel_jobs``; raises NotImplementedError here."""
        raise NotImplementedError("BUG missing implementation")