AutoGPT/benchmark/agbenchmark/agent_interface.py

134 lines
3.8 KiB
Python

import os
import platform
import queue
import select
import shutil
import subprocess
import sys
import time
from threading import Thread
from typing import Any, List
import psutil
from dotenv import load_dotenv
import agbenchmark.start_benchmark
load_dotenv()
helicone_graphql_logs = os.getenv("HELICONE_GRAPHQL_LOGS")
HELICONE_GRAPHQL_LOGS = (
helicone_graphql_logs.lower() == "true" if helicone_graphql_logs else False
)
def run_linux_env(process: Any, start_time: float, timeout: float) -> None:
while True:
try:
# This checks if there's data to be read from stdout without blocking.
if process.stdout and select.select([process.stdout], [], [], 0)[0]:
output = process.stdout.readline()
print(output.strip())
except Exception as e:
continue
# Check if process has ended, has no more output, or exceeded timeout
if process.poll() is not None or (time.time() - start_time > timeout):
break
if time.time() - start_time > timeout:
print("The Python function has exceeded the time limit and was terminated.")
parent = psutil.Process(process.pid)
for child in parent.children(recursive=True):
child.kill()
parent.kill()
else:
print("The Python function has finished running.")
def enqueue_output(out: Any, my_queue: Any) -> None:
for line in iter(out.readline, b""):
my_queue.put(line)
out.close()
def run_windows_env(process: Any, start_time: float, timeout: float) -> None:
my_queue: Any = queue.Queue()
thread = Thread(target=enqueue_output, args=(process.stdout, my_queue))
thread.daemon = True
thread.start()
while True:
try:
output = my_queue.get_nowait().strip()
print(output)
except queue.Empty:
pass
if process.poll() is not None or (time.time() - start_time > timeout):
break
if time.time() - start_time > timeout:
print("The Python function has exceeded the time limit and was terminated.")
process.terminate()
def run_agent(task: str, timeout: int) -> None:
"""Calling to get a response"""
entry_path = "agbenchmark.benchmarks"
print(f"Running '{entry_path}' with timeout {timeout}")
command = [sys.executable, "-m", entry_path, str(task)]
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
cwd=agbenchmark.start_benchmark.HOME_DIRECTORY,
bufsize=1,
)
start_time = time.time()
if platform.system() == "Windows":
run_windows_env(process, start_time, timeout)
else:
run_linux_env(process, start_time, timeout)
process.wait()
if process.returncode != 0:
print(f"The agent timed out")
def get_list_of_file_paths(
challenge_dir_path: str, artifact_folder_name: str
) -> List[str]:
# this file is at agbenchmark\agent_interface.py
source_dir = os.path.join(
agbenchmark.start_benchmark.CURRENT_DIRECTORY,
"..",
challenge_dir_path,
artifact_folder_name,
)
if not os.path.exists(source_dir):
return []
return [os.path.join(source_dir, file_name) for file_name in os.listdir(source_dir)]
def copy_artifacts_into_workspace(
workspace: str | dict[str, str], artifact_folder_name: str, challenge_dir_path: str
) -> None:
if isinstance(workspace, dict):
if artifact_folder_name == "artifacts_in":
workspace = workspace["input"]
else:
workspace = workspace["output"]
file_paths = get_list_of_file_paths(challenge_dir_path, artifact_folder_name)
for file_path in file_paths:
if os.path.isfile(file_path):
shutil.copy(file_path, workspace)