Skip to content
On this page

Slurm and MPI execution

Execution integrations for running machinable components with Slurm and/or MPI.

MPI

py
import os
import stat
import subprocess

from machinable import Execution


class Mpi(Execution):
    """Run each pending executable through an MPI launcher such as ``mpirun``."""

    class Config:
        # Launcher executable invoked for every pending executable.
        runner: str = "mpirun"
        # Number of processes, passed to the runner via ``-n``.
        n: int = 1

    def __call__(self):
        """Write each executable's dispatch code to ``mpi.sh`` and launch it.

        The script is made executable and started as
        ``<runner> -n <n> <script>``; the launcher's captured stdout is
        printed after it exits.

        Raises:
            subprocess.CalledProcessError: if the launcher exits non-zero.
        """
        for executable in self.pending_executables:
            script_file = self.save_file(
                [executable, "mpi.sh"],
                executable.dispatch_code(),
            )
            # Add the owner-execute bit while keeping the existing permissions.
            mode = os.stat(script_file).st_mode
            os.chmod(script_file, mode | stat.S_IEXEC)
            output = subprocess.check_output(
                [
                    self.config.runner,
                    "-n",
                    str(self.config.n),
                    script_file,
                ]
            )
            # Program output is not guaranteed to be ASCII; a strict ASCII
            # decode would raise UnicodeDecodeError after the job already ran.
            print(output.decode("utf-8", errors="replace"))

Slurm

py
import subprocess

from machinable import Execution
from machinable.errors import ExecutionFailed


class Slurm(Execution):
    """Submit each pending executable as a separate Slurm batch job via ``sbatch``."""

    def __call__(self):
        """Submit every pending executable to Slurm.

        For each executable, a bash script is assembled from its resources
        (rendered as ``#SBATCH`` directives) followed by its dispatch code.
        The script is piped into ``sbatch``, and the job id, directives and
        script are saved to ``slurm.json`` for that executable.

        Raises:
            ExecutionFailed: if ``sbatch`` exits with a non-zero status.
        """
        for executable in self.pending_executables:
            sbatch_arguments = self._sbatch_arguments(executable)

            # Build a fresh script per executable so that the directives and
            # dispatch code of earlier submissions are not carried over.
            script = "#!/usr/bin/env bash\n"
            script += "\n".join(sbatch_arguments) + "\n"
            script += executable.dispatch_code()

            output = self._submit(script)

            # sbatch normally ends its output with the numeric job id;
            # fall back to False when that cannot be parsed.
            try:
                job_id = int(output.rsplit(" ", maxsplit=1)[-1])
            except ValueError:
                job_id = False
            print(
                f"{output} for component {executable.id} ({executable.local_directory()})"
            )

            # save job information
            self.save_file(
                [executable, "slurm.json"],
                {
                    "job_id": job_id,
                    "cmd": sbatch_arguments,
                    "script": script,
                },
            )

    def _sbatch_arguments(self, executable):
        """Return the ``#SBATCH`` directive lines for *executable*.

        Defaults are filled in for ``--job-name`` (the executable id),
        ``--output`` (``output.log`` in the local directory) and
        ``--open-mode`` (``append``) when not already given.
        """
        # Work on a copy so the defaults below do not mutate the dict owned
        # by the executable; tolerate executables that report no resources.
        resources = dict(executable.resources() or {})
        if "--job-name" not in resources:
            resources["--job-name"] = f"{executable.id}"
        if "--output" not in resources:
            resources["--output"] = self.local_directory(
                executable.id, "output.log"
            )
        if "--open-mode" not in resources:
            resources["--open-mode"] = "append"

        lines = []
        for key, value in resources.items():
            # Keys not starting with "--" (e.g. "#"-prefixed, commented-out
            # options) are not rendered.
            if not key.startswith("--"):
                continue
            # None/True mark valueless flags. Identity checks are required:
            # `1 in [None, True]` is True because 1 == True, which would drop
            # the value of options such as `--nodes=1`.
            if value is None or value is True:
                lines.append(f"#SBATCH {key}")
            else:
                lines.append(f"#SBATCH {key}={value}")
        return lines

    def _submit(self, script):
        """Pipe *script* into ``sbatch`` and return its stripped output.

        Raises:
            ExecutionFailed: if ``sbatch`` exits with a non-zero status.
        """
        # run(input=...) writes stdin, closes it and drains stdout together,
        # avoiding a possible pipe deadlock from writing stdin up front.
        completed = subprocess.run(
            ["sbatch"],
            input=script.encode("utf8"),
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            check=False,
        )
        output = completed.stdout.decode("utf8", errors="replace").strip()
        if completed.returncode != 0:
            raise ExecutionFailed(
                repr(self),
                completed.returncode,
                output,
            )
        return output

    def canonicalize_resources(self, resources):
        """Normalize a resource dict to long-form ``--option`` keys.

        Accepted key forms, each optionally prefixed with ``#`` (the prefix
        is preserved):
        ``--option`` is kept as-is; ``-p`` and ``p`` are expanded through the
        sbatch short-option table below; ``option`` becomes ``--option``.

        Values are converted to strings, except ``None`` and ``True``, which
        are kept unchanged so that they are rendered as valueless flags
        (e.g. ``#SBATCH --exclusive``) when the job script is built.

        Returns:
            A new dict; ``{}`` when *resources* is ``None``.

        Raises:
            ValueError: for an unknown or malformed short option.
        """
        if resources is None:
            return {}

        # NOTE(review): a few entries use older sbatch names (e.g. "workdir"
        # is "chdir" and "share" is "oversubscribe" in current Slurm) —
        # confirm against the targeted Slurm version.
        shorthands = {
            "A": "account",
            "B": "extra-node-info",
            "C": "constraint",
            "c": "cpus-per-task",
            "d": "dependency",
            "D": "workdir",
            "e": "error",
            "F": "nodefile",
            "H": "hold",
            "h": "help",
            "I": "immediate",
            "i": "input",
            "J": "job-name",
            "k": "no-kill",
            "L": "licenses",
            "M": "clusters",
            "m": "distribution",
            "N": "nodes",
            "n": "ntasks",
            "O": "overcommit",
            "o": "output",
            "p": "partition",
            "Q": "quiet",
            "s": "share",
            "t": "time",
            "u": "usage",
            "V": "version",
            "v": "verbose",
            "w": "nodelist",
            "x": "exclude",
            "g": "geometry",
            "R": "no-rotate",
        }

        canonicalized = {}
        for k, v in resources.items():
            # Keep flag markers intact; stringifying them would turn True into
            # "True" and render an invalid "--flag=True" directive.
            value = v if v is None or v is True else str(v)

            prefix = ""
            if k.startswith("#"):
                prefix = "#"
                k = k[1:]

            if k.startswith("--"):
                # already correct
                canonicalized[prefix + k] = value
                continue
            if k.startswith("-"):
                # -p => --partition
                try:
                    if len(k) != 2:
                        raise KeyError("Invalid length")
                    canonicalized[prefix + "--" + shorthands[k[1]]] = value
                    continue
                except KeyError as _ex:
                    raise ValueError(f"Invalid short option: {k}") from _ex
            if len(k) == 1:
                # p => --partition
                try:
                    canonicalized[prefix + "--" + shorthands[k]] = value
                    continue
                except KeyError as _ex:
                    raise ValueError(f"Invalid short option: -{k}") from _ex
            else:
                # option => --option
                canonicalized[prefix + "--" + k] = value

        return canonicalized

MIT Licensed