Coverage for melissa/launcher/state_machine.py: 79%

#!/usr/bin/python3

# Copyright (c) 2021-2022, Institut National de Recherche en Informatique et en Automatique (Inria)
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
#   list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
#   this list of conditions and the following disclaimer in the documentation
#   and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
#   contributors may be used to endorse or promote products derived from
#   this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

30""" 

31A state machine for a job launcher. 

32 

33The state machine is built on the following assumptions: 

34* The maximum(!) time between two server/launcher pings is the same in both 

35 directions. 

36* A server disconnect is always interpreted as a server termination. Whether 

37 the termination is considered to be an error depends on the exit status of 

38 the server. 

39* After a successful job cancellation (e.g., `scancel` exits with status zero) 

40 the job is assumed to have terminated. In practice, this is not true when 

41 using a batch scheduler. If this is a problem, then the state machine and/or 

42 the server must be adapted. 

43 

44This paragraph describes the state machine from a high level. In the 

45SERVER_DEAD phase, all existing jobs are cancelled and all job updates are 

46waited for. Once this is done, either the state machine stops or a new server 

47job is submitted with the state machine transitioning to the SERVER_READY 

48phase. Once the job submission succeeds, the state machine will begin to 

49execute job updates. The state machine transition into the SERVER_RUNNING phase 

50if the server connects to the launcher or if the batch scheduler marks the 

51server job as running. In the SERVER_RUNNING phase, the server is expected to 

52ping the launcher regularly. Keep in mind that the launcher considers a job 

53terminated as soon as the job cancellation was successful (e.g., `scancel` or 

54`oarcancel`). 

55 

56The launcher transitions to the SERVER_DEAD phase again if 

57* a job update marks the server job as dead, 

58* the server closes the connection to the launcher, or 

59* SIGTERM or SIGINT is received. 

60 

61The launcher stops if 

62* the server job exited successfully, or 

63* SIGTERM or SIGINT were received. 

64""" 

from copy import copy
import enum
import logging
import signal
from typing import Iterable, List, Optional, Tuple, Union

from melissa.utility.networking import ConnectionId
from melissa.utility.functools import partition
from melissa.utility.identifier import Id
from melissa.scheduler.job import Job
from melissa.scheduler.job import State as JobState
from melissa.utility.time import Time
from melissa.utility.checkpointing import checkpoint_available

from . import action, event, message

logger = logging.getLogger(__name__)

class Configuration:
    def __init__(
        self,
        cwd: str,
        launcher_host: str,
        launcher_port: int,
        launcher_protocol: str,
        server_executable: str,
        no_fault_tolerance: bool = False,
        server_ping_interval: float = 30,
        job_update_interval: float = 30,
    ) -> None:
        assert launcher_port > 0
        assert launcher_port <= 65535

        self.working_directory = cwd
        self.launcher_host = launcher_host
        self.launcher_port = launcher_port
        self.launcher_protocol = launcher_protocol
        self.server_executable = server_executable
        self.no_fault_tolerance = no_fault_tolerance
        self.server_ping_interval = Time(seconds=server_ping_interval)
        self.job_update_interval = Time(seconds=job_update_interval)
        self.group_size = 1

        # server multi-connection attributes
        self.server_comm_size = 1
        self.connected_devices = 0

        # time-out monitor switch
        self.time_out_monitor: bool = True
        assert self.server_ping_interval >= self.job_update_interval

        # server submission counter
        self.server_submission: int = 0
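
# Example configuration (an illustrative sketch; host, port, and paths are
# made-up values, not defaults of this module):
#
#     cfg = Configuration(
#         cwd="/tmp/melissa-study",
#         launcher_host="localhost",
#         launcher_port=5555,
#         launcher_protocol="tcp",
#         server_executable="./server.sh",
#     )
#
# The constructor checks that the port is valid and that pings are not
# expected more often than job updates are requested.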

@enum.unique
class Phase(enum.Enum):
    SERVER_DEAD = 1
    SERVER_READY = 2
    SERVER_RUNNING = 3
    STOP = 4


class State:
    def __init__(self) -> None:
        self.phase = Phase.SERVER_DEAD
        self.job_submissions: List[Union[action.JobSubmission, action.ClientJobSubmission]] = []
        self.jobs = []  # type: List[Job]
        self.jobs_to_cancel: List[Union[str, int]] = []
        self.jobs_cancelling: List[Union[str, int]] = []
        self.server_job_uid: Union[str, int, None] = None
        self.server_cid = None  # type: Optional[ConnectionId]
        self.server_cids: List[Optional[ConnectionId]] = []
        self.cid_to_uid_map: List[Tuple[int, Id]] = []
        self.last_server_message = Time()
        self.last_server_ping = Time()
        self.last_job_update = Time()
        self.job_update_in_progress = False
        self.stop = False
        self.exit_status = 0


def _uid_to_cid(s: State, the_uid: Union[str, int]) -> int:
    cids = [cid for (cid, uid) in s.cid_to_uid_map if uid == the_uid]
    if len(cids) > 1:
        raise RuntimeError(
            f"UID {the_uid} is associated with {len(cids)} CIDs")

    if cids:
        return cids[0]
    raise RuntimeError(f"unknown UID {the_uid}")


def _cid_to_uid(s: State, the_cid: int) -> Optional[int]:
    uids = [uid for (cid, uid) in s.cid_to_uid_map if cid == the_cid]
    if len(uids) > 1:
        raise RuntimeError(
            "CID %d is associated with %d UIDs" % (the_cid, len(uids))
        )

    return uids[0] if uids else None
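
# Worked example of the CID/UID mapping: with
# s.cid_to_uid_map == [(0, "uid-a"), (1, "uid-b")] (UIDs shown as illustrative
# strings), _uid_to_cid(s, "uid-b") returns 1 and _cid_to_uid(s, 2) returns
# None; if a key appears twice, both helpers raise a RuntimeError.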

def _kill_server(cfg: Configuration, s0: State, reason: str,
                 *args: object) -> Tuple[State, List[action.Action]]:
    logger.error(reason, *args)
    s1 = copy(s0)
    s1.phase = Phase.SERVER_DEAD
    s1.stop = True
    return _transition_common(cfg, s1)

def _make_client_job_submission(
    cfg: Configuration, client_id: int
) -> Union[action.ClientJobSubmission, action.JobSubmission]:
    commands = [[f"./client_scripts/client.{client_id*cfg.group_size}.sh"]]
    if cfg.group_size > 1:
        # the server submits one request per group;
        # the client_id corresponds to the group_id
        for i in range(1, cfg.group_size):
            commands.append([f"./client_scripts/client.{cfg.group_size*client_id + i}.sh"])
    return action.ClientJobSubmission(
        working_directory=cfg.working_directory,
        commands=commands,
        environment={
            "MELISSA_JOB_ID": str(client_id),
        },
        job_name="melissa-client",
        client_id=client_id
    )
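
# Worked example: with cfg.group_size == 3, client_id (i.e., the group id) 2
# yields the scripts client.6.sh, client.7.sh, and client.8.sh, following the
# pattern group_size * client_id + i for i in 0 .. group_size - 1.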

def _make_server_job_submission(
    cfg: Configuration
) -> action.ServerJobSubmission:
    # reset the connected-device counter
    cfg.connected_devices = 0
    # if the server crashed before a checkpoint was created, we launch fresh
    # (the submission counter, and hence MELISSA_RESTART, stays at zero)
    if checkpoint_available():
        cfg.server_submission += 1
    return action.ServerJobSubmission(
        working_directory=cfg.working_directory,
        commands=[[cfg.server_executable]],
        environment={
            "MELISSA_LAUNCHER_HOST": cfg.launcher_host,
            "MELISSA_LAUNCHER_PORT": str(cfg.launcher_port),
            "MELISSA_LAUNCHER_PROTOCOL": cfg.launcher_protocol,
            "MELISSA_FAULT_TOLERANCE": "OFF" if cfg.no_fault_tolerance else "ON",
            "MELISSA_RESTART": str(cfg.server_submission)
        },
        job_name="melissa-server",
    )
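
# Example of the resulting environment (illustrative): after one server crash
# with a checkpoint on disk, the next submission carries
# MELISSA_FAULT_TOLERANCE=ON (unless fault tolerance is disabled) and
# MELISSA_RESTART=1, so the server can resume from the checkpoint instead of
# starting fresh.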

def _submit_client_jobs(
    cfg: Configuration, s0: State, initial_id: int, num_jobs: int
) -> Tuple[State, List[action.Action]]:
    assert s0.phase == Phase.SERVER_RUNNING
    assert s0.jobs != []
    assert initial_id >= 0
    assert num_jobs > 0

    client_ids = [initial_id + i for i in range(num_jobs)]
    submissions = [_make_client_job_submission(cfg, cid) for cid in client_ids]

    s1 = copy(s0)
    s1.job_submissions = s0.job_submissions + submissions

    # copy for silencing mypy
    return s1, [s for s in submissions]

def _send_update(s: State, j: Job) -> action.MessageSending:
    assert s.phase == Phase.SERVER_RUNNING

    cid = _uid_to_cid(s, j.unique_id())
    update = message.JobUpdate(cid, j.state())
    return action.MessageSending(s.server_cids, update)


def _submit_server_job(cfg: Configuration,
                       s0: State) -> Tuple[State, List[action.Action]]:
    assert s0.phase == Phase.SERVER_DEAD
    assert s0.job_submissions == []
    assert s0.jobs == []
    assert s0.jobs_cancelling == []

    s1 = copy(s0)
    s1.phase = Phase.SERVER_READY
    sub = _make_server_job_submission(cfg)
    s1.job_submissions = [sub]

    return s1, [sub]

def _transition_common(cfg: Configuration,
                       s0: State) -> Tuple[State, List[action.Action]]:
    assert s0.phase != Phase.STOP
    assert not s0.jobs or len(s0.cid_to_uid_map) + 1 == len(s0.jobs)

    if s0.phase == Phase.SERVER_DEAD:
        if s0.jobs and not s0.job_submissions and not s0.jobs_cancelling:
            s1 = copy(s0)
            s1.jobs_cancelling = [j.unique_id() for j in s0.jobs]
            s1.jobs_to_cancel = []
            return s1, [action.JobCancellation(s0.jobs)]

        if not s0.jobs and \
                not s0.job_submissions and \
                not s0.jobs_cancelling and \
                not s0.job_update_in_progress:
            assert not s0.cid_to_uid_map
            if s0.stop:
                s1 = copy(s0)
                s1.phase = Phase.STOP
                return s1, [action.Exit(s0.exit_status)]

            return _submit_server_job(cfg, s0)

    return s0, []

# required to recognize obsolete job updates
assert JobState.WAITING.value < JobState.RUNNING.value
assert JobState.RUNNING.value < JobState.TERMINATED.value
assert JobState.RUNNING.value < JobState.ERROR.value
assert JobState.RUNNING.value < JobState.FAILED.value
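
# Example of an obsolete update: if the launcher already recorded a job as
# RUNNING but a slower scheduler poll still reports WAITING, the value
# ordering asserted above lets _transition_job_update keep the newer RUNNING
# state and ignore the stale one.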

def _transition_job_update(
    cfg: Configuration, s0: State, now: Time, ev: event.JobUpdate[Job]
) -> Tuple[State, List[action.Action]]:
    assert s0.phase != Phase.STOP
    assert s0.job_update_in_progress

    def try_find(j: Job, jobs: List[Job]) -> Optional[Job]:
        for k in jobs:
            if j == k:
                return k
        return None

    def update(j: Job) -> Job:
        k = try_find(j, ev.jobs)

        if k is None:
            return j

        # depending on the order of the job update and server message
        # reception, the job update may actually indicate an outdated state.
        # The check below tries to catch such a job state transition.
        if j.state().value >= k.state().value:
            return j

        return k

    job_error_states = [JobState.ERROR, JobState.FAILED]
    # I considered naming this variable "stop states" but this is misleading
    # because jobs can actually be stopped when running jobs locally and when
    # using a batch scheduler.
    job_dead_states = job_error_states + [JobState.TERMINATED]

    def is_alive(j: Job) -> bool:
        return j.state() not in job_dead_states

    updated_jobs = [update(j) for j in s0.jobs]
    live_jobs, dead_jobs = partition(is_alive, updated_jobs)

    # warn about failed client jobs
    for j in dead_jobs:
        if j.state() in job_error_states:
            logger.warning(
                f"job failure id={j.id()} uid={j.unique_id()} state={j.state()}"
            )

    s1 = copy(s0)
    s1.jobs = live_jobs
    live_uids = [j.unique_id() for j in live_jobs]
    s1.cid_to_uid_map = [
        (cid, uid) for (cid, uid) in s0.cid_to_uid_map if uid in live_uids
    ]
    s1.last_job_update = now
    s1.job_update_in_progress = False

    # diagnose server job
    def find_server_job(jobs: Iterable[Job]) -> Job:
        if s0.server_job_uid is None:
            raise RuntimeError("BUG server job unique ID is None")

        for j in jobs:
            if j.unique_id() == s0.server_job_uid:
                return j
        raise RuntimeError("BUG there should be a server job")

    if s0.server_job_uid is not None:
        assert s0.phase != Phase.STOP

        new_server_job = find_server_job(updated_jobs)

        # in case the server terminated successfully
        if new_server_job.state() == JobState.TERMINATED:
            s1.stop = True

        if new_server_job.state() == JobState.FAILED and cfg.no_fault_tolerance:
            s1.exit_status = 1
            s1.stop = True

        if new_server_job.state() in job_dead_states:
            # if the server job terminates, the connection will be closed. it
            # is very unlikely that the launcher would detect the job
            # termination before the closure of the network connection. thus,
            # we do not handle this case here.
            assert s0.server_cid is None

            s1.phase = Phase.SERVER_DEAD
            s1.server_job_uid = None
        # this branch will be taken, e.g., if the user pressed Ctrl+C after a
        # server job was successfully submitted
        elif new_server_job.state() == JobState.RUNNING and \
                s0.phase == Phase.SERVER_READY:
            s1 = _mark_server_as_running(s1, now)

    # send updates to server
    if s1.phase == Phase.SERVER_RUNNING:
        updated_client_jobs = [
            j for j in updated_jobs
            if j.unique_id() != s0.server_job_uid and j in ev.jobs
        ]
        return s1, [_send_update(s0, j) for j in updated_client_jobs]

    return _transition_common(cfg, s1)

def _transition_message_reception(
    cfg: Configuration, s0: State, now: Time, ev: event.MessageReception
) -> Tuple[State, List[action.Action]]:
    msg = ev.message
    s1 = copy(s0)
    s1.last_server_message = now

    if isinstance(msg, message.Exit):
        logger.info(f"server sent EXIT message (status {msg.status})")
        s1.phase = Phase.SERVER_DEAD
        s1.stop = True

        if msg.status >= 0 and msg.status <= 127:
            s1.exit_status = msg.status
        else:
            s1.exit_status = 1
            logger.warning(
                f"expected server EXIT status in closed interval 0 to 127, got {msg.status}"
            )

    elif isinstance(msg, message.InvalidMessage):
        logger.warning(
            f"received invalid message from server ({len(msg.raw_bytes)} bytes): {msg.reason}"
        )

    elif isinstance(msg, message.JobCancellation):
        maybe_uid = _cid_to_uid(s0, msg.job_id)
        if maybe_uid is None:
            return _kill_server(
                cfg, s1, "request to cancel unknown client ID %d", msg.job_id
            )
        s1.jobs_to_cancel.append(maybe_uid)

    elif isinstance(msg, message.JobSubmission):
        if msg.initial_id < 0:
            return _kill_server(
                cfg, s1, "expected a nonnegative client ID, server sent %d",
                msg.initial_id
            )
        if msg.num_jobs <= 0:
            return _kill_server(
                cfg, s1,
                "server requested to submit a nonpositive number of jobs (%d)",
                msg.num_jobs
            )
        if msg.num_jobs > 1000:
            return _kill_server(
                cfg, s1,
                "server requested to submit an excessive number of jobs (%d)",
                msg.num_jobs
            )

        return _submit_client_jobs(cfg, s1, msg.initial_id, msg.num_jobs)

    elif isinstance(msg, message.JobUpdate):
        return _kill_server(
            cfg, s1, "server unexpectedly sent a job update for CID %d",
            msg.job_id
        )

    elif isinstance(msg, message.Ping):
        logger.debug("received server ping")
        return s1, []

    elif isinstance(msg, message.CommSize):
        logger.debug(f"received comm size {msg.comm_size}")
        cfg.server_comm_size = msg.comm_size

    elif isinstance(msg, message.GroupSize):
        logger.debug(f"received group size {msg.group_size}")
        cfg.group_size = msg.group_size

    elif isinstance(msg, message.StopTimeoutMonitoring):
        logger.debug("server time-outs are no longer monitored")
        cfg.time_out_monitor = False

    else:
        raise RuntimeError("unknown message type {}".format(type(msg)))

    return _transition_common(cfg, s1)

def _transition_timeout(
    cfg: Configuration, s0: State, now: Time, _: event.Timeout
) -> Tuple[State, List[action.Action]]:
    if s0.jobs:
        s1 = copy(s0)
        actions_1 = []  # type: List[action.Action]

        if s0.last_job_update + cfg.job_update_interval < now \
                and not s0.job_update_in_progress:
            s1.job_update_in_progress = True
            actions_1.append(action.JobUpdate([copy(j) for j in s0.jobs]))

        if s0.phase == Phase.SERVER_RUNNING:
            if (
                s0.last_server_message + 2 * cfg.server_ping_interval < now
                and cfg.time_out_monitor
            ):
                logger.warning(f"server time-out uid={s0.server_job_uid}")
                s1.phase = Phase.SERVER_DEAD
                if cfg.no_fault_tolerance:
                    s1.stop = True
                    s1.exit_status = 1
                s2, actions_2 = _transition_common(cfg, s1)
                return s2, actions_1 + actions_2

            # do not assume the connection exists because the server is
            # running and not timed out. the batch scheduler can also mark
            # the server as running.
            if s0.last_server_ping + cfg.server_ping_interval < now \
                    and s0.server_cid is not None:
                s1.last_server_ping = now
                # timeout monitoring with server rank 0 only
                actions_1.append(
                    action.MessageSending([s0.server_cid], message.Ping())
                )

        if s0.jobs_to_cancel and not s0.jobs_cancelling:
            s1.jobs_cancelling = s0.jobs_to_cancel
            s1.jobs_to_cancel = []
            cancel_jobs = [
                j for j in s0.jobs if j.unique_id() in s0.jobs_to_cancel
            ]
            actions_1.append(action.JobCancellation(cancel_jobs))

        return s1, actions_1

    return _transition_common(cfg, s0)
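
# Timing example: with the default intervals of 30 s, the launcher requests a
# job update at most every 30 s, pings server rank 0 at most every 30 s, and,
# while time-out monitoring is enabled, declares the server dead once
# 2 * 30 s = 60 s pass without any message from it.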

def _mark_server_as_running(s0: State, now: Time) -> State:
    assert s0.phase in [Phase.SERVER_READY, Phase.SERVER_RUNNING]
    s1 = copy(s0)
    s1.phase = Phase.SERVER_RUNNING
    s1.last_server_message = now
    s1.last_server_ping = now
    return s1

def transition(cfg: Configuration, s0: State, now: Time,
               ev: event.Event) -> Tuple[State, List[action.Action]]:
    assert s0.phase != Phase.STOP
    assert not s0.jobs or len(s0.cid_to_uid_map) + 1 == len(s0.jobs)

    if isinstance(ev, event.ActionFailure):
        logger.warning(f"scheduling action {ev.action} error: {ev.error}")

        s1 = copy(s0)
        if isinstance(ev.action, action.JobCancellation):
            # remove jobs that terminated in the meantime
            live_uids = [j.unique_id() for j in s0.jobs]
            live_jobs_cancelling = [
                uid for uid in s0.jobs_cancelling if uid in live_uids
            ]
            s1.jobs_cancelling = []
            s1.jobs_to_cancel = s0.jobs_to_cancel + live_jobs_cancelling
        elif isinstance(ev.action, action.JobSubmission):
            s1.job_submissions = [
                s for s in s0.job_submissions if s != ev.action
            ]
            # did the server submission fail? terminate immediately
            if s0.phase == Phase.SERVER_READY:
                logger.error("the server job submission failed. exiting.")
                s1.phase = Phase.SERVER_DEAD
                s1.stop = True
            elif s0.phase == Phase.SERVER_RUNNING:
                assert s0.server_cid is not None
                return s1, [
                    action.MessageSending(
                        s0.server_cids,
                        message.JobUpdate(ev.action.client_id, JobState.ERROR)
                    )
                ]
        elif isinstance(ev.action, action.JobUpdate):
            s1.job_update_in_progress = False
        elif isinstance(ev.action, action.MessageSending):
            s1.phase = Phase.SERVER_DEAD
        else:
            raise NotImplementedError("BUG")
        return _transition_common(cfg, s1)

    if isinstance(ev, event.ConnectionShutdown):
        assert s0.server_cid is not None
        assert s0.server_cid == ev.cid
        s1 = copy(s0)
        s1.phase = Phase.SERVER_DEAD
        if cfg.no_fault_tolerance:
            s1.stop = True
        s1.server_cid = None
        s1.server_cids = []
        logger.info("server closed connection")
        return _transition_common(cfg, s1)

    if isinstance(ev, event.JobCancellation):
        cancelled_uids = [j.unique_id() for j in ev.jobs]
        assert all(uid in s0.jobs_cancelling for uid in cancelled_uids)
        s1 = copy(s0)
        s1.jobs = [j for j in s0.jobs if j not in ev.jobs]
        s1.jobs_cancelling = []
        live_uids = [j.unique_id() for j in s1.jobs]
        s1.cid_to_uid_map = [
            (cid, uid) for (cid, uid) in s0.cid_to_uid_map if uid in live_uids
        ]
        if s0.server_job_uid is not None \
                and s0.server_job_uid in cancelled_uids:
            s1.server_job_uid = None
        return _transition_common(cfg, s1)

    if isinstance(ev, event.JobSubmission):
        assert ev.submission in s0.job_submissions
        s1 = copy(s0)
        s1.job_submissions = [
            s for s in s0.job_submissions if s != ev.submission
        ]
        s1.jobs = s0.jobs + [ev.job]
        if isinstance(ev.submission, action.ServerJobSubmission) \
                and s0.phase == Phase.SERVER_READY:
            assert s0.server_job_uid is None
            assert len(s0.jobs) == 0
            assert len(s0.job_submissions) == 1
            s1.server_job_uid = ev.job.unique_id()
            s1.last_job_update = now
            if ev.job.state() == JobState.RUNNING:
                s2 = _mark_server_as_running(s1, now)
                return _transition_common(cfg, s2)
        elif isinstance(ev.submission, action.ClientJobSubmission):
            s1.cid_to_uid_map.append(
                (ev.submission.client_id, ev.job.unique_id())
            )
            if s1.phase == Phase.SERVER_RUNNING:
                return s1, [
                    action.MessageSending(
                        s0.server_cids,
                        message.JobUpdate(
                            ev.submission.client_id, ev.job.state()
                        )
                    )
                ]

        return _transition_common(cfg, s1)

    if isinstance(ev, event.JobUpdate):
        return _transition_job_update(cfg, s0, now, ev)

    if isinstance(ev, event.MessageReception):
        return _transition_message_reception(cfg, s0, now, ev)

    if isinstance(ev, event.NewConnection):
        cfg.connected_devices += 1
        if s0.phase in [Phase.SERVER_READY, Phase.SERVER_RUNNING] \
                and s0.server_cid is None:
            s1 = _mark_server_as_running(s0, now)
            s1.server_cid = ev.cid
            s1.server_cids.append(ev.cid)
            return _transition_common(cfg, s1)
        elif cfg.connected_devices > 0 and cfg.connected_devices <= cfg.server_comm_size:
            logger.info(f"new connection to launcher on fd {ev.cid}")
            s0.server_cids.append(ev.cid)
            return s0, [action.ConnectionServer(ev.cid)]
        else:
            logger.warning("ignoring new connection to launcher")
            return s0, [action.ConnectionClosure(ev.cid)]

    if isinstance(ev, event.Signal):
        if ev.signo in [signal.SIGINT, signal.SIGTERM]:
            if s0.stop and not cfg.no_fault_tolerance:
                print("The launcher is")
                if s0.jobs:
                    print("* waiting for %d job(s)" % len(s0.jobs))
                if s0.job_submissions:
                    print(
                        "* terminating %d job submission(s)" %
                        len(s0.job_submissions)
                    )
                if s0.jobs_cancelling:
                    print(
                        "* waiting for the cancellation of %d job(s)" %
                        len(s0.jobs_cancelling)
                    )
                if s0.job_update_in_progress:
                    print("* waiting for one job update")
                return s0, []
            else:
                s1 = copy(s0)
                s1.phase = Phase.SERVER_DEAD
                s1.stop = True
                return _transition_common(cfg, s1)
        else:
            return s0, []
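
    # Shutdown behavior, by example: the first SIGINT/SIGTERM takes the else
    # branch above (SERVER_DEAD with `stop` set), which cancels all jobs; a
    # second signal while the shutdown is still in progress only prints what
    # the launcher is waiting for.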

    if isinstance(ev, event.Timeout):
        return _transition_timeout(cfg, s0, now, ev)

    fmt = "unknown state transition (state {} event {})"
    raise RuntimeError(fmt.format(s0.phase, type(ev)))