Coverage for jetgp/hyperparameter_optimizers/rprop.py: 93%

72 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-03-31 11:46 -0500

import numpy as np


def rprop(func, lb, ub, **kwargs):
    """Minimize ``func`` over box bounds with the iRProp- algorithm.

    Resilient backpropagation keeps an independent step size per
    parameter and adapts it from the *sign* of successive gradients, so
    it is insensitive to gradient magnitude.  Random restarts inside
    ``[lb, ub]`` guard against local minima.

    Parameters
    ----------
    func : callable
        Objective to minimize.  May return either ``f(x)`` or a
        ``(f(x), grad)`` tuple; when no gradient is supplied, a
        forward-difference approximation is used.
    lb, ub : array_like
        Lower / upper bounds, one entry per dimension.
    **kwargs
        x0 : array_like, optional
            Starting point for the first restart (projected onto the
            bounds).  Later restarts sample uniformly in the box.
        n_restart_optimizer : int, default 10
            Number of restarts; must be >= 1.
        maxiter : int, default 1000
            Iterations per restart.
        learning_rate : float, default 0.01
            Initial per-parameter step size.
        etas : (float, float), default (0.5, 1.2)
            ``(eta_minus, eta_plus)`` step shrink / growth factors.
        step_sizes : (float, float), default (1e-6, 50)
            ``(min_step, max_step)`` clamp on the step sizes.
        ftol, gtol : float, default 1e-8
            Convergence tolerances on the function decrease and the
            gradient norm; *both* must hold to stop early.
        debug, disp : bool, default False
            Per-restart / per-iteration progress printing.

    Returns
    -------
    (best_x, best_val) : (np.ndarray, float)
        Best point found across all restarts and its objective value.

    Raises
    ------
    ValueError
        If ``n_restart_optimizer`` is not positive (the loop would
        otherwise silently return ``(None, inf)``).
    """
    x0 = kwargs.pop("x0", None)
    num_restart_optimizer = kwargs.pop("n_restart_optimizer", 10)
    maxiter = kwargs.pop("maxiter", 1000)
    initial_step = kwargs.pop("learning_rate", 0.01)
    etas = kwargs.pop("etas", (0.5, 1.2))  # (eta_minus, eta_plus)
    step_sizes = kwargs.pop("step_sizes", (1e-6, 50))  # (min_step, max_step)
    ftol = kwargs.pop("ftol", 1e-8)
    gtol = kwargs.pop("gtol", 1e-8)
    debug = kwargs.pop("debug", False)
    disp = kwargs.pop("disp", False)

    # Fail loudly instead of returning (None, inf) from a zero-trip loop.
    if num_restart_optimizer < 1:
        raise ValueError("n_restart_optimizer must be >= 1")

    lb = np.asarray(lb, dtype=np.float64)
    ub = np.asarray(ub, dtype=np.float64)
    n_dim = len(lb)

    eta_minus, eta_plus = etas
    min_step, max_step = step_sizes

    def forward_gradient(f, x, f_x, h=1e-7):
        """Forward-difference gradient, reusing the already-known f(x).

        Costs n function evaluations instead of the 2n that central
        differences would need.
        """
        grad = np.empty(n_dim)
        x_pert = x.copy()
        for i in range(n_dim):
            x_pert[i] += h
            f_plus = f(x_pert)
            if isinstance(f_plus, tuple):
                f_plus = f_plus[0]
            grad[i] = (f_plus - f_x) / h
            x_pert[i] = x[i]  # reset in place; avoids re-copying x
        return grad

    best_x = None
    best_val = np.inf

    for restart in range(num_restart_optimizer):
        # First restart honors the caller-supplied start, projected onto
        # the bounds (the original code could start outside the box).
        if x0 is not None and restart == 0:
            x = np.clip(np.asarray(x0, dtype=np.float64), lb, ub)
        else:
            x = np.random.uniform(lb, ub)

        # RProp state (pure NumPy).
        step = np.full(n_dim, initial_step)  # per-parameter step sizes
        prev_grad = np.zeros(n_dim)          # previous gradient for sign test

        prev_val = np.inf

        for t in range(maxiter):
            result = func(x)
            if isinstance(result, tuple):
                f_val = result[0]
                # Copy: the iRProp- update below zeroes entries in place,
                # and we must not mutate an array owned by the caller
                # (also coerces list-valued gradients to an ndarray so
                # boolean-mask assignment works).
                grad = np.array(result[1], dtype=np.float64)
            else:
                f_val = result
                grad = forward_gradient(func, x, f_val)

            # Converged when both the function decrease and the gradient
            # norm are small.
            grad_norm = np.linalg.norm(grad)
            if t > 0:
                if abs(prev_val - f_val) < ftol and grad_norm < gtol:
                    if disp:
                        print(f"Converged at iteration {t}")
                    break

            prev_val = f_val

            # iRProp- update: grow steps where the gradient sign is
            # stable, shrink where it flipped, and skip the update on
            # flipped coordinates (their gradient is zeroed, which also
            # zeroes the stored prev_grad for the next sign test).
            if t > 0:
                sign_product = grad * prev_grad

                increase_mask = sign_product > 0
                step[increase_mask] *= eta_plus

                decrease_mask = sign_product < 0
                step[decrease_mask] *= eta_minus
                grad[decrease_mask] = 0.0

            # Clamp step sizes into [min_step, max_step].
            np.clip(step, min_step, max_step, out=step)

            # Only the gradient's sign drives the move; project back
            # onto the box afterwards.
            x -= np.sign(grad) * step
            np.clip(x, lb, ub, out=x)

            # grad is a fresh array each iteration, so storing it
            # directly is safe (original copied defensively).
            prev_grad = grad

            if disp and t % 100 == 0:
                print(f"Iteration {t}: f(x) = {f_val:.6e}, ||grad|| = {grad_norm:.6e}")

        # Re-evaluate at the (possibly projected) final point.
        result = func(x)
        final_val = result[0] if isinstance(result, tuple) else result

        if final_val < best_val:
            best_val = final_val
            best_x = x.copy()

        if debug:
            print(f"[RProp] Restart {restart+1}/{num_restart_optimizer} -> best_val={best_val}")

    return best_x, best_val