diff --git a/benchmark/agbenchmark/config.py b/benchmark/agbenchmark/config.py index a1002bf73..7605b86b5 100644 --- a/benchmark/agbenchmark/config.py +++ b/benchmark/agbenchmark/config.py @@ -4,7 +4,7 @@ from datetime import datetime from pathlib import Path from typing import Optional -from pydantic import BaseSettings +from pydantic import BaseSettings, Field def _calculate_info_test_path(base_path: Path, benchmark_start_time: datetime) -> Path: @@ -57,7 +57,7 @@ class AgentBenchmarkConfig(BaseSettings, extra="allow"): subject application exposes an Agent Protocol compliant API. """ - agbenchmark_config_dir: Path + agbenchmark_config_dir: Path = Field(..., exclude=True) """Path to the agbenchmark_config folder of the subject agent application.""" categories: list[str] | None = None diff --git a/benchmark/agbenchmark/conftest.py b/benchmark/agbenchmark/conftest.py index e54746e56..dbdca56d4 100644 --- a/benchmark/agbenchmark/conftest.py +++ b/benchmark/agbenchmark/conftest.py @@ -6,17 +6,18 @@ import shutil import threading import time from pathlib import Path -from typing import Any, Generator +from typing import Generator import pytest from agbenchmark.config import AgentBenchmarkConfig +from agbenchmark.reports.ReportManager import RegressionTestsTracker from agbenchmark.reports.reports import ( - finalize_reports, - generate_single_call_report, + finalize_test_report, + initialize_test_report, session_finish, ) -from agbenchmark.utils.challenge import Challenge +from agbenchmark.utils.challenge import OPTIONAL_CATEGORIES, Challenge from agbenchmark.utils.data_types import Category GLOBAL_TIMEOUT = ( @@ -28,7 +29,6 @@ logger = logging.getLogger(__name__) pytest_plugins = ["agbenchmark.utils.dependencies"] collect_ignore = ["challenges"] -suite_reports: dict[str, list] = {} @pytest.fixture(scope="module") @@ -118,18 +118,18 @@ def check_regression(request: pytest.FixtureRequest) -> None: request: The request object from which the test name and the benchmark configuration are retrieved. """ - test_name = request.node.parent.name with contextlib.suppress(FileNotFoundError): - regression_report = agbenchmark_config.regression_tests_file - data = json.loads(regression_report.read_bytes()) - challenge_location = getattr(request.node.parent.cls, "CHALLENGE_LOCATION", "") + rt_tracker = RegressionTestsTracker(agbenchmark_config.regression_tests_file) + test_name = request.node.parent.name + challenge_location = getattr(request.node.parent.cls, "CHALLENGE_LOCATION", "") skip_string = f"Skipping {test_name} at {challenge_location}" # Check if the test name exists in the regression tests - if request.config.getoption("--improve") and data.get(test_name, None): + is_regression_test = rt_tracker.has_regression_test(test_name) + if request.config.getoption("--improve") and is_regression_test: pytest.skip(f"{skip_string} because it's a regression test") - elif request.config.getoption("--maintain") and not data.get(test_name, None): + elif request.config.getoption("--maintain") and not is_regression_test: pytest.skip(f"{skip_string} because it's not a regression test") @@ -177,20 +177,14 @@ def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo) -> None: call: The call object from which the test result is retrieved. """ challenge: type[Challenge] = item.cls # type: ignore - challenge_data = challenge.data - challenge_location = challenge.CHALLENGE_LOCATION + + if call.when == "setup": + test_name = item.nodeid.split("::")[1] + item.user_properties.append(("test_name", test_name)) + initialize_test_report(item, challenge.data) if call.when == "call": - answers = getattr(item, "answers", None) - test_name = item.nodeid.split("::")[1] - item.test_name = test_name - - generate_single_call_report( - item, call, challenge_data, answers, challenge_location, test_name - ) - - if call.when == "teardown": - finalize_reports(agbenchmark_config, item, challenge_data) + finalize_test_report(item, call, agbenchmark_config) def timeout_monitor(start_time: int) -> None: @@ -226,21 +220,7 @@ def pytest_sessionfinish(session: pytest.Session) -> None: Finalizes and saves the test reports. """ - session_finish(agbenchmark_config, suite_reports) - - -@pytest.fixture -def scores(request: pytest.FixtureRequest) -> None: - """ - Pytest fixture that retrieves the scores of the test class. - The scores are retrieved from the `Challenge.scores` attribute - using the test class name. - - Args: - request: The request object. - """ - challenge: type[Challenge] = request.node.cls - return challenge.scores.get(challenge.__name__) + session_finish(agbenchmark_config) def pytest_collection_modifyitems( @@ -255,10 +235,7 @@ def pytest_collection_modifyitems( items: The collected test items to be modified. config: The active pytest configuration. """ - regression_file = agbenchmark_config.regression_tests_file - regression_tests: dict[str, Any] = ( - json.loads(regression_file.read_bytes()) if regression_file.is_file() else {} - ) + rt_tracker = RegressionTestsTracker(agbenchmark_config.regression_tests_file) try: challenges_beaten_in_the_past = json.loads( @@ -295,7 +272,7 @@ def pytest_collection_modifyitems( # --maintain -> only challenges expected to be passed (= regression tests) # --improve -> only challenges that so far are not passed (reliably) # --explore -> only challenges that have never been passed - is_regression_test = regression_tests.get(challenge.data.name, None) + is_regression_test = rt_tracker.has_regression_test(challenge.data.name) has_been_passed = challenges_beaten_in_the_past.get(challenge.data.name, False) if ( (config.getoption("--maintain") and not is_regression_test) @@ -319,17 +296,17 @@ def pytest_collection_modifyitems( elif config.getoption("--improve"): # Filter dependencies, keep only deps that are not "regression" tests dependencies = [ - d for d in dependencies if not regression_tests.get(d, None) + d for d in dependencies if not rt_tracker.has_regression_test(d) ] # Set category markers - challenge_categories = [c.value for c in challenge.data.category] + challenge_categories = set(c.value for c in challenge.data.category) for category in challenge_categories: item.add_marker(category) # Enforce category selection if selected_categories: - if not set(challenge_categories).intersection(set(selected_categories)): + if not challenge_categories.intersection(set(selected_categories)): items.remove(item) continue # # Filter dependencies, keep only deps from selected categories @@ -338,6 +315,22 @@ def pytest_collection_modifyitems( # if not set(d.categories).intersection(set(selected_categories)) # ] + # Skip items in optional categories that are not selected for the subject agent + challenge_optional_categories = challenge_categories & set(OPTIONAL_CATEGORIES) + if challenge_optional_categories and not ( + agbenchmark_config.categories + and challenge_optional_categories.issubset( + set(agbenchmark_config.categories) + ) + ): + logger.debug( + f"Skipping {challenge_name}: " + f"category {' and '.join(challenge_optional_categories)} is optional, " + "and not explicitly selected in the benchmark config." + ) + items.remove(item) + continue + # Add marker for the DependencyManager item.add_marker(pytest.mark.depends(on=dependencies, name=challenge_name)) diff --git a/benchmark/agbenchmark/reports/ReportManager.py b/benchmark/agbenchmark/reports/ReportManager.py index eadb7c0e2..68af0a386 100644 --- a/benchmark/agbenchmark/reports/ReportManager.py +++ b/benchmark/agbenchmark/reports/ReportManager.py @@ -1,21 +1,29 @@ import copy import json +import logging import os import sys import time from datetime import datetime, timezone from pathlib import Path +from typing import Any from agbenchmark.config import AgentBenchmarkConfig from agbenchmark.reports.processing.graphs import save_single_radar_chart from agbenchmark.reports.processing.process_report import get_agent_category -from agbenchmark.reports.processing.report_types import Report +from agbenchmark.reports.processing.report_types import MetricsOverall, Report, Test from agbenchmark.utils.utils import get_highest_success_difficulty +logger = logging.getLogger(__name__) + class SingletonReportManager: instance = None + INFO_MANAGER: "SessionReportManager" + REGRESSION_MANAGER: "RegressionTestsTracker" + SUCCESS_RATE_TRACKER: "SuccessRatesTracker" + def __new__(cls): if not cls.instance: cls.instance = super(SingletonReportManager, cls).__new__(cls) @@ -26,17 +34,16 @@ class SingletonReportManager: ) # or any logic to fetch the datetime # Make the Managers class attributes - cls.REGRESSION_MANAGER = ReportManager( - agent_benchmark_config.regression_tests_file, - benchmark_start_time_dt, - ) - cls.INFO_MANAGER = ReportManager( + cls.INFO_MANAGER = SessionReportManager( agent_benchmark_config.get_report_dir(benchmark_start_time_dt) / "report.json", benchmark_start_time_dt, ) - cls.INTERNAL_INFO_MANAGER = ReportManager( - agent_benchmark_config.success_rate_file, benchmark_start_time_dt + cls.REGRESSION_MANAGER = RegressionTestsTracker( + agent_benchmark_config.regression_tests_file + ) + cls.SUCCESS_RATE_TRACKER = SuccessRatesTracker( + agent_benchmark_config.success_rate_file ) return cls.instance @@ -44,39 +51,33 @@ class SingletonReportManager: @classmethod def clear_instance(cls): cls.instance = None - cls.REGRESSION_MANAGER = None cls.INFO_MANAGER = None - cls.INTERNAL_INFO_MANAGER = None + cls.REGRESSION_MANAGER = None + cls.SUCCESS_RATE_TRACKER = None -class ReportManager: +class BaseReportManager: """Abstracts interaction with the regression tests file""" - def __init__(self, report_file: Path, benchmark_start_time: datetime): + tests: dict[str, Any] + + def __init__(self, report_file: Path): self.report_file = report_file - self.start_time = time.time() - self.benchmark_start_time = benchmark_start_time self.load() def load(self) -> None: if not self.report_file.exists(): self.report_file.parent.mkdir(exist_ok=True) - self.report_file.touch() try: with self.report_file.open("r") as f: - file_content = ( - f.read().strip() - ) # read the content and remove any leading/trailing whitespace - if file_content: # if file is not empty, load the json - data = json.loads(file_content) - self.tests = {k: data[k] for k in sorted(data)} - else: # if file is empty, assign an empty dictionary - self.tests = {} + data = json.load(f) + self.tests = {k: data[k] for k in sorted(data)} except FileNotFoundError: self.tests = {} - except json.decoder.JSONDecodeError: # If JSON is invalid + except json.decoder.JSONDecodeError as e: + logger.warning(f"Could not parse {self.report_file}: {e}") self.tests = {} self.save() @@ -84,13 +85,6 @@ class ReportManager: with self.report_file.open("w") as f: json.dump(self.tests, f, indent=4) - def add_test(self, test_name: str, test_details: dict | list) -> None: - if test_name.startswith("Test"): - test_name = test_name[4:] - self.tests[test_name] = test_details - - self.save() - def remove_test(self, test_name: str) -> None: if test_name in self.tests: del self.tests[test_name] @@ -100,34 +94,61 @@ class ReportManager: self.tests = {} self.save() - def end_info_report(self, config: AgentBenchmarkConfig) -> None: + +class SessionReportManager(BaseReportManager): + """Abstracts interaction with the regression tests file""" + + tests: dict[str, Test] | Report + + def __init__(self, report_file: Path, benchmark_start_time: datetime): + super().__init__(report_file) + + self.start_time = time.time() + self.benchmark_start_time = benchmark_start_time + + def save(self) -> None: + with self.report_file.open("w") as f: + if isinstance(self.tests, Report): + f.write(self.tests.json(indent=4)) + else: + json.dump({k: v.dict() for k, v in self.tests.items()}, f, indent=4) + + def add_test_report(self, test_name: str, test_report: Test) -> None: + if isinstance(self.tests, Report): + raise RuntimeError("Session report already finalized") + + if test_name.startswith("Test"): + test_name = test_name[4:] + self.tests[test_name] = test_report + + self.save() + + def finalize_session_report(self, config: AgentBenchmarkConfig) -> None: command = " ".join(sys.argv) - self.tests = { - "command": command.split(os.sep)[-1], - "benchmark_git_commit_sha": "---", - "agent_git_commit_sha": "---", - "completion_time": datetime.now(timezone.utc).strftime( + if isinstance(self.tests, Report): + raise RuntimeError("Session report already finalized") + + self.tests = Report( + command=command.split(os.sep)[-1], + benchmark_git_commit_sha="---", + agent_git_commit_sha="---", + completion_time=datetime.now(timezone.utc).strftime( "%Y-%m-%dT%H:%M:%S+00:00" ), - "benchmark_start_time": self.benchmark_start_time.strftime( + benchmark_start_time=self.benchmark_start_time.strftime( "%Y-%m-%dT%H:%M:%S+00:00" ), - "metrics": { - "run_time": str(round(time.time() - self.start_time, 2)) + " seconds", - "highest_difficulty": get_highest_success_difficulty(self.tests), - "total_cost": self.get_total_costs(), - }, - "tests": copy.copy(self.tests), - "config": { - k: v for k, v in json.loads(config.json()).items() if v is not None - }, - } - Report.parse_obj(self.tests) + metrics=MetricsOverall( + run_time=str(round(time.time() - self.start_time, 2)) + " seconds", + highest_difficulty=get_highest_success_difficulty(self.tests), + total_cost=self.get_total_costs(), + ), + tests=copy.copy(self.tests), + config=config.dict(exclude_none=True), + ) - converted_data = Report.parse_obj(self.tests) - - agent_categories = get_agent_category(converted_data) + agent_categories = get_agent_category(self.tests) if len(agent_categories) > 1: save_single_radar_chart( agent_categories, @@ -137,12 +158,15 @@ class ReportManager: self.save() def get_total_costs(self): + if isinstance(self.tests, Report): + tests = self.tests.tests + else: + tests = self.tests + total_cost = 0 all_costs_none = True - for test_name, test_data in self.tests.items(): - cost = test_data["metrics"].get( - "cost", 0 - ) # gets the cost or defaults to 0 if cost is missing + for test_data in tests.values(): + cost = test_data.metrics.cost or 0.0 if cost is not None: # check if cost is not None all_costs_none = False @@ -150,3 +174,32 @@ class ReportManager: if all_costs_none: total_cost = None return total_cost + + +class RegressionTestsTracker(BaseReportManager): + """Abstracts interaction with the regression tests file""" + + tests: dict[str, dict] + + def add_test(self, test_name: str, test_details: dict) -> None: + if test_name.startswith("Test"): + test_name = test_name[4:] + self.tests[test_name] = test_details + + self.save() + + def has_regression_test(self, test_name: str) -> bool: + return self.tests.get(test_name) is not None + + +class SuccessRatesTracker(BaseReportManager): + """Abstracts interaction with the regression tests file""" + + tests: dict[str, list[bool]] + + def update(self, test_name: str, success_history: list[bool]) -> None: + if test_name.startswith("Test"): + test_name = test_name[4:] + self.tests[test_name] = success_history + + self.save() diff --git a/benchmark/agbenchmark/reports/reports.py b/benchmark/agbenchmark/reports/reports.py index 684b715b7..c6b7f6119 100644 --- a/benchmark/agbenchmark/reports/reports.py +++ b/benchmark/agbenchmark/reports/reports.py @@ -3,11 +3,11 @@ import logging import os import sys from pathlib import Path -from typing import Any, Dict import pytest from agbenchmark.config import AgentBenchmarkConfig +from agbenchmark.reports.processing.report_types import Metrics, Test from agbenchmark.reports.ReportManager import SingletonReportManager from agbenchmark.utils.data_types import ChallengeData, DifficultyLevel from agbenchmark.utils.get_data_from_helicone import get_data_from_helicone @@ -16,24 +16,22 @@ from agbenchmark.utils.utils import calculate_success_percentage logger = logging.getLogger(__name__) -def get_previous_test_results( - test_name: str, info_details: dict[str, Any] -) -> list[bool]: +def get_and_update_success_history(test_name: str, info_details: Test) -> list[bool]: mock = os.getenv("IS_MOCK") # Check if --mock is in sys.argv - prev_test_results = SingletonReportManager().INTERNAL_INFO_MANAGER.tests.get( + prev_test_results = SingletonReportManager().SUCCESS_RATE_TRACKER.tests.get( test_name, [] ) - if not mock: + if not mock and info_details.metrics.success is not None: # only add if it's an actual test - prev_test_results.append(info_details["metrics"]["success"]) - SingletonReportManager().INTERNAL_INFO_MANAGER.add_test( + prev_test_results.append(info_details.metrics.success) + SingletonReportManager().SUCCESS_RATE_TRACKER.update( test_name, prev_test_results ) # can calculate success rate regardless of mock - info_details["metrics"]["success_%"] = calculate_success_percentage( + info_details.metrics.success_percentage = calculate_success_percentage( prev_test_results ) @@ -42,25 +40,22 @@ def get_previous_test_results( def update_regression_tests( prev_test_results: list[bool], - info_details: dict, + info_details: Test, test_name: str, - test_details: dict, ) -> None: if len(prev_test_results) >= 3 and prev_test_results[-3:] == [True, True, True]: # if the last 3 tests were successful, add to the regression tests - info_details["is_regression"] = True - SingletonReportManager().REGRESSION_MANAGER.add_test(test_name, test_details) + info_details.is_regression = True + SingletonReportManager().REGRESSION_MANAGER.add_test( + test_name, info_details.dict(include={"difficulty", "data_path"}) + ) -def generate_single_call_report( +def initialize_test_report( item: pytest.Item, - call: pytest.CallInfo, - challenge_data: ChallengeData, - answers: dict[str, Any], - challenge_location: str, - test_name: str, -) -> None: - difficulty = challenge_data.info.difficulty + challenge_info: ChallengeData, +): + difficulty = challenge_info.info.difficulty if isinstance(difficulty, DifficultyLevel): difficulty = difficulty.value @@ -70,60 +65,55 @@ def generate_single_call_report( # test_name = item.nodeid.split("::")[1] # item.test_name = test_name - test_details = { - "difficulty": difficulty, - "data_path": challenge_location, - } - - info_details: Any = { - "data_path": challenge_location, - "is_regression": False, - "category": challenge_data.category, - "task": challenge_data.task, - "answer": challenge_data.ground.answer, - "description": challenge_data.info.description, - "metrics": { - "difficulty": difficulty, - "success": False, - "attempted": True, - }, - # "answers": answers, - } - if answers: - info_details["answers"] = answers - - if challenge_data.metadata: - info_details["metadata"] = challenge_data.metadata - - mock = os.getenv("IS_MOCK") # Check if --mock is in sys.argv - if call: - if call.excinfo is None: - info_details["metrics"]["success"] = True - else: - if not mock: # don't remove if it's a mock test - SingletonReportManager().REGRESSION_MANAGER.remove_test(test_name) - info_details["metrics"]["fail_reason"] = str(call.excinfo.value) - if call.excinfo.typename == "Skipped": - info_details["metrics"]["attempted"] = False - - prev_test_results: list[bool] = get_previous_test_results(test_name, info_details) - - update_regression_tests(prev_test_results, info_details, test_name, test_details) + test_info = dict(item.user_properties).get("info_details") or Test( + data_path=str(challenge_info.spec_file), + is_regression=False, + category=[c.value for c in challenge_info.category], + task=challenge_info.task, + answer=challenge_info.ground.answer, + description=challenge_info.info.description, + metrics=Metrics( + difficulty=challenge_info.info.difficulty.value, + attempted=False, + ), + ) # user facing reporting if item: - item.info_details = info_details + item.user_properties.append(("info_details", test_info)) - return info_details + return test_info -def finalize_reports( - config: AgentBenchmarkConfig, item: pytest.Item, challenge_data: ChallengeData +def finalize_test_report( + item: pytest.Item, call: pytest.CallInfo, config: AgentBenchmarkConfig ) -> None: - run_time = dict(item.user_properties).get("run_time") + user_properties: dict = dict(item.user_properties) + run_time = user_properties.get("run_time") - info_details = getattr(item, "info_details", {}) - test_name = getattr(item, "test_name", "") + info_details: Test = user_properties.get("info_details", {}) + test_name: str = user_properties.get("test_name", "") + + mock = os.getenv("IS_MOCK") # Check if --mock is in sys.argv + if call: + logger.debug(f"Finalizing report with CallInfo: {vars(call)}") + if call.excinfo is None: + info_details.metrics.success = True + else: + if not mock: # don't remove if it's a mock test + SingletonReportManager().REGRESSION_MANAGER.remove_test(test_name) + info_details.metrics.fail_reason = str(call.excinfo.value) + if call.excinfo.typename == "Skipped": + info_details.metrics.attempted = False + info_details.metrics.attempted = True + info_details.metrics.run_time = f"{str(round(call.duration, 3))} seconds" + info_details.reached_cutoff = user_properties.get("timed_out", False) + + prev_test_results: list[bool] = get_and_update_success_history( + test_name, info_details + ) + + update_regression_tests(prev_test_results, info_details, test_name) if info_details and test_name: if run_time is not None: @@ -133,42 +123,20 @@ def finalize_reports( cost = get_data_from_helicone(test_name) logger.debug(f"Cost: {cost}") - info_details["metrics"]["cost"] = cost - - if info_details["metrics"].get("success", None) is None: - info_details["metrics"]["attempted"] = False - info_details["metrics"]["success"] = False - elif ( - info_details["metrics"].get("success") is False - and "attempted" not in info_details["metrics"] - ): - info_details["metrics"]["attempted"] = False - - info_details["metrics"]["run_time"] = f"{str(round(run_time, 3))} seconds" - - info_details["reached_cutoff"] = float(run_time) > challenge_data.cutoff + info_details.metrics.cost = cost if "--mock" not in sys.argv: update_challenges_already_beaten( config.challenges_already_beaten_file, info_details, test_name ) - if info_details.get("tests") is not None: - for nested_test_name, nested_test_info in info_details[ - "tests" - ].items(): - update_challenges_already_beaten( - config.challenges_already_beaten_file, - nested_test_info, - nested_test_name, - ) - SingletonReportManager().INFO_MANAGER.add_test(test_name, info_details) + SingletonReportManager().INFO_MANAGER.add_test_report(test_name, info_details) def update_challenges_already_beaten( - challenges_already_beaten_file: Path, info_details: Dict[str, Any], test_name: str + challenges_already_beaten_file: Path, info_details: Test, test_name: str ) -> None: - current_run_successful = info_details["metrics"]["success"] + current_run_successful = info_details.metrics.success try: with open(challenges_already_beaten_file, "r") as f: challenge_data = json.load(f) @@ -184,9 +152,7 @@ def update_challenges_already_beaten( json.dump(challenge_data, f, indent=4) -def session_finish( - agbenchmark_config: AgentBenchmarkConfig, suite_reports: dict -) -> None: - SingletonReportManager().INTERNAL_INFO_MANAGER.save() - SingletonReportManager().INFO_MANAGER.end_info_report(agbenchmark_config) +def session_finish(agbenchmark_config: AgentBenchmarkConfig) -> None: + SingletonReportManager().INFO_MANAGER.finalize_session_report(agbenchmark_config) SingletonReportManager().REGRESSION_MANAGER.save() + SingletonReportManager().SUCCESS_RATE_TRACKER.save() diff --git a/benchmark/agbenchmark/utils/challenge.py b/benchmark/agbenchmark/utils/challenge.py index c7d1f36f6..0650ff00e 100644 --- a/benchmark/agbenchmark/utils/challenge.py +++ b/benchmark/agbenchmark/utils/challenge.py @@ -61,9 +61,6 @@ class Challenge(ABC): async def test_method( self, config: AgentBenchmarkConfig, request: pytest.FixtureRequest ) -> None: - # skip optional categories - self.skip_optional_categories(config) - if os.environ.get("HELICONE_API_KEY"): from helicone.lock import HeliconeLockManager @@ -269,16 +266,3 @@ class Challenge(ABC): return 1 return None - - @classmethod - def skip_optional_categories(cls, config: AgentBenchmarkConfig) -> None: - challenge_categories = set(c.value for c in cls.data.category) - challenge_optional_categories = challenge_categories & set(OPTIONAL_CATEGORIES) - if challenge_optional_categories and not ( - config.categories - and set(challenge_optional_categories).issubset(set(config.categories)) - ): - pytest.skip( - f"Category {', '.join(challenge_optional_categories)} is optional, " - "and not explicitly selected in the benchmark config." - ) diff --git a/benchmark/agbenchmark/utils/data_types.py b/benchmark/agbenchmark/utils/data_types.py index b38e5ef23..e9b1fa223 100644 --- a/benchmark/agbenchmark/utils/data_types.py +++ b/benchmark/agbenchmark/utils/data_types.py @@ -108,8 +108,8 @@ class ChallengeData(BaseModel): task: str dependencies: List[str] cutoff: int - ground: Ground | Dict[str, Ground] - info: Info | Dict[str, Info] + ground: Ground + info: Info metadata: Optional[Dict[str, Any]] = None spec_file: Path | None = Field(None, exclude=True) diff --git a/benchmark/agbenchmark/utils/utils.py b/benchmark/agbenchmark/utils/utils.py index a7756766a..31596a9a7 100644 --- a/benchmark/agbenchmark/utils/utils.py +++ b/benchmark/agbenchmark/utils/utils.py @@ -8,6 +8,7 @@ from typing import Any, Optional from dotenv import load_dotenv +from agbenchmark.reports.processing.report_types import Test from agbenchmark.utils.data_types import DIFFICULTY_MAP, DifficultyLevel load_dotenv() @@ -63,41 +64,31 @@ def get_test_path(json_file: str | Path) -> str: def get_highest_success_difficulty( - data: dict, just_string: Optional[bool] = None + data: dict[str, Test], just_string: Optional[bool] = None ) -> str: highest_difficulty = None highest_difficulty_level = 0 for test_name, test_data in data.items(): try: - if test_data.get("tests", None): - highest_difficulty_str = test_data["metrics"]["highest_difficulty"] + if test_data.metrics.success: + difficulty_str = test_data.metrics.difficulty + if not difficulty_str: + continue + try: - highest_difficulty = DifficultyLevel[highest_difficulty_str] - highest_difficulty_level = DIFFICULTY_MAP[highest_difficulty] + difficulty_enum = DifficultyLevel[difficulty_str.lower()] + difficulty_level = DIFFICULTY_MAP[difficulty_enum] + + if difficulty_level > highest_difficulty_level: + highest_difficulty = difficulty_enum + highest_difficulty_level = difficulty_level except KeyError: logger.warning( - f"Unexpected difficulty level '{highest_difficulty_str}' " + f"Unexpected difficulty level '{difficulty_str}' " f"in test '{test_name}'" ) continue - else: - if test_data["metrics"]["success"]: - difficulty_str = test_data["metrics"]["difficulty"] - - try: - difficulty_enum = DifficultyLevel[difficulty_str.lower()] - difficulty_level = DIFFICULTY_MAP[difficulty_enum] - - if difficulty_level > highest_difficulty_level: - highest_difficulty = difficulty_enum - highest_difficulty_level = difficulty_level - except KeyError: - logger.warning( - f"Unexpected difficulty level '{difficulty_str}' " - f"in test '{test_name}'" - ) - continue except Exception as e: logger.warning( "An unexpected error [1] occurred while analyzing report [2]."