Coverage for melissa/server/fault_tolerance.py: 38%

45 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-03-10 22:25 +0100

1"""This script defines `FaultTolerance` class to monitor failed jobs.""" 

2from typing import Dict, List 

3import time 

4import logging 

5from melissa.server.simulation import Group 

6 

# Module-level logger named after this module; `FaultTolerance` emits its crash and
# timeout warnings through it.
logger: logging.Logger = logging.getLogger(name=__name__)

8 

9 

class FaultToleranceError(Exception):
    """Raised by the fault-tolerance layer when the server cannot recover: either
    fault tolerance is switched off and a group failed, or every group has failed."""

12 

13 

class FaultTolerance:
    """Fault-tolerance bookkeeping for simulation groups.

    Records crashes and timeouts, decides whether a failed group should be relaunched
    with or without resampled inputs, and raises `FaultToleranceError` when recovery
    is impossible.

    ### Parameters
    - **ft_off** (`bool`): Fault-tolerance switch. `True` means fault tolerance is
    disabled: the first failure of any group raises `FaultToleranceError`.
    - **max_delay** (`float`): Maximum time allowed since a simulation's last message
    before it is considered timed out. It is compared against
    `time.time() - sim.last_message`, i.e. seconds (assuming `last_message` is a
    `time.time()` timestamp).
    - **crashes_before_redraw** (`int`): Number of failures a group may accumulate
    before it must be relaunched with resampled inputs.
    - **nb_group** (`int`): Total number of groups involved in the study.

    ### Attributes
    - **ft_off**: The fault-tolerance switch (`True` = disabled).
    - **max_delay**: The maximum delay between two messages of a running simulation.
    - **crashes_before_redraw**: Failure count above which inputs are resampled.
    - **restart_group** (`Dict[int, bool]`): Timed-out groups to relaunch, mapped to
    `True` when their inputs must be resampled. Filled by `check_time_out`; this
    class never empties it, so the caller presumably clears it after relaunching
    (TODO confirm).
    - **failed_id** (`List[int]`): Unique ids of every group that has failed so far.
    - **nb_group**: The total count of groups."""

    def __init__(self,
                 ft_off: bool,
                 max_delay: float,
                 crashes_before_redraw: int,
                 nb_group: int) -> None:

        self.ft_off = ft_off
        self.max_delay = max_delay
        self.crashes_before_redraw = crashes_before_redraw
        self.restart_group: Dict[int, bool] = {}
        self.failed_id: List[int] = []
        self.nb_group: int = nb_group

    # TODO: checkpointing / restart support is not implemented yet. Planned features:
    # - checkpointing: save the NN model [DL] or Stats [SA], save the buffer [DL],
    #   save the simulations dictionary [DL, SA];
    # - restart: import the NN model [DL] or Stats [SA], import the buffer [DL],
    #   import the simulations dictionary [DL, SA].

    def handle_failed_group(self, group_id: int, group: Group) -> bool:
        """Record a crash of `group` and decide whether its inputs must be resampled.

        The group's `nb_failures` counter is incremented in place and `group_id` is
        registered as failed.

        ### Parameters
        - **group_id** (`int`): The unique identifier of the failed group.
        - **group** (`Group`): The group that has failed.

        ### Returns
        - **bool**: `True` when the group has now failed more than
        `crashes_before_redraw` times (relaunch with resampled inputs); `False` when
        it may be relaunched with its current inputs.

        ### Raises
        - **FaultToleranceError**: if fault tolerance is off, or if every group has
        now failed."""
        group.nb_failures += 1
        needs_redraw = group.nb_failures > self.crashes_before_redraw

        if needs_redraw:
            logger.warning(
                "[Fault Tolerance] group-id/sim-id=%s failed too many times.", group_id
            )
        else:
            logger.warning(
                "[Fault Tolerance] group-id/sim-id=%s failed %s times.",
                group_id,
                group.nb_failures,
            )

        self.append_failed_id(group_id)
        return needs_redraw

    def check_time_out(self, groups: Dict[int, Group]) -> bool:
        """Detect groups containing a simulation that stopped reporting for too long.

        A simulation is timed out when it has sent at least one message
        (`last_message is not None`), is not finished, and more than `max_delay` has
        elapsed since its last message. For each group containing such a simulation:
        - its `nb_failures` counter is incremented once,
        - the `last_message` of all its simulations is reset to `None`,
        - it is added to `restart_group` (value `True` when it has now failed more than
        `crashes_before_redraw` times, i.e. its inputs must be resampled),
        - it is registered as failed.

        ### Parameters
        - **groups** (`Dict[int, Group]`): Mapping from group id to the `Group`
        instance holding that group's simulations.

        ### Returns
        - **bool**: `True` if `restart_group` is non-empty after the check. This also
        reflects entries from earlier calls if the caller has not cleared
        `restart_group`.

        ### Raises
        - **FaultToleranceError**: if fault tolerance is off, or if every group has
        now failed."""
        # One reference time so every simulation is judged against the same instant.
        now = time.time()

        # Dict iteration order is preserved; `any` stops at the first timed-out
        # simulation, so each group is listed at most once.
        timed_out_ids: List[int] = [
            group_id
            for group_id, grp in groups.items()
            if any(
                sim.last_message is not None
                and not sim.finished()
                and now - sim.last_message > self.max_delay
                for sim in grp.simulations.values()
            )
        ]

        for group_id in timed_out_ids:
            logger.warning("group-id/sim-id=%s has timed-out.", group_id)
            group = groups[group_id]
            group.nb_failures += 1
            # A `None` timestamp excludes these simulations from the timeout check
            # until they report again.
            for sim in group.simulations.values():
                sim.last_message = None
            self.restart_group[group_id] = (
                group.nb_failures > self.crashes_before_redraw
            )
            self.append_failed_id(group_id)

        return bool(self.restart_group)

    def append_failed_id(self, group_id: int) -> None:
        """Register `group_id` as failed (once) and abort when recovery is impossible.

        Nothing happens if the id is already registered.

        ### Parameters
        - **group_id** (`int`): The unique identifier of the group that has failed.

        ### Raises
        - **FaultToleranceError**: if fault tolerance is off (`ft_off` is `True`), or
        if the number of distinct failed groups has reached `nb_group`."""
        if group_id in self.failed_id:
            return

        self.failed_id.append(group_id)
        if self.ft_off:
            raise FaultToleranceError(
                f"Fault-Tolerance is off, group {group_id} "
                "failure will cause the server to abort"
            )
        # `>=` (not `==`) keeps the abort effective even if `failed_id` already held
        # more entries than `nb_group`.
        if len(self.failed_id) >= self.nb_group:
            raise FaultToleranceError(
                "All groups failed please make sure: \n"
                "- the path to the executable is correct, \n"
                "- the number of expected time steps is correct, \n"
                "- the simulation walltime was well estimated."
            )