Coverage for melissa/server/fault_tolerance.py: 38%
45 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-10 22:25 +0100
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-10 22:25 +0100
1"""This script defines `FaultTolerance` class to monitor failed jobs."""
2from typing import Dict, List
3import time
4import logging
5from melissa.server.simulation import Group
7logger = logging.getLogger(__name__)
class FaultToleranceError(Exception):
    """Raised when the fault-tolerance layer decides the run cannot continue."""
class FaultTolerance:
    """Bookkeeping for crashed and timed-out simulation groups.

    The object counts failures per group, remembers which groups have failed at least once and
    records, for every timed-out group, whether it exceeded the accepted number of failures.

    ### Parameters
    - **ft_off** (`bool`): When true, recording any failure raises `FaultToleranceError`.
    - **max_delay** (`float`): Largest accepted value of `time.time() - sim.last_message` for an
    unfinished simulation before its group is considered timed out.
    - **crashes_before_redraw** (`int`): Number of accepted failures before resampling inputs.
    - **nb_group** (`int`): Total number of groups involved in the simulation.

    ### Attributes
    - **ft_off**: Stored copy of the constructor argument.
    - **max_delay**: Stored copy of the constructor argument.
    - **crashes_before_redraw**: Stored copy of the constructor argument.
    - **restart_group** (`dict`): Maps the id of each timed-out group to a flag telling whether
    its failure count exceeded `crashes_before_redraw`. Filled by `check_time_out`; this class
    never empties it.
    - **failed_id** (`list`): Unique ids of the groups that failed at least once.
    - **nb_group**: Stored copy of the constructor argument."""

    def __init__(self,
                 ft_off: bool,
                 max_delay: float,
                 crashes_before_redraw: int,
                 nb_group: int) -> None:

        # Configuration, kept exactly as supplied by the caller.
        self.ft_off = ft_off
        self.max_delay = max_delay
        self.crashes_before_redraw = crashes_before_redraw
        self.nb_group: int = nb_group

        # Mutable state, filled in as failures are reported.
        self.restart_group: Dict[int, bool] = {}
        self.failed_id: List[int] = []

    # Not implemented yet -- planned hooks, kept here as design notes:
    #
    # def checkpointing(self) -> None:
    #     - save NN model [DL] or Stats [SA]
    #     - save buffer [DL]
    #     - save simulations dictionary [DL, SA]
    #
    # def restart(self) -> None:
    #     - import NN model [DL] or Stats [SA]
    #     - import buffer [DL]
    #     - import simulations dictionary [DL, SA]

    def handle_failed_group(self, group_id: int, group: Group) -> bool:
        """Registers one more failure for `group` and reports whether it exceeded the accepted
        number of failures.

        The failure counter of `group` is incremented, a warning is logged and the id is
        forwarded to `append_failed_id`, which may raise `FaultToleranceError`.

        ### Parameters
        - **group_id** (`int`): The unique identifier of the failed group.
        - **group** (`Group`): The failed group; its `nb_failures` attribute is incremented.

        ### Returns
        - **bool**: `True` when `group.nb_failures` is now greater than `crashes_before_redraw`,
        `False` otherwise."""

        group.nb_failures += 1
        too_many_failures: bool = group.nb_failures > self.crashes_before_redraw

        if too_many_failures:
            logger.warning(f"[Fault Tolerance] group-id/sim-id={group_id} failed too many times.")
        else:
            logger.warning(
                f"[Fault Tolerance] group-id/sim-id={group_id} failed {group.nb_failures} times."
            )

        # May raise; in that case nothing is returned to the caller.
        self.append_failed_id(group_id)
        return too_many_failures

    def check_time_out(self, groups: Dict[int, Group]) -> bool:
        """Detects groups holding a stalled simulation and schedules them for a restart.

        A simulation is stalled when it has a `last_message` timestamp, is not finished and more
        than `max_delay` has elapsed since that timestamp. For every group with at least one
        stalled simulation the method:
        - logs a warning,
        - increments the group's failure counter,
        - resets `last_message` to `None` on all simulations of the group,
        - stores in `restart_group` whether the counter exceeded `crashes_before_redraw`,
        - forwards the id to `append_failed_id`, which may raise `FaultToleranceError`.

        ### Parameters
        - **groups** (`Dict[int, Group]`): A dictionary where keys are group ids and values are
        `Group` instances representing each simulation group.

        ### Returns
        - **bool**: `True` when `restart_group` is non-empty. Since this method only adds entries,
        the result also reflects entries left over from earlier calls; presumably the caller
        clears `restart_group` after relaunching (to be confirmed against the caller)."""

        stalled_group_ids: List[int] = []

        for group_id, grp in groups.items():
            for sim in grp.simulations.values():
                # Simulations that never reported, or that already completed, cannot time out.
                if sim.last_message is None or sim.finished():
                    continue
                silent_too_long = time.time() - sim.last_message > self.max_delay
                # Each group is listed once, however many of its simulations stalled.
                if silent_too_long and group_id not in stalled_group_ids:
                    stalled_group_ids.append(group_id)

        for group_id in stalled_group_ids:
            logger.warning(f"group-id/sim-id={group_id} has timed-out.")
            stalled_group = groups[group_id]
            stalled_group.nb_failures += 1
            # Forget the timestamps so the same group is not flagged again on the next check.
            for sim in list(stalled_group.simulations.values()):
                sim.last_message = None
            exceeded = stalled_group.nb_failures > self.crashes_before_redraw
            self.restart_group[group_id] = exceeded
            self.append_failed_id(group_id)

        return bool(self.restart_group)

    def append_failed_id(self, group_id: int):
        """Records `group_id` among the failed groups and aborts when recovery is not possible.

        The id is added to `failed_id` only if it is not already present.

        ### Parameters
        - **group_id** (`int`): The unique identifier of the group that has failed.

        ### Raises
        - **FaultToleranceError**: if `ft_off` is true, or if the number of distinct failed
        groups equals `nb_group`."""
        already_known = group_id in self.failed_id
        if not already_known:
            self.failed_id.append(group_id)

        if self.ft_off:
            abort_message = (
                f"Fault-Tolerance is off, group {group_id} "
                "failure will cause the server to abort"
            )
            raise FaultToleranceError(abort_message)

        if len(self.failed_id) == self.nb_group:
            all_failed_message = (
                "All groups failed please make sure: \n"
                "- the path to the executable is correct, \n"
                "- the number of expected time steps is correct, \n"
                "- the simulation walltime was well estimated."
            )
            raise FaultToleranceError(all_failed_message)