Fix file operations logger (#3489)

Co-authored-by: Reinier van der Leer <github@pwuts.nl>
pull/2988/head^2
Bob 2023-05-01 11:37:30 -04:00 committed by GitHub
parent 9c56b1beef
commit 94ec4a4ea5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 301 additions and 95 deletions

View File

@ -1,9 +1,10 @@
"""File operations for AutoGPT"""
from __future__ import annotations
import hashlib
import os
import os.path
from typing import Generator
from typing import Dict, Generator, Literal, Tuple
import requests
from colorama import Back, Fore
@ -17,31 +18,96 @@ from autogpt.utils import readable_file_size
CFG = Config()
Operation = Literal["write", "append", "delete"]
def check_duplicate_operation(operation: str, filename: str) -> bool:
"""Check if the operation has already been performed on the given file
Args:
operation (str): The operation to check for
filename (str): The name of the file to check for
def text_checksum(text: str) -> str:
"""Get the hex checksum for the given text."""
return hashlib.md5(text.encode("utf-8")).hexdigest()
def operations_from_log(log_path: str) -> Generator[Tuple[Operation, str, str | None]]:
"""Parse the file operations log and return a tuple containing the log entries"""
try:
log = open(log_path, "r", encoding="utf-8")
except FileNotFoundError:
return
for line in log:
line = line.replace("File Operation Logger", "").strip()
if not line:
continue
operation, tail = line.split(": ", maxsplit=1)
operation = operation.strip()
if operation in ("write", "append"):
try:
path, checksum = (x.strip() for x in tail.rsplit(" #", maxsplit=1))
except ValueError:
path, checksum = tail.strip(), None
yield (operation, path, checksum)
elif operation == "delete":
yield (operation, tail.strip(), None)
log.close()
def file_operations_state(log_path: str) -> Dict:
"""Iterates over the operations log and returns the expected state.
Parses a log file at CFG.file_logger_path to construct a dictionary that maps
each file path written or appended to its checksum. Deleted files are removed
from the dictionary.
Returns:
bool: True if the operation has already been performed on the file
A dictionary mapping file paths to their checksums.
Raises:
FileNotFoundError: If CFG.file_logger_path is not found.
ValueError: If the log file content is not in the expected format.
"""
log_content = read_file(CFG.file_logger_path)
log_entry = f"{operation}: {filename}\n"
return log_entry in log_content
state = {}
for operation, path, checksum in operations_from_log(log_path):
if operation in ("write", "append"):
state[path] = checksum
elif operation == "delete":
del state[path]
return state
def log_operation(operation: str, filename: str) -> None:
def is_duplicate_operation(
operation: Operation, filename: str, checksum: str | None = None
) -> bool:
"""Check if the operation has already been performed
Args:
operation: The operation to check for
filename: The name of the file to check for
checksum: The checksum of the contents to be written
Returns:
True if the operation has already been performed on the file
"""
state = file_operations_state(CFG.file_logger_path)
if operation == "delete" and filename not in state:
return True
if operation == "write" and state.get(filename) == checksum:
return True
return False
def log_operation(operation: str, filename: str, checksum: str | None = None) -> None:
"""Log the file operation to the file_logger.txt
Args:
operation (str): The operation to log
filename (str): The name of the file the operation was performed on
operation: The operation to log
filename: The name of the file the operation was performed on
checksum: The checksum of the contents to be written
"""
log_entry = f"{operation}: {filename}\n"
append_to_file(CFG.file_logger_path, log_entry, should_log=False)
log_entry = f"{operation}: {filename}"
if checksum is not None:
log_entry += f" #{checksum}"
logger.debug(f"Logging file operation: {log_entry}")
append_to_file(CFG.file_logger_path, f"{log_entry}\n", should_log=False)
def split_file(
@ -90,8 +156,8 @@ def read_file(filename: str) -> str:
with open(filename, "r", encoding="utf-8") as f:
content = f.read()
return content
except Exception as e:
return f"Error: {str(e)}"
except Exception as err:
return f"Error: {err}"
def ingest_file(
@ -124,8 +190,8 @@ def ingest_file(
memory.add(memory_to_add)
logger.info(f"Done ingesting {num_chunks} chunks from {filename}.")
except Exception as e:
logger.info(f"Error while ingesting file '{filename}': {str(e)}")
except Exception as err:
logger.info(f"Error while ingesting file '{filename}': {err}")
@command("write_to_file", "Write to file", '"filename": "<filename>", "text": "<text>"')
@ -139,17 +205,18 @@ def write_to_file(filename: str, text: str) -> str:
Returns:
str: A message indicating success or failure
"""
if check_duplicate_operation("write", filename):
checksum = text_checksum(text)
if is_duplicate_operation("write", filename, checksum):
return "Error: File has already been updated."
try:
directory = os.path.dirname(filename)
os.makedirs(directory, exist_ok=True)
with open(filename, "w", encoding="utf-8") as f:
f.write(text)
log_operation("write", filename)
log_operation("write", filename, checksum)
return "File written to successfully."
except Exception as e:
return f"Error: {str(e)}"
except Exception as err:
return f"Error: {err}"
@command(
@ -169,15 +236,17 @@ def append_to_file(filename: str, text: str, should_log: bool = True) -> str:
try:
directory = os.path.dirname(filename)
os.makedirs(directory, exist_ok=True)
with open(filename, "a") as f:
with open(filename, "a", encoding="utf-8") as f:
f.write(text)
if should_log:
log_operation("append", filename)
with open(filename, "r", encoding="utf-8") as f:
checksum = text_checksum(f.read())
log_operation("append", filename, checksum=checksum)
return "Text appended successfully."
except Exception as e:
return f"Error: {str(e)}"
except Exception as err:
return f"Error: {err}"
@command("delete_file", "Delete file", '"filename": "<filename>"')
@ -190,14 +259,14 @@ def delete_file(filename: str) -> str:
Returns:
str: A message indicating success or failure
"""
if check_duplicate_operation("delete", filename):
if is_duplicate_operation("delete", filename):
return "Error: File has already been deleted."
try:
os.remove(filename)
log_operation("delete", filename)
return "File deleted successfully."
except Exception as e:
return f"Error: {str(e)}"
except Exception as err:
return f"Error: {err}"
@command("list_files", "List Files in Directory", '"directory": "<directory>"')
@ -266,7 +335,7 @@ def download_file(url, filename):
spinner.update_message(f"{message} {progress}")
return f'Successfully downloaded and locally stored file: "{filename}"! (Size: {readable_file_size(downloaded_size)})'
except requests.HTTPError as e:
return f"Got an HTTP Error whilst trying to download file: {e}"
except Exception as e:
return "Error: " + str(e)
except requests.HTTPError as err:
return f"Got an HTTP Error whilst trying to download file: {err}"
except Exception as err:
return f"Error: {err}"

View File

@ -2,25 +2,19 @@
This set of unit tests is designed to test the file operations that autoGPT has access to.
"""
import hashlib
import os
import re
from io import TextIOWrapper
from pathlib import Path
from tempfile import gettempdir
import pytest
from pytest_mock import MockerFixture
from autogpt.commands.file_operations import (
append_to_file,
check_duplicate_operation,
delete_file,
download_file,
list_files,
log_operation,
read_file,
split_file,
write_to_file,
)
import autogpt.commands.file_operations as file_ops
from autogpt.config import Config
from autogpt.utils import readable_file_size
from autogpt.workspace import Workspace
@pytest.fixture()
@ -29,66 +23,186 @@ def file_content():
@pytest.fixture()
def test_file(workspace, file_content):
test_file = str(workspace.get_path("test_file.txt"))
with open(test_file, "w") as f:
f.write(file_content)
return test_file
def test_file_path(config, workspace: Workspace):
return workspace.get_path("test_file.txt")
@pytest.fixture()
def test_directory(workspace):
return str(workspace.get_path("test_directory"))
def test_file(test_file_path: Path):
file = open(test_file_path, "w")
yield file
if not file.closed:
file.close()
@pytest.fixture()
def test_nested_file(workspace):
return str(workspace.get_path("nested/test_file.txt"))
def test_file_with_content_path(test_file: TextIOWrapper, file_content):
test_file.write(file_content)
test_file.close()
file_ops.log_operation(
"write", test_file.name, file_ops.text_checksum(file_content)
)
return Path(test_file.name)
def test_check_duplicate_operation(config, test_file):
log_operation("write", test_file)
assert check_duplicate_operation("write", test_file) is True
@pytest.fixture()
def test_directory(config, workspace: Workspace):
return workspace.get_path("test_directory")
@pytest.fixture()
def test_nested_file(config, workspace: Workspace):
return workspace.get_path("nested/test_file.txt")
def test_file_operations_log(test_file: TextIOWrapper):
log_file_content = (
"File Operation Logger\n"
"write: path/to/file1.txt #checksum1\n"
"write: path/to/file2.txt #checksum2\n"
"write: path/to/file3.txt #checksum3\n"
"append: path/to/file2.txt #checksum4\n"
"delete: path/to/file3.txt\n"
)
test_file.write(log_file_content)
test_file.close()
expected = [
("write", "path/to/file1.txt", "checksum1"),
("write", "path/to/file2.txt", "checksum2"),
("write", "path/to/file3.txt", "checksum3"),
("append", "path/to/file2.txt", "checksum4"),
("delete", "path/to/file3.txt", None),
]
assert list(file_ops.operations_from_log(test_file.name)) == expected
def test_file_operations_state(test_file: TextIOWrapper):
# Prepare a fake log file
log_file_content = (
"File Operation Logger\n"
"write: path/to/file1.txt #checksum1\n"
"write: path/to/file2.txt #checksum2\n"
"write: path/to/file3.txt #checksum3\n"
"append: path/to/file2.txt #checksum4\n"
"delete: path/to/file3.txt\n"
)
test_file.write(log_file_content)
test_file.close()
# Call the function and check the returned dictionary
expected_state = {
"path/to/file1.txt": "checksum1",
"path/to/file2.txt": "checksum4",
}
assert file_ops.file_operations_state(test_file.name) == expected_state
def test_is_duplicate_operation(config, mocker: MockerFixture):
# Prepare a fake state dictionary for the function to use
state = {
"path/to/file1.txt": "checksum1",
"path/to/file2.txt": "checksum2",
}
mocker.patch.object(file_ops, "file_operations_state", lambda _: state)
# Test cases with write operations
assert (
file_ops.is_duplicate_operation("write", "path/to/file1.txt", "checksum1")
is True
)
assert (
file_ops.is_duplicate_operation("write", "path/to/file1.txt", "checksum2")
is False
)
assert (
file_ops.is_duplicate_operation("write", "path/to/file3.txt", "checksum3")
is False
)
# Test cases with append operations
assert (
file_ops.is_duplicate_operation("append", "path/to/file1.txt", "checksum1")
is False
)
# Test cases with delete operations
assert file_ops.is_duplicate_operation("delete", "path/to/file1.txt") is False
assert file_ops.is_duplicate_operation("delete", "path/to/file3.txt") is True
# Test logging a file operation
def test_log_operation(test_file, config):
file_logger_name = config.file_logger_path
if os.path.exists(file_logger_name):
os.remove(file_logger_name)
log_operation("log_test", test_file)
with open(config.file_logger_path, "r") as f:
def test_log_operation(config: Config):
file_ops.log_operation("log_test", "path/to/test")
with open(config.file_logger_path, "r", encoding="utf-8") as f:
content = f.read()
assert f"log_test: {test_file}" in content
assert f"log_test: path/to/test\n" in content
def test_text_checksum(file_content: str):
checksum = file_ops.text_checksum(file_content)
different_checksum = file_ops.text_checksum("other content")
assert re.match(r"^[a-fA-F0-9]+$", checksum) is not None
assert checksum != different_checksum
def test_log_operation_with_checksum(config: Config):
file_ops.log_operation("log_test", "path/to/test", checksum="ABCDEF")
with open(config.file_logger_path, "r", encoding="utf-8") as f:
content = f.read()
assert f"log_test: path/to/test #ABCDEF\n" in content
# Test splitting a file into chunks
def test_split_file():
content = "abcdefghij"
chunks = list(split_file(content, max_length=4, overlap=1))
chunks = list(file_ops.split_file(content, max_length=4, overlap=1))
expected = ["abcd", "defg", "ghij"]
assert chunks == expected
def test_read_file(test_file, file_content):
content = read_file(test_file)
def test_read_file(test_file_with_content_path: Path, file_content):
content = file_ops.read_file(test_file_with_content_path)
assert content == file_content
def test_write_to_file(config, test_nested_file):
def test_write_to_file(test_file_path: Path):
new_content = "This is new content.\n"
write_to_file(test_nested_file, new_content)
with open(test_nested_file, "r") as f:
file_ops.write_to_file(str(test_file_path), new_content)
with open(test_file_path, "r", encoding="utf-8") as f:
content = f.read()
assert content == new_content
def test_append_to_file(test_nested_file):
append_text = "This is appended text.\n"
write_to_file(test_nested_file, append_text)
def test_write_file_logs_checksum(config: Config, test_file_path: Path):
new_content = "This is new content.\n"
new_checksum = file_ops.text_checksum(new_content)
file_ops.write_to_file(str(test_file_path), new_content)
with open(config.file_logger_path, "r", encoding="utf-8") as f:
log_entry = f.read()
assert log_entry == f"write: {test_file_path} #{new_checksum}\n"
append_to_file(test_nested_file, append_text)
def test_write_file_fails_if_content_exists(test_file_path: Path):
new_content = "This is new content.\n"
file_ops.log_operation(
"write",
str(test_file_path),
checksum=file_ops.text_checksum(new_content),
)
result = file_ops.write_to_file(str(test_file_path), new_content)
assert result == "Error: File has already been updated."
def test_write_file_succeeds_if_content_different(test_file_with_content_path: Path):
new_content = "This is different content.\n"
result = file_ops.write_to_file(str(test_file_with_content_path), new_content)
assert result == "File written to successfully."
def test_append_to_file(test_nested_file: Path):
append_text = "This is appended text.\n"
file_ops.write_to_file(test_nested_file, append_text)
file_ops.append_to_file(test_nested_file, append_text)
with open(test_nested_file, "r") as f:
content_after = f.read()
@ -96,24 +210,45 @@ def test_append_to_file(test_nested_file):
assert content_after == append_text + append_text
def test_delete_file(config, test_file):
delete_file(test_file)
assert os.path.exists(test_file) is False
assert delete_file(test_file) == "Error: File has already been deleted."
def test_append_to_file_uses_checksum_from_appended_file(
config: Config, test_file_path: Path
):
append_text = "This is appended text.\n"
file_ops.append_to_file(test_file_path, append_text)
file_ops.append_to_file(test_file_path, append_text)
with open(config.file_logger_path, "r", encoding="utf-8") as f:
log_contents = f.read()
digest = hashlib.md5()
digest.update(append_text.encode("utf-8"))
checksum1 = digest.hexdigest()
digest.update(append_text.encode("utf-8"))
checksum2 = digest.hexdigest()
assert log_contents == (
f"append: {test_file_path} #{checksum1}\n"
f"append: {test_file_path} #{checksum2}\n"
)
def test_delete_missing_file(test_file):
os.remove(test_file)
def test_delete_file(test_file_with_content_path: Path):
result = file_ops.delete_file(str(test_file_with_content_path))
assert result == "File deleted successfully."
assert os.path.exists(test_file_with_content_path) is False
def test_delete_missing_file(config):
filename = "path/to/file/which/does/not/exist"
# confuse the log
file_ops.log_operation("write", filename, checksum="fake")
try:
os.remove(test_file)
except FileNotFoundError as e:
error_string = str(e)
assert error_string in delete_file(test_file)
os.remove(filename)
except FileNotFoundError as err:
assert str(err) in file_ops.delete_file(filename)
return
assert True, "Failed to test delete_file"
assert False, f"Failed to test delete_file; {filename} not expected to exist"
def test_list_files(config, workspace, test_directory):
def test_list_files(workspace: Workspace, test_directory: Path):
# Case 1: Create files A and B, search for A, and ensure we don't return A and B
file_a = workspace.get_path("file_a.txt")
file_b = workspace.get_path("file_b.txt")
@ -131,7 +266,7 @@ def test_list_files(config, workspace, test_directory):
with open(os.path.join(test_directory, file_a.name), "w") as f:
f.write("This is file A in the subdirectory.")
files = list_files(str(workspace.root))
files = file_ops.list_files(str(workspace.root))
assert file_a.name in files
assert file_b.name in files
assert os.path.join(Path(test_directory).name, file_a.name) in files
@ -144,26 +279,28 @@ def test_list_files(config, workspace, test_directory):
# Case 2: Search for a file that does not exist and make sure we don't throw
non_existent_file = "non_existent_file.txt"
files = list_files("")
files = file_ops.list_files("")
assert non_existent_file not in files
def test_download_file():
def test_download_file(config, workspace: Workspace):
url = "https://github.com/Significant-Gravitas/Auto-GPT/archive/refs/tags/v0.2.2.tar.gz"
local_name = os.path.join(gettempdir(), "auto-gpt.tar.gz")
local_name = workspace.get_path("auto-gpt.tar.gz")
size = 365023
readable_size = readable_file_size(size)
assert (
download_file(url, local_name)
file_ops.download_file(url, local_name)
== f'Successfully downloaded and locally stored file: "{local_name}"! (Size: {readable_size})'
)
assert os.path.isfile(local_name) is True
assert os.path.getsize(local_name) == size
url = "https://github.com/Significant-Gravitas/Auto-GPT/archive/refs/tags/v0.0.0.tar.gz"
assert "Got an HTTP Error whilst trying to download file" in download_file(
assert "Got an HTTP Error whilst trying to download file" in file_ops.download_file(
url, local_name
)
url = "https://thiswebsiteiswrong.hmm/v0.0.0.tar.gz"
assert "Failed to establish a new connection:" in download_file(url, local_name)
assert "Failed to establish a new connection:" in file_ops.download_file(
url, local_name
)