Coverage for melissa/scheduler/scheduler.py: 70%
135 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-11-03 09:52 +0100
« prev ^ index » next coverage.py v7.10.1, created at 2025-11-03 09:52 +0100
1#!/usr/bin/python3
3# Copyright (c) 2020-2022, Institut National de Recherche en Informatique et en Automatique (Inria)
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions are met:
8#
9# * Redistributions of source code must retain the above copyright notice, this
10# list of conditions and the following disclaimer.
11#
12# * Redistributions in binary form must reproduce the above copyright notice,
13# this list of conditions and the following disclaimer in the documentation
14# and/or other materials provided with the distribution.
15#
16# * Neither the name of the copyright holder nor the names of its
17# contributors may be used to endorse or promote products derived from
18# this software without specific prior written permission.
19#
20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
24# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
26# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31__all__ = ["DirectScheduler", "IndirectScheduler", "HybridScheduler", "Options", "Scheduler"]
33import re
34from typing import cast, Dict, Generic, List, Optional, Tuple, TypeVar, Union, Any
35from melissa.utility.process import ArgumentList, CompletedProcess, Environment, Process
37from .options import Options
39JobT = TypeVar("JobT")
42class Scheduler(Generic[JobT]):
43 _valid_name_pattern = re.compile("[A-Za-z0-9_-]+", re.ASCII)
45 @classmethod
46 def name(cls) -> str:
47 """
48 Returns a scheduler name for use by programs.
49 The name must contain only ASCII characters and no whitespace.
50 """
51 name = cls._name_impl()
52 if cls._valid_name_pattern.fullmatch(name) is None:
53 raise RuntimeError("invalid scheduler name: '%s'" % name)
54 return name
56 @classmethod
57 def is_direct(cls) -> bool:
58 raise NotImplementedError("Scheduler.is_direct not implemented")
60 @classmethod
61 def is_hybrid(cls) -> bool:
62 raise NotImplementedError("Scheduler.is_hybrid not implemented")
64 @classmethod
65 def is_available(cls) -> Tuple[bool, Union[str, Tuple[str, str]]]:
66 """
67 Returns a triple containing True, the scheduler path, and scheduler
68 version if available; a pair containing False, and a reason otherwise.
69 """
70 return cls._is_available_impl()
72 def sanity_check(self, options: Options) -> List[str]:
73 return self._sanity_check_impl(options)
75 def submit_job(
76 self,
77 commands: List[ArgumentList],
78 env: Dict[str, Optional[str]],
79 options: Options,
80 name: str,
81 unique_id: int,
82 ) -> Tuple[ArgumentList, Environment]:
83 if commands == []:
84 raise ValueError("expected at least one command, got none")
86 # allow None as dictionary value so that callers can pass, for example,
87 # env = { "PYTHONPATH": os.getenv("PYTHONPATH") }
88 # without having to check if the environment variable was set
89 env_clean = dict([(k, cast(str, env[k])) for k in env.keys() if env[k] is not None])
91 return self._submit_job_impl(commands, env_clean, options, name, unique_id)
93 @classmethod
94 def _name_impl(cls) -> str:
95 raise NotImplementedError("BUG missing implementation")
97 @classmethod
98 def _is_available_impl(cls) -> Tuple[bool, Union[str, Tuple[str, str]]]:
99 raise NotImplementedError("BUG missing implementation")
101 def _sanity_check_impl(self, options: Options) -> List[str]:
102 return []
104 def _submit_job_impl(
105 self,
106 commands: List[ArgumentList],
107 env: Environment,
108 options: Options,
109 name: str,
110 unique_id: int,
111 ) -> Tuple[ArgumentList, Environment]:
112 raise NotImplementedError("BUG missing implementation")
115class DirectScheduler(Generic[JobT], Scheduler[JobT]):
116 @classmethod
117 def is_direct(cls) -> bool:
118 return True
120 @classmethod
121 def is_hybrid(cls) -> bool:
122 return False
124 def make_job(self, proc: "Process[str]", unique_id: int, **kwargs: "dict[str, Any]") -> JobT:
125 return self._make_job_impl(proc, unique_id, **kwargs)
127 @classmethod
128 def update_jobs(cls, jobs: List[JobT]) -> None:
129 """
130 This method updates the status of the given jobs.
132 The method is a classmethod because it is not supposed any state. It
133 might be executed concurrently with other scheduler methods.
134 """
135 if jobs == []:
136 raise ValueError("the list of jobs must not be empty")
137 return cls._update_jobs_impl(jobs)
139 @classmethod
140 def cancel_jobs(cls, jobs: List[JobT]) -> None:
141 """
142 This method cancels the given jobs.
144 The method is a classmethod because it is not supposed any state. It
145 might be executed concurrently with other scheduler methods.
146 """
147 if jobs == []:
148 raise ValueError("the list of jobs must not be empty")
149 return cls._cancel_jobs_impl(jobs)
151 def _make_job_impl(self, proc: "Process[str]", unique_id: int,
152 **kwargs: "dict[str, Any]") -> JobT:
153 raise NotImplementedError("BUG missing implementation")
155 @classmethod
156 def _update_jobs_impl(cls, jobs: List[JobT]) -> None:
157 raise NotImplementedError("BUG missing implementation")
159 @classmethod
160 def _cancel_jobs_impl(cls, jobs: List[JobT]) -> None:
161 raise NotImplementedError("BUG missing implementation")
164class IndirectScheduler(Generic[JobT], Scheduler[JobT]):
165 @classmethod
166 def is_direct(cls) -> bool:
167 return False
169 @classmethod
170 def is_hybrid(cls) -> bool:
171 return False
173 def make_job(self, proc: CompletedProcess, unique_id: int) -> JobT:
174 return self._make_job_impl(proc, unique_id)
176 def update_jobs(self, jobs: List[JobT]) -> Tuple[ArgumentList, Environment]:
177 return self._update_jobs_impl(jobs)
179 def cancel_jobs(self, jobs: List[JobT]) -> Tuple[ArgumentList, Environment]:
180 return self._cancel_jobs_impl(jobs)
182 def parse_update_jobs(self, jobs: List[JobT], proc: CompletedProcess) -> None:
183 return self._parse_update_jobs_impl(jobs, proc)
185 def parse_cancel_jobs(self, jobs: List[JobT], proc: CompletedProcess) -> None:
186 return self._parse_cancel_jobs_impl(jobs, proc)
188 def _make_job_impl(self, proc: CompletedProcess, unique_id: int) -> JobT:
189 raise NotImplementedError("BUG missing implementation")
191 def _update_jobs_impl(self, jobs: List[JobT]) -> Tuple[ArgumentList, Environment]:
192 raise NotImplementedError("BUG missing implementation")
194 def _cancel_jobs_impl(self, jobs: List[JobT]) -> Tuple[ArgumentList, Environment]:
195 raise NotImplementedError("BUG missing implementation")
197 def _parse_update_jobs_impl(self, jobs: List[JobT], proc: CompletedProcess) -> None:
198 raise NotImplementedError("BUG missing implementation")
200 def _parse_cancel_jobs_impl(self, jobs: List[JobT], proc: CompletedProcess) -> None:
201 raise NotImplementedError("BUG missing implementation")
204class HybridScheduler(Generic[JobT], Scheduler[JobT]):
205 """
206 This class mixes both direct and indirect schedulers. The purpose is to enable
207 scheduling the server in an indirect fashion while the clients are submitted
208 directly i.e. clients and server jobs are viewed as direct and indirect.
210 It follows the direct scheduler methods except for the job cancellation which
211 treats the server job the way an indirect scheduler would.
212 """
213 @classmethod
214 def is_direct(cls) -> bool:
215 return True
217 @classmethod
218 def is_hybrid(cls) -> bool:
219 return True
221 def make_job(self, proc: "Process[str]", unique_id: int, **kwargs: "dict[str, Any]") -> JobT:
222 return self._make_job_impl(proc, unique_id, **kwargs)
224 @classmethod
225 def update_jobs(cls, jobs: List[JobT]) -> None:
226 """
227 This method updates the status of the given jobs.
229 The method is a classmethod because it is not supposed any state. It
230 might be executed concurrently with other scheduler methods.
231 """
232 if jobs == []:
233 raise ValueError("the list of jobs must not be empty")
234 return cls._update_jobs_impl(jobs)
236 @classmethod
237 def cancel_client_jobs(cls, jobs: List[JobT]) -> None:
238 """
239 This method cancels the given jobs.
241 The method is a classmethod because it is not supposed any state. It
242 might be executed concurrently with other scheduler methods.
243 """
244 if jobs == []:
245 raise ValueError("the list of jobs must not be empty")
246 return cls._cancel_client_jobs_impl(jobs)
248 @classmethod
249 def cancel_server_job(cls, jobs: List[JobT]) -> Optional[Tuple[ArgumentList, Environment]]:
250 """
251 This method cancels the given jobs.
253 The method is a classmethod because it is not supposed any state. It
254 might be executed concurrently with other scheduler methods.
255 """
256 if jobs == []:
257 raise ValueError("the list of jobs must not be empty")
258 return cls._cancel_server_job_impl(jobs)
260 def parse_update_jobs(self, jobs: List[JobT], proc: CompletedProcess) -> None:
261 raise NotImplementedError("BUG missing implementation")
263 def parse_cancel_jobs(self, jobs: List[JobT], proc: CompletedProcess) -> None:
264 return self._parse_cancel_jobs_impl(jobs, proc)
266 def _make_job_impl(self, proc: "Process[str]", unique_id: int,
267 **kwargs: "dict[str, Any]") -> JobT:
268 raise NotImplementedError("BUG missing implementation")
270 @classmethod
271 def _update_jobs_impl(cls, jobs: List[JobT]) -> None:
272 raise NotImplementedError("BUG missing implementation")
274 @classmethod
275 def _cancel_client_jobs_impl(cls, jobs: List[JobT]) -> None:
276 raise NotImplementedError("BUG missing implementation")
278 @classmethod
279 def _cancel_server_job_impl(
280 cls, jobs: List[JobT]
281 ) -> Optional[Tuple[ArgumentList, Environment]]:
282 raise NotImplementedError("BUG missing implementation")
284 def _parse_cancel_jobs_impl(self, jobs: List[JobT], proc: CompletedProcess) -> None:
285 raise NotImplementedError("BUG missing implementation")