mpi
execution hpc mpi · maintainer machinable-org · upstream
Launch runs as MPI jobs.
Install
Declare the remote in your project's provider; the module is fetched on first use:
python
# interface/project.py — your project's provider
from machinable import Project
class MyProject(Project):
    def on_resolve_remotes(self):
        return {
            "mpi": "url+https://raw.githubusercontent.com/machinable-org/machinable/5c73557505915231582d680ae26c6404b747e812/integrations/mpi/mpi.py",
        }

The module is fetched once and cached in interface/remotes/; machinable fetch mpi downloads it without importing it, so you can inspect the code first. See Resolving remotes for how pulling, pinning, and updating work.
Requires an MPI launcher (mpirun or compatible) on the host.
Usage
python
from machinable import get
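with get("mpi"):
    ...  # launch your MPI-ready component here, e.g. one configured with {"ranks": 8}

The number of ranks is not an option of the mpi execution itself. It is read from the launched component's own ranks config value and passed to the launcher as -n. A value of -1 falls back to the MPI_RANKS environment variable.

Configuration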
| Option | Type | Default |
|---|---|---|
| preamble | str \| None | '' |
| mpi | str \| None | 'mpirun' |
| python | str \| None | None |
| resume_failed | bool \| Literal['new', 'skip'] | False |
| dry | bool | False |
Source
mpi.py
py
import os
import sys
from typing import Literal
from pydantic import BaseModel, ConfigDict
from machinable import Execution
from machinable.errors import ExecutionFailed
from machinable.utils import chmodx, run_and_stream
class MPI(Execution):
    """Execution that launches every pending executable through an MPI launcher.

    For each pending executable a bash script (optional preamble, a short
    debug header and the dispatch code) is saved as ``<id>/mpi.sh`` and
    launched as ``<mpi> <resource flags> <script>``. The command and script
    are recorded in ``<id>/mpi.json``; the launcher's stdout/stderr are
    streamed to the console and to ``<id>/output.log``. Finally a combined
    ``mpi.sh`` listing every launch command is saved.
    """

    class Config(BaseModel):
        model_config = ConfigDict(extra="forbid")

        # Shell code inserted right after the shebang of each generated script.
        preamble: str | None = ""
        # Launcher executable; an executable's own ``mpi`` config value takes
        # precedence. None runs the generated script directly.
        mpi: str | None = "mpirun"
        # Interpreter passed to the dispatch code; defaults to sys.executable.
        python: str | None = None
        # Policy for executables that have a previous incomplete execution:
        # True launches them again as-is, "new" launches a fresh copy,
        # "skip" ignores them, False (default) raises ExecutionFailed.
        resume_failed: bool | Literal["new", "skip"] = False
        # Only write the scripts and print the commands; launch nothing.
        dry: bool = False

    def on_compute_default_resources(self, executable):
        """Derive default launcher flags from the executable's ``ranks`` config.

        A missing/None/False ``ranks`` adds nothing; ``-1`` reads the
        ``MPI_RANKS`` environment variable (0 if unset); any positive value
        is emitted as ``-n <ranks>``.
        """
        resources = {}
        ranks = executable.config.get("ranks", False)
        if ranks not in [None, False]:
            if ranks == -1:
                ranks = os.environ.get("MPI_RANKS", 0)
            if int(ranks) > 0:
                resources["-n"] = int(ranks)
        return resources

    def __call__(self):
        """Write and, unless ``dry`` is set, launch one MPI job per pending executable.

        Raises:
            ExecutionFailed: an executable was previously executed
                unsuccessfully and ``resume_failed`` is False (non-dry runs).
        """
        # materialize the container record so submission scripts have a
        # durable home (dispatch only materializes the interfaces)
        if not self.is_materialized():
            super(Execution, self).materialize()

        python = self.config.python or sys.executable
        all_script = "#!/usr/bin/env bash\n"
        for executable in self.pending_executables:
            executable = self._resolve_incomplete(executable)
            if executable is None:
                continue

            resources = self.computed_resources(executable)
            # an executable may override the launcher in its own config
            mpi = executable.config.get("mpi", self.config.mpi)

            script = self._build_script(executable, python)
            script_file = chmodx(
                self.save_file(
                    [executable.id, "mpi.sh"],
                    script,
                )
            )
            cmd = self._build_command(mpi, resources, script_file)

            self.save_file(
                [executable.id, "mpi.json"],
                data={
                    "cmd": cmd,
                    "script": script,
                },
            )

            all_script += f"# {executable}\n"
            all_script += " ".join(cmd) + "\n\n"

            if self.config.dry:
                continue

            print(" ".join(cmd))
            self._run_and_log(cmd, executable)

        sp = chmodx(self.save_file("mpi.sh", all_script))
        if self.config.dry:
            print(f"# Dry run ... \n# ==============\n{sp}\n\n")
            print(all_script)

    def _resolve_incomplete(self, executable):
        """Apply the ``resume_failed`` policy to *executable*.

        Returns the executable to launch (possibly a freshly materialized
        copy), or None when it should be skipped.

        Raises:
            ExecutionFailed: it has an incomplete previous execution, no
                resubmission policy is set and this is not a dry run.
        """
        policy = self.config.resume_failed
        if policy is True:
            return executable
        failed = executable.executions.filter(lambda x: x.is_incomplete()).count()
        if not failed > 0:
            return executable
        if policy == "new":
            return executable.new().materialize()
        if policy == "skip":
            return None
        msg = (
            f"{executable.module} <{executable.id}>"
            " has previously been executed unsuccessfully."
            " Set `resume_failed` to True, 'new' or 'skip'"
            " to handle resubmission."
        )
        if self.config.dry:
            print("Dry run ... ", msg)
            return None
        raise ExecutionFailed(msg)

    def _build_script(self, executable, python):
        """Return the bash script that dispatches *executable* using *python*."""
        script = "#!/usr/bin/env bash\n"
        if self.config.preamble:
            script += self.config.preamble
        # add debug information
        script += "\n"
        script += f"# {executable.module} <{executable.id}>\n"
        script += f"# {executable.local_directory()}\n"
        script += "\n"
        script += self.dispatch_code(executable, python=python)
        return script

    @staticmethod
    def _build_command(mpi, resources, script_file):
        """Assemble the launch command as an argument list.

        Resources whose value is None or True become bare flags, ``--long``
        options are rendered as ``--key=value`` and other options as
        ``key value``. When *mpi* is None the script is run directly.
        """
        if mpi is None:
            # launcher flags such as ``-n 8`` have no launcher to receive
            # them and would otherwise become the program to execute
            return [script_file]
        cmd = [mpi]
        for key, value in resources.items():
            if value is None or value is True:
                cmd.append(key)
            elif key.startswith("--"):
                cmd.append(f"{key}={value}")
            else:
                cmd.extend([key, str(value)])
        cmd.append(script_file)
        return cmd

    def _run_and_log(self, cmd, executable):
        """Run *cmd*, teeing stdout/stderr to the console and ``<id>/output.log``."""
        with open(
            self.local_directory(executable.id, "output.log"),
            "w",
            buffering=1,
        ) as log:

            def to_stdout(output):
                sys.stdout.write(output)
                log.write(output)

            def to_stderr(output):
                sys.stderr.write(output)
                log.write(output)

            try:
                run_and_stream(
                    cmd,
                    stdout_handler=to_stdout,
                    stderr_handler=to_stderr,
                )
            except KeyboardInterrupt as _ex:
                raise KeyboardInterrupt(
                    "Interrupting `" + " ".join(cmd) + "`"
                ) from _ex