Server Class

melissa.server.base_server.BaseServer

Bases: ABC

BaseServer class that handles the following tasks:

  • Manages connections with the launcher and clients.
  • Generates client scripts for simulations.
  • Encodes and decodes messages between the server and clients.
  • Provides basic checkpointing functionality to save and restore states.
Parameters
  • config_dict (Dict[str, Any]): A dictionary containing configuration settings for initializing the server.
  • checkpoint_file (str, optional): The filename for the checkpoint file (default is "checkpoint.pkl"). This file is used for saving and restoring the server's state.
Attributes
  • comm (MPI.Intracomm): The MPI communicator for inter-process communication.
  • rank (int): The rank of the current process in the MPI communicator.
  • comm_size (int): The total number of server processes in the MPI communicator.
  • client_comm_size (int): The total number of client processes.
  • server_processes (int): Synonym for comm_size.
  • connection_port (int): The server port used to establish request-response connections with the clients.
  • data_puller_port (int): The server port used to pull data from the clients.

  • _offline_mode (bool): Internal flag indicating offline mode, in which no send operations take place. Useful when running multiple clients to produce datasets. Call self.make_study_offline to enable it.

  • _learning (int): Internal flag indicating the learning state (initially 0).
  • __t0 (float): The timestamp marking the initialization time of the object.
  • _job_limit (int): Maximum number of jobs the launcher can manage concurrently.
  • __is_direct_scheduler (bool): Flag indicating whether the study is using a direct scheduler.

  • _restart (int): Flag indicating if the system is in a restart state; initialized from the MELISSA_RESTART environment variable.

  • _consistency_lock (threading.RLock): Reentrant lock to ensure thread-safe operations on shared resources.
  • _is_receiving (bool): Flag indicating whether data reception is ongoing.
  • _is_online (bool): Flag indicating if the system is in an online operational mode.
  • _sobol_op (bool): Flag indicating whether Sobol operations are being performed.
  • _total_bytes_recv (int): Tracks the total number of bytes received over the network.
  • _active_sim_ids (set): Set of active simulation ids currently being managed.
  • max_sim_id (int): Tracks the maximum simulation id observed so far.

  • _groups (Dict[int, Group]): Dictionary mapping group ids to Group objects.

  • _parameter_sampler (Optional[BaseExperiment]): Sampler for generating parameter values for simulations.
  • __parameter_generator (Any): Internal generator object for producing parameters.

  • verbose_level (int): Determines the verbosity level for logging and debugging output.

  • config_dict (Dict[str, Any]): Configuration dictionary provided during initialization.
  • checkpoint_file (str): File name used for storing checkpoint data.

  • crashes_before_redraw (int): Number of simulation crashes allowed before redrawing parameters.

  • max_delay (Union[int, float]): Maximum allowed delay for simulations, in seconds.
  • rm_script (bool): Indicates whether client scripts should be removed after execution.
  • group_size (int): Number of simulations grouped together for batch processing.
  • zmq_hwm (int): High-water mark for ZeroMQ communication.

  • fields (List[str]): List of field names used in the study.

  • nb_parameters (int): Number of parameters in the parameter sweep study.
  • nb_time_steps (int): Number of time steps in each simulation.
  • nb_clients (int): Total number of clients participating in the parameter sweep study.

  • nb_groups (int): Total number of groups, derived from the number of clients and group size.

  • nb_submitted_groups (int): Tracks the number of groups submitted so far.
  • finished_groups (set): Tracks the set of finished groups.
  • mtt_simulation_completion (float): Running mean of simulation durations, updated iteratively.

  • no_fault_tolerance (bool): Indicates whether fault tolerance is disabled, based on the MELISSA_FAULT_TOLERANCE environment variable.

  • __ft (FaultTolerance): Fault tolerance object managing simulation crashes and retries.
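The mtt_simulation_completion attribute is described as an iteratively updated mean of simulation durations. The sketch below shows one standard way to maintain such a mean without storing every duration; the RunningMean class is hypothetical and not part of Melissa's API.

```python
class RunningMean:
    """Hypothetical helper: incremental mean of simulation durations."""

    def __init__(self) -> None:
        self.count = 0
        self.mean = 0.0

    def update(self, duration: float) -> float:
        # m_n = m_(n-1) + (x_n - m_(n-1)) / n, so past durations need not be stored
        self.count += 1
        self.mean += (duration - self.mean) / self.count
        return self.mean


tracker = RunningMean()
for duration in (10.0, 20.0, 30.0):
    tracker.update(duration)
print(tracker.mean)  # 20.0
```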

time_steps_known property

Whether the time steps are known before the study starts.

is_direct_scheduler property

Whether the study is using a direct scheduler.

learning property

Whether deep learning is activated. Required when establishing a connection with clients.

consistency_lock property

Returns the reentrant lock that ensures thread-safe operations on shared resources. Useful for active sampling.

__loop_pings()

Maintains communication with the launcher to ensure it does not assume the server has become unresponsive.

_start_pinger_thread()

Starts the pinger thread and sets the flag.

_stop_pinger_thread()

Stops the pinger thread and unsets the flag.
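The pinger methods above amount to a background loop guarded by a stop flag. A minimal sketch using threading.Event, assuming a callable that sends one ping to the launcher; Pinger and send_ping are hypothetical names, and the real ping interval and message format are not documented here.

```python
import threading
import time
from typing import Callable, List, Optional


class Pinger:
    """Hypothetical pinger: calls send_ping every `interval` seconds until stopped."""

    def __init__(self, send_ping: Callable[[], None], interval: float = 1.0) -> None:
        self._send_ping = send_ping
        self._interval = interval
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def _loop_pings(self) -> None:
        # wait() returns False on timeout (time to ping) and True once stop() sets the flag
        while not self._stop_event.wait(self._interval):
            self._send_ping()

    def start(self) -> None:
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._loop_pings, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join()


pings: List[float] = []
pinger = Pinger(lambda: pings.append(time.monotonic()), interval=0.01)
pinger.start()
time.sleep(0.1)
pinger.stop()
```

Using Event.wait as the sleep means stop() takes effect immediately instead of after a full interval.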

_save_base_state()

Checkpoints all common attributes in the server class to preserve the current state.

_load_base_state()

Loads all common attributes in the server class from a checkpoint or saved state.
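Since the default checkpoint filename is checkpoint.pkl, the base state is plausibly stored with pickle. A minimal sketch of saving and restoring a chosen set of attributes, assuming a flat dict of attribute names to values; save_state, load_state and DummyServer are hypothetical, and the real list of common attributes is not reproduced here.

```python
import os
import pickle
import tempfile
from typing import Any, Dict, Iterable


def save_state(obj: Any, attrs: Iterable[str], checkpoint_file: str) -> None:
    # Gather the chosen attributes into a plain dict and pickle it in one go
    state: Dict[str, Any] = {name: getattr(obj, name) for name in attrs}
    with open(checkpoint_file, "wb") as f:
        pickle.dump(state, f)


def load_state(obj: Any, checkpoint_file: str) -> None:
    # Restore every saved attribute onto the (freshly constructed) object
    with open(checkpoint_file, "rb") as f:
        state: Dict[str, Any] = pickle.load(f)
    for name, value in state.items():
        setattr(obj, name, value)


class DummyServer:
    def __init__(self) -> None:
        self.nb_submitted_groups = 0
        self.finished_groups: set = set()


server = DummyServer()
server.nb_submitted_groups = 4
server.finished_groups = {0, 2}
path = os.path.join(tempfile.mkdtemp(), "checkpoint.pkl")
save_state(server, ["nb_submitted_groups", "finished_groups"], path)

restored = DummyServer()
load_state(restored, path)
print(restored.nb_submitted_groups, restored.finished_groups)  # 4 {0, 2}
```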

__initialize_ports(connection_port=2003, data_puller_port=5000)

Assigns port numbers for connection and data pulling as class attributes. If the specified ports are already in use, likely due to multiple servers running on the same node, the function attempts to find available ports by incrementing the base port values and rechecking their availability.

Note: When multiple independent melissa-server jobs are running simultaneously on the same node, there is a chance that a port may incorrectly appear as available, leading to potential conflicts.

Parameters
  • connection_port (int, optional): The port number used for establishing the main connection (default is 2003).
  • data_puller_port (int, optional): The port number used for pulling data (default is 5000).
Raises
  • FatalError: If no available ports are found after the given number of attempts.
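The port search described above can be sketched with the standard socket module: try to bind each candidate port and move both base ports forward together on failure. This is a minimal sketch under assumptions (both ports are incremented in lockstep, at most max_attempts tries, FatalError is a local stand-in); it is not the server's actual implementation, and like the original it has the check-then-use race mentioned in the note.

```python
import socket
from typing import Tuple

MAX_PORT = 65535


class FatalError(Exception):
    """Stand-in for Melissa's FatalError in this sketch."""


def is_port_free(port: int, host: str = "") -> bool:
    # Binding succeeds only if no other socket on this host currently holds the port
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            s.bind((host, port))
        except OSError:
            return False
    return True


def find_free_ports(connection_port: int = 2003, data_puller_port: int = 5000,
                    max_attempts: int = 100) -> Tuple[int, int]:
    for offset in range(max_attempts):
        conn, data = connection_port + offset, data_puller_port + offset
        if conn > MAX_PORT or data > MAX_PORT:
            break
        # Check-then-use is racy: another job may grab the port after this check
        if is_port_free(conn) and is_port_free(data):
            return conn, data
    raise FatalError(f"No free ports found after {max_attempts} attempts")


conn_port, data_port = find_free_ports()
```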

__connect_to_launcher()

Establishes a connection with the launcher and sends metadata about the study.

__setup_sockets()

Sets up ZeroMQ (ZMQ) sockets over a given TCP connection port for communication.

__setup_poller()

This method sets up the polling mechanism by registering three important sockets:

  • Data Socket: Handles data communication.
  • Timer Socket: Manages timing events.
  • Launcher Socket: Facilitates communication with the launcher.

__start_debugger()

Launches the Visual Studio Code (VSCode) debugger for debugging purposes.

initialize_connections()

Initializes socket connections for communication.

_get_group_id_by_simulation(sim_id)

Returns the group id of the given simulation id.

_get_sim_id_list_by_group(group_id)

Returns a list of all simulation ids for a given group id.

_get_all_sim_ids()

Yields all simulation ids across all groups.
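The three lookup helpers above can be pictured over a plain mapping of group ids to simulation ids. This sketch assumes lists of integers, whereas the real _groups maps ids to Group objects; the function names are hypothetical mirrors of the documented methods. For non-Sobol studies each group holds a single simulation whose id equals the group id.

```python
from typing import Dict, Iterator, List

# Hypothetical layout: group id -> simulation ids (the real _groups maps ids to Group objects)
Groups = Dict[int, List[int]]


def get_group_id_by_simulation(groups: Groups, sim_id: int) -> int:
    # Reverse lookup over all groups
    for group_id, sim_ids in groups.items():
        if sim_id in sim_ids:
            return group_id
    raise KeyError(f"unknown simulation id {sim_id}")


def get_sim_id_list_by_group(groups: Groups, group_id: int) -> List[int]:
    return list(groups[group_id])


def get_all_sim_ids(groups: Groups) -> Iterator[int]:
    # A generator, matching the "yields" wording above
    for sim_ids in groups.values():
        yield from sim_ids


sobol_like = {0: [0, 1, 2], 1: [3, 4, 5]}  # groups of three simulations
non_sobol = {0: [0], 1: [1], 2: [2]}       # group id == simulation id
print(get_group_id_by_simulation(sobol_like, 4))  # 1
print(list(get_all_sim_ids(sobol_like)))          # [0, 1, 2, 3, 4, 5]
```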

_verify_and_update_sampler_kwargs(sampler_t, **kwargs)

Fills in the parameters that the user did not provide when creating a sampler with the set_parameter_sampler method. It also checks whether a seed is given when the server runs in parallel.
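A minimal sketch of the "fill in missing kwargs, then check the seed" step, assuming defaults live in a plain dict and that a parallel server (comm_size > 1) must reject a missing seed. The function name and the choice to raise an error are assumptions; the real method may warn or set a default seed instead.

```python
from typing import Any, Dict


def verify_and_update_sampler_kwargs(user_kwargs: Dict[str, Any],
                                     defaults: Dict[str, Any],
                                     comm_size: int) -> Dict[str, Any]:
    # Defaults only fill gaps: any value the user passed takes precedence
    merged = {**defaults, **user_kwargs}
    # Assumption: with several server ranks, every rank must draw the same samples,
    # so this sketch insists on an explicit seed
    if comm_size > 1 and merged.get("seed") is None:
        raise ValueError("a seed is required when the server runs on several ranks")
    return merged


defaults = {"l_bounds": [0.0, 0.0], "u_bounds": [1.0, 1.0], "seed": None}
kwargs = verify_and_update_sampler_kwargs({"u_bounds": [2.0, 5.0], "seed": 0}, defaults, comm_size=2)
print(kwargs)  # {'l_bounds': [0.0, 0.0], 'u_bounds': [2.0, 5.0], 'seed': 0}
```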

set_parameter_sampler(sampler_t, **kwargs)

Sets the defined parameter sampler type. This dictates how parameters are sampled for experiments. This sampler type can either be pre-defined or customized by inheriting a pre-defined sampling class.

Parameters
  • sampler_t (Union[ParameterSamplerType, Type[ParameterSamplerClass]]):
    • ParameterSamplerType: Enum specifying pre-defined samplers.
    • Type[ParameterSamplerClass]: A class type to instantiate.
  • kwargs (Dict[str, Any]): Dictionary of keyword arguments. Used to pass custom parameters as well as strict parameters such as l_bounds, u_bounds, apply_pick_freeze, second_order, seed=0, etc.

_update_parameter_sampler()

Updates the existing parameter sampler.

_launch_groups(group_ids)

Launches the study groups for the very first run. This process involves generating the client scripts and ensures that no restart has occurred in the case of fault tolerance.

Parameters
  • group_ids (List[int]): A list of group identifiers to launch.

_generate_client_scripts(group_ids, create_new_group=False)

Creates all required client scripts (e.g., client.X.sh) and sets up a dictionary for fault tolerance.

Parameters
  • group_ids (List[int]): A list of group identifiers.
  • create_new_group (bool, optional): Flag indicating whether to create a new group of clients (default is False).

__generate_client_script(sim_id, parameters, script_path)

Generates a single client script for a given simulation id and parameters.

Parameters
  • sim_id (int): The simulation id associated with the client script.
  • parameters (list): The list of parameters.
  • script_path (str): The absolute path of the client script to create.

_write_environment_variables(f, sim_id)

Writes environment variables to the client script.

Parameters
  • f (Any): The file object to write to.
  • sim_id (int): The simulation id associated with the client script.

_write_execution_command(f, parameters)

Writes the execution command to the client script.

Parameters
  • f (Any): The file object to write to.
  • parameters (list): The list of parameters.
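The script-generation steps above (environment variables, then the execution command) can be sketched as follows. SIMULATION_ID and ./my_solver are hypothetical placeholders: the real scripts export Melissa-specific variables and run the user's configured command, neither of which is documented here.

```python
import os
import shlex
import stat
import tempfile
from typing import List, TextIO

EXECUTABLE = "./my_solver"  # hypothetical simulation binary


def write_environment_variables(f: TextIO, sim_id: int) -> None:
    # Hypothetical variable name; the real script exports Melissa-specific variables
    f.write(f"export SIMULATION_ID={sim_id}\n")


def write_execution_command(f: TextIO, parameters: List[float]) -> None:
    # Quote every argument so the shell receives it verbatim
    args = " ".join(shlex.quote(str(p)) for p in parameters)
    f.write(f"exec {shlex.quote(EXECUTABLE)} {args}\n")


def generate_client_script(sim_id: int, parameters: List[float], script_path: str) -> None:
    with open(script_path, "w") as f:
        f.write("#!/bin/sh\n")
        write_environment_variables(f, sim_id)
        write_execution_command(f, parameters)
    # Add the owner-execute bit so the launcher can run the script directly
    os.chmod(script_path, os.stat(script_path).st_mode | stat.S_IXUSR)


script = os.path.join(tempfile.mkdtemp(), "client.3.sh")
generate_client_script(3, [0.5, 1.25], script)
with open(script) as f:
    print(f.read())
```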

_launch_group(group_id)

Submits a request to the launcher to run a given group id. For non-Sobol studies, the group id and simulation id are the same.

Parameters
  • group_id (int): The unique identifier of the group to be launched.

_kill_group(group_id)

Submits a request to the launcher to terminate a given group id.

Parameters
  • group_id (int): The unique identifier of the group to be terminated.

_relaunch_group(group_id, create_new_group)

Relaunches a failed group with or without new parameters, depending on the fault tolerance configuration.

Parameters
  • group_id (int): The unique identifier of the group to be relaunched.
  • create_new_group (bool): A flag indicating whether to create a new group with new parameters.
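How create_new_group might be decided can be sketched from the crashes_before_redraw attribute: count crashes per group and ask for new parameters once the count reaches the threshold. The CrashTracker class, the ">=" threshold interpretation and the counter reset are assumptions for illustration, not the FaultTolerance implementation.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List, Tuple


class CrashTracker:
    """Hypothetical sketch: decide whether a failed group keeps or redraws its parameters."""

    def __init__(self, crashes_before_redraw: int,
                 relaunch: Callable[[int, bool], None]) -> None:
        self.crashes_before_redraw = crashes_before_redraw
        self._relaunch = relaunch  # stands in for _relaunch_group(group_id, create_new_group)
        self._crashes: DefaultDict[int, int] = defaultdict(int)

    def on_group_failure(self, group_id: int) -> bool:
        self._crashes[group_id] += 1
        # Assumption: redraw once the crash count reaches the threshold
        create_new_group = self._crashes[group_id] >= self.crashes_before_redraw
        if create_new_group:
            self._crashes[group_id] = 0  # new parameters start with a fresh crash budget
        self._relaunch(group_id, create_new_group)
        return create_new_group


calls: List[Tuple[int, bool]] = []
crash_tracker = CrashTracker(2, lambda gid, new: calls.append((gid, new)))
crash_tracker.on_group_failure(7)
crash_tracker.on_group_failure(7)
print(calls)  # [(7, False), (7, True)]
```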

_handle_simulation_connection(msg)

Handles an incoming connection request from a submitted simulation. This method is executed by rank 0 only.

Parameters
  • msg (bytes): The message received from the simulation requesting a connection.
Returns
  • int: The simulation id of the connected simulation, or -1 if the connection could not be established.

_restart_groups()

Kills and restarts simulations that were running when the server crashed.

poll_sockets(timeout=10)

Performs polling over the registered socket descriptors to monitor various events, including timer, launcher messages, new client connections, and data readiness.

Parameters
  • timeout (int, optional): The maximum time (in seconds) to wait for a socket event before returning. Default is 10 seconds.
Returns
  • Optional[Union[ServerStatus, SimulationData, PartialSimulationData]]:
    • ServerStatus if the event is related to server status.
    • SimulationData if new simulation data is received.
    • PartialSimulationData if partial data from a simulation is received.

__handle_timerfd()

Handles timer messages.

_handle_fd()

Handles the launcher's messages.

_decode_msg(byte_stream)

Deserializes a message based on the specified protocol.

Parameters
  • byte_stream (bytes): The byte stream to be deserialized, representing the encoded message.
Returns
  • List[Message]: A list of byte sequences representing the deserialized message components.

_encode_msg(msg)

Serializes a message based on the specified protocol.

Parameters
  • msg (Message): The message to be serialized, typically a byte sequence that needs encoding.
Returns
  • bytes: The serialized byte stream representing the encoded message.
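The protocol itself is not specified here, so the following is only an illustration of the encode/decode pair using length-prefixed framing (a 4-byte big-endian length before each message). This framing is an assumption and not Melissa's wire format; it shows how a single encoded message and a stream of several messages map onto the documented signatures.

```python
import struct
from typing import List

HEADER = struct.Struct(">I")  # 4-byte big-endian length prefix (assumed framing)


def encode_msg(msg: bytes) -> bytes:
    # Prefix the payload with its length so a reader knows where it ends
    return HEADER.pack(len(msg)) + msg


def decode_msg(byte_stream: bytes) -> List[bytes]:
    # Split a stream of concatenated frames back into individual messages
    parts: List[bytes] = []
    offset = 0
    while offset < len(byte_stream):
        if offset + HEADER.size > len(byte_stream):
            raise ValueError("truncated length prefix")
        (length,) = HEADER.unpack_from(byte_stream, offset)
        offset += HEADER.size
        if offset + length > len(byte_stream):
            raise ValueError("truncated message body")
        parts.append(byte_stream[offset:offset + length])
        offset += length
    return parts


stream = encode_msg(b"hello") + encode_msg(b"") + encode_msg(b"\x00\x01")
print(decode_msg(stream))  # [b'hello', b'', b'\x00\x01']
```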

_all_done()

Checks whether all clients' data has been received and unregisters the timer socket if completed.

Returns
  • bool: True if all clients' data has been successfully received.

close_connection(exit_=0)

Signals to the launcher that the study has ended with a specified exit status.

Parameters
  • exit_ (int, optional): The exit status code to be sent to the launcher. Defaults to 0, indicating successful completion.

get_memory_info_in_gb()

Returns a Tuple[float, float] containing the memory consumed and the total main memory, both in GB.

_show_insights()

Logs information gathered from the clients and from server processing.

_write_final_report()

Writes miscellaneous information about the analysis.

_server_finalize(exit_=0)

Finalizes the server operations.

Parameters
  • exit_ (int, optional): The exit status code indicating the outcome of the server's operations. Defaults to 0, which signifies a successful termination.

setup_environment()

Optional. Sets up the environment or performs any other initialization the study requires. Any necessary setup calls go here; for example, a Melissa DL study needs dist.init_process_group to be called.

_check_simulation_data(simulation, simulation_data)

Tracks and validates incoming simulation data.

  1. Client Rank Initialization: Ensures simulation_data structures are initialized per client rank.
  2. Dynamic Matrix Expansion: Handles unknown sizes dynamically as new time steps are encountered.
  3. Duplicate Data Detection: Discards messages if the data for the specified field and time step has already been received.
  4. Time Step Completion:
    • Checks if all fields for a specific time step have been received and processes them into a SimulationData object.
    • Handles cases where the data is empty.
  5. Partial Data Handling: Tracks fields received so far and waits for completion.
Parameters
  • simulation (Simulation): Tracks the state and received data of the simulation.
  • simulation_data (PartialSimulationData): The incoming data message from the simulation.
Returns
  • SimulationDataStatus: Status of the simulation data (COMPLETE, PARTIAL, ALREADY_RECEIVED, EMPTY).
  • Union[Optional[SimulationData], Optional[PartialSimulationData]]:
    • Sensitivity Analysis:
      • A PartialSimulationData object, even if the data for the time step is incomplete, since SA can be computed independently.
    • Deep Learning:
      • A SimulationData object if all fields for the time step are complete.
      • None if the data is incomplete or invalid.
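The duplicate-detection and completion steps listed above can be sketched by recording which fields have arrived for each (client rank, time step). This is a simplified stand-in: DataStatus and FieldTracker are hypothetical, a growing dict replaces the dynamically expanded matrix, and empty-data handling is omitted.

```python
from enum import Enum, auto
from typing import Dict, Set, Tuple


class DataStatus(Enum):
    """Hypothetical stand-in for SimulationDataStatus (EMPTY handling omitted)."""
    COMPLETE = auto()
    PARTIAL = auto()
    ALREADY_RECEIVED = auto()


class FieldTracker:
    """Records which fields have arrived for each (client rank, time step)."""

    def __init__(self, fields: Set[str]) -> None:
        self.fields = set(fields)
        # Grows lazily as new ranks and time steps appear, so sizes need not be known upfront
        self.received: Dict[Tuple[int, int], Set[str]] = {}

    def check(self, client_rank: int, time_step: int, field: str) -> DataStatus:
        got = self.received.setdefault((client_rank, time_step), set())
        if field in got:
            return DataStatus.ALREADY_RECEIVED  # duplicate message: discard
        got.add(field)
        return DataStatus.COMPLETE if got == self.fields else DataStatus.PARTIAL


ft = FieldTracker({"u", "v"})
print(ft.check(0, 0, "u"))  # DataStatus.PARTIAL
print(ft.check(0, 0, "u"))  # DataStatus.ALREADY_RECEIVED
print(ft.check(0, 0, "v"))  # DataStatus.COMPLETE
```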

__deserialize_message(msg)

Deserializes a byte stream into a PartialSimulationData object.

Parameters
  • msg (bytes): Serialized message containing simulation data.
Returns
  • PartialSimulationData: The deserialized data object.

_validate_data(simulation_data)

Validates the time step and field of the received simulation data.

Parameters
  • simulation_data (PartialSimulationData): The data to validate.
Returns
  • bool: True if the data is valid.
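The validation rule (time step within range, field name recognized) can be sketched as below. PartialData is a hypothetical stand-in carrying only the checked attributes; zero-based time steps and skipping the upper bound when the number of time steps is not known in advance (compare the time_steps_known property) are assumptions.

```python
from dataclasses import dataclass
from typing import Optional, Set


@dataclass
class PartialData:
    """Hypothetical stand-in for PartialSimulationData with only the fields checked here."""
    sim_id: int
    time_step: int
    field: str


def validate_data(data: PartialData, fields: Set[str],
                  nb_time_steps: Optional[int]) -> bool:
    # Assumption: time steps are zero-based
    if data.time_step < 0:
        return False
    # Assumption: with an unknown number of time steps, no upper bound is enforced
    if nb_time_steps is not None and data.time_step >= nb_time_steps:
        return False
    return data.field in fields


print(validate_data(PartialData(0, 3, "u"), {"u", "v"}, nb_time_steps=10))   # True
print(validate_data(PartialData(0, 10, "u"), {"u", "v"}, nb_time_steps=10))  # False
print(validate_data(PartialData(0, 3, "p"), {"u", "v"}, nb_time_steps=10))   # False
```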

__determine_and_process_simulation_data(simulation_data)

Determines the status of the simulation data and handles actions accordingly.

Parameters
  • simulation_data (PartialSimulationData): The incoming simulation data to process.
Returns
  • Optional[Union[SimulationData, PartialSimulationData]]: The value returned by the _check_simulation_data method.

_handle_simulation_data(msg)

This method handles the following tasks:

  1. Deserialization: Converts the incoming byte stream into a PartialSimulationData object.
  2. Validation: Ensures the data is valid based on:
    • Time step being within the allowed range.
    • Field name being recognized.
  3. Simulation Data Handling:
    • Updates the status of the simulation based on the received data.
    • Detects and logs duplicate messages.
  4. Completion Check:
    • Marks the simulation as finished if all data is received.
    • Updates the count of finished simulations.

Parameters
  • msg (bytes): A serialized message containing simulation data.
Returns
  • Optional[PartialSimulationData]:
    • PartialSimulationData, if successful.
    • None, if the message fails validation.

_server_online() abstractmethod

An abstract method in which the user controls data handling while the server is online. Unique to melissa flavors.

_server_offline() abstractmethod

An abstract method in which the user controls data handling while the server is offline. Unique to melissa flavors.

_check_group_size() abstractmethod

An abstract method that checks if the group size was correctly set. Unique to melissa flavors.

_process_partial_data_reception(simulation, simulation_data) abstractmethod

Returns a value when data has been partially received. Unique to melissa flavors.

_process_complete_data_reception(simulation, simulation_data) abstractmethod

Returns a value when data has been completely received. Unique to melissa flavors.

_receive() abstractmethod

Handles data coming from the server object. Unique to melissa flavors.

start() abstractmethod

Defines the high-level organization of server events. Unique to melissa flavors.

_restart_from_checkpoint(**kwargs) abstractmethod

Restarts the server object from a checkpoint. Unique to melissa flavors.

_checkpoint(**kwargs) abstractmethod

Checkpoints the server object. Unique to melissa flavors.

melissa.server.base_server.ServerStatus

Bases: Enum

Server status enum.