Coverage for melissa/server/offline_server.py: 0%
40 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-11-03 09:52 +0100
« prev ^ index » next coverage.py v7.10.1, created at 2025-11-03 09:52 +0100
1import logging
2from typing import List, Dict, Any
3from typing_extensions import override
5from melissa.scheduler import job
6from melissa.launcher import message
7from melissa.server.exceptions import FatalError
8from melissa.server.sensitivity_analysis import SensitivityAnalysisServer
11logger = logging.getLogger(__name__)
14class OfflineServer(SensitivityAnalysisServer):
15 """`OfflineServer` class extends the `SensitivityAnalysisServer` but
16 activates offline mode to disable reception of data. This is useful when the user
17 requires to sample the parameters and generate client scripts to store their data instead
18 of sending it to the server."""
19 def __init__(self, config_dict: Dict[str, Any]) -> None:
21 config_dict["sa_config"] = {
22 "checkpoint_interval": 0
23 }
24 super().__init__(config_dict)
25 self.make_study_offline()
26 if self._restart > 0:
27 raise FatalError("To rerun the study, please remove the existing study folder.")
29 @override
30 def _launch_groups(self, group_ids: List[int]) -> None:
31 """Submits all the groups instead of 1-by-1 lazy strategy."""
32 group_ids = list(range(0, self.nb_groups))
33 return super()._launch_groups(group_ids)
35 @override
36 def _handle_fd(self) -> None:
37 """Handles the launcher's messages."""
39 bs = self._launcherfd.recv(256)
40 rcvd_msg = self._decode_msg(bs)
42 for msg in rcvd_msg:
43 # 1. Launcher sent JOB_UPDATE message (msg.job_id <=> group_id)
44 if isinstance(msg, message.JobUpdate):
45 if msg.job_id not in self._groups:
46 continue
47 group = self._groups[msg.job_id]
48 group_id = group.group_id
49 # React to simulation status
50 if msg.job_state in [job.State.ERROR, job.State.FAILED]:
51 logger.debug("Launcher indicates job failure")
52 elif msg.job_state is job.State.TERMINATED:
53 logger.info(
54 f"Rank {self.rank}>> [Termination] group-id/sim-id={msg.job_id}"
55 )
56 for sim_id in group.simulations:
57 group.simulations[sim_id].connected = False
58 self.finished_groups.add(group_id)
59 # 2. Server sends PING
60 if self.rank == 0:
61 logger.debug(
62 "Server got message from launcher and sends PING back"
63 )
64 snd_msg = self._encode_msg(message.Ping())
65 self._launcherfd.send(snd_msg)