Melissa Launcher¶
Launcher objectives¶
With Melissa, the launcher is in charge of interacting with the batch scheduler to submit, cancel, and monitor jobs. In this context, it first submits the server job and waits for it to connect to its dedicated socket. Then, upon reception of client job requests from the server, the launcher submits these to the batch scheduler.
As soon as the first job has been submitted, the launcher monitors its status as well as that of the jobs submitted afterwards. For every client job, the launcher sends its status to the server so that, in case of failure, the server can ask for its resubmission.
In addition to the server status inferred from the information shared by the scheduler, the launcher expects to regularly receive life signals. To this end, the launcher sends the server a ping every T seconds and considers the server dead if it did not receive a message (any message) within the last 2*T seconds. To convince the launcher that the server is alive, it suffices to respond to a ping with a ping. Otherwise, a timeout will be detected and a new server job will be submitted.
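This heartbeat logic can be sketched as follows. This is a minimal illustration only; the class and variable names are hypothetical, not the actual launcher API:

```python
import time

PING_INTERVAL_T = 5.0  # hypothetical value for T, in seconds


class HeartbeatMonitor:
    """Tracks server liveness: any received message counts as a life signal."""

    def __init__(self, interval: float = PING_INTERVAL_T):
        self._interval = interval
        self._last_message = time.monotonic()

    def on_message_received(self) -> None:
        # Any message (not only a ping response) resets the timer.
        self._last_message = time.monotonic()

    def server_timed_out(self) -> bool:
        # The server is considered dead after 2*T seconds of silence.
        return time.monotonic() - self._last_message > 2 * self._interval
```

A timeout reported by `server_timed_out()` would then trigger the resubmission of the server job.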
Warning
The launcher restarts the server aggressively. In the case of a server crash (i.e., the batch scheduler marks the job as failed, the server times out, and so on), the launcher cancels all jobs and immediately submits a new server job without waiting for successful job cancellation. It can therefore happen that two servers run at the same time, which would cause unexpected launcher behavior if the old server job tried to reconnect.
Launcher design¶
Design Goals¶
The goal of the launcher is to simplify job management independent of the batch scheduler at hand. The challenges are the following:
- the launcher must be responsive to user input at all times, most importantly `SIGINT` (i.e., the user pressed Ctrl+C),
- the launcher must separate code specific to a certain batch scheduler from the job management logic,
- batch scheduler calls may block,
- there is a need to execute certain actions regularly (e.g., pinging processes).
Experience from Melissa SA and DA has shown that:
- multi-threading is error prone, hard to maintain, and impossible to test reliably because of the lack of control over the thread scheduler,
- busy loops interspersed with random sleep statements may miss events or process them in the wrong order.
For these reasons, the launcher design is based on the following principles:
- the launcher waits for events,
- the code interacting with the environment (operating system, batch scheduler, timers, and so on) is separate from the actual job management logic.
This led to a design containing the following elements:
The job management is implemented by a state machine consuming events, performing state transitions, and returning actions. Examples for events are:
- signal receptions,
- time-outs,
- completed batch scheduler calls,
- new network connections, and
- message receptions.
Examples for actions are:
- stopping the launcher,
- job submissions, job updates, and job cancellations,
- pinging the server.
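The event-in/actions-out contract can be sketched as a transition function. The event and action classes below are illustrative stand-ins; the real types live in the `melissa.launcher` package and may be named differently:

```python
from dataclasses import dataclass
from typing import List


# Hypothetical event and action types for illustration only.
@dataclass
class Timeout:
    pass


@dataclass
class PingServer:
    pass


@dataclass
class JobUpdate:
    pass


class StateMachine:
    """Consumes one event, performs a state transition, returns actions."""

    def __init__(self) -> None:
        self.pings_sent = 0

    def transition(self, event) -> List[object]:
        if isinstance(event, Timeout):
            # A timer tick triggers a server ping and a job status update.
            self.pings_sent += 1
            return [PingServer(), JobUpdate()]
        # Unhandled events produce no actions in this sketch.
        return []
```

Keeping the transition function free of I/O is what makes the job management logic testable in isolation.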
High-Level Structure¶
Originally, there were supposed to be two major threads: an I/O master thread and the state machine. The I/O master would put events into a queue, the state machine would consume events from the queue and produce actions for a second queue, then the I/O master would consume actions from the second queue and execute them. Short-lived threads might be spawned to perform any kind of blocking operations, e.g., batch scheduler calls or I/O.
```python
# Pseudocode: the I/O master waits either for actions produced by the
# state machine thread or for external input (signals, sockets, timers).
def io_master(events: Queue, actions: Queue):
    while True:
        ev = wait_for(actions, INPUT)
        if isinstance(ev, list):      # a list of actions from the state machine
            for ac in ev:
                result = execute(ac)  # executing an action may yield a new event
                events.put(result)
        else:                         # an external input event
            events.put(ev)
```
The major drawback of this design is that it requires two threads to interact. Moreover, the state machine can be assumed to transition near-instantaneously, and for this reason the I/O master thread began calling the state machine almost directly.
```python
class Processor:
    def execute(self, ev: Event) -> List[Action]:
        ...  # implemented by the real state machine or by a test double


# Pseudocode: the I/O master now calls the processor directly.
def io_master(events: Queue, processor: Processor):
    while not stop:
        ev = wait_for(events, INPUT)
        for ac in processor.execute(ev):
            result = execute(ac)  # executing an action may yield a new event
            events.put(result)
```
The `Processor` class was introduced solely for testing purposes. With the input, the queues, and the processor implementation under the control of the caller, the I/O master can be easily and thoroughly tested. A `Processor.execute()` implementation calling the real state machine is as simple as `return state.transition(event)`. Note that there is still an event queue, which is needed for events generated by actions. With the two-thread design the event queue would contain all events at some point; in the new design it only contains events created when executing actions.
As of October 2022, this is still the current high-level launcher structure.
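This testability can be illustrated with a scripted fake processor. `FakeProcessor` and `io_master_once` below are simplified stand-ins for the idea, not the actual launcher test helpers:

```python
import queue


class FakeProcessor:
    """Records every event it sees and replays pre-scripted actions."""

    def __init__(self, scripted_actions):
        self.seen_events = []
        self._scripted = scripted_actions

    def execute(self, ev):
        self.seen_events.append(ev)
        return self._scripted.pop(0) if self._scripted else []


def io_master_once(events, processor):
    # One loop iteration of a simplified I/O master: consume an event,
    # let the processor decide, and collect the actions it would execute.
    ev = events.get()
    executed = []
    for action in processor.execute(ev):
        executed.append(action)
    return executed
```

A unit test can then feed events into the queue and assert on both the actions that came back and the events the processor observed.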
Waiting for Events¶
The launcher needs to wait for different events:
- connection attempts, disconnects, and messages from the server,
- signals,
- time-outs, and
- completion of actions (e.g., batch scheduler job updates).
In practice this is achieved by waiting for events on multiple file descriptors. For networking this can be accomplished with sockets, signals can be sent directly to a file descriptor with the Linux-specific function `signalfd(2)`, and time-outs can be realized by passing a time-out to `poll(2)` or `select(2)`. With respect to the event queue, a trick is performed: the launcher cannot wait for Python's queue like for a file descriptor, but the launcher code contains a custom queue implementation that writes one byte to a file descriptor whenever an element is added to the queue.
Keep in mind that signals are only received by the Python main thread.
The implementation is as follows:
- The different events are distinguished by checking which file descriptor was readable.
- The Python main thread installs a signal handler. Whenever a signal is received, the handler writes a byte containing the signal number to a file descriptor.
- A timer thread writes one byte to a file descriptor and sleeps before it writes another byte.
- Action completions are detected by putting an event into the customized event queue.
The code below demonstrates these ideas:
```python
class CustomQueue:
    def __init__(self, fd):
        self._queue = queue.Queue()
        self._fd = fd

    def put(self, item):
        self._queue.put(item)
        os.write(self._fd, b"\0")  # make the queue pollable

    def get(self):
        return self._queue.get()


class Timer:
    def __init__(self, fd, timeout):
        self._fd = fd
        self._timeout = timeout

    def run(self):
        while True:
            ret = poll([self._fd], POLLIN | POLLERR, self._timeout)
            if ret is None:  # time-out elapsed: emit a tick
                os.write(self._fd, b"\0")
            else:  # a byte arrived: stop the timer
                return


def main():
    listenfd = socket(AF_INET, SOCK_STREAM, IPPROTO_SCTP)
    listenfd.bind(("localhost", 0))
    listenfd.listen()
    signalfd_0, signalfd_1 = socketpair()
    timerfd_0, timerfd_1 = socketpair()
    eventfd_r, eventfd_w = pipe()
    event_fifo = CustomQueue(eventfd_w)
    timer = Timer(timerfd_1, timeout=1.0)
    t_timer = threading.Thread(target=timer.run)
    t_timer.start()
    io_master(listenfd, signalfd_0, timerfd_0, eventfd_r, event_fifo)


def io_master(listenfd, signalfd, timerfd, eventfd, event_fifo):
    processor = StateMachine()
    while True:
        fd = poll([listenfd, signalfd, timerfd, eventfd], POLLIN | POLLERR)
        if fd == listenfd:
            actions = processor.execute(events.NewConnection())
        elif fd == signalfd:
            actions = processor.execute(events.Signal(signo=os.read(fd, 1)))
        elif fd == timerfd:
            actions = processor.execute(events.Timeout())
        elif fd == eventfd:
            actions = processor.execute(event_fifo.get())
        else:
            raise RuntimeError("BUG")
        for ac in actions:
            pass  # execute the actions (omitted)
```
Asynchronicity¶
Some actions cannot be executed instantly, including most scheduler calls when a batch scheduler is used. The first consequence is that the I/O master thread cannot wait (synchronously) for the completion of launched processes since the launcher shall be responsive at all times. Instead, a minion thread waits for the completion of a launched process and adds the pseudo-event `melissa.launcher.ProcessCompletion_` (with trailing underscore) to the event queue; it is a pseudo-event because it is supposed to be consumed by the I/O master and not by the state machine (or processor). The second consequence is the need to terminate launched processes when the user demands it (e.g., after pressing Ctrl+C). For this reason, the Python `concurrent.futures` module on its own does not solve the asynchronicity challenges: running futures cannot be cancelled. All launched processes are added to a list maintained by the I/O master directly after being started and removed after exiting. This way the I/O master can always terminate running processes.
The code below illustrates the chosen approach:
```python
def io_master(event_fifo: Queue, processor: Processor, scheduler: Scheduler):
    processes = []
    while True:
        ev = event_fifo.get()
        if isinstance(ev, events.ProcessCompletion_):
            # ...
            actions = processor.execute(...)
        else:
            actions = processor.execute(ev)
        for ac in actions:
            if isinstance(ac, action.JobSubmission):
                args, env = scheduler.submit_job(ac.job_details)
                proc = subprocess.Popen(args, env=env)
                processes.append(proc)

                # Bind the current process as a default argument so that the
                # minion thread does not pick up a later loop iteration's proc.
                def wait(proc=proc):
                    proc.wait()
                    event_fifo.put(events.ProcessCompletion_(proc))

                threading.Thread(target=wait).start()
            elif isinstance(ac, action.SubprocessTermination):
                for p in processes:
                    p.kill()
            else:
                pass
        processes = [p for p in processes if p.poll() is None]
```
Configuring the launcher¶
The launcher help can be displayed with the following command:

```sh
melissa-launcher --help
```

It returns the following message:

```
usage: melissa-launcher [-h] [--config_name CONFIG_NAME] [--print-options] [--version]

Melissa Launcher

options:
  -h, --help            show this help message and exit
  --config_name CONFIG_NAME, -c CONFIG_NAME
                        User defined configuration file. Path can be relative or absolute.
  --version, -v         show the Melissa version
  --print-options       Show the available configuration options
```
The launcher is then configured with the same configuration file as the other Melissa components: `config_<scheduler>.json`. This file contains a dictionary called `launcher_config`, which is where the user sets the path to the results output directory, the scheduler type, and the arguments passed to it (see Scheduler Support). Submission and execution commands will then be built based on the options passed in the configuration file.
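For instance, a minimal `config_slurm.json` fragment might look as follows. The option values are illustrative only; run `melissa-launcher --print-options` for the authoritative option list:

```json
{
    "launcher_config": {
        "output_dir": "my-study-results",
        "scheduler": "slurm",
        "scheduler_arg_server": ["--time=01:00:00", "--ntasks=4"],
        "scheduler_arg_client": ["--time=00:10:00", "--ntasks=1"],
        "job_limit": 1000,
        "fault_tolerance": true
    }
}
```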
Below is a list of most of the options supported by the launcher; an exhaustive list with descriptions can be retrieved via `melissa-launcher --print-options`:
| Option | Default value | Possible values |
|---|---|---|
| `output_dir` | `melissa-YYYYMMDDTHHMMSS` | Any `str` |
| `scheduler` | - | `"oar"`, `"oar-hybrid"`, `"openmpi"`, `"slurm"`, `"slurm-global"`, `"slurm-semiglobal"`, `"slurm-openmpi"` |
| `scheduler_arg` | `[]` | List of `str` with scheduler options |
| `scheduler_arg_server` | `[]` | List of `str` with scheduler options |
| `scheduler_arg_client` | `[]` | List of `str` with scheduler options |
| `scheduler_client_command` | `srun` or `mpirun` depending on scheduler | Any `str` |
| `scheduler_server_command` | `srun` or `mpirun` depending on scheduler | Any `str` |
| `scheduler_client_command_options` | `[]` | List of `str` with command options |
| `scheduler_server_command_options` | `[]` | List of `str` with command options |
| `scheduler_arg_container` | `[]` | List of `str` with command options (`oar-hybrid`) |
| `container_client_size` | `1` | Any positive `int` (`oar-hybrid`) |
| `besteffort_allocation_frequency` | `1` | Any positive `int` (`oar-hybrid`) |
| `job_limit` | `1000` | Any positive `int` |
| `timer_delay` | `_interval=5` sec, `job_update_interval=30` sec | Any positive `float` |
| `fault_tolerance` | `true` | `true`, `false` |
| `load_from_checkpoint` | `false` | `true`, `false` |
| `std_output` | `true` | `true`, `false` |
| `verbose` | `0` | Any positive `int` |
| `protocol` | `"auto"` | `"auto"`, `"sctp"`, `"tcp"` |
| `bind` | `None` | Any IP formatted `str` |
| `http_port` | `0` | Any positive `int` |
| `http_token` | Randomly generated | Any `str` |
Note

The `output_dir` is relative to the directory where the user launched `melissa-launcher`, and will be created automatically if it does not exist. Inside that directory, Melissa will copy all the launch files and write the results. Meanwhile, the `client_executable` must be a full path (in case the user wishes to store the executable elsewhere on the system).
Note

When combined, the `job_limit` and `timer_delay` options can be used to perform optimized chained allocations of job batches (e.g., with `mpirun` or `srun`) with a constrained number of resources. Otherwise, `job_limit` aims at meeting the cluster's maximal number of concurrent submissions allowed, while `timer_delay` sets both `_interval` (i.e., the time interval triggering the timeout transition) and `job_update` (i.e., the minimal delay between two job status updates) to the same value.
Warning

Setting `fault_tolerance` to `false` turns the fault-tolerance protocol off, hence crashing the study as soon as one of its instances fails.
Warning

Setting `std_output` to `false` prevents the launcher from generating standard outputs for the jobs, hence significantly reducing the overall number of files created inside the output directory. Please note that indirect schedulers use the standard outputs to get the job ID and are hence incompatible with this option.
Modifying the launcher¶
In practice, applications and clusters come with specific constraints requiring additional or new features in the scheduling strategies (see Scheduler Support). New strategies can easily be implemented by following these simple guidelines:
- Create a new `<scheduling-name>.py` script in the `scheduler` folder.
- Inherit a new `YourJob` object from the `Job` class.
- Inherit a new `YourScheduler` object from the `Scheduler` class.
- Override the necessary functions (e.g., `_submit_job_impl`, `_make_job_impl`, `_update_jobs_impl`, `_cancel_jobs_impl`, etc.).
- Add import and argument management code in the launcher `__main__.py` file.
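Following these guidelines, a new scheduler might be sketched as below. The `Job` and `Scheduler` classes here are stand-ins and the method signatures are assumptions; check the actual base-class definitions in the `scheduler` folder before implementing:

```python
# Hypothetical skeleton for scheduler/my_scheduler.py; method signatures
# are illustrative and must be aligned with the actual Job/Scheduler bases.


class Job:
    """Stand-in for the launcher's Job base class."""


class Scheduler:
    """Stand-in for the launcher's Scheduler base class."""


class MyJob(Job):
    def __init__(self, uid):
        self.uid = uid
        self.state = "pending"


class MyScheduler(Scheduler):
    def _make_job_impl(self, uid):
        return MyJob(uid)

    def _submit_job_impl(self, job, command):
        # Translate the command into a submission call for your scheduler.
        job.state = "running"
        return job

    def _update_jobs_impl(self, jobs):
        # Query the batch scheduler and refresh each job's state.
        for job in jobs:
            job.state = "running"

    def _cancel_jobs_impl(self, jobs):
        for job in jobs:
            job.state = "cancelled"
```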
Besides schedulers and scheduling strategies, other modifications can sometimes be required. Examples of such recent modifications are listed below:
- Supporting `group` submission, i.e., submitting client jobs as groups of multiple simulations. This required to:
    - Add `group_size` to the state machine configuration attribute.
    - Adapt the `_make_client_job_submission` function.
    - Define a new `GroupSize` message.
- Implementing a fault-tolerance switch, i.e., adding an option not to automatically resubmit failed server jobs. This required to:
    - Add a new `MELISSA_FAULT_TOLERANCE` environment variable passed to the server at submission.
    - Force `State.stop` to become `True` in case of failure, time-out, or connection shutdown.
Note

The launcher is supposed to be flexible enough to adapt to any situation with minimal changes to its upper layers (namely the state machine and the I/O master). However, infringements of this rule have already occurred. In such cases, the launcher must remain as consistent as possible with all its features and pass all unit tests.
FAQ¶
What are the differences between unique IDs (UID), job IDs, and client IDs?¶
The job ID is an identifier for a job that is either generated by the batch scheduler or refers to the process ID for local job execution (e.g., when using the `OpenMPI` scheduler). In the case of local execution the job ID may not be unique because once all process IDs have been used, the operating system kernel begins to assign process IDs starting from zero again. To avoid problems with duplicate job IDs, unique IDs were introduced. (NB: there is always at most one process at a time with a given process ID; job IDs are called "duplicate" here because, unlike process IDs, batch schedulers never assign the same job ID twice.)
Unique IDs serve two purposes:
- they can be used to unambiguously identify jobs,
- they are used for the generation of log file names.
Before a successful job submission with a batch scheduler and before a job ID is available, it is already necessary to store the batch scheduler output. At this point unique IDs are used and for every batch scheduler invocation, the unique ID is incremented by one. This allows one to track execution by looking at the files created by the launcher in the output directory. For example, with Slurm, the first server job submission script is found in `sbatch.0.sh`, the sbatch output of `sbatch sbatch.0.sh` can be found in `slurm.0.out` and `slurm.0.err`, respectively, and the actual job output is in `job.0.melissa-server.out` and `job.0.melissa-server.err`.
The client ID is a unique identifier generated by the server when submitting jobs. Job cancellation messages and job update messages always use the client ID. The unique IDs cannot be re-used because the server does not know their values. For example, the launcher is regularly executing job status updates which influence the computed unique IDs; the server is completely unaware of these tasks.
How to "start" the state machine? How to move it out of its initial state?¶
The only sensible event in the initial state is a time-out (`melissa.launcher.event.Timeout`). To trigger this time-out:
- do a state transition with a `Timeout` event when using the state machine directly,
- put a `Timeout` instance into the event queue of the I/O master, or
- write a single byte into the timer file descriptor.
Signals (`event.Signal`) are either ignored or make the state machine stop.
What is a connection ID?¶
A connection ID is a unique identifier used inside the state machine to tell apart different network connections. Concretely, the connection ID associated with a socket is its file descriptor (a non-negative integer).
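For example, with Python sockets the file descriptor, and hence the connection ID, can be read off with `fileno()`:

```python
import socket

# Each socket's file descriptor (a non-negative integer) serves as its
# connection ID; two simultaneously open sockets never share one.
a, b = socket.socketpair()
conn_id_a, conn_id_b = a.fileno(), b.fileno()
assert conn_id_a >= 0 and conn_id_b >= 0
assert conn_id_a != conn_id_b
a.close()
b.close()
```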
Why is event.ConnectionShutdown not called event.Disconnect?¶
When given a socket as argument, the function `close(2)` terminates a network connection and closes the file descriptor, i.e., all resources associated with the file descriptor are freed and the file descriptor must not be used anymore.
The function `shutdown(2)` allows one to signal that no more messages will be sent over a network connection, but the file descriptor can still be used (e.g., for receiving data).
In both cases, a call to `read(2)` on the peer will return zero, but sending data may still be possible unless `close(2)` was called. To highlight this fact, the identifier `ConnectionShutdown` was chosen.