Coverage for melissa/scheduler/scheduler.py: 70%

135 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-11-03 09:52 +0100

1#!/usr/bin/python3 

2 

3# Copyright (c) 2020-2022, Institut National de Recherche en Informatique et en Automatique (Inria) 

4# All rights reserved. 

5# 

6# Redistribution and use in source and binary forms, with or without 

7# modification, are permitted provided that the following conditions are met: 

8# 

9# * Redistributions of source code must retain the above copyright notice, this 

10# list of conditions and the following disclaimer. 

11# 

12# * Redistributions in binary form must reproduce the above copyright notice, 

13# this list of conditions and the following disclaimer in the documentation 

14# and/or other materials provided with the distribution. 

15# 

16# * Neither the name of the copyright holder nor the names of its 

17# contributors may be used to endorse or promote products derived from 

18# this software without specific prior written permission. 

19# 

20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 

21# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 

22# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 

23# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE 

24# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 

25# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR 

26# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 

27# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 

28# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 

29# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 

30 

31__all__ = ["DirectScheduler", "IndirectScheduler", "HybridScheduler", "Options", "Scheduler"] 

32 

33import re 

34from typing import cast, Dict, Generic, List, Optional, Tuple, TypeVar, Union, Any 

35from melissa.utility.process import ArgumentList, CompletedProcess, Environment, Process 

36 

37from .options import Options 

38 

39JobT = TypeVar("JobT") 

40 

41 

class Scheduler(Generic[JobT]):
    """
    Common base class of all schedulers.

    The public methods validate their arguments where applicable and then
    delegate to the matching ``_*_impl`` hook. The hooks defined here either
    raise NotImplementedError or return a neutral default.
    """

    # Accepted scheduler names: one or more ASCII letters, digits, "_" or "-".
    _valid_name_pattern = re.compile(r"[A-Za-z0-9_-]+", re.ASCII)

    @classmethod
    def name(cls) -> str:
        """
        Returns a scheduler name for use by programs.

        The name must be non-empty and consist only of ASCII letters, digits,
        underscores, and hyphens (so in particular it contains no whitespace).

        Raises RuntimeError if the name returned by `_name_impl` does not
        satisfy this rule.
        """
        name = cls._name_impl()
        if cls._valid_name_pattern.fullmatch(name) is None:
            raise RuntimeError(f"invalid scheduler name: '{name}'")
        return name

    @classmethod
    def is_direct(cls) -> bool:
        """Must be overridden; the base class raises NotImplementedError."""
        raise NotImplementedError("Scheduler.is_direct not implemented")

    @classmethod
    def is_hybrid(cls) -> bool:
        """Must be overridden; the base class raises NotImplementedError."""
        raise NotImplementedError("Scheduler.is_hybrid not implemented")

    @classmethod
    def is_available(cls) -> Tuple[bool, Union[str, Tuple[str, str]]]:
        """
        Returns a pair whose first element tells if the scheduler is available.

        Going by the return annotation, the second element is a pair
        (scheduler path, scheduler version) if the scheduler is available and
        a string with the reason otherwise.
        """
        return cls._is_available_impl()

    def sanity_check(self, options: Options) -> List[str]:
        """
        Returns the list of strings produced by `_sanity_check_impl`.

        NOTE(review): the entries are presumably messages describing problems
        with `options` -- confirm against the concrete schedulers.
        """
        return self._sanity_check_impl(options)

    def submit_job(
        self,
        commands: List[ArgumentList],
        env: Dict[str, Optional[str]],
        options: Options,
        name: str,
        unique_id: int,
    ) -> Tuple[ArgumentList, Environment]:
        """
        Validates the arguments and forwards them to `_submit_job_impl`.

        Entries of `env` whose value is None are dropped before forwarding.

        Raises ValueError if `commands` is empty.
        """
        # truthiness check so that any empty sequence is rejected, not only []
        if not commands:
            raise ValueError("expected at least one command, got none")

        # allow None as dictionary value so that callers can pass, for example,
        #   env = { "PYTHONPATH": os.getenv("PYTHONPATH") }
        # without having to check if the environment variable was set
        env_clean = {key: value for key, value in env.items() if value is not None}

        return self._submit_job_impl(commands, env_clean, options, name, unique_id)

    @classmethod
    def _name_impl(cls) -> str:
        """Hook for `name`; must be overridden."""
        raise NotImplementedError("BUG missing implementation")

    @classmethod
    def _is_available_impl(cls) -> Tuple[bool, Union[str, Tuple[str, str]]]:
        """Hook for `is_available`; must be overridden."""
        raise NotImplementedError("BUG missing implementation")

    def _sanity_check_impl(self, options: Options) -> List[str]:
        """Hook for `sanity_check`; the default returns an empty list."""
        return []

    def _submit_job_impl(
        self,
        commands: List[ArgumentList],
        env: Environment,
        options: Options,
        name: str,
        unique_id: int,
    ) -> Tuple[ArgumentList, Environment]:
        """
        Hook for `submit_job`; must be overridden.

        When called through `submit_job`, `commands` is non-empty and `env`
        contains no None values.
        """
        raise NotImplementedError("BUG missing implementation")

113 

114 

class DirectScheduler(Generic[JobT], Scheduler[JobT]):
    """
    Base class of schedulers reporting `is_direct() == True` and
    `is_hybrid() == False`.

    Updating and cancelling jobs are classmethods that return nothing; the
    actual work is done by the ``_*_impl`` hooks.
    """

    @classmethod
    def is_direct(cls) -> bool:
        """Always True for this class."""
        return True

    @classmethod
    def is_hybrid(cls) -> bool:
        """Always False for this class."""
        return False

    # `**kwargs: Any` because the annotation of `**kwargs` describes each
    # keyword VALUE, not the dictionary holding all keyword arguments.
    def make_job(self, proc: "Process[str]", unique_id: int, **kwargs: Any) -> JobT:
        """
        Returns the job object created by `_make_job_impl` from the process
        `proc`, the identifier `unique_id`, and any extra keyword arguments.
        """
        return self._make_job_impl(proc, unique_id, **kwargs)

    @classmethod
    def update_jobs(cls, jobs: List[JobT]) -> None:
        """
        This method updates the status of the given jobs.

        The method is a classmethod because it is not supposed to use any
        state. It might be executed concurrently with other scheduler methods.

        Raises ValueError if `jobs` is empty.
        """
        if not jobs:
            raise ValueError("the list of jobs must not be empty")
        return cls._update_jobs_impl(jobs)

    @classmethod
    def cancel_jobs(cls, jobs: List[JobT]) -> None:
        """
        This method cancels the given jobs.

        The method is a classmethod because it is not supposed to use any
        state. It might be executed concurrently with other scheduler methods.

        Raises ValueError if `jobs` is empty.
        """
        if not jobs:
            raise ValueError("the list of jobs must not be empty")
        return cls._cancel_jobs_impl(jobs)

    def _make_job_impl(self, proc: "Process[str]", unique_id: int, **kwargs: Any) -> JobT:
        """Hook for `make_job`; must be overridden."""
        raise NotImplementedError("BUG missing implementation")

    @classmethod
    def _update_jobs_impl(cls, jobs: List[JobT]) -> None:
        """Hook for `update_jobs`; must be overridden. `jobs` is non-empty."""
        raise NotImplementedError("BUG missing implementation")

    @classmethod
    def _cancel_jobs_impl(cls, jobs: List[JobT]) -> None:
        """Hook for `cancel_jobs`; must be overridden. `jobs` is non-empty."""
        raise NotImplementedError("BUG missing implementation")

162 

163 

class IndirectScheduler(Generic[JobT], Scheduler[JobT]):
    """
    Base class of schedulers reporting `is_direct() == False` and
    `is_hybrid() == False`.

    Every public method forwards its arguments unchanged to the ``_*_impl``
    hook of the same name, which must be overridden.

    NOTE(review): `update_jobs` and `cancel_jobs` return an argument list and
    an environment; presumably the caller runs that command and hands the
    finished process to `parse_update_jobs` / `parse_cancel_jobs` -- confirm
    against the callers.
    """

    @classmethod
    def is_direct(cls) -> bool:
        """Always False for this class."""
        return False

    @classmethod
    def is_hybrid(cls) -> bool:
        """Always False for this class."""
        return False

    # -- job creation ------------------------------------------------------

    def make_job(self, proc: CompletedProcess, unique_id: int) -> JobT:
        """Returns the job object created by `_make_job_impl`."""
        return self._make_job_impl(proc, unique_id)

    def _make_job_impl(self, proc: CompletedProcess, unique_id: int) -> JobT:
        """Hook for `make_job`; must be overridden."""
        raise NotImplementedError("BUG missing implementation")

    # -- job update --------------------------------------------------------

    def update_jobs(self, jobs: List[JobT]) -> Tuple[ArgumentList, Environment]:
        """Returns the (argument list, environment) pair from `_update_jobs_impl`."""
        return self._update_jobs_impl(jobs)

    def _update_jobs_impl(self, jobs: List[JobT]) -> Tuple[ArgumentList, Environment]:
        """Hook for `update_jobs`; must be overridden."""
        raise NotImplementedError("BUG missing implementation")

    def parse_update_jobs(self, jobs: List[JobT], proc: CompletedProcess) -> None:
        """Forwards `jobs` and the completed process to `_parse_update_jobs_impl`."""
        return self._parse_update_jobs_impl(jobs, proc)

    def _parse_update_jobs_impl(self, jobs: List[JobT], proc: CompletedProcess) -> None:
        """Hook for `parse_update_jobs`; must be overridden."""
        raise NotImplementedError("BUG missing implementation")

    # -- job cancellation --------------------------------------------------

    def cancel_jobs(self, jobs: List[JobT]) -> Tuple[ArgumentList, Environment]:
        """Returns the (argument list, environment) pair from `_cancel_jobs_impl`."""
        return self._cancel_jobs_impl(jobs)

    def _cancel_jobs_impl(self, jobs: List[JobT]) -> Tuple[ArgumentList, Environment]:
        """Hook for `cancel_jobs`; must be overridden."""
        raise NotImplementedError("BUG missing implementation")

    def parse_cancel_jobs(self, jobs: List[JobT], proc: CompletedProcess) -> None:
        """Forwards `jobs` and the completed process to `_parse_cancel_jobs_impl`."""
        return self._parse_cancel_jobs_impl(jobs, proc)

    def _parse_cancel_jobs_impl(self, jobs: List[JobT], proc: CompletedProcess) -> None:
        """Hook for `parse_cancel_jobs`; must be overridden."""
        raise NotImplementedError("BUG missing implementation")

202 

203 

class HybridScheduler(Generic[JobT], Scheduler[JobT]):
    """
    This class mixes both direct and indirect schedulers. The purpose is to enable
    scheduling the server in an indirect fashion while the clients are submitted
    directly, i.e., client jobs are viewed as direct and the server job as indirect.

    It follows the direct scheduler methods except for the job cancellation which
    treats the server job the way an indirect scheduler would.
    """

    @classmethod
    def is_direct(cls) -> bool:
        """Always True for this class."""
        return True

    @classmethod
    def is_hybrid(cls) -> bool:
        """Always True for this class."""
        return True

    # `**kwargs: Any` because the annotation of `**kwargs` describes each
    # keyword VALUE, not the dictionary holding all keyword arguments.
    def make_job(self, proc: "Process[str]", unique_id: int, **kwargs: Any) -> JobT:
        """
        Returns the job object created by `_make_job_impl` from the process
        `proc`, the identifier `unique_id`, and any extra keyword arguments.
        """
        return self._make_job_impl(proc, unique_id, **kwargs)

    @classmethod
    def update_jobs(cls, jobs: List[JobT]) -> None:
        """
        This method updates the status of the given jobs.

        The method is a classmethod because it is not supposed to use any
        state. It might be executed concurrently with other scheduler methods.

        Raises ValueError if `jobs` is empty.
        """
        if not jobs:
            raise ValueError("the list of jobs must not be empty")
        return cls._update_jobs_impl(jobs)

    @classmethod
    def cancel_client_jobs(cls, jobs: List[JobT]) -> None:
        """
        This method cancels the given client jobs.

        The method is a classmethod because it is not supposed to use any
        state. It might be executed concurrently with other scheduler methods.

        Raises ValueError if `jobs` is empty.
        """
        if not jobs:
            raise ValueError("the list of jobs must not be empty")
        return cls._cancel_client_jobs_impl(jobs)

    @classmethod
    def cancel_server_job(cls, jobs: List[JobT]) -> Optional[Tuple[ArgumentList, Environment]]:
        """
        This method handles the cancellation of the server job.

        It returns whatever `_cancel_server_job_impl` returns: either None or
        an (argument list, environment) pair.
        NOTE(review): presumably the pair is a cancellation command to be run
        by the caller, whose result goes to `parse_cancel_jobs` -- confirm.

        The method is a classmethod because it is not supposed to use any
        state. It might be executed concurrently with other scheduler methods.

        Raises ValueError if `jobs` is empty.
        """
        if not jobs:
            raise ValueError("the list of jobs must not be empty")
        return cls._cancel_server_job_impl(jobs)

    def parse_update_jobs(self, jobs: List[JobT], proc: CompletedProcess) -> None:
        """
        Always raises NotImplementedError unless overridden.

        Unlike the other public methods, there is no ``_impl`` hook for it.
        """
        raise NotImplementedError("BUG missing implementation")

    def parse_cancel_jobs(self, jobs: List[JobT], proc: CompletedProcess) -> None:
        """Forwards `jobs` and the completed process to `_parse_cancel_jobs_impl`."""
        return self._parse_cancel_jobs_impl(jobs, proc)

    def _make_job_impl(self, proc: "Process[str]", unique_id: int, **kwargs: Any) -> JobT:
        """Hook for `make_job`; must be overridden."""
        raise NotImplementedError("BUG missing implementation")

    @classmethod
    def _update_jobs_impl(cls, jobs: List[JobT]) -> None:
        """Hook for `update_jobs`; must be overridden. `jobs` is non-empty."""
        raise NotImplementedError("BUG missing implementation")

    @classmethod
    def _cancel_client_jobs_impl(cls, jobs: List[JobT]) -> None:
        """Hook for `cancel_client_jobs`; must be overridden. `jobs` is non-empty."""
        raise NotImplementedError("BUG missing implementation")

    @classmethod
    def _cancel_server_job_impl(
        cls, jobs: List[JobT]
    ) -> Optional[Tuple[ArgumentList, Environment]]:
        """Hook for `cancel_server_job`; must be overridden. `jobs` is non-empty."""
        raise NotImplementedError("BUG missing implementation")

    def _parse_cancel_jobs_impl(self, jobs: List[JobT], proc: CompletedProcess) -> None:
        """Hook for `parse_cancel_jobs`; must be overridden."""
        raise NotImplementedError("BUG missing implementation")