Source code for jetgp.full_ddegp.optimizer

import numpy as np

from scipy.linalg import cho_solve, cho_factor
from jetgp.full_ddegp import ddegp_utils as utils
from line_profiler import profile
import jetgp.utils as gen_utils
from jetgp.hyperparameter_optimizers import OPTIMIZERS
from jetgp.utils import matern_kernel_grad_builder

class Optimizer:
    """
    Optimizer class to perform hyperparameter tuning for derivative-enhanced
    Gaussian Process models by minimizing the negative log marginal
    likelihood (NLL).

    Parameters
    ----------
    model : object
        An instance of a model (e.g., ddegp) containing the necessary
        training data and kernel configuration.
    """

    def __init__(self, model):
        self.model = model
        self._kernel_plan = None
        self._kernel_plan_n_bases = None
        self._deriv_buf = None
        self._deriv_buf_shape = None
        self._deriv_factors = None
        self._deriv_factors_key = None
        self._K_buf = None
        self._dK_buf = None
        self._kernel_buf_size = None
        self._W_proj_buf = None
        self._W_proj_shape = None

    def _get_deriv_buf(self, phi, n_bases, order):
        from math import comb

        ndir = comb(n_bases + order, order)
        shape = (ndir, phi.shape[0], phi.shape[1])
        if self._deriv_buf is None or self._deriv_buf_shape != shape:
            self._deriv_buf = np.zeros(shape, dtype=np.float64)
            self._deriv_buf_shape = shape
        return self._deriv_buf

    def _expand_derivs(self, phi, n_bases, deriv_order):
        """Expand OTI derivatives, using the fast struct path if available."""
        if hasattr(phi, 'get_all_derivs_fast'):
            buf = self._get_deriv_buf(phi, n_bases, deriv_order)
            factors = self._get_deriv_factors(n_bases, deriv_order)
            return phi.get_all_derivs_fast(factors, buf)
        return phi.get_all_derivs(n_bases, deriv_order)

    @staticmethod
    def _enum_factors(max_basis, ordi):
        from math import factorial
        from collections import Counter

        if ordi == 1:
            for _ in range(max_basis):
                yield 1.0
            return
        for last in range(1, max_basis + 1):
            if ordi == 2:
                for i in range(1, last + 1):
                    counts = Counter((i, last))
                    f = 1
                    for c in counts.values():
                        f *= factorial(c)
                    yield float(f)
            else:
                for _, prefix_counts in Optimizer._enum_factors_with_counts(last, ordi - 1):
                    counts = dict(prefix_counts)
                    counts[last] = counts.get(last, 0) + 1
                    f = 1
                    for c in counts.values():
                        f *= factorial(c)
                    yield float(f)

    @staticmethod
    def _enum_factors_with_counts(max_basis, ordi):
        from math import factorial

        if ordi == 1:
            for i in range(1, max_basis + 1):
                yield 1.0, {i: 1}
            return
        for last in range(1, max_basis + 1):
            for _, prefix_counts in Optimizer._enum_factors_with_counts(last, ordi - 1):
                counts = dict(prefix_counts)
                counts[last] = counts.get(last, 0) + 1
                f = 1
                for c in counts.values():
                    f *= factorial(c)
                yield float(f), counts

    def _get_deriv_factors(self, n_bases, order):
        key = (n_bases, order)
        if self._deriv_factors is not None and self._deriv_factors_key == key:
            return self._deriv_factors
        factors = [1.0]
        for ordi in range(1, order + 1):
            factors.extend(self._enum_factors(n_bases, ordi))
        self._deriv_factors = np.array(factors, dtype=np.float64)
        self._deriv_factors_key = key
        return self._deriv_factors

    def _ensure_kernel_plan(self, n_bases):
        """Lazily precompute the kernel plan (once per n_bases)."""
        if self._kernel_plan is not None and self._kernel_plan_n_bases == n_bases:
            return
        if not hasattr(utils, 'precompute_kernel_plan'):
            self._kernel_plan = None
            return
        self._kernel_plan = utils.precompute_kernel_plan(
            self.model.n_order,
            n_bases,
            self.model.flattened_der_indices,
            self.model.powers,
            self.model.derivative_locations,
        )
        self._kernel_plan_n_bases = n_bases
        self._K_buf = None
        self._dK_buf = None
        self._kernel_buf_size = None

    def _ensure_kernel_bufs(self, n_rows_func):
        """Pre-allocate reusable K and dK buffers (avoids repeated malloc)."""
        if self._kernel_plan is None:
            return
        total = n_rows_func + self._kernel_plan['n_pts_with_derivs']
        if self._kernel_buf_size != total:
            self._K_buf = np.empty((total, total))
            self._dK_buf = np.empty((total, total))
            self._kernel_buf_size = total
        if 'row_offsets_abs' not in self._kernel_plan:
            self._kernel_plan['row_offsets_abs'] = self._kernel_plan['row_offsets'] + n_rows_func
            self._kernel_plan['col_offsets_abs'] = self._kernel_plan['col_offsets'] + n_rows_func

    def _build_K(self, phi_exp, phi, n_bases):
        """Build the kernel matrix using the fast path if available."""
        self._ensure_kernel_plan(n_bases)
        if self._kernel_plan is not None:
            base_shape = phi.shape
            self._ensure_kernel_bufs(base_shape[0])
            phi_3d = phi_exp.reshape(phi_exp.shape[0], base_shape[0], base_shape[1])
            return utils.rbf_kernel_fast(phi_3d, self._kernel_plan, out=self._K_buf)
        return utils.rbf_kernel(
            phi,
            phi_exp,
            self.model.n_order,
            n_bases,
            self.model.flattened_der_indices,
            self.model.powers,
            self.model.derivative_locations,
        )
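    # Illustrative sketch (not part of the original module): the multinomial
    # factors cached by _get_deriv_factors are products of factorials of the
    # multiplicities of repeated basis indices. For example, the order-2
    # factors over 2 bases enumerate the non-decreasing index pairs
    # (1, 1), (1, 2), (2, 2), giving 2!, 1!*1!, 2!:
    #
    #     from math import factorial
    #     from collections import Counter
    #     factors = []
    #     for pair in [(1, 1), (1, 2), (2, 2)]:
    #         f = 1
    #         for count in Counter(pair).values():
    #             f *= factorial(count)
    #         factors.append(float(f))
    #     # factors == [2.0, 1.0, 2.0], matching list(Optimizer._enum_factors(2, 2))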
    @profile
    def negative_log_marginal_likelihood(self, x0):
        """
        Compute the negative log marginal likelihood (NLL) of the model.

        NLL = 0.5 * y^T K^-1 y + 0.5 * log|K| + 0.5 * N * log(2π)

        Parameters
        ----------
        x0 : ndarray
            Vector of log-scaled hyperparameters (length scales and noise).

        Returns
        -------
        float
            Value of the negative log marginal likelihood.
        """
        ell = x0[:-1]
        sigma_n = x0[-1]
        diffs = self.model.differences_by_dim
        phi = self.model.kernel_func(diffs, ell)
        n_bases = phi.get_active_bases()[-1]

        # Extract ALL derivative components
        deriv_order = 2 * self.model.n_order
        phi_exp = self._expand_derivs(phi, n_bases, deriv_order)

        K = self._build_K(phi_exp, phi, n_bases)
        K += ((10 ** sigma_n) ** 2) * np.eye(len(K))
        K += self.model.sigma_data ** 2
        try:
            L, low = cho_factor(K)
            alpha = cho_solve((L, low), self.model.y_train)
            data_fit = 0.5 * np.dot(self.model.y_train, alpha)
            # log|K| = 2 * sum(log(diag(L))), so this is 0.5 * log|K|
            complexity = np.sum(np.log(np.diag(L)))
            N = len(self.model.y_train)
            const = 0.5 * N * np.log(2 * np.pi)
            return data_fit + complexity + const
        except Exception:
            return 1e6
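    # Illustrative note (not part of the original module): with a Cholesky
    # factor L of K, log|K| = 2 * sum(log(diag(L))), so the 0.5 * log|K|
    # complexity term reduces to sum(log(diag(L))). A minimal NumPy check on
    # a small SPD matrix, assuming nothing beyond scipy.linalg:
    #
    #     import numpy as np
    #     from scipy.linalg import cho_factor
    #     K = np.array([[2.0, 0.5], [0.5, 1.0]])
    #     L, _ = cho_factor(K)
    #     assert np.isclose(2 * np.sum(np.log(np.diag(L))),
    #                       np.log(np.linalg.det(K)))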
    def nll_wrapper(self, x0):
        """
        Wrapper function to compute the NLL in the form expected by the
        optimizers.

        Parameters
        ----------
        x0 : ndarray
            Hyperparameter vector.

        Returns
        -------
        float
            NLL evaluated at x0.
        """
        return self.negative_log_marginal_likelihood(x0)
    def nll_grad(self, x0):
        """Analytic gradient of the NLL w.r.t. log10-scaled hyperparameters."""
        ln10 = np.log(10.0)
        kernel = self.model.kernel
        kernel_type = self.model.kernel_type
        D = len(self.model.differences_by_dim)
        sigma_n_sq = (10.0 ** x0[-1]) ** 2
        diffs = self.model.differences_by_dim
        oti = self.model.kernel_factory.oti

        phi = self.model.kernel_func(diffs, x0[:-1])
        n_bases = phi.get_active_bases()[-1]
        deriv_order = 2 * self.model.n_order
        phi_exp = self._expand_derivs(phi, n_bases, deriv_order)

        K = self._build_K(phi_exp, phi, n_bases)
        K.flat[::K.shape[0] + 1] += sigma_n_sq
        K += self.model.sigma_data ** 2
        try:
            L, low = cho_factor(K)
            alpha_v = cho_solve((L, low), self.model.y_train)
            N = len(self.model.y_train)
            K_inv = cho_solve((L, low), np.eye(N))
            W = K_inv - np.outer(alpha_v, alpha_v)
        except Exception:
            return np.zeros(len(x0))

        grad = np.zeros(len(x0))
        use_fast = self._kernel_plan is not None
        base_shape = phi.shape
        W_proj = None
        if use_fast:
            from math import comb

            ndir = comb(n_bases + deriv_order, deriv_order)
            proj_shape = (ndir, base_shape[0], base_shape[1])
            if self._W_proj_buf is None or self._W_proj_shape != proj_shape:
                self._W_proj_buf = np.empty(proj_shape)
                self._W_proj_shape = proj_shape
            W_proj = self._W_proj_buf
            plan = self._kernel_plan
            row_off = plan.get('row_offsets_abs', plan['row_offsets'] + base_shape[0])
            col_off = plan.get('col_offsets_abs', plan['col_offsets'] + base_shape[1])
            utils._project_W_to_phi_space(
                W, W_proj, base_shape[0], base_shape[1],
                plan['fd_flat_indices'], plan['df_flat_indices'],
                plan['dd_flat_indices'], plan['idx_flat'],
                plan['idx_offsets'], plan['index_sizes'],
                plan['signs'], plan['n_deriv_types'],
                row_off, col_off,
            )

        def _gc(dphi):
            dphi_exp = self._expand_derivs(dphi, n_bases, deriv_order)
            if W_proj is not None:
                dphi_3d = dphi_exp.reshape(W_proj.shape)
                return 0.5 * np.vdot(W_proj, dphi_3d)
            elif use_fast:
                dphi_3d = dphi_exp.reshape(dphi_exp.shape[0], base_shape[0], base_shape[1])
                dK = utils.rbf_kernel_fast(dphi_3d, self._kernel_plan, out=self._dK_buf)
                return 0.5 * np.vdot(W, dK)
            else:
                dK = utils.rbf_kernel(
                    dphi,
                    dphi_exp,
                    self.model.n_order,
                    n_bases,
                    self.model.flattened_der_indices,
                    self.model.powers,
                    self.model.derivative_locations,
                )
                return 0.5 * np.vdot(W, dK)

        grad[-2] = _gc(oti.mul(2.0 * ln10, phi))
        grad[-1] = ln10 * sigma_n_sq * np.trace(W)

        if kernel == 'SE':
            if kernel_type == 'anisotropic':
                ell = 10.0 ** x0[:D]
                if hasattr(phi, 'fused_scale_sq_mul'):
                    dphi_buf = oti.zeros(phi.shape)
                    for d in range(D):
                        dphi_buf.fused_scale_sq_mul(diffs[d], phi, -ln10 * ell[d] ** 2)
                        grad[d] = _gc(dphi_buf)
                else:
                    for d in range(D):
                        grad[d] = _gc(oti.mul(-ln10 * ell[d] ** 2,
                                              oti.mul(oti.mul(diffs[d], diffs[d]), phi)))
            else:
                ell = 10.0 ** float(x0[0])
                if hasattr(phi, 'fused_sum_sq'):
                    sum_sq = oti.zeros(phi.shape)
                    sum_sq.fused_sum_sq(diffs)
                else:
                    sum_sq = oti.mul(diffs[0], diffs[0])
                    for d in range(1, D):
                        sum_sq = oti.sum(sum_sq, oti.mul(diffs[d], diffs[d]))
                grad[0] = _gc(oti.mul(-ln10 * ell ** 2, oti.mul(sum_sq, phi)))
        elif kernel == 'RQ':
            if kernel_type == 'anisotropic':
                ell = 10.0 ** x0[:D]
                alpha_rq = 10.0 ** float(x0[D])
                alpha_idx = D
            else:
                ell = np.full(D, 10.0 ** float(x0[0]))
                alpha_rq = np.exp(float(x0[1]))
                alpha_idx = 1
            if hasattr(phi, 'fused_sqdist'):
                r2 = oti.zeros(phi.shape)
                ell_sq = np.ascontiguousarray(ell ** 2, dtype=np.float64)
                r2.fused_sqdist(diffs, ell_sq)
            else:
                r2 = oti.mul(ell[0], diffs[0])
                r2 = oti.mul(r2, r2)
                for d in range(1, D):
                    td = oti.mul(ell[d], diffs[d])
                    r2 = oti.sum(r2, oti.mul(td, td))
            base = oti.sum(1.0, oti.mul(r2, 1.0 / (2.0 * alpha_rq)))
            inv_base = oti.pow(base, -1)
            phi_over_base = oti.mul(phi, inv_base)
            if kernel_type == 'anisotropic':
                if hasattr(phi, 'fused_scale_sq_mul'):
                    dphi_buf = oti.zeros(phi.shape)
                    for d in range(D):
                        dphi_buf.fused_scale_sq_mul(diffs[d], phi_over_base, -ln10 * ell[d] ** 2)
                        grad[d] = _gc(dphi_buf)
                else:
                    for d in range(D):
                        grad[d] = _gc(oti.mul(-ln10 * ell[d] ** 2,
                                              oti.mul(oti.mul(diffs[d], diffs[d]), phi_over_base)))
            else:
                if hasattr(phi, 'fused_sum_sq'):
                    sum_sq = oti.zeros(phi.shape)
                    sum_sq.fused_sum_sq(diffs)
                else:
                    sum_sq = oti.mul(diffs[0], diffs[0])
                    for d in range(1, D):
                        sum_sq = oti.sum(sum_sq, oti.mul(diffs[d], diffs[d]))
                grad[0] = _gc(oti.mul(-ln10 * ell[0] ** 2, oti.mul(sum_sq, phi_over_base)))
            log_base = oti.log(base)
            term = oti.sub(oti.sub(1.0, inv_base), log_base)
            alpha_factor = ln10 * alpha_rq if kernel_type == 'anisotropic' else alpha_rq
            grad[alpha_idx] = _gc(oti.mul(alpha_factor, oti.mul(phi, term)))
        elif kernel == 'SineExp':
            if kernel_type == 'anisotropic':
                ell = 10.0 ** x0[:D]
                p = 10.0 ** x0[D:2 * D]
                pip = np.pi / p
                p_start = D
            else:
                ell = np.full(D, 10.0 ** float(x0[0]))
                pip = np.full(D, np.pi / 10.0 ** float(x0[1]))
                p_start = 1
            sin_d = [oti.sin(oti.mul(pip[d], diffs[d])) for d in range(D)]
            cos_d = [oti.cos(oti.mul(pip[d], diffs[d])) for d in range(D)]
            if kernel_type == 'anisotropic':
                if hasattr(phi, 'fused_scale_sq_mul'):
                    dphi_buf = oti.zeros(phi.shape)
                    for d in range(D):
                        dphi_buf.fused_scale_sq_mul(sin_d[d], phi, -4.0 * ln10 * ell[d] ** 2)
                        grad[d] = _gc(dphi_buf)
                else:
                    for d in range(D):
                        grad[d] = _gc(oti.mul(-4.0 * ln10 * ell[d] ** 2,
                                              oti.mul(oti.mul(sin_d[d], sin_d[d]), phi)))
                for d in range(D):
                    sc = oti.mul(sin_d[d], oti.mul(cos_d[d], diffs[d]))
                    grad[p_start + d] = _gc(oti.mul(4.0 * ln10 * ell[d] ** 2 * pip[d],
                                                    oti.mul(sc, phi)))
            else:
                if hasattr(phi, 'fused_sum_sq'):
                    ss = oti.zeros(phi.shape)
                    ss.fused_sum_sq(sin_d)
                else:
                    ss = oti.mul(sin_d[0], sin_d[0])
                    for d in range(1, D):
                        ss = oti.sum(ss, oti.mul(sin_d[d], sin_d[d]))
                grad[0] = _gc(oti.mul(-4.0 * ln10 * ell[0] ** 2, oti.mul(ss, phi)))
                scd = oti.mul(sin_d[0], oti.mul(cos_d[0], diffs[0]))
                for d in range(1, D):
                    scd = oti.sum(scd, oti.mul(sin_d[d], oti.mul(cos_d[d], diffs[d])))
                grad[p_start] = _gc(oti.mul(4.0 * ln10 * ell[0] ** 2 * pip[0],
                                            oti.mul(scd, phi)))
        elif kernel == 'Matern':
            kf = self.model.kernel_factory
            if not hasattr(kf, '_matern_grad_prebuild'):
                kf._matern_grad_prebuild = matern_kernel_grad_builder(
                    getattr(kf, "nu", 1.5), oti_module=oti)
            ell = (10.0 ** x0[:D] if kernel_type == 'anisotropic'
                   else np.full(D, 10.0 ** float(x0[0])))
            sigma_f_sq = (10.0 ** float(x0[-2])) ** 2
            _eps = 1e-10
            if hasattr(phi, 'fused_sqdist'):
                r2 = oti.zeros(phi.shape)
                ell_sq = np.ascontiguousarray(ell ** 2, dtype=np.float64)
                r2.fused_sqdist(diffs, ell_sq)
            else:
                r2 = oti.mul(ell[0], diffs[0])
                r2 = oti.mul(r2, r2)
                for d in range(1, D):
                    td = oti.mul(ell[d], diffs[d])
                    r2 = oti.sum(r2, oti.mul(td, td))
            r_oti = oti.sqrt(oti.sum(r2, _eps ** 2))
            f_prime_r = kf._matern_grad_prebuild(r_oti)
            inv_r = oti.pow(r_oti, -1)
            base_matern = oti.mul(sigma_f_sq, oti.mul(f_prime_r, inv_r))
            if kernel_type == 'anisotropic':
                if hasattr(phi, 'fused_scale_sq_mul'):
                    dphi_buf = oti.zeros(phi.shape)
                    for d in range(D):
                        dphi_buf.fused_scale_sq_mul(diffs[d], base_matern, ln10 * ell[d] ** 2)
                        grad[d] = _gc(dphi_buf)
                else:
                    for d in range(D):
                        d_sq = oti.mul(diffs[d], diffs[d])
                        dphi_d = oti.mul(ln10 * ell[d] ** 2, oti.mul(d_sq, base_matern))
                        grad[d] = _gc(dphi_d)
            else:
                if hasattr(phi, 'fused_sum_sq'):
                    sum_dsq = oti.zeros(phi.shape)
                    sum_dsq.fused_sum_sq(diffs)
                else:
                    sum_dsq = oti.mul(diffs[0], diffs[0])
                    for d in range(1, D):
                        sum_dsq = oti.sum(sum_dsq, oti.mul(diffs[d], diffs[d]))
                dphi_e = oti.mul(ln10 * ell[0] ** 2, oti.mul(sum_dsq, base_matern))
                grad[0] = _gc(dphi_e)
        return grad
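    # Illustrative note (not part of the original module): each _gc term
    # evaluates the standard identity d(NLL)/dθ = 0.5 * tr(W @ dK/dθ) with
    # W = K^{-1} - alpha alpha^T, and the ln(10) factors come from the chain
    # rule for log10-scaled hyperparameters, d/d(log10 θ) = ln(10) * θ * d/dθ.
    # A hedged finite-difference check, assuming `opt` is an Optimizer and
    # `x0` a valid hyperparameter vector (hypothetical names):
    #
    #     eps = 1e-6
    #     g = opt.nll_grad(x0)
    #     for i in range(len(x0)):
    #         xp, xm = x0.copy(), x0.copy()
    #         xp[i] += eps
    #         xm[i] -= eps
    #         fd = (opt.nll_wrapper(xp) - opt.nll_wrapper(xm)) / (2 * eps)
    #         print(i, g[i], fd)  # analytic vs. central difference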
    def nll_and_grad(self, x0):
        """Compute the NLL and its gradient in a single pass, sharing one Cholesky."""
        ln10 = np.log(10.0)
        kernel = self.model.kernel
        kernel_type = self.model.kernel_type
        D = len(self.model.differences_by_dim)
        sigma_n_sq = (10.0 ** x0[-1]) ** 2
        diffs = self.model.differences_by_dim
        oti = self.model.kernel_factory.oti

        # --- shared kernel computation (done ONCE) ---
        phi = self.model.kernel_func(diffs, x0[:-1])
        if self.model.n_order == 0:
            n_bases = 0
            phi_exp = phi.real[np.newaxis, :, :]
        else:
            n_bases = phi.get_active_bases()[-1]
            deriv_order = 2 * self.model.n_order
            phi_exp = self._expand_derivs(phi, n_bases, deriv_order)
        K = self._build_K(phi_exp, phi, n_bases)
        K.flat[::K.shape[0] + 1] += sigma_n_sq
        K += self.model.sigma_data ** 2
        try:
            L, low = cho_factor(K)
            alpha_v = cho_solve((L, low), self.model.y_train)
            N = len(self.model.y_train)
            # NLL
            nll = (0.5 * np.dot(self.model.y_train, alpha_v)
                   + np.sum(np.log(np.diag(L)))
                   + 0.5 * N * np.log(2 * np.pi))
            # W matrix for the gradient (reuse the same Cholesky)
            K_inv = cho_solve((L, low), np.eye(N))
            W = K_inv - np.outer(alpha_v, alpha_v)
        except Exception:
            return 1e6, np.zeros(len(x0))

        # --- gradient from W (no second kernel build / Cholesky) ---
        grad = np.zeros(len(x0))
        use_fast = self._kernel_plan is not None
        base_shape = phi.shape
        deriv_order_gc = 2 * self.model.n_order
        W_proj = None
        if use_fast and self.model.n_order > 0:
            from math import comb

            ndir = comb(n_bases + deriv_order_gc, deriv_order_gc)
            proj_shape = (ndir, base_shape[0], base_shape[1])
            if self._W_proj_buf is None or self._W_proj_shape != proj_shape:
                self._W_proj_buf = np.empty(proj_shape)
                self._W_proj_shape = proj_shape
            W_proj = self._W_proj_buf
            plan = self._kernel_plan
            row_off = plan.get('row_offsets_abs', plan['row_offsets'] + base_shape[0])
            col_off = plan.get('col_offsets_abs', plan['col_offsets'] + base_shape[1])
            utils._project_W_to_phi_space(
                W, W_proj, base_shape[0], base_shape[1],
                plan['fd_flat_indices'], plan['df_flat_indices'],
                plan['dd_flat_indices'], plan['idx_flat'],
                plan['idx_offsets'], plan['index_sizes'],
                plan['signs'], plan['n_deriv_types'],
                row_off, col_off,
            )

        def _gc(dphi):
            if self.model.n_order == 0:
                dphi_exp = dphi.real[np.newaxis, :, :]
            else:
                dphi_exp = self._expand_derivs(dphi, n_bases, deriv_order_gc)
            if W_proj is not None:
                dphi_3d = dphi_exp.reshape(W_proj.shape)
                return 0.5 * np.vdot(W_proj, dphi_3d)
            elif use_fast:
                dphi_3d = dphi_exp.reshape(dphi_exp.shape[0], base_shape[0], base_shape[1])
                dK = utils.rbf_kernel_fast(dphi_3d, self._kernel_plan, out=self._dK_buf)
                return 0.5 * np.vdot(W, dK)
            else:
                dK = utils.rbf_kernel(
                    dphi,
                    dphi_exp,
                    self.model.n_order,
                    n_bases,
                    self.model.flattened_der_indices,
                    self.model.powers,
                    self.model.derivative_locations,
                )
                return 0.5 * np.vdot(W, dK)

        grad[-2] = _gc(oti.mul(2.0 * ln10, phi))
        grad[-1] = ln10 * sigma_n_sq * np.trace(W)

        if kernel == 'SE':
            if kernel_type == 'anisotropic':
                ell = 10.0 ** x0[:D]
                if hasattr(phi, 'fused_scale_sq_mul'):
                    dphi_buf = oti.zeros(phi.shape)
                    for d in range(D):
                        dphi_buf.fused_scale_sq_mul(diffs[d], phi, -ln10 * ell[d] ** 2)
                        grad[d] = _gc(dphi_buf)
                else:
                    for d in range(D):
                        grad[d] = _gc(oti.mul(-ln10 * ell[d] ** 2,
                                              oti.mul(oti.mul(diffs[d], diffs[d]), phi)))
            else:
                ell = 10.0 ** float(x0[0])
                if hasattr(phi, 'fused_sum_sq'):
                    sum_sq = oti.zeros(phi.shape)
                    sum_sq.fused_sum_sq(diffs)
                else:
                    sum_sq = oti.mul(diffs[0], diffs[0])
                    for d in range(1, D):
                        sum_sq = oti.sum(sum_sq, oti.mul(diffs[d], diffs[d]))
                grad[0] = _gc(oti.mul(-ln10 * ell ** 2, oti.mul(sum_sq, phi)))
        elif kernel == 'RQ':
            if kernel_type == 'anisotropic':
                ell = 10.0 ** x0[:D]
                alpha_rq = 10.0 ** float(x0[D])
                alpha_idx = D
            else:
                ell = np.full(D, 10.0 ** float(x0[0]))
                alpha_rq = np.exp(float(x0[1]))
                alpha_idx = 1
            if hasattr(phi, 'fused_sqdist'):
                r2 = oti.zeros(phi.shape)
                ell_sq = np.ascontiguousarray(ell ** 2, dtype=np.float64)
                r2.fused_sqdist(diffs, ell_sq)
            else:
                r2 = oti.mul(ell[0], diffs[0])
                r2 = oti.mul(r2, r2)
                for d in range(1, D):
                    td = oti.mul(ell[d], diffs[d])
                    r2 = oti.sum(r2, oti.mul(td, td))
            base = oti.sum(1.0, oti.mul(r2, 1.0 / (2.0 * alpha_rq)))
            inv_base = oti.pow(base, -1)
            phi_over_base = oti.mul(phi, inv_base)
            if kernel_type == 'anisotropic':
                if hasattr(phi, 'fused_scale_sq_mul'):
                    dphi_buf = oti.zeros(phi.shape)
                    for d in range(D):
                        dphi_buf.fused_scale_sq_mul(diffs[d], phi_over_base, -ln10 * ell[d] ** 2)
                        grad[d] = _gc(dphi_buf)
                else:
                    for d in range(D):
                        grad[d] = _gc(oti.mul(-ln10 * ell[d] ** 2,
                                              oti.mul(oti.mul(diffs[d], diffs[d]), phi_over_base)))
            else:
                if hasattr(phi, 'fused_sum_sq'):
                    sum_sq = oti.zeros(phi.shape)
                    sum_sq.fused_sum_sq(diffs)
                else:
                    sum_sq = oti.mul(diffs[0], diffs[0])
                    for d in range(1, D):
                        sum_sq = oti.sum(sum_sq, oti.mul(diffs[d], diffs[d]))
                grad[0] = _gc(oti.mul(-ln10 * ell[0] ** 2, oti.mul(sum_sq, phi_over_base)))
            log_base = oti.log(base)
            term = oti.sub(oti.sub(1.0, inv_base), log_base)
            alpha_factor = ln10 * alpha_rq if kernel_type == 'anisotropic' else alpha_rq
            grad[alpha_idx] = _gc(oti.mul(alpha_factor, oti.mul(phi, term)))
        elif kernel == 'SineExp':
            if kernel_type == 'anisotropic':
                ell = 10.0 ** x0[:D]
                p = 10.0 ** x0[D:2 * D]
                pip = np.pi / p
                p_start = D
            else:
                ell = np.full(D, 10.0 ** float(x0[0]))
                pip = np.full(D, np.pi / 10.0 ** float(x0[1]))
                p_start = 1
            sin_d = [oti.sin(oti.mul(pip[d], diffs[d])) for d in range(D)]
            cos_d = [oti.cos(oti.mul(pip[d], diffs[d])) for d in range(D)]
            if kernel_type == 'anisotropic':
                if hasattr(phi, 'fused_scale_sq_mul'):
                    dphi_buf = oti.zeros(phi.shape)
                    for d in range(D):
                        dphi_buf.fused_scale_sq_mul(sin_d[d], phi, -4.0 * ln10 * ell[d] ** 2)
                        grad[d] = _gc(dphi_buf)
                else:
                    for d in range(D):
                        grad[d] = _gc(oti.mul(-4.0 * ln10 * ell[d] ** 2,
                                              oti.mul(oti.mul(sin_d[d], sin_d[d]), phi)))
                for d in range(D):
                    sc = oti.mul(sin_d[d], oti.mul(cos_d[d], diffs[d]))
                    grad[p_start + d] = _gc(oti.mul(4.0 * ln10 * ell[d] ** 2 * pip[d],
                                                    oti.mul(sc, phi)))
            else:
                if hasattr(phi, 'fused_sum_sq'):
                    ss = oti.zeros(phi.shape)
                    ss.fused_sum_sq(sin_d)
                else:
                    ss = oti.mul(sin_d[0], sin_d[0])
                    for d in range(1, D):
                        ss = oti.sum(ss, oti.mul(sin_d[d], sin_d[d]))
                grad[0] = _gc(oti.mul(-4.0 * ln10 * ell[0] ** 2, oti.mul(ss, phi)))
                scd = oti.mul(sin_d[0], oti.mul(cos_d[0], diffs[0]))
                for d in range(1, D):
                    scd = oti.sum(scd, oti.mul(sin_d[d], oti.mul(cos_d[d], diffs[d])))
                grad[p_start] = _gc(oti.mul(4.0 * ln10 * ell[0] ** 2 * pip[0],
                                            oti.mul(scd, phi)))
        elif kernel == 'Matern':
            kf = self.model.kernel_factory
            if not hasattr(kf, '_matern_grad_prebuild'):
                kf._matern_grad_prebuild = matern_kernel_grad_builder(
                    getattr(kf, "nu", 1.5), oti_module=oti)
            ell = (10.0 ** x0[:D] if kernel_type == 'anisotropic'
                   else np.full(D, 10.0 ** float(x0[0])))
            sigma_f_sq = (10.0 ** float(x0[-2])) ** 2
            _eps = 1e-10
            if hasattr(phi, 'fused_sqdist'):
                r2 = oti.zeros(phi.shape)
                ell_sq = np.ascontiguousarray(ell ** 2, dtype=np.float64)
                r2.fused_sqdist(diffs, ell_sq)
            else:
                r2 = oti.mul(ell[0], diffs[0])
                r2 = oti.mul(r2, r2)
                for d in range(1, D):
                    td = oti.mul(ell[d], diffs[d])
                    r2 = oti.sum(r2, oti.mul(td, td))
            r_oti = oti.sqrt(oti.sum(r2, _eps ** 2))
            f_prime_r = kf._matern_grad_prebuild(r_oti)
            inv_r = oti.pow(r_oti, -1)
            base_matern = oti.mul(sigma_f_sq, oti.mul(f_prime_r, inv_r))
            if kernel_type == 'anisotropic':
                if hasattr(phi, 'fused_scale_sq_mul'):
                    dphi_buf = oti.zeros(phi.shape)
                    for d in range(D):
                        dphi_buf.fused_scale_sq_mul(diffs[d], base_matern, ln10 * ell[d] ** 2)
                        grad[d] = _gc(dphi_buf)
                else:
                    for d in range(D):
                        d_sq = oti.mul(diffs[d], diffs[d])
                        dphi_d = oti.mul(ln10 * ell[d] ** 2, oti.mul(d_sq, base_matern))
                        grad[d] = _gc(dphi_d)
            else:
                if hasattr(phi, 'fused_sum_sq'):
                    sum_dsq = oti.zeros(phi.shape)
                    sum_dsq.fused_sum_sq(diffs)
                else:
                    sum_dsq = oti.mul(diffs[0], diffs[0])
                    for d in range(1, D):
                        sum_dsq = oti.sum(sum_dsq, oti.mul(diffs[d], diffs[d]))
                dphi_e = oti.mul(ln10 * ell[0] ** 2, oti.mul(sum_dsq, base_matern))
                grad[0] = _gc(dphi_e)
        return float(nll), grad
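    # Illustrative note (not part of the original module): nll_and_grad
    # should match calling negative_log_marginal_likelihood and nll_grad
    # separately while factoring K only once. A hedged consistency check,
    # assuming `opt` and `x0` as above (hypothetical names):
    #
    #     nll, g = opt.nll_and_grad(x0)
    #     assert np.isclose(nll, opt.nll_wrapper(x0))
    #     assert np.allclose(g, opt.nll_grad(x0))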
    def optimize_hyperparameters(self, optimizer="pso", **kwargs):
        """
        Optimize the DEGP model hyperparameters with the selected optimizer
        (Particle Swarm Optimization by default).

        Parameters
        ----------
        optimizer : str or callable, default="pso"
            Name of a registered optimizer in OPTIMIZERS, or a callable with
            the signature ``optimizer_fn(func, lb, ub, **kwargs)``.
        **kwargs
            Options forwarded to the chosen optimizer. For PSO these include
            ``n_restart_optimizer`` (int, default 20, maximum number of PSO
            iterations), ``swarm_size`` (int, default 20, number of particles
            in the swarm), and ``verbose`` (bool, default True, controls
            verbosity of the PSO output).

        Returns
        -------
        best_x : ndarray
            The optimal set of hyperparameters found.
        """
        if isinstance(optimizer, str):
            if optimizer not in OPTIMIZERS:
                raise ValueError(
                    f"Unknown optimizer '{optimizer}'. Available: {list(OPTIMIZERS.keys())}"
                )
            optimizer_fn = OPTIMIZERS[optimizer]
        else:
            optimizer_fn = optimizer  # allow passing a callable directly

        bounds = self.model.bounds
        lb = [b[0] for b in bounds]
        ub = [b[1] for b in bounds]
        # Gradient-capable optimizers get the fused NLL-and-gradient callback
        # unless the caller supplied one.
        if (optimizer in ('lbfgs', 'jade', 'pso')
                and 'func_and_grad' not in kwargs
                and 'grad_func' not in kwargs):
            kwargs['func_and_grad'] = self.nll_and_grad
        best_x, best_val = optimizer_fn(self.nll_wrapper, lb, ub, **kwargs)
        self.model.opt_x0 = best_x
        self.model.opt_nll = best_val
        return best_x
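# Illustrative usage sketch (not part of the original module), assuming a
# ddegp model instance exposing `bounds`, `y_train`, and the kernel
# configuration this class expects (hypothetical setup):
#
#     from jetgp.full_ddegp.optimizer import Optimizer
#
#     opt = Optimizer(model)                   # model: a ddegp instance
#     best_x = opt.optimize_hyperparameters(   # 'pso' is the default
#         optimizer='pso', swarm_size=20, verbose=True)
#     print(model.opt_nll)                     # NLL at the optimum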