Coverage for jetgp/full_degp/degp.py: 96%
131 statements
« prev ^ index » next — coverage.py v7.10.7, created at 2026-04-02 14:19 -0500
1import numpy as np
2from numpy.linalg import cholesky, solve
3from scipy.linalg import cho_solve, cho_factor, solve_triangular
4from jetgp.full_degp import degp_utils
5import jetgp.utils as utils
6from jetgp.kernel_funcs.kernel_funcs import KernelFactory, get_oti_module
7from jetgp.full_degp.optimizer import Optimizer
class degp:
    """
    Derivative-Enhanced Gaussian Process (DEGP) model.

    Supports coordinate-aligned partial derivatives, hypercomplex representation,
    and automatic normalization. Includes methods for training, prediction,
    and uncertainty quantification using kernel methods.

    Parameters
    ----------
    x_train : ndarray
        Training input data of shape (n_samples, n_features).
    y_train : list or ndarray
        Training targets or list of partial derivatives.
    n_order : int
        Maximum derivative order.
    n_bases : int
        Number of input dimensions.
    der_indices : list of lists
        Derivative multi-indices corresponding to each derivative term.
    derivative_locations : list of lists
        Which training points have which derivatives.
    normalize : bool, default=True
        Whether to normalize inputs and outputs.
    sigma_data : float or array-like, optional
        Observation noise standard deviation or diagonal noise values.
    kernel : str, default='SE'
        Kernel type ('SE', 'RQ', 'Matern', 'SI', etc.).
    kernel_type : str, default='anisotropic'
        Kernel anisotropy ('anisotropic' or 'isotropic').
    smoothness_parameter : float, optional
        Smoothness parameter for Matern kernel.
    """

    def __init__(
        self,
        x_train,
        y_train,
        n_order,
        n_bases,
        der_indices,
        derivative_locations=None,
        normalize=True,
        sigma_data=None,
        kernel="SE",
        kernel_type="anisotropic",
        smoothness_parameter=None
    ):
        if n_order > 0 and derivative_locations is None:
            import warnings
            # Count total number of derivative components across all orders
            n_derivs = sum(len(order_derivs) for order_derivs in der_indices)
            n_train = len(x_train)
            # Default assumption: every derivative is observed at every
            # training point; warn so the caller knows this was implicit.
            derivative_locations = [list(range(n_train)) for _ in range(n_derivs)]
            warnings.warn(
                f"derivative_locations not provided. Assuming all {n_derivs} derivative(s) "
                f"are available at all {n_train} training point(s).",
                UserWarning
            )
        elif der_indices is None and n_order == 0:
            # Function-value-only model: no derivative bookkeeping needed.
            der_indices = []
            derivative_locations = []

        self.n_order = n_order          # maximum derivative order in training data
        self.n_bases = n_bases          # number of input dimensions / OTI bases
        self.dim = x_train.shape[1]
        self.num_points = x_train.shape[0]
        self.kernel = kernel
        self.kernel_type = kernel_type
        self.der_indices = der_indices
        self.normalize = normalize
        self.derivative_locations = derivative_locations
        # Hypercomplex (OTI) arithmetic module sized to this model's order.
        self.oti = get_oti_module(n_bases, n_order)
        # Keep raw inputs for reference after normalization.
        self.y_train_input = y_train
        self.x_train_input = x_train

        # Prepare flattened derivative indices and companion power array.
        self.flattened_der_indices = utils.flatten_der_indices(der_indices)
        self.powers = utils.build_companion_array(n_bases, n_order, der_indices)

        # Normalize data if required; normalize_y_data also rescales sigma_data.
        if normalize:
            (
                self.y_train,
                self.mu_y,
                self.sigma_y,
                self.sigmas_x,
                self.mus_x,
                sigma_data,
            ) = utils.normalize_y_data(
                x_train, y_train, sigma_data, self.flattened_der_indices
            )
            self.x_train = utils.normalize_x_data_train(x_train)
        else:
            self.x_train = x_train
            self.y_train = utils.reshape_y_train(y_train)

        # Pairwise train-train differences, per dimension, for the kernel.
        self.differences_by_dim = degp_utils.differences_by_dim_func(
            self.x_train, self.x_train, n_order, self.oti
        )

        # Observation-noise matrix: zeros if not provided, else diagonal.
        self.sigma_data = (
            np.zeros((self.y_train.shape[0], self.y_train.shape[0]))
            if sigma_data is None
            else np.diag(sigma_data)
        )

        # Initialize kernel factory and optimizer.
        self.kernel_factory = KernelFactory(
            dim=n_bases,
            normalize=normalize,
            differences_by_dim=self.differences_by_dim,
            n_order=n_order,
            smoothness_parameter=smoothness_parameter,
            oti_module=self.oti
        )
        self.kernel_func = self.kernel_factory.create_kernel(
            kernel_name=self.kernel, kernel_type=self.kernel_type
        )
        self.bounds = self.kernel_factory.bounds
        self.optimizer = Optimizer(self)

    def optimize_hyperparameters(self, *args, **kwargs):
        """
        Optimize model hyperparameters using the optimizer.

        Stores the result on ``self.params`` and returns the optimized
        hyperparameter vector.
        """
        self.params = self.optimizer.optimize_hyperparameters(*args, **kwargs)
        return self.params

    def predict(self, X_test, params, calc_cov=False, return_deriv=False, derivs_to_predict=None):
        """
        Compute posterior predictive mean and (optionally) covariance at X_test.

        Parameters
        ----------
        X_test : ndarray
            Test input points of shape (n_test, n_features).
        params : ndarray
            Log-scaled kernel hyperparameters. The last entry is log10 of the
            noise standard deviation; the rest are length scales.
        calc_cov : bool, default=False
            Whether to compute predictive variance.
        return_deriv : bool, default=False
            Whether to return derivative predictions.
        derivs_to_predict : list, optional
            Specific derivatives to predict. Can include derivatives not present in the
            training set — the cross-covariance K_* is constructed from kernel derivatives
            and does not require the requested derivative to have been observed during
            training. Each entry must be a valid derivative spec within n_bases and n_order
            (e.g. ``[[3, 1]]`` for df/dx3 in a first-order model).
            If None, defaults to all derivatives used in training.

        Returns
        -------
        f_mean : ndarray
            Predictive mean, reshaped to (num_derivs, n_test).
        f_var : ndarray, optional
            Predictive variance, same shape (only if calc_cov=True).
        """
        length_scales = params[:-1]
        sigma_n = params[-1]

        # Set up derivative prediction configuration
        if return_deriv:
            if derivs_to_predict is not None:
                common_derivs = derivs_to_predict
            else:
                if self.n_order == 0:
                    raise ValueError(
                        "derivs_to_predict must be provided when predicting derivatives "
                        "from a model trained with n_order=0 (no derivative training data)."
                    )
                common_derivs = self.flattened_der_indices
                print(
                    f"Note: derivs_to_predict is None. Predictions will include all derivatives "
                    f"used in training: {self.flattened_der_indices}"
                )

            # Determine prediction order from requested derivatives: the total
            # order of each spec is the sum of its per-dimension orders.
            required_order = max(
                sum(pair[1] for pair in deriv_spec)
                for deriv_spec in common_derivs
            )
            predict_order = max(required_order, self.n_order)

            if predict_order > self.n_order:
                # Requested derivatives exceed the training order: build a
                # higher-order OTI module and kernel just for prediction.
                predict_oti = get_oti_module(self.n_bases, predict_order)
                smoothness_param = getattr(self.kernel_factory, 'alpha', None)
                predict_kernel_factory = KernelFactory(
                    dim=self.n_bases,
                    normalize=self.normalize,
                    differences_by_dim=self.differences_by_dim,
                    n_order=predict_order,
                    smoothness_parameter=smoothness_param,
                    oti_module=predict_oti
                )
                predict_kernel_func = predict_kernel_factory.create_kernel(
                    kernel_name=self.kernel, kernel_type=self.kernel_type
                )
            else:
                predict_oti = self.oti
                predict_kernel_func = self.kernel_func

            self.powers_predict = utils.build_companion_array_predict(
                self.n_bases, predict_order, common_derivs)
        else:
            common_derivs = []
            self.powers_predict = None
            predict_order = self.n_order
            predict_oti = self.oti
            predict_kernel_func = self.kernel_func

        # Build training kernel matrix
        phi_train = self.kernel_func(self.differences_by_dim, length_scales)

        if self.n_order > 0:
            phi_exp_train = phi_train.get_all_derivs(self.n_bases, 2 * self.n_order)
        else:
            phi_exp_train = phi_train.real
            phi_exp_train = phi_exp_train[np.newaxis, :, :]

        K = degp_utils.rbf_kernel(
            phi_train, phi_exp_train, self.n_order, self.n_bases,
            self.flattened_der_indices, self.powers,
            index=self.derivative_locations
        )
        # Nugget: sigma_n is stored as log10 of the noise std, hence 10**sigma_n.
        K += (10 ** sigma_n) ** 2 * np.eye(K.shape[0])
        K += self.sigma_data ** 2

        # Solve linear system; prefer Cholesky, fall back to a general solve
        # if K is not numerically positive definite.
        try:
            cho_solve_failed = False
            L, low = cho_factor(K, lower=True)
            alpha = cho_solve((L, low), self.y_train)
        except Exception:
            cho_solve_failed = True
            alpha = np.linalg.solve(K, self.y_train)
            print('Warning: Cholesky decomposition failed via scipy, using standard np solve instead.')

        # Normalize test inputs with the training statistics.
        if self.normalize:
            X_test = utils.normalize_x_data_test(X_test, self.sigmas_x, self.mus_x)

        # Set up test derivative locations: every requested derivative is
        # predicted at every test point.
        if return_deriv:
            derivative_locations_test = [
                list(range(X_test.shape[0])) for _ in range(len(common_derivs))]
        else:
            derivative_locations_test = None

        # Compute train-test differences at the (possibly higher) predict order.
        diff_x_test_x_train = degp_utils.differences_by_dim_func(
            self.x_train, X_test, predict_order, predict_oti, return_deriv=return_deriv
        )

        # Compute train-test kernel
        phi_train_test = predict_kernel_func(diff_x_test_x_train, length_scales)
        if predict_order > 0:
            if return_deriv:
                phi_exp_train_test = phi_train_test.get_all_derivs(self.n_bases, 2 * predict_order)
            else:
                phi_exp_train_test = phi_train_test.get_all_derivs(self.n_bases, predict_order)
        else:
            phi_exp_train_test = phi_train_test.real
            phi_exp_train_test = phi_exp_train_test[np.newaxis, :, :]

        K_s = degp_utils.rbf_kernel_predictions(
            phi_train_test, phi_exp_train_test, predict_order, self.n_bases,
            self.flattened_der_indices, self.powers,
            return_deriv=return_deriv,
            index=self.derivative_locations,
            common_derivs=common_derivs,
            powers_predict=self.powers_predict
        )

        # Compute posterior mean
        f_mean = K_s.T @ alpha

        # Denormalize predictions (derivatives need chain-rule rescaling).
        if self.normalize:
            if return_deriv:
                f_mean = utils.transform_predictions(
                    f_mean, self.mu_y, self.sigma_y, self.sigmas_x,
                    common_derivs, X_test)
            else:
                f_mean = self.mu_y + f_mean * self.sigma_y

        # Reshape predictions to (num_derivs, n_test).
        f_mean = f_mean.reshape(-1, 1)
        n = X_test.shape[0]
        m = f_mean.shape[0]
        num_derivs = m // n
        reshaped_mean = f_mean.reshape(num_derivs, n)

        if not calc_cov:
            return reshaped_mean

        # Compute test-test differences
        diff_x_test_x_test = degp_utils.differences_by_dim_func(
            X_test, X_test, predict_order, predict_oti, return_deriv=return_deriv
        )

        # Compute test-test kernel
        phi_test_test = predict_kernel_func(diff_x_test_x_test, length_scales)
        if predict_order > 0:
            phi_exp_test_test = phi_test_test.get_all_derivs(self.n_bases, 2 * predict_order)
        else:
            phi_exp_test_test = phi_test_test.real
            phi_exp_test_test = phi_exp_test_test[np.newaxis, :, :]

        K_ss = degp_utils.rbf_kernel_predictions(
            phi_test_test, phi_exp_test_test, predict_order, self.n_bases,
            self.flattened_der_indices, self.powers,
            return_deriv=return_deriv,
            index=derivative_locations_test,
            common_derivs=common_derivs,
            calc_cov=True,
            powers_predict=self.powers_predict
        )

        # Compute predictive covariance. In the fallback path, use a linear
        # solve rather than forming K^-1 explicitly (cheaper and more stable).
        if cho_solve_failed:
            f_cov = K_ss - K_s.T @ solve(K, K_s)
        else:
            v = solve_triangular(L, K_s, lower=low)
            f_cov = K_ss - v.T @ v

        # Transform covariance back to the original output scale.
        if self.normalize:
            if return_deriv:
                f_var = utils.transform_cov(
                    f_cov, self.sigma_y, self.sigmas_x,
                    common_derivs, X_test)
            else:
                f_var = self.sigma_y ** 2 * np.diag(np.abs(f_cov))
        else:
            f_var = np.diag(np.abs(f_cov))

        reshaped_var = f_var.reshape(num_derivs, n)
        return reshaped_mean, reshaped_var