Coverage for melissa/scheduler/slurm_openmpi.py: 20%
54 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-10 22:25 +0100
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-10 22:25 +0100
1#!/usr/bin/python3
3# Copyright (c) 2020-2022, Institut National de Recherche en Informatique et en Automatique (Inria)
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions are met:
8#
9# * Redistributions of source code must retain the above copyright notice, this
10# list of conditions and the following disclaimer.
11#
12# * Redistributions in binary form must reproduce the above copyright notice,
13# this list of conditions and the following disclaimer in the documentation
14# and/or other materials provided with the distribution.
15#
16# * Neither the name of the copyright holder nor the names of its
17# contributors may be used to endorse or promote products derived from
18# this software without specific prior written permission.
19#
20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
24# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
26# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31import logging
32import os
33from typing import List, Tuple
34from pathlib import Path
36from melissa.utility.process import ArgumentList, Environment
38from .scheduler import Options
39from .slurm import SlurmScheduler
41logger = logging.getLogger(__name__)
44class OpenmpiSlurmScheduler(SlurmScheduler):
46 def _sanity_check_impl(self, options: Options) -> List[str]:
47 args = options.raw_arguments
48 es = []
50 for a in args:
51 e = None
52 if "do-not-launch" in a:
53 e = f"remove `{a}` argument"
54 elif a in ["-N", "-c", "-n", "--n", "-np"]:
55 e = f"remove `{a}` argument"
57 if e is not None:
58 es.append(e)
60 return es
62 def _submit_job_impl(
63 self, commands: List[ArgumentList],
64 env: Environment, options: Options,
65 name: str,
66 uid: int
67 ) -> Tuple[ArgumentList, Environment]:
69 # job submission with mpirun MPMD syntax
70 # by mixing slurm and openmpi submit functions
71 sbatch_env = os.environ.copy()
72 sbatch_env.update(env)
74 output_filename = f"./stdout/job.{uid}.{name}.out"
75 error_filename = f"./stdout/job.{uid}.{name}.err"
77 propagated_options = [
78 f"--output={output_filename}",
79 f"--error={error_filename}"
80 ]
82 # get #SBATCH options from scheduler_arg options:
83 # -> sbatch options (e.g. -N or --nodes) correspond to JOB values
84 job_options: List[str] = []
85 env_args: List[str] = []
86 job_options = options.raw_arguments
88 sbatch_options = propagated_options + job_options
90 # serialize sbatch options
91 def options2str(options: str) -> str:
92 return "#SBATCH " + options
94 sbatch_options_str = [options2str(o) for o in sbatch_options]
96 # assemble mpirun arguments
97 ompi_env = os.environ.copy()
98 for key in sorted(env.keys()):
99 ompi_env[key] = env[key]
100 env_args += ["-x", key]
102 # get mpirun options from scheduler_command_options:
103 # -> mpirun options (e.g. -n NTASKS, --oversubscribe, -x MELISSA_COUPLING=1)
104 # where per-tasks options are SIMULATION values
105 ompi_options = options.sched_cmd_opt
107 ompi_commands: List[str] = []
108 for i, cmd in enumerate(commands):
109 ompi_cmd = (
110 ompi_options
111 + env_args
112 + ["--"]
113 + cmd
114 + ([":"] if i + 1 < len(commands) else [])
115 )
117 ompi_commands = ompi_commands + ompi_cmd
119 sched_cmd = options.sched_cmd
120 if not sched_cmd:
121 raise ValueError("the executable command cannot be empty with this scheduler")
123 # write srun calls to file
124 Path('./sbatch').mkdir(parents=True, exist_ok=True)
125 sbatch_script_filename = f"./sbatch/sbatch.{uid}.sh"
126 sbatch_script = ["#!/bin/sh"] \
127 + [f"# sbatch script for job {name}"] \
128 + sbatch_options_str \
129 + [""] \
130 + [f"exec {sched_cmd} \\"] \
131 + [" ".join(ompi_commands)]
133 sbatch_script_str_noeol = "\n".join(sbatch_script)
135 with open(sbatch_script_filename, "w") as f:
136 print(sbatch_script_str_noeol, file=f)
138 sbatch_call = (
139 ["sbatch"]
140 + ["--parsable"]
141 + ["--job-name={:s}".format(name)]
142 + [sbatch_script_filename]
143 )
145 return sbatch_call, sbatch_env