Melissa Launcher

Launcher objectives

With Melissa, the launcher is in charge of interacting with the batch scheduler to submit, cancel, and monitor jobs. It first submits the server job and waits for the server to connect to its dedicated socket. Then, upon receiving client job requests from the server, the launcher submits these jobs to the batch scheduler.

As soon as the first job has been submitted, the launcher monitors its status as well as that of the jobs submitted afterwards. For every client job, the launcher sends the job status to the server so that, in case of failure, the server can ask for the job to be resubmitted.

In addition to the server status inferred from the information shared by the scheduler, the launcher expects to receive life signals regularly. To this end, the launcher sends the server a ping every T seconds and considers the server dead if it has not received a message (any message) within the last 2*T seconds. To convince the launcher that the server is alive, it suffices to respond to a ping with a ping. Otherwise, a time-out is detected and a new server job is submitted.
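
The ping rule above can be sketched as a small bookkeeping class. This is only an illustration of the 2*T rule, not the launcher's actual code: the class name ServerLiveness, its methods, and the injectable clock are assumptions made for this example.

```python
import time
from typing import Callable


class ServerLiveness:
    """Hypothetical helper tracking pings sent and messages received."""

    def __init__(self, ping_interval: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._interval = ping_interval  # T in the text above
        self._clock = clock             # injectable so tests control time
        now = clock()
        self._last_ping_sent = now
        self._last_message_received = now

    def on_message(self) -> None:
        # Any message counts as a life signal, not only a ping.
        self._last_message_received = self._clock()

    def should_ping(self) -> bool:
        # True once T seconds have passed since the last ping was sent.
        return self._clock() - self._last_ping_sent >= self._interval

    def mark_ping_sent(self) -> None:
        self._last_ping_sent = self._clock()

    def is_dead(self) -> bool:
        # The server is considered dead after more than 2*T seconds of silence.
        return self._clock() - self._last_message_received > 2 * self._interval
```

Passing the clock as a parameter keeps the rule testable without sleeping, in line with the design goals described later in this document.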

Warning

The launcher restarts the server aggressively. In the case of a server crash (e.g., the batch scheduler marks the job as failed or the server times out), the launcher cancels all jobs and immediately submits a new server job without waiting for the job cancellation to succeed. Two servers may therefore run at the same time, which would cause unexpected launcher behavior if the old server job tried to reconnect.

Launcher design

Design Goals

The goal of the launcher is to simplify job management independently of the batch scheduler at hand. The challenges are the following:

  • the launcher must be responsive to user input at all times, most importantly SIGINT (i.e., the user pressed Ctrl+C),
  • the launcher must separate code specific to a certain batch scheduler from the job management logic,
  • batch scheduler calls may block,
  • certain actions must be executed regularly (e.g., pinging processes).

Experience from Melissa SA and DA has shown that:

  • multi-threading is error prone, hard to maintain, and impossible to test reliably because of the lack of control over the thread scheduler,
  • busy loops interspersed with arbitrary sleep statements may miss events or process them in the wrong order.

For these reasons, the launcher design is based on the following principles:

  • the launcher waits for events,
  • the code interacting with the environment (operating system, batch scheduler, timers, and so on) is separate from the actual job management logic.

This led to a design containing the following elements:

The job management is implemented by a state machine that consumes events, performs state transitions, and returns actions. Examples of events are:

  • signal receptions,
  • time-outs,
  • completed batch scheduler calls,
  • new network connections, and
  • message receptions.

Examples of actions are:

  • stopping the launcher,
  • job submissions, job updates, and job cancellations,
  • pinging the server.
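
A minimal sketch of such a state machine is given below, assuming a pure transition function. All class names and the tiny rule set are hypothetical and chosen for illustration; the launcher's real events, actions, and states are richer.

```python
from dataclasses import dataclass, replace
from typing import List, Tuple, Union


# Hypothetical, simplified events.
@dataclass(frozen=True)
class Timeout:
    pass


@dataclass(frozen=True)
class Signal:
    signo: int


@dataclass(frozen=True)
class MessageReceived:
    payload: bytes


# Hypothetical, simplified actions.
@dataclass(frozen=True)
class PingServer:
    pass


@dataclass(frozen=True)
class StopLauncher:
    pass


Event = Union[Timeout, Signal, MessageReceived]
Action = Union[PingServer, StopLauncher]


@dataclass(frozen=True)
class State:
    stop: bool = False
    messages_seen: int = 0


def transition(state: State, event: Event) -> Tuple[State, List[Action]]:
    """Pure function: no I/O, no clocks, no threads."""
    if state.stop:
        return state, []  # a stopped machine ignores further events
    if isinstance(event, Signal):
        return replace(state, stop=True), [StopLauncher()]
    if isinstance(event, Timeout):
        return state, [PingServer()]
    if isinstance(event, MessageReceived):
        return replace(state, messages_seen=state.messages_seen + 1), []
    raise TypeError(f"unexpected event: {event!r}")
```

Because the function only maps a state and an event to a new state and a list of actions, it can be tested by feeding it events and comparing the returned values.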

High-Level Structure

Originally, there were supposed to be two major threads: an I/O master thread and the state machine. The I/O master would put events into a queue, the state machine would consume events from the queue and produce actions for a second queue, then the I/O master would consume actions from the second queue and execute them. Short-lived threads might be spawned to perform any kind of blocking operations, e.g., batch scheduler calls or I/O.

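def io_master(events: Queue, actions: Queue):
    while True:
        # wait for a list of actions from the state machine or for external input
        item = wait_for(actions, INPUT)
        if isinstance(item, list):
            # a list of actions: execute them and queue the resulting events
            for ac in item:
                ev = execute(ac)
                events.put(ev)
        else:
            # external input: forward it to the state machine as an event
            events.put(item)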

The major drawback of this design is that it requires two threads to interact. Moreover, state machine transitions can be assumed to be near-instantaneous, and for this reason the I/O master thread began calling the state machine almost directly.

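class Processor:
    def execute(self, ev: Event) -> List[Action]:
        # implemented by a wrapper around the state machine or by a test double
        raise NotImplementedError

def io_master(events: Queue, actions: Queue, processor: Processor):
    while not stop:
        ev = wait_for(events, INPUT)
        # the processor is called directly; no second thread is involved
        for ac in processor.execute(ev):
            # executing an action may generate a new event
            ev = execute(ac)
            events.put(ev)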

The Processor class was introduced solely for testing purposes. With the input, the queues, and the processor implementation under the control of the caller, the I/O master can be easily and thoroughly tested. A Processor.execute() implementation calling the real state machine is as simple as return state.transition(event). Note that there is still an event queue which is needed for events generated by actions. With the two-thread design the event queue would contain all events at some point; in the new design it only contains events created when executing actions.
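
The testing idea can be illustrated with a test double for the processor. The sketch below is self-contained and deliberately simplified: RecordingProcessor, run_io_master, and the string-valued events and actions are assumptions for this example and do not mirror the launcher's real interfaces.

```python
import queue
from typing import Any, Callable, Dict, List


class RecordingProcessor:
    """Test double: records events and answers with scripted actions."""

    def __init__(self, script: Dict[Any, List[Any]]) -> None:
        self.seen: List[Any] = []
        self._script = script

    def execute(self, ev: Any) -> List[Any]:
        self.seen.append(ev)
        return self._script.get(ev, [])


def run_io_master(events: "queue.Queue", processor: Any,
                  execute: Callable[[Any], Any]) -> None:
    """Simplified I/O master that returns once the event queue is empty."""
    while True:
        try:
            ev = events.get_nowait()
        except queue.Empty:
            return
        for ac in processor.execute(ev):
            result = execute(ac)
            if result is not None:
                events.put(result)  # events generated by executing actions
```

A test then fills the queue, passes a fake execute function, and checks which events reached the processor and which actions were executed.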

As of October, 2022, this is still the current high-level launcher structure.

Waiting for Events

The launcher needs to wait for different events:

  • connection attempts, disconnects, and messages from the server,
  • signals,
  • time-outs, and
  • completion of actions (e.g., batch scheduler job updates).

In practice this is achieved by waiting for events on multiple file descriptors. Networking is covered by sockets, signals can be delivered to a file descriptor with the Linux-specific function signalfd(2), and time-outs can be realized by passing a time-out to poll(2) or select(2). The event queue requires a trick: Python's queue cannot be waited on like a file descriptor, so the launcher code contains a custom queue implementation that writes one byte to a file descriptor whenever an element is added to the queue.

Keep in mind that signals are only received by the Python main thread.

The implementation is as follows:

  • The different events are distinguished by checking which file descriptor was readable.
  • The Python main thread installs a signal handler. Whenever a signal is received, the handler writes a byte containing the signal number to a file descriptor.
  • A timer thread writes one byte to a file descriptor and sleeps before it writes another byte.
  • Action completions are detected by putting an event into the customized event queue.

The code below demonstrates these ideas:

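import os
import queue
import threading

class CustomQueue:
    def __init__(self, fd):
        self._queue = queue.Queue()
        self._fd = fd  # write end of the pipe watched by the I/O master

    def put(self, item):
        self._queue.put(item)
        # wake up the I/O master; os.write() expects bytes, not str
        os.write(self._fd, b"\0")

    def get(self):
        return self._queue.get()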

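class Timer:
    def __init__(self, fd):
        self._fd = fd

    def run(self):
        while True:
            # poll() stands for a simplified helper that returns None on time-out
            ret = poll([self._fd], POLLIN | POLLERR, timeout)
            if ret is None:
                # time-out elapsed: notify the I/O master
                os.write(self._fd, b"\0")
            else:
                # activity on the descriptor is the request to stop
                return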

def main():
    listenfd = socket(AF_INET, SOCK_STREAM, IPPROTO_SCTP)
    listenfd.bind(("localhost", 0))
    listenfd.listen()
    signalfd_0, signalfd_1 = socketpair()
    timerfd_0, timerfd_1 = socketpair()
    eventfd_r, eventfd_w = pipe()
    event_fifo = CustomQueue(eventfd_w)
    timer = Timer(timerfd_1)
    t_timer = threading.Thread(target=timer.run)
    # start() runs the timer in its own thread; run() would block the main thread
    t_timer.start()
    io_master(listenfd, signalfd_0, timerfd_0, eventfd_r, event_fifo)

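def io_master(listenfd, signalfd, timerfd, eventfd, event_fifo):
    processor = StateMachine()
    while True:
        # poll() stands for a simplified helper returning one readable descriptor
        fd = poll([listenfd, signalfd, timerfd, eventfd], POLLIN | POLLERR)
        if fd == listenfd:
            actions = processor.execute(events.NewConnection())
        elif fd == signalfd:
            # the signal handler wrote the signal number as a single byte
            signo = os.read(fd, 1)[0]
            actions = processor.execute(events.Signal(signo=signo))
        elif fd == timerfd:
            os.read(fd, 1)  # consume the byte written by the timer thread
            actions = processor.execute(events.Timeout())
        elif fd == eventfd:
            os.read(fd, 1)  # consume the wake-up byte written by the queue
            actions = processor.execute(event_fifo.get())
        else:
            raise RuntimeError("BUG")

        for ac in actions:
            pass  # execute the action (omitted here)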
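
For readers who want to experiment, the queue trick can be reproduced in a few runnable lines with the standard selectors module. This is a sketch under assumptions, not the launcher's implementation: WakeupQueue and wait_for_event are names invented for this example, and the code assumes a Unix-like system where pipes can be passed to select/poll.

```python
import os
import queue
import selectors


class WakeupQueue:
    """Thread-safe queue whose pipe becomes readable when an item is added."""

    def __init__(self) -> None:
        self._queue: "queue.Queue" = queue.Queue()
        self._read_fd, self._write_fd = os.pipe()

    def fileno(self) -> int:
        # Lets a selector treat the queue like any other file object.
        return self._read_fd

    def put(self, item) -> None:
        self._queue.put(item)            # store the item first ...
        os.write(self._write_fd, b"\0")  # ... then signal its presence

    def get(self):
        os.read(self._read_fd, 1)        # consume exactly one wake-up byte
        return self._queue.get_nowait()

    def close(self) -> None:
        os.close(self._read_fd)
        os.close(self._write_fd)


def wait_for_event(sel: selectors.BaseSelector, timeout: float):
    """Return the label of one ready source, or "timeout" if none is ready."""
    ready = sel.select(timeout)
    if not ready:
        return "timeout"
    key, _mask = ready[0]
    return key.data
```

A queue instance is registered with sel.register(fifo, selectors.EVENT_READ, data="queue"); sockets and other descriptors can be registered with the same selector, so that a single wait covers every event source.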

Asynchronicity

Some actions cannot be executed instantly, including most scheduler calls when a batch scheduler is used. The first consequence is that the I/O master thread cannot wait (synchronously) for the completion of launched processes because the launcher must be responsive at all times. Instead, a minion thread waits for the completion of a launched process and adds the pseudoevent melissa.launcher.ProcessCompletion_ (with trailing underscore) to the event queue; it is a pseudoevent because it is supposed to be consumed by the I/O master and not by the state machine (or processor).

The second consequence is the need to terminate launched processes when the user demands it (e.g., after pressing Ctrl+C). For this reason, the Python concurrent.futures module on its own does not solve the asynchronicity challenges: running futures cannot be cancelled. All launched processes are added to a list maintained by the I/O master directly after being started and removed after exiting. This way the I/O master can always terminate running processes.

The code below illustrates the chosen approach:

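def io_master(event_fifo: Queue, processor: Processor, scheduler: Scheduler):
    processes = []
    while True:
        ev = event_fifo.get()
        if isinstance(ev, events.ProcessCompletion_):
            # pseudoevent: the I/O master turns it into a real event
            # ...
            actions = processor.execute(...)
        else:
            # pass on the event that was just dequeued
            actions = processor.execute(ev)

        for ac in actions:
            if isinstance(ac, action.JobSubmission):
                args, env = scheduler.submit_job(ac.job_details)
                # env must be passed by keyword (the second positional
                # parameter of Popen is bufsize)
                proc = subprocess.Popen(args, env=env)
                processes.append(proc)

                # bind proc as a default argument so that every minion
                # thread waits for its own process
                def wait(proc=proc):
                    proc.wait()
                    event_fifo.put(events.ProcessCompletion_(proc))

                # start() returns immediately; run() would block the I/O master
                threading.Thread(target=wait).start()
            elif isinstance(ac, action.SubprocessTermination):
                for p in processes:
                    p.kill()
            else:
                pass

        # forget the processes that have exited
        processes = [p for p in processes if p.poll() is None]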
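
The minion-thread pattern can also be tried in isolation. The following sketch is self-contained and makes several assumptions: ProcessCompletion, launch, and terminate_all are stand-in names for this example, and the child processes are plain Python interpreters rather than scheduler commands.

```python
import queue
import subprocess
import sys
import threading
from dataclasses import dataclass
from typing import List


@dataclass
class ProcessCompletion:
    """Stand-in for the pseudoevent described above."""
    proc: subprocess.Popen


def launch(args: List[str], events: "queue.Queue",
           processes: List[subprocess.Popen]) -> subprocess.Popen:
    proc = subprocess.Popen(args)
    processes.append(proc)  # tracked so that it can be killed on request

    def wait(p: subprocess.Popen = proc) -> None:
        p.wait()                          # blocks only the minion thread
        events.put(ProcessCompletion(p))  # report back through the queue

    # daemon=True: a stuck minion must not keep the interpreter alive
    threading.Thread(target=wait, daemon=True).start()
    return proc


def terminate_all(processes: List[subprocess.Popen]) -> None:
    for p in processes:
        if p.poll() is None:  # still running
            p.kill()
```

For example, launch([sys.executable, "-c", "pass"], events, processes) returns immediately, and the completion shows up later as an item in the events queue.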

Configuring the launcher

The launcher help can be displayed with the following command:

melissa-launcher -h

It returns the following message:

usage: melissa-launcher [-h] [--config_name CONFIG_NAME] [--print-options] [--version]

Melissa Launcher

options:
  -h, --help            show this help message and exit
  --config_name CONFIG_NAME, -c CONFIG_NAME
                        User defined configuration file. Path can be relative or absolute.
  --version, -v         show the Melissa version
  --print-options       Show the available configuration options

This message lists the arguments accepted on the command line.

The launcher is then configured with the same configuration file as the other Melissa components: config_<scheduler>.json. This file contains a dictionary called launcher_config which is where the user sets the path to the results output directory, the scheduler type and the arguments passed to it (see Scheduler Support). Submission and execution commands will then be built based on the options passed in the configuration file.

The table below lists most of the options supported by the launcher; an exhaustive list with descriptions can be retrieved via melissa-launcher --print-options:

| Option | Default value | Possible values |
|---|---|---|
| output_dir | melissa-YYYYMMDDTHHMMSS | Any str |
| scheduler | - | "oar", "oar-hybrid", "openmpi", "slurm", "slurm-global", "slurm-semiglobal", "slurm-openmpi" |
| scheduler_arg | [] | List of str with scheduler options |
| scheduler_arg_server | [] | List of str with scheduler options |
| scheduler_arg_client | [] | List of str with scheduler options |
| scheduler_client_command | srun or mpirun depending on scheduler | Any str |
| scheduler_server_command | srun or mpirun depending on scheduler | Any str |
| scheduler_client_command_options | [] | List of str with command options |
| scheduler_server_command_options | [] | List of str with command options |
| scheduler_arg_container | [] | List of str with command options (oar-hybrid) |
| container_client_size | 1 | Any positive int (oar-hybrid) |
| besteffort_allocation_frequency | 1 | Any positive int (oar-hybrid) |
| job_limit | 1000 | Any positive int |
| timer_delay | _interval=5 sec, job_update_interval=30 sec | Any positive float |
| fault_tolerance | true | true, false |
| load_from_checkpoint | false | true, false |
| std_output | true | true, false |
| verbose | 0 | Any positive int |
| protocol | "auto" | "auto", "sctp", "tcp" |
| bind | None | Any IP formatted str |
| http_port | 0 | Any positive int |
| http_token | Randomly generated | Any str |
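
As an illustration, a launcher_config dictionary using some of the options above could look like the output of the snippet below. The values (directory name, Slurm arguments, limits) are invented for this example, and the other sections of the configuration file are omitted. The snippet builds the dictionary in Python and prints it as JSON.

```python
import json

# Hypothetical values for illustration only; the option names are taken
# from the table above, the values are assumptions.
config = {
    "launcher_config": {
        "output_dir": "my-study",
        "scheduler": "slurm",
        "scheduler_arg_client": ["--ntasks=1", "--time=00:10:00"],
        "scheduler_arg_server": ["--ntasks=1", "--time=01:00:00"],
        "job_limit": 100,
        "timer_delay": 5.0,
        "fault_tolerance": True,
        "std_output": True,
        "verbose": 0,
        "protocol": "auto",
    }
}


def to_json(cfg: dict) -> str:
    """Serialize the configuration the way it would appear in the JSON file."""
    return json.dumps(cfg, indent=4)


if __name__ == "__main__":
    print(to_json(config))
```

Options that are left out keep the default values listed in the table.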

Note

The output_dir is relative to the directory where the user launched melissa-launcher, and will be created automatically for the user if the directory does not exist. Inside that directory, Melissa will copy all the launch files and write the results. Meanwhile, the client_executable must be a full path (in case the user wishes to store the executable elsewhere on the system).

Note

When combined, the job_limit and timer_delay options can be used to perform optimized chained allocations of job batches (e.g. with mpirun or srun) under a constrained number of resources. Otherwise, job_limit is meant to match the maximum number of concurrent submissions allowed on the cluster, while timer_delay sets both _interval (i.e. the time interval triggering the time-out transition) and job_update (i.e. the minimal delay between two job status updates) to the same value.

Warning

Setting fault_tolerance to false turns the fault-tolerance protocol off, so the study crashes as soon as one of its instances fails.

Warning

Setting std_output to false prevents the launcher from generating standard outputs for the jobs, which significantly reduces the number of files created inside the output directory. Please note that indirect schedulers use the standard outputs to get the job ID and are hence incompatible with this option.

Modifying the launcher

In practice, applications and clusters come with specific constraints requiring additional or new features in the scheduling strategies (see Scheduler Support). New strategies can easily be implemented by following these simple guidelines:

  • Create a new <scheduling-name>.py script in the scheduler folder.
  • Inherit a new YourJob object from the Job class.
  • Inherit a new YourScheduler object from the Scheduler class.
  • Override the necessary functions (e.g. _submit_job_impl, _make_job_impl, _update_jobs_impl, _cancel_jobs_impl, etc.).
  • Add import and argument management code in the launcher __main__.py file.
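
The guidelines above follow a template-method pattern, which the sketch below illustrates. It is self-contained and does not import Melissa: the Job and Scheduler classes defined here are stand-ins, and their method signatures, the parsing rule, and the cancel command are assumptions; only the _make_job_impl and _cancel_jobs_impl names come from the list above. The signatures of the real base classes should be checked in the scheduler folder before writing an actual implementation.

```python
from abc import ABC, abstractmethod
from typing import List


class Job(ABC):
    """Stand-in for the launcher's Job base class (assumed interface)."""

    @abstractmethod
    def id(self) -> int: ...


class Scheduler(ABC):
    """Stand-in for the launcher's Scheduler base class (assumed interface)."""

    def make_job(self, submission_output: str, uid: int) -> Job:
        return self._make_job_impl(submission_output, uid)

    def cancel_jobs(self, jobs: List[Job]) -> List[str]:
        return self._cancel_jobs_impl(jobs)

    @abstractmethod
    def _make_job_impl(self, submission_output: str, uid: int) -> Job: ...

    @abstractmethod
    def _cancel_jobs_impl(self, jobs: List[Job]) -> List[str]: ...


class YourJob(Job):
    def __init__(self, job_id: int, uid: int) -> None:
        self._id = job_id
        self._uid = uid

    def id(self) -> int:
        return self._id


class YourScheduler(Scheduler):
    def _make_job_impl(self, submission_output: str, uid: int) -> Job:
        # Assumption: the job ID is the last word of the submission output.
        return YourJob(int(submission_output.split()[-1]), uid)

    def _cancel_jobs_impl(self, jobs: List[Job]) -> List[str]:
        # Hypothetical cancel command followed by the job IDs.
        return ["your-cancel-command"] + [str(j.id()) for j in jobs]
```

The point of the pattern is that the scheduler-specific subclass only builds commands and parses outputs, while the generic job management logic stays in the upper layers.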

Besides schedulers and scheduling strategies, other modifications are sometimes required. Examples of such recent modifications are listed below:

  • Supporting group submission, i.e. submitting client jobs as groups of multiple simulations. This required:
    • adding group_size to the state machine configuration attribute,
    • adapting the _make_client_job_submission function,
    • defining a new GroupSize message.
  • Implementing a fault-tolerance switch, i.e. adding an option not to automatically resubmit failed server jobs. This required:
    • adding a new MELISSA_FAULT_TOLERANCE environment variable passed to the server at submission,
    • forcing State.stop to become True in case of failure, time-out, or connection shutdown.

Note

The launcher is supposed to be flexible enough to adapt to any situation with minimal changes to its upper layers (namely the state machine and the I/O master). However, exceptions to this rule have already occurred. In such cases, the launcher must remain as consistent as possible with all its features and pass all unit tests.

FAQ

What are the differences between unique IDs (UID), job IDs, and client IDs?

The job ID is an identifier for a job that is either generated by the batch scheduler or it refers to the process ID for local job execution (e.g., when using the OpenMPI scheduler). In the case of local execution the job ID may not be unique because once all process IDs have been used, the operating system kernel begins to assign process IDs starting from zero again. To avoid problems with duplicate job IDs, the unique IDs were introduced. (NB: there is always at most one process at a time that has a given process ID; it is called "duplicate" because batch schedulers never assign the same job ID twice.)

Unique IDs serve two purposes:

  • they can be used to unambiguously identify jobs,
  • they are used for the generation of log file names.

Before a successful job submission with a batch scheduler, and thus before a job ID is available, it is already necessary to store the batch scheduler output. At this point unique IDs are used: for every batch scheduler invocation, the unique ID is incremented by one. This allows one to track execution by looking at the files created by the launcher in the output directory. For example, with Slurm, the first server job submission script is found in sbatch.0.sh, the standard output and standard error of sbatch sbatch.0.sh can be found in slurm.0.out and slurm.0.err, respectively, and the actual job output is in job.0.melissa-server.out and job.0.melissa-server.err.
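
The naming scheme from the Slurm example can be sketched as follows. The UniqueIds class and the slurm_submission_files helper are hypothetical names used for illustration; only the file name patterns are taken from the text above.

```python
import itertools
from typing import Dict


class UniqueIds:
    """Hands out consecutive unique IDs, one per batch scheduler invocation."""

    def __init__(self) -> None:
        self._counter = itertools.count()  # 0, 1, 2, ...

    def next(self) -> int:
        return next(self._counter)


def slurm_submission_files(uid: int, job_name: str) -> Dict[str, str]:
    """File names following the Slurm example in the text."""
    return {
        "script": f"sbatch.{uid}.sh",
        "scheduler_stdout": f"slurm.{uid}.out",
        "scheduler_stderr": f"slurm.{uid}.err",
        "job_stdout": f"job.{uid}.{job_name}.out",
        "job_stderr": f"job.{uid}.{job_name}.err",
    }
```

Since the counter advances on every scheduler invocation, including job status updates, the unique IDs of consecutive job submissions are not necessarily consecutive.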

The client ID is a unique identifier generated by the server when submitting jobs. Job cancellation messages and job update messages always use the client ID. The unique IDs cannot be re-used because the server does not know their values. For example, the launcher is regularly executing job status updates which influence the computed unique IDs; the server is completely unaware of these tasks.

How to "start" the state machine? How to move it out of its initial state?

The only sensible event in the initial state is a time-out (melissa.launcher.event.Timeout). To trigger this time-out:

  • do a state transition with a Timeout event when using the state machine directly,
  • put a Timeout instance into the event queue of the I/O master, or
  • write a single byte into the timer file descriptor.

Signals (event.Signal) are either ignored or make the state machine stop.

What is a connection ID?

A connection ID is a unique identifier used inside the state machine to tell apart different network connections. Concretely, the connection ID associated with a socket is its file descriptor (which is a non-negative integer). The connection ID is unique.

Why is event.ConnectionShutdown not called event.Disconnect?

When given a socket as argument the function close(2) terminates a network connection and closes the file descriptor, i.e., all resources associated with the file descriptor are freed and the file descriptor must not be used anymore.

The function shutdown(2) allows one to signal that no more messages will be sent over a network connection while the file descriptor can still be used (e.g., for receiving data).

In both cases, a call to read(2) on the peer will return zero but sending data may still be possible unless close(2) was called. To highlight this fact, the identifier ConnectionShutdown was chosen.
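
The difference can be observed with a pair of connected sockets. The function below is a small demonstration written for this document (demo_shutdown is not part of the launcher): one side shuts down its sending direction, the peer reads the pending data followed by an empty read, and the peer can still send a reply.

```python
import socket
from typing import Tuple


def demo_shutdown() -> Tuple[bytes, bytes, bytes]:
    a, b = socket.socketpair()
    try:
        a.sendall(b"last message")
        a.shutdown(socket.SHUT_WR)  # a announces: no more data from this side
        first = b.recv(1024)        # pending data is still delivered
        eof = b.recv(1024)          # then the read returns zero bytes
        b.sendall(b"reply")         # the other direction still works
        reply = a.recv(1024)
        return first, eof, reply
    finally:
        a.close()
        b.close()
```

Had a called close() instead of shutdown(), the peer would observe the same empty read, but a could no longer receive the reply.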