Coverage for jetgp/full_degp/degp.py: 96%

131 statements  

coverage.py v7.10.7, created at 2026-04-02 14:19 -0500

import warnings

import numpy as np
from scipy.linalg import cho_factor, cho_solve, solve_triangular

from jetgp.full_degp import degp_utils
import jetgp.utils as utils
from jetgp.kernel_funcs.kernel_funcs import KernelFactory, get_oti_module
from jetgp.full_degp.optimizer import Optimizer


class degp:
    """
    Derivative-Enhanced Gaussian Process (DEGP) model.

    Supports coordinate-aligned partial derivatives, a hypercomplex (OTI)
    representation of kernel derivatives, and automatic normalization.
    Includes methods for training, prediction, and uncertainty
    quantification using kernel methods.

    Parameters
    ----------
    x_train : ndarray
        Training input data of shape (n_samples, n_features).
    y_train : list or ndarray
        Training targets, or a list of targets and partial-derivative
        observations.
    n_order : int
        Maximum derivative order.
    n_bases : int
        Number of input dimensions.
    der_indices : list of lists
        Derivative multi-indices corresponding to each derivative term.
    derivative_locations : list of lists, optional
        For each derivative term, the indices of the training points at
        which that derivative is observed. Defaults to all derivatives at
        all training points (with a warning).
    normalize : bool, default=True
        Whether to normalize inputs and outputs.
    sigma_data : float or array-like, optional
        Observation noise standard deviation or diagonal noise values.
    kernel : str, default='SE'
        Kernel type ('SE', 'RQ', 'Matern', 'SI', etc.).
    kernel_type : str, default='anisotropic'
        Kernel anisotropy ('anisotropic' or 'isotropic').
    smoothness_parameter : float, optional
        Smoothness parameter for the Matern kernel.
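
    Examples
    --------
    A minimal construction sketch. The data here are hypothetical, and both
    the nesting of ``der_indices`` (one list per derivative order, each entry
    a list of ``[dimension, order]`` pairs) and the layout of ``y`` (function
    values followed by derivative observations) are assumptions, not a
    verified spec:

    >>> import numpy as np
    >>> x = np.random.rand(10, 2)
    >>> y = [np.sin(x[:, 0]), np.cos(x[:, 0]), np.zeros(10)]  # f, df/dx1, df/dx2
    >>> model = degp(
    ...     x, y, n_order=1, n_bases=2,
    ...     der_indices=[[[[1, 1]], [[2, 1]]]],
    ... )  # doctest: +SKIP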

    """

    def __init__(
        self,
        x_train,
        y_train,
        n_order,
        n_bases,
        der_indices,
        derivative_locations=None,
        normalize=True,
        sigma_data=None,
        kernel="SE",
        kernel_type="anisotropic",
        smoothness_parameter=None,
    ):
        if n_order > 0 and derivative_locations is None:
            # Count the total number of derivative components across all orders.
            n_derivs = sum(len(order_derivs) for order_derivs in der_indices)
            n_train = len(x_train)
            derivative_locations = [list(range(n_train)) for _ in range(n_derivs)]
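            # For example (hypothetical sizes): with n_train = 3 and n_derivs = 2,
            # this default is [[0, 1, 2], [0, 1, 2]]: every derivative is
            # observed at every training point.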

            warnings.warn(
                f"derivative_locations not provided. Assuming all {n_derivs} derivative(s) "
                f"are available at all {n_train} training point(s).",
                UserWarning,
            )
        elif der_indices is None and n_order == 0:
            der_indices = []
            derivative_locations = []

        self.n_order = n_order
        self.n_bases = n_bases
        self.dim = x_train.shape[1]
        self.num_points = x_train.shape[0]
        self.kernel = kernel
        self.kernel_type = kernel_type
        self.der_indices = der_indices
        self.normalize = normalize
        self.derivative_locations = derivative_locations
        self.oti = get_oti_module(n_bases, n_order)
        self.y_train_input = y_train
        self.x_train_input = x_train

        # Prepare derivative indices and companion powers
        self.flattened_der_indices = utils.flatten_der_indices(der_indices)
        self.powers = utils.build_companion_array(n_bases, n_order, der_indices)

        # Normalize data if required
        if normalize:
            (
                self.y_train,
                self.mu_y,
                self.sigma_y,
                self.sigmas_x,
                self.mus_x,
                sigma_data,
            ) = utils.normalize_y_data(
                x_train, y_train, sigma_data, self.flattened_der_indices
            )
            self.x_train = utils.normalize_x_data_train(x_train)
        else:
            self.x_train = x_train
            self.y_train = utils.reshape_y_train(y_train)

        # Compute pairwise differences for the kernel
        # if kernel == 'SI':
        #     self.differences_by_dim = degp_utils.differences_by_dim_func_SI(
        #         self.x_train, self.x_train, n_order
        #     )
        # else:
        self.differences_by_dim = degp_utils.differences_by_dim_func(
            self.x_train, self.x_train, n_order, self.oti
        )

        # Initialize the observation-noise matrix
        self.sigma_data = (
            np.zeros((self.y_train.shape[0], self.y_train.shape[0]))
            if sigma_data is None
            else np.diag(sigma_data)
        )

        # Initialize the kernel factory and optimizer
        self.kernel_factory = KernelFactory(
            dim=n_bases,
            normalize=normalize,
            differences_by_dim=self.differences_by_dim,
            n_order=n_order,
            smoothness_parameter=smoothness_parameter,
            oti_module=self.oti,
        )
        self.kernel_func = self.kernel_factory.create_kernel(
            kernel_name=self.kernel, kernel_type=self.kernel_type
        )
        self.bounds = self.kernel_factory.bounds
        self.optimizer = Optimizer(self)

    def optimize_hyperparameters(self, *args, **kwargs):
        """
        Optimize model hyperparameters via the attached Optimizer.

        Returns
        -------
        params : ndarray
            Optimized hyperparameter vector (also stored as ``self.params``).
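
        Examples
        --------
        A minimal sketch; arguments are passed straight through to
        ``Optimizer``, so the no-argument call shown here is an assumption:

        >>> params = model.optimize_hyperparameters()  # doctest: +SKIP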

        """
        self.params = self.optimizer.optimize_hyperparameters(*args, **kwargs)
        return self.params

    def predict(self, X_test, params, calc_cov=False, return_deriv=False,
                derivs_to_predict=None):
        """
        Compute the posterior predictive mean and (optionally) variance at X_test.

        Parameters
        ----------
        X_test : ndarray
            Test input points of shape (n_test, n_features).
        params : ndarray
            Log-scaled kernel hyperparameters; the last entry is the log10
            noise scale.
        calc_cov : bool, default=False
            Whether to compute the predictive variance.
        return_deriv : bool, default=False
            Whether to return derivative predictions.
        derivs_to_predict : list, optional
            Specific derivatives to predict. Can include derivatives not
            present in the training set: the cross-covariance K_* is
            constructed from kernel derivatives and does not require the
            requested derivative to have been observed during training.
            Each entry must be a valid derivative spec within n_bases and
            n_order (e.g. ``[[3, 1]]`` for df/dx3 in a first-order model).
            If None, defaults to all derivatives used in training.

        Returns
        -------
        f_mean : ndarray
            Predictive mean, reshaped to (num_derivs, n_test).
        f_var : ndarray, optional
            Predictive variance of the same shape (only if calc_cov=True).
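
        Examples
        --------
        A minimal sketch with hypothetical inputs; ``model.params`` assumes
        ``optimize_hyperparameters`` has already been called:

        >>> x_new = np.random.rand(5, 2)
        >>> mean = model.predict(x_new, model.params)  # doctest: +SKIP
        >>> mean, var = model.predict(
        ...     x_new, model.params, calc_cov=True, return_deriv=True,
        ...     derivs_to_predict=[[[1, 1]]],
        ... )  # doctest: +SKIP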

        """
        length_scales = params[:-1]
        sigma_n = params[-1]

        # Set up derivative-prediction configuration
        if return_deriv:
            if derivs_to_predict is not None:
                common_derivs = derivs_to_predict
            else:
                if self.n_order == 0:
                    raise ValueError(
                        "derivs_to_predict must be provided when predicting derivatives "
                        "from a model trained with n_order=0 (no derivative training data)."
                    )
                common_derivs = self.flattened_der_indices
                print(
                    f"Note: derivs_to_predict is None. Predictions will include all "
                    f"derivatives used in training: {self.flattened_der_indices}"
                )

            # Determine the prediction order from the requested derivatives
            required_order = max(
                sum(pair[1] for pair in deriv_spec)
                for deriv_spec in common_derivs
            )
            predict_order = max(required_order, self.n_order)
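
            # Worked example (hypothetical request): common_derivs = [[[1, 2]]]
            # asks for d^2f/dx1^2, so required_order = 2; a model trained with
            # n_order = 1 is then promoted to predict_order = 2.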

201 

202 if predict_order > self.n_order: 

203 predict_oti = get_oti_module(self.n_bases, predict_order) 

204 smoothness_param = getattr(self.kernel_factory, 'alpha', None) 

205 predict_kernel_factory = KernelFactory( 

206 dim=self.n_bases, 

207 normalize=self.normalize, 

208 differences_by_dim=self.differences_by_dim, 

209 n_order=predict_order, 

210 smoothness_parameter=smoothness_param, 

211 oti_module=predict_oti 

212 ) 

213 predict_kernel_func = predict_kernel_factory.create_kernel( 

214 kernel_name=self.kernel, kernel_type=self.kernel_type 

215 ) 

216 else: 

217 predict_oti = self.oti 

218 predict_kernel_func = self.kernel_func 

219 

220 self.powers_predict = utils.build_companion_array_predict( 

221 self.n_bases, predict_order, common_derivs) 

222 else: 

223 common_derivs = [] 

224 self.powers_predict = None 

225 predict_order = self.n_order 

226 predict_oti = self.oti 

227 predict_kernel_func = self.kernel_func 

228 

229 # Build training kernel matrix 

230 phi_train = self.kernel_func(self.differences_by_dim, length_scales) 

231 

232 if self.n_order > 0: 

233 phi_exp_train = phi_train.get_all_derivs(self.n_bases, 2 * self.n_order) 

234 else: 

235 phi_exp_train = phi_train.real 

236 phi_exp_train = phi_exp_train[np.newaxis, :, :] 

237 

238 K = degp_utils.rbf_kernel( 

239 phi_train, phi_exp_train, self.n_order, self.n_bases, 

240 self.flattened_der_indices, self.powers, 

241 index=self.derivative_locations 

242 ) 

243 K += (10 ** sigma_n) ** 2 * np.eye(K.shape[0]) 

244 K += self.sigma_data ** 2 

245 

246 # Solve linear system 
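        # alpha = K^{-1} y; K already includes the jitter and observation-noise
        # terms, so this is the standard GP representer-weight solve.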
        try:
            cho_solve_failed = False
            L, low = cho_factor(K, lower=True)
            alpha = cho_solve((L, low), self.y_train)
        except np.linalg.LinAlgError:
            cho_solve_failed = True
            alpha = np.linalg.solve(K, self.y_train)
            print('Warning: Cholesky factorization failed in scipy; '
                  'falling back to np.linalg.solve.')

        # Normalize test inputs
        if self.normalize:
            X_test = utils.normalize_x_data_test(X_test, self.sigmas_x, self.mus_x)

        # Set up test derivative locations
        if return_deriv:
            derivative_locations_test = [
                list(range(X_test.shape[0])) for _ in range(len(common_derivs))
            ]
        else:
            derivative_locations_test = None

        # Compute train-test differences
        # if self.kernel == 'SI':
        #     diff_x_test_x_train = degp_utils.differences_by_dim_func_SI(
        #         self.x_train, X_test, self.n_order, return_deriv=return_deriv
        #     )
        # else:
        diff_x_test_x_train = degp_utils.differences_by_dim_func(
            self.x_train, X_test, predict_order, predict_oti, return_deriv=return_deriv
        )

        # Compute the train-test kernel
        phi_train_test = predict_kernel_func(diff_x_test_x_train, length_scales)
        if predict_order > 0:
            if return_deriv:
                phi_exp_train_test = phi_train_test.get_all_derivs(
                    self.n_bases, 2 * predict_order
                )
            else:
                phi_exp_train_test = phi_train_test.get_all_derivs(
                    self.n_bases, predict_order
                )
        else:
            phi_exp_train_test = phi_train_test.real
            phi_exp_train_test = phi_exp_train_test[np.newaxis, :, :]

        K_s = degp_utils.rbf_kernel_predictions(
            phi_train_test, phi_exp_train_test, predict_order, self.n_bases,
            self.flattened_der_indices, self.powers,
            return_deriv=return_deriv,
            index=self.derivative_locations,
            common_derivs=common_derivs,
            powers_predict=self.powers_predict,
        )

        # Compute the posterior mean
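        # f_mean = K_s^T alpha = K_s^T K^{-1} y, the standard GP posterior mean.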
        f_mean = K_s.T @ alpha

        # Denormalize predictions
        if self.normalize:
            if return_deriv:
                f_mean = utils.transform_predictions(
                    f_mean, self.mu_y, self.sigma_y, self.sigmas_x,
                    common_derivs, X_test
                )
            else:
                f_mean = self.mu_y + f_mean * self.sigma_y

        # Reshape predictions to (num_derivs, n_test)
        f_mean = f_mean.reshape(-1, 1)
        n = X_test.shape[0]
        m = f_mean.shape[0]
        num_derivs = m // n
        reshaped_mean = f_mean.reshape(num_derivs, n)

        if not calc_cov:
            return reshaped_mean

        # Compute test-test differences
        diff_x_test_x_test = degp_utils.differences_by_dim_func(
            X_test, X_test, predict_order, predict_oti, return_deriv=return_deriv
        )

        # Compute the test-test kernel
        phi_test_test = predict_kernel_func(diff_x_test_x_test, length_scales)
        if predict_order > 0:
            phi_exp_test_test = phi_test_test.get_all_derivs(
                self.n_bases, 2 * predict_order
            )
        else:
            phi_exp_test_test = phi_test_test.real
            phi_exp_test_test = phi_exp_test_test[np.newaxis, :, :]

        K_ss = degp_utils.rbf_kernel_predictions(
            phi_test_test, phi_exp_test_test, predict_order, self.n_bases,
            self.flattened_der_indices, self.powers,
            return_deriv=return_deriv,
            index=derivative_locations_test,
            common_derivs=common_derivs,
            calc_cov=True,
            powers_predict=self.powers_predict,
        )

        # Compute the predictive covariance
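        # f_cov = K_ss - K_s^T K^{-1} K_s. With K = L L^T and v = L^{-1} K_s,
        # v^T v = K_s^T K^{-1} K_s, so the Cholesky path never forms K^{-1}.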
        if cho_solve_failed:
            f_cov = K_ss - K_s.T @ np.linalg.solve(K, K_s)
        else:
            v = solve_triangular(L, K_s, lower=low)
            f_cov = K_ss - v.T @ v

        # Transform the covariance back to the original output scale
        if self.normalize:
            if return_deriv:
                f_var = utils.transform_cov(
                    f_cov, self.sigma_y, self.sigmas_x,
                    common_derivs, X_test
                )
            else:
                f_var = self.sigma_y ** 2 * np.diag(np.abs(f_cov))
        else:
            f_var = np.diag(np.abs(f_cov))

        reshaped_var = f_var.reshape(num_derivs, n)
        return reshaped_mean, reshaped_var