# Module: reporting
# Reporting and visualization helpers for experiment outputs.

 1"""Reporting and visualization helpers for experiment outputs."""
 2
 3from reporting.logging import log_step, log_summary
 4from reporting.visualization import (
 5    ESTIMATOR_STYLES,
 6    plot_gradient_norms,
 7    plot_loss_curves,
 8    plot_objective_u_slice,
 9    plot_step_sizes,
10    plot_theta_objective_contours,
11    select_theta_axes_max_variance,
12    theta_objective_contour_grid,
13)
14
15__all__ = [
16    "ESTIMATOR_STYLES",
17    "log_step",
18    "log_summary",
19    "plot_gradient_norms",
20    "plot_loss_curves",
21    "plot_objective_u_slice",
22    "plot_step_sizes",
23    "plot_theta_objective_contours",
24    "select_theta_axes_max_variance",
25    "theta_objective_contour_grid",
26]
# Shared plotting styles keyed by gradient-estimator name.
# Each entry fixes the legend label, line/marker color, and marker glyph so
# every figure renders a given estimator identically.
ESTIMATOR_STYLES = {
    "first_order": {"label": "first-order", "color": "#1f77b4", "marker": "o"},
    "finite_difference": {"label": "finite-difference", "color": "#8c564b", "marker": "P"},
    "gauss_stein": {"label": "gauss-stein", "color": "#ff7f0e", "marker": "s"},
    "stein_difference": {"label": "stein-difference", "color": "#2ca02c", "marker": "^"},
    "spsa": {"label": "SPSA", "color": "#d62728", "marker": "D"},
}
def log_step( method: str, step: int, u: float, value: float, grad_norm: float | None = None, step_size: float | None = None) -> None:
14def log_step(
15    method: str,
16    step: int,
17    u: float,
18    value: float,
19    grad_norm: float | None = None,
20    step_size: float | None = None,
21) -> None:
22    """Print a single optimization step to console."""
23    parts = [f"[{method}] step={step}", f"u={u:.4f}", f"value={value:.4f}"]
24    if grad_norm is not None:
25        parts.append(f"grad_norm={grad_norm:.4f}")
26    if step_size is not None:
27        parts.append(f"step_size={step_size:.6f}")
28    print(" ".join(parts))

# log_step (above) prints a single optimization step to the console.

def log_summary(result: ExperimentResult) -> None:
    """Print a human-readable summary of a completed experiment.

    Reports, in order: the objective definition (closed form when the type
    is recognized), the run configuration, the known optimum u* when
    available, per-estimator final iterates and gaps to the optimum, final
    theta vectors/norms, and per-estimator wall-clock runtimes.

    Args:
        result: Completed experiment result; must expose ``config``,
            ``results`` (mapping estimator name -> outcome), ``initial_value``,
            and optional ``u_star`` / ``value_at_u_star``.
    """

    def format_array(values: object, precision: int = 3) -> str:
        # Render any array-like as "[v1, v2, ...]" with fixed precision.
        arr = np.asarray(values, dtype=float)
        formatted = ", ".join(f"{val:.{precision}f}" for val in arr)
        return f"[{formatted}]"

    config = result.config
    objective = config.objective
    u_star: Optional[float] = result.u_star
    value_at_u_star: Optional[float] = result.value_at_u_star

    # Describe the objective in closed form for the recognized types.
    if isinstance(objective, FixedRegressionObjective):
        beta_1 = format_array(objective.beta_1)
        beta_2 = objective.beta_2
        beta_3 = format_array(objective.beta_3)
        beta_4 = objective.beta_4
        print(
            "Objective: f(u; x) = sigmoid(beta_1·x + beta_2*u) * (beta_3·x - beta_4*u)"
        )
        print(
            "Betas: "
            f"beta_1={beta_1}, beta_2={beta_2:.3f}, beta_3={beta_3}, beta_4={beta_4:.3f}"
        )
    elif isinstance(objective, PlantedLogisticObjective):
        beta = format_array(objective.beta)
        print("Objective: L(u; x) = log(1 + exp(z)) - p*(x) * z")
        print("z = alpha * u + beta·x + bias")
        print("p*(x) = sigmoid(alpha * u* + beta·x + bias)")
        print(
            "Params: "
            f"alpha={objective.alpha:.3f}, bias={objective.bias:.3f}, "
            f"u*={objective.u_star:.3f}, beta={beta}"
        )
    else:
        # Unknown objective type: fall back to its class name.
        print(f"Objective: {type(objective).__name__}")

    print(
        "Run: "
        f"steps={config.t_steps}, n_samples={config.n_samples}, step_size={config.step_size:.4f}, "
        f"step_rule={config.step_rule}"
    )
    print(f"Initial objective value: {result.initial_value:.4f}")
    u_star_value = float(u_star) if u_star is not None else None
    value_at_u_star_value = (
        float(value_at_u_star) if value_at_u_star is not None else None
    )
    if u_star_value is not None:
        print(f"Known optimum u*: {u_star_value:.4f}")
        if value_at_u_star_value is not None:
            print(f"Objective at u*: {value_at_u_star_value:.4f}")
    print("=== Results ===")

    # Fixed display order; estimators absent from this run are skipped.
    order = ("first_order", "finite_difference", "gauss_stein", "stein_difference", "spsa")
    labels = {
        "first_order": "first-order",
        "finite_difference": "finite-difference",
        "gauss_stein": "gauss-stein",
        "stein_difference": "stein-difference",
        "spsa": "SPSA",
    }
    ordered = [name for name in order if name in result.results]
    if ordered:
        final_u = ", ".join(
            f"{labels[name]}={float(result.results[name].u):.4f}" for name in ordered
        )
        final_value = ", ".join(
            f"{labels[name]}={float(result.results[name].value):.4f}" for name in ordered
        )
        print(f"Final u: {final_u}")
        print(f"Final objective: {final_value}")
        if u_star_value is not None:
            u_gap = ", ".join(
                f"{labels[name]}={abs(result.results[name].u - u_star_value):.4f}"
                for name in ordered
            )
            print(f"|u - u*|: {u_gap}")
            if value_at_u_star_value is not None:
                # Signed gap: positive means the run ended above the optimum value.
                value_gap = ", ".join(
                    f"{labels[name]}={result.results[name].value - value_at_u_star_value:.4f}"
                    for name in ordered
                )
                print(f"Objective gap: {value_gap}")
        print(f"Initial theta: {format_array(config.theta0)}")
        for name in ordered:
            theta = result.results[name].theta
            theta_l2 = float(np.linalg.norm(theta))
            theta_delta_l2 = float(np.linalg.norm(theta - config.theta0))
            print(f"Final theta ({labels[name]}): {format_array(theta)}")
            print(
                f"Final theta norms ({labels[name]}): "
                f"||theta||_2={theta_l2:.4f}, ||theta-theta0||_2={theta_delta_l2:.4f}"
            )
        print("=== Runtime (s) ===")
        for name in ordered:
            runtime = float(result.results[name].time)
            print(f"{labels[name]}: {runtime:.4f}")
def plot_gradient_norms(
    traces: Mapping[str, OptimizationTrace],
    plot_dir: str,
) -> None:
    """Plot per-step theta-gradient norms for each estimator trace.

    Draws the gradient-norm series (preferring the true norms when present)
    on one axis; when both true and estimated norms exist for some trace, a
    second shared-x axis shows the absolute error between them. The figure
    is saved as "gradient_norms.png" under *plot_dir*. No-op when *traces*
    is empty.

    Raises:
        ValueError: If a trace provides neither ``theta_grad_norms`` nor
            ``true_theta_grad_norms``.
    """
    trace_items = _ordered_traces(traces)
    if not trace_items:
        return
    path = _ensure_plot_dir(plot_dir)
    has_true = any(trace.true_theta_grad_norms is not None for _, trace in trace_items)
    has_est = any(trace.theta_grad_norms is not None for _, trace in trace_items)

    # Two stacked axes only when an error panel is meaningful.
    if has_true and has_est:
        fig, axes = plt.subplots(2, 1, figsize=(8, 7), sharex=True)
        ax_norm, ax_err = axes
    else:
        fig, ax_norm = plt.subplots(1, 1, figsize=(8, 4.5))
        ax_err = None

    for name, trace in trace_items:
        # Prefer the true gradient norms; fall back to the estimates.
        series = trace.true_theta_grad_norms
        if series is None:
            series = trace.theta_grad_norms
        if series is None:
            raise ValueError("theta_grad_norms or true_theta_grad_norms must be provided.")
        style = ESTIMATOR_STYLES[name]
        ax_norm.plot(
            trace.steps,
            series,
            label=style["label"],
            color=style["color"],
            alpha=_LINE_ALPHA,
            linewidth=_LINE_WIDTH,
            marker=style["marker"],
            markersize=_MARKER_SIZE,
            markevery=_marker_every(len(trace.steps)),
        )
    if has_true:
        ax_norm.set_ylabel("|theta grad norm| (true)")
    else:
        ax_norm.set_ylabel("|theta grad norm|")
    ax_norm.legend()
    ax_norm.grid(True, alpha=0.3)

    if ax_err is not None:
        for name, trace in trace_items:
            # Error panel needs both series for this trace.
            if trace.true_theta_grad_norms is None or trace.theta_grad_norms is None:
                continue
            style = ESTIMATOR_STYLES[name]
            err_values = [
                abs(g - t)
                for g, t in zip(trace.theta_grad_norms, trace.true_theta_grad_norms)
            ]
            ax_err.plot(
                trace.steps,
                err_values,
                label=f"{style['label']} error",
                color=style["color"],
                alpha=_LINE_ALPHA,
                linewidth=_LINE_WIDTH,
                marker=style["marker"],
                markersize=_MARKER_SIZE,
                markevery=_marker_every(len(trace.steps)),
            )
        ax_err.set_ylabel("|norm error|")
        ax_err.set_xlabel("Step")
        ax_err.legend()
        ax_err.grid(True, alpha=0.3)
    else:
        ax_norm.set_xlabel("Step")

    fig.tight_layout()
    fig.savefig(path / "gradient_norms.png", dpi=200)
    plt.close(fig)
def plot_loss_curves(
    traces: Mapping[str, OptimizationTrace],
    plot_dir: str,
    u_star: Optional[float] = None,
) -> None:
    """Plot objective-value curves per estimator, optionally with |u - u*|.

    When *u_star* is given, a second shared-x axis shows the distance of
    each iterate to the known optimum. The figure is saved as
    "loss_curves.png" under *plot_dir*. No-op when *traces* is empty.
    """
    trace_items = _ordered_traces(traces)
    if not trace_items:
        return
    path = _ensure_plot_dir(plot_dir)
    if u_star is not None:
        fig, axes = plt.subplots(2, 1, figsize=(8, 7), sharex=True)
        ax_loss, ax_dist = axes
    else:
        fig, ax_loss = plt.subplots(1, 1, figsize=(8, 4.5))
        ax_dist = None

    for name, trace in trace_items:
        style = ESTIMATOR_STYLES[name]
        ax_loss.plot(
            trace.steps,
            trace.objective_values,
            label=style["label"],
            color=style["color"],
            alpha=_LINE_ALPHA,
            linewidth=_LINE_WIDTH,
            marker=style["marker"],
            markersize=_MARKER_SIZE,
            markevery=_marker_every(len(trace.steps)),
        )
    ax_loss.set_ylabel("Objective value")
    ax_loss.legend()
    ax_loss.grid(True, alpha=0.3)

    if ax_dist is not None and u_star is not None:
        for name, trace in trace_items:
            style = ESTIMATOR_STYLES[name]
            dist_values = [abs(u - u_star) for u in trace.u_values]
            ax_dist.plot(
                trace.steps,
                dist_values,
                label=style["label"],
                color=style["color"],
                alpha=_LINE_ALPHA,
                linewidth=_LINE_WIDTH,
                marker=style["marker"],
                markersize=_MARKER_SIZE,
                markevery=_marker_every(len(trace.steps)),
            )
        ax_dist.set_ylabel("|u - u*|")
        ax_dist.set_xlabel("Step")
        ax_dist.legend()
        ax_dist.grid(True, alpha=0.3)
    else:
        # Single-panel layout: x-label goes on the loss axis.
        ax_loss.set_xlabel("Step")

    fig.tight_layout()
    fig.savefig(path / "loss_curves.png", dpi=200)
    plt.close(fig)
def plot_objective_u_slice(
    x_samples: np.ndarray,
    objective: Objective,
    traces: Mapping[str, OptimizationTrace],
    plot_dir: str,
    u_star: Optional[float] = None,
) -> None:
    """Plot objective value as a function of u.

    Uses the objective's value_at_u method for computing values at fixed u.
    The u-grid is padded 10% beyond the range spanned by the visited
    iterates (and u*, if given); each trace's visited (u, value) pairs are
    scattered on top. Saves "objective_u_slice.png" under *plot_dir*.
    Silently returns when *traces* is empty or the objective has no callable
    ``value_at_u``.

    Raises:
        ValueError: If *x_samples* is not a 2D array.
    """
    trace_items = _ordered_traces(traces)
    if not trace_items:
        return
    path = _ensure_plot_dir(plot_dir)
    x_arr = np.asarray(x_samples, dtype=float)
    if x_arr.ndim != 2:
        raise ValueError("x_samples must be a 2D array.")

    # Collect every u visited by any trace (plus u*) to size the grid.
    u_values: list[float] = []
    for _, trace in trace_items:
        u_values.extend(list(trace.u_values))
    if u_star is not None:
        u_values.append(float(u_star))
    if u_values:
        u_min = float(min(u_values))
        u_max = float(max(u_values))
        if np.isclose(u_min, u_max):
            # Degenerate range: pad relative to the magnitude of u.
            pad = 0.1 if u_min == 0.0 else abs(u_min) * 0.1
        else:
            pad = 0.1 * (u_max - u_min)
        u_grid = np.linspace(u_min - pad, u_max + pad, 200)
    else:
        u_grid = np.linspace(0.5, 1.5, 200)

    # Use value_at_u method if available
    value_at_u_fn = getattr(objective, "value_at_u", None)
    if not callable(value_at_u_fn):
        return
    value_at_u_typed = cast(Callable[[np.ndarray, float], float], value_at_u_fn)

    def value_at_u_scalar(u: float) -> float:
        return float(value_at_u_typed(x_arr, float(u)))

    obj_grid = [value_at_u_scalar(float(u)) for u in u_grid]

    fig, ax = plt.subplots(1, 1, figsize=(8, 5))

    ax.plot(u_grid, obj_grid, color="black", label="objective", alpha=_LINE_ALPHA, linewidth=_LINE_WIDTH)
    for name, trace in trace_items:
        style = ESTIMATOR_STYLES[name]
        # Draw gauss-stein above the other estimators' points.
        zorder = 4 if name == "gauss_stein" else 3
        ax.scatter(
            trace.u_values,
            trace.objective_values,
            color=style["color"],
            label=style["label"],
            marker=style["marker"],
            edgecolors=style["color"],
            linewidths=0.4,
            alpha=0.75,
            zorder=zorder,
        )
    ax.set_ylabel("Objective value")
    ax.set_xlabel("u")
    if u_star is not None:
        ax.axvline(
            u_star,
            color="#444444",
            linestyle="--",
            linewidth=1.2,
            alpha=0.8,
            label="u*",
        )
    ax.legend()
    ax.grid(True, alpha=0.3)

    fig.tight_layout()
    fig.savefig(path / "objective_u_slice.png", dpi=200)
    plt.close(fig)

# plot_objective_u_slice (above) plots the objective value as a function of u,
# using the objective's value_at_u method to evaluate values at fixed u.

def plot_step_sizes(
    traces: Mapping[str, OptimizationTrace],
    plot_dir: str,
) -> None:
    """Plot per-step step sizes for each estimator on a log-scale y axis.

    Traces without a ``step_sizes`` series are skipped; when no trace has
    one, no file is written. Saves "step_sizes.png" under *plot_dir*.

    Raises:
        ValueError: If a trace's ``step_sizes`` length differs from its
            ``steps`` length.
    """
    trace_items = _ordered_traces(traces)
    if not trace_items:
        return
    path = _ensure_plot_dir(plot_dir)
    fig, ax = plt.subplots(1, 1, figsize=(8, 4.5))
    has_series = False

    for name, trace in trace_items:
        if trace.step_sizes is None:
            continue
        if len(trace.step_sizes) != len(trace.steps):
            raise ValueError("step_sizes must match steps length for plotting.")
        style = ESTIMATOR_STYLES[name]
        ax.plot(
            trace.steps,
            trace.step_sizes,
            label=style["label"],
            color=style["color"],
            alpha=_LINE_ALPHA,
            linewidth=_LINE_WIDTH,
            marker=style["marker"],
            markersize=_MARKER_SIZE,
            markevery=_marker_every(len(trace.steps)),
        )
        has_series = True

    # Nothing plotted: discard the figure instead of saving an empty one.
    if not has_series:
        plt.close(fig)
        return

    ax.set_ylabel("Step size")
    ax.set_yscale("log")
    ax.set_xlabel("Step")
    ax.legend()
    ax.grid(True, alpha=0.3)
    fig.tight_layout()
    fig.savefig(path / "step_sizes.png", dpi=200)
    plt.close(fig)
def plot_theta_objective_contours(
    x_samples: np.ndarray,
    objective: Objective,
    theta_base: np.ndarray,
    plot_dir: str,
    axis_indices: tuple[int, int] = (0, 1),
    axis_labels: Optional[tuple[str, str]] = None,
    theta_refs: Optional[Sequence[np.ndarray]] = None,
    theta_points: Optional[Sequence[tuple[np.ndarray, str, str, str]]] = None,
    traces: Optional[Mapping[str, OptimizationTrace]] = None,
    grid_size: int = 60,
    levels: int = 15,
    filename: str = "theta_objective_contours.png",
) -> None:
    """Plot a filled contour of the objective over a 2D theta slice.

    The slice varies ``theta_base`` along ``axis_indices`` (grid computed by
    ``theta_objective_contour_grid``). Optimization paths from *traces* and
    individual *theta_points* (each a (theta, label, color, marker) tuple)
    are overlaid when provided. Saves *filename* under *plot_dir*.

    Raises:
        ValueError: If *x_samples* is not a 2D array (further validation is
            delegated to ``theta_objective_contour_grid``).
    """
    x_arr = np.asarray(x_samples, dtype=float)
    if x_arr.ndim != 2:
        raise ValueError("x_samples must be a 2D array.")
    path = _ensure_plot_dir(plot_dir)
    grid_x, grid_y, objective_grid = theta_objective_contour_grid(
        x_arr,
        objective,
        theta_base,
        axis_indices=axis_indices,
        theta_refs=theta_refs,
        grid_size=grid_size,
    )

    fig, ax = plt.subplots(1, 1, figsize=(7.5, 6))
    # Filled contours plus faint black contour lines for readability.
    contour = ax.contourf(grid_x, grid_y, objective_grid, levels=levels, cmap="viridis")
    ax.contour(grid_x, grid_y, objective_grid, levels=levels, colors="black", linewidths=0.4, alpha=0.35)
    colorbar = fig.colorbar(contour, ax=ax)
    colorbar.set_label("Objective value")

    if axis_labels is None:
        ax.set_xlabel(f"theta[{axis_indices[0]}]")
        ax.set_ylabel(f"theta[{axis_indices[1]}]")
    else:
        ax.set_xlabel(axis_labels[0])
        ax.set_ylabel(axis_labels[1])
    ax.set_title("Objective contour over theta slice")

    show_legend = False
    if traces is not None:
        for name, trace in _ordered_traces(traces):
            if trace.theta_values is None:
                continue
            style = ESTIMATOR_STYLES[name]
            theta_path = np.asarray(trace.theta_values, dtype=float)
            ax.plot(
                theta_path[:, axis_indices[0]],
                theta_path[:, axis_indices[1]],
                color=style["color"],
                alpha=_LINE_ALPHA,
                linewidth=_LINE_WIDTH,
                marker=style["marker"],
                markersize=_MARKER_SIZE,
                markevery=_marker_every(theta_path.shape[0]),
                label=f"{style['label']} path",
            )
            show_legend = True

    if theta_points is not None:
        for theta, label, color, marker in theta_points:
            theta_arr = np.asarray(theta, dtype=float)
            ax.scatter(
                [theta_arr[axis_indices[0]]],
                [theta_arr[axis_indices[1]]],
                label=label,
                color=color,
                marker=marker,
                edgecolors=color,
                linewidths=0.5,
                alpha=0.6,
                zorder=5,
            )
        show_legend = True

    # Legend only when at least one labeled overlay was drawn.
    if show_legend:
        ax.legend()

    fig.tight_layout()
    fig.savefig(path / filename, dpi=200)
    plt.close(fig)
def select_theta_axes_max_variance(theta_points: Sequence[np.ndarray]) -> tuple[int, int]:
    """Pick the two theta components with the largest variance across points.

    Args:
        theta_points: Non-empty sequence of 1D theta arrays of identical
            length (at least 2 components each).

    Returns:
        Pair of component indices, ordered by decreasing variance.

    Raises:
        ValueError: If the sequence is empty, the arrays do not stack into a
            2D matrix, or theta has fewer than two components.
    """
    if not theta_points:
        raise ValueError("theta_points must contain at least one theta array.")
    theta_stack = np.asarray([np.asarray(theta, dtype=float) for theta in theta_points])
    if theta_stack.ndim != 2:
        raise ValueError("theta_points must be a sequence of 1D arrays with matching sizes.")
    if theta_stack.shape[1] < 2:
        raise ValueError("theta_points must have at least two dimensions.")
    variances = np.var(theta_stack, axis=0)
    # Take the two highest-variance axes, then order them descending.
    top_two = np.argsort(variances)[-2:]
    ordered = top_two[np.argsort(variances[top_two])[::-1]]
    return int(ordered[0]), int(ordered[1])
def theta_objective_contour_grid(
    x_samples: np.ndarray,
    objective: Objective,
    theta_base: np.ndarray,
    axis_indices: tuple[int, int] = (0, 1),
    theta_refs: Optional[Sequence[np.ndarray]] = None,
    grid_size: int = 60,
    pad_ratio: float = 0.2,
    min_pad: float = 0.05,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Compute contour grid for theta-level objective.

    Varies ``theta_base`` along the two components in ``axis_indices``
    (axis ranges come from ``_theta_axis_grid``, padded by *pad_ratio* /
    *min_pad*) and evaluates ``objective.value`` at every grid node.

    Returns:
        ``(grid_x, grid_y, objective_grid)`` — the two meshgrid coordinate
        arrays and the matching objective-value array, each of shape
        ``(grid_size, grid_size)``.

    Raises:
        ValueError: If *x_samples* is not 2D, *axis_indices* does not name
            two distinct valid theta components, or *grid_size* <= 1.
    """
    x_arr = np.asarray(x_samples, dtype=float)
    if x_arr.ndim != 2:
        raise ValueError("x_samples must be a 2D array.")
    theta_arr = np.asarray(theta_base, dtype=float)
    if len(axis_indices) != 2:
        raise ValueError("axis_indices must contain exactly two indices.")
    if axis_indices[0] == axis_indices[1]:
        raise ValueError("axis_indices must refer to two distinct components.")
    if any(index < 0 or index >= theta_arr.size for index in axis_indices):
        raise ValueError("axis_indices must be valid indices for theta.")
    if grid_size <= 1:
        raise ValueError("grid_size must be greater than 1.")

    theta_x = _theta_axis_grid(theta_arr, axis_indices[0], theta_refs, grid_size, pad_ratio, min_pad)
    theta_y = _theta_axis_grid(theta_arr, axis_indices[1], theta_refs, grid_size, pad_ratio, min_pad)
    grid_x, grid_y = np.meshgrid(theta_x, theta_y)
    objective_grid = np.zeros_like(grid_x, dtype=float)

    # Evaluate the objective at each node, perturbing only the two slice axes.
    for i in range(grid_size):
        for j in range(grid_size):
            theta = theta_arr.copy()
            theta[axis_indices[0]] = grid_x[i, j]
            theta[axis_indices[1]] = grid_y[i, j]
            objective_grid[i, j] = float(objective.value(theta, x_arr))

    return grid_x, grid_y, objective_grid

# theta_objective_contour_grid (above) computes the contour grid for the theta-level objective.