reporting
Reporting and visualization helpers for experiment outputs.
1"""Reporting and visualization helpers for experiment outputs.""" 2 3from reporting.logging import log_step, log_summary 4from reporting.visualization import ( 5 ESTIMATOR_STYLES, 6 plot_gradient_norms, 7 plot_loss_curves, 8 plot_objective_u_slice, 9 plot_step_sizes, 10 plot_theta_objective_contours, 11 select_theta_axes_max_variance, 12 theta_objective_contour_grid, 13) 14 15__all__ = [ 16 "ESTIMATOR_STYLES", 17 "log_step", 18 "log_summary", 19 "plot_gradient_norms", 20 "plot_loss_curves", 21 "plot_objective_u_slice", 22 "plot_step_sizes", 23 "plot_theta_objective_contours", 24 "select_theta_axes_max_variance", 25 "theta_objective_contour_grid", 26]
# Shared per-estimator plot styling (legend label, line color, marker shape)
# so every figure in this module renders each estimator consistently.
ESTIMATOR_STYLES = {
    "first_order": {"label": "first-order", "color": "#1f77b4", "marker": "o"},
    "finite_difference": {"label": "finite-difference", "color": "#8c564b", "marker": "P"},
    "gauss_stein": {"label": "gauss-stein", "color": "#ff7f0e", "marker": "s"},
    "stein_difference": {"label": "stein-difference", "color": "#2ca02c", "marker": "^"},
    "spsa": {"label": "SPSA", "color": "#d62728", "marker": "D"},
}
def
log_step( method: str, step: int, u: float, value: float, grad_norm: float | None = None, step_size: float | None = None) -> None:
14def log_step( 15 method: str, 16 step: int, 17 u: float, 18 value: float, 19 grad_norm: float | None = None, 20 step_size: float | None = None, 21) -> None: 22 """Print a single optimization step to console.""" 23 parts = [f"[{method}] step={step}", f"u={u:.4f}", f"value={value:.4f}"] 24 if grad_norm is not None: 25 parts.append(f"grad_norm={grad_norm:.4f}") 26 if step_size is not None: 27 parts.append(f"step_size={step_size:.6f}") 28 print(" ".join(parts))
Print a single optimization step to console.
def log_summary(result: ExperimentResult) -> None:
    """Print a human-readable summary of an experiment run to console.

    Reports the objective definition (specialized text for the two known
    objective classes), the run configuration, per-estimator final u and
    objective values, gaps to the known optimum when available, final theta
    diagnostics, and per-estimator runtimes.
    """

    # Render an array-like as "[a, b, c]" with fixed decimal precision.
    def format_array(values: object, precision: int = 3) -> str:
        arr = np.asarray(values, dtype=float)
        formatted = ", ".join(f"{val:.{precision}f}" for val in arr)
        return f"[{formatted}]"

    config = result.config
    objective = config.objective
    u_star: Optional[float] = result.u_star
    value_at_u_star: Optional[float] = result.value_at_u_star

    # Objective description: print the closed form for the two objective
    # classes this module knows about; otherwise fall back to the class name.
    if isinstance(objective, FixedRegressionObjective):
        beta_1 = format_array(objective.beta_1)
        beta_2 = objective.beta_2
        beta_3 = format_array(objective.beta_3)
        beta_4 = objective.beta_4
        print(
            "Objective: f(u; x) = sigmoid(beta_1·x + beta_2*u) * (beta_3·x - beta_4*u)"
        )
        print(
            "Betas: "
            f"beta_1={beta_1}, beta_2={beta_2:.3f}, beta_3={beta_3}, beta_4={beta_4:.3f}"
        )
    elif isinstance(objective, PlantedLogisticObjective):
        beta = format_array(objective.beta)
        print("Objective: L(u; x) = log(1 + exp(z)) - p*(x) * z")
        print("z = alpha * u + beta·x + bias")
        print("p*(x) = sigmoid(alpha * u* + beta·x + bias)")
        print(
            "Params: "
            f"alpha={objective.alpha:.3f}, bias={objective.bias:.3f}, "
            f"u*={objective.u_star:.3f}, beta={beta}"
        )
    else:
        print(f"Objective: {type(objective).__name__}")

    # Run configuration and starting point.
    print(
        "Run: "
        f"steps={config.t_steps}, n_samples={config.n_samples}, step_size={config.step_size:.4f}, "
        f"step_rule={config.step_rule}"
    )
    print(f"Initial objective value: {result.initial_value:.4f}")
    # Normalize optional optimum info to plain floats (or None) once, so the
    # gap computations below can rely on float arithmetic.
    u_star_value = float(u_star) if u_star is not None else None
    value_at_u_star_value = (
        float(value_at_u_star) if value_at_u_star is not None else None
    )
    if u_star_value is not None:
        print(f"Known optimum u*: {u_star_value:.4f}")
    if value_at_u_star_value is not None:
        print(f"Objective at u*: {value_at_u_star_value:.4f}")
    print("=== Results ===")

    # Fixed display order; only estimators actually present in the results
    # are reported. Labels mirror ESTIMATOR_STYLES' legend labels.
    order = ("first_order", "finite_difference", "gauss_stein", "stein_difference", "spsa")
    labels = {
        "first_order": "first-order",
        "finite_difference": "finite-difference",
        "gauss_stein": "gauss-stein",
        "stein_difference": "stein-difference",
        "spsa": "SPSA",
    }
    ordered = [name for name in order if name in result.results]
    if ordered:
        final_u = ", ".join(
            f"{labels[name]}={float(result.results[name].u):.4f}" for name in ordered
        )
        final_value = ", ".join(
            f"{labels[name]}={float(result.results[name].value):.4f}" for name in ordered
        )
        print(f"Final u: {final_u}")
        print(f"Final objective: {final_value}")
        # Distance of each estimator's final u to the known optimum.
        if u_star_value is not None:
            u_gap = ", ".join(
                f"{labels[name]}={abs(result.results[name].u - u_star_value):.4f}"
                for name in ordered
            )
            print(f"|u - u*|: {u_gap}")
        # Signed objective gap relative to the optimum's objective value.
        if value_at_u_star_value is not None:
            value_gap = ", ".join(
                f"{labels[name]}={result.results[name].value - value_at_u_star_value:.4f}"
                for name in ordered
            )
            print(f"Objective gap: {value_gap}")
        # Theta diagnostics: final parameter vector plus L2 norms of the
        # vector itself and of its displacement from the initial theta0.
        print(f"Initial theta: {format_array(config.theta0)}")
        for name in ordered:
            theta = result.results[name].theta
            theta_l2 = float(np.linalg.norm(theta))
            theta_delta_l2 = float(np.linalg.norm(theta - config.theta0))
            print(f"Final theta ({labels[name]}): {format_array(theta)}")
            print(
                f"Final theta norms ({labels[name]}): "
                f"||theta||_2={theta_l2:.4f}, ||theta-theta0||_2={theta_delta_l2:.4f}"
            )
        print("=== Runtime (s) ===")
        for name in ordered:
            runtime = float(result.results[name].time)
            print(f"{labels[name]}: {runtime:.4f}")
def plot_gradient_norms(
    traces: Mapping[str, OptimizationTrace],
    plot_dir: str,
) -> None:
    """Plot theta-gradient norms per step, saved as gradient_norms.png.

    Prefers each trace's true gradient norms when present, falling back to
    the estimated norms. When both true and estimated series exist for at
    least one trace each, a second panel shows |estimated - true| per step.
    """
    # _ordered_traces presumably yields (name, trace) pairs in display
    # order — TODO confirm against its definition elsewhere in this module.
    trace_items = _ordered_traces(traces)
    if not trace_items:
        return
    path = _ensure_plot_dir(plot_dir)
    has_true = any(trace.true_theta_grad_norms is not None for _, trace in trace_items)
    has_est = any(trace.theta_grad_norms is not None for _, trace in trace_items)

    # Two stacked panels (norms + error) only when both series are available;
    # otherwise a single norms panel.
    if has_true and has_est:
        fig, axes = plt.subplots(2, 1, figsize=(8, 7), sharex=True)
        ax_norm, ax_err = axes
    else:
        fig, ax_norm = plt.subplots(1, 1, figsize=(8, 4.5))
        ax_err = None

    for name, trace in trace_items:
        # Per trace: true norms if recorded, else estimated; a trace with
        # neither is an error.
        series = trace.true_theta_grad_norms
        if series is None:
            series = trace.theta_grad_norms
        if series is None:
            raise ValueError("theta_grad_norms or true_theta_grad_norms must be provided.")
        style = ESTIMATOR_STYLES[name]
        ax_norm.plot(
            trace.steps,
            series,
            label=style["label"],
            color=style["color"],
            alpha=_LINE_ALPHA,
            linewidth=_LINE_WIDTH,
            marker=style["marker"],
            markersize=_MARKER_SIZE,
            # _marker_every likely thins markers on long traces — TODO confirm.
            markevery=_marker_every(len(trace.steps)),
        )
    if has_true:
        ax_norm.set_ylabel("|theta grad norm| (true)")
    else:
        ax_norm.set_ylabel("|theta grad norm|")
    ax_norm.legend()
    ax_norm.grid(True, alpha=0.3)

    if ax_err is not None:
        # Error panel: only traces carrying BOTH series contribute a curve.
        for name, trace in trace_items:
            if trace.true_theta_grad_norms is None or trace.theta_grad_norms is None:
                continue
            style = ESTIMATOR_STYLES[name]
            err_values = [
                abs(g - t)
                for g, t in zip(trace.theta_grad_norms, trace.true_theta_grad_norms)
            ]
            ax_err.plot(
                trace.steps,
                err_values,
                label=f"{style['label']} error",
                color=style["color"],
                alpha=_LINE_ALPHA,
                linewidth=_LINE_WIDTH,
                marker=style["marker"],
                markersize=_MARKER_SIZE,
                markevery=_marker_every(len(trace.steps)),
            )
        ax_err.set_ylabel("|norm error|")
        ax_err.set_xlabel("Step")
        ax_err.legend()
        ax_err.grid(True, alpha=0.3)
    else:
        ax_norm.set_xlabel("Step")

    fig.tight_layout()
    fig.savefig(path / "gradient_norms.png", dpi=200)
    plt.close(fig)
def plot_loss_curves(
    traces: Mapping[str, OptimizationTrace],
    plot_dir: str,
    u_star: Optional[float] = None,
) -> None:
    """Plot objective value per step for each trace, saved as loss_curves.png.

    When u_star is given, a second panel plots |u - u*| per step.
    """
    trace_items = _ordered_traces(traces)
    if not trace_items:
        return
    path = _ensure_plot_dir(plot_dir)
    # Two stacked panels (objective + distance-to-optimum) when u* is known;
    # otherwise a single objective panel.
    if u_star is not None:
        fig, axes = plt.subplots(2, 1, figsize=(8, 7), sharex=True)
        ax_loss, ax_dist = axes
    else:
        fig, ax_loss = plt.subplots(1, 1, figsize=(8, 4.5))
        ax_dist = None

    for name, trace in trace_items:
        style = ESTIMATOR_STYLES[name]
        ax_loss.plot(
            trace.steps,
            trace.objective_values,
            label=style["label"],
            color=style["color"],
            alpha=_LINE_ALPHA,
            linewidth=_LINE_WIDTH,
            marker=style["marker"],
            markersize=_MARKER_SIZE,
            markevery=_marker_every(len(trace.steps)),
        )
    ax_loss.set_ylabel("Objective value")
    ax_loss.legend()
    ax_loss.grid(True, alpha=0.3)

    if ax_dist is not None and u_star is not None:
        for name, trace in trace_items:
            style = ESTIMATOR_STYLES[name]
            dist_values = [abs(u - u_star) for u in trace.u_values]
            ax_dist.plot(
                trace.steps,
                dist_values,
                label=style["label"],
                color=style["color"],
                alpha=_LINE_ALPHA,
                linewidth=_LINE_WIDTH,
                marker=style["marker"],
                markersize=_MARKER_SIZE,
                markevery=_marker_every(len(trace.steps)),
            )
        ax_dist.set_ylabel("|u - u*|")
        ax_dist.set_xlabel("Step")
        ax_dist.legend()
        ax_dist.grid(True, alpha=0.3)
    else:
        # Single-panel layout: x label goes on the objective axis.
        ax_loss.set_xlabel("Step")

    fig.tight_layout()
    fig.savefig(path / "loss_curves.png", dpi=200)
    plt.close(fig)
def plot_objective_u_slice(
    x_samples: np.ndarray,
    objective: Objective,
    traces: Mapping[str, OptimizationTrace],
    plot_dir: str,
    u_star: Optional[float] = None,
) -> None:
    """Plot objective value as a function of u.

    Uses the objective's value_at_u method for computing values at fixed u.
    The objective curve is drawn over a u-grid spanning all visited u values
    (plus u* when given), with each trace's visited (u, objective) points
    scattered on top. Saved as objective_u_slice.png. Silently returns if the
    objective has no callable value_at_u.
    """
    trace_items = _ordered_traces(traces)
    if not trace_items:
        return
    path = _ensure_plot_dir(plot_dir)
    x_arr = np.asarray(x_samples, dtype=float)
    if x_arr.ndim != 2:
        raise ValueError("x_samples must be a 2D array.")

    # Collect every u visited by any trace (and u* if known) to size the grid.
    u_values: list[float] = []
    for _, trace in trace_items:
        u_values.extend(list(trace.u_values))
    if u_star is not None:
        u_values.append(float(u_star))
    if u_values:
        u_min = float(min(u_values))
        u_max = float(max(u_values))
        # Degenerate span (all u equal): pad relative to magnitude, with an
        # absolute fallback at zero so the grid is never empty.
        if np.isclose(u_min, u_max):
            pad = 0.1 if u_min == 0.0 else abs(u_min) * 0.1
        else:
            pad = 0.1 * (u_max - u_min)
        u_grid = np.linspace(u_min - pad, u_max + pad, 200)
    else:
        # No visited points at all: use a default window.
        u_grid = np.linspace(0.5, 1.5, 200)

    # Use value_at_u method if available
    value_at_u_fn = getattr(objective, "value_at_u", None)
    if not callable(value_at_u_fn):
        return
    # cast is for the type checker only; no runtime check is performed.
    value_at_u_typed = cast(Callable[[np.ndarray, float], float], value_at_u_fn)

    def value_at_u_scalar(u: float) -> float:
        return float(value_at_u_typed(x_arr, float(u)))

    obj_grid = [value_at_u_scalar(float(u)) for u in u_grid]

    fig, ax = plt.subplots(1, 1, figsize=(8, 5))

    ax.plot(u_grid, obj_grid, color="black", label="objective", alpha=_LINE_ALPHA, linewidth=_LINE_WIDTH)
    for name, trace in trace_items:
        style = ESTIMATOR_STYLES[name]
        # Draw gauss_stein above the other estimators' scatter points.
        zorder = 4 if name == "gauss_stein" else 3
        ax.scatter(
            trace.u_values,
            trace.objective_values,
            color=style["color"],
            label=style["label"],
            marker=style["marker"],
            edgecolors=style["color"],
            linewidths=0.4,
            alpha=0.75,
            zorder=zorder,
        )

    ax.set_ylabel("Objective value")
    ax.set_xlabel("u")
    if u_star is not None:
        # Mark the known optimum with a dashed vertical line.
        ax.axvline(
            u_star,
            color="#444444",
            linestyle="--",
            linewidth=1.2,
            alpha=0.8,
            label="u*",
        )
    ax.legend()
    ax.grid(True, alpha=0.3)

    fig.tight_layout()
    fig.savefig(path / "objective_u_slice.png", dpi=200)
    plt.close(fig)
def plot_step_sizes(
    traces: Mapping[str, OptimizationTrace],
    plot_dir: str,
) -> None:
    """Plot per-step step sizes (log y-scale), saved as step_sizes.png.

    Traces without recorded step sizes are skipped; if no trace has them,
    nothing is saved.
    """
    trace_items = _ordered_traces(traces)
    if not trace_items:
        return
    path = _ensure_plot_dir(plot_dir)
    fig, ax = plt.subplots(1, 1, figsize=(8, 4.5))
    has_series = False

    for name, trace in trace_items:
        if trace.step_sizes is None:
            continue
        # A recorded series must align 1:1 with the step indices.
        if len(trace.step_sizes) != len(trace.steps):
            raise ValueError("step_sizes must match steps length for plotting.")
        style = ESTIMATOR_STYLES[name]
        ax.plot(
            trace.steps,
            trace.step_sizes,
            label=style["label"],
            color=style["color"],
            alpha=_LINE_ALPHA,
            linewidth=_LINE_WIDTH,
            marker=style["marker"],
            markersize=_MARKER_SIZE,
            markevery=_marker_every(len(trace.steps)),
        )
        has_series = True

    # Nothing plotted: discard the empty figure instead of saving it.
    if not has_series:
        plt.close(fig)
        return

    ax.set_ylabel("Step size")
    # Log scale: step-size schedules typically decay over orders of magnitude.
    ax.set_yscale("log")
    ax.set_xlabel("Step")
    ax.legend()
    ax.grid(True, alpha=0.3)
    fig.tight_layout()
    fig.savefig(path / "step_sizes.png", dpi=200)
    plt.close(fig)
def plot_theta_objective_contours(
    x_samples: np.ndarray,
    objective: Objective,
    theta_base: np.ndarray,
    plot_dir: str,
    axis_indices: tuple[int, int] = (0, 1),
    axis_labels: Optional[tuple[str, str]] = None,
    theta_refs: Optional[Sequence[np.ndarray]] = None,
    theta_points: Optional[Sequence[tuple[np.ndarray, str, str, str]]] = None,
    traces: Optional[Mapping[str, OptimizationTrace]] = None,
    grid_size: int = 60,
    levels: int = 15,
    filename: str = "theta_objective_contours.png",
) -> None:
    """Plot filled objective contours over a 2D slice of theta.

    The grid is computed by theta_objective_contour_grid over the two theta
    components selected by axis_indices. Optionally overlays per-trace theta
    paths and individual (theta, label, color, marker) points. theta_refs is
    forwarded to the grid builder (it influences the plotted axis ranges).
    """
    x_arr = np.asarray(x_samples, dtype=float)
    if x_arr.ndim != 2:
        raise ValueError("x_samples must be a 2D array.")
    path = _ensure_plot_dir(plot_dir)
    grid_x, grid_y, objective_grid = theta_objective_contour_grid(
        x_arr,
        objective,
        theta_base,
        axis_indices=axis_indices,
        theta_refs=theta_refs,
        grid_size=grid_size,
    )

    fig, ax = plt.subplots(1, 1, figsize=(7.5, 6))
    # Filled contours plus thin black isolines for readability.
    contour = ax.contourf(grid_x, grid_y, objective_grid, levels=levels, cmap="viridis")
    ax.contour(grid_x, grid_y, objective_grid, levels=levels, colors="black", linewidths=0.4, alpha=0.35)
    colorbar = fig.colorbar(contour, ax=ax)
    colorbar.set_label("Objective value")

    # Default axis labels name the theta components being sliced.
    if axis_labels is None:
        ax.set_xlabel(f"theta[{axis_indices[0]}]")
        ax.set_ylabel(f"theta[{axis_indices[1]}]")
    else:
        ax.set_xlabel(axis_labels[0])
        ax.set_ylabel(axis_labels[1])
    ax.set_title("Objective contour over theta slice")

    # Legend is only shown when at least one overlay (path or point) exists.
    show_legend = False
    if traces is not None:
        for name, trace in _ordered_traces(traces):
            if trace.theta_values is None:
                continue
            style = ESTIMATOR_STYLES[name]
            theta_path = np.asarray(trace.theta_values, dtype=float)
            ax.plot(
                theta_path[:, axis_indices[0]],
                theta_path[:, axis_indices[1]],
                color=style["color"],
                alpha=_LINE_ALPHA,
                linewidth=_LINE_WIDTH,
                marker=style["marker"],
                markersize=_MARKER_SIZE,
                markevery=_marker_every(theta_path.shape[0]),
                label=f"{style['label']} path",
            )
            show_legend = True

    if theta_points is not None:
        for theta, label, color, marker in theta_points:
            theta_arr = np.asarray(theta, dtype=float)
            ax.scatter(
                [theta_arr[axis_indices[0]]],
                [theta_arr[axis_indices[1]]],
                label=label,
                color=color,
                marker=marker,
                edgecolors=color,
                linewidths=0.5,
                alpha=0.6,
                zorder=5,
            )
            show_legend = True

    if show_legend:
        ax.legend()

    fig.tight_layout()
    fig.savefig(path / filename, dpi=200)
    plt.close(fig)
def select_theta_axes_max_variance(theta_points: Sequence[np.ndarray]) -> tuple[int, int]:
    """Return the indices of the two theta components with the largest
    variance across the given points, highest-variance index first.

    Raises ValueError for an empty sequence, mismatched/non-1D arrays, or
    theta vectors with fewer than two components.
    """
    if not theta_points:
        raise ValueError("theta_points must contain at least one theta array.")
    stacked = np.asarray([np.asarray(point, dtype=float) for point in theta_points])
    if stacked.ndim != 2:
        raise ValueError("theta_points must be a sequence of 1D arrays with matching sizes.")
    if stacked.shape[1] < 2:
        raise ValueError("theta_points must have at least two dimensions.")
    # Ascending sort of per-component variance; the tail holds the winners.
    ranked = np.argsort(np.var(stacked, axis=0))
    return int(ranked[-1]), int(ranked[-2])
def theta_objective_contour_grid(
    x_samples: np.ndarray,
    objective: Objective,
    theta_base: np.ndarray,
    axis_indices: tuple[int, int] = (0, 1),
    theta_refs: Optional[Sequence[np.ndarray]] = None,
    grid_size: int = 60,
    pad_ratio: float = 0.2,
    min_pad: float = 0.05,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Compute contour grid for theta-level objective.

    Varies the two theta components named by axis_indices over a grid_size x
    grid_size mesh while holding every other component at theta_base, and
    evaluates objective.value at each grid node. theta_refs, pad_ratio, and
    min_pad are forwarded to _theta_axis_grid, which builds each axis range.

    Returns (grid_x, grid_y, objective_grid), each of shape
    (grid_size, grid_size), suitable for matplotlib's contour/contourf.

    Raises ValueError for non-2D x_samples, malformed axis_indices, or
    grid_size <= 1.
    """
    x_arr = np.asarray(x_samples, dtype=float)
    if x_arr.ndim != 2:
        raise ValueError("x_samples must be a 2D array.")
    theta_arr = np.asarray(theta_base, dtype=float)
    if len(axis_indices) != 2:
        raise ValueError("axis_indices must contain exactly two indices.")
    if axis_indices[0] == axis_indices[1]:
        raise ValueError("axis_indices must refer to two distinct components.")
    if any(index < 0 or index >= theta_arr.size for index in axis_indices):
        raise ValueError("axis_indices must be valid indices for theta.")
    if grid_size <= 1:
        raise ValueError("grid_size must be greater than 1.")

    theta_x = _theta_axis_grid(theta_arr, axis_indices[0], theta_refs, grid_size, pad_ratio, min_pad)
    theta_y = _theta_axis_grid(theta_arr, axis_indices[1], theta_refs, grid_size, pad_ratio, min_pad)
    grid_x, grid_y = np.meshgrid(theta_x, theta_y)
    objective_grid = np.zeros_like(grid_x, dtype=float)

    # Evaluate the objective node-by-node; only the two sliced components of
    # theta change per node, the rest stay fixed at theta_base.
    for i in range(grid_size):
        for j in range(grid_size):
            theta = theta_arr.copy()
            theta[axis_indices[0]] = grid_x[i, j]
            theta[axis_indices[1]] = grid_y[i, j]
            objective_grid[i, j] = float(objective.value(theta, x_arr))

    return grid_x, grid_y, objective_grid