diff --git a/opto/features/priority_search/module_regressor.py b/opto/features/priority_search/module_regressor.py
index a92793c5..818df096 100644
--- a/opto/features/priority_search/module_regressor.py
+++ b/opto/features/priority_search/module_regressor.py
@@ -1,22 +1,115 @@
import numpy as np
-import copy
-from typing import Union
-from opto.trainer.loader import DataLoader
from opto.trainer.utils import batch_run, async_run
from opto.optimizers.utils import print_color
-# from opto.trainer.evaluators import evaluate
from typing import Union, List, Tuple, Dict, Any, Optional
-from collections import deque
-from opto.utils.llm import LLM # For the selector LLM
-# from opto.trace.nodes import ParameterNode
-import json
-# import warnings
-# from black import format_str, FileMode
-import random
-# import mathX
from opto.utils.auto_retry import retry_with_exponential_backoff
import litellm
import time
+from opto.features.priority_search.priority_search import ModuleCandidate
+
+
+def embed_text(model, text):
+ """Call the embedding API for a given model and text string.
+
+ This is a standalone function so users can easily replace it with a custom
+ embedding provider (e.g. local model, different API) without subclassing.
+ Must return a litellm-compatible response with response.data[0].embedding.
+ """
+ return litellm.embedding(model=model, input=text)
+
+
+class RegressorTemplate:
+ """Base class template for regression-based predictors for ModuleCandidate objects.
+
+ Provides common functionality for embedding generation and candidate processing.
+ Subclasses should implement update() and predict_scores() methods.
+
+    Concrete regressors implement update() and predict_scores(); the template
+    on its own already provides embedding generation for candidates.
+ """
+
+    def __init__(self, embedding_model="gemini/gemini-embedding-001", num_threads=None, regularization_strength=1, linear_dim=None, rich_text=True, verbose: bool = False, max_candidates_to_predict=500, original_embedding_dim=768):
+        """Store configuration; see the class docstring for argument meanings."""
+        self.embedding_model = embedding_model  # model name passed to embed_text()
+        self.num_threads = num_threads
+        self.regularization_strength = regularization_strength
+        self.linear_dim = linear_dim
+        self.rich_text = rich_text
+        self.verbose = verbose
+        self.max_candidates_to_predict = max_candidates_to_predict
+        self.original_embedding_dim = original_embedding_dim
+        # No projection by default; _get_embedding only projects when a transformer is installed.
+        self.random_projector = None
+ def _get_parameter_text(self, candidate):
+ """Get the parameter text for a ModuleCandidate."""
+ if not hasattr(candidate, 'update_dict'):
+ print(candidate)
+ assert hasattr(candidate, 'update_dict'), "ModuleCandidate must have an update_dict"
+ # Convert parameter nodes to readable names for deterministic embedding
+ params_with_names = {k.py_name: v for k, v in candidate.update_dict.items()}
+ return str(params_with_names)
+
+
+ def _get_embedding(self, candidate,max_retries=10,base_delay=1.0):
+ """Get the embedding for a ModuleCandidate."""
+ parameter_text = self._get_parameter_text(candidate)
+
+ try:
+ response = retry_with_exponential_backoff(
+ lambda: embed_text(self.embedding_model, parameter_text),
+ max_retries=max_retries,
+ base_delay=base_delay,
+ operation_name="Embedding API call"
+ )
+ embedding = response.data[0].embedding
+ if self.random_projector is not None:
+ embedding_array = np.array(embedding).reshape(1, -1)
+ projected = self.random_projector.transform(embedding_array)
+ embedding = projected.flatten().tolist()
+ return embedding
+ except Exception as e:
+ print_color(f"ERROR: Embedding API call failed after retries: {e}", "red")
+ return None
+
+ def add_embeddings_to_candidates(self, candidates: List[ModuleCandidate]):
+ """Add embeddings to a list of candidates. This function could be used outside."""
+ self._update_memory_embeddings_for_batch(candidates)
+
+ def _update_memory_embeddings_for_batch(self, batch,max_workers=50,max_retries=10,base_delay=1.0):
+ """Update the embeddings for a batch of candidates."""
+ # Separate candidates that need embeddings from those that already have them
+ candidates_needing_embeddings = []
+ for candidate in batch:
+ if not hasattr(candidate, "embedding"):
+ candidates_needing_embeddings.append(candidate)
+
+ # Generate embeddings in parallel for candidates that need them
+ if candidates_needing_embeddings:
+ def get_embedding_for_candidate(candidate):
+ return self._get_embedding(candidate)
+
+ # Create function list for async_run
+ embedding_functions = [lambda c=candidate: get_embedding_for_candidate(c)
+ for candidate in candidates_needing_embeddings]
+
+ # Run embedding generation in parallel
+ new_embeddings = async_run(
+ embedding_functions,
+ max_workers=max_workers,
+ description=f"Generating embeddings for {len(candidates_needing_embeddings)} candidates"
+ )
+
+ # Assign embeddings back to candidates
+ for candidate, embedding in zip(candidates_needing_embeddings, new_embeddings):
+ candidate.embedding = embedding
+
+ def update(self, memory: List[Tuple[float, ModuleCandidate]]):
+ """Update the regression model parameters. Should be implemented by subclasses."""
+ raise NotImplementedError("Subclasses must implement the update method")
+
+ def predict_scores(self, memory: List[Tuple[float, ModuleCandidate]]):
+ """Predict scores for candidates. Should be implemented by subclasses."""
+ raise NotImplementedError("Subclasses must implement the predict_scores method")
class ModuleCandidateRegressor:
"""
@@ -25,9 +118,8 @@ class ModuleCandidateRegressor:
predict_scores has no parameters, it could return predicted scores for all candidates in the memory.
predict_scores_for_batch has one parameter, a batch of candidates, it could return predicted scores for the batch of candidates."""
- def __init__(self, memory=None, embedding_model="gemini/text-embedding-004", num_threads=None, learning_rate=0.2, regularization_strength=1e-4, max_iterations=20000, tolerance=5e-3):
- # In the regressor, no need for calling LLM to make the prediction. So we could predict the entire memory at once.
- self.max_candidates_to_predict = 500
+    def __init__(self, memory=None, embedding_model="gemini/text-embedding-004", num_threads=None, learning_rate=0.2, regularization_strength=1e-4, max_iterations=20000, tolerance=5e-3, max_candidates_to_predict=500, original_embedding_dim=768, patience=20, lr_decay_factor=0.8, verbose=False):
+        self.verbose = verbose; self.max_candidates_to_predict = max_candidates_to_predict  # verbose gates the new print_color logging in update(); without it update() raises AttributeError
self.memory = memory
self.embedding_model = embedding_model
self.num_threads = num_threads
@@ -36,10 +128,9 @@ def __init__(self, memory=None, embedding_model="gemini/text-embedding-004", num
self.regularization_strength = regularization_strength # L2 regularization strength (lambda)
self.max_iterations = max_iterations
self.tolerance = tolerance
- self.patience = 20 # Early stopping patience
- self.lr_decay_factor = 0.8 # Learning rate decay factor
- # default linear dimension is 768
- self.linear_dim = 768
+ self.patience = patience # Early stopping patience
+ self.lr_decay_factor = lr_decay_factor # Learning rate decay factor
+ self.linear_dim = original_embedding_dim
# Initialize weights with larger values for more aggressive learning
self.weights = np.random.normal(0, 0.1, self.linear_dim)
self.bias = 0.0
@@ -50,42 +141,33 @@ def _sigmoid(self, z):
def _get_parameter_text(self, candidate):
"""Get the parameter text for a ModuleCandidate."""
- if not candidate.update_dict:
- # If update_dict is empty, use a default text or base module info
- return "base_module_parameters"
-
- # Get the first value from update_dict (similar to additional_instructions)
- # TODO: support for multiple parameters
- parameter_text = list(candidate.update_dict.values())[0]
- return str(parameter_text)
+ if not hasattr(candidate, 'update_dict'):
+ print(candidate)
+ assert hasattr(candidate, 'update_dict'), "ModuleCandidate must have an update_dict"
+ # Convert parameter nodes to readable names for deterministic embedding
+ params_with_names = {k.py_name: v for k, v in candidate.update_dict.items()}
+ return str(params_with_names)
- def _get_embedding(self, candidate):
+ def _get_embedding(self, candidate,max_retries=10,base_delay=1.0):
"""Get the embedding for a ModuleCandidate."""
parameter_text = self._get_parameter_text(candidate)
- def single_embedding_call():
- return litellm.embedding(
- model=self.embedding_model,
- input=parameter_text
- )
-
try:
response = retry_with_exponential_backoff(
- single_embedding_call,
- max_retries=10,
- base_delay=1.0,
+ lambda: embed_text(self.embedding_model, parameter_text),
+ max_retries=max_retries,
+ base_delay=base_delay,
operation_name="Embedding API call"
)
embedding = response.data[0].embedding
return embedding
except Exception as e:
print_color(f"ERROR: Embedding API call failed after retries: {e}", "red")
- # Return a random embedding as fallback to prevent complete failure
print_color("Using random embedding as fallback", "yellow")
fallback_embedding = np.random.normal(0, 0.01, self.linear_dim)
return fallback_embedding / np.linalg.norm(fallback_embedding)
- def _update_memory_embeddings_for_batch(self, batch):
+ def _update_memory_embeddings_for_batch(self, batch,max_workers=1000,max_retries=10,base_delay=1.0):
"""Update the embeddings for a batch of candidates."""
# Separate candidates that need embeddings from those that already have them
candidates_needing_embeddings = []
@@ -105,7 +187,7 @@ def get_embedding_for_candidate(candidate):
# Run embedding generation in parallel
new_embeddings = async_run(
embedding_functions,
- max_workers=1000,
+ max_workers=max_workers,
description=f"Generating embeddings for {len(candidates_needing_embeddings)} candidates"
)
@@ -116,7 +198,8 @@ def get_embedding_for_candidate(candidate):
def update(self):
"""Update the regression model parameters using the current memory with logistic regression."""
start_time = time.time()
- print_color("Updating regression model using the current memory with logistic regression...", "blue")
+ if self.verbose:
+ print_color("Updating regression model using the current memory with logistic regression...", "blue")
# Extract candidates from memory (memory contains (neg_score, candidate) tuples)
batch = [candidate for _, candidate in self.memory]
# Ensure all candidates have embeddings
@@ -126,10 +209,12 @@ def update(self):
training_candidates = [candidate for neg_score, candidate in self.memory if candidate.num_rollouts > 0 and candidate.mean_score() is not None]
if len(training_candidates) == 0:
- print_color("Warning: No training data available for regression model.", "yellow")
+ if self.verbose:
+ print_color("Warning: No training data available for regression model.", "yellow")
end_time = time.time()
elapsed_time = end_time - start_time
- print_color(f"Regressor update completed in {elapsed_time:.4f} seconds (no training data)", "cyan")
+ if self.verbose:
+ print_color(f"Regressor update completed in {elapsed_time:.4f} seconds (no training data)", "cyan")
return
# Extract raw binary training data from each candidate
@@ -169,7 +254,8 @@ def update(self):
print_color("Warning: No binary training samples generated.", "yellow")
end_time = time.time()
elapsed_time = end_time - start_time
- print_color(f"Regressor update completed in {elapsed_time:.4f} seconds (no binary samples)", "cyan")
+ if self.verbose:
+ print_color(f"Regressor update completed in {elapsed_time:.4f} seconds (no binary samples)", "cyan")
return
# Convert to numpy arrays
@@ -183,21 +269,7 @@ def update(self):
self.weights = np.random.normal(0, 0.1, self.linear_dim)
# Convergence-based regularized logistic regression training using all raw binary data
- m = len(X_list)
- # print_color(f"Training regularized logistic regression with {m} binary samples from {len(training_candidates)} candidates until convergence.", "blue")
- # print_color(f"Using L2 regularization strength: {self.regularization_strength}, learning rate: {self.learning_rate}", "blue")
- # print_color(f"Max iterations: {self.max_iterations}, tolerance: {self.tolerance}", "blue")
-
- # Debug: Print initial weight statistics
- initial_weight_norm = np.linalg.norm(self.weights)
- # print_color(f"Initial weight norm: {initial_weight_norm:.6f}", "yellow")
-
- # Debug: Print embedding statistics
- embedding_mean = np.mean(X)
- embedding_std = np.std(X)
- embedding_norm_mean = np.mean([np.linalg.norm(row) for row in X])
- # print_color(f"Embedding stats - mean: {embedding_mean:.6f}, std: {embedding_std:.6f}, avg norm: {embedding_norm_mean:.6f}", "yellow")
-
+ m = len(X_list)
# Training loop until convergence with adaptive learning rate and early stopping
prev_cost = float('inf')
best_cost = float('inf')
@@ -253,15 +325,6 @@ def update(self):
self.weights -= self.learning_rate * dw
self.bias -= self.learning_rate * db
- # Print progress periodically
- # if iteration == 0 or (iteration + 1) % max(1, min(50, self.max_iterations // 20)) == 0:
- # z_mean, z_std = np.mean(z), np.std(z)
- # weight_norm = np.linalg.norm(self.weights)
- # print_color(f"Iteration {iteration + 1}: Cost: {total_cost:.6f} (change: {cost_change:.8f}), LR: {self.learning_rate:.6f}, Weight norm: {weight_norm:.6f}, Gradient norm: {gradient_norm:.8f}", "cyan")
- # print_color(f" Logits - mean: {z_mean:.6f}, std: {z_std:.6f}, range: [{np.min(z):.6f}, {np.max(z):.6f}]", "cyan")
- # print_color(f" Predictions - range: [{np.min(predictions):.6f}, {np.max(predictions):.6f}], mean: {np.mean(predictions):.6f}", "cyan")
- # print_color(f" Patience: {patience_counter}/{self.patience}", "cyan")
-
prev_cost = total_cost
# Final status
diff --git a/opto/features/priority_search/polca.py b/opto/features/priority_search/polca.py
new file mode 100644
index 00000000..2850a2a6
--- /dev/null
+++ b/opto/features/priority_search/polca.py
@@ -0,0 +1,128 @@
+from opto.features.priority_search.priority_search import PrioritySearch, ModuleCandidate
+from opto.features.priority_search.module_regressor import RegressorTemplate
+from opto.features.priority_search.summarizer import Summarizer
+from typing import Union, List, Tuple, Dict, Any, Optional, Callable
+from opto.optimizers.utils import print_color
+import numpy as np
+from opto.features.priority_search.search_template import Samples
+
+
+def calculate_distance_to_memory(memory, new_candidate):
+ """For a new candidate, calculate the distance to the current memory. That's the least L2 distance to any candidate in the memory.
+
+    To use this function in PrioritySearch, set memory to be self.memory.memory.
+ """
+ assert hasattr(new_candidate, 'embedding') and all(hasattr(candidate, 'embedding') for _, candidate in memory), "All candidates should have the embedding attribute."
+ min_distance = float('inf')
+ for _, candidate in memory:
+ distance = np.linalg.norm(np.array(new_candidate.embedding) - np.array(candidate.embedding))
+ if distance < min_distance:
+ min_distance = distance
+ return min_distance
+
+class POLCA(PrioritySearch):
+ """
+    A subclass of PrioritySearch that keeps an epsilon-net as the memory: new candidates within epsilon (L2 distance in embedding space) of any stored candidate are rejected.
+
+ This class uses a summarizer to summarize the memory and the exploration candidates. It then sets the context for the optimizer to use the summary to guide the exploration.
+
+ Args:
+ epsilon: The epsilon value for the epsilon-net. 0 means no filtering, the same as vanilla PrioritySearch.
+ use_summarizer: Whether to use a summarizer to summarize the memory and the exploration candidates.
+ summarizer_model_name: The model name for the summarizer.
+ *args: Additional arguments for the parent class.
+ **kwargs: Additional keyword arguments for the parent class.
+ """
+ def __init__(self,
+ epsilon: float = 0.1,
+ use_summarizer: bool = False,
+ context: str = "Concrete recommendations for generating better agent parameters based on successful patterns observed in the trajectories: ",
+ *args,
+ **kwargs):
+ super().__init__(*args, **kwargs)
+ self.epsilon = epsilon
+ self.use_summarizer = use_summarizer
+ self.regressor = RegressorTemplate()
+ self.summarizer = Summarizer()
+ self.context = context
+
+ def filter_candidates(self, new_candidates: List[ModuleCandidate]) -> List[ModuleCandidate]:
+ """ Filter candidates by their embeddings.
+ """
+ if self.epsilon == 0: # no filtering
+ print_color(f"No filtering of candidates.", "green")
+ return new_candidates
+ exploration_memory = [(0, candidate) for candidate in self._exploration_candidates]
+ current_memory = self.memory.memory + exploration_memory
+ # Add embeddings to all the candidates. The regressor will check if the candidates have embeddings, and if not, it will add them in parallel.
+ current_candidates = [candidate for _, candidate in current_memory]
+ self.regressor.add_embeddings_to_candidates(current_candidates+new_candidates)
+
+ # filter new candidates based on the distance to the current memory.
+ num_new_candidates = len(new_candidates)
+
+ added_candidates = []
+ success_distances = []
+
+ while len(new_candidates) > 0:
+ # calculate the distance to the memory for each new candidate
+ distances = [calculate_distance_to_memory(current_memory, new_candidate) for new_candidate in new_candidates]
+
+ # filter candidates: keep only those with distance > epsilon
+ filtered_candidates = []
+ filtered_distances = []
+ for i, (candidate, distance) in enumerate(zip(new_candidates, distances)):
+ if distance > self.epsilon:
+ filtered_candidates.append(candidate)
+ filtered_distances.append(distance)
+
+ # if no candidates remain, exit the loop
+ if len(filtered_candidates) == 0:
+ break
+
+ # add the candidate with the largest distance to the memory
+ max_distance_idx = np.argmax(filtered_distances)
+ new_node = filtered_candidates[max_distance_idx]
+ current_memory.append((0, new_node))
+ added_candidates.append(new_node)
+ success_distances.append(float(filtered_distances[max_distance_idx]))
+
+ # remove the added candidate from new_candidates list
+ new_candidates = [c for c in filtered_candidates if c is not new_node]
+
+ print_color(f"Proposed {num_new_candidates} new candidates, {len(added_candidates)} of them are added to the memory.", "green")
+ # print the distances between the added candidates and the memory before adding them.
+ print_color(f"Distances between the added candidates and the memory before adding them: {success_distances}", "green")
+ return added_candidates
+
+ def compress_candidate_memory(self, candidate: ModuleCandidate) -> ModuleCandidate:
+ """ For the summarizer usage, we keep the entire rollout. """
+ if self.use_summarizer:
+ return candidate
+ else:
+ return super().compress_candidate_memory(candidate)
+
+ def propose(self,
+ samples : Samples,
+ verbose : bool = False,
+ **kwargs):
+ """
+ Override the propose method to include a summary into the context of the optimizer.
+ """
+
+ # Use the summarizer to summarize the memory and the exploration candidates.
+ if self.use_summarizer:
+ # Summarize the memory and the exploration candidates.
+ exploration_memory = [(0, candidate) for candidate in self._exploration_candidates]
+ print_color(f"Summarizing the history...", "green")
+ try:
+ summary = self.summarizer.summarize(self.memory.memory+exploration_memory)
+ print_color(f"Summary: {summary}", "green")
+ self.context = f"Concrete recommendations for generating better agent parameters based on successful patterns observed in the trajectories: {summary}"
+ except Exception as e:
+ print_color(f"Error: {e}", "red")
+ print_color(f"Using fallback context: {self.context}", "red")
+ # Set the context for the optimizer.
+ for candidate in self._exploration_candidates:
+ candidate.optimizer.set_context(self.context)
+ return super().propose(samples, verbose, **kwargs)
diff --git a/opto/features/priority_search/priority_search.py b/opto/features/priority_search/priority_search.py
index 828b8837..d35b114f 100644
--- a/opto/features/priority_search/priority_search.py
+++ b/opto/features/priority_search/priority_search.py
@@ -534,7 +534,14 @@ def update(self,
# samples is None in the first iteration
if samples is not None:
# 1. Propose new parameters based on running LLM optimizers on the collected samples
+
+ # We add the exploration rollouts to the exploration candidates, before proposing. Then these samples will not be added in the validate step.
+ self.add_exploration_rollouts_to_candidates(self._exploration_candidates, samples)
candidates = self.propose(samples, verbose=verbose, **kwargs) # List of ModuleCandidates
+
+ # Filter the new candidates, only some of them will be added to the memory. Default is no filtering.
+ candidates = self.filter_candidates(candidates)
+
# 2. Validate the proposed parameters
validate_results = self.validate(candidates, samples, verbose=verbose, **kwargs) # this updates the priority queue
# 3. Update the priority queue with the validation results
@@ -721,8 +728,8 @@ def validate(self,
exploration_candidates = self._exploration_candidates # exploration candidates from the previous iteration
assert self._exploration_candidates is not None, "exploration_candidates must be set before calling validate."
- # The current batch of samples can be used to validate the exploration candidates
- validate_samples = copy.copy(samples)
+ # Exploration samples are added before proposing, so we don't need to add them again here.
+ validate_samples = Samples([], {'inputs': [], 'infos': []})
# Validate newly proposed candidates
use_prev_batch = self.use_prev_batch # when True, self.validate_sampler == self.train_sampler, and the current batch is used for validation
@@ -730,7 +737,7 @@ def validate(self,
validate_samples.add_samples(Samples(*self.validate_sampler.sample(candidate_agents,
use_prev_batch=use_prev_batch,
description_prefix='Validating newly proposed candidates: '))) # list of BatchRollout objects
-
+ candidates_to_be_matched = candidates
if self.validate_exploration_candidates:
if not use_prev_batch: # validate the exploration candidates that collected the samples as well
# validate the agents in the validate_dataset
@@ -738,16 +745,22 @@ def validate(self,
exploration_samples = Samples(*self.validate_sampler.sample(exploration_agents,
description_prefix='Validating exploration candidates: ')) # sample the exploration agents
validate_samples.add_samples(exploration_samples) # append the exploration samples to the validate_samples
+ # Only match exploration candidates if they were actually sampled (i.e., validate_exploration_candidates=True and use_prev_batch=False)
+ candidates_to_be_matched = exploration_candidates + candidates
-
- matched_candidates_and_samples = self.match_candidates_and_samples(exploration_candidates + candidates, validate_samples.samples)
+ matched_candidates_and_samples = self.match_candidates_and_samples(candidates_to_be_matched, validate_samples.samples)
results = {} # dict of ModuleCandidate id: (ModuleCandidate, list of rollouts)
for c, rollouts in matched_candidates_and_samples.items(): # rollouts is a list of BatchRollouts
results[c] = [ r for rr in rollouts for r in rr.to_list()] # we only need the list of dicts
+ # Add exploration candidates that weren't included in match_candidates_and_samples
+ # This ensures they get re-added to memory even if they weren't validated again
+ for candidate in exploration_candidates:
+ if candidate not in results:
+ results[candidate] = [] # Add with empty rollouts list
# Populate score_dict in each rollout when multi-objective is active
cfg = getattr(self, 'objective_config', None)
- if cfg is not None and cfg.mode != "scalar":
+ if cfg is not None:
guide = self.validate_sampler.guide
for c, rollout_list in results.items():
for rollout in rollout_list:
@@ -936,3 +949,22 @@ def _process_rollout(rollout):
for rollout in candidate.rollouts:
_process_rollout(rollout)
return candidate
+
+    # Extension hook: subclasses may override this to filter candidates by custom criteria.
+ def filter_candidates(self, candidates: List[ModuleCandidate]) -> List[ModuleCandidate]:
+ """ Filter candidates.
+ This function can be overridden by subclasses to filter candidates by other criteria.
+ Args:
+ candidates (List[ModuleCandidate]): A list of candidates to filter.
+ Returns:
+ List[ModuleCandidate]: A list of filtered candidates.
+ """
+ return candidates
+
+    # Exploration rollouts are attached to the exploration candidates before proposing (called from update()).
+ def add_exploration_rollouts_to_candidates(self, exploration_candidates: List[ModuleCandidate], samples: Samples):
+ """ Add the exploration rollouts to the exploration candidates.
+ """
+ matched_exploration_candidates_and_samples = self.match_candidates_and_samples(exploration_candidates, samples.samples)
+ for c, rollouts in matched_exploration_candidates_and_samples.items():
+ c.add_rollouts([r for rr in rollouts for r in rr.to_list()])
\ No newline at end of file
diff --git a/opto/features/priority_search/search_template.py b/opto/features/priority_search/search_template.py
index ec244f74..616dd1ff 100644
--- a/opto/features/priority_search/search_template.py
+++ b/opto/features/priority_search/search_template.py
@@ -230,13 +230,13 @@ def train(self,
train_scores.append(info_sample['mean_score']) # so that mean can be computed
train_num_samples.append(info_sample['num_samples'])
+ self.n_samples += len(samples) # update the number of samples processed
if self.n_iters % log_frequency == 0:
avg_train_score = np.sum(np.array(train_scores) * np.array(train_num_samples)) / np.sum(train_num_samples)
self.logger.log('Algo/Average train score', avg_train_score, self.n_iters, color='blue')
self.log(info_update, prefix="Update/")
self.log(info_sample, prefix="Sample/")
- self.n_samples += len(samples) # update the number of samples processed
self.logger.log('Algo/Number of training samples', self.n_samples, self.n_iters, color='blue')
# Log parameters
for p in self.agent.parameters():
diff --git a/opto/features/priority_search/summarizer.py b/opto/features/priority_search/summarizer.py
new file mode 100644
index 00000000..4530ae09
--- /dev/null
+++ b/opto/features/priority_search/summarizer.py
@@ -0,0 +1,195 @@
+from opto.optimizers.utils import print_color
+from opto.utils.llm import LLM # For the selector LLM
+import random
+import re
+
+
+def get_trajectory_of_one_rollout(rollout):
+ """
+ Convert a rollout into a structured markdown trajectory for optimization.
+
+ This function extracts the trainable parameters and formats the trajectory
+ to guide the optimizer in improving the module's performance.
+
+ Parameters
+ ----------
+ rollout : dict
+ A rollout dictionary containing:
+ - 'module': trace.Module - the agent module with trainable parameters
+ - 'x': Any - the input data
+ - 'info': Any - additional information about the input
+ - 'target': Any - the generated output
+ - 'score': float - evaluation score (0 = failed, 1 = success)
+ - 'feedback': Any - detailed feedback from the evaluation
+
+ Returns
+ -------
+ str
+ A markdown-formatted trajectory string for optimizer guidance.
+ """
+ assert rollout['module'] is not None, "rollout['module'] is None."
+ assert rollout['x'] is not None, "rollout['x'] is None."
+ assert rollout['target'] is not None, "rollout['target'] is None."
+ assert rollout['score'] is not None, "rollout['score'] is None."
+ assert rollout['feedback'] is not None, "rollout['feedback'] is None."
+
+ # Extract trainable parameters
+ parameters = rollout['module'].parameters()
+ parameters_dict = {p.py_name: p.data for p in parameters}
+
+ # In multi-objective mode, rollouts carry a 'score_dict' with per-metric scores
+ # (e.g. {"accuracy": 0.9, "latency": 0.2}) populated by validate() in PrioritySearch.
+ # We render the full breakdown so the summarizer LLM can analyze trade-offs across
+ # objectives, rather than seeing only the aggregate scalar score.
+ # When score_dict is absent (single-objective mode), we fall back to scalar-only display.
+ score_dict = rollout.get('score_dict')
+ if isinstance(score_dict, dict) and score_dict:
+ breakdown = "\n".join(f" - {k}: {v}" for k, v in score_dict.items())
+ result_section = (
+ f"- **Overall Score:** {rollout['score']}\n"
+ f"- **Score Breakdown:**\n{breakdown}\n"
+ f"- **Feedback:** {rollout['feedback']}"
+ )
+ else:
+ result_section = (
+ f"- **Score:** {rollout['score']}\n"
+ f"- **Feedback:** {rollout['feedback']}"
+ )
+
+ trajectory = f"""## Task Trajectory
+
+## Module Parameters
+{parameters_dict}
+
+## Input
+{rollout['x']}
+
+## Output
+{rollout['target']}
+
+## Result
+{result_section}"""
+ return trajectory
+
+class Summarizer:
+ """A class which use LLM to summarize the trajectories of the memory. It should be able to learn the patterns of the trajectories. Generate a summary to guide the optimizer to generate better candidates.
+ """
+ DEFAULT_SYSTEM_PROMPT = "You are an expert at analyzing agent behavior patterns and providing actionable guidance for parameter optimization."
+
+ DEFAULT_USER_PROMPT_TEMPLATE = """Analyze the following agent conversation trajectories and extract insights for optimization.
+
+ Current Summary (from previous analysis):
+ {current_summary}
+
+ New Trajectories to Analyze:
+ {history_trajectories}
+
+ Instructions:
+ - Review both the Current Summary and the New Trajectories
+ - Synthesize ALL insights into a single, cohesive summary
+ - Integrate new patterns with existing knowledge
+ - Reorganize and consolidate information as needed for clarity
+ - DO NOT use incremental language like "[Previous points remain valid, plus:]"
+ - Generate a complete, standalone summary that incorporates everything
+
+ Provide your analysis in XML format:
+
+ Analyze the key patterns and strategies that led to success or failure in these trajectories. Consider both the current summary and new trajectories.
+
+
+ A complete, consolidated summary with concrete recommendations for generating better results. This should be a standalone summary that integrates insights from both the current summary and new trajectories, without using incremental modification language.
+ """
+
+ def __init__(self, verbose: bool = False, success_threshold: float = 0,
+ max_candidates_in_prompt: int = 5,
+ current_summary: str = "Concrete recommendations for generating better agent parameters based on successful patterns observed in the trajectories: ",
+ system_prompt: str = None,
+ user_prompt_template: str = None):
+ self.llm = LLM() # use the default model
+ self.max_candidates_in_prompt = max_candidates_in_prompt
+ self.current_summary = current_summary
+ self.used_candidates = set() # Track candidates that have been summarized
+ self.verbose = verbose
+ self.system_prompt = system_prompt or self.DEFAULT_SYSTEM_PROMPT
+ self.user_prompt_template = user_prompt_template or self.DEFAULT_USER_PROMPT_TEMPLATE
+ # Configurable threshold for classifying rollouts as successful (score > threshold)
+ # or failed (score <= threshold). Defaults to 0 for backward compatibility.
+ # Previously hardcoded as 0, which also caused rollouts with negative scores to be
+ # missed by both the success and failure lists.
+ self.success_threshold = success_threshold
+
+ def _get_trajectories_for_memory(self, memory):
+ """
+ Get trajectories for the memory. Memory is a list of (neg_score, candidate) tuples.
+ We first collect rollouts from the each candidate, and then get the trajectories for each rollout.
+
+ Return one single string of all trajectories.
+ """
+ trajectories = []
+ if self.verbose:
+ print_color(f"Getting trajectories from {len(memory)} candidates.", "blue")
+ # Filter out candidates that have already been used and have rollouts
+ memory_with_rollouts = [(neg_score, candidate) for neg_score, candidate in memory
+ if len([rollout for rollout in candidate.rollouts if rollout['score'] is not None]) > 0
+ and id(candidate) not in self.used_candidates]
+ if self.verbose:
+ print_color(f"Memory (unseen candidates) with rollouts: {len(memory_with_rollouts)}", "blue")
+ # Sample 5 candidates (or fewer if not enough available)
+ num_to_sample = min(5, len(memory_with_rollouts))
+ temporary_memory = random.sample(memory_with_rollouts, k=num_to_sample)
+ # Mark sampled candidates as used
+ for _, candidate in temporary_memory:
+ self.used_candidates.add(id(candidate))
+ for _, candidate in temporary_memory:
+ rollouts = [rollout for rollout in candidate.rollouts if rollout['score'] is not None]
+ if len(rollouts) == 0:
+ continue
+ # For each candidate, add one (if exists) successful_rollout and one (if exists) failed_rollout.
+            candidate_update_dict = candidate.update_dict.values()
+            prompt = f"Candidate parameters: {candidate_update_dict}."
+ successful_rollouts = [rollout for rollout in rollouts if rollout['score'] > self.success_threshold]
+ failed_rollouts = [rollout for rollout in rollouts if rollout['score'] <= self.success_threshold]
+ if len(successful_rollouts) > 0:
+ random_successful_rollout = random.choice(successful_rollouts)
+ prompt += f"\nSuccessful trajectory: {get_trajectory_of_one_rollout(random_successful_rollout)}."
+ if len(failed_rollouts) > 0:
+ random_failed_rollout = random.choice(failed_rollouts)
+ prompt += f"\nFailed trajectory: {get_trajectory_of_one_rollout(random_failed_rollout)}."
+
+ trajectories.append(prompt)
+ if self.verbose:
+ print_color(f"Generated trajectories from {len(trajectories)} candidates.", "green")
+
+ return '\n'.join(trajectories)
+
+ def summarize(self, memory) -> str:
+ """Summarize the trajectories using the LLM.
+ Args:
+ memory: The memory containing trajectories to summarize.
+ Returns:
+ str: The summary.
+ """
+
+ history_trajectories = self._get_trajectories_for_memory(memory)
+ if len(history_trajectories) == 0:
+ return "No trajectories found for the memory."
+
+ user_prompt = self.user_prompt_template.format(
+ current_summary=self.current_summary,
+ history_trajectories=history_trajectories,
+ )
+
+ prompt_messages = [
+ {"role": "system", "content": self.system_prompt},
+ {"role": "user", "content": user_prompt},
+ ]
+
+
+ response = self.llm(messages=prompt_messages)
+ response = response.choices[0].message.content
+ # Extract summary using XML regex
+ summary_match = re.search(r'(.*?)', response, re.DOTALL)
+
+ self.current_summary = summary_match.group(1).strip()
+
+ return self.current_summary
\ No newline at end of file