Merge pull request #4 from evahteev/_openai-plugin-support

[WIP] Openai plugins support
pull/2531/head
BillSchumacher 2023-04-18 18:20:37 -05:00 committed by GitHub
commit b188c2b3e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 570 additions and 61 deletions

2
.gitignore vendored
View File

@ -156,3 +156,5 @@ vicuna-*
# mac
.DS_Store
openai/

View File

@ -1,7 +1,5 @@
"""Main script for the autogpt package."""
import logging
import os
from pathlib import Path
from colorama import Fore
@ -11,8 +9,10 @@ from autogpt.commands.command import CommandRegistry
from autogpt.config import Config, check_openai_api_key
from autogpt.logs import logger
from autogpt.memory import get_memory
from autogpt.plugins import load_plugins
from autogpt.prompts.prompt import construct_main_ai_config
from autogpt.plugins import scan_plugins
# Load environment variables from .env file
@ -24,27 +24,7 @@ def main() -> None:
check_openai_api_key()
parse_arguments()
logger.set_level(logging.DEBUG if cfg.debug_mode else logging.INFO)
plugins_found = load_plugins(Path(os.getcwd()) / "plugins")
loaded_plugins = []
for plugin in plugins_found:
if plugin.__name__ in cfg.plugins_blacklist:
continue
if plugin.__name__ in cfg.plugins_whitelist:
loaded_plugins.append(plugin())
else:
ack = input(
f"WARNNG Plugin {plugin.__name__} found. But not in the"
" whitelist... Load? (y/n): "
)
if ack.lower() == "y":
loaded_plugins.append(plugin())
if loaded_plugins:
print(f"\nPlugins found: {len(loaded_plugins)}\n" "--------------------")
for plugin in loaded_plugins:
print(f"{plugin._name}: {plugin._version} - {plugin._description}")
cfg.set_plugins(loaded_plugins)
cfg.set_plugins(scan_plugins(cfg, cfg.debug_mode))
# Create a CommandRegistry instance and scan default folder
command_registry = CommandRegistry()
command_registry.import_commands("autogpt.commands.audio_text")

View File

@ -109,7 +109,9 @@ class Config(metaclass=Singleton):
# Initialize the OpenAI API client
openai.api_key = self.openai_api_key
self.plugins_dir = os.getenv("PLUGINS_DIR", "plugins")
self.plugins: List[AutoGPTPluginTemplate] = []
self.plugins_openai = []
self.plugins_whitelist = []
self.plugins_blacklist = []

View File

@ -0,0 +1,200 @@
"""Handles loading of plugins."""
from typing import Any, Dict, List, Optional, Tuple, TypedDict
from typing import TypeVar
from auto_gpt_plugin_template import AutoGPTPluginTemplate
PromptGenerator = TypeVar("PromptGenerator")
# Chat message exchanged with the model: the speaker role plus its text.
Message = TypedDict("Message", {"role": str, "content": str})
class BaseOpenAIPlugin(AutoGPTPluginTemplate):
    """Adapter exposing an OpenAI plugin (manifest, OpenAPI spec and client)
    through the AutoGPTPluginTemplate interface.

    Every ``can_handle_*`` probe reports False and every hook is a no-op
    returning None, so subclasses only override what they actually support.
    """

    def __init__(self, manifests_specs_clients: dict):
        # Pull the identity fields out of the manifest once.
        manifest = manifests_specs_clients["manifest"]
        self._name = manifest["name_for_model"]
        self._version = manifest["schema_version"]
        self._description = manifest["description_for_model"]
        self._client = manifests_specs_clients["client"]
        self._manifest = manifest
        self._openapi_spec = manifests_specs_clients["openapi_spec"]

    def can_handle_on_response(self) -> bool:
        """Return True if this plugin implements on_response (it does not)."""
        return False

    def on_response(self, response: str, *args, **kwargs) -> str:
        """No-op hook called with each response received from the model."""
        return None

    def can_handle_post_prompt(self) -> bool:
        """Return True if this plugin implements post_prompt (it does not)."""
        return False

    def post_prompt(self, prompt: PromptGenerator) -> PromptGenerator:
        """No-op hook called just before the prompt is generated."""
        return None

    def can_handle_on_planning(self) -> bool:
        """Return True if this plugin implements on_planning (it does not)."""
        return False

    def on_planning(
        self, prompt: PromptGenerator, messages: List[Message]
    ) -> Optional[str]:
        """No-op hook called before the planning chat completion."""
        return None

    def can_handle_post_planning(self) -> bool:
        """Return True if this plugin implements post_planning (it does not)."""
        return False

    def post_planning(self, response: str) -> str:
        """No-op hook called after the planning chat completion."""
        return None

    def can_handle_pre_instruction(self) -> bool:
        """Return True if this plugin implements pre_instruction (it does not)."""
        return False

    def pre_instruction(self, messages: List[Message]) -> List[Message]:
        """No-op hook called before the instruction chat is done."""
        return None

    def can_handle_on_instruction(self) -> bool:
        """Return True if this plugin implements on_instruction (it does not)."""
        return False

    def on_instruction(self, messages: List[Message]) -> Optional[str]:
        """No-op hook called when the instruction chat is done."""
        return None

    def can_handle_post_instruction(self) -> bool:
        """Return True if this plugin implements post_instruction (it does not)."""
        return False

    def post_instruction(self, response: str) -> str:
        """No-op hook called after the instruction chat is done."""
        return None

    def can_handle_pre_command(self) -> bool:
        """Return True if this plugin implements pre_command (it does not)."""
        return False

    def pre_command(
        self, command_name: str, arguments: Dict[str, Any]
    ) -> Tuple[str, Dict[str, Any]]:
        """No-op hook called before a command is executed."""
        return None

    def can_handle_post_command(self) -> bool:
        """Return True if this plugin implements post_command (it does not)."""
        return False

    def post_command(self, command_name: str, response: str) -> str:
        """No-op hook called after a command is executed."""
        return None

    def can_handle_chat_completion(
        self, messages: Dict[Any, Any], model: str, temperature: float, max_tokens: int
    ) -> bool:
        """Return True if this plugin wants to own the chat completion (it does not)."""
        return False

    def handle_chat_completion(
        self, messages: List[Message], model: str, temperature: float, max_tokens: int
    ) -> str:
        """No-op hook that would perform the chat completion itself."""
        return None

View File

@ -1,11 +1,21 @@
"""Handles loading of plugins."""
import importlib
import json
import os
import zipfile
from ast import Module
import openapi_python_client
import requests
from pathlib import Path
from typing import List, Optional, Tuple
from typing import List, Tuple, Optional
from urllib.parse import urlparse
from zipimport import zipimporter
from openapi_python_client.cli import Config as OpenAPIConfig
from autogpt.config import Config
from autogpt.models.base_open_ai_plugin import BaseOpenAIPlugin
from auto_gpt_plugin_template import AutoGPTPluginTemplate
@ -31,47 +41,207 @@ def inspect_zip_for_module(zip_path: str, debug: bool = False) -> Optional[str]:
return None
def scan_plugins(plugins_path: Path, debug: bool = False) -> List[Tuple[str, Path]]:
"""Scan the plugins directory for plugins.
Args:
plugins_path (Path): Path to the plugins directory.
Returns:
List[Path]: List of plugins.
def write_dict_to_json_file(data: dict, file_path: str) -> None:
    """
    Write a dictionary to a JSON file.

    Args:
        data (dict): Dictionary to write.
        file_path (str): Path to the file.
    """
    # Context manager guarantees the handle is closed even if dump raises.
    with open(file_path, "w") as file:
        json.dump(data, file, indent=4)
def fetch_openai_plugins_manifest_and_spec(cfg: Config) -> dict:
    """
    Fetch the manifest and OpenAPI spec for each configured OpenAI plugin.

    Both documents are cached on disk under cfg.plugins_dir/openai/<host>/,
    so each URL is fetched at most once; plugins whose manifest cannot be
    fetched or is unsupported are skipped.

    Args:
        cfg (Config): Config instance whose plugins_openai lists plugin URLs.

    Returns:
        dict: per url dictionary of manifest and spec.
    """
    # TODO add directory scan
    manifests = {}
    for url in cfg.plugins_openai:
        openai_plugin_client_dir = f"{cfg.plugins_dir}/openai/{urlparse(url).netloc}"
        create_directory_if_not_exists(openai_plugin_client_dir)
        manifest_path = f"{openai_plugin_client_dir}/ai-plugin.json"
        if not os.path.exists(manifest_path):
            try:
                # Explicit timeout so one unreachable plugin can't hang startup.
                response = requests.get(f"{url}/.well-known/ai-plugin.json", timeout=10)
                if response.status_code == 200:
                    manifest = response.json()
                    if manifest["schema_version"] != "v1":
                        # was manifest['schem_version'] — a KeyError on this path
                        print(f"Unsupported manifest version: {manifest['schema_version']} for {url}")
                        continue
                    if manifest["api"]["type"] != "openapi":
                        print(f"Unsupported API type: {manifest['api']['type']} for {url}")
                        continue
                    write_dict_to_json_file(manifest, manifest_path)
                else:
                    print(f"Failed to fetch manifest for {url}: {response.status_code}")
            except requests.exceptions.RequestException as e:
                print(f"Error while requesting manifest from {url}: {e}")
        else:
            print(f"Manifest for {url} already exists")
        if not os.path.exists(manifest_path):
            # Fetch failed above — skip instead of crashing on an unbound manifest.
            continue
        with open(manifest_path) as f:
            manifest = json.load(f)
        spec_path = f"{openai_plugin_client_dir}/openapi.json"
        if not os.path.exists(spec_path):
            openapi_spec = openapi_python_client._get_document(
                url=manifest["api"]["url"], path=None, timeout=5
            )
            write_dict_to_json_file(openapi_spec, spec_path)
        else:
            print(f"OpenAPI spec for {url} already exists")
            with open(spec_path) as f:
                openapi_spec = json.load(f)
        manifests[url] = {
            "manifest": manifest,
            "openapi_spec": openapi_spec,
        }
    return manifests
def create_directory_if_not_exists(directory_path: str) -> bool:
    """
    Ensure *directory_path* exists, creating it (and parents) when missing.

    Args:
        directory_path (str): Path to the directory.

    Returns:
        bool: True when the directory exists afterwards, False when
        creation was attempted and failed.
    """
    # Guard clause: nothing to do when the path already exists.
    if os.path.exists(directory_path):
        print(f"Directory {directory_path} already exists")
        return True
    try:
        os.makedirs(directory_path)
    except OSError as e:
        print(f"Error creating directory {directory_path}: {e}")
        return False
    print(f"Created directory: {directory_path}")
    return True
def initialize_openai_plugins(manifests_specs: dict, cfg: Config, debug: bool = False) -> dict:
    """
    Initialize OpenAI plugins by generating an OpenAPI client for each.

    Args:
        manifests_specs (dict): per url dictionary of manifest and spec.
        cfg (Config): Config instance including plugins config.
        debug (bool, optional): Enable debug logging. Defaults to False.

    Returns:
        dict: per url dictionary of manifest, spec and client.
    """
    openai_plugins_dir = f"{cfg.plugins_dir}/openai"
    if create_directory_if_not_exists(openai_plugins_dir):
        for url, manifest_spec in manifests_specs.items():
            openai_plugin_client_dir = f"{openai_plugins_dir}/{urlparse(url).hostname}"
            # BUG FIX: a trailing comma previously made this a 1-tuple
            # (MetaType.SETUP,) instead of the enum value itself.
            _meta_option = openapi_python_client.MetaType.SETUP
            _config = OpenAPIConfig(
                **{
                    "project_name_override": "client",
                    "package_name_override": "client",
                }
            )
            prev_cwd = Path.cwd()
            os.chdir(openai_plugin_client_dir)
            try:
                if not os.path.exists("client"):
                    client_results = openapi_python_client.create_new_client(
                        url=manifest_spec["manifest"]["api"]["url"],
                        path=None,
                        meta=_meta_option,
                        config=_config,
                    )
                    if client_results:
                        print(
                            f"Error creating OpenAPI client: {client_results[0].header} \n"
                            f" details: {client_results[0].detail}"
                        )
                        continue
                spec = importlib.util.spec_from_file_location(
                    "client", "client/client/client.py"
                )
                module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(module)
                client = module.Client(base_url=url)
            finally:
                # Restore the working directory even when generation fails
                # or `continue` fires — previously it was left changed.
                os.chdir(prev_cwd)
            manifest_spec["client"] = client
    return manifests_specs
def instantiate_openai_plugin_clients(manifests_specs_clients: dict, cfg: Config, debug: bool = False) -> dict:
    """
    Instantiate a BaseOpenAIPlugin for every fetched OpenAI plugin.

    Args:
        manifests_specs_clients (dict): per url dictionary of manifest, spec and client.
        cfg (Config): Config instance including plugins config.
        debug (bool, optional): Enable debug logging. Defaults to False.

    Returns:
        plugins (dict): per url dictionary of BaseOpenAIPlugin instances.
    """
    return {
        url: BaseOpenAIPlugin(meta)
        for url, meta in manifests_specs_clients.items()
    }
def load_plugins(
plugins_path: Path, debug: bool = False
) -> List[AutoGPTPluginTemplate]:
"""Load plugins from the plugins directory.
def scan_plugins(cfg: Config, debug: bool = False) -> List[AutoGPTPluginTemplate]:
    """Scan the plugins directory for plugins and load them.

    Generic plugins are discovered as zip files in cfg.plugins_dir; OpenAI
    plugins come from the URLs listed in cfg.plugins_openai. Each candidate
    is filtered through blacklist_whitelist_check before instantiation.

    Args:
        cfg (Config): Config instance including plugins config.
        debug (bool, optional): Enable debug logging. Defaults to False.

    Returns:
        List[AutoGPTPluginTemplate]: List of loaded plugin instances.
    """
    loaded_plugins = []
    # Generic (zip-packaged) plugins
    plugins_path_path = Path(cfg.plugins_dir)
    for plugin in plugins_path_path.glob("*.zip"):
        if module := inspect_zip_for_module(str(plugin), debug):
            plugin = Path(plugin)
            module = Path(module)
            if debug:
                print(f"Plugin: {plugin} Module: {module}")
            zipped_package = zipimporter(plugin)
            zipped_module = zipped_package.load_module(str(module.parent))
            for key in dir(zipped_module):
                if key.startswith("__"):
                    continue
                a_module = getattr(zipped_module, key)
                a_keys = dir(a_module)
                # "_abc_impl" marks concrete ABC subclasses, i.e. plugin classes.
                if (
                    "_abc_impl" in a_keys
                    and a_module.__name__ != "AutoGPTPluginTemplate"
                    and blacklist_whitelist_check(a_module.__name__, cfg)
                ):
                    loaded_plugins.append(a_module())
    # OpenAI plugins
    if cfg.plugins_openai:
        manifests_specs = fetch_openai_plugins_manifest_and_spec(cfg)
        if manifests_specs.keys():
            manifests_specs_clients = initialize_openai_plugins(manifests_specs, cfg, debug)
            for url, openai_plugin_meta in manifests_specs_clients.items():
                if blacklist_whitelist_check(url, cfg):
                    plugin = BaseOpenAIPlugin(openai_plugin_meta)
                    loaded_plugins.append(plugin)
    if loaded_plugins:
        print(f"\nPlugins found: {len(loaded_plugins)}\n" "--------------------")
        for plugin in loaded_plugins:
            print(f"{plugin._name}: {plugin._version} - {plugin._description}")
    return loaded_plugins
def blacklist_whitelist_check(plugin_name: str, cfg: Config) -> bool:
"""Check if the plugin is in the whitelist or blacklist.
Args:
plugin_name (str): Name of the plugin.
cfg (Config): Config object.
Returns:
True or False
"""
if plugin_name in cfg.plugins_blacklist:
return False
if plugin_name in cfg.plugins_whitelist:
return True
ack = input(
f"WARNNG Plugin {plugin_name} found. But not in the"
" whitelist... Load? (y/n): "
)
return ack.lower() == "y"

View File

@ -40,3 +40,9 @@ pytest-benchmark
pytest-cov
pytest-integration
pytest-mock
# OpenAI and Generic plugins import
openapi-python-client==0.13.4
abstract-singleton
auto-gpt-plugin-template

View File

@ -0,0 +1,61 @@
import pytest
from typing import Any, Dict, List, Optional, Tuple
from autogpt.models.base_open_ai_plugin import BaseOpenAIPlugin, Message, PromptGenerator
class DummyPlugin(BaseOpenAIPlugin):
    """Concrete no-override subclass used to exercise the base defaults."""
@pytest.fixture
def dummy_plugin():
    """Provide a DummyPlugin built from a minimal manifest."""
    manifest = {
        "name_for_model": "Dummy",
        "schema_version": "1.0",
        "description_for_model": "A dummy plugin for testing purposes",
    }
    return DummyPlugin(
        {"manifest": manifest, "client": None, "openapi_spec": None}
    )
def test_dummy_plugin_inheritance(dummy_plugin):
    # DummyPlugin subclasses BaseOpenAIPlugin, so isinstance must hold.
    assert isinstance(dummy_plugin, BaseOpenAIPlugin)
def test_dummy_plugin_name(dummy_plugin):
    # _name is taken from manifest["name_for_model"] in __init__.
    assert dummy_plugin._name == "Dummy"
def test_dummy_plugin_version(dummy_plugin):
    # _version is taken from manifest["schema_version"] in __init__.
    assert dummy_plugin._version == "1.0"
def test_dummy_plugin_description(dummy_plugin):
    # _description is taken from manifest["description_for_model"] in __init__.
    assert dummy_plugin._description == "A dummy plugin for testing purposes"
def test_dummy_plugin_default_methods(dummy_plugin):
    """All capability probes are off and every hook returns None by default."""
    probes = (
        "can_handle_on_response",
        "can_handle_post_prompt",
        "can_handle_on_planning",
        "can_handle_post_planning",
        "can_handle_pre_instruction",
        "can_handle_on_instruction",
        "can_handle_post_instruction",
        "can_handle_pre_command",
        "can_handle_post_command",
    )
    for probe in probes:
        assert not getattr(dummy_plugin, probe)()
    assert not dummy_plugin.can_handle_chat_completion(None, None, None, None)

    single_arg_hooks = (
        "on_response",
        "post_prompt",
        "post_planning",
        "pre_instruction",
        "on_instruction",
        "post_instruction",
    )
    for hook in single_arg_hooks:
        assert getattr(dummy_plugin, hook)(None) is None
    assert dummy_plugin.on_planning(None, None) is None
    assert dummy_plugin.pre_command(None, None) is None
    assert dummy_plugin.post_command(None, None) is None
    assert dummy_plugin.handle_chat_completion(None, None, None, None) is None

View File

@ -0,0 +1,88 @@
import pytest
from autogpt.plugins import inspect_zip_for_module, scan_plugins, blacklist_whitelist_check
from autogpt.config import Config
PLUGINS_TEST_DIR = "tests/unit/data/test_plugins"
PLUGIN_TEST_ZIP_FILE = "Auto-GPT-Plugin-Test-master.zip"
PLUGIN_TEST_INIT_PY = "Auto-GPT-Plugin-Test-master/src/auto_gpt_vicuna/__init__.py"
PLUGIN_TEST_OPENAI = 'https://weathergpt.vercel.app/'
def test_inspect_zip_for_module():
    """The bundled test zip must expose the expected plugin __init__.py path."""
    zip_path = f"{PLUGINS_TEST_DIR}/{PLUGIN_TEST_ZIP_FILE}"
    assert inspect_zip_for_module(zip_path) == PLUGIN_TEST_INIT_PY
@pytest.fixture
def mock_config_blacklist_whitelist_check():
    """Config stub exposing only the plugin black/white lists."""
    stub_cls = type(
        "MockConfig",
        (),
        {
            "plugins_blacklist": ["BadPlugin"],
            "plugins_whitelist": ["GoodPlugin"],
        },
    )
    return stub_cls()
def test_blacklist_whitelist_check_blacklist(mock_config_blacklist_whitelist_check,
                                             monkeypatch):
    # Even with the user answering "y", a blacklisted plugin is rejected.
    monkeypatch.setattr("builtins.input", lambda _: "y")
    assert not blacklist_whitelist_check("BadPlugin", mock_config_blacklist_whitelist_check)
def test_blacklist_whitelist_check_whitelist(mock_config_blacklist_whitelist_check, monkeypatch):
    # Whitelisted plugins are accepted without consulting the user.
    monkeypatch.setattr("builtins.input", lambda _: "y")
    assert blacklist_whitelist_check("GoodPlugin", mock_config_blacklist_whitelist_check)
def test_blacklist_whitelist_check_user_input_yes(mock_config_blacklist_whitelist_check, monkeypatch):
    # Unknown plugin + "y" answer -> loaded.
    monkeypatch.setattr("builtins.input", lambda _: "y")
    assert blacklist_whitelist_check("UnknownPlugin", mock_config_blacklist_whitelist_check)
def test_blacklist_whitelist_check_user_input_no(mock_config_blacklist_whitelist_check, monkeypatch):
    # Unknown plugin + "n" answer -> skipped.
    monkeypatch.setattr("builtins.input", lambda _: "n")
    assert not blacklist_whitelist_check("UnknownPlugin", mock_config_blacklist_whitelist_check)
def test_blacklist_whitelist_check_user_input_invalid(mock_config_blacklist_whitelist_check, monkeypatch):
    # Anything other than "y" counts as a refusal.
    monkeypatch.setattr("builtins.input", lambda _: "invalid")
    assert not blacklist_whitelist_check("UnknownPlugin", mock_config_blacklist_whitelist_check)
@pytest.fixture
def config_with_plugins():
    """Real Config pointed at the test plugins dir and the test OpenAI plugin.

    Uses the module-level PLUGIN_TEST_OPENAI constant instead of repeating
    the URL literal, so the fixture cannot drift from the constant.
    """
    cfg = Config()
    cfg.plugins_dir = PLUGINS_TEST_DIR
    cfg.plugins_openai = [PLUGIN_TEST_OPENAI]
    return cfg
@pytest.fixture
def mock_config_openai_plugin():
    # Config stub: blacklists the bundled zip plugin so only the OpenAI
    # plugin (whitelisted by its URL) survives scan_plugins' filtering.
    class MockConfig:
        plugins_dir = PLUGINS_TEST_DIR
        plugins_openai = [PLUGIN_TEST_OPENAI]
        plugins_blacklist = ["AutoGPTPVicuna"]
        plugins_whitelist = [PLUGIN_TEST_OPENAI]
    return MockConfig()
def test_scan_plugins_openai(mock_config_openai_plugin):
    # NOTE(review): fetches the live weathergpt manifest over the network
    # and generates a client — flaky offline; consider mocking the fetch.
    result = scan_plugins(mock_config_openai_plugin, debug=True)
    assert len(result) == 1
@pytest.fixture
def mock_config_generic_plugin():
    # Config stub: no OpenAI plugins; whitelists the zip plugin class name
    # so the generic loader instantiates it without prompting.
    class MockConfig:
        plugins_dir = PLUGINS_TEST_DIR
        plugins_openai = []
        plugins_blacklist = []
        plugins_whitelist = ["AutoGPTPVicuna"]
    return MockConfig()
def test_scan_plugins_generic(mock_config_generic_plugin):
    # Exactly one plugin class is expected inside the test zip.
    result = scan_plugins(mock_config_generic_plugin, debug=True)
    assert len(result) == 1