# Adapted from ReEvo: https://github.com/ai4co/reevo/blob/main/baselines/eoh/problem_adapter.py
# and https://github.com/ai4co/reevo/blob/main/reevo.py
# Licensed under the MIT License (see THIRD-PARTY-LICENSES.txt)
import os
import logging
from pathlib import Path
import subprocess
from typing import TypeVar
from llamda.individual import Individual
from llamda.problem import BaseProblem
logger = logging.getLogger("llamda")
T = TypeVar("T", bound=Individual)
[docs]
class Evaluator:
def __init__(self, problem: BaseProblem, timeout: int = 30) -> None:
self.problem = problem
self.timeout = timeout
self.function_evals = 0
def _logging_context(self) -> dict:
return {
"problem_name": self.problem.name,
"function_evals": self.function_evals,
}
[docs]
def mark_invalid_individual(self, individual: T, traceback_msg: str) -> T:
"""
Mark an individual as invalid.
"""
logger.debug(
"Marking individual as invalid", extra={"individual_name": individual.name}
)
individual.exec_success = False
individual.obj = float("inf")
individual.traceback_msg = traceback_msg
return individual
[docs]
def batch_evaluate(self, population: list[T], output_dir: Path) -> list[T]:
"""
Evaluate population by running code in parallel and computing objective values.
"""
logger.info(
"Starting batch evaluation",
extra={
**self._logging_context(),
"population_size": len(population),
"population_names": [ind.name for ind in population],
},
)
inner_runs: list[subprocess.Popen[str] | None] = []
# Run code to evaluate
for i, individual in enumerate(population):
logger.info(
f"Evaluating individual [{i}/{len(population) - 1}]",
extra={**self._logging_context(), "individual_name": individual.name},
)
individual_dir = output_dir / "individuals" / individual.name
os.makedirs(individual_dir, exist_ok=True)
stdout_path = individual_dir / "stdout.txt"
code_path = individual_dir / "code.py"
self.function_evals += 1
# Write code to file if present; otherwise, mark as invalid
if individual.code is not None:
individual.write_code_to_file(str(code_path))
else:
logger.debug(
"There is no code to run for this individual.",
extra={
**self._logging_context(),
"individual_name": individual.name,
},
)
individual = self.mark_invalid_individual(
individual, "Invalid response!"
)
inner_runs.append(None)
continue
try:
process = self._run_code(str(code_path))
inner_runs.append(process)
except Exception as e: # If code execution fails
logger.exception(
"Failed to run code for individual.",
extra={
**self._logging_context(),
"individual_name": individual.name,
},
)
individual = self.mark_invalid_individual(individual, str(e))
inner_runs.append(None)
# Update population with objective values
for individual, inner_run in zip(population, inner_runs):
if inner_run is None: # If code execution fails, skip
continue
try:
stdout_str, _ = inner_run.communicate(timeout=self.timeout)
with open(stdout_path, "w") as f:
f.write(stdout_str)
except subprocess.TimeoutExpired:
logger.warning(
"Timeout expired during code execution",
extra={
**self._logging_context(),
"timeout": self.timeout,
"individual_name": individual.name,
},
)
individual = self.mark_invalid_individual(
individual, "Timeout expired during code execution"
)
inner_run.kill()
continue
traceback_msg = filter_traceback(stdout_str)
# Store objective value for each individual
if traceback_msg == "": # If execution has no error
try:
individual.obj = float(stdout_str.split("\n")[-2])
assert individual.obj > 0, "Objective value <= 0 is not supported."
individual.obj = (
-individual.obj
if self.problem.obj_type == "max"
else individual.obj
)
individual.exec_success = True
except Exception:
logger.exception(
"Failed to parse objective value",
extra={
**self._logging_context(),
"individual_name": individual.name,
},
)
individual = self.mark_invalid_individual(
individual, "Invalid std out / objective value!"
)
else: # Otherwise, also provide execution traceback error feedback
logger.warning(
"Code evaluation of individual failed with traceback",
extra={
**self._logging_context(),
"individual_name": individual.name,
"traceback": traceback_msg,
},
)
individual = self.mark_invalid_individual(individual, traceback_msg)
logger.debug(
"Individual evaluated successfully",
extra={
**self._logging_context(),
"individual_name": individual.name,
"objective": individual.obj,
},
)
logger.info(
"Batch evaluation completed",
extra={
**self._logging_context(),
"successful_individuals": sum(
1 for ind in population if ind.exec_success
),
"failed_individuals": sum(
1 for ind in population if not ind.exec_success
),
},
)
return population
def _run_code(self, code_path: str) -> subprocess.Popen:
"""Run the evaluation script with the individual's code."""
process = subprocess.Popen(
[
"python",
"-u",
str(self.problem.eval_path),
f"{self.problem.size}",
"train",
"--code-path",
code_path,
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
)
return process
[docs]
def filter_traceback(s: str) -> str:
lines = s.split("\n")
filtered_lines = []
for i, line in enumerate(lines):
if line.startswith("Traceback"):
for j in range(i, len(lines)):
if "Set the environment variable HYDRA_FULL_ERROR=1" in lines[j]:
break
filtered_lines.append(lines[j])
return "\n".join(filtered_lines)
return "" # Return an empty string if no Traceback is found