Coverage for melissa/server/offline_server.py: 0%

40 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-11-03 09:52 +0100

1import logging 

2from typing import List, Dict, Any 

3from typing_extensions import override 

4 

5from melissa.scheduler import job 

6from melissa.launcher import message 

7from melissa.server.exceptions import FatalError 

8from melissa.server.sensitivity_analysis import SensitivityAnalysisServer 

9 

10 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

12 

13 

class OfflineServer(SensitivityAnalysisServer):
    """Sensitivity-analysis server that runs the study in offline mode.

    Offline mode disables reception of data by the server. It is meant for
    studies where the parameters are sampled and client scripts are generated
    so that the clients store their data instead of sending it to the server.
    """

    def __init__(self, config_dict: Dict[str, Any]) -> None:
        """Build the server and switch the study to offline mode.

        Args:
            config_dict: Study configuration. Its ``"sa_config"`` entry is
                replaced in place with ``{"checkpoint_interval": 0}`` before
                the parent constructor runs.

        Raises:
            FatalError: If ``self._restart`` is greater than 0 after the
                parent constructor and ``make_study_offline()`` have run.
        """
        # Any "sa_config" supplied by the caller is overwritten here.
        config_dict["sa_config"] = {"checkpoint_interval": 0}
        super().__init__(config_dict)
        self.make_study_offline()

        if self._restart > 0:
            raise FatalError("To rerun the study, please remove the existing study folder.")

    @override
    def _launch_groups(self, group_ids: List[int]) -> None:
        """Submit every group at once instead of the 1-by-1 lazy strategy.

        Args:
            group_ids: Ignored. The ids ``0 .. self.nb_groups - 1`` are handed
                to the parent implementation instead.

        Returns:
            Whatever the parent ``_launch_groups`` returns.
        """
        every_group = list(range(self.nb_groups))
        return super()._launch_groups(every_group)

    @override
    def _handle_fd(self) -> None:
        """Read the launcher's pending messages and react to each of them."""
        raw = self._launcherfd.recv(256)

        for msg in self._decode_msg(raw):
            # 1. JOB_UPDATE from the launcher (msg.job_id <=> group_id).
            if isinstance(msg, message.JobUpdate):
                # Updates for groups this server does not track are skipped.
                # NOTE(review): with the PING placed inside this loop (see
                # below), this `continue` also skips the PING - confirm.
                if msg.job_id not in self._groups:
                    continue

                group = self._groups[msg.job_id]
                group_id = group.group_id

                # React to the reported job state.
                if msg.job_state in (job.State.ERROR, job.State.FAILED):
                    logger.debug("Launcher indicates job failure")
                elif msg.job_state is job.State.TERMINATED:
                    logger.info(
                        f"Rank {self.rank}>> [Termination] group-id/sim-id={msg.job_id}"
                    )
                    # Mark every simulation of the group as disconnected.
                    for sim_key in group.simulations:
                        group.simulations[sim_key].connected = False
                    # NOTE(review): placed after the loop; the flattened
                    # listing does not show the original nesting - confirm.
                    self.finished_groups.add(group_id)

            # 2. Rank 0 replies to the launcher with a PING.
            # NOTE(review): assumed to run once per received message, i.e.
            # inside the loop - confirm against the original indentation.
            if self.rank == 0:
                logger.debug(
                    "Server got message from launcher and sends PING back"
                )
                ping = self._encode_msg(message.Ping())
                self._launcherfd.send(ping)