Server Class
melissa.server.base_server.BaseServer
Bases: ABC
BaseServer class that handles the following tasks:
- Manages connections with the launcher and clients.
- Generates client scripts for simulations.
- Encodes and decodes messages between the server and clients.
- Provides basic checkpointing functionality to save and restore states.
Parameters
- config_dict (Dict[str, Any]): A dictionary containing the configuration settings used to initialize the server.
- checkpoint_file (str, optional): The filename for the checkpoint file (default is "checkpoint.pkl"). This file is used for saving and restoring the server's state.
Attributes
- comm (MPI.Intracomm): The MPI communicator for inter-process communication.
- rank (int): The rank of the current process in the MPI communicator.
- comm_size (int): The total number of server processes in the MPI communicator.
- client_comm_size (int): The total number of client processes.
- server_processes (int): Synonym for comm_size.
- connection_port (int): The server port used to establish the request-response connection with the clients.
- data_puller_port (int): The server port used to pull data from the clients.
- _offline_mode (bool): Internal flag indicating offline mode, in which no sending operation takes place. Useful when running multiple clients to produce datasets. Call self.make_study_offline to enable it.
- _learning (int): Internal flag indicating the learning state (initially 0).
- __t0 (float): The timestamp marking the initialization time of the object.
- _job_limit (int): Maximum number of jobs the launcher can manage concurrently.
- __is_direct_scheduler (bool): Flag indicating whether the study uses a direct scheduler.
- _restart (int): Flag indicating whether the system is in a restart state; initialized from the MELISSA_RESTART environment variable.
- _consistency_lock (threading.RLock): Reentrant lock that ensures thread-safe operations on shared resources.
- _is_receiving (bool): Flag indicating whether data reception is ongoing.
- _is_online (bool): Flag indicating whether the system is in online operational mode.
- _sobol_op (bool): Flag indicating whether Sobol operations are being performed.
- _total_bytes_recv (int): Total number of bytes received over the network.
- _active_sim_ids (set): Set of active simulation ids currently being managed.
- max_sim_id (int): The maximum simulation id observed so far.
- _groups (Dict[int, Group]): Dictionary mapping group ids to Group objects.
- _parameter_sampler (Optional[BaseExperiment]): Sampler for generating parameter values for simulations.
- __parameter_generator (Any): Internal generator object for producing parameters.
- verbose_level (int): Verbosity level for logging and debugging output.
- config_dict (Dict[str, Any]): Configuration dictionary provided during initialization.
- checkpoint_file (str): File name used for storing checkpoint data.
- crashes_before_redraw (int): Number of simulation crashes allowed before the parameters are redrawn.
- max_delay (Union[int, float]): Maximum allowed delay for simulations, in seconds.
- rm_script (bool): Whether client scripts are removed after execution.
- group_size (int): Number of simulations grouped together for batch processing.
- zmq_hwm (int): High-water mark for ZeroMQ communication.
- fields (List[str]): List of field names used in the study.
- nb_parameters (int): Number of parameters in the parameter sweep study.
- nb_time_steps (int): Number of time steps in each simulation.
- nb_clients (int): Total number of clients participating in the parameter sweep study.
- nb_groups (int): Total number of groups, derived from the number of clients and the group size.
- nb_submitted_groups (int): Number of groups submitted so far.
- finished_groups (set): Set of groups that have finished.
- mtt_simulation_completion (float): Running mean of simulation durations, updated iteratively.
- no_fault_tolerance (bool): Whether fault tolerance is disabled, based on the MELISSA_FAULT_TOLERANCE environment variable.
- __ft (FaultTolerance): Fault tolerance object managing simulation crashes and retries.
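The mtt_simulation_completion attribute keeps a running mean of simulation durations. A minimal sketch of such an iterative update, assuming the standard incremental-mean formula (the exact update rule Melissa uses is not documented here); the function name update_running_mean is hypothetical:

```python
from typing import Tuple


def update_running_mean(mean: float, count: int, new_value: float) -> Tuple[float, int]:
    """Fold one new sample into a running mean: m_n = m_{n-1} + (x_n - m_{n-1}) / n."""
    count += 1
    mean += (new_value - mean) / count
    return mean, count


# Hypothetical simulation durations, in seconds.
mean, count = 0.0, 0
for duration in [10.0, 20.0, 30.0]:
    mean, count = update_running_mean(mean, count, duration)
print(mean, count)  # 20.0 3
```

Only the current mean and the sample count need to be stored, not every individual duration.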
time_steps_known (property)
Whether the time steps are known before the study starts.
is_direct_scheduler (property)
Whether the study uses a direct scheduler.
learning (property)
Whether deep learning is activated. Required when establishing a connection with the clients.
consistency_lock (property)
The reentrant lock (_consistency_lock) guarding shared resources. Useful for active sampling.
__loop_pings()
Maintains communication with the launcher to ensure it does not assume the server has become unresponsive.
_start_pinger_thread()
Starts the pinger thread and sets the flag.
_stop_pinger_thread()
Stops the pinger thread and unsets the flag.
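The ping loop and its start/stop pair can be sketched with the standard threading module. This is a minimal sketch, assuming a ping is a callable invoked at a fixed interval and that a threading.Event serves as the stop flag; the Pinger class and its send_ping argument are hypothetical and not Melissa's API:

```python
import threading
import time
from typing import Callable, List, Optional


class Pinger:
    """Hypothetical helper that calls `send_ping` periodically in a background thread."""

    def __init__(self, send_ping: Callable[[], None], interval: float = 1.0) -> None:
        self._send_ping = send_ping
        self._interval = interval
        self._stop_event = threading.Event()  # set() acts as the "stop" flag
        self._thread: Optional[threading.Thread] = None

    def _loop_pings(self) -> None:
        # wait() returns True as soon as the stop flag is set, ending the loop promptly.
        while not self._stop_event.wait(self._interval):
            self._send_ping()

    def start(self) -> None:
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._loop_pings, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join()
            self._thread = None


pings: List[str] = []
pinger = Pinger(lambda: pings.append("ping"), interval=0.01)
pinger.start()
time.sleep(0.1)
pinger.stop()
```

Waiting on the event instead of calling time.sleep lets stop() return without waiting for a full interval to elapse.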
_save_base_state()
Checkpoints all common attributes in the server class to preserve the current state.
_load_base_state()
Loads all common attributes in the server class from a checkpoint or saved state.
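Saving and loading a set of common attributes can be sketched with pickle. This is a minimal sketch, assuming the state is a dictionary of picklable attribute values; the helpers save_base_state/load_base_state and the attribute selection are hypothetical, and Melissa's actual checkpoint contents may differ. Writing to a temporary file and then renaming it is one reasonable design choice, not necessarily what Melissa does:

```python
import os
import pickle
import tempfile
from typing import Any, Dict, Iterable


def save_base_state(obj: Any, attr_names: Iterable[str], path: str) -> None:
    """Pickle selected attributes of `obj` into `path`."""
    state: Dict[str, Any] = {name: getattr(obj, name) for name in attr_names}
    # Write to a temporary file first, then atomically replace the target,
    # so a crash during writing never leaves a truncated checkpoint behind.
    tmp_path = path + ".tmp"
    with open(tmp_path, "wb") as f:
        pickle.dump(state, f)
    os.replace(tmp_path, path)


def load_base_state(obj: Any, path: str) -> None:
    """Restore attributes previously saved by `save_base_state`."""
    with open(path, "rb") as f:
        state = pickle.load(f)
    for name, value in state.items():
        setattr(obj, name, value)


class _Demo:  # stand-in for a server object
    pass


demo = _Demo()
demo.nb_submitted_groups, demo.finished_groups = 4, {0, 1}
path = os.path.join(tempfile.mkdtemp(), "checkpoint.pkl")
save_base_state(demo, ["nb_submitted_groups", "finished_groups"], path)

restored = _Demo()
load_base_state(restored, path)
print(restored.nb_submitted_groups, restored.finished_groups)  # 4 {0, 1}
```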
__initialize_ports(connection_port=2003, data_puller_port=5000)
Assigns port numbers for connection and data pulling as class attributes. If the specified ports are already in use, likely because multiple servers are running on the same node, the method looks for available ports by incrementing the base port values and rechecking their availability.
Note: When multiple independent melissa-server jobs run simultaneously on the same node, a port may incorrectly appear to be available, leading to potential conflicts.
Parameters
- connection_port (int, optional): The port number used for establishing the main connection (default is 2003).
- data_puller_port (int, optional): The port number used for pulling data (default is 5000).
Raises
FatalError: If no available ports were found after the given number of attempts.
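The incremental port search can be sketched with the standard socket module. This is a minimal sketch, assuming a port counts as available when a TCP socket can bind to it; the function names, the max_attempts limit, and the use of RuntimeError in place of Melissa's FatalError are hypothetical. The check-then-release approach also shows why the race described in the note can happen: another process may take the port between this check and the server's real bind.

```python
import socket


def is_port_free(port: int, host: str = "0.0.0.0") -> bool:
    """Return True if a TCP socket can currently bind to (host, port)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            s.bind((host, port))
        except OSError:
            return False
        return True


def find_free_port(base_port: int, max_attempts: int = 100) -> int:
    """Try base_port, base_port + 1, ... until a bindable port is found."""
    for offset in range(max_attempts):
        candidate = base_port + offset
        if is_port_free(candidate):
            return candidate
    raise RuntimeError(f"no free port in [{base_port}, {base_port + max_attempts})")


connection_port = find_free_port(2003)
data_puller_port = find_free_port(5000)
```

The returned port is not reserved: the probing socket is closed before the function returns.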
__connect_to_launcher()
Establishes a connection with the launcher and sends metadata about the study.
__setup_sockets()
Sets up ZeroMQ (ZMQ) sockets over a given TCP connection port for communication.
__setup_poller()
This method sets up the polling mechanism by registering three sockets:
- Data socket: handles data communication.
- Timer socket: manages timing events.
- Launcher socket: handles communication with the launcher.
__start_debugger()
Launches the Visual Studio Code (VSCode) debugger for debugging purposes.
initialize_connections()
Initializes socket connections for communication.
_get_group_id_by_simulation(sim_id)
Returns the group id of the given simulation id.
_get_sim_id_list_by_group(group_id)
Returns a list of all simulation ids for a given group id.
_get_all_sim_ids()
Yields all simulation ids across all groups.
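The three lookups above can be sketched over a plain mapping from group ids to lists of simulation ids. This is a minimal sketch, assuming every simulation belongs to exactly one group; the dictionary layout is hypothetical, since Melissa stores Group objects in _groups:

```python
from typing import Dict, Iterator, List

# Hypothetical layout: group id -> simulation ids in that group.
groups: Dict[int, List[int]] = {0: [0, 1, 2], 1: [3, 4, 5]}


def get_group_id_by_simulation(sim_id: int) -> int:
    for group_id, sim_ids in groups.items():
        if sim_id in sim_ids:
            return group_id
    raise KeyError(f"unknown simulation id {sim_id}")


def get_sim_id_list_by_group(group_id: int) -> List[int]:
    return list(groups[group_id])


def get_all_sim_ids() -> Iterator[int]:
    # A generator: ids are yielded lazily, group by group.
    for sim_ids in groups.values():
        yield from sim_ids


print(get_group_id_by_simulation(4))  # 1
print(list(get_all_sim_ids()))        # [0, 1, 2, 3, 4, 5]
```

If every group had the same size and ids were assigned contiguously, the first lookup would reduce to sim_id // group_size; that is an assumption about id assignment, not something documented here.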
_verify_and_update_sampler_kwargs(sampler_t, **kwargs)
Fills in the parameters that the user did not provide when creating a sampler with the set_parameter_sampler method. It also checks whether a seed is given for a parallel server.
set_parameter_sampler(sampler_t, **kwargs)
Sets the parameter sampler type, which dictates how parameters are sampled for experiments. The sampler type can be either pre-defined or customized by inheriting from a pre-defined sampling class.
Parameters
- sampler_t (Union[ParameterSamplerType, Type[ParameterSamplerClass]]):
  - ParameterSamplerType: Enum specifying pre-defined samplers.
  - Type[ParameterSamplerClass]: A class type to instantiate.
- kwargs (Dict[str, Any]): Dictionary of keyword arguments. Useful to pass custom parameters as well as strict parameters such as l_bounds, u_bounds, apply_pick_freeze, second_order, seed=0, etc.
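Since sampler_t accepts either an enum member or a class, the two cases have to be dispatched, and missing keyword arguments filled in as _verify_and_update_sampler_kwargs describes. A minimal sketch under stated assumptions: SamplerType, UniformSampler, REGISTRY, and CONFIG_DEFAULTS are hypothetical stand-ins rather than Melissa's ParameterSamplerType members, and requiring an explicit seed when comm_size > 1 (so that all ranks draw the same values) is this sketch's interpretation of the seed check:

```python
import enum
import random
from typing import Any, Dict, List, Type, Union


class UniformSampler:
    """Hypothetical sampler: one uniform draw per parameter in [l_bounds[i], u_bounds[i]]."""

    def __init__(self, l_bounds: List[float], u_bounds: List[float], seed: int = 0) -> None:
        self.l_bounds, self.u_bounds = l_bounds, u_bounds
        self.rng = random.Random(seed)

    def draw(self) -> List[float]:
        return [self.rng.uniform(lo, hi) for lo, hi in zip(self.l_bounds, self.u_bounds)]


class SamplerType(enum.Enum):
    UNIFORM = "uniform"


# Hypothetical mapping from enum members to pre-defined sampler classes.
REGISTRY: Dict[SamplerType, Type[UniformSampler]] = {SamplerType.UNIFORM: UniformSampler}

# Hypothetical values that would normally come from the study configuration.
CONFIG_DEFAULTS: Dict[str, Any] = {"l_bounds": [0.0, 0.0], "u_bounds": [1.0, 1.0]}


def make_sampler(
    sampler_t: Union[SamplerType, Type[UniformSampler]], comm_size: int = 1, **kwargs: Any
) -> UniformSampler:
    # Enum member -> look up a pre-defined class; class -> use it directly (e.g. a user subclass).
    if isinstance(sampler_t, SamplerType):
        cls = REGISTRY[sampler_t]
    elif isinstance(sampler_t, type) and issubclass(sampler_t, UniformSampler):
        cls = sampler_t
    else:
        raise TypeError(f"unsupported sampler type: {sampler_t!r}")
    # Fill in whatever the user did not pass explicitly.
    for key, value in CONFIG_DEFAULTS.items():
        kwargs.setdefault(key, value)
    # Assumption: with several server ranks, an explicit seed keeps all ranks in sync.
    if comm_size > 1 and "seed" not in kwargs:
        raise ValueError("an explicit seed is required when comm_size > 1")
    return cls(**kwargs)


sampler = make_sampler(SamplerType.UNIFORM, seed=42)
print(len(sampler.draw()))  # 2
```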
_update_parameter_sampler()
Updates the existing parameter sampler.
_launch_groups(group_ids)
Launches the study groups for the very first run. This process involves generating the client scripts and ensures that no restart has occurred in the case of fault tolerance.
Parameters
- group_ids (List[int]): A list of group identifiers to launch.
_generate_client_scripts(group_ids, create_new_group=False)
Creates all required client scripts (e.g., client.X.sh) and sets up a dictionary for fault tolerance.
Parameters
- group_ids (List[int]): A list of group identifiers.
- create_new_group (bool, optional): Flag indicating whether to create a new group of clients (default is False).
__generate_client_script(sim_id, parameters, script_path)
Generates a single client script for a given simulation id and parameters.
Parameters
- sim_id (int): The simulation id associated with the client script.
- parameters (list): The list of parameters.
- script_path (str): The absolute path of the client script to create.
_write_environment_variables(f, sim_id)
Writes environment variables to the client script.
Parameters
- f (Any): The file object to write to.
- sim_id (int): The simulation id associated with the client script.
_write_execution_command(f, parameters)
Writes the execution command to the client script.
Parameters
- f (Any): The file object to write to.
- parameters (list): The list of parameters.
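Script generation splits naturally into an environment-variable part and an execution-command part, mirroring the two helpers above. A minimal sketch of writing a client script such as client.0.sh; the variable name SIMULATION_ID, the EXECUTABLE path, and the command-line layout are hypothetical placeholders, since the real script contents depend on the study configuration:

```python
import os
import shlex
import stat
import tempfile
from typing import IO, List

# Hypothetical path to the simulation executable.
EXECUTABLE = "/path/to/simulation"


def write_environment_variables(f: IO[str], sim_id: int) -> None:
    # Hypothetical variable name; a real client script may export different ones.
    f.write(f"export SIMULATION_ID={sim_id}\n")


def write_execution_command(f: IO[str], parameters: List[float]) -> None:
    # Quote each token so that values containing spaces survive the shell.
    args = " ".join(shlex.quote(str(p)) for p in parameters)
    f.write(f"exec {shlex.quote(EXECUTABLE)} {args}\n")


def generate_client_script(sim_id: int, parameters: List[float], script_path: str) -> None:
    with open(script_path, "w") as f:
        f.write("#!/bin/sh\n")
        write_environment_variables(f, sim_id)
        write_execution_command(f, parameters)
    # Add the owner-execute bit so the script can be run directly.
    os.chmod(script_path, os.stat(script_path).st_mode | stat.S_IXUSR)


script_path = os.path.join(tempfile.mkdtemp(), "client.0.sh")
generate_client_script(0, [0.5, 1.25], script_path)
with open(script_path) as f:
    print(f.read())
```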
_launch_group(group_id)
Submits a request to the launcher to run a given group id. For non-Sobol studies, the group id and simulation id are the same.
Parameters
- group_id (int): The unique identifier of the group to be launched.
_kill_group(group_id)
Submits a request to the launcher to terminate a given group id.
Parameters
- group_id (int): The unique identifier of the group to be terminated.
_relaunch_group(group_id, create_new_group)
Relaunches a failed group with or without new parameters, depending on the fault tolerance configuration.
Parameters
- group_id (int): The unique identifier of the group to be relaunched.
- create_new_group (bool): A flag indicating whether to create a new group with new parameters.
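One plausible way to derive the create_new_group flag is to count crashes per group against crashes_before_redraw: relaunch with the same parameters while the count stays below the limit, and redraw once it is reached. This is a minimal sketch under that assumption; the exact policy (including whether the comparison is strict and whether the count resets) is Melissa's own, and the CrashTracker class is hypothetical:

```python
from collections import defaultdict
from typing import DefaultDict, List


class CrashTracker:
    """Hypothetical bookkeeping deciding how a crashed group is relaunched."""

    def __init__(self, crashes_before_redraw: int) -> None:
        self.crashes_before_redraw = crashes_before_redraw
        self.crashes: DefaultDict[int, int] = defaultdict(int)

    def on_crash(self, group_id: int) -> bool:
        """Record a crash; return True if the group should get new parameters."""
        self.crashes[group_id] += 1
        create_new_group = self.crashes[group_id] >= self.crashes_before_redraw
        if create_new_group:
            self.crashes[group_id] = 0  # the redrawn group starts with a clean count
        return create_new_group


tracker = CrashTracker(crashes_before_redraw=3)
decisions: List[bool] = [tracker.on_crash(group_id=7) for _ in range(4)]
print(decisions)  # [False, False, True, False]
```

The returned boolean would then be passed as the create_new_group argument of _relaunch_group.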
_handle_simulation_connection(msg)
Handles an incoming connection request from a submitted simulation. This method is executed by rank 0 only.
Parameters
- msg (bytes): The message received from the simulation requesting a connection.
Returns
int: The simulation id of the connected simulation, or -1 if the connection could not be established.
_restart_groups()
Kills and restarts simulations that were running when the server crashed.
poll_sockets(timeout=10)
Performs polling over the registered socket descriptors to monitor various events, including timer, launcher messages, new client connections, and data readiness.
Parameters
- timeout (int, optional): The maximum time (in seconds) to wait for a socket event before returning. Default is 10 seconds.
Returns
Optional[Union[ServerStatus, SimulationData, PartialSimulationData]]:
- ServerStatus if the event is related to server status.
- SimulationData if new simulation data is received.
- PartialSimulationData if partial data from a simulation is received.
__handle_timerfd()
Handles timer messages.
_handle_fd()
Handles the launcher's messages.
_decode_msg(byte_stream)
_encode_msg(msg)
_all_done()
Checks whether all clients' data has been received and unregisters the timer socket if completed.
Returns
bool: True if all clients' data has been successfully received.
close_connection(exit_=0)
Signals to the launcher that the study has ended with a specified exit status.
Parameters
- exit_ (int, optional): The exit status code to be sent to the launcher. Defaults to 0, indicating successful completion.
get_memory_info_in_gb()
Returns a Tuple[float, float] containing the memory consumed and the total main memory, in GB.
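A standard-library-only sketch that returns the same kind of Tuple[float, float]. It assumes Linux: total memory comes from os.sysconf, and "consumed" is approximated by the peak resident set size of the current process from resource.getrusage, which Linux reports in kilobytes. What get_memory_info_in_gb actually measures may differ (for instance, system-wide rather than per-process usage), and the function name memory_info_in_gb is hypothetical:

```python
import os
import resource
from typing import Tuple

GB = 1024 ** 3  # bytes per GB as used in this sketch


def memory_info_in_gb() -> Tuple[float, float]:
    """Return (peak RSS of this process, total physical memory), both in GB. Linux only."""
    # ru_maxrss is reported in kilobytes on Linux.
    used = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss * 1024 / GB
    # Total physical memory = page size * number of physical pages.
    total = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES") / GB
    return used, total


used_gb, total_gb = memory_info_in_gb()
print(f"{used_gb:.2f} GB used / {total_gb:.2f} GB total")
```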
_show_insights()
Logs information gathered from the clients and from server processing.
_write_final_report()
Writes miscellaneous information about the analysis.
_server_finalize(exit_=0)
Finalizes the server operations.
Parameters
- exit_ (int, optional): The exit status code indicating the outcome of the server's operations. Defaults to 0, which signifies a successful termination.
setup_environment()
Optional. A method that sets up the environment or performs initialization. Any necessary setup calls go here. For example, a Melissa DL study needs dist.init_process_group to be called.
_check_simulation_data(simulation, simulation_data)
Tracks and validates incoming simulation data.
- Client rank initialization: Ensures simulation_data structures are initialized per client rank.
- Dynamic matrix expansion: Handles unknown sizes dynamically as new time steps are encountered.
- Duplicate data detection: Discards messages if the data for the specified field and time step has already been received.
- Time step completion:
  - Checks whether all fields for a specific time step have been received and, if so, processes them into a SimulationData object.
  - Handles cases where the data is empty.
- Partial data handling: Tracks the fields received so far and waits for completion.
Parameters
- simulation (Simulation): Tracks the state and received data of the simulation.
- simulation_data (PartialSimulationData): The incoming data message from the simulation.
Returns
- SimulationDataStatus: Status of the simulation data (COMPLETE, PARTIAL, ALREADY_RECEIVED, EMPTY).
- Union[Optional[SimulationData], Optional[PartialSimulationData]]:
  - Sensitivity Analysis: A PartialSimulationData object, even if the time step is incomplete, since SA can be computed independently.
  - Deep Learning: A SimulationData object if all fields for the time step are complete; None if the data is incomplete or invalid.
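Duplicate detection and time-step completion can be sketched with one set of received fields per time step. This is a minimal sketch, assuming a single client rank and that a time step is complete once every field in the study's fields list has arrived; SimulationTracker and Status are hypothetical, with Status mirroring three of the SimulationDataStatus members (EMPTY and per-rank bookkeeping are left out):

```python
import enum
from collections import defaultdict
from typing import DefaultDict, List, Set


class Status(enum.Enum):  # hypothetical mirror of three SimulationDataStatus members
    COMPLETE = enum.auto()
    PARTIAL = enum.auto()
    ALREADY_RECEIVED = enum.auto()


class SimulationTracker:
    """Hypothetical record of which fields have arrived at each time step of one simulation."""

    def __init__(self, fields: List[str]) -> None:
        self.fields = set(fields)
        # defaultdict grows on demand, so the number of time steps need not be known up front.
        self.received: DefaultDict[int, Set[str]] = defaultdict(set)

    def check(self, time_step: int, field: str) -> Status:
        if field in self.received[time_step]:
            return Status.ALREADY_RECEIVED  # duplicate message: the caller discards it
        self.received[time_step].add(field)
        if self.received[time_step] == self.fields:
            return Status.COMPLETE  # every field of this time step is now available
        return Status.PARTIAL


tracker = SimulationTracker(["u", "v"])
print(tracker.check(0, "u"))  # Status.PARTIAL
print(tracker.check(0, "u"))  # Status.ALREADY_RECEIVED
print(tracker.check(0, "v"))  # Status.COMPLETE
```

Following the return description above, a Deep Learning flavor would forward data only on COMPLETE, whereas a Sensitivity Analysis flavor can use every PARTIAL message.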
__deserialize_message(msg)
_validate_data(simulation_data)
__determine_and_process_simulation_data(simulation_data)
_handle_simulation_data(msg)
This method handles the following tasks:
1. Deserialization: Converts the incoming byte stream into a PartialSimulationData object.
2. Validation: Ensures the data is valid based on:
   - Time step being within the allowed range.
   - Field name being recognized.
3. Simulation data handling:
   - Updates the status of the simulation based on the received data.
   - Detects and logs duplicate messages.
4. Completion check:
   - Marks the simulation as finished if all data is received.
   - Updates the count of finished simulations.
Parameters
- msg (bytes): A serialized message containing simulation data.
Returns
Optional[PartialSimulationData]: PartialSimulationData if successful; None if the message fails validation.
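The validation step (task 2 above) can be sketched as a predicate over the decoded message. This is a minimal sketch, assuming the message exposes time_step and field attributes and that valid time steps run from 0 to nb_time_steps - 1; the Message dataclass is a hypothetical stand-in for PartialSimulationData:

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass
class Message:  # hypothetical stand-in for PartialSimulationData
    sim_id: int
    time_step: int
    field: str


def validate(msg: Message, nb_time_steps: int, fields: Sequence[str]) -> Optional[Message]:
    """Return the message if it is valid, None otherwise (mirroring the documented return)."""
    if not 0 <= msg.time_step < nb_time_steps:
        return None  # time step outside the allowed range
    if msg.field not in fields:
        return None  # unknown field name
    return msg


fields = ["temperature", "pressure"]
print(validate(Message(0, 3, "temperature"), nb_time_steps=10, fields=fields) is not None)  # True
print(validate(Message(0, 10, "temperature"), nb_time_steps=10, fields=fields))  # None
print(validate(Message(0, 3, "velocity"), nb_time_steps=10, fields=fields))  # None
```

When the time steps are not known in advance (see time_steps_known), the upper-bound check would have to be skipped or relaxed; the sketch assumes they are known.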
_server_online() (abstractmethod)
An abstract method in which the user controls data handling while the server is online. Implemented differently by each Melissa flavor.
_server_offline() (abstractmethod)
An abstract method in which the user controls data handling while the server is offline. Implemented differently by each Melissa flavor.
_check_group_size() (abstractmethod)
An abstract method that checks whether the group size was correctly set. Implemented differently by each Melissa flavor.
_process_partial_data_reception(simulation, simulation_data) (abstractmethod)
Returns a value when data has been partially received. Implemented differently by each Melissa flavor.
_process_complete_data_reception(simulation, simulation_data) (abstractmethod)
Returns a value when data has been completely received. Implemented differently by each Melissa flavor.
_receive() (abstractmethod)
Handles data coming from the server object. Implemented differently by each Melissa flavor.
start() (abstractmethod)
The high-level organization of server events. Implemented differently by each Melissa flavor.
_restart_from_checkpoint(**kwargs) (abstractmethod)
Restarts the server object from a checkpoint. Implemented differently by each Melissa flavor.
_checkpoint(**kwargs) (abstractmethod)
Checkpoints the server object. Implemented differently by each Melissa flavor.
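Each flavor subclasses BaseServer and provides the abstract methods listed above. The following self-contained sketch shows the abc mechanics with a hypothetical, reduced stand-in base class (MiniServer) rather than the real BaseServer, which expects a configuration dictionary and runs under MPI; only two of the abstract methods are reproduced, and the group-size rule in MyFlavor is illustrative only:

```python
from abc import ABC, abstractmethod


class MiniServer(ABC):
    """Hypothetical reduced stand-in for BaseServer."""

    @abstractmethod
    def start(self) -> None:
        """High-level organization of server events."""

    @abstractmethod
    def _check_group_size(self) -> None:
        """Validate the configured group size."""


class MyFlavor(MiniServer):
    def __init__(self, group_size: int) -> None:
        self.group_size = group_size
        self.started = False

    def _check_group_size(self) -> None:
        # Illustrative rule only; each real flavor defines its own check.
        if self.group_size < 1:
            raise ValueError("group_size must be at least 1")

    def start(self) -> None:
        self._check_group_size()
        self.started = True


class Incomplete(MiniServer):
    def start(self) -> None:
        pass


server = MyFlavor(group_size=1)
server.start()

try:
    Incomplete()  # _check_group_size is not implemented
except TypeError as err:
    print("cannot instantiate:", err)
```

Python refuses to instantiate a subclass that leaves any abstract method unimplemented, so a missing override surfaces at construction time rather than mid-study.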