diff --git a/opto/features/priority_search/module_regressor.py b/opto/features/priority_search/module_regressor.py index a92793c5..818df096 100644 --- a/opto/features/priority_search/module_regressor.py +++ b/opto/features/priority_search/module_regressor.py @@ -1,22 +1,115 @@ import numpy as np -import copy -from typing import Union -from opto.trainer.loader import DataLoader from opto.trainer.utils import batch_run, async_run from opto.optimizers.utils import print_color -# from opto.trainer.evaluators import evaluate from typing import Union, List, Tuple, Dict, Any, Optional -from collections import deque -from opto.utils.llm import LLM # For the selector LLM -# from opto.trace.nodes import ParameterNode -import json -# import warnings -# from black import format_str, FileMode -import random -# import mathX from opto.utils.auto_retry import retry_with_exponential_backoff import litellm import time +from opto.features.priority_search.priority_search import ModuleCandidate + + +def embed_text(model, text): + """Call the embedding API for a given model and text string. + + This is a standalone function so users can easily replace it with a custom + embedding provider (e.g. local model, different API) without subclassing. + Must return a litellm-compatible response with response.data[0].embedding. + """ + return litellm.embedding(model=model, input=text) + + +class RegressorTemplate: + """Base class template for regression-based predictors for ModuleCandidate objects. + + Provides common functionality for embedding generation and candidate processing. + Subclasses should implement update() and predict_scores() methods. + + Regressors can be built on this template by implementing the update() and predict_scores() methods. + This class itself is enough for getting embeddings for candidates. + """ + + def __init__(self, embedding_model="gemini/gemini-embedding-001", num_threads=None, regularization_strength=1, linear_dim=None, rich_text=True,verbose: bool = False, max_candidates_to_predict=500,original_embedding_dim=768): + ''' + Args: + embedding_model: The embedding model to use. + num_threads: The number of threads to use for the embedding generation. + regularization_strength: The regularization strength for the logistic regression. + linear_dim: The dimension of the linear space. + rich_text: Whether to use rich text for the parameter text. + verbose: Whether to print the verbose output. + max_candidates_to_predict: The maximum number of candidates to predict. + original_embedding_dim: The original dimension of the embedding. + ''' + def _get_parameter_text(self, candidate): + """Get the parameter text for a ModuleCandidate.""" + if not hasattr(candidate, 'update_dict'): + print(candidate) + assert hasattr(candidate, 'update_dict'), "ModuleCandidate must have an update_dict" + # Convert parameter nodes to readable names for deterministic embedding + params_with_names = {k.py_name: v for k, v in candidate.update_dict.items()} + return str(params_with_names) + + + def _get_embedding(self, candidate,max_retries=10,base_delay=1.0): + """Get the embedding for a ModuleCandidate.""" + parameter_text = self._get_parameter_text(candidate) + + try: + response = retry_with_exponential_backoff( + lambda: embed_text(self.embedding_model, parameter_text), + max_retries=max_retries, + base_delay=base_delay, + operation_name="Embedding API call" + ) + embedding = response.data[0].embedding + if self.random_projector is not None: + embedding_array = np.array(embedding).reshape(1, -1) + projected = self.random_projector.transform(embedding_array) + embedding = projected.flatten().tolist() + return embedding + except Exception as e: + print_color(f"ERROR: Embedding API call failed after retries: {e}", "red") + return None + + def add_embeddings_to_candidates(self, candidates: List[ModuleCandidate]): + """Add embeddings to a list of candidates. This function could be used outside.""" + self._update_memory_embeddings_for_batch(candidates) + + def _update_memory_embeddings_for_batch(self, batch,max_workers=50,max_retries=10,base_delay=1.0): + """Update the embeddings for a batch of candidates.""" + # Separate candidates that need embeddings from those that already have them + candidates_needing_embeddings = [] + for candidate in batch: + if not hasattr(candidate, "embedding"): + candidates_needing_embeddings.append(candidate) + + # Generate embeddings in parallel for candidates that need them + if candidates_needing_embeddings: + def get_embedding_for_candidate(candidate): + return self._get_embedding(candidate) + + # Create function list for async_run + embedding_functions = [lambda c=candidate: get_embedding_for_candidate(c) + for candidate in candidates_needing_embeddings] + + # Run embedding generation in parallel + new_embeddings = async_run( + embedding_functions, + max_workers=max_workers, + description=f"Generating embeddings for {len(candidates_needing_embeddings)} candidates" + ) + + # Assign embeddings back to candidates + for candidate, embedding in zip(candidates_needing_embeddings, new_embeddings): + candidate.embedding = embedding + + def update(self, memory: List[Tuple[float, ModuleCandidate]]): + """Update the regression model parameters. Should be implemented by subclasses.""" + raise NotImplementedError("Subclasses must implement the update method") + + def predict_scores(self, memory: List[Tuple[float, ModuleCandidate]]): + """Predict scores for candidates. Should be implemented by subclasses.""" + raise NotImplementedError("Subclasses must implement the predict_scores method") class ModuleCandidateRegressor: """ @@ -25,9 +118,8 @@ class ModuleCandidateRegressor: predict_scores has no parameters, it could return predicted scores for all candidates in the memory. predict_scores_for_batch has one parameter, a batch of candidates, it could return predicted scores for the batch of candidates.""" - def __init__(self, memory=None, embedding_model="gemini/text-embedding-004", num_threads=None, learning_rate=0.2, regularization_strength=1e-4, max_iterations=20000, tolerance=5e-3): - # In the regressor, no need for calling LLM to make the prediction. So we could predict the entire memory at once. - self.max_candidates_to_predict = 500 + def __init__(self, memory=None, embedding_model="gemini/text-embedding-004", num_threads=None, learning_rate=0.2, regularization_strength=1e-4, max_iterations=20000, tolerance=5e-3, max_candidates_to_predict=500, original_embedding_dim=768,patience=20,lr_decay_factor=0.8): + self.max_candidates_to_predict = max_candidates_to_predict self.memory = memory self.embedding_model = embedding_model self.num_threads = num_threads @@ -36,10 +128,9 @@ def __init__(self, memory=None, embedding_model="gemini/text-embedding-004", num self.regularization_strength = regularization_strength # L2 regularization strength (lambda) self.max_iterations = max_iterations self.tolerance = tolerance - self.patience = 20 # Early stopping patience - self.lr_decay_factor = 0.8 # Learning rate decay factor - # default linear dimension is 768 - self.linear_dim = 768 + self.patience = patience # Early stopping patience + self.lr_decay_factor = lr_decay_factor # Learning rate decay factor + self.linear_dim = original_embedding_dim # Initialize weights with larger values for more aggressive learning self.weights = np.random.normal(0, 0.1, self.linear_dim) self.bias = 0.0 @@ -50,42 +141,33 @@ def _sigmoid(self, z): def _get_parameter_text(self, candidate): """Get the parameter text for a ModuleCandidate.""" - if not candidate.update_dict: - # If update_dict is empty, use a default text or base module info - return "base_module_parameters" - - # Get the first value from update_dict (similar to additional_instructions) - # TODO: support for multiple parameters - parameter_text = list(candidate.update_dict.values())[0] - return str(parameter_text) + if not hasattr(candidate, 'update_dict'): + print(candidate) + assert hasattr(candidate, 'update_dict'), "ModuleCandidate must have an update_dict" + # Convert parameter nodes to readable names for deterministic embedding + params_with_names = {k.py_name: v for k, v in candidate.update_dict.items()} + return str(params_with_names) - def _get_embedding(self, candidate): + def _get_embedding(self, candidate,max_retries=10,base_delay=1.0): """Get the embedding for a ModuleCandidate.""" parameter_text = self._get_parameter_text(candidate) - def single_embedding_call(): - return litellm.embedding( - model=self.embedding_model, - input=parameter_text - ) - try: response = retry_with_exponential_backoff( - single_embedding_call, - max_retries=10, - base_delay=1.0, + lambda: embed_text(self.embedding_model, parameter_text), + max_retries=max_retries, + base_delay=base_delay, operation_name="Embedding API call" ) embedding = response.data[0].embedding return embedding except Exception as e: print_color(f"ERROR: Embedding API call failed after retries: {e}", "red") - # Return a random embedding as fallback to prevent complete failure print_color("Using random embedding as fallback", "yellow") fallback_embedding = np.random.normal(0, 0.01, self.linear_dim) return fallback_embedding / np.linalg.norm(fallback_embedding) - def _update_memory_embeddings_for_batch(self, batch): + def _update_memory_embeddings_for_batch(self, batch,max_workers=1000,max_retries=10,base_delay=1.0): """Update the embeddings for a batch of candidates.""" # Separate candidates that need embeddings from those that already have them candidates_needing_embeddings = [] @@ -105,7 +187,7 @@ def get_embedding_for_candidate(candidate): # Run embedding generation in parallel new_embeddings = async_run( embedding_functions, - max_workers=1000, + max_workers=max_workers, description=f"Generating embeddings for {len(candidates_needing_embeddings)} candidates" ) @@ -116,7 +198,8 @@ def get_embedding_for_candidate(candidate): def update(self): """Update the regression model parameters using the current memory with logistic regression.""" start_time = time.time() - print_color("Updating regression model using the current memory with logistic regression...", "blue") + if self.verbose: + print_color("Updating regression model using the current memory with logistic regression...", "blue") # Extract candidates from memory (memory contains (neg_score, candidate) tuples) batch = [candidate for _, candidate in self.memory] # Ensure all candidates have embeddings @@ -126,10 +209,12 @@ def update(self): training_candidates = [candidate for neg_score, candidate in self.memory if candidate.num_rollouts > 0 and candidate.mean_score() is not None] if len(training_candidates) == 0: - print_color("Warning: No training data available for regression model.", "yellow") + if self.verbose: + print_color("Warning: No training data available for regression model.", "yellow") end_time = time.time() elapsed_time = end_time - start_time - print_color(f"Regressor update completed in {elapsed_time:.4f} seconds (no training data)", "cyan") + if self.verbose: + print_color(f"Regressor update completed in {elapsed_time:.4f} seconds (no training data)", "cyan") return # Extract raw binary training data from each candidate @@ -169,7 +254,8 @@ def update(self): print_color("Warning: No binary training samples generated.", "yellow") end_time = time.time() elapsed_time = end_time - start_time - print_color(f"Regressor update completed in {elapsed_time:.4f} seconds (no binary samples)", "cyan") + if self.verbose: + print_color(f"Regressor update completed in {elapsed_time:.4f} seconds (no binary samples)", "cyan") return # Convert to numpy arrays @@ -183,21 +269,7 @@ def update(self): self.weights = np.random.normal(0, 0.1, self.linear_dim) # Convergence-based regularized logistic regression training using all raw binary data - m = len(X_list) - # print_color(f"Training regularized logistic regression with {m} binary samples from {len(training_candidates)} candidates until convergence.", "blue") - # print_color(f"Using L2 regularization strength: {self.regularization_strength}, learning rate: {self.learning_rate}", "blue") - # print_color(f"Max iterations: {self.max_iterations}, tolerance: {self.tolerance}", "blue") - - # Debug: Print initial weight statistics - initial_weight_norm = np.linalg.norm(self.weights) - # print_color(f"Initial weight norm: {initial_weight_norm:.6f}", "yellow") - - # Debug: Print embedding statistics - embedding_mean = np.mean(X) - embedding_std = np.std(X) - embedding_norm_mean = np.mean([np.linalg.norm(row) for row in X]) - # print_color(f"Embedding stats - mean: {embedding_mean:.6f}, std: {embedding_std:.6f}, avg norm: {embedding_norm_mean:.6f}", "yellow") - + m = len(X_list) # Training loop until convergence with adaptive learning rate and early stopping prev_cost = float('inf') best_cost = float('inf') @@ -253,15 +325,6 @@ def update(self): self.weights -= self.learning_rate * dw self.bias -= self.learning_rate * db - # Print progress periodically - # if iteration == 0 or (iteration + 1) % max(1, min(50, self.max_iterations // 20)) == 0: - # z_mean, z_std = np.mean(z), np.std(z) - # weight_norm = np.linalg.norm(self.weights) - # print_color(f"Iteration {iteration + 1}: Cost: {total_cost:.6f} (change: {cost_change:.8f}), LR: {self.learning_rate:.6f}, Weight norm: {weight_norm:.6f}, Gradient norm: {gradient_norm:.8f}", "cyan") - # print_color(f" Logits - mean: {z_mean:.6f}, std: {z_std:.6f}, range: [{np.min(z):.6f}, {np.max(z):.6f}]", "cyan") - # print_color(f" Predictions - range: [{np.min(predictions):.6f}, {np.max(predictions):.6f}], mean: {np.mean(predictions):.6f}", "cyan") - # print_color(f" Patience: {patience_counter}/{self.patience}", "cyan") - prev_cost = total_cost # Final status diff --git a/opto/features/priority_search/polca.py b/opto/features/priority_search/polca.py new file mode 100644 index 00000000..2850a2a6 --- /dev/null +++ b/opto/features/priority_search/polca.py @@ -0,0 +1,128 @@ +from opto.features.priority_search.priority_search import PrioritySearch, ModuleCandidate +from opto.features.priority_search.module_regressor import RegressorTemplate +from opto.features.priority_search.summarizer import Summarizer +from typing import Union, List, Tuple, Dict, Any, Optional, Callable +from opto.optimizers.utils import print_color +import numpy as np +from opto.features.priority_search.search_template import Samples + + +def calculate_distance_to_memory(memory, new_candidate): + """For a new candidate, calculate the distance to the current memory. That's the least L2 distance to any candidate in the memory. + + To use this funciton in PrioritySearch, set memory to be self.memory.memory. + """ + assert hasattr(new_candidate, 'embedding') and all(hasattr(candidate, 'embedding') for _, candidate in memory), "All candidates should have the embedding attribute." + min_distance = float('inf') + for _, candidate in memory: + distance = np.linalg.norm(np.array(new_candidate.embedding) - np.array(candidate.embedding)) + if distance < min_distance: + min_distance = distance + return min_distance + +class POLCA(PrioritySearch): + """ + A subclass of PrioritySearch, which keeps an epsilon-net as the memory. Reject new candidates that are in the epsilon-net of the memory. + + This class uses a summarizer to summarize the memory and the exploration candidates. It then sets the context for the optimizer to use the summary to guide the exploration. + + Args: + epsilon: The epsilon value for the epsilon-net. 0 means no filtering, the same as vanilla PrioritySearch. + use_summarizer: Whether to use a summarizer to summarize the memory and the exploration candidates. + summarizer_model_name: The model name for the summarizer. + *args: Additional arguments for the parent class. + **kwargs: Additional keyword arguments for the parent class. + """ + def __init__(self, + epsilon: float = 0.1, + use_summarizer: bool = False, + context: str = "Concrete recommendations for generating better agent parameters based on successful patterns observed in the trajectories: ", + *args, + **kwargs): + super().__init__(*args, **kwargs) + self.epsilon = epsilon + self.use_summarizer = use_summarizer + self.regressor = RegressorTemplate() + self.summarizer = Summarizer() + self.context = context + + def filter_candidates(self, new_candidates: List[ModuleCandidate]) -> List[ModuleCandidate]: + """ Filter candidates by their embeddings. + """ + if self.epsilon == 0: # no filtering + print_color(f"No filtering of candidates.", "green") + return new_candidates + exploration_memory = [(0, candidate) for candidate in self._exploration_candidates] + current_memory = self.memory.memory + exploration_memory + # Add embeddings to all the candidates. The regressor will check if the candidates have embeddings, and if not, it will add them in parallel. + current_candidates = [candidate for _, candidate in current_memory] + self.regressor.add_embeddings_to_candidates(current_candidates+new_candidates) + + # filter new candidates based on the distance to the current memory. + num_new_candidates = len(new_candidates) + + added_candidates = [] + success_distances = [] + + while len(new_candidates) > 0: + # calculate the distance to the memory for each new candidate + distances = [calculate_distance_to_memory(current_memory, new_candidate) for new_candidate in new_candidates] + + # filter candidates: keep only those with distance > epsilon + filtered_candidates = [] + filtered_distances = [] + for i, (candidate, distance) in enumerate(zip(new_candidates, distances)): + if distance > self.epsilon: + filtered_candidates.append(candidate) + filtered_distances.append(distance) + + # if no candidates remain, exit the loop + if len(filtered_candidates) == 0: + break + + # add the candidate with the largest distance to the memory + max_distance_idx = np.argmax(filtered_distances) + new_node = filtered_candidates[max_distance_idx] + current_memory.append((0, new_node)) + added_candidates.append(new_node) + success_distances.append(float(filtered_distances[max_distance_idx])) + + # remove the added candidate from new_candidates list + new_candidates = [c for c in filtered_candidates if c is not new_node] + + print_color(f"Proposed {num_new_candidates} new candidates, {len(added_candidates)} of them are added to the memory.", "green") + # print the distances between the added candidates and the memory before adding them. + print_color(f"Distances between the added candidates and the memory before adding them: {success_distances}", "green") + return added_candidates + + def compress_candidate_memory(self, candidate: ModuleCandidate) -> ModuleCandidate: + """ For the summarizer usage, we keep the entire rollout. """ + if self.use_summarizer: + return candidate + else: + return super().compress_candidate_memory(candidate) + + def propose(self, + samples : Samples, + verbose : bool = False, + **kwargs): + """ + Override the propose method to include a summary into the context of the optimizer. + """ + + # Use the summarizer to summarize the memory and the exploration candidates. + if self.use_summarizer: + # Summarize the memory and the exploration candidates. + exploration_memory = [(0, candidate) for candidate in self._exploration_candidates] + print_color(f"Summarizing the history...", "green") + try: + summary = self.summarizer.summarize(self.memory.memory+exploration_memory) + print_color(f"Summary: {summary}", "green") + self.context = f"Concrete recommendations for generating better agent parameters based on successful patterns observed in the trajectories: {summary}" + except Exception as e: + print_color(f"Error: {e}", "red") + print_color(f"Using fallback context: {self.context}", "red") + # Set the context for the optimizer. + for candidate in self._exploration_candidates: + candidate.optimizer.set_context(self.context) + return super().propose(samples, verbose, **kwargs) diff --git a/opto/features/priority_search/priority_search.py b/opto/features/priority_search/priority_search.py index 828b8837..d35b114f 100644 --- a/opto/features/priority_search/priority_search.py +++ b/opto/features/priority_search/priority_search.py @@ -534,7 +534,14 @@ def update(self, # samples is None in the first iteration if samples is not None: # 1. Propose new parameters based on running LLM optimizers on the collected samples + + # We add the exploration rollouts to the exploration candidates, before proposing. Then these samples will not be added in the validate step. + self.add_exploration_rollouts_to_candidates(self._exploration_candidates, samples) candidates = self.propose(samples, verbose=verbose, **kwargs) # List of ModuleCandidates + + # Filter the new candidates, only some of them will be added to the memory. Default is no filtering. + candidates = self.filter_candidates(candidates) + # 2. Validate the proposed parameters validate_results = self.validate(candidates, samples, verbose=verbose, **kwargs) # this updates the priority queue # 3. Update the priority queue with the validation results @@ -721,8 +728,8 @@ def validate(self, exploration_candidates = self._exploration_candidates # exploration candidates from the previous iteration assert self._exploration_candidates is not None, "exploration_candidates must be set before calling validate." - # The current batch of samples can be used to validate the exploration candidates - validate_samples = copy.copy(samples) + # Exploration samples are added before proposing, so we don't need to add them again here. + validate_samples = Samples([], {'inputs': [], 'infos': []}) # Validate newly proposed candidates use_prev_batch = self.use_prev_batch # when True, self.validate_sampler == self.train_sampler, and the current batch is used for validation @@ -730,7 +737,7 @@ def validate(self, validate_samples.add_samples(Samples(*self.validate_sampler.sample(candidate_agents, use_prev_batch=use_prev_batch, description_prefix='Validating newly proposed candidates: '))) # list of BatchRollout objects - + candidates_to_be_matched = candidates if self.validate_exploration_candidates: if not use_prev_batch: # validate the exploration candidates that collected the samples as well # validate the agents in the validate_dataset @@ -738,16 +745,22 @@ def validate(self, exploration_samples = Samples(*self.validate_sampler.sample(exploration_agents, description_prefix='Validating exploration candidates: ')) # sample the exploration agents validate_samples.add_samples(exploration_samples) # append the exploration samples to the validate_samples + # Only match exploration candidates if they were actually sampled (i.e., validate_exploration_candidates=True and use_prev_batch=False) + candidates_to_be_matched = exploration_candidates + candidates - - matched_candidates_and_samples = self.match_candidates_and_samples(exploration_candidates + candidates, validate_samples.samples) + matched_candidates_and_samples = self.match_candidates_and_samples(candidates_to_be_matched, validate_samples.samples) results = {} # dict of ModuleCandidate id: (ModuleCandidate, list of rollouts) for c, rollouts in matched_candidates_and_samples.items(): # rollouts is a list of BatchRollouts results[c] = [ r for rr in rollouts for r in rr.to_list()] # we only need the list of dicts + # Add exploration candidates that weren't included in match_candidates_and_samples + # This ensures they get re-added to memory even if they weren't validated again + for candidate in exploration_candidates: + if candidate not in results: + results[candidate] = [] # Add with empty rollouts list # Populate score_dict in each rollout when multi-objective is active cfg = getattr(self, 'objective_config', None) - if cfg is not None and cfg.mode != "scalar": + if cfg is not None: guide = self.validate_sampler.guide for c, rollout_list in results.items(): for rollout in rollout_list: @@ -936,3 +949,22 @@ def _process_rollout(rollout): for rollout in candidate.rollouts: _process_rollout(rollout) return candidate + + # For the further usage. + def filter_candidates(self, candidates: List[ModuleCandidate]) -> List[ModuleCandidate]: + """ Filter candidates. + This function can be overridden by subclasses to filter candidates by other criteria. + Args: + candidates (List[ModuleCandidate]): A list of candidates to filter. + Returns: + List[ModuleCandidate]: A list of filtered candidates. + """ + return candidates + + # For the further usage, we decide to add the exploration rollouts to the exploration candidates, before proposing. + def add_exploration_rollouts_to_candidates(self, exploration_candidates: List[ModuleCandidate], samples: Samples): + """ Add the exploration rollouts to the exploration candidates. + """ + matched_exploration_candidates_and_samples = self.match_candidates_and_samples(exploration_candidates, samples.samples) + for c, rollouts in matched_exploration_candidates_and_samples.items(): + c.add_rollouts([r for rr in rollouts for r in rr.to_list()]) \ No newline at end of file diff --git a/opto/features/priority_search/search_template.py b/opto/features/priority_search/search_template.py index ec244f74..616dd1ff 100644 --- a/opto/features/priority_search/search_template.py +++ b/opto/features/priority_search/search_template.py @@ -230,13 +230,13 @@ def train(self, train_scores.append(info_sample['mean_score']) # so that mean can be computed train_num_samples.append(info_sample['num_samples']) + self.n_samples += len(samples) # update the number of samples processed if self.n_iters % log_frequency == 0: avg_train_score = np.sum(np.array(train_scores) * np.array(train_num_samples)) / np.sum(train_num_samples) self.logger.log('Algo/Average train score', avg_train_score, self.n_iters, color='blue') self.log(info_update, prefix="Update/") self.log(info_sample, prefix="Sample/") - self.n_samples += len(samples) # update the number of samples processed self.logger.log('Algo/Number of training samples', self.n_samples, self.n_iters, color='blue') # Log parameters for p in self.agent.parameters(): diff --git a/opto/features/priority_search/summarizer.py b/opto/features/priority_search/summarizer.py new file mode 100644 index 00000000..4530ae09 --- /dev/null +++ b/opto/features/priority_search/summarizer.py @@ -0,0 +1,195 @@ +from opto.optimizers.utils import print_color +from opto.utils.llm import LLM # For the selector LLM +import random +import re + + +def get_trajectory_of_one_rollout(rollout): + """ + Convert a rollout into a structured markdown trajectory for optimization. + + This function extracts the trainable parameters and formats the trajectory + to guide the optimizer in improving the module's performance. + + Parameters + ---------- + rollout : dict + A rollout dictionary containing: + - 'module': trace.Module - the agent module with trainable parameters + - 'x': Any - the input data + - 'info': Any - additional information about the input + - 'target': Any - the generated output + - 'score': float - evaluation score (0 = failed, 1 = success) + - 'feedback': Any - detailed feedback from the evaluation + + Returns + ------- + str + A markdown-formatted trajectory string for optimizer guidance. + """ + assert rollout['module'] is not None, "rollout['module'] is None." + assert rollout['x'] is not None, "rollout['x'] is None." + assert rollout['target'] is not None, "rollout['target'] is None." + assert rollout['score'] is not None, "rollout['score'] is None." + assert rollout['feedback'] is not None, "rollout['feedback'] is None." + + # Extract trainable parameters + parameters = rollout['module'].parameters() + parameters_dict = {p.py_name: p.data for p in parameters} + + # In multi-objective mode, rollouts carry a 'score_dict' with per-metric scores + # (e.g. {"accuracy": 0.9, "latency": 0.2}) populated by validate() in PrioritySearch. + # We render the full breakdown so the summarizer LLM can analyze trade-offs across + # objectives, rather than seeing only the aggregate scalar score. + # When score_dict is absent (single-objective mode), we fall back to scalar-only display. + score_dict = rollout.get('score_dict') + if isinstance(score_dict, dict) and score_dict: + breakdown = "\n".join(f" - {k}: {v}" for k, v in score_dict.items()) + result_section = ( + f"- **Overall Score:** {rollout['score']}\n" + f"- **Score Breakdown:**\n{breakdown}\n" + f"- **Feedback:** {rollout['feedback']}" + ) + else: + result_section = ( + f"- **Score:** {rollout['score']}\n" + f"- **Feedback:** {rollout['feedback']}" + ) + + trajectory = f"""## Task Trajectory + +## Module Parameters +{parameters_dict} + +## Input +{rollout['x']} + +## Output +{rollout['target']} + +## Result +{result_section}""" + return trajectory + +class Summarizer: + """A class which use LLM to summarize the trajectories of the memory. It should be able to learn the patterns of the trajectories. Generate a summary to guide the optimizer to generate better candidates. + """ + DEFAULT_SYSTEM_PROMPT = "You are an expert at analyzing agent behavior patterns and providing actionable guidance for parameter optimization." + + DEFAULT_USER_PROMPT_TEMPLATE = """Analyze the following agent conversation trajectories and extract insights for optimization. + + Current Summary (from previous analysis): + {current_summary} + + New Trajectories to Analyze: + {history_trajectories} + + Instructions: + - Review both the Current Summary and the New Trajectories + - Synthesize ALL insights into a single, cohesive summary + - Integrate new patterns with existing knowledge + - Reorganize and consolidate information as needed for clarity + - DO NOT use incremental language like "[Previous points remain valid, plus:]" + - Generate a complete, standalone summary that incorporates everything + + Provide your analysis in XML format: + + Analyze the key patterns and strategies that led to success or failure in these trajectories. Consider both the current summary and new trajectories. + + + A complete, consolidated summary with concrete recommendations for generating better results. This should be a standalone summary that integrates insights from both the current summary and new trajectories, without using incremental modification language. + """ + + def __init__(self, verbose: bool = False, success_threshold: float = 0, + max_candidates_in_prompt: int = 5, + current_summary: str = "Concrete recommendations for generating better agent parameters based on successful patterns observed in the trajectories: ", + system_prompt: str = None, + user_prompt_template: str = None): + self.llm = LLM() # use the default model + self.max_candidates_in_prompt = max_candidates_in_prompt + self.current_summary = current_summary + self.used_candidates = set() # Track candidates that have been summarized + self.verbose = verbose + self.system_prompt = system_prompt or self.DEFAULT_SYSTEM_PROMPT + self.user_prompt_template = user_prompt_template or self.DEFAULT_USER_PROMPT_TEMPLATE + # Configurable threshold for classifying rollouts as successful (score > threshold) + # or failed (score <= threshold). Defaults to 0 for backward compatibility. + # Previously hardcoded as 0, which also caused rollouts with negative scores to be + # missed by both the success and failure lists. + self.success_threshold = success_threshold + + def _get_trajectories_for_memory(self, memory): + """ + Get trajectories for the memory. Memory is a list of (neg_score, candidate) tuples. + We first collect rollouts from the each candidate, and then get the trajectories for each rollout. + + Return one single string of all trajectories. + """ + trajectories = [] + if self.verbose: + print_color(f"Getting trajectories from {len(memory)} candidates.", "blue") + # Filter out candidates that have already been used and have rollouts + memory_with_rollouts = [(neg_score, candidate) for neg_score, candidate in memory + if len([rollout for rollout in candidate.rollouts if rollout['score'] is not None]) > 0 + and id(candidate) not in self.used_candidates] + if self.verbose: + print_color(f"Memory (unseen candidates) with rollouts: {len(memory_with_rollouts)}", "blue") + # Sample 5 candidates (or fewer if not enough available) + num_to_sample = min(5, len(memory_with_rollouts)) + temporary_memory = random.sample(memory_with_rollouts, k=num_to_sample) + # Mark sampled candidates as used + for _, candidate in temporary_memory: + self.used_candidates.add(id(candidate)) + for _, candidate in temporary_memory: + rollouts = [rollout for rollout in candidate.rollouts if rollout['score'] is not None] + if len(rollouts) == 0: + continue + # For each candidate, add one (if exists) successful_rollout and one (if exists) failed_rollout. + candidate_update_dict = candidate.update_dict.values() + prompt = f"Candidate pamameters: {candidate_update_dict}." + successful_rollouts = [rollout for rollout in rollouts if rollout['score'] > self.success_threshold] + failed_rollouts = [rollout for rollout in rollouts if rollout['score'] <= self.success_threshold] + if len(successful_rollouts) > 0: + random_successful_rollout = random.choice(successful_rollouts) + prompt += f"\nSuccessful trajectory: {get_trajectory_of_one_rollout(random_successful_rollout)}." + if len(failed_rollouts) > 0: + random_failed_rollout = random.choice(failed_rollouts) + prompt += f"\nFailed trajectory: {get_trajectory_of_one_rollout(random_failed_rollout)}." + + trajectories.append(prompt) + if self.verbose: + print_color(f"Generated trajectories from {len(trajectories)} candidates.", "green") + + return '\n'.join(trajectories) + + def summarize(self, memory) -> str: + """Summarize the trajectories using the LLM. + Args: + memory: The memory containing trajectories to summarize. + Returns: + str: The summary. + """ + + history_trajectories = self._get_trajectories_for_memory(memory) + if len(history_trajectories) == 0: + return "No trajectories found for the memory." + + user_prompt = self.user_prompt_template.format( + current_summary=self.current_summary, + history_trajectories=history_trajectories, + ) + + prompt_messages = [ + {"role": "system", "content": self.system_prompt}, + {"role": "user", "content": user_prompt}, + ] + + + response = self.llm(messages=prompt_messages) + response = response.choices[0].message.content + # Extract summary using XML regex + summary_match = re.search(r'(.*?)', response, re.DOTALL) + + self.current_summary = summary_match.group(1).strip() + + return self.current_summary \ No newline at end of file