optimization
1from optimization.base import Optimization 2from optimization.gradients import ( 3 FiniteDifferenceGradient, 4 FirstOrderGradient, 5 GaussSteinGradient, 6 GradientMethod, 7 SPSAGradient, 8 SteinDifferenceGradient, 9) 10from optimization.steps import STEP_RULE_ARMIJO, STEP_RULE_CONSTANT, STEP_RULES 11 12__all__ = [ 13 "Optimization", 14 "GradientMethod", 15 "FirstOrderGradient", 16 "FiniteDifferenceGradient", 17 "GaussSteinGradient", 18 "SPSAGradient", 19 "SteinDifferenceGradient", 20 "STEP_RULE_ARMIJO", 21 "STEP_RULE_CONSTANT", 22 "STEP_RULES", 23]
class Optimization:
    """SciPy L-BFGS-B optimizer for theta-level objectives with pluggable gradient methods."""

    def __init__(
        self,
        objective: Objective,
        x_samples: np.ndarray,
        gradient: Any,
        *,
        algorithm: str = STEP_RULE_LBFGSB,
        t_steps: int,
        n_grad_samples: int,
        sigma: float,
        perturbation_space: str = "theta",
        step_size: float = 0.01,
        batch_size: int | None = None,
        true_grad_theta_fn: TrueThetaGradFn | None = None,
        grad_norm_tol: float | None = None,
        ftol: float | None = None,
        step_reporter: "StepReporter | None" = None,
        method_label: str | None = None,
        rng: np.random.Generator | None = None,
        minimize_fn: MinimizeFn = minimize,
    ) -> None:
        """Configure SciPy-based optimization over theta.

        Args:
            objective: Theta-level objective implementing value(theta, x_batch) and grad(theta, x_batch).
            x_samples: State samples array, shape (n_samples, state_dim).
            gradient: Gradient method object with setup() and theta_grad() methods.
            algorithm: Optimization algorithm (default: 'l-bfgs-b').
            t_steps: Maximum number of optimization steps.
            n_grad_samples: Number of gradient samples for zeroth-order methods.
            sigma: Perturbation scale for zeroth-order methods.
            perturbation_space: Space in which zeroth-order perturbations are applied:
                ``"theta"`` (default) perturbs policy parameters directly; ``"u"`` perturbs
                actions and maps back via chain rule (requires objective.policy).
            step_size: Step size for gradient descent algorithms.
            batch_size: Mini-batch size (None for full batch).
            true_grad_theta_fn: Optional function for computing true theta gradients.
            grad_norm_tol: Early stopping threshold on gradient norm.
            ftol: SciPy function tolerance.
            step_reporter: Optional reporter for per-step metrics.
            method_label: Label for this optimization method.
            rng: Random number generator.
            minimize_fn: SciPy minimize function (for testing).

        Raises:
            ValueError: If perturbation_space, x_samples, batch_size,
                grad_norm_tol, ftol, or t_steps is out of range.
        """
        if perturbation_space not in {"theta", "u"}:
            raise ValueError("perturbation_space must be 'theta' or 'u'.")
        self.objective = objective
        self.gradient = gradient
        self.perturbation_space = perturbation_space
        self.algorithm = algorithm
        self.t_steps = int(t_steps)
        self.n_grad_samples = int(n_grad_samples)
        self.sigma = float(sigma)
        self.step_size = float(step_size)
        self.batch_size = batch_size
        self.true_grad_theta_fn = true_grad_theta_fn
        self.grad_norm_tol = grad_norm_tol
        self.ftol = ftol
        self.step_reporter = step_reporter
        # Fall back to the gradient method's advertised name so traces stay labelled.
        self.method_label = method_label if method_label is not None else getattr(self.gradient, "name", "opt")
        # Fixed default seed keeps runs reproducible when no rng is supplied.
        self.rng = rng if rng is not None else np.random.default_rng(0)
        self._minimize_fn = minimize_fn

        x_arr = np.asarray(x_samples, dtype=float)
        if x_arr.ndim != 2:
            raise ValueError("x_samples must be a 2D array.")
        if x_arr.shape[0] < 1:
            raise ValueError("x_samples must contain at least one sample.")
        self.x_array = x_arr
        self.n_total = self.x_array.shape[0]
        # batch_size=None means full batch: every sample used on every evaluation.
        self.batch_size_eff = self.n_total if batch_size is None else int(batch_size)
        if self.batch_size_eff <= 0 or self.batch_size_eff > self.n_total:
            raise ValueError("batch_size must be in [1, len(x_samples)].")
        if self.grad_norm_tol is not None and self.grad_norm_tol <= 0.0:
            raise ValueError("grad_norm_tol must be positive when provided.")
        if self.ftol is not None and self.ftol <= 0.0:
            raise ValueError("ftol must be positive when provided.")
        if self.t_steps <= 0:
            raise ValueError("t_steps must be positive.")
        self._full_indices = np.arange(self.n_total, dtype=int)

    def solve(self, theta_start: np.ndarray) -> tuple[np.ndarray, "OptimizationTrace"]:
        """Run SciPy ``minimize`` and return final theta with trace."""
        theta0 = np.asarray(theta_start, dtype=float)
        self.gradient.setup(self, theta0)

        # Per-step trace buffers, appended to by record().
        steps: list[int] = []
        u_values: list[float] = []
        values: list[float] = []
        u_grad_estimates: list[float] = []
        theta_grad_norms: list[float] = []
        true_theta_grad_norms: list[float] = []
        theta_values: list[np.ndarray] = []

        def value_fn(theta_vec: np.ndarray) -> float:
            # Objective callback for SciPy; resamples a mini-batch on every call.
            theta_arr = np.asarray(theta_vec, dtype=float)
            indices = sample_indices(self.rng, self.batch_size_eff, self.n_total, self._full_indices)
            return objective_value_on_indices(self.objective, self.x_array, self.n_total, theta_arr, indices)

        def grad_fn(theta_vec: np.ndarray) -> np.ndarray:
            # Gradient callback for SciPy; delegates to the pluggable estimator.
            theta_arr = np.asarray(theta_vec, dtype=float)
            indices = sample_indices(self.rng, self.batch_size_eff, self.n_total, self._full_indices)
            return np.asarray(self.gradient.theta_grad(self, theta_arr, indices), dtype=float)

        def record(theta_vec: np.ndarray) -> None:
            # Per-iterate callback: evaluate diagnostics on a fresh mini-batch
            # and append one row to every trace buffer.
            theta_arr = np.asarray(theta_vec, dtype=float)
            indices = sample_indices(self.rng, self.batch_size_eff, self.n_total, self._full_indices)
            value = objective_value_on_indices(self.objective, self.x_array, self.n_total, theta_arr, indices)
            grad_theta = np.asarray(self.gradient.theta_grad(self, theta_arr, indices), dtype=float)
            theta_grad_norm = float(np.linalg.norm(grad_theta))

            true_theta_grad_norm = None
            if self.true_grad_theta_fn is not None:
                grad_true = np.asarray(
                    self.true_grad_theta_fn(theta_arr, x_batch(self.x_array, indices, self.n_total)),
                    dtype=float,
                )
                true_theta_grad_norm = float(np.linalg.norm(grad_true))

            step = len(steps)

            # Compute mean action if policy is available
            policy = getattr(self.objective, "policy", None)
            if policy is not None:
                mean_u = _mean_action(policy, theta_arr, x_batch(self.x_array, indices, self.n_total))
            else:
                mean_u = float("nan")

            steps.append(step)
            u_values.append(mean_u)
            values.append(value)
            # u-space gradient estimates are not tracked in this SciPy path;
            # NaN placeholders keep the trace columns aligned.
            u_grad_estimates.append(float("nan"))
            theta_grad_norms.append(theta_grad_norm)
            theta_values.append(theta_arr.copy())
            if true_theta_grad_norm is not None:
                true_theta_grad_norms.append(true_theta_grad_norm)
            if self.step_reporter is not None:
                self.step_reporter.log_step(self.method_label, step, mean_u, value, theta_grad_norm)

        # Record the starting point before SciPy takes over.
        record(theta0)

        options: dict[str, float | int] = {"maxiter": int(self.t_steps)}
        if self.grad_norm_tol is not None:
            options["gtol"] = float(self.grad_norm_tol)
        if self.ftol is not None:
            options["ftol"] = float(self.ftol)

        result = self._minimize_fn(
            value_fn,
            x0=theta0,
            jac=grad_fn,
            method=scipy_method(self.algorithm),
            options=options,
            callback=record,
        )

        theta_final = np.asarray(result.x, dtype=float)
        # The SciPy callback may not fire for the returned iterate; record it once.
        if not np.allclose(theta_final, theta_values[-1]):
            record(theta_final)

        # Imported here rather than at module top — presumably to avoid a
        # circular import with experiments.results; confirm before moving.
        from experiments.results import OptimizationTrace

        trace = OptimizationTrace(
            steps=steps,
            u_values=u_values,
            objective_values=values,
            u_grad_estimates=u_grad_estimates,
            u_true_gradients=None,
            theta_grad_norms=theta_grad_norms,
            true_theta_grad_norms=true_theta_grad_norms if true_theta_grad_norms else None,
            theta_values=theta_values,
            optimizer_status=int(result.status),
            optimizer_message=str(result.message),
        )
        return theta_final, trace
SciPy L-BFGS-B optimizer for theta-level objectives with pluggable gradient methods.
    def __init__(
        self,
        objective: Objective,
        x_samples: np.ndarray,
        gradient: Any,
        *,
        algorithm: str = STEP_RULE_LBFGSB,
        t_steps: int,
        n_grad_samples: int,
        sigma: float,
        perturbation_space: str = "theta",
        step_size: float = 0.01,
        batch_size: int | None = None,
        true_grad_theta_fn: TrueThetaGradFn | None = None,
        grad_norm_tol: float | None = None,
        ftol: float | None = None,
        step_reporter: "StepReporter | None" = None,
        method_label: str | None = None,
        rng: np.random.Generator | None = None,
        minimize_fn: MinimizeFn = minimize,
    ) -> None:
        """Configure SciPy-based optimization over theta.

        Args:
            objective: Theta-level objective implementing value(theta, x_batch) and grad(theta, x_batch).
            x_samples: State samples array, shape (n_samples, state_dim).
            gradient: Gradient method object with setup() and theta_grad() methods.
            algorithm: Optimization algorithm (default: 'l-bfgs-b').
            t_steps: Maximum number of optimization steps.
            n_grad_samples: Number of gradient samples for zeroth-order methods.
            sigma: Perturbation scale for zeroth-order methods.
            perturbation_space: Space in which zeroth-order perturbations are applied:
                ``"theta"`` (default) perturbs policy parameters directly; ``"u"`` perturbs
                actions and maps back via chain rule (requires objective.policy).
            step_size: Step size for gradient descent algorithms.
            batch_size: Mini-batch size (None for full batch).
            true_grad_theta_fn: Optional function for computing true theta gradients.
            grad_norm_tol: Early stopping threshold on gradient norm.
            ftol: SciPy function tolerance.
            step_reporter: Optional reporter for per-step metrics.
            method_label: Label for this optimization method.
            rng: Random number generator.
            minimize_fn: SciPy minimize function (for testing).

        Raises:
            ValueError: If perturbation_space, x_samples, batch_size,
                grad_norm_tol, ftol, or t_steps is out of range.
        """
        if perturbation_space not in {"theta", "u"}:
            raise ValueError("perturbation_space must be 'theta' or 'u'.")
        self.objective = objective
        self.gradient = gradient
        self.perturbation_space = perturbation_space
        self.algorithm = algorithm
        self.t_steps = int(t_steps)
        self.n_grad_samples = int(n_grad_samples)
        self.sigma = float(sigma)
        self.step_size = float(step_size)
        self.batch_size = batch_size
        self.true_grad_theta_fn = true_grad_theta_fn
        self.grad_norm_tol = grad_norm_tol
        self.ftol = ftol
        self.step_reporter = step_reporter
        # Fall back to the gradient method's advertised name so traces stay labelled.
        self.method_label = method_label if method_label is not None else getattr(self.gradient, "name", "opt")
        # Fixed default seed keeps runs reproducible when no rng is supplied.
        self.rng = rng if rng is not None else np.random.default_rng(0)
        self._minimize_fn = minimize_fn

        x_arr = np.asarray(x_samples, dtype=float)
        if x_arr.ndim != 2:
            raise ValueError("x_samples must be a 2D array.")
        if x_arr.shape[0] < 1:
            raise ValueError("x_samples must contain at least one sample.")
        self.x_array = x_arr
        self.n_total = self.x_array.shape[0]
        # batch_size=None means full batch: every sample used on every evaluation.
        self.batch_size_eff = self.n_total if batch_size is None else int(batch_size)
        if self.batch_size_eff <= 0 or self.batch_size_eff > self.n_total:
            raise ValueError("batch_size must be in [1, len(x_samples)].")
        if self.grad_norm_tol is not None and self.grad_norm_tol <= 0.0:
            raise ValueError("grad_norm_tol must be positive when provided.")
        if self.ftol is not None and self.ftol <= 0.0:
            raise ValueError("ftol must be positive when provided.")
        if self.t_steps <= 0:
            raise ValueError("t_steps must be positive.")
        self._full_indices = np.arange(self.n_total, dtype=int)
Configure SciPy-based optimization over theta.
Args:
objective: Theta-level objective implementing value(theta, x_batch) and grad(theta, x_batch).
x_samples: State samples array, shape (n_samples, state_dim).
gradient: Gradient method object with setup() and theta_grad() methods.
algorithm: Optimization algorithm (default: 'l-bfgs-b').
t_steps: Maximum number of optimization steps.
n_grad_samples: Number of gradient samples for zeroth-order methods.
sigma: Perturbation scale for zeroth-order methods.
perturbation_space: Space in which zeroth-order perturbations are applied:
"theta" (default) perturbs policy parameters directly; "u" perturbs
actions and maps back via chain rule (requires objective.policy).
step_size: Step size for gradient descent algorithms.
batch_size: Mini-batch size (None for full batch).
true_grad_theta_fn: Optional function for computing true theta gradients.
grad_norm_tol: Early stopping threshold on gradient norm.
ftol: SciPy function tolerance.
step_reporter: Optional reporter for per-step metrics.
method_label: Label for this optimization method.
rng: Random number generator.
minimize_fn: SciPy minimize function (for testing).
    def solve(self, theta_start: np.ndarray) -> tuple[np.ndarray, "OptimizationTrace"]:
        """Run SciPy ``minimize`` and return final theta with trace."""
        theta0 = np.asarray(theta_start, dtype=float)
        self.gradient.setup(self, theta0)

        # Per-step trace buffers, appended to by record().
        steps: list[int] = []
        u_values: list[float] = []
        values: list[float] = []
        u_grad_estimates: list[float] = []
        theta_grad_norms: list[float] = []
        true_theta_grad_norms: list[float] = []
        theta_values: list[np.ndarray] = []

        def value_fn(theta_vec: np.ndarray) -> float:
            # Objective callback for SciPy; resamples a mini-batch on every call.
            theta_arr = np.asarray(theta_vec, dtype=float)
            indices = sample_indices(self.rng, self.batch_size_eff, self.n_total, self._full_indices)
            return objective_value_on_indices(self.objective, self.x_array, self.n_total, theta_arr, indices)

        def grad_fn(theta_vec: np.ndarray) -> np.ndarray:
            # Gradient callback for SciPy; delegates to the pluggable estimator.
            theta_arr = np.asarray(theta_vec, dtype=float)
            indices = sample_indices(self.rng, self.batch_size_eff, self.n_total, self._full_indices)
            return np.asarray(self.gradient.theta_grad(self, theta_arr, indices), dtype=float)

        def record(theta_vec: np.ndarray) -> None:
            # Per-iterate callback: evaluate diagnostics on a fresh mini-batch
            # and append one row to every trace buffer.
            theta_arr = np.asarray(theta_vec, dtype=float)
            indices = sample_indices(self.rng, self.batch_size_eff, self.n_total, self._full_indices)
            value = objective_value_on_indices(self.objective, self.x_array, self.n_total, theta_arr, indices)
            grad_theta = np.asarray(self.gradient.theta_grad(self, theta_arr, indices), dtype=float)
            theta_grad_norm = float(np.linalg.norm(grad_theta))

            true_theta_grad_norm = None
            if self.true_grad_theta_fn is not None:
                grad_true = np.asarray(
                    self.true_grad_theta_fn(theta_arr, x_batch(self.x_array, indices, self.n_total)),
                    dtype=float,
                )
                true_theta_grad_norm = float(np.linalg.norm(grad_true))

            step = len(steps)

            # Compute mean action if policy is available
            policy = getattr(self.objective, "policy", None)
            if policy is not None:
                mean_u = _mean_action(policy, theta_arr, x_batch(self.x_array, indices, self.n_total))
            else:
                mean_u = float("nan")

            steps.append(step)
            u_values.append(mean_u)
            values.append(value)
            # u-space gradient estimates are not tracked in this SciPy path;
            # NaN placeholders keep the trace columns aligned.
            u_grad_estimates.append(float("nan"))
            theta_grad_norms.append(theta_grad_norm)
            theta_values.append(theta_arr.copy())
            if true_theta_grad_norm is not None:
                true_theta_grad_norms.append(true_theta_grad_norm)
            if self.step_reporter is not None:
                self.step_reporter.log_step(self.method_label, step, mean_u, value, theta_grad_norm)

        # Record the starting point before SciPy takes over.
        record(theta0)

        options: dict[str, float | int] = {"maxiter": int(self.t_steps)}
        if self.grad_norm_tol is not None:
            options["gtol"] = float(self.grad_norm_tol)
        if self.ftol is not None:
            options["ftol"] = float(self.ftol)

        result = self._minimize_fn(
            value_fn,
            x0=theta0,
            jac=grad_fn,
            method=scipy_method(self.algorithm),
            options=options,
            callback=record,
        )

        theta_final = np.asarray(result.x, dtype=float)
        # The SciPy callback may not fire for the returned iterate; record it once.
        if not np.allclose(theta_final, theta_values[-1]):
            record(theta_final)

        # Imported here rather than at module top — presumably to avoid a
        # circular import with experiments.results; confirm before moving.
        from experiments.results import OptimizationTrace

        trace = OptimizationTrace(
            steps=steps,
            u_values=u_values,
            objective_values=values,
            u_grad_estimates=u_grad_estimates,
            u_true_gradients=None,
            theta_grad_norms=theta_grad_norms,
            true_theta_grad_norms=true_theta_grad_norms if true_theta_grad_norms else None,
            theta_values=theta_values,
            optimizer_status=int(result.status),
            optimizer_message=str(result.message),
        )
        return theta_final, trace
Run SciPy minimize and return final theta with trace.
class GradientMethod:
    """Common interface for the theta-gradient estimators plugged into ``Optimization``.

    Subclasses must override :meth:`theta_grad`; :meth:`setup` is an
    optional one-time hook invoked before the solve loop starts.
    """

    # Human-readable identifier; subclasses override with their method name.
    name = "gradient"

    def setup(self, optimizer: "Optimization", theta0: np.ndarray) -> None:
        """One-time pre-solve hook; the base implementation does nothing."""
        del optimizer, theta0  # unused in the base implementation

    def theta_grad(
        self, optimizer: "Optimization", theta: np.ndarray, indices: np.ndarray
    ) -> np.ndarray:
        """Estimate the objective gradient w.r.t. theta on the given sample indices."""
        raise NotImplementedError
Base interface for theta-gradient estimators used by the optimizer.
class FirstOrderGradient(GradientMethod):
    """Analytic gradient estimator that defers to ``objective.grad``.

    Returns the exact theta-gradient of the mini-batch objective via the
    module-level ``objective_grad_on_indices`` helper.
    """

    name = "first-order"

    def theta_grad(
        self, optimizer: "Optimization", theta: np.ndarray, indices: np.ndarray
    ) -> np.ndarray:
        """Return the exact theta-gradient on the mini-batch given by ``indices``."""
        obj = optimizer.objective
        return objective_grad_on_indices(obj, optimizer.x_array, optimizer.n_total, theta, indices)
Exact theta-gradient: $$\nabla_\theta J$$ from objective.grad.
class FiniteDifferenceGradient(GradientMethod):
    """Central finite-difference gradient estimator.

    In theta-space the mini-batch objective is probed along every
    coordinate axis with a central stencil (``2 * dim(theta)``
    evaluations per step). In u-space the actions are shifted once up
    and once down (2 evaluations per step, independent of theta's
    dimension) and the per-sample action derivative is chain-ruled back
    to theta through the policy Jacobian.
    """

    name = "finite-difference"

    def setup(self, optimizer: "Optimization", theta0: np.ndarray) -> None:
        """Validate the perturbation scale before the solve starts."""
        del theta0
        if optimizer.sigma <= 0.0:
            raise ValueError("sigma must be positive.")

    def theta_grad(
        self, optimizer: "Optimization", theta: np.ndarray, indices: np.ndarray
    ) -> np.ndarray:
        """Dispatch to the estimator matching ``optimizer.perturbation_space``."""
        if optimizer.perturbation_space != "u":
            return self._theta_grad(optimizer, theta, indices)
        return self._u_grad(optimizer, theta, indices)

    def _theta_grad(
        self, optimizer: "Optimization", theta: np.ndarray, indices: np.ndarray
    ) -> np.ndarray:
        # Central differences of the mini-batch objective, coordinate by coordinate.
        def batch_value(theta_eval: np.ndarray) -> float:
            return objective_value_on_indices(
                optimizer.objective,
                optimizer.x_array,
                optimizer.n_total,
                theta_eval,
                indices,
            )

        return finite_difference_theta_grad(
            batch_value, theta, method="central", step=optimizer.sigma
        )

    def _u_grad(
        self, optimizer: "Optimization", theta: np.ndarray, indices: np.ndarray
    ) -> np.ndarray:
        states, actions, policy_jac = _u_space_policy_setup(optimizer, theta, indices)
        h = optimizer.sigma
        f_plus = _action_objective_values(optimizer.objective, states, actions + h)
        f_minus = _action_objective_values(optimizer.objective, states, actions - h)
        du = (f_plus - f_minus) / (2.0 * h)
        # Chain rule: per-sample dJ/du times d pi/d theta, averaged over the batch.
        return np.mean(du[:, None] * policy_jac, axis=0)
Central finite-difference estimator.
- theta-space: $$\sum_{k=1}^d \frac{J(\theta+\sigma e_k)-J(\theta-\sigma e_k)}{2\sigma} e_k$$
—
`2 * dim(theta)` objective evaluations per step.
- u-space: $$\frac{M(x, u+\sigma)-M(x, u-\sigma)}{2\sigma}$$ per sample, chain-ruled to theta
  — only 2 evaluations per step regardless of `dim(theta)`.
class GaussSteinGradient(GradientMethod):
    """One-sided Gaussian score-function (Stein) gradient estimator.

    theta-space: averages J(theta + sigma * eps_j) * eps_j / sigma over
    standard normal directions eps_j (``n_grad_samples`` evaluations per
    step). u-space: the same one-sided estimator on scalar action
    offsets, chain-ruled back to theta through the policy Jacobian.
    """

    name = "gauss-stein"

    def setup(self, optimizer: "Optimization", theta0: np.ndarray) -> None:
        """Validate the sampling configuration before the solve starts."""
        del theta0
        if optimizer.n_grad_samples <= 0:
            raise ValueError("n_grad_samples must be positive.")
        if optimizer.sigma <= 0.0:
            raise ValueError("sigma must be positive.")

    def theta_grad(
        self, optimizer: "Optimization", theta: np.ndarray, indices: np.ndarray
    ) -> np.ndarray:
        """Dispatch to the estimator matching ``optimizer.perturbation_space``."""
        if optimizer.perturbation_space != "u":
            return self._theta_grad(optimizer, theta, indices)
        return self._u_grad(optimizer, theta, indices)

    def _theta_grad(
        self, optimizer: "Optimization", theta: np.ndarray, indices: np.ndarray
    ) -> np.ndarray:
        directions = optimizer.rng.normal(
            0.0, 1.0, size=(optimizer.n_grad_samples, theta.size)
        ).astype(float)
        total = np.zeros_like(theta, dtype=float)
        for eps in directions:
            shifted_value = objective_value_on_indices(
                optimizer.objective,
                optimizer.x_array,
                optimizer.n_total,
                theta + optimizer.sigma * eps,
                indices,
            )
            total += shifted_value * eps
        # Divide by the sample count, then by sigma (floored at 1e-8).
        return total / float(directions.shape[0]) / max(optimizer.sigma, 1e-8)

    def _u_grad(
        self, optimizer: "Optimization", theta: np.ndarray, indices: np.ndarray
    ) -> np.ndarray:
        states, actions, policy_jac = _u_space_policy_setup(optimizer, theta, indices)
        offsets = optimizer.rng.normal(0.0, 1.0, size=optimizer.n_grad_samples).astype(float)
        du = np.zeros(states.shape[0], dtype=float)
        for w in offsets:
            shifted_values = _action_objective_values(
                optimizer.objective, states, actions + optimizer.sigma * w
            )
            du += shifted_values * w
        du /= float(offsets.shape[0]) * max(optimizer.sigma, 1e-8)
        # Chain rule through the policy Jacobian, averaged over the batch.
        return np.mean(du[:, None] * policy_jac, axis=0)
Gaussian Stein (score-function) estimator.
- theta-space: $$\hat{g} = \frac{1}{m}\sum_j J(\theta+\sigma\varepsilon_j)\varepsilon_j / \sigma$$,
$$\varepsilon_j \sim \mathcal{N}(0, I^d)$$ — one-sided,
`n_grad_samples` evaluations.
- u-space: same estimator applied to actions, chain-ruled to theta.
class SPSAGradient(GradientMethod):
    """Simultaneous-perturbation (SPSA) estimator with Rademacher directions.

    theta-space: averages two-sided difference quotients along random
    sign vectors (``2 * n_grad_samples`` evaluations per step).
    u-space: the same two-sided scheme on scalar +/-1 action offsets,
    chain-ruled back to theta through the policy Jacobian.
    """

    name = "spsa"

    def setup(self, optimizer: "Optimization", theta0: np.ndarray) -> None:
        """Validate the sampling configuration before the solve starts."""
        del theta0
        if optimizer.n_grad_samples <= 0:
            raise ValueError("n_grad_samples must be positive.")
        if optimizer.sigma <= 0.0:
            raise ValueError("sigma must be positive.")

    def theta_grad(
        self, optimizer: "Optimization", theta: np.ndarray, indices: np.ndarray
    ) -> np.ndarray:
        """Dispatch to the estimator matching ``optimizer.perturbation_space``."""
        if optimizer.perturbation_space != "u":
            return self._theta_grad(optimizer, theta, indices)
        return self._u_grad(optimizer, theta, indices)

    def _theta_grad(
        self, optimizer: "Optimization", theta: np.ndarray, indices: np.ndarray
    ) -> np.ndarray:
        signs = optimizer.rng.choice(
            np.asarray([-1.0, 1.0], dtype=float),
            size=(optimizer.n_grad_samples, theta.size),
        )
        estimate = np.zeros_like(theta, dtype=float)
        for direction in signs:
            up = objective_value_on_indices(
                optimizer.objective,
                optimizer.x_array,
                optimizer.n_total,
                theta + optimizer.sigma * direction,
                indices,
            )
            down = objective_value_on_indices(
                optimizer.objective,
                optimizer.x_array,
                optimizer.n_total,
                theta - optimizer.sigma * direction,
                indices,
            )
            estimate += ((up - down) / (2.0 * optimizer.sigma)) * direction
        return estimate / float(signs.shape[0])

    def _u_grad(
        self, optimizer: "Optimization", theta: np.ndarray, indices: np.ndarray
    ) -> np.ndarray:
        states, actions, policy_jac = _u_space_policy_setup(optimizer, theta, indices)
        signs = optimizer.rng.choice(
            np.asarray([-1.0, 1.0], dtype=float), size=optimizer.n_grad_samples
        )
        du = np.zeros(states.shape[0], dtype=float)
        for s in signs:
            f_plus = _action_objective_values(
                optimizer.objective, states, actions + optimizer.sigma * s
            )
            f_minus = _action_objective_values(
                optimizer.objective, states, actions - optimizer.sigma * s
            )
            du += ((f_plus - f_minus) / (2.0 * optimizer.sigma)) * s
        du /= float(signs.shape[0])
        # Chain rule through the policy Jacobian, averaged over the batch.
        return np.mean(du[:, None] * policy_jac, axis=0)
SPSA estimator using Rademacher perturbations.
- theta-space: $$\hat{g} = \frac{1}{m}\sum_j \frac{J(\theta+\sigma\Delta_j)-J(\theta-\sigma\Delta_j)}{2\sigma}\Delta_j$$,
$$\Delta_j \sim \{\pm 1\}^d$$ — two-sided, `2 * n_grad_samples` evaluations.
- u-space: same estimator applied to actions, chain-ruled to theta.
class SteinDifferenceGradient(GradientMethod):
    """Two-sided Gaussian (Stein-difference) gradient estimator.

    theta-space: averages central difference quotients along standard
    normal directions (``2 * n_grad_samples`` evaluations per step).
    u-space: scalar normal offsets applied to the actions, chain-ruled
    back to theta through the policy Jacobian.
    """

    name = "stein-difference"

    def setup(self, optimizer: "Optimization", theta0: np.ndarray) -> None:
        """Validate the sampling configuration before the solve starts."""
        del theta0
        if optimizer.n_grad_samples <= 0:
            raise ValueError("n_grad_samples must be positive.")
        if optimizer.sigma <= 0.0:
            raise ValueError("sigma must be positive.")

    def theta_grad(
        self, optimizer: "Optimization", theta: np.ndarray, indices: np.ndarray
    ) -> np.ndarray:
        """Dispatch to the estimator matching ``optimizer.perturbation_space``."""
        if optimizer.perturbation_space != "u":
            return self._theta_grad(optimizer, theta, indices)
        return self._u_grad(optimizer, theta, indices)

    def _theta_grad(
        self, optimizer: "Optimization", theta: np.ndarray, indices: np.ndarray
    ) -> np.ndarray:
        directions = optimizer.rng.normal(
            0.0, 1.0, size=(optimizer.n_grad_samples, theta.size)
        ).astype(float)
        estimate = np.zeros_like(theta, dtype=float)
        for eps in directions:
            up = objective_value_on_indices(
                optimizer.objective,
                optimizer.x_array,
                optimizer.n_total,
                theta + optimizer.sigma * eps,
                indices,
            )
            down = objective_value_on_indices(
                optimizer.objective,
                optimizer.x_array,
                optimizer.n_total,
                theta - optimizer.sigma * eps,
                indices,
            )
            estimate += ((up - down) / (2.0 * optimizer.sigma)) * eps
        return estimate / float(directions.shape[0])

    def _u_grad(
        self, optimizer: "Optimization", theta: np.ndarray, indices: np.ndarray
    ) -> np.ndarray:
        states, actions, policy_jac = _u_space_policy_setup(optimizer, theta, indices)
        scale = optimizer.sigma
        offsets = optimizer.rng.normal(0.0, 1.0, size=optimizer.n_grad_samples).astype(float)
        du = np.zeros(states.shape[0], dtype=float)
        for w in offsets:
            f_plus = _action_objective_values(optimizer.objective, states, actions + scale * w)
            f_minus = _action_objective_values(optimizer.objective, states, actions - scale * w)
            du += ((f_plus - f_minus) / (2.0 * scale)) * w
        du /= float(offsets.shape[0])
        # Chain rule through the policy Jacobian, averaged over the batch.
        return np.mean(du[:, None] * policy_jac, axis=0)
Stein-difference estimator using two-sided Gaussian perturbations.
- theta-space: $$\hat{g} = \frac{1}{m}\sum_j \frac{J(\theta+\sigma\varepsilon_j)-J(\theta-\sigma\varepsilon_j)}{2\sigma}\varepsilon_j$$,
$$\varepsilon_j \sim \mathcal{N}(0, I^d)$$ — two-sided Gaussian in theta,
`2 * n_grad_samples` evaluations.
- u-space: $$\hat{g}_{u,i} = \frac{1}{m}\sum_j \frac{M(x_i,u_i+\sigma w_j)-M(x_i,u_i-\sigma w_j)}{2\sigma} w_j$$, $$w_j \sim \mathcal{N}(0,1)$$, chain-ruled to theta via $$\nabla_\theta \pi_\theta(x_i)$$.