Coverage for melissa/scheduler/scheduler.py: 70%

135 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-09-22 10:36 +0000

1#!/usr/bin/python3 

2 

3# Copyright (c) 2020-2022, Institut National de Recherche en Informatique et en Automatique (Inria) 

4# All rights reserved. 

5# 

6# Redistribution and use in source and binary forms, with or without 

7# modification, are permitted provided that the following conditions are met: 

8# 

9# * Redistributions of source code must retain the above copyright notice, this 

10# list of conditions and the following disclaimer. 

11# 

12# * Redistributions in binary form must reproduce the above copyright notice, 

13# this list of conditions and the following disclaimer in the documentation 

14# and/or other materials provided with the distribution. 

15# 

16# * Neither the name of the copyright holder nor the names of its 

17# contributors may be used to endorse or promote products derived from 

18# this software without specific prior written permission. 

19# 

20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 

21# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 

22# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 

23# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE 

24# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 

25# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR 

26# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 

27# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 

28# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 

29# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 

30 

31__all__ = ["DirectScheduler", "IndirectScheduler", "HybridScheduler", "Options", "Scheduler"] 

32 

33import re 

34from typing import cast, Dict, Generic, List, Optional, Tuple, TypeVar, Union 

35from melissa.utility.process import ArgumentList, CompletedProcess, Environment, Process 

36 

37from .options import Options 

38 

39JobT = TypeVar("JobT") 

40 

41 

42class Scheduler(Generic[JobT]): 

43 _valid_name_pattern = re.compile("[A-Za-z0-9_-]+", re.ASCII) 

44 

45 @classmethod 

46 def name(cls) -> str: 

47 """ 

48 Returns a scheduler name for use by programs. 

49 The name must contain only ASCII characters and no whitespace. 

50 """ 

51 name = cls._name_impl() 

52 if cls._valid_name_pattern.fullmatch(name) is None: 

53 raise RuntimeError("invalid scheduler name: '%s'" % name) 

54 return name 

55 

56 @classmethod 

57 def is_direct(cls) -> bool: 

58 raise NotImplementedError("Scheduler.is_direct not implemented") 

59 

60 @classmethod 

61 def is_hybrid(cls) -> bool: 

62 raise NotImplementedError("Scheduler.is_hybrid not implemented") 

63 

64 @classmethod 

65 def is_available(cls) -> Tuple[bool, Union[str, Tuple[str, str]]]: 

66 """ 

67 Returns a triple containing True, the scheduler path, and scheduler 

68 version if available; a pair containing False, and a reason otherwise. 

69 """ 

70 return cls._is_available_impl() 

71 

72 def sanity_check(self, options: Options) -> List[str]: 

73 return self._sanity_check_impl(options) 

74 

75 def submit_job( 

76 self, 

77 commands: List[ArgumentList], 

78 env: Dict[str, Optional[str]], 

79 options: Options, 

80 name: str, 

81 unique_id: int, 

82 ) -> Tuple[ArgumentList, Environment]: 

83 if commands == []: 

84 raise ValueError("expected at least one command, got none") 

85 

86 # allow None as dictionary value so that callers can pass, for example, 

87 # env = { "PYTHONPATH": os.getenv("PYTHONPATH") } 

88 # without having to check if the environment variable was set 

89 env_clean = dict([(k, cast(str, env[k])) for k in env.keys() if env[k] is not None]) 

90 

91 return self._submit_job_impl(commands, env_clean, options, name, unique_id) 

92 

93 @classmethod 

94 def _name_impl(cls) -> str: 

95 raise NotImplementedError("BUG missing implementation") 

96 

97 @classmethod 

98 def _is_available_impl(cls) -> Tuple[bool, Union[str, Tuple[str, str]]]: 

99 raise NotImplementedError("BUG missing implementation") 

100 

101 def _sanity_check_impl(self, options: Options) -> List[str]: 

102 return [] 

103 

104 def _submit_job_impl( 

105 self, 

106 commands: List[ArgumentList], 

107 env: Environment, 

108 options: Options, 

109 name: str, 

110 unique_id: int, 

111 ) -> Tuple[ArgumentList, Environment]: 

112 raise NotImplementedError("BUG missing implementation") 

113 

114 

115class DirectScheduler(Generic[JobT], Scheduler[JobT]): 

116 @classmethod 

117 def is_direct(cls) -> bool: 

118 return True 

119 

120 @classmethod 

121 def is_hybrid(cls) -> bool: 

122 return False 

123 

124 def make_job(self, proc: "Process[str]", unique_id: int) -> JobT: 

125 return self._make_job_impl(proc, unique_id) 

126 

127 @classmethod 

128 def update_jobs(cls, jobs: List[JobT]) -> None: 

129 """ 

130 This method updates the status of the given jobs. 

131 

132 The method is a classmethod because it is not supposed any state. It 

133 might be executed concurrently with other scheduler methods. 

134 """ 

135 if jobs == []: 

136 raise ValueError("the list of jobs must not be empty") 

137 return cls._update_jobs_impl(jobs) 

138 

139 @classmethod 

140 def cancel_jobs(cls, jobs: List[JobT]) -> None: 

141 """ 

142 This method cancels the given jobs. 

143 

144 The method is a classmethod because it is not supposed any state. It 

145 might be executed concurrently with other scheduler methods. 

146 """ 

147 if jobs == []: 

148 raise ValueError("the list of jobs must not be empty") 

149 return cls._cancel_jobs_impl(jobs) 

150 

151 def _make_job_impl(self, proc: "Process[str]", unique_id: int) -> JobT: 

152 raise NotImplementedError("BUG missing implementation") 

153 

154 @classmethod 

155 def _update_jobs_impl(cls, jobs: List[JobT]) -> None: 

156 raise NotImplementedError("BUG missing implementation") 

157 

158 @classmethod 

159 def _cancel_jobs_impl(cls, jobs: List[JobT]) -> None: 

160 raise NotImplementedError("BUG missing implementation") 

161 

162 

163class IndirectScheduler(Generic[JobT], Scheduler[JobT]): 

164 @classmethod 

165 def is_direct(cls) -> bool: 

166 return False 

167 

168 @classmethod 

169 def is_hybrid(cls) -> bool: 

170 return False 

171 

172 def make_job(self, proc: CompletedProcess, unique_id: int) -> JobT: 

173 return self._make_job_impl(proc, unique_id) 

174 

175 def update_jobs(self, jobs: List[JobT]) -> Tuple[ArgumentList, Environment]: 

176 return self._update_jobs_impl(jobs) 

177 

178 def cancel_jobs(self, jobs: List[JobT]) -> Tuple[ArgumentList, Environment]: 

179 return self._cancel_jobs_impl(jobs) 

180 

181 def parse_update_jobs(self, jobs: List[JobT], proc: CompletedProcess) -> None: 

182 return self._parse_update_jobs_impl(jobs, proc) 

183 

184 def parse_cancel_jobs(self, jobs: List[JobT], proc: CompletedProcess) -> None: 

185 return self._parse_cancel_jobs_impl(jobs, proc) 

186 

187 def _make_job_impl(self, proc: CompletedProcess, unique_id: int) -> JobT: 

188 raise NotImplementedError("BUG missing implementation") 

189 

190 def _update_jobs_impl(self, jobs: List[JobT]) -> Tuple[ArgumentList, Environment]: 

191 raise NotImplementedError("BUG missing implementation") 

192 

193 def _cancel_jobs_impl(self, jobs: List[JobT]) -> Tuple[ArgumentList, Environment]: 

194 raise NotImplementedError("BUG missing implementation") 

195 

196 def _parse_update_jobs_impl(self, jobs: List[JobT], proc: CompletedProcess) -> None: 

197 raise NotImplementedError("BUG missing implementation") 

198 

199 def _parse_cancel_jobs_impl(self, jobs: List[JobT], proc: CompletedProcess) -> None: 

200 raise NotImplementedError("BUG missing implementation") 

201 

202 

203class HybridScheduler(Generic[JobT], Scheduler[JobT]): 

204 """ 

205 This class mixes both direct and indirect schedulers. The purpose is to enable 

206 scheduling the server in an indirect fashion while the clients are submitted 

207 directly i.e. clients and server jobs are viewed as direct and indirect. 

208 

209 It follows the direct scheduler methods except for the job cancellation which 

210 treats the server job the way an indirect scheduler would. 

211 """ 

212 @classmethod 

213 def is_direct(cls) -> bool: 

214 return True 

215 

216 @classmethod 

217 def is_hybrid(cls) -> bool: 

218 return True 

219 

220 def make_job(self, proc: "Process[str]", unique_id: int) -> JobT: 

221 return self._make_job_impl(proc, unique_id) 

222 

223 @classmethod 

224 def update_jobs(cls, jobs: List[JobT]) -> None: 

225 """ 

226 This method updates the status of the given jobs. 

227 

228 The method is a classmethod because it is not supposed any state. It 

229 might be executed concurrently with other scheduler methods. 

230 """ 

231 if jobs == []: 

232 raise ValueError("the list of jobs must not be empty") 

233 return cls._update_jobs_impl(jobs) 

234 

235 @classmethod 

236 def cancel_client_jobs(cls, jobs: List[JobT]) -> None: 

237 """ 

238 This method cancels the given jobs. 

239 

240 The method is a classmethod because it is not supposed any state. It 

241 might be executed concurrently with other scheduler methods. 

242 """ 

243 if jobs == []: 

244 raise ValueError("the list of jobs must not be empty") 

245 return cls._cancel_client_jobs_impl(jobs) 

246 

247 @classmethod 

248 def cancel_server_job(cls, jobs: List[JobT]) -> Optional[Tuple[ArgumentList, Environment]]: 

249 """ 

250 This method cancels the given jobs. 

251 

252 The method is a classmethod because it is not supposed any state. It 

253 might be executed concurrently with other scheduler methods. 

254 """ 

255 if jobs == []: 

256 raise ValueError("the list of jobs must not be empty") 

257 return cls._cancel_server_job_impl(jobs) 

258 

259 def parse_update_jobs(self, jobs: List[JobT], proc: CompletedProcess) -> None: 

260 raise NotImplementedError("BUG missing implementation") 

261 

262 def parse_cancel_jobs(self, jobs: List[JobT], proc: CompletedProcess) -> None: 

263 return self._parse_cancel_jobs_impl(jobs, proc) 

264 

265 def _make_job_impl(self, proc: "Process[str]", unique_id: int) -> JobT: 

266 raise NotImplementedError("BUG missing implementation") 

267 

268 @classmethod 

269 def _update_jobs_impl(cls, jobs: List[JobT]) -> None: 

270 raise NotImplementedError("BUG missing implementation") 

271 

272 @classmethod 

273 def _cancel_client_jobs_impl(cls, jobs: List[JobT]) -> None: 

274 raise NotImplementedError("BUG missing implementation") 

275 

276 @classmethod 

277 def _cancel_server_job_impl( 

278 cls, jobs: List[JobT] 

279 ) -> Optional[Tuple[ArgumentList, Environment]]: 

280 raise NotImplementedError("BUG missing implementation") 

281 

282 def _parse_cancel_jobs_impl(self, jobs: List[JobT], proc: CompletedProcess) -> None: 

283 raise NotImplementedError("BUG missing implementation")