Revert "Revert "Merge branch 'master' into stable""

This reverts commit 999990b614.
pull/3683/head
Reinier van der Leer 2023-05-02 13:26:30 +02:00
parent 91537b0496
commit 3a80e2f399
No known key found for this signature in database
GPG Key ID: 64035FE419545762
45 changed files with 1601 additions and 346 deletions

View File

@ -188,3 +188,10 @@ OPENAI_API_KEY=your-openai-api-key
# TW_CONSUMER_SECRET=
# TW_ACCESS_TOKEN=
# TW_ACCESS_TOKEN_SECRET=
################################################################################
### ALLOWLISTED PLUGINS
################################################################################
#ALLOWLISTED_PLUGINS - Sets the listed plugins that are allowed (Example: plugin1,plugin2,plugin3)
ALLOWLISTED_PLUGINS=

View File

@ -26,3 +26,23 @@ jobs:
repoToken: "${{ secrets.GITHUB_TOKEN }}"
commentOnDirty: "This pull request has conflicts with the base branch, please resolve those so we can evaluate the pull request."
commentOnClean: "Conflicts have been resolved! 🎉 A maintainer will review the pull request shortly."
size:
if: ${{ github.event_name == 'pull_request_target' }}
permissions:
issues: write
pull-requests: write
runs-on: ubuntu-latest
steps:
- uses: codelytv/pr-size-labeler@v1.7.0
with:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
s_label: "size/s"
s_max_size: "10"
m_label: "size/m"
m_max_size: "50"
l_label: "size/l"
l_max_size: "200"
xl_label: "size/xl"
fail_if_xl: "false"
github_api_url: "api.github.com"

2
.gitignore vendored
View File

@ -157,5 +157,7 @@ vicuna-*
# mac
.DS_Store
openai/
# news
CURRENT_BULLETIN.md

10
.isort.cfg Normal file
View File

@ -0,0 +1,10 @@
[settings]
profile = black
multi_line_output = 3
include_trailing_comma = true
force_grid_wrap = 0
use_parentheses = true
ensure_newline_before_comments = true
line_length = 88
sections = FUTURE,STDLIB,THIRDPARTY,FIRSTPARTY,LOCALFOLDER
skip = .tox,__pycache__,*.pyc,venv*/*,reports,venv,env,node_modules,.env,.venv,dist

File diff suppressed because one or more lines are too long

View File

@ -19,18 +19,25 @@ class Agent:
memory: The memory object to use.
full_message_history: The full message history.
next_action_count: The number of actions to execute.
system_prompt: The system prompt is the initial prompt that defines everything the AI needs to know to achieve its task successfully.
Currently, the dynamic and customizable information in the system prompt are ai_name, description and goals.
system_prompt: The system prompt is the initial prompt that defines everything
the AI needs to know to achieve its task successfully.
Currently, the dynamic and customizable information in the system prompt are
ai_name, description and goals.
triggering_prompt: The last sentence the AI will see before answering. For Auto-GPT, this prompt is:
Determine which next command to use, and respond using the format specified above:
The triggering prompt is not part of the system prompt because between the system prompt and the triggering
prompt we have contextual information that can distract the AI and make it forget that its goal is to find the next task to achieve.
triggering_prompt: The last sentence the AI will see before answering.
For Auto-GPT, this prompt is:
Determine which next command to use, and respond using the format specified
above:
The triggering prompt is not part of the system prompt because between the
system prompt and the triggering
prompt we have contextual information that can distract the AI and make it
forget that its goal is to find the next task to achieve.
SYSTEM PROMPT
CONTEXTUAL INFORMATION (memory, previous conversations, anything relevant)
TRIGGERING PROMPT
The triggering prompt reminds the AI about its short term meta task (defining the next task)
The triggering prompt reminds the AI about its short term meta task
(defining the next task)
"""
def __init__(
@ -39,6 +46,8 @@ class Agent:
memory,
full_message_history,
next_action_count,
command_registry,
config,
system_prompt,
triggering_prompt,
):
@ -46,6 +55,8 @@ class Agent:
self.memory = memory
self.full_message_history = full_message_history
self.next_action_count = next_action_count
self.command_registry = command_registry
self.config = config
self.system_prompt = system_prompt
self.triggering_prompt = triggering_prompt
@ -73,6 +84,7 @@ class Agent:
# Send message to AI, get response
with Spinner("Thinking... "):
assistant_reply = chat_with_ai(
self,
self.system_prompt,
self.triggering_prompt,
self.full_message_history,
@ -81,6 +93,10 @@ class Agent:
) # TODO: This hardcodes the model to use GPT3.5. Make this an argument
assistant_reply_json = fix_json_using_multiple_techniques(assistant_reply)
for plugin in cfg.plugins:
if not plugin.can_handle_post_planning():
continue
assistant_reply_json = plugin.post_planning(self, assistant_reply_json)
# Print Assistant thoughts
if assistant_reply_json != {}:
@ -89,14 +105,13 @@ class Agent:
try:
print_assistant_thoughts(self.ai_name, assistant_reply_json)
command_name, arguments = get_command(assistant_reply_json)
# command_name, arguments = assistant_reply_json_valid["command"]["name"], assistant_reply_json_valid["command"]["args"]
if cfg.speak_mode:
say_text(f"I want to execute {command_name}")
except Exception as e:
logger.error("Error: \n", str(e))
if not cfg.continuous_mode and self.next_action_count == 0:
### GET USER AUTHORIZATION TO EXECUTE COMMAND ###
# ### GET USER AUTHORIZATION TO EXECUTE COMMAND ###
# Get key press: Prompt the user to press enter to continue or escape
# to exit
logger.typewriter_log(
@ -168,30 +183,46 @@ class Agent:
elif command_name == "human_feedback":
result = f"Human feedback: {user_input}"
else:
result = (
f"Command {command_name} returned: "
f"{execute_command(command_name, arguments)}"
for plugin in cfg.plugins:
if not plugin.can_handle_pre_command():
continue
command_name, arguments = plugin.pre_command(
command_name, arguments
)
command_result = execute_command(
self.command_registry,
command_name,
arguments,
self.config.prompt_generator,
)
result = f"Command {command_name} returned: " f"{command_result}"
for plugin in cfg.plugins:
if not plugin.can_handle_post_command():
continue
result = plugin.post_command(command_name, result)
if self.next_action_count > 0:
self.next_action_count -= 1
memory_to_add = (
f"Assistant Reply: {assistant_reply} "
f"\nResult: {result} "
f"\nHuman Feedback: {user_input} "
)
self.memory.add(memory_to_add)
# Check if there's a result from the command append it to the message
# history
if result is not None:
self.full_message_history.append(create_chat_message("system", result))
logger.typewriter_log("SYSTEM: ", Fore.YELLOW, result)
else:
self.full_message_history.append(
create_chat_message("system", "Unable to execute command")
)
logger.typewriter_log(
"SYSTEM: ", Fore.YELLOW, "Unable to execute command"
if command_name != "do_nothing":
memory_to_add = (
f"Assistant Reply: {assistant_reply} "
f"\nResult: {result} "
f"\nHuman Feedback: {user_input} "
)
self.memory.add(memory_to_add)
# Check if there's a result from the command append it to the message
# history
if result is not None:
self.full_message_history.append(
create_chat_message("system", result)
)
logger.typewriter_log("SYSTEM: ", Fore.YELLOW, result)
else:
self.full_message_history.append(
create_chat_message("system", "Unable to execute command")
)
logger.typewriter_log(
"SYSTEM: ", Fore.YELLOW, "Unable to execute command"
)

View File

@ -1,10 +1,11 @@
"""Agent manager for managing GPT agents"""
from __future__ import annotations
from typing import Union
from typing import List, Union
from autogpt.config.config import Singleton
from autogpt.config.config import Config, Singleton
from autogpt.llm_utils import create_chat_completion
from autogpt.types.openai import Message
class AgentManager(metaclass=Singleton):
@ -13,6 +14,7 @@ class AgentManager(metaclass=Singleton):
def __init__(self):
self.next_key = 0
self.agents = {} # key, (task, full_message_history, model)
self.cfg = Config()
# Create new GPT agent
# TODO: Centralise use of create_chat_completion() to globally enforce token limit
@ -28,19 +30,32 @@ class AgentManager(metaclass=Singleton):
Returns:
The key of the new agent
"""
messages = [
messages: List[Message] = [
{"role": "user", "content": prompt},
]
for plugin in self.cfg.plugins:
if not plugin.can_handle_pre_instruction():
continue
if plugin_messages := plugin.pre_instruction(messages):
messages.extend(iter(plugin_messages))
# Start GPT instance
agent_reply = create_chat_completion(
model=model,
messages=messages,
)
# Update full message history
messages.append({"role": "assistant", "content": agent_reply})
plugins_reply = ""
for i, plugin in enumerate(self.cfg.plugins):
if not plugin.can_handle_on_instruction():
continue
if plugin_result := plugin.on_instruction(messages):
sep = "\n" if i else ""
plugins_reply = f"{plugins_reply}{sep}{plugin_result}"
if plugins_reply and plugins_reply != "":
messages.append({"role": "assistant", "content": plugins_reply})
key = self.next_key
# This is done instead of len(agents) to make keys unique even if agents
# are deleted
@ -48,6 +63,11 @@ class AgentManager(metaclass=Singleton):
self.agents[key] = (task, messages, model)
for plugin in self.cfg.plugins:
if not plugin.can_handle_post_instruction():
continue
agent_reply = plugin.post_instruction(agent_reply)
return key, agent_reply
def message_agent(self, key: str | int, message: str) -> str:
@ -65,15 +85,37 @@ class AgentManager(metaclass=Singleton):
# Add user message to message history before sending to agent
messages.append({"role": "user", "content": message})
for plugin in self.cfg.plugins:
if not plugin.can_handle_pre_instruction():
continue
if plugin_messages := plugin.pre_instruction(messages):
for plugin_message in plugin_messages:
messages.append(plugin_message)
# Start GPT instance
agent_reply = create_chat_completion(
model=model,
messages=messages,
)
# Update full message history
messages.append({"role": "assistant", "content": agent_reply})
plugins_reply = agent_reply
for i, plugin in enumerate(self.cfg.plugins):
if not plugin.can_handle_on_instruction():
continue
if plugin_result := plugin.on_instruction(messages):
sep = "\n" if i else ""
plugins_reply = f"{plugins_reply}{sep}{plugin_result}"
# Update full message history
if plugins_reply and plugins_reply != "":
messages.append({"role": "assistant", "content": plugins_reply})
for plugin in self.cfg.plugins:
if not plugin.can_handle_post_instruction():
continue
agent_reply = plugin.post_instruction(agent_reply)
return agent_reply
def list_agents(self) -> list[tuple[str | int, str]]:
@ -86,7 +128,7 @@ class AgentManager(metaclass=Singleton):
# Return a list of agent keys and their tasks
return [(key, task) for key, (task, _, _) in self.agents.items()]
def delete_agent(self, key: Union[str, int]) -> bool:
def delete_agent(self, key: str | int) -> bool:
"""Delete an agent from the agent manager
Args:

View File

@ -3,33 +3,12 @@ import json
from typing import Dict, List, NoReturn, Union
from autogpt.agent.agent_manager import AgentManager
from autogpt.commands.analyze_code import analyze_code
from autogpt.commands.audio_text import read_audio_from_file
from autogpt.commands.execute_code import (
execute_python_file,
execute_shell,
execute_shell_popen,
)
from autogpt.commands.file_operations import (
append_to_file,
delete_file,
download_file,
read_file,
search_files,
write_to_file,
)
from autogpt.commands.git_operations import clone_repository
from autogpt.commands.google_search import google_official_search, google_search
from autogpt.commands.image_gen import generate_image
from autogpt.commands.improve_code import improve_code
from autogpt.commands.twitter import send_tweet
from autogpt.commands.command import CommandRegistry, command
from autogpt.commands.web_requests import scrape_links, scrape_text
from autogpt.commands.web_selenium import browse_website
from autogpt.commands.write_tests import write_tests
from autogpt.config import Config
from autogpt.json_utils.json_fix_llm import fix_and_parse_json
from autogpt.memory import get_memory
from autogpt.processing.text import summarize_text
from autogpt.prompts.generator import PromptGenerator
from autogpt.speech import say_text
CFG = Config()
@ -108,7 +87,12 @@ def map_command_synonyms(command_name: str):
return command_name
def execute_command(command_name: str, arguments):
def execute_command(
command_registry: CommandRegistry,
command_name: str,
arguments,
prompt: PromptGenerator,
):
"""Execute the command and return the result
Args:
@ -119,105 +103,29 @@ def execute_command(command_name: str, arguments):
str: The result of the command
"""
try:
cmd = command_registry.commands.get(command_name)
# If the command is found, call it with the provided arguments
if cmd:
return cmd(**arguments)
# TODO: Remove commands below after they are moved to the command registry.
command_name = map_command_synonyms(command_name.lower())
if command_name == "google":
# Check if the Google API key is set and use the official search method
# If the API key is not set or has only whitespaces, use the unofficial
# search method
key = CFG.google_api_key
if key and key.strip() and key != "your-google-api-key":
google_result = google_official_search(arguments["input"])
return google_result
else:
google_result = google_search(arguments["input"])
# google_result can be a list or a string depending on the search results
if isinstance(google_result, list):
safe_message = [
google_result_single.encode("utf-8", "ignore")
for google_result_single in google_result
]
else:
safe_message = google_result.encode("utf-8", "ignore")
if command_name == "memory_add":
return get_memory(CFG).add(arguments["string"])
return safe_message.decode("utf-8")
elif command_name == "memory_add":
memory = get_memory(CFG)
return memory.add(arguments["string"])
elif command_name == "start_agent":
return start_agent(
arguments["name"], arguments["task"], arguments["prompt"]
)
elif command_name == "message_agent":
return message_agent(arguments["key"], arguments["message"])
elif command_name == "list_agents":
return list_agents()
elif command_name == "delete_agent":
return delete_agent(arguments["key"])
elif command_name == "get_text_summary":
return get_text_summary(arguments["url"], arguments["question"])
elif command_name == "get_hyperlinks":
return get_hyperlinks(arguments["url"])
elif command_name == "clone_repository":
return clone_repository(
arguments["repository_url"], arguments["clone_path"]
)
elif command_name == "read_file":
return read_file(arguments["file"])
elif command_name == "write_to_file":
return write_to_file(arguments["file"], arguments["text"])
elif command_name == "append_to_file":
return append_to_file(arguments["file"], arguments["text"])
elif command_name == "delete_file":
return delete_file(arguments["file"])
elif command_name == "search_files":
return search_files(arguments["directory"])
elif command_name == "download_file":
if not CFG.allow_downloads:
return "Error: You do not have user authorization to download files locally."
return download_file(arguments["url"], arguments["file"])
elif command_name == "browse_website":
return browse_website(arguments["url"], arguments["question"])
# TODO: Change these to take in a file rather than pasted code, if
# non-file is given, return instructions "Input should be a python
# filepath, write your code to file and try again"
elif command_name == "analyze_code":
return analyze_code(arguments["code"])
elif command_name == "improve_code":
return improve_code(arguments["suggestions"], arguments["code"])
elif command_name == "write_tests":
return write_tests(arguments["code"], arguments.get("focus"))
elif command_name == "execute_python_file": # Add this command
return execute_python_file(arguments["file"])
elif command_name == "execute_shell":
if CFG.execute_local_commands:
return execute_shell(arguments["command_line"])
else:
return (
"You are not allowed to run local shell commands. To execute"
" shell commands, EXECUTE_LOCAL_COMMANDS must be set to 'True' "
"in your config. Do not attempt to bypass the restriction."
)
elif command_name == "execute_shell_popen":
if CFG.execute_local_commands:
return execute_shell_popen(arguments["command_line"])
else:
return (
"You are not allowed to run local shell commands. To execute"
" shell commands, EXECUTE_LOCAL_COMMANDS must be set to 'True' "
"in your config. Do not attempt to bypass the restriction."
)
elif command_name == "read_audio_from_file":
return read_audio_from_file(arguments["file"])
elif command_name == "generate_image":
return generate_image(arguments["prompt"])
elif command_name == "send_tweet":
return send_tweet(arguments["text"])
# filepath, write your code to file and try again
elif command_name == "do_nothing":
return "No action performed."
elif command_name == "task_complete":
shutdown()
else:
for command in prompt.commands:
if command_name == command["label"] or command_name == command["name"]:
return command["function"](*arguments.values())
return (
f"Unknown command '{command_name}'. Please refer to the 'COMMANDS'"
" list for available commands and only respond in the specified JSON"
@ -227,6 +135,9 @@ def execute_command(command_name: str, arguments):
return f"Error: {str(e)}"
@command(
"get_text_summary", "Get text summary", '"url": "<url>", "question": "<question>"'
)
def get_text_summary(url: str, question: str) -> str:
"""Return the results of a Google search
@ -242,6 +153,7 @@ def get_text_summary(url: str, question: str) -> str:
return f""" "Result" : {summary}"""
@command("get_hyperlinks", "Get text summary", '"url": "<url>"')
def get_hyperlinks(url: str) -> Union[str, List[str]]:
"""Return the results of a Google search
@ -260,6 +172,11 @@ def shutdown() -> NoReturn:
quit()
@command(
"start_agent",
"Start GPT Agent",
'"name": "<name>", "task": "<short_task_desc>", "prompt": "<prompt>"',
)
def start_agent(name: str, task: str, prompt: str, model=CFG.fast_llm_model) -> str:
"""Start an agent with a given name, task, and prompt
@ -292,6 +209,7 @@ def start_agent(name: str, task: str, prompt: str, model=CFG.fast_llm_model) ->
return f"Agent {name} created with key {key}. First response: {agent_response}"
@command("message_agent", "Message GPT Agent", '"key": "<key>", "message": "<message>"')
def message_agent(key: str, message: str) -> str:
"""Message an agent with a given key and message"""
# Check if the key is a valid integer
@ -306,7 +224,8 @@ def message_agent(key: str, message: str) -> str:
return agent_response
def list_agents():
@command("list_agents", "List GPT Agents", "")
def list_agents() -> str:
"""List all agents
Returns:
@ -317,6 +236,7 @@ def list_agents():
)
@command("delete_agent", "Delete GPT Agent", '"key": "<key>"')
def delete_agent(key: str) -> str:
"""Delete an agent with a given key

View File

@ -6,11 +6,12 @@ from autogpt import token_counter
from autogpt.config import Config
from autogpt.llm_utils import create_chat_completion
from autogpt.logs import logger
from autogpt.types.openai import Message
cfg = Config()
def create_chat_message(role, content):
def create_chat_message(role, content) -> Message:
"""
Create a chat message with the given role and content.
@ -51,7 +52,7 @@ def generate_context(prompt, relevant_memory, full_message_history, model):
# TODO: Change debug from hardcode to argument
def chat_with_ai(
prompt, user_input, full_message_history, permanent_memory, token_limit
agent, prompt, user_input, full_message_history, permanent_memory, token_limit
):
"""Interact with the OpenAI API, sending the prompt, user input, message history,
and permanent memory."""
@ -135,6 +136,25 @@ def chat_with_ai(
# Append user input, the length of this is accounted for above
current_context.extend([create_chat_message("user", user_input)])
plugin_count = len(cfg.plugins)
for i, plugin in enumerate(cfg.plugins):
if not plugin.can_handle_on_planning():
continue
plugin_response = plugin.on_planning(
agent.prompt_generator, current_context
)
if not plugin_response or plugin_response == "":
continue
tokens_to_add = token_counter.count_message_tokens(
[create_chat_message("system", plugin_response)], model
)
if current_tokens_used + tokens_to_add > send_token_limit:
if cfg.debug_mode:
print("Plugin response too long, skipping:", plugin_response)
print("Plugins remaining at stop:", plugin_count - i)
break
current_context.append(create_chat_message("system", plugin_response))
# Calculate remaining tokens
tokens_remaining = token_limit - current_tokens_used
# assert tokens_remaining >= 0, "Tokens remaining is negative.

View File

@ -75,11 +75,13 @@ def main(
from colorama import Fore
from autogpt.agent.agent import Agent
from autogpt.commands.command import CommandRegistry
from autogpt.config import Config, check_openai_api_key
from autogpt.configurator import create_config
from autogpt.logs import logger
from autogpt.memory import get_memory
from autogpt.prompt import construct_prompt
from autogpt.plugins import scan_plugins
from autogpt.prompts.prompt import construct_main_ai_config
from autogpt.utils import get_current_git_branch, get_latest_bulletin
if ctx.invoked_subcommand is None:
@ -123,7 +125,26 @@ def main(
"parts of Auto-GPT with this version. "
"Please consider upgrading to Python 3.10 or higher.",
)
system_prompt = construct_prompt()
cfg = Config()
cfg.set_plugins(scan_plugins(cfg, cfg.debug_mode))
# Create a CommandRegistry instance and scan default folder
command_registry = CommandRegistry()
command_registry.import_commands("autogpt.commands.analyze_code")
command_registry.import_commands("autogpt.commands.audio_text")
command_registry.import_commands("autogpt.commands.execute_code")
command_registry.import_commands("autogpt.commands.file_operations")
command_registry.import_commands("autogpt.commands.git_operations")
command_registry.import_commands("autogpt.commands.google_search")
command_registry.import_commands("autogpt.commands.image_gen")
command_registry.import_commands("autogpt.commands.improve_code")
command_registry.import_commands("autogpt.commands.twitter")
command_registry.import_commands("autogpt.commands.web_selenium")
command_registry.import_commands("autogpt.commands.write_tests")
command_registry.import_commands("autogpt.app")
ai_name = ""
ai_config = construct_main_ai_config()
ai_config.command_registry = command_registry
# print(prompt)
# Initialize variables
full_message_history = []
@ -140,11 +161,16 @@ def main(
"Using memory of type:", Fore.GREEN, f"{memory.__class__.__name__}"
)
logger.typewriter_log("Using Browser:", Fore.GREEN, cfg.selenium_web_browser)
system_prompt = ai_config.construct_full_prompt()
if cfg.debug_mode:
logger.typewriter_log("Prompt:", Fore.GREEN, system_prompt)
agent = Agent(
ai_name=ai_name,
memory=memory,
full_message_history=full_message_history,
next_action_count=next_action_count,
command_registry=command_registry,
config=ai_config,
system_prompt=system_prompt,
triggering_prompt=triggering_prompt,
)

View File

@ -1,9 +1,15 @@
"""Code evaluation module."""
from __future__ import annotations
from autogpt.commands.command import command
from autogpt.llm_utils import call_ai_function
@command(
"analyze_code",
"Analyze Code",
'"code": "<full_code_string>"',
)
def analyze_code(code: str) -> list[str]:
"""
A function that takes in a string and returns a response from create chat

View File

@ -1,24 +1,51 @@
"""Commands for converting audio to text."""
import json
import requests
from autogpt.commands.command import command
from autogpt.config import Config
from autogpt.workspace import path_in_workspace
cfg = Config()
CFG = Config()
def read_audio_from_file(audio_path):
audio_path = path_in_workspace(audio_path)
@command(
"read_audio_from_file",
"Convert Audio to text",
'"filename": "<filename>"',
CFG.huggingface_audio_to_text_model,
"Configure huggingface_audio_to_text_model.",
)
def read_audio_from_file(filename: str) -> str:
"""
Convert audio to text.
Args:
audio_path (str): The path to the audio file
Returns:
str: The text from the audio
"""
audio_path = path_in_workspace(filename)
with open(audio_path, "rb") as audio_file:
audio = audio_file.read()
return read_audio(audio)
def read_audio(audio):
model = cfg.huggingface_audio_to_text_model
def read_audio(audio: bytes) -> str:
"""
Convert audio to text.
Args:
audio (bytes): The audio to convert
Returns:
str: The text from the audio
"""
model = CFG.huggingface_audio_to_text_model
api_url = f"https://api-inference.huggingface.co/models/{model}"
api_token = cfg.huggingface_api_token
api_token = CFG.huggingface_api_token
headers = {"Authorization": f"Bearer {api_token}"}
if api_token is None:
@ -33,4 +60,4 @@ def read_audio(audio):
)
text = json.loads(response.content.decode("utf-8"))["text"]
return "The audio says: " + text
return f"The audio says: {text}"

153
autogpt/commands/command.py Normal file
View File

@ -0,0 +1,153 @@
import importlib
import inspect
from typing import Any, Callable, Optional
# Unique identifier for auto-gpt commands
AUTO_GPT_COMMAND_IDENTIFIER = "auto_gpt_command"
class Command:
"""A class representing a command.
Attributes:
name (str): The name of the command.
description (str): A brief description of what the command does.
signature (str): The signature of the function that the command executes. Defaults to None.
"""
def __init__(
self,
name: str,
description: str,
method: Callable[..., Any],
signature: str = "",
enabled: bool = True,
disabled_reason: Optional[str] = None,
):
self.name = name
self.description = description
self.method = method
self.signature = signature if signature else str(inspect.signature(self.method))
self.enabled = enabled
self.disabled_reason = disabled_reason
def __call__(self, *args, **kwargs) -> Any:
if not self.enabled:
return f"Command '{self.name}' is disabled: {self.disabled_reason}"
return self.method(*args, **kwargs)
def __str__(self) -> str:
return f"{self.name}: {self.description}, args: {self.signature}"
class CommandRegistry:
"""
The CommandRegistry class is a manager for a collection of Command objects.
It allows the registration, modification, and retrieval of Command objects,
as well as the scanning and loading of command plugins from a specified
directory.
"""
def __init__(self):
self.commands = {}
def _import_module(self, module_name: str) -> Any:
return importlib.import_module(module_name)
def _reload_module(self, module: Any) -> Any:
return importlib.reload(module)
def register(self, cmd: Command) -> None:
self.commands[cmd.name] = cmd
def unregister(self, command_name: str):
if command_name in self.commands:
del self.commands[command_name]
else:
raise KeyError(f"Command '{command_name}' not found in registry.")
def reload_commands(self) -> None:
"""Reloads all loaded command plugins."""
for cmd_name in self.commands:
cmd = self.commands[cmd_name]
module = self._import_module(cmd.__module__)
reloaded_module = self._reload_module(module)
if hasattr(reloaded_module, "register"):
reloaded_module.register(self)
def get_command(self, name: str) -> Callable[..., Any]:
return self.commands[name]
def call(self, command_name: str, **kwargs) -> Any:
if command_name not in self.commands:
raise KeyError(f"Command '{command_name}' not found in registry.")
command = self.commands[command_name]
return command(**kwargs)
def command_prompt(self) -> str:
"""
Returns a string representation of all registered `Command` objects for use in a prompt
"""
commands_list = [
f"{idx + 1}. {str(cmd)}" for idx, cmd in enumerate(self.commands.values())
]
return "\n".join(commands_list)
def import_commands(self, module_name: str) -> None:
"""
Imports the specified Python module containing command plugins.
This method imports the associated module and registers any functions or
classes that are decorated with the `AUTO_GPT_COMMAND_IDENTIFIER` attribute
as `Command` objects. The registered `Command` objects are then added to the
`commands` dictionary of the `CommandRegistry` object.
Args:
module_name (str): The name of the module to import for command plugins.
"""
module = importlib.import_module(module_name)
for attr_name in dir(module):
attr = getattr(module, attr_name)
# Register decorated functions
if hasattr(attr, AUTO_GPT_COMMAND_IDENTIFIER) and getattr(
attr, AUTO_GPT_COMMAND_IDENTIFIER
):
self.register(attr.command)
# Register command classes
elif (
inspect.isclass(attr) and issubclass(attr, Command) and attr != Command
):
cmd_instance = attr()
self.register(cmd_instance)
def command(
name: str,
description: str,
signature: str = "",
enabled: bool = True,
disabled_reason: Optional[str] = None,
) -> Callable[..., Any]:
"""The command decorator is used to create Command objects from ordinary functions."""
def decorator(func: Callable[..., Any]) -> Command:
cmd = Command(
name=name,
description=description,
method=func,
signature=signature,
enabled=enabled,
disabled_reason=disabled_reason,
)
def wrapper(*args, **kwargs) -> Any:
return func(*args, **kwargs)
wrapper.command = cmd
setattr(wrapper, AUTO_GPT_COMMAND_IDENTIFIER, True)
return wrapper
return decorator

View File

@ -5,19 +5,24 @@ import subprocess
import docker
from docker.errors import ImageNotFound
from autogpt.commands.command import command
from autogpt.config import Config
from autogpt.workspace import WORKSPACE_PATH, path_in_workspace
CFG = Config()
def execute_python_file(file: str) -> str:
@command("execute_python_file", "Execute Python File", '"filename": "<filename>"')
def execute_python_file(filename: str) -> str:
"""Execute a Python file in a Docker container and return the output
Args:
file (str): The name of the file to execute
filename (str): The name of the file to execute
Returns:
str: The output of the file
"""
file = filename
print(f"Executing file '{file}' in workspace '{WORKSPACE_PATH}'")
if not file.endswith(".py"):
@ -94,6 +99,15 @@ def execute_python_file(file: str) -> str:
return f"Error: {str(e)}"
@command(
"execute_shell",
"Execute Shell Command, non-interactive commands only",
'"command_line": "<command_line>"',
CFG.execute_local_commands,
"You are not allowed to run local shell commands. To execute"
" shell commands, EXECUTE_LOCAL_COMMANDS must be set to 'True' "
"in your config. Do not attempt to bypass the restriction.",
)
def execute_shell(command_line: str) -> str:
"""Execute a shell command and return the output
@ -103,6 +117,13 @@ def execute_shell(command_line: str) -> str:
Returns:
str: The output of the command
"""
if not CFG.execute_local_commands:
return (
"You are not allowed to run local shell commands. To execute"
" shell commands, EXECUTE_LOCAL_COMMANDS must be set to 'True' "
"in your config. Do not attempt to bypass the restriction."
)
current_dir = os.getcwd()
# Change dir into workspace if necessary
if str(WORKSPACE_PATH) not in current_dir:
@ -117,9 +138,16 @@ def execute_shell(command_line: str) -> str:
os.chdir(current_dir)
return output
@command(
"execute_shell_popen",
"Execute Shell Command, non-interactive commands only",
'"command_line": "<command_line>"',
CFG.execute_local_commands,
"You are not allowed to run local shell commands. To execute"
" shell commands, EXECUTE_LOCAL_COMMANDS must be set to 'True' "
"in your config. Do not attempt to bypass the restriction.",
)
def execute_shell_popen(command_line) -> str:
"""Execute a shell command with Popen and returns an english description
of the event and the process id

View File

@ -9,10 +9,13 @@ import requests
from colorama import Back, Fore
from requests.adapters import HTTPAdapter, Retry
from autogpt.commands.command import command
from autogpt.config import Config
from autogpt.spinner import Spinner
from autogpt.utils import readable_file_size
from autogpt.workspace import WORKSPACE_PATH, path_in_workspace
CFG = Config()
LOG_FILE = "file_logger.txt"
LOG_FILE_PATH = WORKSPACE_PATH / LOG_FILE
@ -81,6 +84,7 @@ def split_file(
start += max_length - overlap
@command("read_file", "Read file", '"filename": "<filename>"')
def read_file(filename: str) -> str:
"""Read a file and return the contents
@ -133,6 +137,7 @@ def ingest_file(
print(f"Error while ingesting file '{filename}': {str(e)}")
@command("write_to_file", "Write to file", '"filename": "<filename>", "text": "<text>"')
def write_to_file(filename: str, text: str) -> str:
"""Write text to a file
@ -158,6 +163,9 @@ def write_to_file(filename: str, text: str) -> str:
return f"Error: {str(e)}"
@command(
"append_to_file", "Append to file", '"filename": "<filename>", "text": "<text>"'
)
def append_to_file(filename: str, text: str, shouldLog: bool = True) -> str:
"""Append text to a file
@ -181,6 +189,7 @@ def append_to_file(filename: str, text: str, shouldLog: bool = True) -> str:
return f"Error: {str(e)}"
@command("delete_file", "Delete file", '"filename": "<filename>"')
def delete_file(filename: str) -> str:
"""Delete a file
@ -201,6 +210,7 @@ def delete_file(filename: str) -> str:
return f"Error: {str(e)}"
@command("search_files", "Search Files", '"directory": "<directory>"')
def search_files(directory: str) -> list[str]:
"""Search for files in a directory
@ -227,6 +237,13 @@ def search_files(directory: str) -> list[str]:
return found_files
@command(
"download_file",
"Search Files",
'"url": "<url>", "filename": "<filename>"',
CFG.allow_downloads,
"Error: You do not have user authorization to download files locally.",
)
def download_file(url, filename):
"""Downloads a file
Args:

View File

@ -1,26 +1,34 @@
"""Git operations for autogpt"""
import git
from git.repo import Repo
from autogpt.commands.command import command
from autogpt.config import Config
from autogpt.workspace import path_in_workspace
CFG = Config()
def clone_repository(repo_url: str, clone_path: str) -> str:
@command(
"clone_repository",
"Clone Repositoryy",
'"repository_url": "<repository_url>", "clone_path": "<clone_path>"',
CFG.github_username and CFG.github_api_key,
"Configure github_username and github_api_key.",
)
def clone_repository(repository_url: str, clone_path: str) -> str:
"""Clone a GitHub repository locally
Args:
repo_url (str): The URL of the repository to clone
repository_url (str): The URL of the repository to clone
clone_path (str): The path to clone the repository to
Returns:
str: The result of the clone operation"""
split_url = repo_url.split("//")
split_url = repository_url.split("//")
auth_repo_url = f"//{CFG.github_username}:{CFG.github_api_key}@".join(split_url)
safe_clone_path = path_in_workspace(clone_path)
try:
git.Repo.clone_from(auth_repo_url, safe_clone_path)
return f"""Cloned {repo_url} to {safe_clone_path}"""
Repo.clone_from(auth_repo_url, safe_clone_path)
return f"""Cloned {repository_url} to {safe_clone_path}"""
except Exception as e:
return f"Error: {str(e)}"

View File

@ -5,11 +5,13 @@ import json
from duckduckgo_search import ddg
from autogpt.commands.command import command
from autogpt.config import Config
CFG = Config()
@command("google", "Google Search", '"query": "<query>"', not CFG.google_api_key)
def google_search(query: str, num_results: int = 8) -> str:
"""Return the results of a Google search
@ -31,9 +33,17 @@ def google_search(query: str, num_results: int = 8) -> str:
for j in results:
search_results.append(j)
return json.dumps(search_results, ensure_ascii=False, indent=4)
results = json.dumps(search_results, ensure_ascii=False, indent=4)
return safe_google_results(results)
@command(
"google",
"Google Search",
'"query": "<query>"',
bool(CFG.google_api_key),
"Configure google_api_key.",
)
def google_official_search(query: str, num_results: int = 8) -> str | list[str]:
"""Return the results of a Google search using the official Google API
@ -82,6 +92,26 @@ def google_official_search(query: str, num_results: int = 8) -> str | list[str]:
return "Error: The provided Google API key is invalid or missing."
else:
return f"Error: {e}"
# google_result can be a list or a string depending on the search results
# Return the list of search result URLs
return search_results_links
return safe_google_results(search_results_links)
def safe_google_results(results: str | list) -> str:
"""
Return the results of a google search in a safe format.
Args:
results (str | list): The search results.
Returns:
str: The results of the search.
"""
if isinstance(results, list):
safe_message = json.dumps(
[result.enocde("utf-8", "ignore") for result in results]
)
else:
safe_message = results.encode("utf-8", "ignore").decode("utf-8")
return safe_message

View File

@ -1,6 +1,5 @@
""" Image Generation Module for AutoGPT."""
import io
import os.path
import uuid
from base64 import b64decode
@ -8,12 +7,14 @@ import openai
import requests
from PIL import Image
from autogpt.commands.command import command
from autogpt.config import Config
from autogpt.workspace import path_in_workspace
CFG = Config()
@command("generate_image", "Generate Image", '"prompt": "<prompt>"', CFG.image_provider)
def generate_image(prompt: str, size: int = 256) -> str:
"""Generate an image from a prompt.

View File

@ -2,9 +2,15 @@ from __future__ import annotations
import json
from autogpt.commands.command import command
from autogpt.llm_utils import call_ai_function
@command(
"improve_code",
"Get Improved Code",
'"suggestions": "<list_of_suggestions>", "code": "<full_code_string>"',
)
def improve_code(suggestions: list[str], code: str) -> str:
"""
A function that takes in code and suggestions and returns a response from create

View File

@ -1,12 +1,30 @@
"""A module that contains a command to send a tweet."""
import os
import tweepy
from dotenv import load_dotenv
from autogpt.commands.command import command
load_dotenv()
def send_tweet(tweet_text):
@command(
"send_tweet",
"Send Tweet",
'"tweet_text": "<tweet_text>"',
)
def send_tweet(tweet_text: str) -> str:
"""
A function that takes in a string and returns a response from create chat
completion api call.
Args:
tweet_text (str): Text to be tweeted.
Returns:
A result from sending the tweet.
"""
consumer_key = os.environ.get("TW_CONSUMER_KEY")
consumer_secret = os.environ.get("TW_CONSUMER_SECRET")
access_token = os.environ.get("TW_ACCESS_TOKEN")
@ -21,6 +39,6 @@ def send_tweet(tweet_text):
# Send tweet
try:
api.update_status(tweet_text)
print("Tweet sent successfully!")
return "Tweet sent successfully!"
except tweepy.TweepyException as e:
print("Error sending tweet: {}".format(e.reason))
return f"Error sending tweet: {e.reason}"

View File

@ -18,6 +18,7 @@ from webdriver_manager.chrome import ChromeDriverManager
from webdriver_manager.firefox import GeckoDriverManager
import autogpt.processing.text as summary
from autogpt.commands.command import command
from autogpt.config import Config
from autogpt.processing.html import extract_hyperlinks, format_hyperlinks
@ -25,6 +26,11 @@ FILE_DIR = Path(__file__).parent.parent
CFG = Config()
@command(
"browse_website",
"Browse Website",
'"url": "<url>", "question": "<what_you_want_to_find_on_website>"',
)
def browse_website(url: str, question: str) -> tuple[str, WebDriver]:
"""Browse a website and return the answer and links to the user

View File

@ -3,9 +3,15 @@ from __future__ import annotations
import json
from autogpt.commands.command import command
from autogpt.llm_utils import call_ai_function
@command(
"write_tests",
"Write Tests",
'"code": "<full_code_string>", "focus": "<list_of_focus_areas>"',
)
def write_tests(code: str, focus: list[str]) -> str:
"""
A function that takes in code and focus topics and returns a response from create

View File

@ -5,10 +5,16 @@ A module that contains the AIConfig class object that contains the configuration
from __future__ import annotations
import os
from typing import Type
from pathlib import Path
from typing import Optional, Type
import yaml
from autogpt.prompts.generator import PromptGenerator
# Soon this will go in a folder where it remembers more stuff about the run(s)
SAVE_FILE = str(Path(os.getcwd()) / "ai_settings.yaml")
class AIConfig:
"""
@ -38,9 +44,8 @@ class AIConfig:
self.ai_name = ai_name
self.ai_role = ai_role
self.ai_goals = ai_goals
# Soon this will go in a folder where it remembers more stuff about the run(s)
SAVE_FILE = os.path.join(os.path.dirname(__file__), "..", "ai_settings.yaml")
self.prompt_generator = None
self.command_registry = None
@staticmethod
def load(config_file: str = SAVE_FILE) -> "AIConfig":
@ -89,7 +94,9 @@ class AIConfig:
with open(config_file, "w", encoding="utf-8") as file:
yaml.dump(config, file, allow_unicode=True)
def construct_full_prompt(self) -> str:
def construct_full_prompt(
self, prompt_generator: Optional[PromptGenerator] = None
) -> str:
"""
Returns a prompt to the user with the class information in an organized fashion.
@ -108,14 +115,25 @@ class AIConfig:
""
)
from autogpt.prompt import get_prompt
from autogpt.config import Config
from autogpt.prompts.prompt import build_default_prompt_generator
cfg = Config()
if prompt_generator is None:
prompt_generator = build_default_prompt_generator()
prompt_generator.goals = self.ai_goals
prompt_generator.name = self.ai_name
prompt_generator.role = self.ai_role
prompt_generator.command_registry = self.command_registry
for plugin in cfg.plugins:
if not plugin.can_handle_post_prompt():
continue
prompt_generator = plugin.post_prompt(prompt_generator)
# Construct full prompt
full_prompt = (
f"You are {self.ai_name}, {self.ai_role}\n{prompt_start}\n\nGOALS:\n\n"
)
full_prompt = f"You are {prompt_generator.name}, {prompt_generator.role}\n{prompt_start}\n\nGOALS:\n\n"
for i, goal in enumerate(self.ai_goals):
full_prompt += f"{i+1}. {goal}\n"
full_prompt += f"\n\n{get_prompt()}"
self.prompt_generator = prompt_generator
full_prompt += f"\n\n{prompt_generator.generate_prompt_string()}"
return full_prompt

View File

@ -1,8 +1,10 @@
"""Configuration class to store the state of bools for different scripts access."""
import os
from typing import List
import openai
import yaml
from auto_gpt_plugin_template import AutoGPTPluginTemplate
from colorama import Fore
from dotenv import load_dotenv
@ -123,6 +125,18 @@ class Config(metaclass=Singleton):
# Initialize the OpenAI API client
openai.api_key = self.openai_api_key
self.plugins_dir = os.getenv("PLUGINS_DIR", "plugins")
self.plugins: List[AutoGPTPluginTemplate] = []
self.plugins_openai = []
plugins_allowlist = os.getenv("ALLOWLISTED_PLUGINS")
if plugins_allowlist:
plugins_allowlist = plugins_allowlist.split(",")
self.plugins_whitelist = plugins_allowlist
else:
self.plugins_whitelist = []
self.plugins_blacklist = []
def get_azure_deployment_id_for_model(self, model: str) -> str:
"""
Returns the relevant deployment id for the model specified.
@ -241,6 +255,10 @@ class Config(metaclass=Singleton):
"""Set the debug mode value."""
self.debug_mode = value
def set_plugins(self, value: list) -> None:
"""Set the plugins value."""
self.plugins = value
def check_openai_api_key() -> None:
"""Check if the OpenAI API key is set in config.py or as an environment variable."""

View File

@ -1,7 +1,7 @@
from __future__ import annotations
import time
from ast import List
from typing import List, Optional
import openai
from colorama import Fore, Style
@ -9,6 +9,7 @@ from openai.error import APIError, RateLimitError
from autogpt.config import Config
from autogpt.logs import logger
from autogpt.types.openai import Message
CFG = Config()
@ -37,8 +38,8 @@ def call_ai_function(
# For each arg, if any are None, convert to "None":
args = [str(arg) if arg is not None else "None" for arg in args]
# parse args to comma separated string
args = ", ".join(args)
messages = [
args: str = ", ".join(args)
messages: List[Message] = [
{
"role": "system",
"content": f"You are now the following python function: ```# {description}"
@ -53,15 +54,15 @@ def call_ai_function(
# Overly simple abstraction until we create something better
# simple retry mechanism when getting a rate error or a bad gateway
def create_chat_completion(
messages: list, # type: ignore
model: str | None = None,
messages: List[Message], # type: ignore
model: Optional[str] = None,
temperature: float = CFG.temperature,
max_tokens: int | None = None,
max_tokens: Optional[int] = None,
) -> str:
"""Create a chat completion using the OpenAI API
Args:
messages (list[dict[str, str]]): The messages to send to the chat completion
messages (List[Message]): The messages to send to the chat completion
model (str, optional): The model to use. Defaults to None.
temperature (float, optional): The temperature to use. Defaults to 0.9.
max_tokens (int, optional): The max tokens to use. Defaults to None.
@ -69,15 +70,28 @@ def create_chat_completion(
Returns:
str: The response from the chat completion
"""
response = None
num_retries = 10
warned_user = False
if CFG.debug_mode:
print(
Fore.GREEN
+ f"Creating chat completion with model {model}, temperature {temperature},"
f" max_tokens {max_tokens}" + Fore.RESET
f"{Fore.GREEN}Creating chat completion with model {model}, temperature {temperature}, max_tokens {max_tokens}{Fore.RESET}"
)
for plugin in CFG.plugins:
if plugin.can_handle_chat_completion(
messages=messages,
model=model,
temperature=temperature,
max_tokens=max_tokens,
):
message = plugin.handle_chat_completion(
messages=messages,
model=model,
temperature=temperature,
max_tokens=max_tokens,
)
if message is not None:
return message
response = None
for attempt in range(num_retries):
backoff = 2 ** (attempt + 2)
try:
@ -100,8 +114,7 @@ def create_chat_completion(
except RateLimitError:
if CFG.debug_mode:
print(
Fore.RED + "Error: ",
f"Reached rate limit, passing..." + Fore.RESET,
f"{Fore.RED}Error: ", f"Reached rate limit, passing...{Fore.RESET}"
)
if not warned_user:
logger.double_check(
@ -110,16 +123,14 @@ def create_chat_completion(
)
warned_user = True
except APIError as e:
if e.http_status == 502:
pass
else:
if e.http_status != 502:
raise
if attempt == num_retries - 1:
raise
if CFG.debug_mode:
print(
Fore.RED + "Error: ",
f"API Bad gateway. Waiting {backoff} seconds..." + Fore.RESET,
f"{Fore.RED}Error: ",
f"API Bad gateway. Waiting {backoff} seconds...{Fore.RESET}",
)
time.sleep(backoff)
if response is None:
@ -134,8 +145,12 @@ def create_chat_completion(
raise RuntimeError(f"Failed to get response after {num_retries} retries")
else:
quit(1)
return response.choices[0].message["content"]
resp = response.choices[0].message["content"]
for plugin in CFG.plugins:
if not plugin.can_handle_on_response():
continue
resp = plugin.on_response(resp)
return resp
def create_embedding_with_ada(text) -> list:
@ -158,15 +173,13 @@ def create_embedding_with_ada(text) -> list:
except RateLimitError:
pass
except APIError as e:
if e.http_status == 502:
pass
else:
if e.http_status != 502:
raise
if attempt == num_retries - 1:
raise
if CFG.debug_mode:
print(
Fore.RED + "Error: ",
f"API Bad gateway. Waiting {backoff} seconds..." + Fore.RESET,
f"{Fore.RED}Error: ",
f"API Bad gateway. Waiting {backoff} seconds...{Fore.RESET}",
)
time.sleep(backoff)

View File

@ -0,0 +1,199 @@
"""Handles loading of plugins."""
from typing import Any, Dict, List, Optional, Tuple, TypedDict, TypeVar
from auto_gpt_plugin_template import AutoGPTPluginTemplate
PromptGenerator = TypeVar("PromptGenerator")
class Message(TypedDict):
role: str
content: str
class BaseOpenAIPlugin(AutoGPTPluginTemplate):
"""
This is a BaseOpenAIPlugin class for generating Auto-GPT plugins.
"""
def __init__(self, manifests_specs_clients: dict):
# super().__init__()
self._name = manifests_specs_clients["manifest"]["name_for_model"]
self._version = manifests_specs_clients["manifest"]["schema_version"]
self._description = manifests_specs_clients["manifest"]["description_for_model"]
self._client = manifests_specs_clients["client"]
self._manifest = manifests_specs_clients["manifest"]
self._openapi_spec = manifests_specs_clients["openapi_spec"]
def can_handle_on_response(self) -> bool:
"""This method is called to check that the plugin can
handle the on_response method.
Returns:
bool: True if the plugin can handle the on_response method."""
return False
def on_response(self, response: str, *args, **kwargs) -> str:
"""This method is called when a response is received from the model."""
return response
def can_handle_post_prompt(self) -> bool:
"""This method is called to check that the plugin can
handle the post_prompt method.
Returns:
bool: True if the plugin can handle the post_prompt method."""
return False
def post_prompt(self, prompt: PromptGenerator) -> PromptGenerator:
"""This method is called just after the generate_prompt is called,
but actually before the prompt is generated.
Args:
prompt (PromptGenerator): The prompt generator.
Returns:
PromptGenerator: The prompt generator.
"""
return prompt
def can_handle_on_planning(self) -> bool:
"""This method is called to check that the plugin can
handle the on_planning method.
Returns:
bool: True if the plugin can handle the on_planning method."""
return False
def on_planning(
self, prompt: PromptGenerator, messages: List[Message]
) -> Optional[str]:
"""This method is called before the planning chat completion is done.
Args:
prompt (PromptGenerator): The prompt generator.
messages (List[str]): The list of messages.
"""
pass
def can_handle_post_planning(self) -> bool:
"""This method is called to check that the plugin can
handle the post_planning method.
Returns:
bool: True if the plugin can handle the post_planning method."""
return False
def post_planning(self, response: str) -> str:
"""This method is called after the planning chat completion is done.
Args:
response (str): The response.
Returns:
str: The resulting response.
"""
return response
def can_handle_pre_instruction(self) -> bool:
"""This method is called to check that the plugin can
handle the pre_instruction method.
Returns:
bool: True if the plugin can handle the pre_instruction method."""
return False
def pre_instruction(self, messages: List[Message]) -> List[Message]:
"""This method is called before the instruction chat is done.
Args:
messages (List[Message]): The list of context messages.
Returns:
List[Message]: The resulting list of messages.
"""
return messages
def can_handle_on_instruction(self) -> bool:
"""This method is called to check that the plugin can
handle the on_instruction method.
Returns:
bool: True if the plugin can handle the on_instruction method."""
return False
def on_instruction(self, messages: List[Message]) -> Optional[str]:
"""This method is called when the instruction chat is done.
Args:
messages (List[Message]): The list of context messages.
Returns:
Optional[str]: The resulting message.
"""
pass
def can_handle_post_instruction(self) -> bool:
"""This method is called to check that the plugin can
handle the post_instruction method.
Returns:
bool: True if the plugin can handle the post_instruction method."""
return False
def post_instruction(self, response: str) -> str:
"""This method is called after the instruction chat is done.
Args:
response (str): The response.
Returns:
str: The resulting response.
"""
return response
def can_handle_pre_command(self) -> bool:
"""This method is called to check that the plugin can
handle the pre_command method.
Returns:
bool: True if the plugin can handle the pre_command method."""
return False
def pre_command(
self, command_name: str, arguments: Dict[str, Any]
) -> Tuple[str, Dict[str, Any]]:
"""This method is called before the command is executed.
Args:
command_name (str): The command name.
arguments (Dict[str, Any]): The arguments.
Returns:
Tuple[str, Dict[str, Any]]: The command name and the arguments.
"""
return command_name, arguments
def can_handle_post_command(self) -> bool:
"""This method is called to check that the plugin can
handle the post_command method.
Returns:
bool: True if the plugin can handle the post_command method."""
return False
def post_command(self, command_name: str, response: str) -> str:
"""This method is called after the command is executed.
Args:
command_name (str): The command name.
response (str): The response.
Returns:
str: The resulting response.
"""
return response
def can_handle_chat_completion(
self, messages: Dict[Any, Any], model: str, temperature: float, max_tokens: int
) -> bool:
"""This method is called to check that the plugin can
handle the chat_completion method.
Args:
messages (List[Message]): The messages.
model (str): The model name.
temperature (float): The temperature.
max_tokens (int): The max tokens.
Returns:
bool: True if the plugin can handle the chat_completion method."""
return False
def handle_chat_completion(
self, messages: List[Message], model: str, temperature: float, max_tokens: int
) -> str:
"""This method is called when the chat completion is done.
Args:
messages (List[Message]): The messages.
model (str): The model name.
temperature (float): The temperature.
max_tokens (int): The max tokens.
Returns:
str: The resulting response.
"""
pass

265
autogpt/plugins.py Normal file
View File

@ -0,0 +1,265 @@
"""Handles loading of plugins."""
import importlib
import json
import os
import zipfile
from pathlib import Path
from typing import List, Optional, Tuple
from urllib.parse import urlparse
from zipimport import zipimporter
import openapi_python_client
import requests
from auto_gpt_plugin_template import AutoGPTPluginTemplate
from openapi_python_client.cli import Config as OpenAPIConfig
from autogpt.config import Config
from autogpt.models.base_open_ai_plugin import BaseOpenAIPlugin
def inspect_zip_for_module(zip_path: str, debug: bool = False) -> Optional[str]:
"""
Inspect a zipfile for a module.
Args:
zip_path (str): Path to the zipfile.
debug (bool, optional): Enable debug logging. Defaults to False.
Returns:
Optional[str]: The name of the module if found, else None.
"""
with zipfile.ZipFile(zip_path, "r") as zfile:
for name in zfile.namelist():
if name.endswith("__init__.py"):
if debug:
print(f"Found module '{name}' in the zipfile at: {name}")
return name
if debug:
print(f"Module '__init__.py' not found in the zipfile @ {zip_path}.")
return None
def write_dict_to_json_file(data: dict, file_path: str) -> None:
"""
Write a dictionary to a JSON file.
Args:
data (dict): Dictionary to write.
file_path (str): Path to the file.
"""
with open(file_path, "w") as file:
json.dump(data, file, indent=4)
def fetch_openai_plugins_manifest_and_spec(cfg: Config) -> dict:
"""
Fetch the manifest for a list of OpenAI plugins.
Args:
urls (List): List of URLs to fetch.
Returns:
dict: per url dictionary of manifest and spec.
"""
# TODO add directory scan
manifests = {}
for url in cfg.plugins_openai:
openai_plugin_client_dir = f"{cfg.plugins_dir}/openai/{urlparse(url).netloc}"
create_directory_if_not_exists(openai_plugin_client_dir)
if not os.path.exists(f"{openai_plugin_client_dir}/ai-plugin.json"):
try:
response = requests.get(f"{url}/.well-known/ai-plugin.json")
if response.status_code == 200:
manifest = response.json()
if manifest["schema_version"] != "v1":
print(
f"Unsupported manifest version: {manifest['schem_version']} for {url}"
)
continue
if manifest["api"]["type"] != "openapi":
print(
f"Unsupported API type: {manifest['api']['type']} for {url}"
)
continue
write_dict_to_json_file(
manifest, f"{openai_plugin_client_dir}/ai-plugin.json"
)
else:
print(f"Failed to fetch manifest for {url}: {response.status_code}")
except requests.exceptions.RequestException as e:
print(f"Error while requesting manifest from {url}: {e}")
else:
print(f"Manifest for {url} already exists")
manifest = json.load(open(f"{openai_plugin_client_dir}/ai-plugin.json"))
if not os.path.exists(f"{openai_plugin_client_dir}/openapi.json"):
openapi_spec = openapi_python_client._get_document(
url=manifest["api"]["url"], path=None, timeout=5
)
write_dict_to_json_file(
openapi_spec, f"{openai_plugin_client_dir}/openapi.json"
)
else:
print(f"OpenAPI spec for {url} already exists")
openapi_spec = json.load(open(f"{openai_plugin_client_dir}/openapi.json"))
manifests[url] = {"manifest": manifest, "openapi_spec": openapi_spec}
return manifests
def create_directory_if_not_exists(directory_path: str) -> bool:
"""
Create a directory if it does not exist.
Args:
directory_path (str): Path to the directory.
Returns:
bool: True if the directory was created, else False.
"""
if not os.path.exists(directory_path):
try:
os.makedirs(directory_path)
print(f"Created directory: {directory_path}")
return True
except OSError as e:
print(f"Error creating directory {directory_path}: {e}")
return False
else:
print(f"Directory {directory_path} already exists")
return True
def initialize_openai_plugins(
manifests_specs: dict, cfg: Config, debug: bool = False
) -> dict:
"""
Initialize OpenAI plugins.
Args:
manifests_specs (dict): per url dictionary of manifest and spec.
cfg (Config): Config instance including plugins config
debug (bool, optional): Enable debug logging. Defaults to False.
Returns:
dict: per url dictionary of manifest, spec and client.
"""
openai_plugins_dir = f"{cfg.plugins_dir}/openai"
if create_directory_if_not_exists(openai_plugins_dir):
for url, manifest_spec in manifests_specs.items():
openai_plugin_client_dir = f"{openai_plugins_dir}/{urlparse(url).hostname}"
_meta_option = (openapi_python_client.MetaType.SETUP,)
_config = OpenAPIConfig(
**{
"project_name_override": "client",
"package_name_override": "client",
}
)
prev_cwd = Path.cwd()
os.chdir(openai_plugin_client_dir)
Path("ai-plugin.json")
if not os.path.exists("client"):
client_results = openapi_python_client.create_new_client(
url=manifest_spec["manifest"]["api"]["url"],
path=None,
meta=_meta_option,
config=_config,
)
if client_results:
print(
f"Error creating OpenAPI client: {client_results[0].header} \n"
f" details: {client_results[0].detail}"
)
continue
spec = importlib.util.spec_from_file_location(
"client", "client/client/client.py"
)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
client = module.Client(base_url=url)
os.chdir(prev_cwd)
manifest_spec["client"] = client
return manifests_specs
def instantiate_openai_plugin_clients(
manifests_specs_clients: dict, cfg: Config, debug: bool = False
) -> dict:
"""
Instantiates BaseOpenAIPlugin instances for each OpenAI plugin.
Args:
manifests_specs_clients (dict): per url dictionary of manifest, spec and client.
cfg (Config): Config instance including plugins config
debug (bool, optional): Enable debug logging. Defaults to False.
Returns:
plugins (dict): per url dictionary of BaseOpenAIPlugin instances.
"""
plugins = {}
for url, manifest_spec_client in manifests_specs_clients.items():
plugins[url] = BaseOpenAIPlugin(manifest_spec_client)
return plugins
def scan_plugins(cfg: Config, debug: bool = False) -> List[AutoGPTPluginTemplate]:
"""Scan the plugins directory for plugins and loads them.
Args:
cfg (Config): Config instance including plugins config
debug (bool, optional): Enable debug logging. Defaults to False.
Returns:
List[Tuple[str, Path]]: List of plugins.
"""
loaded_plugins = []
# Generic plugins
plugins_path_path = Path(cfg.plugins_dir)
for plugin in plugins_path_path.glob("*.zip"):
if module := inspect_zip_for_module(str(plugin), debug):
plugin = Path(plugin)
module = Path(module)
if debug:
print(f"Plugin: {plugin} Module: {module}")
zipped_package = zipimporter(str(plugin))
zipped_module = zipped_package.load_module(str(module.parent))
for key in dir(zipped_module):
if key.startswith("__"):
continue
a_module = getattr(zipped_module, key)
a_keys = dir(a_module)
if (
"_abc_impl" in a_keys
and a_module.__name__ != "AutoGPTPluginTemplate"
and blacklist_whitelist_check(a_module.__name__, cfg)
):
loaded_plugins.append(a_module())
# OpenAI plugins
if cfg.plugins_openai:
manifests_specs = fetch_openai_plugins_manifest_and_spec(cfg)
if manifests_specs.keys():
manifests_specs_clients = initialize_openai_plugins(
manifests_specs, cfg, debug
)
for url, openai_plugin_meta in manifests_specs_clients.items():
if blacklist_whitelist_check(url, cfg):
plugin = BaseOpenAIPlugin(openai_plugin_meta)
loaded_plugins.append(plugin)
if loaded_plugins:
print(f"\nPlugins found: {len(loaded_plugins)}\n" "--------------------")
for plugin in loaded_plugins:
print(f"{plugin._name}: {plugin._version} - {plugin._description}")
return loaded_plugins
def blacklist_whitelist_check(plugin_name: str, cfg: Config) -> bool:
"""Check if the plugin is in the whitelist or blacklist.
Args:
plugin_name (str): Name of the plugin.
cfg (Config): Config object.
Returns:
True or False
"""
if plugin_name in cfg.plugins_blacklist:
return False
if plugin_name in cfg.plugins_whitelist:
return True
ack = input(
f"WARNNG Plugin {plugin_name} found. But not in the"
" whitelist... Load? (y/n): "
)
return ack.lower() == "y"

View File

View File

@ -1,8 +1,6 @@
""" A module for generating custom prompt strings."""
from __future__ import annotations
import json
from typing import Any
from typing import Any, Callable, Dict, List, Optional
class PromptGenerator:
@ -20,6 +18,10 @@ class PromptGenerator:
self.commands = []
self.resources = []
self.performance_evaluation = []
self.goals = []
self.command_registry = None
self.name = "Bob"
self.role = "AI"
self.response_format = {
"thoughts": {
"text": "thought",
@ -40,7 +42,13 @@ class PromptGenerator:
"""
self.constraints.append(constraint)
def add_command(self, command_label: str, command_name: str, args=None) -> None:
def add_command(
self,
command_label: str,
command_name: str,
args=None,
function: Optional[Callable] = None,
) -> None:
"""
Add a command to the commands list with a label, name, and optional arguments.
@ -49,6 +57,8 @@ class PromptGenerator:
command_name (str): The name of the command.
args (dict, optional): A dictionary containing argument names and their
values. Defaults to None.
function (callable, optional): A callable function to be called when
the command is executed. Defaults to None.
"""
if args is None:
args = {}
@ -59,11 +69,12 @@ class PromptGenerator:
"label": command_label,
"name": command_name,
"args": command_args,
"function": function,
}
self.commands.append(command)
def _generate_command_string(self, command: dict[str, Any]) -> str:
def _generate_command_string(self, command: Dict[str, Any]) -> str:
"""
Generate a formatted string representation of a command.
@ -96,7 +107,7 @@ class PromptGenerator:
"""
self.performance_evaluation.append(evaluation)
def _generate_numbered_list(self, items: list[Any], item_type="list") -> str:
def _generate_numbered_list(self, items: List[Any], item_type="list") -> str:
"""
Generate a numbered list from given items based on the item_type.
@ -109,10 +120,16 @@ class PromptGenerator:
str: The formatted numbered list.
"""
if item_type == "command":
return "\n".join(
f"{i+1}. {self._generate_command_string(item)}"
for i, item in enumerate(items)
)
command_strings = []
if self.command_registry:
command_strings += [
str(item)
for item in self.command_registry.commands.values()
if item.enabled
]
# These are the commands that are added manually, do_nothing and terminate
command_strings += [self._generate_command_string(item) for item in items]
return "\n".join(f"{i+1}. {item}" for i, item in enumerate(command_strings))
else:
return "\n".join(f"{i+1}. {item}" for i, item in enumerate(items))
@ -134,5 +151,5 @@ class PromptGenerator:
f"{self._generate_numbered_list(self.performance_evaluation)}\n\n"
"You should only respond in JSON format as described below \nResponse"
f" Format: \n{formatted_response_format} \nEnsure the response can be"
" parsed by Python json.loads"
"parsed by Python json.loads"
)

View File

@ -1,17 +1,16 @@
from colorama import Fore
from autogpt.config import Config
from autogpt.config.ai_config import AIConfig
from autogpt.config.config import Config
from autogpt.logs import logger
from autogpt.promptgenerator import PromptGenerator
from autogpt.prompts.generator import PromptGenerator
from autogpt.setup import prompt_user
from autogpt.utils import clean_input
CFG = Config()
def get_prompt() -> str:
def build_default_prompt_generator() -> PromptGenerator:
"""
This function generates a prompt string that includes various constraints,
commands, resources, and performance evaluations.
@ -20,9 +19,6 @@ def get_prompt() -> str:
str: The generated prompt string.
"""
# Initialize the Config object
cfg = Config()
# Initialize the PromptGenerator object
prompt_generator = PromptGenerator()
@ -39,96 +35,12 @@ def get_prompt() -> str:
prompt_generator.add_constraint(
'Exclusively use the commands listed in double quotes e.g. "command name"'
)
prompt_generator.add_constraint(
"Use subprocesses for commands that will not terminate within a few minutes"
)
# Define the command list
commands = [
("Google Search", "google", {"input": "<search>"}),
(
"Browse Website",
"browse_website",
{"url": "<url>", "question": "<what_you_want_to_find_on_website>"},
),
(
"Start GPT Agent",
"start_agent",
{"name": "<name>", "task": "<short_task_desc>", "prompt": "<prompt>"},
),
(
"Message GPT Agent",
"message_agent",
{"key": "<key>", "message": "<message>"},
),
("List GPT Agents", "list_agents", {}),
("Delete GPT Agent", "delete_agent", {"key": "<key>"}),
(
"Clone Repository",
"clone_repository",
{"repository_url": "<url>", "clone_path": "<directory>"},
),
("Write to file", "write_to_file", {"file": "<file>", "text": "<text>"}),
("Read file", "read_file", {"file": "<file>"}),
("Append to file", "append_to_file", {"file": "<file>", "text": "<text>"}),
("Delete file", "delete_file", {"file": "<file>"}),
("Search Files", "search_files", {"directory": "<directory>"}),
("Analyze Code", "analyze_code", {"code": "<full_code_string>"}),
(
"Get Improved Code",
"improve_code",
{"suggestions": "<list_of_suggestions>", "code": "<full_code_string>"},
),
(
"Write Tests",
"write_tests",
{"code": "<full_code_string>", "focus": "<list_of_focus_areas>"},
),
("Execute Python File", "execute_python_file", {"file": "<file>"}),
("Generate Image", "generate_image", {"prompt": "<prompt>"}),
("Send Tweet", "send_tweet", {"text": "<text>"}),
]
# Only add the audio to text command if the model is specified
if cfg.huggingface_audio_to_text_model:
commands.append(
("Convert Audio to text", "read_audio_from_file", {"file": "<file>"}),
)
# Only add shell command to the prompt if the AI is allowed to execute it
if cfg.execute_local_commands:
commands.append(
(
"Execute Shell Command, non-interactive commands only",
"execute_shell",
{"command_line": "<command_line>"},
),
)
commands.append(
(
"Execute Shell Command Popen, non-interactive commands only",
"execute_shell_popen",
{"command_line": "<command_line>"},
),
)
# Only add the download file command if the AI is allowed to execute it
if cfg.allow_downloads:
commands.append(
(
"Downloads a file from the internet, and stores it locally",
"download_file",
{"url": "<file_url>", "file": "<saved_filename>"},
),
)
# Add these command last.
commands.append(
("Do Nothing", "do_nothing", {}),
)
commands.append(
("Task Complete (Shutdown)", "task_complete", {"reason": "<reason>"}),
)
]
# Add commands to the PromptGenerator object
for command_label, command_name, args in commands:
@ -159,12 +71,11 @@ def get_prompt() -> str:
"Every command has a cost, so be smart and efficient. Aim to complete tasks in"
" the least number of steps."
)
# Generate the prompt string
return prompt_generator.generate_prompt_string()
prompt_generator.add_performance_evaluation("Write all code to a file.")
return prompt_generator
def construct_prompt() -> str:
def construct_main_ai_config() -> AIConfig:
"""Construct the prompt for the AI to respond to
Returns:
@ -196,8 +107,4 @@ Continue (y/n): """
config = prompt_user()
config.save(CFG.ai_settings_file)
# Get rid of this global:
global ai_name
ai_name = config.ai_name
return config.construct_full_prompt()
return config

View File

@ -1,13 +1,16 @@
"""Functions for counting the number of tokens in a message or string."""
from __future__ import annotations
from typing import List
import tiktoken
from autogpt.logs import logger
from autogpt.types.openai import Message
def count_message_tokens(
messages: list[dict[str, str]], model: str = "gpt-3.5-turbo-0301"
messages: List[Message], model: str = "gpt-3.5-turbo-0301"
) -> int:
"""
Returns the number of tokens used by a list of messages.

9
autogpt/types/openai.py Normal file
View File

@ -0,0 +1,9 @@
"""Type helpers for working with the OpenAI library"""
from typing import TypedDict
class Message(TypedDict):
"""OpenAI Message object containing a role and the message content"""
role: str
content: str

View File

@ -3,7 +3,7 @@ import os
import requests
import yaml
from colorama import Fore
from git import Repo
from git.repo import Repo
def clean_input(prompt: str = ""):

BIN
plugin.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 33 KiB

View File

View File

@ -31,6 +31,7 @@ pre-commit
black
isort
gitpython==3.1.31
auto-gpt-plugin-template
# Items below this point will not be included in the Docker Image
@ -42,3 +43,7 @@ pytest-benchmark
pytest-cov
pytest-integration
pytest-mock
# OpenAI and Generic plugins import
openapi-python-client==0.13.4

0
scripts/__init__.py Normal file
View File

0
tests/mocks/__init__.py Normal file
View File

View File

@ -0,0 +1,6 @@
from autogpt.commands.command import command
@command("function_based", "Function-based test command")
def function_based(arg1: int, arg2: str) -> str:
return f"{arg1} - {arg2}"

177
tests/test_commands.py Normal file
View File

@ -0,0 +1,177 @@
import os
import shutil
import sys
from pathlib import Path
import pytest
from autogpt.commands.command import Command, CommandRegistry
class TestCommand:
@staticmethod
def example_function(arg1: int, arg2: str) -> str:
return f"{arg1} - {arg2}"
def test_command_creation(self):
cmd = Command(
name="example", description="Example command", method=self.example_function
)
assert cmd.name == "example"
assert cmd.description == "Example command"
assert cmd.method == self.example_function
assert cmd.signature == "(arg1: int, arg2: str) -> str"
def test_command_call(self):
cmd = Command(
name="example", description="Example command", method=self.example_function
)
result = cmd(arg1=1, arg2="test")
assert result == "1 - test"
def test_command_call_with_invalid_arguments(self):
cmd = Command(
name="example", description="Example command", method=self.example_function
)
with pytest.raises(TypeError):
cmd(arg1="invalid", does_not_exist="test")
def test_command_default_signature(self):
cmd = Command(
name="example", description="Example command", method=self.example_function
)
assert cmd.signature == "(arg1: int, arg2: str) -> str"
def test_command_custom_signature(self):
custom_signature = "custom_arg1: int, custom_arg2: str"
cmd = Command(
name="example",
description="Example command",
method=self.example_function,
signature=custom_signature,
)
assert cmd.signature == custom_signature
class TestCommandRegistry:
@staticmethod
def example_function(arg1: int, arg2: str) -> str:
return f"{arg1} - {arg2}"
def test_register_command(self):
"""Test that a command can be registered to the registry."""
registry = CommandRegistry()
cmd = Command(
name="example", description="Example command", method=self.example_function
)
registry.register(cmd)
assert cmd.name in registry.commands
assert registry.commands[cmd.name] == cmd
def test_unregister_command(self):
"""Test that a command can be unregistered from the registry."""
registry = CommandRegistry()
cmd = Command(
name="example", description="Example command", method=self.example_function
)
registry.register(cmd)
registry.unregister(cmd.name)
assert cmd.name not in registry.commands
def test_get_command(self):
"""Test that a command can be retrieved from the registry."""
registry = CommandRegistry()
cmd = Command(
name="example", description="Example command", method=self.example_function
)
registry.register(cmd)
retrieved_cmd = registry.get_command(cmd.name)
assert retrieved_cmd == cmd
def test_get_nonexistent_command(self):
"""Test that attempting to get a nonexistent command raises a KeyError."""
registry = CommandRegistry()
with pytest.raises(KeyError):
registry.get_command("nonexistent_command")
def test_call_command(self):
"""Test that a command can be called through the registry."""
registry = CommandRegistry()
cmd = Command(
name="example", description="Example command", method=self.example_function
)
registry.register(cmd)
result = registry.call("example", arg1=1, arg2="test")
assert result == "1 - test"
def test_call_nonexistent_command(self):
"""Test that attempting to call a nonexistent command raises a KeyError."""
registry = CommandRegistry()
with pytest.raises(KeyError):
registry.call("nonexistent_command", arg1=1, arg2="test")
def test_get_command_prompt(self):
"""Test that the command prompt is correctly formatted."""
registry = CommandRegistry()
cmd = Command(
name="example", description="Example command", method=self.example_function
)
registry.register(cmd)
command_prompt = registry.command_prompt()
assert f"(arg1: int, arg2: str)" in command_prompt
def test_import_mock_commands_module(self):
"""Test that the registry can import a module with mock command plugins."""
registry = CommandRegistry()
mock_commands_module = "tests.mocks.mock_commands"
registry.import_commands(mock_commands_module)
assert "function_based" in registry.commands
assert registry.commands["function_based"].name == "function_based"
assert (
registry.commands["function_based"].description
== "Function-based test command"
)
def test_import_temp_command_file_module(self, tmp_path):
"""Test that the registry can import a command plugins module from a temp file."""
registry = CommandRegistry()
# Create a temp command file
src = Path(os.getcwd()) / "tests/mocks/mock_commands.py"
temp_commands_file = tmp_path / "mock_commands.py"
shutil.copyfile(src, temp_commands_file)
# Add the temp directory to sys.path to make the module importable
sys.path.append(str(tmp_path))
temp_commands_module = "mock_commands"
registry.import_commands(temp_commands_module)
# Remove the temp directory from sys.path
sys.path.remove(str(tmp_path))
assert "function_based" in registry.commands
assert registry.commands["function_based"].name == "function_based"
assert (
registry.commands["function_based"].description
== "Function-based test command"
)

View File

@ -1,6 +1,6 @@
from unittest import TestCase
from autogpt.promptgenerator import PromptGenerator
from autogpt.prompts.generator import PromptGenerator
class TestPromptGenerator(TestCase):
@ -38,6 +38,7 @@ class TestPromptGenerator(TestCase):
"label": command_label,
"name": command_name,
"args": args,
"function": None,
}
self.assertIn(command, self.generator.commands)

View File

@ -0,0 +1,79 @@
from typing import Any, Dict, List, Optional, Tuple
import pytest
from autogpt.models.base_open_ai_plugin import (
BaseOpenAIPlugin,
Message,
PromptGenerator,
)
class DummyPlugin(BaseOpenAIPlugin):
pass
@pytest.fixture
def dummy_plugin():
manifests_specs_clients = {
"manifest": {
"name_for_model": "Dummy",
"schema_version": "1.0",
"description_for_model": "A dummy plugin for testing purposes",
},
"client": None,
"openapi_spec": None,
}
return DummyPlugin(manifests_specs_clients)
def test_dummy_plugin_inheritance(dummy_plugin):
assert isinstance(dummy_plugin, BaseOpenAIPlugin)
def test_dummy_plugin_name(dummy_plugin):
assert dummy_plugin._name == "Dummy"
def test_dummy_plugin_version(dummy_plugin):
assert dummy_plugin._version == "1.0"
def test_dummy_plugin_description(dummy_plugin):
assert dummy_plugin._description == "A dummy plugin for testing purposes"
def test_dummy_plugin_default_methods(dummy_plugin):
assert not dummy_plugin.can_handle_on_response()
assert not dummy_plugin.can_handle_post_prompt()
assert not dummy_plugin.can_handle_on_planning()
assert not dummy_plugin.can_handle_post_planning()
assert not dummy_plugin.can_handle_pre_instruction()
assert not dummy_plugin.can_handle_on_instruction()
assert not dummy_plugin.can_handle_post_instruction()
assert not dummy_plugin.can_handle_pre_command()
assert not dummy_plugin.can_handle_post_command()
assert not dummy_plugin.can_handle_chat_completion(None, None, None, None)
assert dummy_plugin.on_response("hello") == "hello"
assert dummy_plugin.post_prompt(None) is None
assert dummy_plugin.on_planning(None, None) is None
assert dummy_plugin.post_planning("world") == "world"
pre_instruction = dummy_plugin.pre_instruction(
[{"role": "system", "content": "Beep, bop, boop"}]
)
assert isinstance(pre_instruction, list)
assert len(pre_instruction) == 1
assert pre_instruction[0]["role"] == "system"
assert pre_instruction[0]["content"] == "Beep, bop, boop"
assert dummy_plugin.on_instruction(None) is None
assert dummy_plugin.post_instruction("I'm a robot") == "I'm a robot"
pre_command = dummy_plugin.pre_command("evolve", {"continuously": True})
assert isinstance(pre_command, tuple)
assert len(pre_command) == 2
assert pre_command[0] == "evolve"
assert pre_command[1]["continuously"] == True
post_command = dummy_plugin.post_command("evolve", "upgraded successfully!")
assert isinstance(post_command, str)
assert post_command == "upgraded successfully!"
assert dummy_plugin.handle_chat_completion(None, None, None, None) is None

View File

@ -9,16 +9,20 @@ Code Analysis
Objective:
The objective of the "scrape_text" function is to scrape the text content from
a given URL and return it as a string, after removing any unwanted HTML tags and scripts.
a given URL and return it as a string, after removing any unwanted HTML tags and
scripts.
Inputs:
- url: a string representing the URL of the webpage to be scraped.
Flow:
1. Send a GET request to the given URL using the requests library and the user agent header from the config file.
1. Send a GET request to the given URL using the requests library and the user agent
header from the config file.
2. Check if the response contains an HTTP error. If it does, return an error message.
3. Use BeautifulSoup to parse the HTML content of the response and extract all script and style tags.
4. Get the text content of the remaining HTML using the get_text() method of BeautifulSoup.
3. Use BeautifulSoup to parse the HTML content of the response and extract all script
and style tags.
4. Get the text content of the remaining HTML using the get_text() method of
BeautifulSoup.
5. Split the text into lines and then into chunks, removing any extra whitespace.
6. Join the chunks into a single string with newline characters between them.
7. Return the cleaned text.
@ -27,9 +31,12 @@ Outputs:
- A string representing the cleaned text content of the webpage.
Additional aspects:
- The function uses the requests library and BeautifulSoup to handle the HTTP request and HTML parsing, respectively.
- The function removes script and style tags from the HTML to avoid including unwanted content in the text output.
- The function uses a generator expression to split the text into lines and chunks, which can improve performance for large amounts of text.
- The function uses the requests library and BeautifulSoup to handle the HTTP request
and HTML parsing, respectively.
- The function removes script and style tags from the HTML to avoid including unwanted
content in the text output.
- The function uses a generator expression to split the text into lines and chunks,
which can improve performance for large amounts of text.
"""
@ -40,26 +47,33 @@ class TestScrapeText:
expected_text = "This is some sample text"
mock_response = mocker.Mock()
mock_response.status_code = 200
mock_response.text = f"<html><body><div><p style='color: blue;'>{expected_text}</p></div></body></html>"
mock_response.text = (
"<html><body><div><p style='color: blue;'>"
f"{expected_text}</p></div></body></html>"
)
mocker.patch("requests.Session.get", return_value=mock_response)
# Call the function with a valid URL and assert that it returns the expected text
# Call the function with a valid URL and assert that it returns the
# expected text
url = "http://www.example.com"
assert scrape_text(url) == expected_text
# Tests that the function returns an error message when an invalid or unreachable url is provided.
# Tests that the function returns an error message when an invalid or unreachable
# url is provided.
def test_invalid_url(self, mocker):
# Mock the requests.get() method to raise an exception
mocker.patch(
"requests.Session.get", side_effect=requests.exceptions.RequestException
)
# Call the function with an invalid URL and assert that it returns an error message
# Call the function with an invalid URL and assert that it returns an error
# message
url = "http://www.invalidurl.com"
error_message = scrape_text(url)
assert "Error:" in error_message
# Tests that the function returns an empty string when the html page contains no text to be scraped.
# Tests that the function returns an empty string when the html page contains no
# text to be scraped.
def test_no_text(self, mocker):
# Mock the requests.get() method to return a response with no text
mock_response = mocker.Mock()
@ -71,7 +85,8 @@ class TestScrapeText:
url = "http://www.example.com"
assert scrape_text(url) == ""
# Tests that the function returns an error message when the response status code is an http error (>=400).
# Tests that the function returns an error message when the response status code is
# an http error (>=400).
def test_http_error(self, mocker):
# Mock the requests.get() method to return a response with a 404 status code
mocker.patch("requests.Session.get", return_value=mocker.Mock(status_code=404))

112
tests/unit/test_plugins.py Normal file
View File

@ -0,0 +1,112 @@
import pytest
from autogpt.config import Config
from autogpt.plugins import (
blacklist_whitelist_check,
inspect_zip_for_module,
scan_plugins,
)
PLUGINS_TEST_DIR = "tests/unit/data/test_plugins"
PLUGIN_TEST_ZIP_FILE = "Auto-GPT-Plugin-Test-master.zip"
PLUGIN_TEST_INIT_PY = "Auto-GPT-Plugin-Test-master/src/auto_gpt_vicuna/__init__.py"
PLUGIN_TEST_OPENAI = "https://weathergpt.vercel.app/"
def test_inspect_zip_for_module():
result = inspect_zip_for_module(str(f"{PLUGINS_TEST_DIR}/{PLUGIN_TEST_ZIP_FILE}"))
assert result == PLUGIN_TEST_INIT_PY
@pytest.fixture
def mock_config_blacklist_whitelist_check():
class MockConfig:
plugins_blacklist = ["BadPlugin"]
plugins_whitelist = ["GoodPlugin"]
return MockConfig()
def test_blacklist_whitelist_check_blacklist(
mock_config_blacklist_whitelist_check, monkeypatch
):
monkeypatch.setattr("builtins.input", lambda _: "y")
assert not blacklist_whitelist_check(
"BadPlugin", mock_config_blacklist_whitelist_check
)
def test_blacklist_whitelist_check_whitelist(
mock_config_blacklist_whitelist_check, monkeypatch
):
monkeypatch.setattr("builtins.input", lambda _: "y")
assert blacklist_whitelist_check(
"GoodPlugin", mock_config_blacklist_whitelist_check
)
def test_blacklist_whitelist_check_user_input_yes(
mock_config_blacklist_whitelist_check, monkeypatch
):
monkeypatch.setattr("builtins.input", lambda _: "y")
assert blacklist_whitelist_check(
"UnknownPlugin", mock_config_blacklist_whitelist_check
)
def test_blacklist_whitelist_check_user_input_no(
mock_config_blacklist_whitelist_check, monkeypatch
):
monkeypatch.setattr("builtins.input", lambda _: "n")
assert not blacklist_whitelist_check(
"UnknownPlugin", mock_config_blacklist_whitelist_check
)
def test_blacklist_whitelist_check_user_input_invalid(
mock_config_blacklist_whitelist_check, monkeypatch
):
monkeypatch.setattr("builtins.input", lambda _: "invalid")
assert not blacklist_whitelist_check(
"UnknownPlugin", mock_config_blacklist_whitelist_check
)
@pytest.fixture
def config_with_plugins():
cfg = Config()
cfg.plugins_dir = PLUGINS_TEST_DIR
cfg.plugins_openai = ["https://weathergpt.vercel.app/"]
return cfg
@pytest.fixture
def mock_config_openai_plugin():
class MockConfig:
plugins_dir = PLUGINS_TEST_DIR
plugins_openai = [PLUGIN_TEST_OPENAI]
plugins_blacklist = ["AutoGPTPVicuna"]
plugins_whitelist = [PLUGIN_TEST_OPENAI]
return MockConfig()
def test_scan_plugins_openai(mock_config_openai_plugin):
result = scan_plugins(mock_config_openai_plugin, debug=True)
assert len(result) == 1
@pytest.fixture
def mock_config_generic_plugin():
class MockConfig:
plugins_dir = PLUGINS_TEST_DIR
plugins_openai = []
plugins_blacklist = []
plugins_whitelist = ["AutoGPTPVicuna"]
return MockConfig()
def test_scan_plugins_generic(mock_config_generic_plugin):
result = scan_plugins(mock_config_generic_plugin, debug=True)
assert len(result) == 1