Coverage for jetgp/full_ddegp/ddegp.py: 90%

135 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-03-31 11:46 -0500

1import numpy as np 

2from numpy.linalg import cholesky, solve 

3import jetgp.utils as utils 

4from jetgp.kernel_funcs.kernel_funcs import KernelFactory, get_oti_module 

5from jetgp.full_ddegp.optimizer import Optimizer 

6from jetgp.full_ddegp import ddegp_utils 

7from scipy.linalg import cho_solve, cho_factor, solve_triangular 

8 

9 

class ddegp:
    """
    Directional Derivative-Enhanced Gaussian Process (dDEGP) model.

    Supports multiple directional derivatives, hypercomplex representation,
    and automatic normalization. Includes methods for training, prediction,
    and uncertainty quantification using kernel methods.

    Parameters
    ----------
    x_train : ndarray
        Training input data of shape (n_samples, n_features).
    y_train : list or ndarray
        Training targets or list of directional derivatives.
    n_order : int
        Maximum derivative order.
    der_indices : list of lists
        Derivative multi-indices corresponding to each derivative term.
    rays : ndarray
        Array of shape (d, n_rays), where each column is a direction vector.
        **Important:** ``rays`` defines the OTI space dimension used internally
        (``n_rays = rays.shape[1]``). Every direction you may ever want to
        predict — including directions for which no training data exists — must
        appear as a column here. A direction absent from ``rays`` cannot be
        requested via ``derivs_to_predict`` at prediction time.
    derivative_locations : list of lists
        Which training points have which derivatives.
    normalize : bool, default=True
        Whether to normalize inputs and outputs.
    sigma_data : float or array-like, optional
        Observation noise standard deviation or diagonal noise values.
    kernel : str, default='SE'
        Kernel type ('SE', 'RQ', 'Matern', etc.).
    kernel_type : str, default='anisotropic'
        Kernel anisotropy ('anisotropic' or 'isotropic').
    smoothness_parameter : float, optional
        Smoothness parameter for Matern kernel.
    """

    def __init__(self, x_train, y_train, n_order, der_indices, rays,
                 derivative_locations=None, normalize=True, sigma_data=None,
                 kernel="SE", kernel_type="anisotropic", smoothness_parameter=None):

        if n_order > 0 and derivative_locations is None:
            import warnings
            # Count total number of derivative components across all orders
            n_derivs = sum(len(order_derivs) for order_derivs in der_indices)
            n_train = len(x_train)
            # Default: every derivative component is observed at every point.
            derivative_locations = [[i for i in range(n_train)] for _ in range(n_derivs)]
            warnings.warn(
                f"derivative_locations not provided. Assuming all {n_derivs} derivative(s) "
                f"are available at all {n_train} training point(s).",
                UserWarning
            )

        elif der_indices is None and n_order == 0:
            # Function-values-only model: no derivative bookkeeping needed.
            der_indices = []
            derivative_locations = []

        self.x_train = x_train
        self.y_train = y_train
        # NOTE: overwritten below once normalization (if any) has rescaled
        # sigma_data; kept here so the attribute always exists.
        self.sigma_data = sigma_data
        self.n_order = n_order
        self.rays = rays
        self.n_rays = rays.shape[1]
        self.dim = x_train.shape[1]
        self.kernel = kernel
        self.kernel_type = kernel_type
        self.der_indices = der_indices
        self.normalize = normalize
        self.derivative_locations = derivative_locations
        # OTI algebra module sized by number of rays and derivative order.
        self.oti = get_oti_module(self.n_rays, n_order)

        self.flattened_der_indices = utils.flatten_der_indices(der_indices)

        if normalize:
            # Normalization also rescales sigma_data into normalized units;
            # the rebound local is consumed when self.sigma_data is built below.
            self.y_train, self.mu_y, self.sigma_y, self.sigmas_x, self.mus_x, sigma_data = \
                utils.normalize_y_data_directional(
                    x_train, y_train, sigma_data, self.flattened_der_indices)
            self.rays = utils.normalize_directions(self.sigmas_x, self.rays)
            self.x_train = utils.normalize_x_data_train(x_train)
        else:
            self.x_train = x_train
            self.y_train = utils.reshape_y_train(y_train)

        self.powers = utils.build_companion_array(self.n_rays, n_order, der_indices)
        # Pairwise train/train differences, precomputed once for kernel evals.
        self.differences_by_dim = ddegp_utils.differences_by_dim_func(
            self.x_train, self.x_train, self.rays, n_order, self.oti)

        # Diagonal observation-noise matrix (zeros when no noise supplied).
        # NOTE(review): the factor of 10 applied to np.diag(sigma_data) is not
        # explained anywhere in this file — it looks like a leftover scaling
        # experiment; confirm against the noise model before relying on it.
        self.sigma_data = (
            np.zeros((self.y_train.shape[0], self.y_train.shape[0]))
            if sigma_data is None else 10 * np.diag(sigma_data)
        )

        self.kernel_factory = KernelFactory(
            dim=self.dim,
            normalize=self.normalize,
            n_order=self.n_order,
            differences_by_dim=self.differences_by_dim,
            smoothness_parameter=smoothness_parameter,
            oti_module=self.oti
        )
        self.kernel_func = self.kernel_factory.create_kernel(
            kernel_name=self.kernel,
            kernel_type=self.kernel_type
        )
        self.bounds = self.kernel_factory.bounds
        self.optimizer = Optimizer(self)

    def optimize_hyperparameters(self, *args, **kwargs):
        """
        Run the optimizer to find the best kernel hyperparameters.
        Returns optimized hyperparameter vector.
        """
        return self.optimizer.optimize_hyperparameters(*args, **kwargs)

    def predict(self, X_test, params, calc_cov=False, return_deriv=False, derivs_to_predict=None):
        """
        Predict posterior mean and optional variance at test points.

        Parameters
        ----------
        X_test : ndarray
            Test input points of shape (n_test, n_features).
        params : ndarray
            Log-scaled kernel hyperparameters.
        calc_cov : bool, default=False
            Whether to compute predictive variance.
        return_deriv : bool, default=False
            Whether to return derivative predictions.
        derivs_to_predict : list, optional
            Specific derivatives to predict. Can include derivatives not present in the
            training set — the cross-covariance K_* is constructed from kernel derivatives
            and does not require the requested derivative to have been observed during
            training. Each entry must be a valid derivative spec within n_rays and n_order.
            If None, defaults to all derivatives used in training.

            **DDEGP-specific constraint:** each index must reference a ray that exists
            in the ``rays`` array passed at construction. For example, ``[[4, 1]]``
            requires ``rays`` to have at least 4 columns. Unlike DEGP — where the OTI
            space always spans the fixed coordinate axes — the DDEGP OTI space is
            spanned by the columns of ``rays``, so any direction not included there is
            inaccessible at prediction time.

        Returns
        -------
        f_mean : ndarray
            Predictive mean vector.
        f_var : ndarray, optional
            Predictive variance vector (only if calc_cov=True).

        Raises
        ------
        ValueError
            If derivative predictions are requested but no derivative
            specification can be resolved (see below).
        """
        length_scales = params[:-1]
        sigma_n = params[-1]

        # Set up derivative prediction configuration
        if return_deriv:
            if derivs_to_predict is not None:
                common_derivs = derivs_to_predict
            else:
                if self.n_order == 0:
                    raise ValueError(
                        "derivs_to_predict must be provided when predicting derivatives "
                        "from a model trained with n_order=0 (no derivative training data)."
                    )
                common_derivs = self.flattened_der_indices
                print(
                    f"Note: derivs_to_predict is None. Predictions will include all derivatives "
                    f"used in training: {self.flattened_der_indices}"
                )

            # Guard: an empty spec list would make max() below raise an
            # opaque "max() arg is an empty sequence" error.
            if not common_derivs:
                raise ValueError(
                    "derivs_to_predict must contain at least one derivative "
                    "specification when return_deriv=True."
                )

            # Determine prediction order from requested derivatives
            required_order = max(
                sum(pair[1] for pair in deriv_spec)
                for deriv_spec in common_derivs
            )
            predict_order = max(required_order, self.n_order)

            if predict_order > self.n_order:
                # Requested derivatives exceed the training order: build a
                # higher-order OTI module and kernel just for prediction.
                predict_oti = get_oti_module(self.n_rays, predict_order)
                smoothness_param = getattr(self.kernel_factory, 'alpha', None)
                predict_kernel_factory = KernelFactory(
                    dim=self.dim,
                    normalize=self.normalize,
                    differences_by_dim=self.differences_by_dim,
                    n_order=predict_order,
                    smoothness_parameter=smoothness_param,
                    oti_module=predict_oti
                )
                predict_kernel_func = predict_kernel_factory.create_kernel(
                    kernel_name=self.kernel, kernel_type=self.kernel_type
                )
            else:
                predict_oti = self.oti
                predict_kernel_func = self.kernel_func

            self.powers_predict = utils.build_companion_array_predict(
                self.n_rays, predict_order, common_derivs)
        else:
            common_derivs = []
            self.powers_predict = None
            predict_order = self.n_order
            predict_oti = self.oti
            predict_kernel_func = self.kernel_func

        # Build training kernel matrix
        phi_train = self.kernel_func(self.differences_by_dim, length_scales)
        self.n_bases_rays = phi_train.get_active_bases()[-1]
        if self.n_order > 0:
            # K(x, x') between derivative observations needs derivatives up to
            # order 2*n_order (one order per argument).
            phi_exp_train = phi_train.get_all_derivs(self.n_bases_rays, 2 * self.n_order)
        else:
            phi_exp_train = phi_train.real
            phi_exp_train = phi_exp_train[np.newaxis, :, :]

        K = ddegp_utils.rbf_kernel(
            phi_train, phi_exp_train, self.n_order, self.n_bases_rays,
            self.flattened_der_indices, self.powers,
            index=self.derivative_locations
        )
        # sigma_n is stored in log10 space; add noise jitter plus data noise.
        K += (10 ** sigma_n) ** 2 * np.eye(K.shape[0])
        K += self.sigma_data ** 2

        # Solve linear system. cho_factor raises LinAlgError for non-PD K and
        # ValueError for non-finite entries; fall back to a dense solve then.
        # (Bare except here previously swallowed KeyboardInterrupt/SystemExit.)
        try:
            cho_solve_failed = False
            L, low = cho_factor(K, lower=True)
            alpha = cho_solve((L, low), self.y_train)
        except (np.linalg.LinAlgError, ValueError):
            cho_solve_failed = True
            alpha = np.linalg.solve(K, self.y_train)
            print('Warning: Cholesky decomposition failed via scipy, using standard np solve instead.')

        # Normalize test inputs
        if self.normalize:
            X_test = utils.normalize_x_data_test(X_test, self.sigmas_x, self.mus_x)

        # Set up test derivative locations (every requested derivative is
        # predicted at every test point).
        if return_deriv:
            derivative_locations_test = [
                list(range(X_test.shape[0])) for _ in range(len(common_derivs))]
        else:
            derivative_locations_test = None

        # Compute train-test differences and kernel
        diff_x_test_x_train = ddegp_utils.differences_by_dim_func(
            self.x_train, X_test, self.rays, predict_order, predict_oti, return_deriv=return_deriv)

        phi_train_test = predict_kernel_func(diff_x_test_x_train, length_scales)
        if predict_order > 0:
            if return_deriv:
                phi_exp_train_test = phi_train_test.get_all_derivs(self.n_bases_rays, 2 * predict_order)
            else:
                phi_exp_train_test = phi_train_test.get_all_derivs(self.n_bases_rays, predict_order)
        else:
            phi_exp_train_test = phi_train_test.real
            phi_exp_train_test = phi_exp_train_test[np.newaxis, :, :]
        K_s = ddegp_utils.rbf_kernel_predictions(
            phi_train_test, phi_exp_train_test, predict_order, self.n_bases_rays,
            self.flattened_der_indices, self.powers,
            return_deriv=return_deriv,
            index=self.derivative_locations,
            common_derivs=common_derivs,
            powers_predict=self.powers_predict
        )

        # Posterior mean: K_*^T K^{-1} y
        f_mean = K_s.T @ alpha

        # Denormalize predictions
        if self.normalize:
            if return_deriv:
                # Derivative outputs need per-direction chain-rule rescaling.
                f_mean = utils.transform_predictions_directional(
                    f_mean, self.mu_y, self.sigma_y, self.sigmas_x,
                    common_derivs, X_test)
            else:
                f_mean = self.mu_y + f_mean * self.sigma_y

        # Reshape predictions to (num_derivs, n_test); row 0 is the function value.
        f_mean = f_mean.reshape(-1, 1)
        n = X_test.shape[0]
        m = f_mean.shape[0]
        num_derivs = m // n
        reshaped_mean = f_mean.reshape(num_derivs, n)

        if not calc_cov:
            return reshaped_mean

        # Compute test-test differences and kernel for covariance
        diff_x_test_x_test = ddegp_utils.differences_by_dim_func(
            X_test, X_test, self.rays, predict_order, predict_oti, return_deriv=return_deriv)

        phi_test_test = predict_kernel_func(diff_x_test_x_test, length_scales)
        bases = phi_test_test.get_active_bases()
        n_bases = bases[-1] if len(bases) > 0 else 0

        if predict_order > 0:
            phi_exp_test_test = phi_test_test.get_all_derivs(n_bases, 2 * predict_order)
        else:
            phi_exp_test_test = phi_test_test.real
            phi_exp_test_test = phi_exp_test_test[np.newaxis, :, :]
        K_ss = ddegp_utils.rbf_kernel_predictions(
            phi_test_test, phi_exp_test_test, predict_order, n_bases,
            self.flattened_der_indices, self.powers,
            return_deriv=return_deriv,
            index=derivative_locations_test,
            common_derivs=common_derivs,
            calc_cov=True,
            powers_predict=self.powers_predict
        )

        # Compute predictive covariance: K_** - K_*^T K^{-1} K_*
        if cho_solve_failed:
            f_cov = K_ss - K_s.T @ np.linalg.inv(K) @ K_s
        else:
            # Reuse the Cholesky factor: v = L^{-1} K_*, so v^T v = K_*^T K^{-1} K_*.
            v = solve_triangular(L, K_s, lower=low)
            f_cov = K_ss - v.T @ v

        # Transform covariance back to original output scale.
        if self.normalize:
            if return_deriv:
                f_var = utils.transform_cov_directional(
                    f_cov, self.sigma_y, self.sigmas_x,
                    common_derivs, X_test)
            else:
                # abs() guards against tiny negative diagonals from round-off.
                f_var = self.sigma_y ** 2 * np.diag(np.abs(f_cov))
        else:
            f_var = np.diag(np.abs(f_cov))

        reshaped_var = f_var.reshape(num_derivs, n)
        return reshaped_mean, reshaped_var

317 

318 # Compute predictive covariance 

319 if cho_solve_failed: 

320 f_cov = K_ss - K_s.T @ np.linalg.inv(K) @ K_s 

321 else: 

322 v = solve_triangular(L, K_s, lower=low) 

323 f_cov = K_ss - v.T @ v 

324 

325 # Transform covariance 

326 if self.normalize: 

327 if return_deriv: 

328 f_var = utils.transform_cov_directional( 

329 f_cov, self.sigma_y, self.sigmas_x, 

330 common_derivs, X_test) 

331 else: 

332 f_var = self.sigma_y ** 2 * np.diag(np.abs(f_cov)) 

333 else: 

334 f_var = np.diag(np.abs(f_cov)) 

335 

336 reshaped_var = f_var.reshape(num_derivs, n) 

337 return reshaped_mean, reshaped_var