#!/usr/bin/python3

# Copyright (c) 2021-2022, Institut National de Recherche en Informatique et en Automatique (Inria)
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
#   list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
#   this list of conditions and the following disclaimer in the documentation
#   and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
#   contributors may be used to endorse or promote products derived from
#   this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30"""
31A state machine for a job launcher.
33The state machine is built on the following assumptions:
34* The maximum(!) time between two server/launcher pings is the same in both
35 directions.
36* A server disconnect is always interpreted as a server termination. Whether
37 the termination is considered to be an error depends on the exit status of
38 the server.
39* After a successful job cancellation (e.g., `scancel` exits with status zero)
40 the job is assumed to have terminated. In practice, this is not true when
41 using a batch scheduler. If this is a problem, then the state machine and/or
42 the server must be adapted.

This paragraph describes the state machine at a high level. In the
SERVER_DEAD phase, all existing jobs are cancelled and all pending job
updates are awaited. Once this is done, the state machine either stops or
submits a new server job and transitions to the SERVER_READY phase. Once the
job submission succeeds, the state machine begins to execute job updates.
The state machine transitions into the SERVER_RUNNING phase if the server
connects to the launcher or if the batch scheduler marks the server job as
running. In the SERVER_RUNNING phase, the server is expected to ping the
launcher regularly. Keep in mind that the launcher considers a job
terminated as soon as its cancellation succeeds (e.g., `scancel` or
`oarcancel` exits with status zero).

The launcher transitions to the SERVER_DEAD phase again if
* a job update marks the server job as dead,
* the server closes the connection to the launcher, or
* SIGTERM or SIGINT is received.

The launcher stops if
* the server job exited successfully, or
* SIGTERM or SIGINT is received.
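
A typical driver loop looks roughly like the sketch below; `next_event` and
`execute` are hypothetical stand-ins for the launcher's real event source
and action executor:

    cfg = Configuration(...)
    state = State()
    while state.phase != Phase.STOP:
        ev = next_event()                     # hypothetical event source
        state, actions = transition(cfg, state, Time(), ev)
        for a in actions:
            execute(a)                        # hypothetical action executor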
64"""

from copy import copy
import enum
import logging
import signal
from typing import Iterable, List, Optional, Tuple, Union

from melissa.utility.networking import ConnectionId
from melissa.utility.functools import partition
from melissa.utility.identifier import Id
from melissa.scheduler.job import Job
from melissa.scheduler.job import State as JobState
from melissa.utility.time import Time
from melissa.utility.checkpointing import checkpoint_available

from . import action, event, message

logger = logging.getLogger(__name__)


class Configuration:
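    """Launcher configuration consulted by every state transition.

    A minimal construction sketch (all argument values below are
    placeholders, not Melissa defaults):

        cfg = Configuration(
            cwd="/tmp/melissa-run",
            launcher_host="localhost",
            launcher_port=5555,
            launcher_protocol="tcp",
            server_executable="./server.sh",
        )

    Some fields (e.g., `connected_devices`, `group_size`) are updated in
    place as messages from the server arrive.
    """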
    def __init__(
        self,
        cwd: str,
        launcher_host: str,
        launcher_port: int,
        launcher_protocol: str,
        server_executable: str,
        no_fault_tolerance: bool = False,
        server_ping_interval: float = 30,
        job_update_interval: float = 30,
    ) -> None:
        assert launcher_port > 0
        assert launcher_port <= 65535

        self.working_directory = cwd
        self.launcher_host = launcher_host
        self.launcher_port = launcher_port
        self.launcher_protocol = launcher_protocol
        self.server_executable = server_executable
        self.no_fault_tolerance = no_fault_tolerance
        self.server_ping_interval = Time(seconds=server_ping_interval)
        self.job_update_interval = Time(seconds=job_update_interval)
        self.group_size = 1

        # server multi-connection attributes
        self.server_comm_size = 1
        self.connected_devices = 0

        # time-out monitor switch
        self.time_out_monitor: bool = True
        assert self.server_ping_interval >= self.job_update_interval

        # server submission counter
        self.server_submission: int = 0


@enum.unique
class Phase(enum.Enum):
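    """Launcher life-cycle phases; the module docstring describes the
    transitions between them."""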
    SERVER_DEAD = 1
    SERVER_READY = 2
    SERVER_RUNNING = 3
    STOP = 4


class State:
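    """Mutable launcher state threaded through transition().

    Transition functions shallow-copy the current state (copy.copy),
    modify the copy, and return it together with the actions the caller
    must execute.
    """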
    def __init__(self) -> None:
        self.phase = Phase.SERVER_DEAD
        self.job_submissions: List[Union[action.JobSubmission, action.ClientJobSubmission]] = []
        self.jobs: List[Job] = []
        self.jobs_to_cancel: List[Union[str, int]] = []
        self.jobs_cancelling: List[Union[str, int]] = []
        self.server_job_uid: Union[str, int, None] = None
        self.server_cid: Optional[ConnectionId] = None
        self.server_cids: List[Optional[ConnectionId]] = []
        self.cid_to_uid_map: List[Tuple[int, Id]] = []
        self.last_server_message = Time()
        self.last_server_ping = Time()
        self.last_job_update = Time()
        self.job_update_in_progress = False
        self.stop = False
        self.exit_status = 0


def _uid_to_cid(s: State, the_uid: Union[str, int]) -> int:
    cids = [cid for (cid, uid) in s.cid_to_uid_map if uid == the_uid]
    if len(cids) > 1:
        raise RuntimeError(
            f"UID {the_uid} is associated with {len(cids)} CIDs")

    if cids:
        return cids[0]
    raise RuntimeError(f"unknown UID {the_uid}")


def _cid_to_uid(s: State, the_cid: int) -> Optional[int]:
    uids = [uid for (cid, uid) in s.cid_to_uid_map if cid == the_cid]
    if len(uids) > 1:
        raise RuntimeError(
            f"CID {the_cid} is associated with {len(uids)} UIDs"
        )

    return uids[0] if uids else None


def _kill_server(cfg: Configuration, s0: State, reason: str,
                 *args: object) -> Tuple[State, List[action.Action]]:
    logger.error(reason, *args)
    s1 = copy(s0)
    s1.phase = Phase.SERVER_DEAD
    s1.stop = True
    return _transition_common(cfg, s1)


def _make_client_job_submission(
    cfg: Configuration, client_id: int
) -> Union[action.ClientJobSubmission, action.JobSubmission]:
    commands = [[f"./client_scripts/client.{client_id * cfg.group_size}.sh"]]
    if cfg.group_size > 1:
        # the server submits one request per group;
        # the client_id corresponds to the group_id
        for i in range(1, cfg.group_size):
            commands.append([f"./client_scripts/client.{cfg.group_size * client_id + i}.sh"])
    return action.ClientJobSubmission(
        working_directory=cfg.working_directory,
        commands=commands,
        environment={
            "MELISSA_JOB_ID": str(client_id),
        },
        job_name="melissa-client",
        client_id=client_id
    )


def _make_server_job_submission(
    cfg: Configuration
) -> action.ServerJobSubmission:
    # reset the connected-devices counter
    cfg.connected_devices = 0
    # if the server crashed before a checkpoint was created, launch fresh;
    # otherwise count this submission as a restart
    if checkpoint_available():
        cfg.server_submission += 1
    return action.ServerJobSubmission(
        working_directory=cfg.working_directory,
        commands=[[cfg.server_executable]],
        environment={
            "MELISSA_LAUNCHER_HOST": cfg.launcher_host,
            "MELISSA_LAUNCHER_PORT": str(cfg.launcher_port),
            "MELISSA_LAUNCHER_PROTOCOL": cfg.launcher_protocol,
            "MELISSA_FAULT_TOLERANCE": "OFF" if cfg.no_fault_tolerance else "ON",
            # 0 denotes a fresh start; positive values count restarts
            "MELISSA_RESTART": str(cfg.server_submission)
        },
        job_name="melissa-server",
    )


def _submit_client_jobs(
    cfg: Configuration, s0: State, initial_id: int, num_jobs: int
) -> Tuple[State, List[action.Action]]:
    assert s0.phase == Phase.SERVER_RUNNING
    assert s0.jobs != []
    assert initial_id >= 0
    assert num_jobs > 0

    client_ids = [initial_id + i for i in range(num_jobs)]
    submissions = [_make_client_job_submission(cfg, cid) for cid in client_ids]

    s1 = copy(s0)
    s1.job_submissions = s0.job_submissions + submissions

    # copy for silencing mypy
    return s1, [s for s in submissions]


def _send_update(s: State, j: Job) -> action.MessageSending:
    assert s.phase == Phase.SERVER_RUNNING

    cid = _uid_to_cid(s, j.unique_id())
    update = message.JobUpdate(cid, j.state())
    return action.MessageSending(s.server_cids, update)


def _submit_server_job(cfg: Configuration,
                       s0: State) -> Tuple[State, List[action.Action]]:
    assert s0.phase == Phase.SERVER_DEAD
    assert s0.job_submissions == []
    assert s0.jobs == []
    assert s0.jobs_cancelling == []

    s1 = copy(s0)
    s1.phase = Phase.SERVER_READY
    sub = _make_server_job_submission(cfg)
    s1.job_submissions = [sub]

    return s1, [sub]


def _transition_common(cfg: Configuration,
                       s0: State) -> Tuple[State, List[action.Action]]:
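    """Perform the bookkeeping shared by most transitions.

    In the SERVER_DEAD phase this cancels any remaining jobs and, once
    nothing is pending anymore, either stops the launcher or resubmits the
    server job.
    """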
    assert s0.phase != Phase.STOP
    assert not s0.jobs or len(s0.cid_to_uid_map) + 1 == len(s0.jobs)

    if s0.phase == Phase.SERVER_DEAD:
        if s0.jobs and not s0.job_submissions and not s0.jobs_cancelling:
            s1 = copy(s0)
            s1.jobs_cancelling = [j.unique_id() for j in s0.jobs]
            s1.jobs_to_cancel = []
            return s1, [action.JobCancellation(s0.jobs)]

        if not s0.jobs and \
                not s0.job_submissions and \
                not s0.jobs_cancelling and \
                not s0.job_update_in_progress:
            assert not s0.cid_to_uid_map
            if s0.stop:
                s1 = copy(s0)
                s1.phase = Phase.STOP
                return s1, [action.Exit(s0.exit_status)]

            return _submit_server_job(cfg, s0)

    return s0, []


# required to recognize obsolete job updates
assert JobState.WAITING.value < JobState.RUNNING.value
assert JobState.RUNNING.value < JobState.TERMINATED.value
assert JobState.RUNNING.value < JobState.ERROR.value
assert JobState.RUNNING.value < JobState.FAILED.value
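# Example: a scheduler poll started before the job launched may still report
# WAITING even though the launcher has already seen RUNNING; the value
# ordering asserted above lets update() in _transition_job_update discard
# such stale states.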


def _transition_job_update(
    cfg: Configuration, s0: State, now: Time, ev: event.JobUpdate[Job]
) -> Tuple[State, List[action.Action]]:
    assert s0.phase != Phase.STOP
    assert s0.job_update_in_progress

    def try_find(j: Job, jobs: List[Job]) -> Optional[Job]:
        for k in jobs:
            if j == k:
                return k
        return None

    def update(j: Job) -> Job:
        k = try_find(j, ev.jobs)

        if k is None:
            return j

        # depending on the order of the job update and server message
        # reception, the job update may actually indicate an outdated state.
        # The check below tries to catch such a job state transition.
        if j.state().value >= k.state().value:
            return j

        return k

    job_error_states = [JobState.ERROR, JobState.FAILED]
    # "stop states" would be a misleading name here because jobs can
    # actually be stopped (suspended) both when running locally and when
    # using a batch scheduler.
    job_dead_states = job_error_states + [JobState.TERMINATED]

    def is_alive(j: Job) -> bool:
        return j.state() not in job_dead_states

    updated_jobs = [update(j) for j in s0.jobs]
    live_jobs, dead_jobs = partition(is_alive, updated_jobs)

    # warn about failed client jobs
    for j in dead_jobs:
        if j.state() in job_error_states:
            logger.warning(
                f"job failure id={j.id()} uid={j.unique_id()} state={j.state()}"
            )

    s1 = copy(s0)
    s1.jobs = live_jobs
    live_uids = [j.unique_id() for j in live_jobs]
    s1.cid_to_uid_map = [
        (cid, uid) for (cid, uid) in s0.cid_to_uid_map if uid in live_uids
    ]
    s1.last_job_update = now
    s1.job_update_in_progress = False

    # diagnose server job
    def find_server_job(jobs: Iterable[Job]) -> Job:
        if s0.server_job_uid is None:
            raise RuntimeError("BUG server job unique ID is None")

        for j in jobs:
            if j.unique_id() == s0.server_job_uid:
                return j
        raise RuntimeError("BUG there should be a server job")

    if s0.server_job_uid is not None:
        assert s0.phase != Phase.STOP

        new_server_job = find_server_job(updated_jobs)

        # in case the server terminated successfully
        if new_server_job.state() == JobState.TERMINATED:
            s1.stop = True

        if new_server_job.state() == JobState.FAILED and cfg.no_fault_tolerance:
            s1.exit_status = 1
            s1.stop = True

        if new_server_job.state() in job_dead_states:
            # if the server job terminates, the connection will be closed. it
            # is very unlikely that the launcher would detect the job
            # termination before the closure of the network connection. thus,
            # we do not handle this case here.
            assert s0.server_cid is None

            s1.phase = Phase.SERVER_DEAD
            s1.server_job_uid = None
        # this branch will be taken, e.g., if the user pressed Ctrl+C after a
        # server job was successfully submitted
        elif new_server_job.state() == JobState.RUNNING and \
                s0.phase == Phase.SERVER_READY:
            s1 = _mark_server_as_running(s1, now)

    # send updates to server
    if s1.phase == Phase.SERVER_RUNNING:
        # compare by unique ID so the server job itself is excluded
        updated_client_jobs = [
            j for j in updated_jobs
            if j.unique_id() != s0.server_job_uid and j in ev.jobs
        ]
        return s1, [_send_update(s0, j) for j in updated_client_jobs]

    return _transition_common(cfg, s1)


def _transition_message_reception(
    cfg: Configuration, s0: State, now: Time, ev: event.MessageReception
) -> Tuple[State, List[action.Action]]:
    msg = ev.message
    s1 = copy(s0)
    s1.last_server_message = now

    if isinstance(msg, message.Exit):
        logger.info(f"server sent EXIT message (status {msg.status})")
        s1.phase = Phase.SERVER_DEAD
        s1.stop = True

        if 0 <= msg.status <= 127:
            s1.exit_status = msg.status
        else:
            s1.exit_status = 1
            logger.warning(
                f"expected server EXIT status in closed interval 0 to 127, got {msg.status}"
            )

    elif isinstance(msg, message.InvalidMessage):
        logger.warning(
            f"received invalid message from server ({len(msg.raw_bytes)} bytes): {msg.reason}"
        )

    elif isinstance(msg, message.JobCancellation):
        maybe_uid = _cid_to_uid(s0, msg.job_id)
        if maybe_uid is None:
            return _kill_server(
                cfg, s1, "request to cancel unknown client ID %d", msg.job_id
            )
        s1.jobs_to_cancel.append(maybe_uid)

    elif isinstance(msg, message.JobSubmission):
        if msg.initial_id < 0:
            return _kill_server(
                cfg, s1, "expected a nonnegative client ID, server sent %d",
                msg.initial_id
            )
        if msg.num_jobs <= 0:
            return _kill_server(
                cfg, s1, "server requested to submit %d jobs", msg.num_jobs
            )
        if msg.num_jobs > 1000:
            return _kill_server(
                cfg, s1, "server wants to submit %d jobs (limit: 1000)",
                msg.num_jobs
            )

        return _submit_client_jobs(cfg, s1, msg.initial_id, msg.num_jobs)

    elif isinstance(msg, message.JobUpdate):
        return _kill_server(
            cfg, s1, "server unexpectedly sent a job update for CID %d",
            msg.job_id
        )

    elif isinstance(msg, message.Ping):
        logger.debug("received server ping")
        return s1, []

    elif isinstance(msg, message.CommSize):
        logger.debug(f"received comm size {msg.comm_size}")
        cfg.server_comm_size = msg.comm_size

    elif isinstance(msg, message.GroupSize):
        logger.debug(f"received group size {msg.group_size}")
        cfg.group_size = msg.group_size

    elif isinstance(msg, message.StopTimeoutMonitoring):
        logger.debug("server time-outs are no longer monitored")
        cfg.time_out_monitor = False

    else:
        raise RuntimeError(f"unknown message type {type(msg)}")

    return _transition_common(cfg, s1)


def _transition_timeout(
    cfg: Configuration, s0: State, now: Time, _: event.Timeout
) -> Tuple[State, List[action.Action]]:
    if s0.jobs:
        s1 = copy(s0)
        actions_1: List[action.Action] = []

        if s0.last_job_update + cfg.job_update_interval < now \
                and not s0.job_update_in_progress:
            s1.job_update_in_progress = True
            actions_1.append(action.JobUpdate([copy(j) for j in s0.jobs]))

        if s0.phase == Phase.SERVER_RUNNING:
            if (
                s0.last_server_message + 2 * cfg.server_ping_interval < now
                and cfg.time_out_monitor
            ):
                logger.warning(f"server time-out uid={s0.server_job_uid}")
                s1.phase = Phase.SERVER_DEAD
                if cfg.no_fault_tolerance:
                    s1.stop = True
                    s1.exit_status = 1
                s2, actions_2 = _transition_common(cfg, s1)
                return s2, actions_1 + actions_2

            # do not assume the connection exists because the server is
            # running and not timed out; the batch scheduler can also mark
            # the server as running.
            if s0.last_server_ping + cfg.server_ping_interval < now \
                    and s0.server_cid is not None:
                s1.last_server_ping = now
                # timeout monitoring with server rank 0 only
                actions_1.append(
                    action.MessageSending([s0.server_cid], message.Ping())
                )

        if s0.jobs_to_cancel and not s0.jobs_cancelling:
            s1.jobs_cancelling = s0.jobs_to_cancel
            s1.jobs_to_cancel = []
            cancel_jobs = [
                j for j in s0.jobs if j.unique_id() in s0.jobs_to_cancel
            ]
            actions_1.append(action.JobCancellation(cancel_jobs))

        return s1, actions_1

    return _transition_common(cfg, s0)


def _mark_server_as_running(s0: State, now: Time) -> State:
    assert s0.phase in [Phase.SERVER_READY, Phase.SERVER_RUNNING]
    s1 = copy(s0)
    s1.phase = Phase.SERVER_RUNNING
    s1.last_server_message = now
    s1.last_server_ping = now
    return s1


def transition(cfg: Configuration, s0: State, now: Time,
               ev: event.Event) -> Tuple[State, List[action.Action]]:
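    """Advance the state machine by one event.

    Returns the successor state together with the actions the caller must
    execute (job submissions, cancellations, message sends, exit).
    """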
    assert s0.phase != Phase.STOP
    assert not s0.jobs or len(s0.cid_to_uid_map) + 1 == len(s0.jobs)

    if isinstance(ev, event.ActionFailure):
        logger.warning(f"failed to schedule action {ev.action}: {ev.error}")

        s1 = copy(s0)
        if isinstance(ev.action, action.JobCancellation):
            # drop jobs that terminated in the meantime; jobs_cancelling
            # stores unique IDs while s0.jobs stores Job objects
            live_uids = [j.unique_id() for j in s0.jobs]
            live_jobs_cancelling = [
                uid for uid in s0.jobs_cancelling if uid in live_uids
            ]
            s1.jobs_cancelling = []
            s1.jobs_to_cancel = s0.jobs_to_cancel + live_jobs_cancelling
        elif isinstance(ev.action, action.JobSubmission):
            s1.job_submissions = [
                s for s in s0.job_submissions if s != ev.action
            ]
            # did the server submission fail? terminate immediately
            if s0.phase == Phase.SERVER_READY:
                logger.error("the server job submission failed. exiting.")
                s1.phase = Phase.SERVER_DEAD
                s1.stop = True
            elif s0.phase == Phase.SERVER_RUNNING:
                assert s0.server_cid is not None
                return s1, [
                    action.MessageSending(
                        s0.server_cids,
                        message.JobUpdate(ev.action.client_id, JobState.ERROR)
                    )
                ]
        elif isinstance(ev.action, action.JobUpdate):
            s1.job_update_in_progress = False
        elif isinstance(ev.action, action.MessageSending):
            s1.phase = Phase.SERVER_DEAD
        else:
            raise NotImplementedError("BUG")
        return _transition_common(cfg, s1)

    if isinstance(ev, event.ConnectionShutdown):
        assert s0.server_cid is not None
        assert s0.server_cid == ev.cid
        s1 = copy(s0)
        s1.phase = Phase.SERVER_DEAD
        if cfg.no_fault_tolerance:
            s1.stop = True
        s1.server_cid = None
        s1.server_cids = []
        logger.info("server closed connection")
        return _transition_common(cfg, s1)

    if isinstance(ev, event.JobCancellation):
        # jobs_cancelling stores unique IDs, ev.jobs carries Job objects
        cancelled_uids = [j.unique_id() for j in ev.jobs]
        assert all(uid in s0.jobs_cancelling for uid in cancelled_uids)
        s1 = copy(s0)
        s1.jobs = [j for j in s0.jobs if j not in ev.jobs]
        s1.jobs_cancelling = []
        remaining_uids = [j.unique_id() for j in s1.jobs]
        s1.cid_to_uid_map = [
            x for x in s0.cid_to_uid_map if x[1] in remaining_uids
        ]
        if s0.server_job_uid is not None \
                and s0.server_job_uid in cancelled_uids:
            s1.server_job_uid = None
        return _transition_common(cfg, s1)

    if isinstance(ev, event.JobSubmission):
        assert ev.submission in s0.job_submissions
        s1 = copy(s0)
        s1.job_submissions = [
            s for s in s0.job_submissions if s != ev.submission
        ]
        s1.jobs = s0.jobs + [ev.job]
        if isinstance(ev.submission, action.ServerJobSubmission) \
                and s0.phase == Phase.SERVER_READY:
            assert s0.server_job_uid is None
            assert len(s0.jobs) == 0
            assert len(s0.job_submissions) == 1
            s1.server_job_uid = ev.job.unique_id()
            s1.last_job_update = now
            if ev.job.state() == JobState.RUNNING:
                s2 = _mark_server_as_running(s1, now)
                return _transition_common(cfg, s2)
        elif isinstance(ev.submission, action.ClientJobSubmission):
            s1.cid_to_uid_map.append(
                (ev.submission.client_id, ev.job.unique_id())
            )
            if s1.phase == Phase.SERVER_RUNNING:
                return s1, [
                    action.MessageSending(
                        s0.server_cids,
                        message.JobUpdate(
                            ev.submission.client_id, ev.job.state()
                        )
                    )
                ]

        return _transition_common(cfg, s1)

    if isinstance(ev, event.JobUpdate):
        return _transition_job_update(cfg, s0, now, ev)

    if isinstance(ev, event.MessageReception):
        return _transition_message_reception(cfg, s0, now, ev)

    if isinstance(ev, event.NewConnection):
        cfg.connected_devices += 1
        if s0.phase in [Phase.SERVER_READY, Phase.SERVER_RUNNING] \
                and s0.server_cid is None:
            s1 = _mark_server_as_running(s0, now)
            s1.server_cid = ev.cid
            s1.server_cids.append(ev.cid)
            return _transition_common(cfg, s1)
        elif 0 < cfg.connected_devices <= cfg.server_comm_size:
            logger.info(f"new connection to launcher on fd {ev.cid}")
            s0.server_cids.append(ev.cid)
            return s0, [action.ConnectionServer(ev.cid)]
        else:
            logger.warning("ignoring new connection to launcher")
            return s0, [action.ConnectionClosure(ev.cid)]

    if isinstance(ev, event.Signal):
        if ev.signo in [signal.SIGINT, signal.SIGTERM]:
            if s0.stop and not cfg.no_fault_tolerance:
                print("The launcher is")
                if s0.jobs:
                    print("* waiting for %d job(s)" % len(s0.jobs))
                if s0.job_submissions:
                    print(
                        "* terminating %d job submission(s)" %
                        len(s0.job_submissions)
                    )
                if s0.jobs_cancelling:
                    print(
                        "* waiting for the cancellation of %d job(s)" %
                        len(s0.jobs_cancelling)
                    )
                if s0.job_update_in_progress:
                    print("* waiting for one job update")
                return s0, []
            else:
                s1 = copy(s0)
                s1.phase = Phase.SERVER_DEAD
                s1.stop = True
                return _transition_common(cfg, s1)
        else:
            return s0, []

    if isinstance(ev, event.Timeout):
        return _transition_timeout(cfg, s0, now, ev)

    raise RuntimeError(
        f"unknown state transition (state {s0.phase} event {type(ev)})"
    )