optimization

 1from optimization.base import Optimization
 2from optimization.gradients import (
 3    FiniteDifferenceGradient,
 4    FirstOrderGradient,
 5    GaussSteinGradient,
 6    GradientMethod,
 7    SPSAGradient,
 8    SteinDifferenceGradient,
 9)
10from optimization.steps import STEP_RULE_ARMIJO, STEP_RULE_CONSTANT, STEP_RULES
11
12__all__ = [
13    "Optimization",
14    "GradientMethod",
15    "FirstOrderGradient",
16    "FiniteDifferenceGradient",
17    "GaussSteinGradient",
18    "SPSAGradient",
19    "SteinDifferenceGradient",
20    "STEP_RULE_ARMIJO",
21    "STEP_RULE_CONSTANT",
22    "STEP_RULES",
23]
class Optimization:
    """SciPy-driven optimizer over theta with pluggable gradient estimators."""

    def __init__(
        self,
        objective: Objective,
        x_samples: np.ndarray,
        gradient: Any,
        *,
        algorithm: str = STEP_RULE_LBFGSB,
        t_steps: int,
        n_grad_samples: int,
        sigma: float,
        perturbation_space: str = "theta",
        step_size: float = 0.01,
        batch_size: int | None = None,
        true_grad_theta_fn: TrueThetaGradFn | None = None,
        grad_norm_tol: float | None = None,
        ftol: float | None = None,
        step_reporter: "StepReporter | None" = None,
        method_label: str | None = None,
        rng: np.random.Generator | None = None,
        minimize_fn: MinimizeFn = minimize,
    ) -> None:
        """Configure a SciPy-based optimization run over theta.

        Args:
            objective: Objective exposing value(theta, x_batch) and grad(theta, x_batch).
            x_samples: 2D state-sample array of shape (n_samples, state_dim).
            gradient: Gradient estimator exposing setup() and theta_grad().
            algorithm: Optimizer name, mapped to SciPy via scipy_method (default 'l-bfgs-b').
            t_steps: Maximum iteration count, handed to SciPy as ``maxiter``.
            n_grad_samples: Sample count for zeroth-order estimators.
            sigma: Perturbation scale for zeroth-order estimators.
            perturbation_space: ``"theta"`` perturbs parameters directly; ``"u"``
                perturbs actions and maps back via the chain rule (requires
                objective.policy).
            step_size: Step size for gradient-descent style rules.
            batch_size: Mini-batch size; None means full batch.
            true_grad_theta_fn: Optional exact theta-gradient used for diagnostics.
            grad_norm_tol: Gradient-norm early-stopping threshold (SciPy ``gtol``).
            ftol: SciPy function-value tolerance.
            step_reporter: Optional sink for per-step metrics.
            method_label: Reporting label; defaults to the gradient object's name.
            rng: Random generator; defaults to ``np.random.default_rng(0)``.
            minimize_fn: Injectable SciPy ``minimize`` (useful for testing).
        """
        if perturbation_space not in {"theta", "u"}:
            raise ValueError("perturbation_space must be 'theta' or 'u'.")
        self.objective = objective
        self.gradient = gradient
        self.perturbation_space = perturbation_space
        self.algorithm = algorithm
        self.t_steps = int(t_steps)
        self.n_grad_samples = int(n_grad_samples)
        self.sigma = float(sigma)
        self.step_size = float(step_size)
        self.batch_size = batch_size
        self.true_grad_theta_fn = true_grad_theta_fn
        self.grad_norm_tol = grad_norm_tol
        self.ftol = ftol
        self.step_reporter = step_reporter
        # Fall back to the estimator's advertised name for reporting labels.
        if method_label is None:
            self.method_label = getattr(self.gradient, "name", "opt")
        else:
            self.method_label = method_label
        self.rng = np.random.default_rng(0) if rng is None else rng
        self._minimize_fn = minimize_fn

        samples = np.asarray(x_samples, dtype=float)
        if samples.ndim != 2:
            raise ValueError("x_samples must be a 2D array.")
        if samples.shape[0] < 1:
            raise ValueError("x_samples must contain at least one sample.")
        self.x_array = samples
        self.n_total = samples.shape[0]
        self.batch_size_eff = self.n_total if batch_size is None else int(batch_size)
        if self.batch_size_eff <= 0 or self.batch_size_eff > self.n_total:
            raise ValueError("batch_size must be in [1, len(x_samples)].")
        if self.grad_norm_tol is not None and self.grad_norm_tol <= 0.0:
            raise ValueError("grad_norm_tol must be positive when provided.")
        if self.ftol is not None and self.ftol <= 0.0:
            raise ValueError("ftol must be positive when provided.")
        if self.t_steps <= 0:
            raise ValueError("t_steps must be positive.")
        self._full_indices = np.arange(self.n_total, dtype=int)

    def solve(self, theta_start: np.ndarray) -> tuple[np.ndarray, "OptimizationTrace"]:
        """Run SciPy ``minimize`` from ``theta_start`` and return (theta, trace)."""
        theta_init = np.asarray(theta_start, dtype=float)
        self.gradient.setup(self, theta_init)

        # Per-step trace buffers, filled by record().
        steps: list[int] = []
        u_values: list[float] = []
        objective_values: list[float] = []
        u_grad_estimates: list[float] = []
        theta_grad_norms: list[float] = []
        true_theta_grad_norms: list[float] = []
        theta_values: list[np.ndarray] = []

        def draw_batch() -> np.ndarray:
            # Fresh mini-batch index draw for every evaluation.
            return sample_indices(self.rng, self.batch_size_eff, self.n_total, self._full_indices)

        def value_fn(theta_vec: np.ndarray) -> float:
            theta_cur = np.asarray(theta_vec, dtype=float)
            return objective_value_on_indices(
                self.objective, self.x_array, self.n_total, theta_cur, draw_batch()
            )

        def grad_fn(theta_vec: np.ndarray) -> np.ndarray:
            theta_cur = np.asarray(theta_vec, dtype=float)
            return np.asarray(self.gradient.theta_grad(self, theta_cur, draw_batch()), dtype=float)

        def record(theta_vec: np.ndarray) -> None:
            # One shared batch per recorded step so value/gradient are comparable.
            theta_cur = np.asarray(theta_vec, dtype=float)
            batch = draw_batch()
            value = objective_value_on_indices(
                self.objective, self.x_array, self.n_total, theta_cur, batch
            )
            grad_est = np.asarray(self.gradient.theta_grad(self, theta_cur, batch), dtype=float)
            grad_norm = float(np.linalg.norm(grad_est))

            true_norm = None
            if self.true_grad_theta_fn is not None:
                exact = np.asarray(
                    self.true_grad_theta_fn(theta_cur, x_batch(self.x_array, batch, self.n_total)),
                    dtype=float,
                )
                true_norm = float(np.linalg.norm(exact))

            step = len(steps)

            # Mean action is only defined when the objective carries a policy.
            policy = getattr(self.objective, "policy", None)
            mean_u = (
                _mean_action(policy, theta_cur, x_batch(self.x_array, batch, self.n_total))
                if policy is not None
                else float("nan")
            )

            steps.append(step)
            u_values.append(mean_u)
            objective_values.append(value)
            u_grad_estimates.append(float("nan"))
            theta_grad_norms.append(grad_norm)
            theta_values.append(theta_cur.copy())
            if true_norm is not None:
                true_theta_grad_norms.append(true_norm)
            if self.step_reporter is not None:
                self.step_reporter.log_step(self.method_label, step, mean_u, value, grad_norm)

        record(theta_init)

        options: dict[str, float | int] = {"maxiter": int(self.t_steps)}
        if self.grad_norm_tol is not None:
            options["gtol"] = float(self.grad_norm_tol)
        if self.ftol is not None:
            options["ftol"] = float(self.ftol)

        result = self._minimize_fn(
            value_fn,
            x0=theta_init,
            jac=grad_fn,
            method=scipy_method(self.algorithm),
            options=options,
            callback=record,
        )

        theta_final = np.asarray(result.x, dtype=float)
        # SciPy's callback may not fire for the returned iterate; record it once more.
        if not np.allclose(theta_final, theta_values[-1]):
            record(theta_final)

        from experiments.results import OptimizationTrace

        trace = OptimizationTrace(
            steps=steps,
            u_values=u_values,
            objective_values=objective_values,
            u_grad_estimates=u_grad_estimates,
            u_true_gradients=None,
            theta_grad_norms=theta_grad_norms,
            true_theta_grad_norms=true_theta_grad_norms if true_theta_grad_norms else None,
            theta_values=theta_values,
            optimizer_status=int(result.status),
            optimizer_message=str(result.message),
        )
        return theta_final, trace

SciPy L-BFGS-B optimizer for theta-level objectives with pluggable gradient methods.

Optimization( objective: objective.Objective, x_samples: numpy.ndarray, gradient: Any, *, algorithm: str = 'l-bfgs-b', t_steps: int, n_grad_samples: int, sigma: float, perturbation_space: str = 'theta', step_size: float = 0.01, batch_size: int | None = None, true_grad_theta_fn: Callable[[numpy.ndarray, numpy.ndarray], numpy.ndarray] | None = None, grad_norm_tol: float | None = None, ftol: float | None = None, step_reporter: experiments.StepReporter | None = None, method_label: str | None = None, rng: numpy.random._generator.Generator | None = None, minimize_fn: Callable[..., Any] = <function minimize>)
 32    def __init__(
 33        self,
 34        objective: Objective,
 35        x_samples: np.ndarray,
 36        gradient: Any,
 37        *,
 38        algorithm: str = STEP_RULE_LBFGSB,
 39        t_steps: int,
 40        n_grad_samples: int,
 41        sigma: float,
 42        perturbation_space: str = "theta",
 43        step_size: float = 0.01,
 44        batch_size: int | None = None,
 45        true_grad_theta_fn: TrueThetaGradFn | None = None,
 46        grad_norm_tol: float | None = None,
 47        ftol: float | None = None,
 48        step_reporter: "StepReporter | None" = None,
 49        method_label: str | None = None,
 50        rng: np.random.Generator | None = None,
 51        minimize_fn: MinimizeFn = minimize,
 52    ) -> None:
 53        """Configure SciPy-based optimization over theta.
 54
 55        Args:
 56            objective: Theta-level objective implementing value(theta, x_batch) and grad(theta, x_batch).
 57            x_samples: State samples array, shape (n_samples, state_dim).
 58            gradient: Gradient method object with setup() and theta_grad() methods.
 59            algorithm: Optimization algorithm (default: 'l-bfgs-b').
 60            t_steps: Maximum number of optimization steps.
 61            n_grad_samples: Number of gradient samples for zeroth-order methods.
 62            sigma: Perturbation scale for zeroth-order methods.
 63            perturbation_space: Space in which zeroth-order perturbations are applied:
 64                ``"theta"`` (default) perturbs policy parameters directly; ``"u"`` perturbs
 65                actions and maps back via chain rule (requires objective.policy).
 66            step_size: Step size for gradient descent algorithms.
 67            batch_size: Mini-batch size (None for full batch).
 68            true_grad_theta_fn: Optional function for computing true theta gradients.
 69            grad_norm_tol: Early stopping threshold on gradient norm.
 70            ftol: SciPy function tolerance.
 71            step_reporter: Optional reporter for per-step metrics.
 72            method_label: Label for this optimization method.
 73            rng: Random number generator.
 74            minimize_fn: SciPy minimize function (for testing).
 75        """
 76        if perturbation_space not in {"theta", "u"}:
 77            raise ValueError("perturbation_space must be 'theta' or 'u'.")
 78        self.objective = objective
 79        self.gradient = gradient
 80        self.perturbation_space = perturbation_space
 81        self.algorithm = algorithm
 82        self.t_steps = int(t_steps)
 83        self.n_grad_samples = int(n_grad_samples)
 84        self.sigma = float(sigma)
 85        self.step_size = float(step_size)
 86        self.batch_size = batch_size
 87        self.true_grad_theta_fn = true_grad_theta_fn
 88        self.grad_norm_tol = grad_norm_tol
 89        self.ftol = ftol
 90        self.step_reporter = step_reporter
 91        self.method_label = method_label if method_label is not None else getattr(self.gradient, "name", "opt")
 92        self.rng = rng if rng is not None else np.random.default_rng(0)
 93        self._minimize_fn = minimize_fn
 94
 95        x_arr = np.asarray(x_samples, dtype=float)
 96        if x_arr.ndim != 2:
 97            raise ValueError("x_samples must be a 2D array.")
 98        if x_arr.shape[0] < 1:
 99            raise ValueError("x_samples must contain at least one sample.")
100        self.x_array = x_arr
101        self.n_total = self.x_array.shape[0]
102        self.batch_size_eff = self.n_total if batch_size is None else int(batch_size)
103        if self.batch_size_eff <= 0 or self.batch_size_eff > self.n_total:
104            raise ValueError("batch_size must be in [1, len(x_samples)].")
105        if self.grad_norm_tol is not None and self.grad_norm_tol <= 0.0:
106            raise ValueError("grad_norm_tol must be positive when provided.")
107        if self.ftol is not None and self.ftol <= 0.0:
108            raise ValueError("ftol must be positive when provided.")
109        if self.t_steps <= 0:
110            raise ValueError("t_steps must be positive.")
111        self._full_indices = np.arange(self.n_total, dtype=int)

Configure SciPy-based optimization over theta.

Args: objective: Theta-level objective implementing value(theta, x_batch) and grad(theta, x_batch). x_samples: State samples array, shape (n_samples, state_dim). gradient: Gradient method object with setup() and theta_grad() methods. algorithm: Optimization algorithm (default: 'l-bfgs-b'). t_steps: Maximum number of optimization steps. n_grad_samples: Number of gradient samples for zeroth-order methods. sigma: Perturbation scale for zeroth-order methods. perturbation_space: Space in which zeroth-order perturbations are applied: "theta" (default) perturbs policy parameters directly; "u" perturbs actions and maps back via chain rule (requires objective.policy). step_size: Step size for gradient descent algorithms. batch_size: Mini-batch size (None for full batch). true_grad_theta_fn: Optional function for computing true theta gradients. grad_norm_tol: Early stopping threshold on gradient norm. ftol: SciPy function tolerance. step_reporter: Optional reporter for per-step metrics. method_label: Label for this optimization method. rng: Random number generator. minimize_fn: SciPy minimize function (for testing).

objective
gradient
perturbation_space
algorithm
t_steps
n_grad_samples
sigma
step_size
batch_size
true_grad_theta_fn
grad_norm_tol
ftol
step_reporter
method_label
rng
x_array
n_total
batch_size_eff
def solve( self, theta_start: numpy.ndarray) -> tuple[numpy.ndarray, experiments.OptimizationTrace]:
113    def solve(self, theta_start: np.ndarray) -> tuple[np.ndarray, "OptimizationTrace"]:
114        """Run SciPy ``minimize`` and return final theta with trace."""
115        theta0 = np.asarray(theta_start, dtype=float)
116        self.gradient.setup(self, theta0)
117
118        steps: list[int] = []
119        u_values: list[float] = []
120        values: list[float] = []
121        u_grad_estimates: list[float] = []
122        theta_grad_norms: list[float] = []
123        true_theta_grad_norms: list[float] = []
124        theta_values: list[np.ndarray] = []
125
126        def value_fn(theta_vec: np.ndarray) -> float:
127            theta_arr = np.asarray(theta_vec, dtype=float)
128            indices = sample_indices(self.rng, self.batch_size_eff, self.n_total, self._full_indices)
129            return objective_value_on_indices(self.objective, self.x_array, self.n_total, theta_arr, indices)
130
131        def grad_fn(theta_vec: np.ndarray) -> np.ndarray:
132            theta_arr = np.asarray(theta_vec, dtype=float)
133            indices = sample_indices(self.rng, self.batch_size_eff, self.n_total, self._full_indices)
134            return np.asarray(self.gradient.theta_grad(self, theta_arr, indices), dtype=float)
135
136        def record(theta_vec: np.ndarray) -> None:
137            theta_arr = np.asarray(theta_vec, dtype=float)
138            indices = sample_indices(self.rng, self.batch_size_eff, self.n_total, self._full_indices)
139            value = objective_value_on_indices(self.objective, self.x_array, self.n_total, theta_arr, indices)
140            grad_theta = np.asarray(self.gradient.theta_grad(self, theta_arr, indices), dtype=float)
141            theta_grad_norm = float(np.linalg.norm(grad_theta))
142
143            true_theta_grad_norm = None
144            if self.true_grad_theta_fn is not None:
145                grad_true = np.asarray(
146                    self.true_grad_theta_fn(theta_arr, x_batch(self.x_array, indices, self.n_total)),
147                    dtype=float,
148                )
149                true_theta_grad_norm = float(np.linalg.norm(grad_true))
150
151            step = len(steps)
152            
153            # Compute mean action if policy is available
154            policy = getattr(self.objective, "policy", None)
155            if policy is not None:
156                mean_u = _mean_action(policy, theta_arr, x_batch(self.x_array, indices, self.n_total))
157            else:
158                mean_u = float("nan")
159
160            steps.append(step)
161            u_values.append(mean_u)
162            values.append(value)
163            u_grad_estimates.append(float("nan"))
164            theta_grad_norms.append(theta_grad_norm)
165            theta_values.append(theta_arr.copy())
166            if true_theta_grad_norm is not None:
167                true_theta_grad_norms.append(true_theta_grad_norm)
168            if self.step_reporter is not None:
169                self.step_reporter.log_step(self.method_label, step, mean_u, value, theta_grad_norm)
170
171        record(theta0)
172
173        options: dict[str, float | int] = {"maxiter": int(self.t_steps)}
174        if self.grad_norm_tol is not None:
175            options["gtol"] = float(self.grad_norm_tol)
176        if self.ftol is not None:
177            options["ftol"] = float(self.ftol)
178
179        result = self._minimize_fn(
180            value_fn,
181            x0=theta0,
182            jac=grad_fn,
183            method=scipy_method(self.algorithm),
184            options=options,
185            callback=record,
186        )
187
188        theta_final = np.asarray(result.x, dtype=float)
189        if not np.allclose(theta_final, theta_values[-1]):
190            record(theta_final)
191
192        from experiments.results import OptimizationTrace
193
194        trace = OptimizationTrace(
195            steps=steps,
196            u_values=u_values,
197            objective_values=values,
198            u_grad_estimates=u_grad_estimates,
199            u_true_gradients=None,
200            theta_grad_norms=theta_grad_norms,
201            true_theta_grad_norms=true_theta_grad_norms if true_theta_grad_norms else None,
202            theta_values=theta_values,
203            optimizer_status=int(result.status),
204            optimizer_message=str(result.message),
205        )
206        return theta_final, trace

Run SciPy minimize and return final theta with trace.

class GradientMethod:
    """Base interface for theta-gradient estimators used by the optimizer."""

    # Human-readable identifier; subclasses override with their method name.
    name = "gradient"

    def setup(self, optimizer: "Optimization", theta0: np.ndarray) -> None:
        """One-time validation/initialization hook; the default is a no-op."""
        del optimizer, theta0

    def theta_grad(self, optimizer: "Optimization", theta: np.ndarray, indices: np.ndarray) -> np.ndarray:
        """Estimate the theta-gradient on the given sample indices; subclasses must implement."""
        raise NotImplementedError

Base interface for theta-gradient estimators used by the optimizer.

name = 'gradient'
def setup( self, optimizer: Optimization, theta0: numpy.ndarray) -> None:
87    def setup(self, optimizer: "Optimization", theta0: np.ndarray) -> None:
88        del optimizer, theta0
def theta_grad( self, optimizer: Optimization, theta: numpy.ndarray, indices: numpy.ndarray) -> numpy.ndarray:
90    def theta_grad(
91        self,
92        optimizer: "Optimization",
93        theta: np.ndarray,
94        indices: np.ndarray,
95    ) -> np.ndarray:
96        raise NotImplementedError
class FirstOrderGradient(GradientMethod):
    """Exact theta-gradient taken directly from ``objective.grad``."""

    name = "first-order"

    def theta_grad(self, optimizer: "Optimization", theta: np.ndarray, indices: np.ndarray) -> np.ndarray:
        """Delegate to the objective's analytic gradient on the mini-batch."""
        return objective_grad_on_indices(
            optimizer.objective, optimizer.x_array, optimizer.n_total, theta, indices
        )

Exact theta-gradient: $$\nabla_\theta J$$ from objective.grad.

name = 'first-order'
def theta_grad( self, optimizer: Optimization, theta: numpy.ndarray, indices: numpy.ndarray) -> numpy.ndarray:
109    def theta_grad(
110        self,
111        optimizer: "Optimization",
112        theta: np.ndarray,
113        indices: np.ndarray,
114    ) -> np.ndarray:
115        return objective_grad_on_indices(
116            optimizer.objective,
117            optimizer.x_array,
118            optimizer.n_total,
119            theta,
120            indices,
121        )
class FiniteDifferenceGradient(GradientMethod):
    """Central finite-difference estimator.

    - **theta-space**: $$\\sum_{k=1}^d \\frac{J(\\theta+\\sigma e_k)-J(\\theta-\\sigma e_k)}{2\\sigma} e_k$$
      — costs ``2 * dim(theta)`` objective evaluations per step.
    - **u-space**: $$\\frac{M(x, u+\\sigma)-M(x, u-\\sigma)}{2\\sigma}$$ per sample, mapped to theta
      via the policy Jacobian — two evaluations per step regardless of ``dim(theta)``.
    """

    name = "finite-difference"

    def setup(self, optimizer: "Optimization", theta0: np.ndarray) -> None:
        """Validate the perturbation scale before the run starts."""
        del theta0
        if optimizer.sigma <= 0.0:
            raise ValueError("sigma must be positive.")

    def theta_grad(self, optimizer: "Optimization", theta: np.ndarray, indices: np.ndarray) -> np.ndarray:
        """Dispatch to the u-space or theta-space estimator."""
        if optimizer.perturbation_space == "u":
            return self._u_grad(optimizer, theta, indices)
        return self._theta_grad(optimizer, theta, indices)

    def _theta_grad(self, optimizer: "Optimization", theta: np.ndarray, indices: np.ndarray) -> np.ndarray:
        # View the mini-batch objective as a function of theta alone.
        def eval_at(theta_eval: np.ndarray) -> float:
            return objective_value_on_indices(
                optimizer.objective, optimizer.x_array, optimizer.n_total, theta_eval, indices
            )

        return finite_difference_theta_grad(eval_at, theta, method="central", step=optimizer.sigma)

    def _u_grad(self, optimizer: "Optimization", theta: np.ndarray, indices: np.ndarray) -> np.ndarray:
        x_arr, u_arr, grad_pi = _u_space_policy_setup(optimizer, theta, indices)
        h = optimizer.sigma
        upper = _action_objective_values(optimizer.objective, x_arr, u_arr + h)
        lower = _action_objective_values(optimizer.objective, x_arr, u_arr - h)
        grad_u = (upper - lower) / (2.0 * h)
        # Chain rule: average per-sample dJ/du weighted by dpi/dtheta.
        return np.mean(grad_u[:, None] * grad_pi, axis=0)

Central finite-difference estimator.

  • theta-space: $$\sum_{k=1}^d \frac{J(\theta+\sigma e_k)-J(\theta-\sigma e_k)}{2\sigma} e_k$$ — 2 * dim(theta) objective evaluations per step.
  • u-space: $$\frac{M(x, u+\sigma)-M(x, u-\sigma)}{2\sigma}$$ per sample, chain-ruled to theta — only 2 evaluations per step regardless of dim(theta).
name = 'finite-difference'
def setup( self, optimizer: Optimization, theta0: numpy.ndarray) -> None:
135    def setup(self, optimizer: "Optimization", theta0: np.ndarray) -> None:
136        del theta0
137        if optimizer.sigma <= 0.0:
138            raise ValueError("sigma must be positive.")
def theta_grad( self, optimizer: Optimization, theta: numpy.ndarray, indices: numpy.ndarray) -> numpy.ndarray:
140    def theta_grad(
141        self,
142        optimizer: "Optimization",
143        theta: np.ndarray,
144        indices: np.ndarray,
145    ) -> np.ndarray:
146        if optimizer.perturbation_space == "u":
147            return self._u_grad(optimizer, theta, indices)
148        return self._theta_grad(optimizer, theta, indices)
class GaussSteinGradient(GradientMethod):
    """Gaussian Stein (score-function) estimator.

    - **theta-space**: $$\\hat{g} = \\frac{1}{m}\\sum_j J(\\theta+\\sigma\\varepsilon_j)\\varepsilon_j / \\sigma$$,
      $$\\varepsilon_j \\sim \\mathcal{N}(0, I^d)$$ — one-sided, ``n_grad_samples`` evaluations.
    - **u-space**: the same estimator applied to actions, chain-ruled back to theta.
    """

    name = "gauss-stein"

    def setup(self, optimizer: "Optimization", theta0: np.ndarray) -> None:
        """Validate sample count and perturbation scale."""
        del theta0
        if optimizer.n_grad_samples <= 0:
            raise ValueError("n_grad_samples must be positive.")
        if optimizer.sigma <= 0.0:
            raise ValueError("sigma must be positive.")

    def theta_grad(self, optimizer: "Optimization", theta: np.ndarray, indices: np.ndarray) -> np.ndarray:
        """Dispatch to the u-space or theta-space estimator."""
        if optimizer.perturbation_space == "u":
            return self._u_grad(optimizer, theta, indices)
        return self._theta_grad(optimizer, theta, indices)

    def _theta_grad(self, optimizer: "Optimization", theta: np.ndarray, indices: np.ndarray) -> np.ndarray:
        noise = optimizer.rng.normal(
            0.0, 1.0, size=(optimizer.n_grad_samples, theta.size)
        ).astype(float)
        total = np.zeros_like(theta, dtype=float)
        for eps in noise:
            score = objective_value_on_indices(
                optimizer.objective,
                optimizer.x_array,
                optimizer.n_total,
                theta + optimizer.sigma * eps,
                indices,
            )
            total += score * eps
        # Average over draws, then divide by sigma (floor guards against underflow).
        return total / float(noise.shape[0]) / max(optimizer.sigma, 1e-8)

    def _u_grad(self, optimizer: "Optimization", theta: np.ndarray, indices: np.ndarray) -> np.ndarray:
        x_arr, u_arr, grad_pi = _u_space_policy_setup(optimizer, theta, indices)
        draws = optimizer.rng.normal(0.0, 1.0, size=optimizer.n_grad_samples).astype(float)
        grad_u = np.zeros(x_arr.shape[0], dtype=float)
        for w in draws:
            vals = _action_objective_values(optimizer.objective, x_arr, u_arr + optimizer.sigma * w)
            grad_u += vals * w
        grad_u /= float(draws.shape[0]) * max(optimizer.sigma, 1e-8)
        return np.mean(grad_u[:, None] * grad_pi, axis=0)

Gaussian Stein (score-function) estimator.

  • theta-space: $$\hat{g} = \frac{1}{m}\sum_j J(\theta+\sigma\varepsilon_j)\varepsilon_j / \sigma$$, $$\varepsilon_j \sim \mathcal{N}(0, I^d)$$ — one-sided, n_grad_samples evaluations.
  • u-space: same estimator applied to actions, chain-ruled to theta.
name = 'gauss-stein'
def setup( self, optimizer: Optimization, theta0: numpy.ndarray) -> None:
193    def setup(self, optimizer: "Optimization", theta0: np.ndarray) -> None:
194        del theta0
195        if optimizer.n_grad_samples <= 0:
196            raise ValueError("n_grad_samples must be positive.")
197        if optimizer.sigma <= 0.0:
198            raise ValueError("sigma must be positive.")
def theta_grad( self, optimizer: Optimization, theta: numpy.ndarray, indices: numpy.ndarray) -> numpy.ndarray:
200    def theta_grad(
201        self,
202        optimizer: "Optimization",
203        theta: np.ndarray,
204        indices: np.ndarray,
205    ) -> np.ndarray:
206        if optimizer.perturbation_space == "u":
207            return self._u_grad(optimizer, theta, indices)
208        return self._theta_grad(optimizer, theta, indices)
class SPSAGradient(GradientMethod):
    """SPSA estimator with Rademacher ($$\\pm 1$$) perturbation directions.

    - **theta-space**: $$\\hat{g} = \\frac{1}{m}\\sum_j \\frac{J(\\theta+\\sigma\\Delta_j)-J(\\theta-\\sigma\\Delta_j)}{2\\sigma}\\Delta_j$$,
      $$\\Delta_j \\sim \\{\\pm 1\\}^d$$ — two-sided, ``2 * n_grad_samples`` evaluations.
    - **u-space**: the same estimator applied to actions, chain-ruled back to theta.
    """

    name = "spsa"

    def setup(self, optimizer: "Optimization", theta0: np.ndarray) -> None:
        """Validate sample count and perturbation scale."""
        del theta0
        if optimizer.n_grad_samples <= 0:
            raise ValueError("n_grad_samples must be positive.")
        if optimizer.sigma <= 0.0:
            raise ValueError("sigma must be positive.")

    def theta_grad(self, optimizer: "Optimization", theta: np.ndarray, indices: np.ndarray) -> np.ndarray:
        """Dispatch to the u-space or theta-space estimator."""
        if optimizer.perturbation_space == "u":
            return self._u_grad(optimizer, theta, indices)
        return self._theta_grad(optimizer, theta, indices)

    def _theta_grad(self, optimizer: "Optimization", theta: np.ndarray, indices: np.ndarray) -> np.ndarray:
        directions = optimizer.rng.choice(
            np.asarray([-1.0, 1.0], dtype=float),
            size=(optimizer.n_grad_samples, theta.size),
        )
        estimate = np.zeros_like(theta, dtype=float)
        for direction in directions:
            up = objective_value_on_indices(
                optimizer.objective,
                optimizer.x_array,
                optimizer.n_total,
                theta + optimizer.sigma * direction,
                indices,
            )
            down = objective_value_on_indices(
                optimizer.objective,
                optimizer.x_array,
                optimizer.n_total,
                theta - optimizer.sigma * direction,
                indices,
            )
            estimate += ((up - down) / (2.0 * optimizer.sigma)) * direction
        return estimate / float(directions.shape[0])

    def _u_grad(self, optimizer: "Optimization", theta: np.ndarray, indices: np.ndarray) -> np.ndarray:
        x_arr, u_arr, grad_pi = _u_space_policy_setup(optimizer, theta, indices)
        directions = optimizer.rng.choice(
            np.asarray([-1.0, 1.0], dtype=float), size=optimizer.n_grad_samples
        )
        grad_u = np.zeros(x_arr.shape[0], dtype=float)
        for direction in directions:
            up = _action_objective_values(optimizer.objective, x_arr, u_arr + optimizer.sigma * direction)
            down = _action_objective_values(optimizer.objective, x_arr, u_arr - optimizer.sigma * direction)
            grad_u += ((up - down) / (2.0 * optimizer.sigma)) * direction
        grad_u /= float(directions.shape[0])
        return np.mean(grad_u[:, None] * grad_pi, axis=0)

SPSA estimator using Rademacher perturbations.

  • theta-space: $$\hat{g} = \frac{1}{m}\sum_j \frac{J(\theta+\sigma\Delta_j)-J(\theta-\sigma\Delta_j)}{2\sigma}\Delta_j$$, $$\Delta_j \sim {\pm 1}^d$$ — two-sided, 2 * n_grad_samples evaluations.
  • u-space: same estimator applied to actions, chain-ruled to theta.
name = 'spsa'
def setup( self, optimizer: Optimization, theta0: numpy.ndarray) -> None:
257    def setup(self, optimizer: "Optimization", theta0: np.ndarray) -> None:
258        del theta0
259        if optimizer.n_grad_samples <= 0:
260            raise ValueError("n_grad_samples must be positive.")
261        if optimizer.sigma <= 0.0:
262            raise ValueError("sigma must be positive.")
def theta_grad( self, optimizer: Optimization, theta: numpy.ndarray, indices: numpy.ndarray) -> numpy.ndarray:
264    def theta_grad(
265        self,
266        optimizer: "Optimization",
267        theta: np.ndarray,
268        indices: np.ndarray,
269    ) -> np.ndarray:
270        if optimizer.perturbation_space == "u":
271            return self._u_grad(optimizer, theta, indices)
272        return self._theta_grad(optimizer, theta, indices)
class SteinDifferenceGradient(GradientMethod):
    """Stein-difference estimator using two-sided Gaussian perturbations.

    - **theta-space**: $$\\hat{g} = \\frac{1}{m}\\sum_j \\frac{J(\\theta+\\sigma\\varepsilon_j)-J(\\theta-\\sigma\\varepsilon_j)}{2\\sigma}\\varepsilon_j$$,
      $$\\varepsilon_j \\sim \\mathcal{N}(0, I_d)$$ — two-sided Gaussian in theta, ``2 * n_grad_samples`` evaluations.
    - **u-space**: $$\\hat{g}_{u,i} = \\frac{1}{m}\\sum_j \\frac{M(x_i,u_i+\\sigma w_j)-M(x_i,u_i-\\sigma w_j)}{2\\sigma} w_j$$,
      $$w_j \\sim \\mathcal{N}(0,1)$$, chain-ruled to theta via $$\\nabla_\\theta \\pi_\\theta(x_i)$$.
    """

    name = "stein-difference"

    def setup(self, optimizer: "Optimization", theta0: np.ndarray) -> None:
        """Check hyperparameters are usable; ``theta0`` is ignored.

        Raises:
            ValueError: If ``n_grad_samples`` or ``sigma`` is not positive.
        """
        del theta0
        if optimizer.n_grad_samples <= 0:
            raise ValueError("n_grad_samples must be positive.")
        if optimizer.sigma <= 0.0:
            raise ValueError("sigma must be positive.")

    def theta_grad(
        self,
        optimizer: "Optimization",
        theta: np.ndarray,
        indices: np.ndarray,
    ) -> np.ndarray:
        """Route to the u-space or theta-space estimator per optimizer config."""
        estimator = (
            self._u_grad
            if optimizer.perturbation_space == "u"
            else self._theta_grad
        )
        return estimator(optimizer, theta, indices)

    def _theta_grad(
        self,
        optimizer: "Optimization",
        theta: np.ndarray,
        indices: np.ndarray,
    ) -> np.ndarray:
        """Central-difference estimate along Gaussian directions in theta space."""
        sigma = optimizer.sigma
        directions = optimizer.rng.normal(
            0.0, 1.0, size=(optimizer.n_grad_samples, theta.size)
        ).astype(float)
        accum = np.zeros_like(theta, dtype=float)
        for eps in directions:
            forward = objective_value_on_indices(
                optimizer.objective, optimizer.x_array, optimizer.n_total,
                theta + sigma * eps, indices,
            )
            backward = objective_value_on_indices(
                optimizer.objective, optimizer.x_array, optimizer.n_total,
                theta - sigma * eps, indices,
            )
            accum += ((forward - backward) / (2.0 * sigma)) * eps
        # Monte-Carlo average over the m Gaussian directions.
        return accum / float(directions.shape[0])

    def _u_grad(
        self,
        optimizer: "Optimization",
        theta: np.ndarray,
        indices: np.ndarray,
    ) -> np.ndarray:
        """Central differences on scalar actions, chain-ruled to theta via the
        policy Jacobian returned by ``_u_space_policy_setup``."""
        x_arr, u_arr, grad_pi = _u_space_policy_setup(optimizer, theta, indices)
        sigma = optimizer.sigma
        draws = optimizer.rng.normal(0.0, 1.0, size=optimizer.n_grad_samples).astype(float)
        per_state = np.zeros(x_arr.shape[0], dtype=float)
        for w in draws:
            forward = _action_objective_values(optimizer.objective, x_arr, u_arr + sigma * w)
            backward = _action_objective_values(optimizer.objective, x_arr, u_arr - sigma * w)
            per_state += ((forward - backward) / (2.0 * sigma)) * w
        per_state /= float(draws.shape[0])
        return np.mean(per_state[:, None] * grad_pi, axis=0)

Stein-difference estimator using two-sided Gaussian perturbations.

  • theta-space: $$\hat{g} = \frac{1}{m}\sum_j \frac{J(\theta+\sigma\varepsilon_j)-J(\theta-\sigma\varepsilon_j)}{2\sigma}\varepsilon_j$$, $$\varepsilon_j \sim \mathcal{N}(0, I_d)$$ — two-sided Gaussian in theta, 2 * n_grad_samples evaluations.
  • u-space: $$\hat{g}_{u,i} = \frac{1}{m}\sum_j \frac{M(x_i,u_i+\sigma w_j)-M(x_i,u_i-\sigma w_j)}{2\sigma} w_j$$, $$w_j \sim \mathcal{N}(0,1)$$, chain-ruled to theta via $$\nabla_\theta \pi_\theta(x_i)$$.
name = 'stein-difference'
def setup( self, optimizer: Optimization, theta0: numpy.ndarray) -> None:
333    def setup(self, optimizer: "Optimization", theta0: np.ndarray) -> None:
334        del theta0
335        if optimizer.n_grad_samples <= 0:
336            raise ValueError("n_grad_samples must be positive.")
337        if optimizer.sigma <= 0.0:
338            raise ValueError("sigma must be positive.")
def theta_grad( self, optimizer: Optimization, theta: numpy.ndarray, indices: numpy.ndarray) -> numpy.ndarray:
340    def theta_grad(
341        self,
342        optimizer: "Optimization",
343        theta: np.ndarray,
344        indices: np.ndarray,
345    ) -> np.ndarray:
346        if optimizer.perturbation_space == "u":
347            return self._u_grad(optimizer, theta, indices)
348        return self._theta_grad(optimizer, theta, indices)
# Named step-size rules; STEP_RULES also admits the SciPy-backed L-BFGS-B mode.
STEP_RULE_ARMIJO = "armijo"
STEP_RULE_CONSTANT = "constant"
STEP_RULES = (STEP_RULE_CONSTANT, STEP_RULE_ARMIJO, "l-bfgs-b")