refactor(benchmark): Refactor & typefix report generation and handling logic

- Rename functions in reports.py and ReportManager.py to better reflect what they do
   - `get_previous_test_results` -> `get_and_update_success_history`
   - `generate_single_call_report` -> `initialize_test_report`
   - `finalize_reports` -> `finalize_test_report`
   - `ReportManager.end_info_report` -> `SessionReportManager.finalize_session_report`
- Modify `pytest_runtest_makereport` hook in conftest.py to finalize the report immediately after the challenge finishes running instead of after teardown (see the sketch below)
   - Move result processing logic from `initialize_test_report` to `finalize_test_report` in reports.py
- Use `Test` and `Report` types from report_types.py where possible instead of untyped dicts: reports.py, utils.py, ReportManager.py
- Split `ReportManager` into `SessionReportManager`, `RegressionTestsTracker`, and `SuccessRatesTracker`
- Move filtering of optional challenge categories from challenge.py (`Challenge.skip_optional_categories`) to conftest.py (`pytest_collection_modifyitems`)
- Remove unused `scores` fixture in conftest.py
Branch: pull/6691/head
Author: Reinier van der Leer
Date: 2024-01-09 16:02:25 +01:00
Parent: 370d6dbf5d
Commit: 6a256fef4c
7 changed files with 230 additions and 243 deletions
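For orientation, the sketch below shows how the renamed report helpers map onto the pytest hook lifecycle after this change. It is a simplified illustration rather than the literal conftest.py shown further down; the placeholder config line stands in for the `AgentBenchmarkConfig` instance that conftest.py actually constructs.

from pathlib import Path

import pytest

from agbenchmark.config import AgentBenchmarkConfig
from agbenchmark.reports.reports import (
    finalize_test_report,
    initialize_test_report,
    session_finish,
)

# Placeholder config; conftest.py resolves the real agbenchmark_config folder.
agbenchmark_config = AgentBenchmarkConfig(agbenchmark_config_dir=Path("agbenchmark_config"))


def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo) -> None:
    challenge = item.cls  # the Challenge subclass behind this test item
    if call.when == "setup":
        # Create the typed Test report and attach it to the item's user_properties
        initialize_test_report(item, challenge.data)
    if call.when == "call":
        # The challenge has just finished running: record success/failure, run time,
        # success history and regression status now, instead of waiting for teardown
        finalize_test_report(item, call, agbenchmark_config)


def pytest_sessionfinish(session: pytest.Session) -> None:
    # Write the session report plus the regression-tests and success-rates files
    session_finish(agbenchmark_config)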

File: agbenchmark/config.py

@ -4,7 +4,7 @@ from datetime import datetime
from pathlib import Path
from typing import Optional
from pydantic import BaseSettings
from pydantic import BaseSettings, Field
def _calculate_info_test_path(base_path: Path, benchmark_start_time: datetime) -> Path:
@ -57,7 +57,7 @@ class AgentBenchmarkConfig(BaseSettings, extra="allow"):
subject application exposes an Agent Protocol compliant API.
"""
agbenchmark_config_dir: Path
agbenchmark_config_dir: Path = Field(..., exclude=True)
"""Path to the agbenchmark_config folder of the subject agent application."""
categories: list[str] | None = None

File: agbenchmark/conftest.py

@ -6,17 +6,18 @@ import shutil
import threading
import time
from pathlib import Path
from typing import Any, Generator
from typing import Generator
import pytest
from agbenchmark.config import AgentBenchmarkConfig
from agbenchmark.reports.ReportManager import RegressionTestsTracker
from agbenchmark.reports.reports import (
finalize_reports,
generate_single_call_report,
finalize_test_report,
initialize_test_report,
session_finish,
)
from agbenchmark.utils.challenge import Challenge
from agbenchmark.utils.challenge import OPTIONAL_CATEGORIES, Challenge
from agbenchmark.utils.data_types import Category
GLOBAL_TIMEOUT = (
@ -28,7 +29,6 @@ logger = logging.getLogger(__name__)
pytest_plugins = ["agbenchmark.utils.dependencies"]
collect_ignore = ["challenges"]
suite_reports: dict[str, list] = {}
@pytest.fixture(scope="module")
@ -118,18 +118,18 @@ def check_regression(request: pytest.FixtureRequest) -> None:
request: The request object from which the test name and the benchmark
configuration are retrieved.
"""
test_name = request.node.parent.name
with contextlib.suppress(FileNotFoundError):
regression_report = agbenchmark_config.regression_tests_file
data = json.loads(regression_report.read_bytes())
challenge_location = getattr(request.node.parent.cls, "CHALLENGE_LOCATION", "")
rt_tracker = RegressionTestsTracker(agbenchmark_config.regression_tests_file)
test_name = request.node.parent.name
challenge_location = getattr(request.node.parent.cls, "CHALLENGE_LOCATION", "")
skip_string = f"Skipping {test_name} at {challenge_location}"
# Check if the test name exists in the regression tests
if request.config.getoption("--improve") and data.get(test_name, None):
is_regression_test = rt_tracker.has_regression_test(test_name)
if request.config.getoption("--improve") and is_regression_test:
pytest.skip(f"{skip_string} because it's a regression test")
elif request.config.getoption("--maintain") and not data.get(test_name, None):
elif request.config.getoption("--maintain") and not is_regression_test:
pytest.skip(f"{skip_string} because it's not a regression test")
@ -177,20 +177,14 @@ def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo) -> None:
call: The call object from which the test result is retrieved.
"""
challenge: type[Challenge] = item.cls # type: ignore
challenge_data = challenge.data
challenge_location = challenge.CHALLENGE_LOCATION
if call.when == "setup":
test_name = item.nodeid.split("::")[1]
item.user_properties.append(("test_name", test_name))
initialize_test_report(item, challenge.data)
if call.when == "call":
answers = getattr(item, "answers", None)
test_name = item.nodeid.split("::")[1]
item.test_name = test_name
generate_single_call_report(
item, call, challenge_data, answers, challenge_location, test_name
)
if call.when == "teardown":
finalize_reports(agbenchmark_config, item, challenge_data)
finalize_test_report(item, call, agbenchmark_config)
def timeout_monitor(start_time: int) -> None:
@ -226,21 +220,7 @@ def pytest_sessionfinish(session: pytest.Session) -> None:
Finalizes and saves the test reports.
"""
session_finish(agbenchmark_config, suite_reports)
@pytest.fixture
def scores(request: pytest.FixtureRequest) -> None:
"""
Pytest fixture that retrieves the scores of the test class.
The scores are retrieved from the `Challenge.scores` attribute
using the test class name.
Args:
request: The request object.
"""
challenge: type[Challenge] = request.node.cls
return challenge.scores.get(challenge.__name__)
session_finish(agbenchmark_config)
def pytest_collection_modifyitems(
@ -255,10 +235,7 @@ def pytest_collection_modifyitems(
items: The collected test items to be modified.
config: The active pytest configuration.
"""
regression_file = agbenchmark_config.regression_tests_file
regression_tests: dict[str, Any] = (
json.loads(regression_file.read_bytes()) if regression_file.is_file() else {}
)
rt_tracker = RegressionTestsTracker(agbenchmark_config.regression_tests_file)
try:
challenges_beaten_in_the_past = json.loads(
@ -295,7 +272,7 @@ def pytest_collection_modifyitems(
# --maintain -> only challenges expected to be passed (= regression tests)
# --improve -> only challenges that so far are not passed (reliably)
# --explore -> only challenges that have never been passed
is_regression_test = regression_tests.get(challenge.data.name, None)
is_regression_test = rt_tracker.has_regression_test(challenge.data.name)
has_been_passed = challenges_beaten_in_the_past.get(challenge.data.name, False)
if (
(config.getoption("--maintain") and not is_regression_test)
@ -319,17 +296,17 @@ def pytest_collection_modifyitems(
elif config.getoption("--improve"):
# Filter dependencies, keep only deps that are not "regression" tests
dependencies = [
d for d in dependencies if not regression_tests.get(d, None)
d for d in dependencies if not rt_tracker.has_regression_test(d)
]
# Set category markers
challenge_categories = [c.value for c in challenge.data.category]
challenge_categories = set(c.value for c in challenge.data.category)
for category in challenge_categories:
item.add_marker(category)
# Enforce category selection
if selected_categories:
if not set(challenge_categories).intersection(set(selected_categories)):
if not challenge_categories.intersection(set(selected_categories)):
items.remove(item)
continue
# # Filter dependencies, keep only deps from selected categories
@ -338,6 +315,22 @@ def pytest_collection_modifyitems(
# if not set(d.categories).intersection(set(selected_categories))
# ]
# Skip items in optional categories that are not selected for the subject agent
challenge_optional_categories = challenge_categories & set(OPTIONAL_CATEGORIES)
if challenge_optional_categories and not (
agbenchmark_config.categories
and challenge_optional_categories.issubset(
set(agbenchmark_config.categories)
)
):
logger.debug(
f"Skipping {challenge_name}: "
f"category {' and '.join(challenge_optional_categories)} is optional, "
"and not explicitly selected in the benchmark config."
)
items.remove(item)
continue
# Add marker for the DependencyManager
item.add_marker(pytest.mark.depends(on=dependencies, name=challenge_name))
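To make the selection rules above concrete, here is a small self-contained illustration of the set logic, using made-up category names and a stubbed OPTIONAL_CATEGORIES (the real values come from agbenchmark.utils.challenge and the benchmark config):

# Made-up values, purely to illustrate the filtering logic above.
OPTIONAL_CATEGORIES = ["optional_cat"]   # stub for the optional-categories constant
selected_categories = {"coding"}         # categories selected for this benchmark run
config_categories = ["coding"]           # stands in for agbenchmark_config.categories

challenge_categories = {"coding", "optional_cat"}

# 1. Enforce category selection: kept, because the challenge overlaps the selection.
keep = not selected_categories or bool(challenge_categories & selected_categories)

# 2. Optional categories must be explicitly selected in the benchmark config.
challenge_optional_categories = challenge_categories & set(OPTIONAL_CATEGORIES)
skipped = bool(challenge_optional_categories) and not (
    config_categories and challenge_optional_categories.issubset(set(config_categories))
)

print(keep, skipped)  # True True -> the item is dropped: "optional_cat" is not selected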

File: agbenchmark/reports/ReportManager.py

@ -1,21 +1,29 @@
import copy
import json
import logging
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from agbenchmark.config import AgentBenchmarkConfig
from agbenchmark.reports.processing.graphs import save_single_radar_chart
from agbenchmark.reports.processing.process_report import get_agent_category
from agbenchmark.reports.processing.report_types import Report
from agbenchmark.reports.processing.report_types import MetricsOverall, Report, Test
from agbenchmark.utils.utils import get_highest_success_difficulty
logger = logging.getLogger(__name__)
class SingletonReportManager:
instance = None
INFO_MANAGER: "SessionReportManager"
REGRESSION_MANAGER: "RegressionTestsTracker"
SUCCESS_RATE_TRACKER: "SuccessRatesTracker"
def __new__(cls):
if not cls.instance:
cls.instance = super(SingletonReportManager, cls).__new__(cls)
@ -26,17 +34,16 @@ class SingletonReportManager:
) # or any logic to fetch the datetime
# Make the Managers class attributes
cls.REGRESSION_MANAGER = ReportManager(
agent_benchmark_config.regression_tests_file,
benchmark_start_time_dt,
)
cls.INFO_MANAGER = ReportManager(
cls.INFO_MANAGER = SessionReportManager(
agent_benchmark_config.get_report_dir(benchmark_start_time_dt)
/ "report.json",
benchmark_start_time_dt,
)
cls.INTERNAL_INFO_MANAGER = ReportManager(
agent_benchmark_config.success_rate_file, benchmark_start_time_dt
cls.REGRESSION_MANAGER = RegressionTestsTracker(
agent_benchmark_config.regression_tests_file
)
cls.SUCCESS_RATE_TRACKER = SuccessRatesTracker(
agent_benchmark_config.success_rate_file
)
return cls.instance
@ -44,39 +51,33 @@ class SingletonReportManager:
@classmethod
def clear_instance(cls):
cls.instance = None
cls.REGRESSION_MANAGER = None
cls.INFO_MANAGER = None
cls.INTERNAL_INFO_MANAGER = None
cls.REGRESSION_MANAGER = None
cls.SUCCESS_RATE_TRACKER = None
class ReportManager:
class BaseReportManager:
"""Abstracts interaction with the regression tests file"""
def __init__(self, report_file: Path, benchmark_start_time: datetime):
tests: dict[str, Any]
def __init__(self, report_file: Path):
self.report_file = report_file
self.start_time = time.time()
self.benchmark_start_time = benchmark_start_time
self.load()
def load(self) -> None:
if not self.report_file.exists():
self.report_file.parent.mkdir(exist_ok=True)
self.report_file.touch()
try:
with self.report_file.open("r") as f:
file_content = (
f.read().strip()
) # read the content and remove any leading/trailing whitespace
if file_content: # if file is not empty, load the json
data = json.loads(file_content)
self.tests = {k: data[k] for k in sorted(data)}
else: # if file is empty, assign an empty dictionary
self.tests = {}
data = json.load(f)
self.tests = {k: data[k] for k in sorted(data)}
except FileNotFoundError:
self.tests = {}
except json.decoder.JSONDecodeError: # If JSON is invalid
except json.decoder.JSONDecodeError as e:
logger.warning(f"Could not parse {self.report_file}: {e}")
self.tests = {}
self.save()
@ -84,13 +85,6 @@ class ReportManager:
with self.report_file.open("w") as f:
json.dump(self.tests, f, indent=4)
def add_test(self, test_name: str, test_details: dict | list) -> None:
if test_name.startswith("Test"):
test_name = test_name[4:]
self.tests[test_name] = test_details
self.save()
def remove_test(self, test_name: str) -> None:
if test_name in self.tests:
del self.tests[test_name]
@ -100,34 +94,61 @@ class ReportManager:
self.tests = {}
self.save()
def end_info_report(self, config: AgentBenchmarkConfig) -> None:
class SessionReportManager(BaseReportManager):
"""Abstracts interaction with the regression tests file"""
tests: dict[str, Test] | Report
def __init__(self, report_file: Path, benchmark_start_time: datetime):
super().__init__(report_file)
self.start_time = time.time()
self.benchmark_start_time = benchmark_start_time
def save(self) -> None:
with self.report_file.open("w") as f:
if isinstance(self.tests, Report):
f.write(self.tests.json(indent=4))
else:
json.dump({k: v.dict() for k, v in self.tests.items()}, f, indent=4)
def add_test_report(self, test_name: str, test_report: Test) -> None:
if isinstance(self.tests, Report):
raise RuntimeError("Session report already finalized")
if test_name.startswith("Test"):
test_name = test_name[4:]
self.tests[test_name] = test_report
self.save()
def finalize_session_report(self, config: AgentBenchmarkConfig) -> None:
command = " ".join(sys.argv)
self.tests = {
"command": command.split(os.sep)[-1],
"benchmark_git_commit_sha": "---",
"agent_git_commit_sha": "---",
"completion_time": datetime.now(timezone.utc).strftime(
if isinstance(self.tests, Report):
raise RuntimeError("Session report already finalized")
self.tests = Report(
command=command.split(os.sep)[-1],
benchmark_git_commit_sha="---",
agent_git_commit_sha="---",
completion_time=datetime.now(timezone.utc).strftime(
"%Y-%m-%dT%H:%M:%S+00:00"
),
"benchmark_start_time": self.benchmark_start_time.strftime(
benchmark_start_time=self.benchmark_start_time.strftime(
"%Y-%m-%dT%H:%M:%S+00:00"
),
"metrics": {
"run_time": str(round(time.time() - self.start_time, 2)) + " seconds",
"highest_difficulty": get_highest_success_difficulty(self.tests),
"total_cost": self.get_total_costs(),
},
"tests": copy.copy(self.tests),
"config": {
k: v for k, v in json.loads(config.json()).items() if v is not None
},
}
Report.parse_obj(self.tests)
metrics=MetricsOverall(
run_time=str(round(time.time() - self.start_time, 2)) + " seconds",
highest_difficulty=get_highest_success_difficulty(self.tests),
total_cost=self.get_total_costs(),
),
tests=copy.copy(self.tests),
config=config.dict(exclude_none=True),
)
converted_data = Report.parse_obj(self.tests)
agent_categories = get_agent_category(converted_data)
agent_categories = get_agent_category(self.tests)
if len(agent_categories) > 1:
save_single_radar_chart(
agent_categories,
@ -137,12 +158,15 @@ class ReportManager:
self.save()
def get_total_costs(self):
if isinstance(self.tests, Report):
tests = self.tests.tests
else:
tests = self.tests
total_cost = 0
all_costs_none = True
for test_name, test_data in self.tests.items():
cost = test_data["metrics"].get(
"cost", 0
) # gets the cost or defaults to 0 if cost is missing
for test_data in tests.values():
cost = test_data.metrics.cost or 0.0
if cost is not None: # check if cost is not None
all_costs_none = False
@ -150,3 +174,32 @@ class ReportManager:
if all_costs_none:
total_cost = None
return total_cost
class RegressionTestsTracker(BaseReportManager):
"""Abstracts interaction with the regression tests file"""
tests: dict[str, dict]
def add_test(self, test_name: str, test_details: dict) -> None:
if test_name.startswith("Test"):
test_name = test_name[4:]
self.tests[test_name] = test_details
self.save()
def has_regression_test(self, test_name: str) -> bool:
return self.tests.get(test_name) is not None
class SuccessRatesTracker(BaseReportManager):
"""Abstracts interaction with the regression tests file"""
tests: dict[str, list[bool]]
def update(self, test_name: str, success_history: list[bool]) -> None:
if test_name.startswith("Test"):
test_name = test_name[4:]
self.tests[test_name] = success_history
self.save()
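A short usage sketch of the split-out tracker classes; the file paths below are placeholders, not the locations that AgentBenchmarkConfig actually resolves:

from pathlib import Path

from agbenchmark.reports.ReportManager import RegressionTestsTracker, SuccessRatesTracker

# Placeholder paths; normally these come from the AgentBenchmarkConfig file settings.
regression = RegressionTestsTracker(Path("reports/regression_tests.json"))
regression.add_test("TestWriteFile", {"difficulty": "basic", "data_path": "..."})
# add_test() strips the "Test" prefix, so the entry is keyed without it:
assert regression.has_regression_test("WriteFile")

success_rates = SuccessRatesTracker(Path("reports/success_rates.json"))
success_rates.update("WriteFile", [True, False, True])  # written to disk immediately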

File: agbenchmark/reports/reports.py

@ -3,11 +3,11 @@ import logging
import os
import sys
from pathlib import Path
from typing import Any, Dict
import pytest
from agbenchmark.config import AgentBenchmarkConfig
from agbenchmark.reports.processing.report_types import Metrics, Test
from agbenchmark.reports.ReportManager import SingletonReportManager
from agbenchmark.utils.data_types import ChallengeData, DifficultyLevel
from agbenchmark.utils.get_data_from_helicone import get_data_from_helicone
@ -16,24 +16,22 @@ from agbenchmark.utils.utils import calculate_success_percentage
logger = logging.getLogger(__name__)
def get_previous_test_results(
test_name: str, info_details: dict[str, Any]
) -> list[bool]:
def get_and_update_success_history(test_name: str, info_details: Test) -> list[bool]:
mock = os.getenv("IS_MOCK") # Check if --mock is in sys.argv
prev_test_results = SingletonReportManager().INTERNAL_INFO_MANAGER.tests.get(
prev_test_results = SingletonReportManager().SUCCESS_RATE_TRACKER.tests.get(
test_name, []
)
if not mock:
if not mock and info_details.metrics.success is not None:
# only add if it's an actual test
prev_test_results.append(info_details["metrics"]["success"])
SingletonReportManager().INTERNAL_INFO_MANAGER.add_test(
prev_test_results.append(info_details.metrics.success)
SingletonReportManager().SUCCESS_RATE_TRACKER.update(
test_name, prev_test_results
)
# can calculate success rate regardless of mock
info_details["metrics"]["success_%"] = calculate_success_percentage(
info_details.metrics.success_percentage = calculate_success_percentage(
prev_test_results
)
@ -42,25 +40,22 @@ def get_previous_test_results(
def update_regression_tests(
prev_test_results: list[bool],
info_details: dict,
info_details: Test,
test_name: str,
test_details: dict,
) -> None:
if len(prev_test_results) >= 3 and prev_test_results[-3:] == [True, True, True]:
# if the last 3 tests were successful, add to the regression tests
info_details["is_regression"] = True
SingletonReportManager().REGRESSION_MANAGER.add_test(test_name, test_details)
info_details.is_regression = True
SingletonReportManager().REGRESSION_MANAGER.add_test(
test_name, info_details.dict(include={"difficulty", "data_path"})
)
def generate_single_call_report(
def initialize_test_report(
item: pytest.Item,
call: pytest.CallInfo,
challenge_data: ChallengeData,
answers: dict[str, Any],
challenge_location: str,
test_name: str,
) -> None:
difficulty = challenge_data.info.difficulty
challenge_info: ChallengeData,
):
difficulty = challenge_info.info.difficulty
if isinstance(difficulty, DifficultyLevel):
difficulty = difficulty.value
@ -70,60 +65,55 @@ def generate_single_call_report(
# test_name = item.nodeid.split("::")[1]
# item.test_name = test_name
test_details = {
"difficulty": difficulty,
"data_path": challenge_location,
}
info_details: Any = {
"data_path": challenge_location,
"is_regression": False,
"category": challenge_data.category,
"task": challenge_data.task,
"answer": challenge_data.ground.answer,
"description": challenge_data.info.description,
"metrics": {
"difficulty": difficulty,
"success": False,
"attempted": True,
},
# "answers": answers,
}
if answers:
info_details["answers"] = answers
if challenge_data.metadata:
info_details["metadata"] = challenge_data.metadata
mock = os.getenv("IS_MOCK") # Check if --mock is in sys.argv
if call:
if call.excinfo is None:
info_details["metrics"]["success"] = True
else:
if not mock: # don't remove if it's a mock test
SingletonReportManager().REGRESSION_MANAGER.remove_test(test_name)
info_details["metrics"]["fail_reason"] = str(call.excinfo.value)
if call.excinfo.typename == "Skipped":
info_details["metrics"]["attempted"] = False
prev_test_results: list[bool] = get_previous_test_results(test_name, info_details)
update_regression_tests(prev_test_results, info_details, test_name, test_details)
test_info = dict(item.user_properties).get("info_details") or Test(
data_path=str(challenge_info.spec_file),
is_regression=False,
category=[c.value for c in challenge_info.category],
task=challenge_info.task,
answer=challenge_info.ground.answer,
description=challenge_info.info.description,
metrics=Metrics(
difficulty=challenge_info.info.difficulty.value,
attempted=False,
),
)
# user facing reporting
if item:
item.info_details = info_details
item.user_properties.append(("info_details", test_info))
return info_details
return test_info
def finalize_reports(
config: AgentBenchmarkConfig, item: pytest.Item, challenge_data: ChallengeData
def finalize_test_report(
item: pytest.Item, call: pytest.CallInfo, config: AgentBenchmarkConfig
) -> None:
run_time = dict(item.user_properties).get("run_time")
user_properties: dict = dict(item.user_properties)
run_time = user_properties.get("run_time")
info_details = getattr(item, "info_details", {})
test_name = getattr(item, "test_name", "")
info_details: Test = user_properties.get("info_details", {})
test_name: str = user_properties.get("test_name", "")
mock = os.getenv("IS_MOCK") # Check if --mock is in sys.argv
if call:
logger.debug(f"Finalizing report with CallInfo: {vars(call)}")
if call.excinfo is None:
info_details.metrics.success = True
else:
if not mock: # don't remove if it's a mock test
SingletonReportManager().REGRESSION_MANAGER.remove_test(test_name)
info_details.metrics.fail_reason = str(call.excinfo.value)
if call.excinfo.typename == "Skipped":
info_details.metrics.attempted = False
info_details.metrics.attempted = True
info_details.metrics.run_time = f"{str(round(call.duration, 3))} seconds"
info_details.reached_cutoff = user_properties.get("timed_out", False)
prev_test_results: list[bool] = get_and_update_success_history(
test_name, info_details
)
update_regression_tests(prev_test_results, info_details, test_name)
if info_details and test_name:
if run_time is not None:
@ -133,42 +123,20 @@ def finalize_reports(
cost = get_data_from_helicone(test_name)
logger.debug(f"Cost: {cost}")
info_details["metrics"]["cost"] = cost
if info_details["metrics"].get("success", None) is None:
info_details["metrics"]["attempted"] = False
info_details["metrics"]["success"] = False
elif (
info_details["metrics"].get("success") is False
and "attempted" not in info_details["metrics"]
):
info_details["metrics"]["attempted"] = False
info_details["metrics"]["run_time"] = f"{str(round(run_time, 3))} seconds"
info_details["reached_cutoff"] = float(run_time) > challenge_data.cutoff
info_details.metrics.cost = cost
if "--mock" not in sys.argv:
update_challenges_already_beaten(
config.challenges_already_beaten_file, info_details, test_name
)
if info_details.get("tests") is not None:
for nested_test_name, nested_test_info in info_details[
"tests"
].items():
update_challenges_already_beaten(
config.challenges_already_beaten_file,
nested_test_info,
nested_test_name,
)
SingletonReportManager().INFO_MANAGER.add_test(test_name, info_details)
SingletonReportManager().INFO_MANAGER.add_test_report(test_name, info_details)
def update_challenges_already_beaten(
challenges_already_beaten_file: Path, info_details: Dict[str, Any], test_name: str
challenges_already_beaten_file: Path, info_details: Test, test_name: str
) -> None:
current_run_successful = info_details["metrics"]["success"]
current_run_successful = info_details.metrics.success
try:
with open(challenges_already_beaten_file, "r") as f:
challenge_data = json.load(f)
@ -184,9 +152,7 @@ def update_challenges_already_beaten(
json.dump(challenge_data, f, indent=4)
def session_finish(
agbenchmark_config: AgentBenchmarkConfig, suite_reports: dict
) -> None:
SingletonReportManager().INTERNAL_INFO_MANAGER.save()
SingletonReportManager().INFO_MANAGER.end_info_report(agbenchmark_config)
def session_finish(agbenchmark_config: AgentBenchmarkConfig) -> None:
SingletonReportManager().INFO_MANAGER.finalize_session_report(agbenchmark_config)
SingletonReportManager().REGRESSION_MANAGER.save()
SingletonReportManager().SUCCESS_RATE_TRACKER.save()
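To make the switch from untyped dicts to the pydantic models concrete, here is a sketch of building and updating one report entry; the field values are invented, and only fields that appear in this diff are used:

from agbenchmark.reports.processing.report_types import Metrics, Test

entry = Test(
    data_path="challenges/abilities/write_file/data.json",   # invented example path
    is_regression=False,
    category=["coding"],
    task="Write 'Hello World' to a file called output.txt",
    answer="Hello World",
    description="Tests if the agent can write a file",
    metrics=Metrics(difficulty="basic", attempted=False),
)

# Typed attribute access replaces the old untyped dict lookups:
entry.metrics.success = True          # was: info_details["metrics"]["success"] = True
entry.metrics.run_time = "1.234 seconds"
print(entry.json(indent=4))           # pydantic v1-style serialization, as used above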

File: agbenchmark/utils/challenge.py

@ -61,9 +61,6 @@ class Challenge(ABC):
async def test_method(
self, config: AgentBenchmarkConfig, request: pytest.FixtureRequest
) -> None:
# skip optional categories
self.skip_optional_categories(config)
if os.environ.get("HELICONE_API_KEY"):
from helicone.lock import HeliconeLockManager
@ -269,16 +266,3 @@ class Challenge(ABC):
return 1
return None
@classmethod
def skip_optional_categories(cls, config: AgentBenchmarkConfig) -> None:
challenge_categories = set(c.value for c in cls.data.category)
challenge_optional_categories = challenge_categories & set(OPTIONAL_CATEGORIES)
if challenge_optional_categories and not (
config.categories
and set(challenge_optional_categories).issubset(set(config.categories))
):
pytest.skip(
f"Category {', '.join(challenge_optional_categories)} is optional, "
"and not explicitly selected in the benchmark config."
)

File: agbenchmark/utils/data_types.py

@ -108,8 +108,8 @@ class ChallengeData(BaseModel):
task: str
dependencies: List[str]
cutoff: int
ground: Ground | Dict[str, Ground]
info: Info | Dict[str, Info]
ground: Ground
info: Info
metadata: Optional[Dict[str, Any]] = None
spec_file: Path | None = Field(None, exclude=True)

File: agbenchmark/utils/utils.py

@ -8,6 +8,7 @@ from typing import Any, Optional
from dotenv import load_dotenv
from agbenchmark.reports.processing.report_types import Test
from agbenchmark.utils.data_types import DIFFICULTY_MAP, DifficultyLevel
load_dotenv()
@ -63,41 +64,31 @@ def get_test_path(json_file: str | Path) -> str:
def get_highest_success_difficulty(
data: dict, just_string: Optional[bool] = None
data: dict[str, Test], just_string: Optional[bool] = None
) -> str:
highest_difficulty = None
highest_difficulty_level = 0
for test_name, test_data in data.items():
try:
if test_data.get("tests", None):
highest_difficulty_str = test_data["metrics"]["highest_difficulty"]
if test_data.metrics.success:
difficulty_str = test_data.metrics.difficulty
if not difficulty_str:
continue
try:
highest_difficulty = DifficultyLevel[highest_difficulty_str]
highest_difficulty_level = DIFFICULTY_MAP[highest_difficulty]
difficulty_enum = DifficultyLevel[difficulty_str.lower()]
difficulty_level = DIFFICULTY_MAP[difficulty_enum]
if difficulty_level > highest_difficulty_level:
highest_difficulty = difficulty_enum
highest_difficulty_level = difficulty_level
except KeyError:
logger.warning(
f"Unexpected difficulty level '{highest_difficulty_str}' "
f"Unexpected difficulty level '{difficulty_str}' "
f"in test '{test_name}'"
)
continue
else:
if test_data["metrics"]["success"]:
difficulty_str = test_data["metrics"]["difficulty"]
try:
difficulty_enum = DifficultyLevel[difficulty_str.lower()]
difficulty_level = DIFFICULTY_MAP[difficulty_enum]
if difficulty_level > highest_difficulty_level:
highest_difficulty = difficulty_enum
highest_difficulty_level = difficulty_level
except KeyError:
logger.warning(
f"Unexpected difficulty level '{difficulty_str}' "
f"in test '{test_name}'"
)
continue
except Exception as e:
logger.warning(
"An unexpected error [1] occurred while analyzing report [2]."