Coverage for jetgp/full_degp/acquisition_funcs.py: 0%

110 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-03-31 11:46 -0500

1# acquisition_functions.py 

2 

3import numpy as np 

4from jetgp.full_degp import degp_utils 

5import jetgp.utils as utils 

6from line_profiler import profile 

7from scipy.stats.qmc import Sobol 

8from scipy.optimize import minimize 

def find_next_point_batch(
    gp,
    params,
    dist_params,
    acquisition_func,
    integration_points=None,
    candidate_points=None,
    n_candidate_points: int = 1024,
    n_local_starts: int = 1,
    n_batch_points: int = 10,
    local_opt=True,
    **acq_kwargs
):
    """
    Sequentially select a batch of design points via aggregated active learning.

    Each iteration scores Sobol candidate points with ``acquisition_func``
    averaged across every (gp, params) pair, optionally refines the
    top-scoring candidates with local optimization, then augments every GP
    with its own prediction at the chosen point before selecting the next one.

    Args:
        gp: List of GP objects (one per time step / output).
        params: List of hyperparameter vectors, parallel to ``gp``.
        dist_params: Input-distribution parameters used to map Sobol samples
            into the design space and to derive optimization bounds.
        acquisition_func: Callable ``(gp_t, params_t, precomputed_diffs,
            n_integration_points=...)`` returning per-candidate scores.
        integration_points: Domain points for integrated criteria. Required:
            the precomputed 'domain_cand' / 'train_domain' difference blocks
            are always built from them.
        candidate_points: Optional initial candidate set; drawn from a
            scrambled Sobol sequence when None.
        n_candidate_points: Number of Sobol candidates per draw.
        n_local_starts: Number of top-scoring candidates used as local
            optimization starts.
        n_batch_points: Number of points to select.
        local_opt: If True, refine starts with BFGS / L-BFGS-B; otherwise
            take the argmax candidate and remove it from the pool.
        **acq_kwargs: Extra options; ``'seed'`` feeds the Sobol sampler.

    Returns:
        np.ndarray of selected points, shape (n_batch_points, d).

    Raises:
        ValueError: If ``gp``/``params`` are not lists, or
            ``integration_points`` is None.
    """
    if type(gp) is not list or type(params) is not list:
        raise ValueError("gp and params must be provided as list")
    if integration_points is None:
        # BUGFIX: previously this crashed later with an opaque AttributeError
        # (integration_points.shape); every precomputed-difference block below
        # needs the domain points, so fail fast with a clear message.
        raise ValueError("integration_points must be provided")

    selected_points = []
    n_integration_points = integration_points.shape[0]

    def _generate_candidates(dimension, seed_offset=0):
        # Scrambled Sobol draw on the unit cube, mapped through the inverse CDF.
        sampler = Sobol(d=dimension, scramble=True,
                        seed=acq_kwargs.get('seed', 0) + seed_offset)
        coarse_points_unit = sampler.random(n=n_candidate_points)
        return utils.get_inverse(dist_params, coarse_points_unit)

    for n in range(n_batch_points):
        if candidate_points is None:
            candidate_points = _generate_candidates(gp[0].n_bases, seed_offset=n)
        cand_scores_all = []

        # Normalize once per batch iteration (all GPs share the same scaling).
        if gp[0].normalize:
            sigmas_x = gp[0].sigmas_x
            mus_x = gp[0].mus_x
            candidate_points_norm = utils.normalize_x_data_test(
                candidate_points.copy(), sigmas_x, mus_x)
            integration_points_norm = utils.normalize_x_data_test(
                integration_points.copy(), sigmas_x, mus_x)
        else:
            candidate_points_norm = candidate_points.copy()
            integration_points_norm = integration_points.copy()
        # BUGFIX: the non-normalized branch used to reset n_integration_points
        # to None, breaking integrated criteria; keep it consistent instead.
        n_integration_points = integration_points.shape[0]

        # Precompute all pairwise differences once; reused by every GP below.
        precomputed_diffs = {
            'train_cand': degp_utils.differences_by_dim_func(
                gp[0].x_train, candidate_points_norm, gp[0].n_order, return_deriv=False
            ),
            'cand_cand': degp_utils.differences_by_dim_func(
                candidate_points_norm, candidate_points_norm, gp[0].n_order, return_deriv=False
            ),
            'domain_cand': degp_utils.differences_by_dim_func(
                integration_points_norm, candidate_points_norm, gp[0].n_order, return_deriv=False
            ),
            'train_domain': degp_utils.differences_by_dim_func(
                gp[0].x_train, integration_points_norm, gp[0].n_order, return_deriv=False
            )
        }

        # Score the candidates under every GP and aggregate by the mean.
        for gp_t, params_t in zip(gp, params):
            scores_t = acquisition_func(
                gp_t,
                params_t,
                precomputed_diffs,
                n_integration_points=n_integration_points
            )
            cand_scores_all.append(scores_t.reshape(-1, 1))

        cand_scores = np.mean(np.hstack(cand_scores_all), axis=1)
        # Top starting points from the aggregated candidate scores.
        top_indices = np.argsort(cand_scores.ravel())[-n_local_starts:]
        top_starting_points = candidate_points[top_indices]

        if local_opt:
            bounds = utils.get_optimization_bounds(dist_params)
            all_unbounded = all(b[0] is None for b in bounds)

            def objective_function(x: np.ndarray) -> float:
                """
                Negative aggregated acquisition at a single point x.

                Candidate-dependent diffs cannot be reused here because x
                varies during optimization; only 'train_domain' is reused.
                """
                x_cand = x.reshape(1, -1)
                if gp[0].normalize:
                    x_cand_norm = utils.normalize_x_data_test(x_cand.copy(), sigmas_x, mus_x)
                else:
                    x_cand_norm = x_cand.copy()

                local_precomputed_diffs = {
                    'train_cand': degp_utils.differences_by_dim_func(
                        gp[0].x_train, x_cand_norm, gp[0].n_order, return_deriv=False
                    ),
                    'cand_cand': degp_utils.differences_by_dim_func(
                        x_cand_norm, x_cand_norm, gp[0].n_order, return_deriv=False
                    ),
                    'domain_cand': degp_utils.differences_by_dim_func(
                        integration_points_norm, x_cand_norm, gp[0].n_order, return_deriv=False
                    ),
                    'train_domain': precomputed_diffs['train_domain']  # reuse: x-independent
                }

                score_list = [
                    acquisition_func(
                        gp_t,
                        params_t,
                        precomputed_diffs=local_precomputed_diffs,
                        n_integration_points=n_integration_points
                    )
                    for gp_t, params_t in zip(gp, params)
                ]
                # Minimize the negative mean score (i.e. maximize acquisition).
                return -np.mean([s[0] if isinstance(s, np.ndarray) else s
                                 for s in score_list])

            # Local refinement from each starting point.
            best_x, best_score = None, -np.inf
            for start_point in top_starting_points:
                if all_unbounded:
                    res = minimize(fun=objective_function, x0=start_point, method="BFGS")
                else:
                    res = minimize(fun=objective_function, x0=start_point,
                                   method="L-BFGS-B", bounds=bounds)
                if -res.fun > best_score:
                    best_score, best_x = -res.fun, res.x

            if best_x is None:
                # ROBUSTNESS: every local run returned a non-finite objective
                # (e.g. NaN); fall back to the best coarse candidate instead of
                # crashing on None.reshape below.
                best_x = top_starting_points[-1]
            best_x = best_x.reshape(1, -1)
            selected_points.append(best_x)
        else:
            top_index = np.argmax(cand_scores.ravel())
            best_x = candidate_points[top_index].reshape(1, -1)
            selected_points.append(best_x)
            # Remove the chosen candidate so it cannot be selected again.
            candidate_points = np.delete(candidate_points, top_index, axis=0)

        # Augment each GP with its own predicted outputs at best_x so the next
        # selection accounts for the pseudo-observation.
        for t, gp_t in enumerate(gp):
            params_t = params[t]
            y_pred = gp_t.predict(best_x, params_t, calc_cov=False, return_deriv=True)
            y_to_add = utils._format_y_to_add(y_pred)

            x_aug = np.vstack([gp_t.x_train_input, best_x])
            y_aug = []
            for i_y, y_block in enumerate(gp_t.y_train_input):
                if i_y < len(y_to_add):
                    y_aug.append(np.vstack([y_block, y_to_add[i_y]]))
                else:
                    # No prediction available for this output block; keep as-is.
                    y_aug.append(y_block)

            gp[t] = gp_t.__class__(
                x_aug, y_aug,
                gp_t.n_order,
                n_bases=gp_t.n_bases,
                der_indices=gp_t.der_indices,
                normalize=gp_t.normalize,
                kernel=gp_t.kernel,
                kernel_type=gp_t.kernel_type,
            )

    return np.vstack(selected_points)

190 

191# def imse_reduction_verify(gp, params, x_train, y_train_list,der_indices, candidate_points, full_domain, noise_var=None): 

192# """ 

193# Brute-force IMSE verification using temporary DE-GP for each candidate point. 

194# Handles flattened y_train_candidates output from gp.predict correctly. 

195 

196# Args: 

197# gp: Original degp object. 

198# params: Kernel hyperparameters. 

199# y_train_list: List of arrays (function + derivatives) for the original training points. 

200# candidate_points: Candidate points to evaluate (n_candidates, d). 

201# full_domain: Domain points for IMSE computation (n_domain, d). 

202# noise_var: Optional noise variance for function observations. 

203 

204# Returns: 

205# imse_brute: Array of IMSE reductions for each candidate. 

206# """ 

207# n_candidates = candidate_points.shape[0] 

208# n_domain = full_domain.shape[0] 

209# n_derivs = len(y_train_list) - 1 # number of derivatives 

210# imse_brute = np.zeros(n_candidates) 

211 

212# # 1. Original GP posterior variance at full_domain (function values only) 

213# _, sigma2_orig = gp.predict( 

214# full_domain, params, calc_cov=True, return_deriv=False) 

215# sigma2_orig = np.maximum(sigma2_orig, 0.0) 

216 

217# # 2. Predict all function + derivatives over full domain 

218# y_train_candidates_flat = gp.predict( 

219# full_domain, params, calc_cov=False, return_deriv=True) 

220 

221# for i_cand, x_cand in enumerate(candidate_points): 

222# # 3. Extract candidate outputs from the flattened array 

223# y_cand_list = [] 

224# for k in range(n_derivs + 1): # include function 

225# start = k * n_domain + i_cand 

226# end = start + 1 

227# y_cand_list.append(y_train_candidates_flat[start:end]) 

228 

229# # 4. Construct temporary training outputs (list of arrays) 

230# y_train_temp = [] 

231# for j in range(len(y_train_list)): 

232# y_train_temp.append(np.vstack([y_train_list[j], y_cand_list[j]])) 

233 

234# # 5. Construct augmented training inputs 

235# X_train_temp = np.vstack([x_train.copy(), x_cand]) 

236 

237# # 6. Initialize temporary GP with augmented training data 

238# gp_temp = degp( 

239# X_train_temp, y_train_temp, gp.n_order, gp.n_bases, 

240# normalize=gp.normalize, 

241# kernel=gp.kernel, 

242# der_indices=gp.der_indices, 

243# kernel_type=gp.kernel_type 

244# ) 

245 

246# # 7. Predict posterior variance at full domain (function values only) 

247# _, sigma2_new = gp_temp.predict( 

248# full_domain, params, calc_cov=True, return_deriv=False) 

249# sigma2_new = np.maximum(sigma2_new, 0.0) 

250 

251# # 8. Compute IMSE reduction for this candidate 

252# imse_brute[i_cand] = np.mean((sigma2_orig - sigma2_new)) 

253# return imse_brute 

254 

@profile
def mse_reduction(
    gp, params, precomputed_diffs,
    noise_var=None, return_deriv=False, **kwargs
):
    """
    Posterior variance (MSE-reduction score) of the GP at candidate points.

    Args:
        gp: GP object exposing training differences and kernel configuration.
        params: Kernel hyperparameters; ``params[:-1]`` are length scales,
            ``params[-1]`` is the log10 noise scale.
        precomputed_diffs: Dict with precomputed difference matrices:
            - 'train_cand': training points vs. (normalized) candidates
            - 'cand_cand': candidates vs. candidates
        noise_var: Unused here; kept for a uniform acquisition signature.
        return_deriv: If True, keep derivative blocks in the cross and prior
            kernels; otherwise restrict to the function-value block.
        **kwargs: Ignored extras (e.g. n_integration_points).

    Returns:
        1D array of posterior variances, one per candidate point.
    """
    length_scales, sigma_n = params[:-1], params[-1]

    # Training kernel (always including derivative blocks) plus noise on the
    # diagonal; sigma_n is stored on a log10 scale.
    K_train = degp_utils.rbf_kernel(
        gp.differences_by_dim, length_scales, gp.n_order, gp.n_bases,
        gp.kernel_func, gp.flattened_der_indicies, gp.powers, return_deriv=True
    )
    K_train += (10 ** sigma_n) ** 2 * np.eye(K_train.shape[0])

    # Cross- and prior-kernels from the precomputed differences.
    K_cross = degp_utils.rbf_kernel(
        precomputed_diffs['train_cand'], length_scales, gp.n_order, gp.n_bases,
        gp.kernel_func, gp.flattened_der_indicies, gp.powers, return_deriv=return_deriv
    )
    K_prior = degp_utils.rbf_kernel(
        precomputed_diffs['cand_cand'], length_scales, gp.n_order, gp.n_bases,
        gp.kernel_func, gp.flattened_der_indicies, gp.powers, return_deriv=return_deriv
    )

    n_cand = K_cross.shape[1]
    if return_deriv:
        cross, prior = K_cross, K_prior
    else:
        # Function-value block only.
        cross, prior = K_cross[:, :n_cand], K_prior[:n_cand, :n_cand]

    # Standard GP posterior covariance: K_ss - K_s^T K^{-1} K_s.
    posterior_cov = prior - cross.T @ np.linalg.solve(K_train, cross)
    return np.diag(posterior_cov)

318 

319 

def imse_reduction(
    gp, params, precomputed_diffs,
    noise_var=None, return_deriv=False, **kwargs
):
    """
    Vectorized exact IMSE reduction with precomputed differences.

    Args:
        gp: GP object exposing training differences and kernel configuration.
        params: Kernel hyperparameters; ``params[:-1]`` are length scales,
            ``params[-1]`` is the log10 noise scale.
        precomputed_diffs: Dict with precomputed difference matrices:
            - 'train_domain': training vs. integration points
            - 'train_cand': training vs. candidate points
            - 'cand_cand': candidates vs. candidates
            - 'domain_cand': integration vs. candidate points
        noise_var: Unused here; kept for a uniform acquisition signature.
        return_deriv: Whether the cross kernels include derivative blocks.
        **kwargs: Must contain 'n_integration_points' (number of domain rows
            of the function-value block in K_domain_cand).

    Returns:
        1D array of IMSE reductions for each candidate point.
    """
    length_scales = params[:-1]
    sigma_n = params[-1]
    n_integration_points = kwargs['n_integration_points']

    # Training kernel (with derivative blocks) plus noise on the diagonal;
    # sigma_n is stored on a log10 scale.
    diff_train_train = gp.differences_by_dim
    K = degp_utils.rbf_kernel(
        diff_train_train, length_scales, gp.n_order, gp.n_bases, gp.kernel_func,
        gp.flattened_der_indicies, gp.powers, return_deriv=True
    )
    K += (10 ** sigma_n) ** 2 * np.eye(K.shape[0])

    # Use precomputed differences.
    diff_train_domain = precomputed_diffs['train_domain']
    diff_train_cand = precomputed_diffs['train_cand']
    diff_cand_cand = precomputed_diffs['cand_cand']
    diff_domain_cand = precomputed_diffs['domain_cand']

    # Compute kernels from differences.
    K_s = degp_utils.rbf_kernel(
        diff_train_domain, length_scales, gp.n_order, gp.n_bases, gp.kernel_func,
        gp.flattened_der_indicies, gp.powers, return_deriv=return_deriv
    )
    K_train_cand = degp_utils.rbf_kernel(
        diff_train_cand, length_scales, gp.n_order, gp.n_bases, gp.kernel_func,
        gp.flattened_der_indicies, gp.powers, return_deriv=return_deriv
    )
    # Only the diagonal (prior variance per candidate) is needed.
    K_cand_cand = np.diag(degp_utils.rbf_kernel(
        diff_cand_cand, length_scales, gp.n_order, gp.n_bases, gp.kernel_func,
        gp.flattened_der_indicies, gp.powers, return_deriv=return_deriv
    ))
    K_domain_cand = degp_utils.rbf_kernel(
        diff_domain_cand, length_scales, gp.n_order, gp.n_bases, gp.kernel_func,
        gp.flattened_der_indicies, gp.powers, return_deriv=return_deriv
    )

    # Solve K^{-1} @ K_train_cand for all candidates at once.
    v = np.linalg.solve(K, K_train_cand)                     # (n_train, n_candidates)
    # Variance-reduction ratio: numerator is the squared posterior covariance
    # between each domain point and each candidate; denominator is the
    # candidate's posterior variance.
    numerator = (K_domain_cand[:n_integration_points, :] - K_s.T @ v) ** 2  # (n_domain, n_candidates)
    denominator = K_cand_cand - np.sum(K_train_cand * v, axis=0)            # (n_candidates,)
    # BUGFIX: clamp the denominator — it can be numerically ~0 (or slightly
    # negative) for candidates near training points, which previously produced
    # inf/NaN scores. (The clamp had been lost into a trailing comment.)
    denominator = np.maximum(denominator, 1e-16)
    variance_reduction = numerator / denominator             # (n_domain, n_candidates)
    # IMSE reduction per candidate = mean variance reduction over the domain.
    imse_reductions = variance_reduction.mean(axis=0)        # (n_candidates,)
    return imse_reductions