Skip to content

mpi

execution hpc mpi · maintainer machinable-org · upstream

Launch runs as MPI jobs.

Install

Declare the remote in your project's provider; the module is fetched on first use:

python
# interface/project.py — your project's provider
from machinable import Project


class MyProject(Project):
    """Project provider that registers the ``mpi`` remote."""

    def on_resolve_remotes(self):
        """Return the mapping from remote name to its source location."""
        remotes = {}
        remotes["mpi"] = "url+https://raw.githubusercontent.com/machinable-org/machinable/5c73557505915231582d680ae26c6404b747e812/integrations/mpi/mpi.py"
        return remotes

The module is fetched once and cached in interface/remotes/; machinable fetch mpi downloads it without importing so you can inspect the code first. See Resolving remotes for how pulling, pinning, and updating work.

Requires an MPI launcher (mpirun or compatible) on the host.

Usage

python
from machinable import get

# NOTE(review): MPI.Config in the source below sets extra="forbid" and
# declares no "ranks" field, while on_compute_default_resources reads "ranks"
# from the executable's own config — confirm that {"ranks": 8} is accepted
# here, or whether it belongs in the component's configuration instead.
with get("mpi", {"ranks": 8}):
    ...  # your MPI-ready component

Configuration

Option          Type                              Default
preamble        str | None                        ''
mpi             str | None                        'mpirun'
python          str | None                        None
resume_failed   bool | Literal['new', 'skip']     False
dry             bool                              False

Source

integrations/mpi/mpi.py

mpi.py
py
import os
import sys
from typing import Literal

from pydantic import BaseModel, ConfigDict

from machinable import Execution
from machinable.errors import ExecutionFailed
from machinable.utils import chmodx, run_and_stream


class MPI(Execution):
    """Execution that runs each pending executable through an MPI launcher.

    For every pending executable a bash script (``mpi.sh``) and a record of
    the launch command (``mpi.json``) are saved, and — unless ``dry`` is set —
    the command is run with its output streamed to the console and to
    ``output.log``. A combined ``mpi.sh`` listing all launch commands is
    saved at the end.
    """

    class Config(BaseModel):
        model_config = ConfigDict(extra="forbid")

        # Text inserted into each generated script right after the shebang.
        preamble: str | None = ""
        # Launcher command placed first in the command line; ``None`` omits
        # the launcher so the script itself is the command. An executable can
        # override this through an "mpi" key in its own config.
        mpi: str | None = "mpirun"
        # Interpreter handed to ``dispatch_code``; falls back to
        # ``sys.executable`` when unset.
        python: str | None = None
        # How to treat executables that have incomplete executions:
        #   True   -> launch them again without checking
        #   "new"  -> launch a fresh copy obtained via ``executable.new()``
        #   "skip" -> leave them out
        #   False  -> raise ExecutionFailed (or only report it when ``dry``)
        resume_failed: bool | Literal["new", "skip"] = False
        # Save scripts and print the commands without running anything.
        dry: bool = False

    def on_compute_default_resources(self, executable):
        """Derive default launcher arguments from the executable's config.

        Reads ``ranks`` from ``executable.config``. A value of ``-1`` is
        replaced by the ``MPI_RANKS`` environment variable (``0`` if unset).
        A positive rank count is returned as the ``-n`` argument; any other
        value yields no defaults.

        Returns:
            dict mapping launcher flags to their values (possibly empty).
        """
        resources = {}

        ranks = executable.config.get("ranks", False)
        if ranks in [None, False]:
            return resources

        if ranks == -1:
            ranks = os.environ.get("MPI_RANKS", 0)

        # the environment value arrives as a string, so normalise once
        rank_count = int(ranks)
        if rank_count > 0:
            resources["-n"] = rank_count

        return resources

    def __call__(self):
        """Generate the launch scripts and run the pending executables.

        Raises:
            ExecutionFailed: if an executable has incomplete executions and
                ``resume_failed`` is ``False`` (outside of a dry run).
            KeyboardInterrupt: re-raised with the interrupted command in the
                message when the user aborts a running command.
        """
        # materialize the container record so submission scripts have a
        # durable home (dispatch only materializes the interfaces)
        if not self.is_materialized():
            super(Execution, self).materialize()

        all_script = "#!/usr/bin/env bash\n"
        for executable in self.pending_executables:
            if (
                self.config.resume_failed is not True
                and executable.executions.filter(
                    lambda x: x.is_incomplete()
                ).count()
                > 0
            ):
                if self.config.resume_failed == "new":
                    executable = executable.new().materialize()
                elif self.config.resume_failed == "skip":
                    continue
                else:
                    msg = (
                        f"{executable.module} <{executable.id}>"
                        " has previously been executed unsuccessfully."
                        " Set `resume_failed` to True, 'new' or 'skip'"
                        " to handle resubmission."
                    )
                    if self.config.dry:
                        # a dry run only reports the problem and moves on
                        print("Dry run ... ", msg)
                        continue

                    raise ExecutionFailed(msg)

            resources = self.computed_resources(executable)
            # the executable's own config may override the launcher
            mpi = executable.config.get("mpi", self.config.mpi)
            python = self.config.python or sys.executable

            script = "#!/usr/bin/env bash\n"

            if self.config.preamble:
                script += self.config.preamble

            # add debug information
            script += "\n"
            script += f"# {executable.module} <{executable.id}>\n"
            script += f"# {executable.local_directory()}\n"
            script += "\n"

            script += self.dispatch_code(executable, python=python)

            script_file = chmodx(
                self.save_file(
                    [executable.id, "mpi.sh"],
                    script,
                )
            )

            # command line: [launcher] [resource arguments...] script
            cmd = [] if mpi is None else [mpi]
            for k, v in resources.items():
                if v is None or v is True:
                    # bare flag without a value
                    cmd.append(k)
                elif k.startswith("--"):
                    # long options are passed as --key=value
                    cmd.append(f"{k}={v}")
                else:
                    # everything else as two separate arguments
                    cmd.extend([k, str(v)])

            cmd.append(script_file)

            self.save_file(
                [executable.id, "mpi.json"],
                data={
                    "cmd": cmd,
                    "script": script,
                },
            )

            all_script += f"# {executable}\n"
            all_script += " ".join(cmd) + "\n\n"

            if self.config.dry:
                continue

            print(" ".join(cmd))

            # line-buffered so the log follows the running job
            with open(
                self.local_directory(executable.id, "output.log"),
                "w",
                buffering=1,
            ) as f:
                try:
                    # both streams go to the console and to the same log file
                    run_and_stream(
                        cmd,
                        stdout_handler=lambda o: [
                            sys.stdout.write(o),
                            f.write(o),
                        ],
                        stderr_handler=lambda o: [
                            sys.stderr.write(o),
                            f.write(o),
                        ],
                    )
                except KeyboardInterrupt as _ex:
                    raise KeyboardInterrupt(
                        "Interrupting `" + " ".join(cmd) + "`"
                    ) from _ex

        # combined script listing the launch command of every executable
        sp = chmodx(self.save_file("mpi.sh", all_script))

        if self.config.dry:
            print(f"# Dry run ... \n# ==============\n{sp}\n\n")
            print(all_script)

MIT Licensed