dmosopt component
Integration for dmosopt.
Usage example
python
from machinable import get
with get("mpi", {"ranks": 8}):
    get("dmosopt", {'dopt_params': ...}).launch()

Source
py
import copy
import datetime
import inspect
import os
import sys
from collections.abc import Callable
from numbers import Number
from typing import (
Any,
Literal,
Optional,
Union,
)
import distwq
import h5py
import numpy as np
import pandas as pd
from dmosopt import config, dmosopt, indicators
from dmosopt.MOASMO import epsilon_get_best, get_best
from mpi4py import MPI
from pydantic import BaseModel, ConfigDict, Field, TypeAdapter, field_validator
from machinable import Component
from machinable.config import match_method, to_dict
from machinable.interface import cachable
# Remember the hook that was installed before this module replaces it, so the
# replacement can still delegate the actual error reporting to it.
sys_excepthook = sys.excepthook


def mpi_excepthook(type, value, traceback):
    """Report an uncaught exception and abort the MPI job when it has several ranks.

    The previously installed excepthook is called first, both standard
    streams are flushed, and ``MPI.COMM_WORLD.Abort(1)`` is issued if the
    world communicator has more than one process.
    """
    sys_excepthook(type, value, traceback)
    for stream in (sys.stdout, sys.stderr):
        stream.flush()
    world = MPI.COMM_WORLD
    if world.size > 1:
        world.Abort(1)


sys.excepthook = mpi_excepthook
class Dmosopt(Component):
class Config(BaseModel):
    """Configuration schema of the dmosopt component.

    ``dopt_params`` carries the optimization settings and is checked by
    ``valid_optimization_settings`` before model validation. Fields that are
    not declared here are rejected (``extra="forbid"``).
    """

    model_config = ConfigDict(extra="forbid")
    # "???" placeholder default -- presumably marks the value as required; confirm
    dopt_params: dict = Field("???")
    time_limit: int | None = None
    feasible: bool = True
    return_features: bool = False
    return_constraints: bool = False
    spawn_workers: bool = False
    sequential_spawn: bool = False
    spawn_startup_wait: int | None = None
    spawn_executable: str | None = None
    spawn_args: list[str] = []
    nprocs_per_worker: int = 1
    collective_mode: Literal["gather", "sendrecv"] = "gather"
    verbose: bool = True
    worker_debug: bool = False
    nodes: str | None = None
    ranks: int | None = None

    @field_validator("dopt_params", mode="before")
    @classmethod
    def valid_optimization_settings(cls, params: dict) -> dict:
        """Validate the optimization settings and return a normalized copy.

        Checks, in order: every key is a known option with a value of the
        expected type (string values recognised by ``match_method`` are
        accepted as-is), ``random_seed`` and ``local_random`` are not both
        set, every string-valued import path can be resolved, and keyword
        argument dictionaries only use names found in the signature of the
        object they are meant for.

        Returns:
            A deep copy of ``to_dict(params)`` in which
            ``surrogate_method_name`` is set to ``None`` if it was absent.

        Raises:
            ValueError: on an unknown option, a wrongly typed value,
                conflicting random settings, an unresolvable import path or
                an unexpected keyword argument.
        """
        schema = {
            "opt_id": str,
            "obj_fun_name": str | None,
            "obj_fun_init_name": str | None,
            "obj_fun_init_args": dict,
            "controller_init_fun_name": str | None,
            "controller_init_fun_args": dict,
            "reduce_fun_name": str | None,
            "reduce_fun_args": Union[list, tuple],
            "broker_fun_name": str | None,
            "broker_module_name": str | None,
            # DistOptimizer
            "objective_names": Union[str, list[str]],
            "feature_dtypes": str,
            "constraint_names": Union[str, list[str]],
            "n_initial": int,
            "initial_maxiter": int,
            "initial_method": Union[
                Callable,
                Literal["glp", "slh", "lh", "mc", "sobol"],
                dict[str, Any],
                str,
            ],
            "dynamic_initial_sampling": str | None,
            "dynamic_initial_sampling_kwargs": dict | None,
            "verbose": bool,
            "problem_ids": set | None,
            "problem_parameters": dict | None,
            "space": Optional[dict[str, tuple[int | float, int | float]]],
            "population_size": int,
            "num_generations": int,
            "resample_fraction": float,
            "distance_metric": Union[
                Callable, Literal["crowding", "euclidean"]
            ],
            "n_epochs": int,
            "save_eval": bool,
            "file_path": str | None,
            "save": bool,
            "save_surrogate_evals": bool,
            "save_optimizer_params": bool,
            "metadata": Any,
            "surrogate_method_name": Union[
                str,
                Literal[
                    "gpr",
                    "egp",
                    "megp",
                    "mdgp",
                    "mdspp",
                    "vgp",
                    "svgp",
                    "spv",
                    "siv",
                    "crv",
                ],
                None,
            ],
            "surrogate_method_kwargs": dict,
            "surrogate_custom_training": str | None,
            "surrogate_custom_training_kwargs": dict | None,
            "optimizer_name": Union[
                Literal["nsga2", "age", "smpso", "cmaes"], str
            ],
            "optimizer_kwargs": Union[dict, list[dict]],
            "sensitivity_method_name": Literal["dgsm", "fast"],
            "sensitivity_method_kwargs": dict,
            "local_random": Any,
            "random_seed": int | None,
            "feasibility_method_name": str | None,
            "feasibility_method_kwargs": dict,
            "termination_conditions": Union[bool, dict, None],
            #
            "di_crossover": Any,  #
            "di_mutation": Any,  #
        }
        payload = copy.deepcopy(to_dict(params))
        for k, v in payload.items():
            if k == "feature_names":
                raise ValueError(
                    "Use feature_dtypes instead of feature_names"
                )
            if k not in schema:
                raise ValueError(f"Invalid option: {k}")
            if isinstance(v, str) and match_method(v):
                # config method allowed
                continue
            try:
                TypeAdapter(schema[k]).validate_python(v)
            except Exception as _ex:
                # report the offending value in the error itself rather than
                # printing it to stdout
                raise ValueError(
                    f"Invalid type for '{k}'; expected {schema[k]} but got: {v!r}"
                ) from _ex

        # additional rules
        if (payload.get("random_seed", None) is not None) and (
            payload.get("local_random", None) is not None
        ):
            raise ValueError(
                "Both random_seed and local_random are specified! Only one or the other must be specified."
            )

        # validate imports eagerly; each entry is
        # (option holding the import path, alias table, option holding kwargs)
        for path, alias, kw in [
            ("obj_fun_name", {}, None),
            ("obj_fun_init_name", {}, "obj_fun_init_args"),
            ("controller_init_fun_name", {}, "controller_init_fun_args"),
            ("reduce_fun_name", {}, None),
            ("broker_fun_name", {}, None),
            ("initial_method", config.default_sampling_methods, None),
            (
                "dynamic_initial_sampling",
                {},
                "dynamic_initial_sampling_kwargs",
            ),
            (
                "surrogate_method_name",
                config.default_surrogate_methods,
                "surrogate_method_kwargs",
            ),
            (
                "feasibility_method_name",
                config.default_feasibility_methods,
                "feasibility_method_kwargs",
            ),
            (
                "surrogate_custom_training",
                {},
                "surrogate_custom_training_kwargs",
            ),
            ("optimizer_name", config.default_optimizers, None),
            (
                "sensitivity_method_name",
                config.default_sa_methods,
                "sensitivity_method_kwargs",
            ),
            ("feature_dtypes", {}, None),
            ("objective_names", {}, None),
            ("constraint_names", {}, None),
            ("metadata", {}, None),
        ]:
            if isinstance(target := payload.get(path, None), str):
                if target in alias:
                    target = alias[target]
                try:
                    obj = config.import_object_by_path(target)
                except ImportError as _ex:
                    raise ValueError(
                        f"Could not resolve import path '{target}' for '{path}': {_ex}"
                    ) from _ex
                if (d := payload.get(kw, None)) is not None:
                    # verify arguments
                    sig = inspect.signature(obj)
                    if any(
                        p.kind == p.VAR_KEYWORD
                        for p in sig.parameters.values()
                    ):
                        # if function accepts keyword arguments, we cannot validate :-(
                        continue
                    for key in d:
                        if key not in sig.parameters:
                            message = ", ".join(
                                name
                                if param.default is param.empty
                                else f"{name}={param.default}"
                                for name, param in sig.parameters.items()
                            )
                            raise ValueError(
                                f"Invalid {kw} for {target}. Found `{key}`, but signature is {message}"
                            )

        # rewrite the default surrogate_method_name to None
        # if surrogate_custom_training is specified, we can't
        # know from the configuration if the surrogate method
        # is used or not. We thus assume by convention that
        # it is used iff surrogate_method_name is overridden
        if "surrogate_method_name" not in payload:
            payload["surrogate_method_name"] = None
        return payload
def on_before_commit(self):
    """Pre-flight check of string-valued, importable settings.

    For each of ``feature_dtypes``, ``objective_names``, ``constraint_names``
    and ``metadata`` that is given as a string, the referenced object is
    imported and, when callable, invoked with this component. The results are
    discarded; any exception raised while doing so propagates to the caller.
    """
    settings = to_dict(self.config.dopt_params)
    importable = (
        "feature_dtypes",
        "objective_names",
        "constraint_names",
        "metadata",
    )
    for field in importable:
        if field not in settings:
            continue
        reference = settings[field]
        if not isinstance(reference, str):
            continue
        resolved = config.import_object_by_path(reference)
        if callable(resolved):
            resolved(self)
def __call__(self) -> None:
    """Run the optimization through ``dmosopt.run`` and store its result.

    Rank 0 assembles the optimizer parameters: it fills in ``file_path``
    and ``random_seed`` when they are not configured and resolves
    string-valued ``feature_dtypes``, ``objective_names``,
    ``constraint_names`` and ``metadata`` to the imported objects. The
    parameters are then broadcast to all ranks, every rank calls
    ``dmosopt.run``, and only the controller rank tries to save the
    returned value as ``run.p``.
    """
    comm = MPI.COMM_WORLD
    params = None
    if comm.Get_rank() == 0:
        params = to_dict(self.config.dopt_params)
        if "file_path" not in params:
            params["file_path"] = self.output_filepath
        if "local_random" not in params and "random_seed" not in params:
            params["random_seed"] = self.seed
        for f in [
            "feature_dtypes",
            "objective_names",
            "constraint_names",
            "metadata",
        ]:
            # users may specify these fields in terms of importable objects
            # to avoid repetition or use custom types
            if f in params and isinstance(params[f], str):
                fi = config.import_object_by_path(params[f])
                if callable(fi):
                    fi = fi(self)
                params[f] = fi
    # non-root ranks pass None and receive the parameters built on rank 0
    params = comm.bcast(params, root=0)

    run = dmosopt.run(
        dopt_params=params,
        time_limit=self.config.time_limit,
        feasible=self.config.feasible,
        return_features=self.config.return_features,
        return_constraints=self.config.return_constraints,
        spawn_workers=self.config.spawn_workers,
        sequential_spawn=self.config.sequential_spawn,
        spawn_startup_wait=self.config.spawn_startup_wait,
        spawn_executable=self.config.spawn_executable,
        spawn_args=self.config.spawn_args,
        nprocs_per_worker=self.config.nprocs_per_worker,
        collective_mode=self.config.collective_mode,
        verbose=self.config.verbose,
        worker_debug=self.config.worker_debug,
    )

    if comm.Get_rank() != getattr(distwq, "controller_rank", 0):
        return

    try:
        self.save_file("run.p", run)
    except Exception as _ex:
        # Saving the return value stays best-effort, but the failure is
        # reported instead of vanishing, and KeyboardInterrupt/SystemExit
        # are no longer swallowed.
        import logging

        logging.getLogger(__name__).warning(
            "Could not save dmosopt run result to 'run.p': %s", _ex
        )
def parameter_vector_to_dict(self, x, include_constants=True):
    """Map a parameter vector onto the names of the configured search space.

    Args:
        x: Indexable of values; ``x[n]`` is assigned to the n-th key of
            ``dopt_params.space``.
        include_constants: When true, the entries of
            ``dopt_params.problem_parameters`` are merged in first, so a
            space value overrides a constant of the same name.

    Returns:
        A dict from parameter name to value.
    """
    constants = {}
    if include_constants:
        # problem_parameters is optional and may be absent or None
        constants = (
            self.config.dopt_params.get("problem_parameters", None) or {}
        )
    names = self.config.dopt_params.space.keys()
    return {
        **constants,
        **{name: x[n] for n, name in enumerate(names)},
    }
def evaluate_objective_at(self, x):
    """Evaluate the configured objective function at a single point.

    Args:
        x: Either a parameter dict, used as-is, or a parameter vector that
            is converted with ``parameter_vector_to_dict``.

    Returns:
        Whatever the objective function returns for the point.

    The objective is built by calling the object at ``obj_fun_init_name``
    with ``obj_fun_init_args`` when that key is present in the settings;
    otherwise the object at ``obj_fun_name`` is used directly. Logging is
    configured at INFO level as a side effect.
    """
    import logging

    point = x if isinstance(x, dict) else self.parameter_vector_to_dict(x)
    logging.basicConfig(level=logging.INFO)

    settings = self.config.dopt_params
    if "obj_fun_init_name" in settings:
        factory = config.import_object_by_path(settings.obj_fun_init_name)
        objective = factory(**settings.obj_fun_init_args)
    else:
        objective = config.import_object_by_path(settings.obj_fun_name)
    return objective(point)
def evaluate_objective_at_many(self, samples, processes=None):
    """Evaluate the objective at several points, optionally in parallel.

    Args:
        samples: Iterable of points accepted by ``evaluate_objective_at``.
        processes: ``False`` evaluates sequentially in this process.
            ``None`` uses a pool with one process fewer than the CPU count
            (but at least one). Any other value is passed to
            ``multiprocessing.Pool`` as the pool size.

    Returns:
        List of objective results in the order of ``samples``.
    """
    if processes is False:
        return [self.evaluate_objective_at(x) for x in samples]

    import multiprocessing

    if processes is None:
        # cpu_count() - 1 would be 0 on a single-core host, which Pool rejects
        processes = max(1, multiprocessing.cpu_count() - 1)
    with multiprocessing.Pool(processes=processes) as pool:
        return pool.map(self.evaluate_objective_at, samples)
def bounds_normalize(self, x):
    """Rescale parameter values by the bounds of the search space.

    Each column ``i`` is mapped to ``(x - xlb[i]) / (xub[i] - xlb[i])``.

    Args:
        x: Array whose last axis indexes the parameters. Integer input is
            converted to float so the result is not truncated.

    Returns:
        Array of the same shape as ``x`` with the rescaled values.
    """
    x = np.asarray(x)
    if not np.issubdtype(x.dtype, np.floating):
        x = x.astype(float)
    n = x.shape[-1]
    # only the bounds of the first n parameters are used
    lower = np.asarray(self.xlb[:n], dtype=x.dtype)
    upper = np.asarray(self.xub[:n], dtype=x.dtype)
    return (x - lower) / (upper - lower)
@property
def output_filepath(self) -> str:
    """Absolute path of ``dmosopt.h5`` as resolved by ``local_directory``."""
    relative = self.local_directory("dmosopt.h5")
    return os.path.abspath(relative)
def on_write_meta_data(self):
    """Return True only on the controller rank.

    The controller rank is ``distwq.controller_rank`` when that attribute
    exists and 0 otherwise.
    """
    controller = getattr(distwq, "controller_rank", 0)
    return MPI.COMM_WORLD.Get_rank() == controller
def on_commit(self):
    """Return False on every rank except the controller rank.

    On the controller rank (``distwq.controller_rank``, defaulting to 0) the
    method returns None.
    """
    controller = getattr(distwq, "controller_rank", 0)
    if MPI.COMM_WORLD.Get_rank() == controller:
        return None
    return False
@cachable(file=False)
def load_h5(
    self,
    filepath: str | None = None,
    opt_id: str | None = None,
    problem_id: int = 0,
):
    """Read the evaluation data of one problem from the HDF5 output file.

    Args:
        filepath: File to read; defaults to ``self.output_filepath``.
        opt_id: Group name of the optimization; defaults to the configured
            ``dopt_params.opt_id``.
        problem_id: Sub-group of the problem to read.

    Returns:
        Dict with the keys ``constraints``, ``epochs``, ``features``,
        ``objectives``, ``parameters``, ``predictions`` and ``metadata``.
        ``epochs`` and ``metadata`` are read as arrays, the others are
        DataFrames. ``constraints``, ``features`` and ``metadata`` are None
        when the file has no corresponding entry.
    """
    if filepath is None:
        filepath = self.output_filepath
    if opt_id is None:
        opt_id = self.config.dopt_params.opt_id

    with h5py.File(filepath, "r") as h5:

        def _names(enum_path, spec_path):
            # invert the mapping returned by check_enum_dtype and translate
            # the first field of every spec entry into its name
            by_code = {
                code: label
                for label, code in h5py.check_enum_dtype(
                    h5[enum_path].dtype
                ).items()
            }
            return [by_code[entry[0]] for entry in iter(h5[spec_path])]

        # constraints
        constraints = None
        if f"{opt_id}/constraint_enum" in h5:
            constraints = pd.DataFrame(
                h5[f"{opt_id}/{problem_id}/constraints"][:],
                columns=_names(
                    f"{opt_id}/constraint_enum", f"{opt_id}/constraint_spec"
                ),
            )
            if self.constraint_names:
                # sort for consistency
                constraints = constraints[self.constraint_names]

        # epochs
        epochs = h5[f"{opt_id}/{problem_id}/epochs"][:]

        # features
        features = None
        if f"{opt_id}/feature_enum" in h5:
            feature_columns = _names(
                f"{opt_id}/feature_enum", f"{opt_id}/feature_spec"
            )
            feature_rows = [
                list(row) for row in h5[f"{opt_id}/{problem_id}/features"]
            ]
            features = pd.DataFrame(feature_rows, columns=feature_columns)
            if self.feature_names:
                # sort for consistency
                features = features[self.feature_names]

        # objectives
        objective_names = _names(
            f"{opt_id}/objective_enum", f"{opt_id}/objective_spec"
        )
        objectives = pd.DataFrame(
            h5[f"{opt_id}/{problem_id}/objectives"][:],
            columns=objective_names,
        )
        if self.objective_names:
            # sort for consistency
            objectives = objectives[self.objective_names]

        # parameters
        parameters = pd.DataFrame(
            h5[f"{opt_id}/{problem_id}/parameters"][:],
            columns=_names(
                f"{opt_id}/parameter_enum", f"{opt_id}/parameter_spec"
            ),
        )
        # order such that it stays consistent with the space definition
        parameters = parameters[list(self.config.dopt_params.space.keys())]

        # predictions share the objective column names
        predictions = pd.DataFrame(
            h5[f"{opt_id}/{problem_id}/predictions"][:],
            columns=objective_names,
        )
        if self.objective_names:
            # sort for consistency
            predictions = predictions[self.objective_names]

        # metadata
        metadata = None
        if f"/{opt_id}/metadata" in h5:
            metadata = h5[f"/{opt_id}/metadata"][:]

    return {
        "constraints": constraints,
        "epochs": epochs,
        "features": features,
        "objectives": objectives,
        "parameters": parameters,
        "predictions": predictions,
        "metadata": metadata,
    }
def load_h5_arrays(
self,
include="xyc",
region: list | tuple | None = None,
filepath: str | None = None,
opt_id: str | None = None,
problem_id: int = 0,
):
mask = slice(None)
if isinstance(region, (list, tuple)):
mask = slice(*region)
elif isinstance(region, slice):
mask = region
data = self.load_h5(filepath, opt_id, problem_id)
result = []
for i in include:
if i.lower() == "x":
q = data["parameters"].to_numpy()[mask]
if i == "X":
q = self.bounds_normalize(q)
elif i == "y":
q = data["objectives"].to_numpy()[mask]
elif i == "f":
q = None
if data["features"] is not None:
q = data["features"].to_numpy()[mask]
elif i == "c":
q = None
if data["constraints"] is not None:
q = (data["constraints"].to_numpy() > 0).astype(int)[mask]
elif i == "p":
q = data["predictions"].to_numpy()[mask]
else:
raise ValueError(f"Invalid include '{i}'")
result.append(q)
return tuple(result)
@cachable(file=False)
def load_h5_optimizer_data(
    self, filepath: str | None = None, opt_id: str | None = None
):
    """Read per-epoch optimizer statistics and parameters from the HDF5 file.

    Args:
        filepath: File to read; defaults to ``self.output_filepath``.
        opt_id: Group name of the optimization; defaults to the configured
            ``dopt_params.opt_id``.

    Returns:
        Dict with ``stats`` and ``params``; each is a DataFrame with one row
        per consecutive epoch group found, or None when the file has no
        ``optimizer_stats`` / ``optimizer_params`` group. Statistics start
        at epoch 0 if that group exists and at 1 otherwise; parameters
        always start at epoch 1.
    """
    if filepath is None:
        filepath = self.output_filepath
    if opt_id is None:
        opt_id = self.config.dopt_params.opt_id

    with h5py.File(filepath, "r") as h5:
        stats = None
        if f"/{opt_id}/optimizer_stats" in h5:
            epoch = 0 if f"/{opt_id}/optimizer_stats/0" in h5 else 1
            stat_rows = []
            while f"/{opt_id}/optimizer_stats/{epoch}" in h5:
                epoch_stats = h5[f"/{opt_id}/optimizer_stats/{epoch}/stats"]
                # only the first record of each epoch is used
                stat_rows.append(
                    dict(zip(epoch_stats[0].dtype.names, epoch_stats[0]))
                )
                epoch += 1
            stats = pd.DataFrame(stat_rows)

        params = None
        if f"/{opt_id}/optimizer_params" in h5:
            epoch = 1
            param_rows = []
            while f"/{opt_id}/optimizer_params/{epoch}" in h5:
                group = h5[f"/{opt_id}/optimizer_params/{epoch}"]
                record = {"epoch": epoch}
                for dset in group:
                    record[dset] = group[dset][()]
                param_rows.append(record)
                epoch += 1
            params = pd.DataFrame(param_rows)

    return {"stats": stats, "params": params}
@cachable(file=False)
def load_h5_surrogate_evals(
    self,
    filepath: str | None = None,
    opt_id: str | None = None,
    problem_id: int = 0,
):
    """Read the stored surrogate evaluations from the HDF5 file.

    Args:
        filepath: File to read; defaults to ``self.output_filepath``.
        opt_id: Group name of the optimization; defaults to the configured
            ``dopt_params.opt_id``.
        problem_id: Accepted for signature parity; not used by this method.

    Returns:
        Dict with ``epochs``, ``generations`` (arrays), ``objectives`` and
        ``parameters`` (DataFrames); an entry is None when the file does not
        contain the corresponding dataset.
    """
    if filepath is None:
        filepath = self.output_filepath
    if opt_id is None:
        opt_id = self.config.dopt_params.opt_id

    with h5py.File(filepath, "r") as h5:

        def _names(enum_path, spec_path):
            # invert the mapping returned by check_enum_dtype and translate
            # the first field of every spec entry into its name
            by_code = {
                code: label
                for label, code in h5py.check_enum_dtype(
                    h5[enum_path].dtype
                ).items()
            }
            return [by_code[entry[0]] for entry in iter(h5[spec_path])]

        epochs = (
            h5[f"/{opt_id}/surrogate_evals/epochs"][:]
            if f"/{opt_id}/surrogate_evals/epochs" in h5
            else None
        )
        generations = (
            h5[f"/{opt_id}/surrogate_evals/generations"][:]
            if f"/{opt_id}/surrogate_evals/generations" in h5
            else None
        )

        objectives = None
        if f"/{opt_id}/surrogate_evals/objectives" in h5:
            objective_columns = _names(
                f"{opt_id}/objective_enum", f"{opt_id}/objective_spec"
            )
            objectives = pd.DataFrame(
                h5[f"/{opt_id}/surrogate_evals/objectives"][:],
                columns=objective_columns,
            )

        parameters = None
        if f"/{opt_id}/surrogate_evals/parameters" in h5:
            parameter_columns = _names(
                f"{opt_id}/parameter_enum", f"{opt_id}/parameter_spec"
            )
            parameters = pd.DataFrame(
                h5[f"/{opt_id}/surrogate_evals/parameters"][:],
                columns=parameter_columns,
            )

    return {
        "epochs": epochs,
        "generations": generations,
        "objectives": objectives,
        "parameters": parameters,
    }
def infer_num_initial_samples(self, problem_id: int = 0) -> int:
    """Count the evaluations recorded with epoch 0 in the output file.

    The count is also stored on the instance as
    ``inferred_num_initial_samples``.

    Args:
        problem_id: Sub-group of the problem whose epochs are read.

    Returns:
        Number of entries of the ``epochs`` dataset that equal 0.
    """
    dataset = f"{self.config.dopt_params.opt_id}/{problem_id}/epochs"
    with h5py.File(self.output_filepath, "r") as h5:
        epochs = h5[dataset][:]
    initial = epochs[epochs == 0]
    self.inferred_num_initial_samples = len(initial)
    return self.inferred_num_initial_samples
@cachable()
def get_best(
    self,
    region: list | tuple | None = None,
    sort_by: str = "-np.std(y, axis=1)",
    as_dataframes: bool = True,
    epsilon=None,
):
    """Select the best evaluations from the stored results.

    Args:
        region: ``slice`` arguments restricting the rows considered. When
            None, all rows are used, or only the last 5000 if there are
            more than 5000 epochs entries.
        sort_by: Either a Python expression (string) evaluated with ``x``,
            ``y``, ``f``, ``c``, ``epochs`` and ``np`` in scope, whose
            argsort orders the result, or a ready-made index, or None.
        as_dataframes: Wrap the results in DataFrames.
        epsilon: When given (or when no valid rows remain),
            ``epsilon_get_best`` is used instead of ``get_best`` and no
            epoch information is returned.

    Returns:
        Dict with the keys ``x``, ``y``, ``f``, ``c`` and ``epoch``.

    Rows whose objectives contain NaN are ignored.
    """
    data = self.load_h5()

    if region is not None:
        region = slice(*region)
    elif len(data["epochs"]) > 5000:
        # optimize for speed since best solutions will be found in the last epochs
        region = slice(-5000, None)
    else:
        region = slice(None)

    objectives = data["objectives"].to_numpy()[region]
    valid = np.logical_not(np.any(np.isnan(objectives), axis=1))

    y = objectives[valid]
    x = data["parameters"].to_numpy()[region][valid]
    C = None
    if data["constraints"] is not None:
        C = data["constraints"].to_numpy()[region][valid]
    f = None
    if data["features"] is not None:
        f = data["features"].to_numpy()[region][valid]
    epochs = data["epochs"][region][valid]

    if epsilon is not None or len(x) == 0:
        best_x, best_y, best_f, best_c, eps = epsilon_get_best(
            x, y, f, C, epsilons=epsilon
        )
        best_epoch = None
    else:
        # strict non-dominated sort
        best_x, best_y, best_f, best_c, best_epoch, perm = get_best(
            x, y, f, C, None, None, epochs=epochs
        )

    if isinstance(sort_by, str):
        if len(best_x) > 0:
            context = {
                "reduced": None,
                "x": best_x,
                "y": best_y,
                "f": best_f,
                "c": best_c,
                "epochs": best_epoch,
                "np": np,
            }
            # NOTE(review): sort_by is executed as Python code; it must only
            # ever come from a trusted source.
            exec(f"reduced={sort_by}", context)
            sort_by = np.argsort(context["reduced"])
        else:
            sort_by = None

    best = {
        "x": best_x,
        "y": best_y,
        "f": best_f,
        "c": best_c,
        "epoch": best_epoch,
    }

    # apply sort
    if sort_by is not None:
        for key, values in best.items():
            if values is not None:
                best[key] = values[sort_by]

    if not as_dataframes:
        return best

    best["x"] = pd.DataFrame(best["x"], columns=data["parameters"].columns)
    best["y"] = pd.DataFrame(best["y"], columns=data["objectives"].columns)
    if best["f"] is not None:
        best["f"] = pd.DataFrame(best["f"], columns=data["features"].columns)
    if best["c"] is not None:
        best["c"] = pd.DataFrame(
            best["c"], columns=data["constraints"].columns
        )
    if best["epoch"] is not None:
        best["epoch"] = pd.DataFrame(best["epoch"], columns=["epoch"])
    return best
def front(self, pf=None):
    """Coerce a front specification into a NumPy array.

    Args:
        pf: None (use this component's ``get_best()["y"]``), a DataFrame or
            Series (converted with ``to_numpy``), another ``Dmosopt``
            component (its ``get_best()["y"]``), or anything accepted by
            ``np.array``.

    Returns:
        The front as a NumPy array.
    """
    if pf is None:
        return self.get_best()["y"].to_numpy()
    if isinstance(pf, (pd.DataFrame, pd.Series)):
        return pf.to_numpy()
    if isinstance(pf, Dmosopt):
        return pf.get_best()["y"].to_numpy()
    return np.array(pf)
def norm_front(self, pf, min_max=None):
    """Min-max normalize a front per objective.

    Args:
        pf: Front specification accepted by ``front``.
        min_max: Optional ``[minimum, maximum]`` pair (list or tuple) of
            per-objective bounds. Any other value, including None or a
            bool, makes the bounds come from the front itself.

    Returns:
        ``(pf - fmin) / (fmax - fmin + 1e-8)`` as an array; the small
        constant avoids a division by zero for constant objectives.
    """
    pf = self.front(pf)
    if isinstance(min_max, (list, tuple)):
        fmin, fmax = np.array(min_max[0]), np.array(min_max[1])
    else:
        # both bounds are taken per objective, i.e. along the sample axis
        fmin, fmax = np.min(pf, axis=0), np.max(pf, axis=0)
    return (pf - fmin) / (fmax - fmin + 1e-8)
def igd(self, ref_front, pf=None):
    """Evaluate ``indicators.IGD`` for a front against a reference front.

    Args:
        ref_front: Reference front, in any form accepted by ``front``.
        pf: Front to assess; None uses this component's best solutions.

    Returns:
        The value returned by the indicator's ``do``.
    """
    reference = np.array(self.front(ref_front))
    candidate = np.array(self.front(pf))
    # NOTE(review): the indicator is constructed from ``pf`` and evaluated on
    # ``ref_front`` -- confirm this matches the argument roles expected by
    # dmosopt.indicators.IGD.
    return indicators.IGD(candidate).do(reference)
@cachable()
def hypervolume(self, ref_point, pf=None, normalize=False):
    """Evaluate ``indicators.Hypervolume`` for a front.

    Args:
        ref_point: Reference point handed to the indicator.
        pf: Front specification accepted by ``front``.
        normalize: When truthy, the front is first passed through
            ``norm_front`` with this value as its ``min_max`` argument.

    Returns:
        The value returned by the indicator's ``do``.
    """
    points = self.norm_front(pf, normalize) if normalize else self.front(pf)
    indicator = indicators.Hypervolume(np.array(ref_point))
    return indicator.do(np.array(points))
def norm_hv(self, nadir, pf=None):
    """Hypervolume of a front normalized to the range from zero to ``nadir``.

    Calls ``hypervolume`` with a reference point of 1.1 per objective and
    normalization bounds ``[zeros, nadir]``.
    """
    dims = len(nadir)
    reference = [1.1] * dims
    bounds = [[0.0] * dims, nadir]
    return self.hypervolume(ref_point=reference, pf=pf, normalize=bounds)
def norm_hv_region(self, nadir, region):
    """Normalized hypervolume of the best solutions within a row region.

    Args:
        nadir: Per-objective upper bounds forwarded to ``norm_hv``.
        region: ``(start, stop)`` row range handed to ``get_best``.

    Returns:
        The value of ``norm_hv`` for the best solutions of the region.

    For speed, at most the last 5000 rows before ``stop`` are considered.
    The window only ever shrinks the requested region: a ``start`` that
    already lies within the last 5000 rows is kept.
    """
    region = list(region)
    if region[1] > 5000:
        # optimize for speed
        floor = region[1] - 5000
        if region[0] is None or region[0] < floor:
            region[0] = floor
    pf = self.get_best(region=region)["y"].to_numpy().tolist()
    return self.norm_hv(nadir, pf=pf)
def norm_hv_epochs(self, nadir, from_zero=True):
    """Normalized hypervolume for every range returned by ``epoch_ranges``.

    Args:
        nadir: Per-objective upper bounds forwarded to ``norm_hv_region``.
        from_zero: Forwarded to ``epoch_ranges``.

    Returns:
        List with one ``norm_hv_region`` value per epoch range.
    """
    values = []
    for region in self.epoch_ranges(from_zero=from_zero):
        values.append(self.norm_hv_region(nadir, region))
    return values
def c_metric(self, ref_front, pf=None):
    """
    Calculates the set coverage of A over B, i.e. C(A, B),
    which is the fraction of solutions in B that are
    dominated by at least one solution in A.

    ref_front: B front array
    pf: A front array

    Returns 0.0 when B is empty, since there is nothing to cover.
    """
    ref_front, pf = self.front(ref_front), self.front(pf)
    if len(ref_front) == 0:
        return 0.0

    def _dominates(solution, candidate):
        # no worse in every objective and strictly better in at least one
        return all(r <= c for r, c in zip(solution, candidate)) and any(
            r < c for r, c in zip(solution, candidate)
        )

    coverage_count = sum(
        1
        for candidate in ref_front
        if any(_dominates(solution, candidate) for solution in pf)
    )
    return coverage_count / len(ref_front)
@property
def dc(self):
    """Shorthand for ``self.config.dopt_params``."""
    settings = self.config.dopt_params
    return settings
@property
def parameter_names(self) -> list[str]:
    """Names of the parameters, in the order of the configured ``space``."""
    search_space = self.config.dopt_params.space
    return [*search_space.keys()]
@property
def constraint_names(self) -> list[str]:
    """Configured constraint names, or an empty list when not configured.

    A string value is treated as an import path; the imported object is
    called with this component when it is callable.
    """
    names = self.config.dopt_params.get("constraint_names", [])
    if not isinstance(names, str):
        return names
    resolved = config.import_object_by_path(names)
    return resolved(self) if callable(resolved) else resolved
@property
def num_constraints(self) -> int:
    """Number of entries in ``constraint_names``."""
    names = self.constraint_names
    return len(names)
@property
def objective_names(self) -> list[str]:
    """Configured objective names, or an empty list when not configured.

    A string value is treated as an import path; the imported object is
    called with this component when it is callable.
    """
    names = self.config.dopt_params.get("objective_names", [])
    if not isinstance(names, str):
        return names
    resolved = config.import_object_by_path(names)
    return resolved(self) if callable(resolved) else resolved
@property
def num_objectives(self) -> int:
    """Number of entries in ``objective_names``."""
    names = self.objective_names
    return len(names)
@property
def feature_names(self) -> list[str]:
    """First element of every ``feature_dtypes`` entry."""
    names = []
    for entry in self.feature_dtypes:
        names.append(entry[0])
    return names
@property
def feature_dtypes(self) -> list[str]:
    """Configured feature dtypes, or an empty list when not configured.

    A string value is treated as an import path; the imported object is
    called with this component when it is callable.
    """
    dtypes = self.config.dopt_params.get("feature_dtypes", [])
    if not isinstance(dtypes, str):
        return dtypes
    resolved = config.import_object_by_path(dtypes)
    return resolved(self) if callable(resolved) else resolved
@property
def resample_fraction(self) -> float:
    """Configured ``resample_fraction``; 0.25 when not configured."""
    settings = self.config.dopt_params
    return settings.get("resample_fraction", 0.25)
@property
def population_size(self) -> int:
    """Configured ``population_size``; 100 when not configured."""
    settings = self.config.dopt_params
    return settings.get("population_size", 100)
@property
def surrogate_method_name(self) -> str:
    """Configured ``surrogate_method_name``; ``"gpr"`` when the key is absent."""
    settings = self.config.dopt_params
    return settings.get("surrogate_method_name", "gpr")
@property
def initial_method(self) -> str:
    """Configured ``initial_method``; ``"slh"`` when not configured."""
    settings = self.config.dopt_params
    return settings.get("initial_method", "slh")
@property
def num_generations(self) -> int:
    """Configured ``num_generations``; 200 when not configured."""
    settings = self.config.dopt_params
    return settings.get("num_generations", 200)
@property
def n_epochs(self) -> int:
    """Configured ``n_epochs``; 10 when not configured."""
    settings = self.config.dopt_params
    return settings.get("n_epochs", 10)
@property
def n_initial(self) -> int:
    """Configured ``n_initial``; 10 when not configured."""
    settings = self.config.dopt_params
    return settings.get("n_initial", 10)
@property
def num_features(self) -> int:
    """Number of entries in ``feature_names``."""
    names = self.feature_names
    return len(names)
@property
def num_parameters(self) -> int:
    """Number of entries in ``space``."""
    search_space = self.space
    return len(search_space)
@property
def num_initial_samples(self) -> int:
    """Number of initial samples.

    Without ``dynamic_initial_sampling`` this is ``n_initial`` times
    ``num_parameters``. With it, the value stored in
    ``inferred_num_initial_samples`` is used, falling back to
    ``infer_num_initial_samples()`` when none has been stored yet.
    """
    dynamic = self.config.dopt_params.get("dynamic_initial_sampling", None)
    if dynamic is None:
        return self.n_initial * self.num_parameters
    cached = getattr(self, "inferred_num_initial_samples", None)
    if cached is not None:
        return cached
    return self.infer_num_initial_samples()
@property
def num_resample(self) -> int:
    """``resample_fraction`` times ``population_size``, truncated to an int."""
    scaled = self.resample_fraction * self.population_size
    return int(scaled)
@property
def num_evals_per_epoch(self) -> int:
    """Number of evaluations per epoch.

    With neither a surrogate method nor ``surrogate_custom_training``
    configured, this is ``population_size * num_generations`` plus
    ``num_resample``; otherwise it is ``num_resample`` alone.
    """
    uses_surrogate = not (
        self.surrogate_method_name is None
        and self.config.dopt_params.get("surrogate_custom_training", None)
        is None
    )
    if uses_surrogate:
        return self.num_resample
    return self.population_size * self.num_generations + self.num_resample
@property
def num_evals_total(self) -> int:
    """Initial samples plus the evaluations of all epochs after the first."""
    # n_epochs - 1 since epoch 0 is using the initial sampling, so there are no additional evals
    initial = self.num_initial_samples
    later_epochs = self.n_epochs - 1
    return initial + later_epochs * self.num_evals_per_epoch
@property
def num_max_surrogate_evals(self) -> int:
    """Upper bound on the number of surrogate evaluations.

    Returns 0 when neither a surrogate method nor
    ``surrogate_custom_training`` is configured. Otherwise, for every epoch
    in ``range(1, n_epochs - 1)``, it adds the initial samples,
    ``population_size * epoch`` and
    ``population_size * (num_generations + 1)``.
    """
    surrogate_free = (
        self.surrogate_method_name is None
        and self.config.dopt_params.get("surrogate_custom_training", None)
        is None
    )
    if surrogate_free:
        return 0

    total = 0
    epoch = 1
    last = self.n_epochs - 1
    while epoch < last:
        # initial sampling
        total += self.num_initial_samples
        total += self.population_size * epoch
        # generation
        total += self.population_size * (self.num_generations + 1)
        epoch += 1
    return total
@property
def space(self) -> dict[str, tuple[Number, Number]]:
    """Configured ``space``; an empty dict when not configured."""
    settings = self.config.dopt_params
    return settings.get("space", {})
@property
def xub(self) -> list[Number]:
    """Second element (upper bound) of every ``space`` entry, in order."""
    upper = []
    for bounds in self.space.values():
        upper.append(bounds[1])
    return upper
@property
def xlb(self) -> list[Number]:
    """First element (lower bound) of every ``space`` entry, in order."""
    lower = []
    for bounds in self.space.values():
        lower.append(bounds[0])
    return lower
def epoch_ranges(self, inferred=True, from_zero=False):
    """Row ranges ``(start, stop)`` of the evaluations of each epoch.

    Args:
        inferred: When true, the ranges are derived from the positions at
            which the stored ``epochs`` array changes value. Otherwise they
            are computed from ``num_initial_samples``,
            ``num_evals_per_epoch`` and ``n_epochs``.
        from_zero: When true, every range starts at 0 (cumulative ranges).

    Returns:
        List of ``(start, stop)`` tuples, one per epoch.
    """
    if inferred:
        epoch_array = self.load_h5()["epochs"]
        # index of the first row of every new epoch
        boundaries = np.where(np.diff(epoch_array) != 0)[0] + 1
        edges = np.concatenate(([0], boundaries, [len(epoch_array)]))
        ranges = []
        for lo, hi in zip(edges[:-1], edges[1:]):
            ranges.append((0 if from_zero else int(lo), int(hi)))
        return ranges

    ranges = [(0, self.num_initial_samples)]
    for e in range(self.n_epochs - 1):
        if from_zero:
            start = 0
        else:
            start = int(
                self.num_initial_samples + (self.num_evals_per_epoch * e)
            )
        stop = self.num_initial_samples + (
            self.num_evals_per_epoch * (e + 1)
        )
        ranges.append((start, stop))
    return ranges
def estimate_run_time(self, eval_seconds, surrogate_eval_seconds=None):
    """Estimate the run time from per-evaluation durations.

    Args:
        eval_seconds: Seconds per objective evaluation, multiplied by
            ``num_evals_total``.
        surrogate_eval_seconds: Optional seconds per surrogate evaluation,
            multiplied by ``num_max_surrogate_evals`` and added.

    Returns:
        The estimate as a ``datetime.timedelta``.
    """
    total = self.num_evals_total * eval_seconds
    if surrogate_eval_seconds is None:
        return datetime.timedelta(seconds=total)
    total += self.num_max_surrogate_evals * surrogate_eval_seconds
    return datetime.timedelta(seconds=total)
def h5_config_consistency(self) -> list[tuple[str, Number, Number]]:
    """Compare the configuration against the contents of the HDF5 file.

    Checks the number and names of features, constraints and parameters
    as well as the total number of evaluations.

    Returns:
        List of ``(what, configured, stored)`` tuples, one per mismatch;
        empty when configuration and file agree. Features or constraints
        that are absent from the file count as zero columns.
    """
    inconsistencies = []
    data = self.load_h5()

    def _check_optional(frame, count_label, count, names_label, names):
        # features/constraints may be missing from the file entirely
        stored_count = 0 if frame is None else frame.shape[1]
        if count != stored_count:
            inconsistencies.append((count_label, count, stored_count))
        if frame is not None:
            stored_names = frame.columns.tolist()
            if names != stored_names:
                inconsistencies.append((names_label, names, stored_names))

    _check_optional(
        data["features"],
        "num_features",
        self.num_features,
        "feature_names",
        self.feature_names,
    )
    _check_optional(
        data["constraints"],
        "num_constraints",
        self.num_constraints,
        "constraint_names",
        self.constraint_names,
    )

    # num_parameters
    stored_parameters = data["parameters"].columns.tolist()
    if self.num_parameters != data["parameters"].shape[1]:
        inconsistencies.append(
            (
                "num_parameters",
                self.num_parameters,
                data["parameters"].shape[1],
            )
        )
    # compare as lists: a dict keys view never equals a list
    space_names = list(self.config.dopt_params.space.keys())
    if space_names != stored_parameters:
        inconsistencies.append(
            ("parameter_names", space_names, stored_parameters)
        )

    # num_evals_total
    if self.num_evals_total != len(data["epochs"]):
        inconsistencies.append(
            ("num_evals_total", self.num_evals_total, len(data["epochs"]))
        )
    return inconsistencies