dreadnode.optimization

API reference for the dreadnode.optimization module.

SearchSpace = Mapping[str, Distribution | list[Primitive]]

Type alias for search space definitions.

StudyStopCondition = StopCondition[list[Trial[CandidateT]]]

Type alias for study stop conditions.

Signals that GEPA updated optimization budget usage.

Signals that GEPA accepted a proposed candidate.

Signals that GEPA rejected a proposed candidate.

Capability adapter that scores candidates against a provisioned task environment.

Each dataset row is evaluated by provisioning a TaskEnvironment via :func:dreadnode.task_env, rendering the task instruction, running the rebuilt agent, and invoking the configured scorers against the agent’s output. Scorers can read dreadnode.core.current_task_environment to reach the live sandbox (e.g. to shell-probe for a flag) while it is still provisioned.
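
For orientation, a scorer that probes the live sandbox might look roughly like the sketch below. The scorer signature, the ContextVar-style .get() accessor, and the env.exec method are assumptions for illustration, not confirmed API.

    from dreadnode.core import current_task_environment

    async def flag_found(output: str) -> float:
        # Hypothetical sketch: reach the still-provisioned sandbox and
        # shell-probe for a planted flag. `.get()` and `.exec()` are
        # assumed interfaces.
        env = current_task_environment.get()
        result = await env.exec("cat /root/flag.txt || true")
        return 1.0 if "FLAG{" in result.stdout else 0.0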

Dataset row conventions

  • task_ref (optional): overrides the adapter’s default task ref on a per-row basis. Drives which task each trial provisions.
  • inputs (optional): per-row template bindings substituted into the task’s instruction. The primary mechanism for per-row variation.
  • Scoring fields (expected_output, needle, reward, etc.) for reward-recipe-based scoring.

The dataset’s goal field is explicitly NOT consulted: the task’s rendered instruction is the agent’s user message, and the capability’s mutable surfaces are the optimization target. “Injecting a different prompt per row” isn’t a capability_env concept — it’s a capability_agent concept, and that adapter should be used instead.
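
Concretely, rows following these conventions might look like the sketch below (task refs and bindings are hypothetical):

    dataset = [
        # Uses the adapter's default task_ref; `inputs` are template
        # bindings rendered into the task instruction.
        {"inputs": {"target_host": "10.0.0.5"},
         "expected_output": "FLAG{"},
        # Overrides which task this row provisions.
        {"task_ref": "tasks/priv-esc",
         "inputs": {"username": "svc-backup"}},
    ]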

Attributes:

  • task_ref (str) – Default task reference passed to :func:dreadnode.task_env when a row does not override it.
  • timeout_sec (int | None) – Optional per-env provisioning timeout.

parallel_rows: int = Field(default=1, ge=1)

Maximum dataset rows to evaluate concurrently within one candidate’s evaluate() call. A value of 1 preserves serial behaviour. Higher values provision that many TaskEnvironment sandboxes in parallel, so watch platform concurrency limits.
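
A sketch of raising row concurrency (the task_ref value is hypothetical, and any constructor arguments beyond those documented here are assumptions):

    # Evaluate up to four dataset rows per candidate concurrently; each
    # row provisions its own TaskEnvironment sandbox.
    adapter = CapabilityEnvAdapter(
        task_ref="tasks/web-recon",
        parallel_rows=4,
    )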

evaluate(
    batch: list[dict[str, Any]],
    candidate: dict[str, str],
    *,
    capture_traces: bool = False,
) -> OptimizationEvaluationBatch

Evaluate a candidate by running the rebuilt agent against per-row task envs.

evaluate_candidate(
    candidate: dict[str, str],
    example: dict[str, Any] | None = None,
) -> OptimizationEvaluation

Evaluate one candidate in GEPA-compatible (score, side_info) form.

Categorical(choices: list[Primitive])

Categorical distribution for discrete choices.

Parameters:

  • choices (list[Primitive]) – List of possible values.

Distribution()

Base class for all search space distributions.

Adapter that evaluates agent instruction candidates with Evaluation.

apply_candidate(candidate: dict[str, str]) -> Agent

Clone the agent and apply an instruction-only candidate.

evaluate(
    batch: list[dict[str, Any]],
    candidate: dict[str, str],
    *,
    capture_traces: bool = False,
) -> OptimizationEvaluationBatch

Evaluate one batch of examples and return per-example scores.

evaluate_candidate(
    candidate: dict[str, str],
    example: dict[str, Any] | None = None,
) -> OptimizationEvaluation

Evaluate one candidate in a GEPA-compatible (score, side_info) shape.

make_reflective_dataset(
    candidate: dict[str, str],
    eval_batch: OptimizationEvaluationBatch,
    components_to_update: list[str],
) -> dict[str, list[dict[str, t.Any]]]

Build component-scoped reflective data for GEPA.

seed_candidate() -> dict[str, str]

Return the current instruction candidate for this agent.

Execution settings for the optimization engine.

to_gepa_kwargs() -> dict[str, t.Any]

Return GEPA-compatible keyword arguments for the engine config.

Float(
    low: float,
    high: float,
    log: bool = False,
    step: float | None = None,
)

Floating-point distribution for continuous parameters.

Parameters:

  • low (float) – Lower bound (inclusive).
  • high (float) – Upper bound (inclusive).
  • log (bool, default: False) – If True, sample in log space.
  • step (float | None, default: None) – Discretization step size.

GEPA-backed implementation of Dreadnode optimize_anything.

Int(low: int, high: int, log: bool = False, step: int = 1)

Integer distribution for discrete parameters.

Parameters:

  • low (int) – Lower bound (inclusive).
  • high (int) – Upper bound (inclusive).
  • log (bool, default: False) – If True, sample in log space.
  • step (int, default: 1) – Step size between values.
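
These distributions compose into a SearchSpace mapping (the parameter names are illustrative):

    search_space: SearchSpace = {
        "temperature": Float(0.0, 2.0, step=0.1),
        "max_depth": Int(1, 16, log=True),
        "strategy": Categorical(["greedy", "beam", "sample"]),
    }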

Signals the start of an optimization iteration.

Merge-policy settings for candidate combination.

to_gepa_kwargs() -> dict[str, t.Any]

Return GEPA-compatible keyword arguments for merge settings.

Signals that a new best trial has been found.

Dreadnode-native optimize_anything executor.

effective_dataset: list[Any] | None

Return the trainset if provided, otherwise the dataset.

optimization_id: UUID

Stable identifier for this optimization run.

console() -> OptimizationResult[CandidateT]

Run the optimization with a live console adapter.

Adapter contract for systems that need batched evaluation and reflection.

Base interface for optimization backends.

Raised when an optimization backend cannot execute a request.

Top-level configuration for Dreadnode optimize_anything runs.

Raised when an optimization backend dependency is unavailable.

Signals the end of an optimize_anything run.

Signals that optimize_anything failed before producing a result.

OptimizationEvaluation(
    score: float | None = None,
    scores: dict[str, float] = dict(),
    side_info: dict[str, Any] = dict(),
    evaluation_result: EvalResult[Any, Any] | None = None,
    traces: Any = None,
)

Normalized evaluator output for optimize_anything.
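
A custom evaluator might construct one directly; a minimal sketch, where score_candidate is a hypothetical helper and a synchronous evaluator is assumed to be accepted:

    def evaluate_prompt(candidate: str) -> OptimizationEvaluation:
        accuracy = score_candidate(candidate)  # hypothetical scoring helper
        return OptimizationEvaluation(
            score=accuracy,
            scores={"accuracy": accuracy},
            side_info={"notes": "scored against held-out rows"},
        )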

OptimizationEvaluationBatch(
    outputs: list[Any] = list(),
    scores: list[float] = list(),
    trajectories: list[Any] | None = None,
    objective_scores: list[dict[str, float]] | None = None,
)

Batch evaluation data returned by Dreadnode-native adapters.

Callable used to score a text candidate.

Base event type for Dreadnode optimize_anything.

OptimizationResult(
    backend: str,
    seed_candidate: CandidateT | None = None,
    best_candidate: CandidateT | None = None,
    best_score: float | None = None,
    best_scores: dict[str, float] = dict(),
    objective: str | None = None,
    train_size: int = 0,
    val_size: int = 0,
    pareto_frontier: list[CandidateT] = list(),
    history: list[Any] = list(),
    metadata: dict[str, Any] = dict(),
    raw_result: Any = None,
)

Result of a Dreadnode optimize_anything run.

frontier_size: int

Return the number of candidates currently on the Pareto frontier.

to_dict() -> dict[str, t.Any]

Return a JSON-serializable result dictionary.

Signals the beginning of an optimize_anything run.

Signals that the Pareto frontier changed.

Candidate-refinement settings for optimize_anything.

to_gepa_kwargs() -> dict[str, t.Any]

Return GEPA-compatible keyword arguments for refiner settings.

Reflection-model settings passed through to GEPA.

to_gepa_kwargs() -> dict[str, t.Any]

Return GEPA-compatible keyword arguments for the reflection config.

Sample(
    candidate: CandidateT,
    metadata: dict[str, Any] = dict(),
)

A candidate proposed by a sampler.

Attributes:

  • candidate (CandidateT) – The candidate value to evaluate.
  • metadata (dict[str, Any]) – Optional metadata (e.g., parent_id for graph-based search).

parent_id: UUID | None

Convenience accessor for parent_id in metadata.
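
For graph-based search, lineage can be threaded through metadata; a small sketch (mutated_prompt and parent_trial are hypothetical):

    child = Sample(
        candidate={"prompt": mutated_prompt},
        metadata={"parent_id": parent_trial.id},  # surfaced via sample.parent_id
    )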

Base class for optimization samplers.

Samplers propose candidates and learn from evaluation results. The Study controls the execution loop; samplers are passive.

The sample/tell interface:

  • sample(history) -> list[Sample]: Propose candidates to evaluate
  • tell(trials): Receive evaluation results

Example

import itertools

class GridSampler(Sampler[dict]):
    def __init__(self, grid: dict[str, list]):
        self.combinations = list(itertools.product(*grid.values()))
        self.keys = list(grid.keys())
        self.index = 0

    def sample(self, history: list[Trial]) -> list[Sample]:
        if self.exhausted:
            return []
        candidate = dict(zip(self.keys, self.combinations[self.index]))
        self.index += 1
        return [Sample(candidate)]

    @property
    def exhausted(self) -> bool:
        return self.index >= len(self.combinations)

exhausted: bool

Check if sampler has no more candidates to propose.

Override for finite samplers (grid search, explicit candidate list). Default: never exhausted (infinite sampling).

Returns:

  • bool – True if the sampler cannot propose more candidates.

reset() -> None

Reset sampler state for reuse.

Override if sampler maintains state that should be cleared between study runs.

sample(
    history: list[Trial[CandidateT]],
) -> list[Sample[CandidateT]] | t.Awaitable[list[Sample[CandidateT]]]

Propose candidates to evaluate.

Can be sync or async. If async (returns awaitable), Study will await it. This allows samplers that use async operations (like LLM calls) to generate candidates.

Parameters:

  • history (list[Trial[CandidateT]]) – All trials evaluated so far (completed, failed, or pruned).

Returns:

  • list[Sample[CandidateT]] | Awaitable[list[Sample[CandidateT]]] – List of samples to evaluate together as a batch. Return an empty list to signal that the sampler is exhausted; an awaitable resolving to the list is also accepted.

tell(trials: list[Trial[CandidateT]]) -> None

Receive evaluation results.

Called after each batch from sample() completes evaluation. Override to update internal state based on results.

Parameters:

  • trials (list[Trial[CandidateT]]) – Completed trials from the last sample() batch. Each trial has status, scores, and other result data.
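
A sketch of a stateful sampler exercising both halves of the interface; mutate() is a hypothetical async helper (e.g., an LLM call):

    class HillClimbSampler(Sampler[dict]):
        def __init__(self, seed: dict):
            self.best = seed
            self.best_score = float("-inf")

        async def sample(self, history: list[Trial]) -> list[Sample]:
            # Async sample() is supported; Study awaits the result.
            candidate = await mutate(self.best)
            return [Sample(candidate, metadata={"strategy": "hill-climb"})]

        def tell(self, trials: list[Trial]) -> None:
            # Keep the best-scoring candidate seen so far.
            for trial in trials:
                if trial.score > self.best_score:
                    self.best, self.best_score = trial.candidate, trial.score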

Capability optimization that runs each trial through a real ManagedRuntimeClient session.

See OPTIMIZE_RUNTIME.MD §5 for the full design. Inherits seed, materialize, propose_new_texts, make_reflective_dataset from :class:StackAwareCapabilityAdapter and overrides evaluate + materialize_candidate (to write under Storage instead of tempfile) and _format_feedback (optional turn excerpt).

materialize_retention: Literal["all", "frontier_only"] = "frontier_only"

Which materialized capability trees to keep on disk after the optimization run terminates.

optimization_job_id: str | None = None

Threaded into Storage.optimization_job_path so materialized trees land under <storage>/optimizations/<job>/iter-N/<hash>/. The bridge that wraps the adapter (the same code that calls api.create_optimization_job) is expected to set this before the first evaluate call.

persist_sessions: Literal["all", "accepted", "none"] = "all"

Which trial sessions to persist. "accepted" is a future enhancement (deferred sync until the candidate-accept signal); the first cut treats it the same as "all".

policy: str | dict[str, Any] = 'headless'

Policy name or dict passed to RuntimeClient.create_session. The headless policy contributes a max_steps hook automatically; pass a dict to override, e.g. {"name": "headless", "max_steps": 10}.

system_prompt_append: str | None = None

Mirrors the CLI --system-prompt overlay; threaded into :class:ManagedRuntimeClient at boot.

task_ref: str | None = None

Optional task reference; if set, each row provisions dn.task_env. Mirrors :class:CapabilityEnvAdapter.

trace_excerpt_chars: int = 0

When >0, inline a tool-call summary into the reflective dataset’s Feedback field. Tunes how much trajectory context the GEPA reflection LM sees per row. Default off for parity with parent.

aclose() -> None

Shut down the in-process runtime. Safe to call multiple times.

evaluate(
    batch: list[dict[str, Any]],
    candidate: dict[str, str],
    *,
    capture_traces: bool = False,
) -> OptimizationEvaluationBatch

Materialize → register transient capability → drive trial sessions.

evaluate_candidate(
    candidate: dict[str, str],
    example: dict[str, Any] | None = None,
) -> OptimizationEvaluation

Single-row eval entry, GEPA-compatible (score, side_info) shape.

mark_frontier(candidate_hash: str) -> None

Pin a candidate’s materialized tree against frontier_only cleanup.

materialize_candidate(
    candidate: dict[str, str],
    *,
    job_id: str | None = None,
    iteration: int | None = None,
    candidate_hash: str | None = None,
) -> MaterializedCapabilityCandidate

Materialize the candidate under Storage.optimization_candidate_path(job_id, iteration, hash).

Falls through to :meth:StackAwareCapabilityAdapter.materialize_candidate (which uses :class:tempfile.TemporaryDirectory) when called without optimization context — preserves the parent’s behavior for callers that don’t go through the adapter’s evaluate.

Capability-level adapter for stack-aware local optimization.

policy_factory: Callable[[], Any] | None = None

Optional factory returning a SessionPolicy whose extra_hooks() are layered into the agent on each evaluation (e.g. HeadlessSessionPolicy contributing a max_steps hook). Called per _build_agent.

proposal_enabled: bool

Whether this adapter exposes a custom candidate proposer.

registry: Any = None

Optional CapabilityRegistry for cross-capability tool/hook merging. When provided, registry.all_tools() + registry.all_hooks() are layered into the agent alongside the materialized capability’s own tools/hooks.

system_prompt_append: str | None = None

Mirrors the production CLI --system-prompt overlay; appended to the final system prompt by create_agent so optimization sees the same prompt-stack production does.

apply_candidate(candidate: dict[str, str]) -> t.Any

Build an agent from a materialized candidate workspace.

cleanup() -> None

Delete any materialized candidate workspaces retained by apply_candidate().

component_keys() -> list[str]

Return all editable component keys in stable order.

evaluate(
    batch: list[dict[str, Any]],
    candidate: dict[str, str],
    *,
    capture_traces: bool = False,
) -> OptimizationEvaluationBatch

Evaluate a candidate by rebuilding the capability and running Evaluation.

evaluate_candidate(
    candidate: dict[str, str],
    example: dict[str, Any] | None = None,
) -> OptimizationEvaluation

Evaluate one candidate in GEPA-compatible (score, side_info) form.

make_reflective_dataset(
    candidate: dict[str, str],
    eval_batch: OptimizationEvaluationBatch,
    components_to_update: list[str],
) -> dict[str, list[dict[str, t.Any]]]

Build component-scoped reflective data for GEPA.

materialize_candidate(
    candidate: dict[str, str],
) -> MaterializedCapabilityCandidate

Copy the capability to a temp workspace and apply candidate edits.

propose_new_texts(
    candidate: dict[str, str],
    reflective_dataset: dict[str, list[dict[str, Any]]],
    components_to_update: list[str],
) -> dict[str, str]

Delegate candidate proposal to an optional proposer capability agent.

seed_candidate() -> dict[str, str]

Return the current flat candidate map for mutable capability surfaces.

Optimization study using a sampler and objective function.

Study controls the optimization loop:

  1. Ask sampler for candidates via sample()
  2. Evaluate candidates via objective function
  3. Inform sampler of results via tell()
  4. Repeat until stopping condition or sampler exhausted

Example

async def objective(candidate: dict) -> float:
    agent = Agent(model=candidate['model'], temperature=candidate['temp'])
    result = await agent.run("test prompt")
    return compute_score(result)

study = Study(
    name="optimize-agent",
    objective=objective,
    sampler=GridSampler({'model': ['gpt-4', 'claude'], 'temp': [0.5, 1.0]}),
    direction="maximize",
)
result = await study.run()

Attributes:

  • objective (SkipValidation[ObjectiveFunc[CandidateT]]) – Function that takes a candidate and returns score(s).
  • sampler (SkipValidation[Sampler[CandidateT]]) – Sampler that proposes candidates and learns from results.
  • direction (Direction | list[Direction]) – "maximize" or "minimize" (or a list for multi-objective).
  • n_iterations (int) – Maximum number of iterations (sample/tell cycles).
  • constraints (ScorersLike[CandidateT]) – Optional scorers to validate candidates before running.
  • stop_conditions (list[StudyStopCondition]) – Conditions that will stop the study early.

airt_assessment_id: str | None = None

AIRT assessment ID for platform linking.

airt_attack_domain: str | None = None

Attack domain: 'generative' or 'adversarial_ml'.

airt_attack_name: str | None = None

AIRT attack type (tap, pair, goat, crescendo).

airt_attacker_model: str | None = None

Attacker model identifier.

airt_category: str | None = None

AIRT category tier (safety/security).

airt_distance_norm: str | None = None

Distance norm for ML attacks: 'l0', 'l1', 'l2', 'linf'.

airt_evaluator_model: str | None = None

Evaluator/judge model identifier.

airt_goal: str | None = None

AIRT attack goal text.

airt_goal_category: str | None = None

AIRT goal category slug (e.g. cybersecurity, weapons).

airt_input_modality: str | None = None

Input modality: 'image', 'tabular', 'text'.

airt_jailbreak_threshold: float = 0.5

Score threshold for classifying a trial as a jailbreak (default 0.5).

airt_original_class: str | None = None

Original classification label for ML attacks.

airt_perturbation_budget: float | None = None

Perturbation budget (epsilon) for ML attacks.

airt_sub_category: str | None = None

AIRT sub-category slug (e.g. cybersecurity, weapons).

airt_target_model: str | None = None

Target model identifier.

airt_transforms: list[str] | None = None

AIRT transforms applied to prompts.

compliance_tags: dict[str, Any] = Field(default_factory=dict)

Compliance framework tags (OWASP, ATLAS, SAIF, NIST) for this study.

constraints: ScorersLike[CandidateT] = Field(default_factory=list)

Scorers that validate candidates before evaluation. The trial is pruned if any of them fails.

direction: Direction | list[Direction] = 'maximize'

Optimization direction(s). Use list for multi-objective.

directions: list[Direction]

Get directions as list.

max_trials: int | None = None

Hard cap on total trial count. When set, the study stops after this many trials regardless of iteration count. This prevents batch expansion from generating excessive trials (e.g., beam_width * branching_factor per iteration).

n_iterations: int = Config(default=100, ge=1)

Maximum number of iterations (sample/tell cycles) to run.

objective: SkipValidation[ObjectiveFunc[CandidateT]]

Function that evaluates a candidate and returns score(s).

objective_names: list[str]

Get objective names (populated after first trial).

sampler: SkipValidation[Sampler[CandidateT]]

Sampler that proposes candidates to evaluate.

stop_conditions: list[StudyStopCondition] = Field(default_factory=list)

Conditions that stop the study early when met.

add_stop_condition(condition: StudyStopCondition) -> te.Self

Add a stopping condition, returning a new Study.
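
Since each call returns a new Study, calls can be chained. A sketch, assuming a stop condition can be a plain callable over the trial history:

    # Hypothetical condition: stop once any trial scores above 0.95.
    def good_enough(trials: list[Trial]) -> bool:
        return any(t.score > 0.95 for t in trials)

    study = study.add_stop_condition(good_enough)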

console() -> StudyResult[CandidateT]

Run with live progress dashboard.

Signals the end of the study.

Base class for study-level events.

as_dict() -> dict[str, t.Any]

Serialize event for transport.

emit(span: TaskSpan) -> None

Emit this event’s telemetry to the span.

StudyResult(
    trials: list[Trial[CandidateT]] = list(),
    stop_reason: StudyStopReason = "unknown",
    stop_explanation: str | None = None,
)

The final result of an optimization study, containing all trials and summary statistics.

Attributes:

  • trials (list[Trial[CandidateT]]) –A complete list of all trials generated during the study.
  • stop_reason (StudyStopReason) –The reason the study concluded.
  • stop_explanation (str | None) –A human-readable explanation for why the study stopped.
best_score: float | None

The highest score among all finished trials. Returns None if no trials succeeded.

best_trial: Trial[CandidateT] | None

The trial with the highest score among all finished trials. Returns None if no trials succeeded.

failed_trials: list[Trial[CandidateT]]

A list of all trials that failed.

finished_trials: int

Number of successfully finished trials.

pending_trials: list[Trial[CandidateT]]

A list of all trials that are still pending.

pruned_trials: list[Trial[CandidateT]]

A list of all trials that were pruned.

running_trials: list[Trial[CandidateT]]

A list of all trials that are currently running.

total_trials: int

Total number of trials.

to_dataframe() -> pd.DataFrame

Converts the trials into a pandas DataFrame for analysis.

to_dicts() -> list[dict[str, t.Any]]

Flattens the results into a list of dictionaries, one for each trial.

to_jsonl(path: str | Path) -> None

Saves the trials to a JSON Lines (JSONL) file.
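
Typical post-run analysis with these helpers:

    result = await study.run()

    print(result.stop_reason, result.best_score)
    df = result.to_dataframe()        # one row per trial, for analysis
    result.to_jsonl("trials.jsonl")   # persist raw trials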

Signals the beginning of a study.

Tracing and reflection-data settings for optimization runs.

to_gepa_kwargs() -> dict[str, t.Any]

Return GEPA-compatible keyword arguments for tracking settings.

Represents a single, evaluated point in the search space.

Attributes:

  • id (UUID) – Unique identifier for the trial.
  • candidate (CandidateT) – The candidate configuration being assessed.
  • status (TrialStatus) – Current status of the trial.
  • score (float) – The primary, single-value fitness score for this trial: the average of all objective scores, adjusted for their objective directions (higher is better).
  • eval_result (EvalResult) – Complete evaluation result of the trial and associated dataset.
  • pruning_reason (str | None) – Reason for pruning this trial, if applicable.
  • error (str | None) – Any error which occurred while processing this trial.
  • step (int) – The optimization step which produced this trial.
  • dataset (int) – The specific dataset used for probing.
  • created_at (datetime) – The creation timestamp of the trial.

all_scores: dict[str, float]

A dictionary of all named metric mean values from the evaluation result.

This includes scores not directly related to the objective.

score_breakdown: dict[str, list[float]]

Returns a breakdown of all objective scores across all samples in the evaluation result.

Returns:

  • dict[str, list[float]] – A dictionary where keys are objective names and values are lists of scores, each corresponding to a sample from the evaluation dataset.

__await__() -> t.Generator[t.Any, None, Trial[CandidateT]]

Await the completion of the trial.

done() -> bool

A non-blocking check to see if the trial’s evaluation is complete.

get_directional_score(
    name: str | None = None,
    default: float = -float("inf"),
) -> float

Get a specific named objective score, adjusted for optimization direction (higher is better), or the overall score if no name is given.

Parameters:

  • name (str | None, default: None) – The name of the objective.
  • default (float, default: -float('inf')) – The value to return if the named score is not found.

wait_for(
    *trials: Trial[CandidateT],
) -> list[Trial[CandidateT]]

Await the completion of multiple trials.

Parameters:

  • *trials (Trial[CandidateT], default: ()) – The trials to wait for.

Returns:

  • list[Trial[CandidateT]] – A future that resolves to a list of completed trials.
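
A short sketch of these primitives together (whether wait_for is importable standalone or accessed via Trial is an assumption):

    if not trial.done():
        trial = await trial  # Trial objects are awaitable

    done = await wait_for(trial_a, trial_b)
    best = max(done, key=lambda t: t.get_directional_score())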

Signals that a trial has completed successfully.

Base class for trial-level events. Linked to study via span hierarchy.

as_dict() -> dict[str, t.Any]

Serialize event for transport.

emit(span: TaskSpan) -> None

Emit this event’s telemetry to the span.

Signals that a trial has failed.

Signals that a trial was pruned (constraint not satisfied).

Signals the start of a trial.

Signals that GEPA finished a validation-set evaluation.

optimize_anything(
    seed_candidate: CandidateT | None = None,
    evaluator: OptimizationEvaluator[CandidateT] | None = None,
    *,
    name: str | None = None,
    description: str = "",
    objective: str | None = None,
    background: str | None = None,
    dataset: list[Any] | None = None,
    trainset: list[Any] | None = None,
    valset: list[Any] | None = None,
    config: OptimizationConfig | None = None,
    backend: str | OptimizationBackend[CandidateT] = "gepa",
    adapter: OptimizationAdapter[CandidateT] | None = None,
    tags: list[str] | None = None,
    label: str | None = None,
    concurrency: int = 1,
) -> Optimization[CandidateT]

Construct a Dreadnode-native optimize_anything executor.
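
Putting the pieces together, a minimal run might look like the sketch below. The candidate shape, the evaluator signature, and the awaited console() call are assumptions modeled on the Study example; score_prompt is a hypothetical helper.

    def evaluator(candidate: str) -> OptimizationEvaluation:
        score = score_prompt(candidate)  # hypothetical scoring helper
        return OptimizationEvaluation(score=score, scores={"quality": score})

    optimization = optimize_anything(
        seed_candidate="You are a helpful assistant.",
        evaluator=evaluator,
        name="prompt-tuning",
        objective="quality",
        backend="gepa",
        concurrency=2,
    )
    result = await optimization.console()  # run with the live console adapter
    print(result.best_score, result.best_candidate)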