# Auto-GPT main entry-point script (CLI agent loop).
# Standard-library and project-local imports. The project-local modules
# (commands, utils, memory, chat, spinner, speak, config, json_parser,
# ai_config, logger, prompt) must be importable from the same package.
import json
import random  # NOTE(review): not referenced in this file — verify before removing
import commands as cmd
import utils
from memory import get_memory, get_supported_memory_backends
import chat
from colorama import Fore, Style
from spinner import Spinner
import time  # NOTE(review): not referenced in this file — verify before removing
import speak
from config import Config
from json_parser import fix_and_parse_json
from ai_config import AIConfig
import traceback
import yaml  # NOTE(review): not referenced in this file — verify before removing
import argparse
from logger import logger
import logging
from prompt import get_prompt  # NOTE(review): not referenced in this file — verify before removing
# Module-wide configuration singleton shared by every function below;
# parse_arguments() mutates it according to the CLI flags.
cfg = Config()
def check_openai_api_key():
    """Exit the program with guidance when no OpenAI API key is configured.

    Reads `cfg.openai_api_key` (set via .env or an environment variable);
    when it is missing, prints instructions in red and terminates with
    status 1. Returns nothing on success.
    """
    # Guard clause: a configured key means there is nothing to do.
    if cfg.openai_api_key:
        return

    print(
        Fore.RED +
        "Please set your OpenAI API key in .env or as an environment variable."
    )
    print("You can get your key from https://beta.openai.com/account/api-keys")
    exit(1)
def attempt_to_fix_json_by_finding_outermost_brackets(json_string):
    """Best-effort extraction of a JSON object embedded in a larger string.

    Uses the third-party `regex` module's recursive pattern `(?R)` to find
    the outermost balanced `{...}` span in `json_string`.

    Args:
        json_string: Raw model output that may contain a JSON object.

    Returns:
        str: the extracted JSON object text on success, or the string
        "{}" when no balanced object could be found, so that downstream
        consumers (fix_and_parse_json, cmd.get_command) always receive a
        string they can parse.
    """
    if cfg.speak_mode and cfg.debug_mode:
        speak.say_text("I have received an invalid JSON response from the OpenAI API. Trying to fix it now.")
    logger.typewriter_log("Attempting to fix JSON by finding outermost brackets\n")

    try:
        # `regex` (not stdlib `re`) is required: the recursive `(?R)`
        # construct matches arbitrarily nested balanced braces.
        import regex
        json_pattern = regex.compile(r"\{(?:[^{}]|(?R))*\}")
        json_match = json_pattern.search(json_string)

        if json_match:
            # Extract the valid JSON object from the string
            json_string = json_match.group(0)
            logger.typewriter_log(title="Apparently json was fixed.", title_color=Fore.GREEN)
            if cfg.speak_mode and cfg.debug_mode:
                speak.say_text("Apparently json was fixed.")
        else:
            raise ValueError("No valid JSON object found")

    except (json.JSONDecodeError, ValueError):
        if cfg.speak_mode:
            speak.say_text("Didn't work. I will have to ignore this response then.")
        logger.error("Error: Invalid JSON, setting it to empty JSON now.\n")
        # BUGFIX: return an empty JSON *string* rather than a dict literal.
        # Callers feed this result to string parsers (fix_and_parse_json,
        # cmd.get_command), which would choke on a dict.
        json_string = "{}"

    return json_string
def print_assistant_thoughts(assistant_reply):
    """Parse the assistant's JSON reply and pretty-print its "thoughts" section.

    Tries progressively more forgiving parses of `assistant_reply`, then logs
    the text/reasoning/plan/criticism fields and optionally speaks the
    "speak" field. Returns the parsed JSON object on success, or None when
    an unrecoverable error was logged instead.

    Relies on module globals: ai_name (for the log header) and cfg
    (for speak_mode).
    """
    # `global` here is only needed if these names were assigned; they are
    # read-only in this function, so the declarations are redundant but kept.
    global ai_name
    global cfg
    try:
        try:
            # First attempt: the project's lenient parser.
            assistant_reply_json = fix_and_parse_json(assistant_reply)
        except json.JSONDecodeError as e:
            # Second attempt: carve out the outermost {...} span, then re-parse.
            logger.error("Error: Invalid JSON in assistant thoughts\n", assistant_reply)
            assistant_reply_json = attempt_to_fix_json_by_finding_outermost_brackets(assistant_reply)
            assistant_reply_json = fix_and_parse_json(assistant_reply_json)

        # Check if assistant_reply_json is a string and attempt to parse it into a JSON object
        if isinstance(assistant_reply_json, str):
            try:
                assistant_reply_json = json.loads(assistant_reply_json)
            except json.JSONDecodeError as e:
                logger.error("Error: Invalid JSON\n", assistant_reply)
                # NOTE(review): this recovery may leave a plain string (or a
                # dict from the bracket-fixer) in assistant_reply_json;
                # the .get() below then relies on it being dict-like — confirm.
                assistant_reply_json = attempt_to_fix_json_by_finding_outermost_brackets(assistant_reply_json)

        # Default every field so missing keys render as None rather than raising.
        assistant_thoughts_reasoning = None
        assistant_thoughts_plan = None
        assistant_thoughts_speak = None
        assistant_thoughts_criticism = None
        assistant_thoughts = assistant_reply_json.get("thoughts", {})
        assistant_thoughts_text = assistant_thoughts.get("text")

        if assistant_thoughts:
            assistant_thoughts_reasoning = assistant_thoughts.get("reasoning")
            assistant_thoughts_plan = assistant_thoughts.get("plan")
            assistant_thoughts_criticism = assistant_thoughts.get("criticism")
            assistant_thoughts_speak = assistant_thoughts.get("speak")

        logger.typewriter_log(f"{ai_name.upper()} THOUGHTS:", Fore.YELLOW, assistant_thoughts_text)
        logger.typewriter_log("REASONING:", Fore.YELLOW, assistant_thoughts_reasoning)

        if assistant_thoughts_plan:
            logger.typewriter_log("PLAN:", Fore.YELLOW, "")
            # The plan may arrive as a list, dict, or string; normalize to a string.
            # If it's a list, join it into a string
            if isinstance(assistant_thoughts_plan, list):
                assistant_thoughts_plan = "\n".join(assistant_thoughts_plan)
            elif isinstance(assistant_thoughts_plan, dict):
                assistant_thoughts_plan = str(assistant_thoughts_plan)

            # Split the input_string using the newline character and dashes
            lines = assistant_thoughts_plan.split('\n')
            for line in lines:
                # Strip any leading "- " bullet the model added; we re-bullet below.
                line = line.lstrip("- ")
                logger.typewriter_log("- ", Fore.GREEN, line.strip())

        logger.typewriter_log("CRITICISM:", Fore.YELLOW, assistant_thoughts_criticism)
        # Speak the assistant's thoughts
        if cfg.speak_mode and assistant_thoughts_speak:
            speak.say_text(assistant_thoughts_speak)

        return assistant_reply_json
    except json.decoder.JSONDecodeError as e:
        logger.error("Error: Invalid JSON\n", assistant_reply)
        if cfg.speak_mode:
            speak.say_text("I have received an invalid JSON response from the OpenAI API. I cannot ignore this response.")

    # All other errors, return "Error: + error message"
    except Exception as e:
        # Catch-all: log the traceback; function implicitly returns None.
        call_stack = traceback.format_exc()
        logger.error("Error: \n", call_stack)
def construct_prompt():
    """Load or interactively create the AI configuration and build its prompt.

    Offers to reuse a previously saved AIConfig; otherwise delegates to
    prompt_user() and persists the result. Also publishes the chosen name
    via the module-level `ai_name` global. Returns the full prompt string.
    """
    # Get rid of this global:
    global ai_name

    config = AIConfig.load()

    if config.ai_name:
        logger.typewriter_log(
            f"Welcome back! ",
            Fore.GREEN,
            f"Would you like me to return to being {config.ai_name}?",
            speak_text=True,
        )
        answer = utils.clean_input(f"""Continue with the last settings?
Name: {config.ai_name}
Role: {config.ai_role}
Goals: {config.ai_goals}
Continue (y/n): """)
        # Declining discards the saved settings and falls through to re-prompt.
        if answer.lower() == "n":
            config = AIConfig()

    if not config.ai_name:
        config = prompt_user()
        config.save()

    ai_name = config.ai_name

    return config.construct_full_prompt()
def prompt_user():
    """Interactively collect a name, role, and up to five goals for a new AI.

    Empty answers fall back to the documented defaults. Returns a populated
    AIConfig instance (not yet saved to disk).
    """
    logger.typewriter_log(
        "Welcome to Auto-GPT! ",
        Fore.GREEN,
        "Enter the name of your AI and its role below. Entering nothing will load defaults.",
        speak_text=True,
    )

    # --- AI name -----------------------------------------------------------
    logger.typewriter_log(
        "Name your AI: ",
        Fore.GREEN,
        "For example, 'Entrepreneur-GPT'",
    )
    ai_name = utils.clean_input("AI Name: ") or "Entrepreneur-GPT"

    logger.typewriter_log(
        f"{ai_name} here!",
        Fore.LIGHTBLUE_EX,
        "I am at your service.",
        speak_text=True,
    )

    # --- AI role -----------------------------------------------------------
    logger.typewriter_log(
        "Describe your AI's role: ",
        Fore.GREEN,
        "For example, 'an AI designed to autonomously develop and run businesses with the sole goal of increasing your net worth.'",
    )
    ai_role = utils.clean_input(f"{ai_name} is: ") \
        or "an AI designed to autonomously develop and run businesses with the sole goal of increasing your net worth."

    # --- Goals (up to five; blank entry stops early) -----------------------
    logger.typewriter_log(
        "Enter up to 5 goals for your AI: ",
        Fore.GREEN,
        "For example: \nIncrease net worth, Grow Twitter Account, Develop and manage multiple businesses autonomously'",
    )
    print("Enter nothing to load defaults, enter nothing when finished.", flush=True)
    ai_goals = []
    for i in range(5):
        goal = utils.clean_input(f"{Fore.LIGHTBLUE_EX}Goal{Style.RESET_ALL} {i+1}: ")
        if not goal:
            break
        ai_goals.append(goal)
    if not ai_goals:
        ai_goals = [
            "Increase net worth",
            "Grow Twitter Account",
            "Develop and manage multiple businesses autonomously",
        ]

    return AIConfig(ai_name, ai_role, ai_goals)
def parse_arguments():
    """Parse CLI flags and record the chosen modes on the global Config.

    Resets debug/continuous/speak modes first so repeated invocations start
    from a known state, then applies each flag in turn. Exits via
    parser.error() when --continuous-limit is given without --continuous.
    """
    global cfg
    # Known baseline before applying flags.
    cfg.set_debug_mode(False)
    cfg.set_continuous_mode(False)
    cfg.set_speak_mode(False)

    parser = argparse.ArgumentParser(description='Process arguments.')
    parser.add_argument('--continuous', action='store_true', help='Enable Continuous Mode')
    parser.add_argument('--continuous-limit', '-l', type=int, dest="continuous_limit", help='Defines the number of times to run in continuous mode')
    parser.add_argument('--speak', action='store_true', help='Enable Speak Mode')
    parser.add_argument('--debug', action='store_true', help='Enable Debug Mode')
    parser.add_argument('--gpt3only', action='store_true', help='Enable GPT3.5 Only Mode')
    parser.add_argument('--gpt4only', action='store_true', help='Enable GPT4 Only Mode')
    parser.add_argument('--use-memory', '-m', dest="memory_type", help='Defines which Memory backend to use')
    opts = parser.parse_args()

    if opts.debug:
        logger.typewriter_log("Debug Mode: ", Fore.GREEN, "ENABLED")
        cfg.set_debug_mode(True)

    if opts.continuous:
        logger.typewriter_log("Continuous Mode: ", Fore.RED, "ENABLED")
        logger.typewriter_log(
            "WARNING: ",
            Fore.RED,
            "Continuous mode is not recommended. It is potentially dangerous and may cause your AI to run forever or carry out actions you would not usually authorise. Use at your own risk.")
        cfg.set_continuous_mode(True)

    if opts.continuous_limit:
        logger.typewriter_log(
            "Continuous Limit: ",
            Fore.GREEN,
            f"{opts.continuous_limit}")
        cfg.set_continuous_limit(opts.continuous_limit)

    # Check if continuous limit is used without continuous mode
    if opts.continuous_limit and not opts.continuous:
        parser.error("--continuous-limit can only be used with --continuous")

    if opts.speak:
        logger.typewriter_log("Speak Mode: ", Fore.GREEN, "ENABLED")
        cfg.set_speak_mode(True)

    if opts.gpt3only:
        # GPT3.5-only: downgrade the "smart" model to the fast one.
        logger.typewriter_log("GPT3.5 Only Mode: ", Fore.GREEN, "ENABLED")
        cfg.set_smart_llm_model(cfg.fast_llm_model)

    if opts.gpt4only:
        # GPT4-only: upgrade the "fast" model to the smart one.
        logger.typewriter_log("GPT4 Only Mode: ", Fore.GREEN, "ENABLED")
        cfg.set_fast_llm_model(cfg.smart_llm_model)

    if opts.memory_type:
        supported = get_supported_memory_backends()
        chosen = opts.memory_type
        if chosen not in supported:
            logger.typewriter_log("ONLY THE FOLLOWING MEMORY BACKENDS ARE SUPPORTED: ", Fore.RED, f'{supported}')
            logger.typewriter_log(f"Defaulting to: ", Fore.YELLOW, cfg.memory_backend)
        else:
            cfg.memory_backend = chosen
def main():
    """Program entry point: configure everything, then run the agent loop."""
    global ai_name, memory
    # TODO: fill in llm values here
    check_openai_api_key()
    parse_arguments()
    logger.set_level(logging.DEBUG if cfg.debug_mode else logging.INFO)
    ai_name = ""
    prompt = construct_prompt()
    # print(prompt)

    # Fresh conversation state for this session.
    full_message_history = []
    next_action_count = 0
    # Make a constant:
    user_input = "Determine which next command to use, and respond using the format specified above:"

    # Initialize memory and make sure it is empty.
    # this is particularly important for indexing and referencing pinecone memory
    memory = get_memory(cfg, init=True)
    print(f"Using memory of type: {memory.__class__.__name__}")

    agent = Agent(
        ai_name=ai_name,
        memory=memory,
        full_message_history=full_message_history,
        next_action_count=next_action_count,
        prompt=prompt,
        user_input=user_input,
    )
    agent.start_interaction_loop()
class Agent:
    """Agent class for interacting with Auto-GPT.

    Runs the think / authorize / execute loop: asks the model for a reply,
    prints its thoughts, extracts a command, optionally asks the human to
    approve it, executes it, and records the result in memory and in the
    message history.

    Attributes:
        ai_name: The name of the agent.
        memory: The memory object to use.
        full_message_history: The full message history.
        next_action_count: The number of actions to execute without asking again.
        prompt: The prompt to use.
        user_input: The user input.

    """
    def __init__(self,
                 ai_name,
                 memory,
                 full_message_history,
                 next_action_count,
                 prompt,
                 user_input):
        self.ai_name = ai_name
        self.memory = memory
        self.full_message_history = full_message_history
        self.next_action_count = next_action_count
        self.prompt = prompt
        self.user_input = user_input

    def start_interaction_loop(self):
        """Run the main interaction loop until the user exits or the
        continuous-mode limit is reached."""
        # Interaction Loop
        loop_count = 0
        while True:
            # Discontinue if continuous limit is reached
            loop_count += 1
            if cfg.continuous_mode and cfg.continuous_limit > 0 and loop_count > cfg.continuous_limit:
                logger.typewriter_log("Continuous Limit Reached: ", Fore.YELLOW, f"{cfg.continuous_limit}")
                break

            # Send message to AI, get response
            with Spinner("Thinking... "):
                assistant_reply = chat.chat_with_ai(
                    self.prompt,
                    self.user_input,
                    self.full_message_history,
                    self.memory,
                    cfg.fast_token_limit) # TODO: This hardcodes the model to use GPT3.5. Make this an argument

            # Print Assistant thoughts
            print_assistant_thoughts(assistant_reply)

            # Get command name and arguments
            try:
                command_name, arguments = cmd.get_command(
                    attempt_to_fix_json_by_finding_outermost_brackets(assistant_reply))
                if cfg.speak_mode:
                    speak.say_text(f"I want to execute {command_name}")
            except Exception as e:
                # NOTE(review): if get_command raises on the very first
                # iteration, command_name/arguments are never bound and the
                # references below raise NameError — confirm and guard upstream.
                logger.error("Error: \n", str(e))

            if not cfg.continuous_mode and self.next_action_count == 0:
                ### GET USER AUTHORIZATION TO EXECUTE COMMAND ###
                # Get key press: Prompt the user to press enter to continue or escape
                # to exit
                self.user_input = ""
                logger.typewriter_log(
                    "NEXT ACTION: ",
                    Fore.CYAN,
                    f"COMMAND = {Fore.CYAN}{command_name}{Style.RESET_ALL} ARGUMENTS = {Fore.CYAN}{arguments}{Style.RESET_ALL}")
                print(
                    f"Enter 'y' to authorise command, 'y -N' to run N continuous commands, 'n' to exit program, or enter feedback for {self.ai_name}...",
                    flush=True)
                # Inner loop: repeat until we get an interpretable answer.
                while True:
                    console_input = utils.clean_input(Fore.MAGENTA + "Input:" + Style.RESET_ALL)
                    if console_input.lower().rstrip() == "y":
                        # Plain approval: run exactly this one command.
                        self.user_input = "GENERATE NEXT COMMAND JSON"
                        break
                    elif console_input.lower().startswith("y -"):
                        # "y -N": pre-authorize the next N commands.
                        try:
                            self.next_action_count = abs(int(console_input.split(" ")[1]))
                            self.user_input = "GENERATE NEXT COMMAND JSON"
                        except ValueError:
                            print("Invalid input format. Please enter 'y -n' where n is the number of continuous tasks.")
                            # Malformed count: re-prompt (skips the break below).
                            continue
                        break
                    elif console_input.lower() == "n":
                        self.user_input = "EXIT"
                        break
                    else:
                        # Anything else is treated as human feedback to the model.
                        self.user_input = console_input
                        command_name = "human_feedback"
                        break

                if self.user_input == "GENERATE NEXT COMMAND JSON":
                    logger.typewriter_log(
                        "-=-=-=-=-=-=-= COMMAND AUTHORISED BY USER -=-=-=-=-=-=-=",
                        Fore.MAGENTA,
                        "")
                elif self.user_input == "EXIT":
                    print("Exiting...", flush=True)
                    break
            else:
                # Continuous mode (or pre-authorized actions remain):
                # Print command
                logger.typewriter_log(
                    "NEXT ACTION: ",
                    Fore.CYAN,
                    f"COMMAND = {Fore.CYAN}{command_name}{Style.RESET_ALL} ARGUMENTS = {Fore.CYAN}{arguments}{Style.RESET_ALL}")

            # Execute command
            if command_name is not None and command_name.lower().startswith("error"):
                result = f"Command {command_name} threw the following error: " + arguments
            elif command_name == "human_feedback":
                result = f"Human feedback: {self.user_input}"
            else:
                result = f"Command {command_name} returned: {cmd.execute_command(command_name, arguments)}"
                # Consume one pre-authorized action, if any remain.
                if self.next_action_count > 0:
                    self.next_action_count -= 1

            # Persist the full exchange so future turns can recall it.
            memory_to_add = f"Assistant Reply: {assistant_reply} " \
                            f"\nResult: {result} " \
                            f"\nHuman Feedback: {self.user_input} "

            self.memory.add(memory_to_add)

            # Check if there's a result from the command append it to the message
            # history
            if result is not None:
                self.full_message_history.append(chat.create_chat_message("system", result))
                logger.typewriter_log("SYSTEM: ", Fore.YELLOW, result)
            else:
                self.full_message_history.append(
                    chat.create_chat_message(
                        "system", "Unable to execute command"))
                logger.typewriter_log("SYSTEM: ", Fore.YELLOW, "Unable to execute command")
# Script entry point: run the CLI agent when executed directly.
if __name__ == "__main__":
    main()