Tidies up codebase.

Extracts python functions to relevant files.
pull/10/head
Torantulino 2023-03-28 23:25:42 +01:00
parent 375699e144
commit fc6c7bd8c4
4 changed files with 180 additions and 169 deletions

89
AutonomousAI/browse.py Normal file

@@ -0,0 +1,89 @@
from googlesearch import search
import requests
from bs4 import BeautifulSoup
from readability import Document
import openai


def scrape_text(url):
    response = requests.get(url)
    soup = BeautifulSoup(response.text, "html.parser")

    for script in soup(["script", "style"]):
        script.extract()

    text = soup.get_text()
    lines = (line.strip() for line in text.splitlines())
    chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
    text = '\n'.join(chunk for chunk in chunks if chunk)
    return text


def scrape_main_content(url):
    response = requests.get(url)

    # Try using Readability
    doc = Document(response.text)
    content = doc.summary()
    soup = BeautifulSoup(content, "html.parser")
    text = soup.get_text('\n', strip=True)

    # Check if Readability provided a satisfactory result (e.g., a minimum length)
    # min_length = 50
    # if len(text) < min_length:
    #     # Fallback to the custom function
    #     text = scrape_main_content_custom(response.text)

    return text


def split_text(text, max_length=8192):
    paragraphs = text.split("\n")
    current_length = 0
    current_chunk = []

    for paragraph in paragraphs:
        if current_length + len(paragraph) + 1 <= max_length:
            current_chunk.append(paragraph)
            current_length += len(paragraph) + 1
        else:
            yield "\n".join(current_chunk)
            current_chunk = [paragraph]
            current_length = len(paragraph) + 1

    if current_chunk:
        yield "\n".join(current_chunk)


def summarize_text(text):
    if text == "":
        return "Error: No text to summarize"

    print("Text length: " + str(len(text)) + " characters")
    summaries = []
    chunks = list(split_text(text))

    for i, chunk in enumerate(chunks):
        print("Summarizing chunk " + str(i) + " / " + str(len(chunks)))
        messages = [{"role": "user", "content": "Please summarize the following text, focusing on extracting concise knowledge: " + chunk}]

        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=messages,
            max_tokens=300,
        )

        summary = response.choices[0].message.content
        summaries.append(summary)

    print("Summarized " + str(len(chunks)) + " chunks.")
    combined_summary = "\n".join(summaries)

    # Summarize the combined summary
    messages = [{"role": "user", "content": "Please summarize the following text, focusing on extracting concise knowledge: " + combined_summary}]

    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=messages,
        max_tokens=300,
    )

    final_summary = response.choices[0].message.content
    return final_summary

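A minimal usage sketch for the new browse module, assuming it is run from the AutonomousAI directory with a working keys.py (the main script already sets the key the same way); the URL is only an illustration:

import openai
import keys
import browse

openai.api_key = keys.OPENAI_API_KEY  # summarize_text calls the ChatCompletion API, so a key must be set

# Pull the readable article body, then condense it with chunked gpt-3.5-turbo calls
text = browse.scrape_main_content("https://example.com/some-article")
print(browse.summarize_text(text))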
76
AutonomousAI/commands.py Normal file

@@ -0,0 +1,76 @@
import browse
import json
import memory as mem


### Implemented Commands: ###
def google_search(query, num_results=3):
    search_results = []
    for j in browse.search(query, num_results=num_results):
        search_results.append(j)
    return json.dumps(search_results, ensure_ascii=False, indent=4)


def transcribe_summarise(url):
    # scrape_main_content and summarize_text live in browse.py, not memory.py
    text = browse.scrape_main_content(url)
    summary = browse.summarize_text(text)
    return """ "Result" : """ + summary


def check_news(source):
    print("Checking news from BBC world instead of " + source)
    _text = transcribe_summarise("https://www.bbc.com/news/world")
    return _text


def commit_memory(string):
    _text = "Committing memory with string " + string
    mem.permanent_memory.append(string)
    print(_text)
    return _text


def delete_memory(key):
    if key >= 0 and key < len(mem.permanent_memory):
        _text = "Deleting memory with key " + str(key)
        del mem.permanent_memory[key]
        print(_text)
        return _text
    else:
        print("Invalid key, cannot delete memory.")
        return None


def overwrite_memory(key, string):
    if key >= 0 and key < len(mem.permanent_memory):
        _text = "Overwriting memory with key " + str(key) + " and string " + string
        mem.permanent_memory[key] = string
        print(_text)
        return _text
    else:
        print("Invalid key, cannot overwrite memory.")
        return None


### TODO: Not Yet Implemented: ###
def start_instance(name, prompt):
    _text = "Starting instance with name " + name + " and prompt " + prompt
    print(_text)
    return "Command not implemented yet."


def manage_instances(action):
    _text = "Managing instances with action " + action
    print(_text)
    return _text


def navigate_website(action, username):
    _text = "Navigating website with action " + action + " and username " + username
    print(_text)
    return "Command not implemented yet."


def register_account(username, website):
    _text = "Registering account with username " + username + " and website " + website
    print(_text)
    return "Command not implemented yet."


def check_notifications(website):
    _text = "Checking notifications from " + website
    print(_text)
    return "Command not implemented yet."

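A short sketch of driving the extracted command helpers directly, assuming the AutonomousAI modules are importable and an OpenAI key is configured (transcribe_summarise calls into browse.summarize_text); the search query is illustrative:

import commands as cmd

# Top three result URLs for an illustrative query, returned as a JSON string
print(cmd.google_search("open source autonomous agents", num_results=3))

# Fetch, strip and summarise a page in one step (needs network access)
print(cmd.transcribe_summarise("https://www.bbc.com/news/world"))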
View File

@@ -1,11 +1,9 @@
 import datetime
 import openai
 import json
-from googlesearch import search
-import requests
-from bs4 import BeautifulSoup
 import keys
-from readability import Document
+import commands as cmd
+import memory as mem

 # Initialize the OpenAI API client
 openai.api_key = keys.OPENAI_API_KEY
@@ -13,90 +11,6 @@ openai.api_key = keys.OPENAI_API_KEY
 def get_datetime():
     return "Current date and time: " + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

-def scrape_text(url):
-    response = requests.get(url)
-    soup = BeautifulSoup(response.text, "html.parser")
-    for script in soup(["script", "style"]):
-        script.extract()
-    text = soup.get_text()
-    lines = (line.strip() for line in text.splitlines())
-    chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
-    text = '\n'.join(chunk for chunk in chunks if chunk)
-    return text
-
-def scrape_main_content(url):
-    response = requests.get(url)
-    # Try using Readability
-    doc = Document(response.text)
-    content = doc.summary()
-    soup = BeautifulSoup(content, "html.parser")
-    text = soup.get_text('\n', strip=True)
-    # Check if Readability provided a satisfactory result (e.g., a minimum length)
-    # min_length = 50
-    # if len(text) < min_length:
-    #     # Fallback to the custom function
-    #     text = scrape_main_content_custom(response.text)
-    return text
-
-def split_text(text, max_length=8192):
-    paragraphs = text.split("\n")
-    current_length = 0
-    current_chunk = []
-    for paragraph in paragraphs:
-        if current_length + len(paragraph) + 1 <= max_length:
-            current_chunk.append(paragraph)
-            current_length += len(paragraph) + 1
-        else:
-            yield "\n".join(current_chunk)
-            current_chunk = [paragraph]
-            current_length = len(paragraph) + 1
-    if current_chunk:
-        yield "\n".join(current_chunk)
-
-def summarize_text(text):
-    if text == "":
-        return "Error: No text to summarize"
-    print("Text length: " + str(len(text)) + " characters")
-    summaries = []
-    chunks = list(split_text(text))
-    for i, chunk in enumerate(chunks):
-        print("Summarizing chunk " + str(i) + " / " + str(len(chunks)))
-        messages = [{"role": "user", "content": "Please summarize the following text, focusing on extracting concise knowledge: " + chunk}]
-        response = openai.ChatCompletion.create(
-            model="gpt-3.5-turbo",
-            messages=messages,
-            max_tokens=300,
-        )
-        summary = response.choices[0].message.content
-        summaries.append(summary)
-    print("Summarized " + str(len(chunks)) + " chunks.")
-    combined_summary = "\n".join(summaries)
-    # Summarize the combined summary
-    messages = [{"role": "user", "content": "Please summarize the following text, focusing on extracting concise knowledge: " + combined_summary}]
-    response = openai.ChatCompletion.create(
-        model="gpt-3.5-turbo",
-        messages=messages,
-        max_tokens=300,
-    )
-    final_summary = response.choices[0].message.content
-    return final_summary

 def create_chat_message(role, content):
     """
     Create a chat message with the given role and content.
@@ -154,27 +68,27 @@ def execute_command(response):
         arguments = command["args"]

         if command_name == "google":
-            return google_search(arguments["input"])
+            return cmd.google_search(arguments["input"])
         elif command_name == "check_news":
-            return check_news(arguments["source"])
+            return cmd.check_news(arguments["source"])
         elif command_name == "check_notifications":
-            return check_notifications(arguments["website"])
+            return cmd.check_notifications(arguments["website"])
         elif command_name == "memory_add":
-            return commit_memory(arguments["string"])
+            return cmd.commit_memory(arguments["string"])
         elif command_name == "memory_del":
-            return delete_memory(arguments["key"])
+            return cmd.delete_memory(arguments["key"])
         elif command_name == "memory_ovr":
-            return overwrite_memory(arguments["key"], arguments["string"])
+            return cmd.overwrite_memory(arguments["key"], arguments["string"])
         elif command_name == "start_instance":
-            return start_instance(arguments["name"], arguments["prompt"])
+            return cmd.start_instance(arguments["name"], arguments["prompt"])
         elif command_name == "manage_instances":
-            return manage_instances(arguments["action"])
+            return cmd.manage_instances(arguments["action"])
         elif command_name == "navigate_website":
-            return navigate_website(arguments["action"], arguments["username"])
+            return cmd.navigate_website(arguments["action"], arguments["username"])
         elif command_name == "register_account":
-            return register_account(arguments["username"], arguments["website"])
+            return cmd.register_account(arguments["username"], arguments["website"])
         elif command_name == "transcribe_summarise":
-            return transcribe_summarise(arguments["url"])
+            return cmd.transcribe_summarise(arguments["url"])
         else:
             return f"unknown command {command_name}"
     except json.decoder.JSONDecodeError:
@@ -183,77 +97,8 @@ def execute_command(response):
     except Exception as e:
         return "Error: " + str(e)

-def google_search(query, num_results=3):
-    search_results = []
-    for j in search(query, num_results=num_results):
-        search_results.append(j)
-    return json.dumps(search_results, ensure_ascii=False, indent=4)
-
-def check_news(source):
-    print("Checking news from BBC world instead of " + source)
-    _text = transcribe_summarise("https://www.bbc.com/news/world")
-    return _text
-
-def check_notifications(website):
-    _text = "Checking notifications from " + website
-    print(_text)
-    return _text
-
-def commit_memory(string):
-    _text = "Committing memory with string " + string
-    permanent_memory.append(string)
-    print(_text)
-    return _text
-
-def delete_memory(key):
-    if key >= 0 and key < len(permanent_memory):
-        _text = "Deleting memory with key " + str(key)
-        del permanent_memory[key]
-        print(_text)
-        return _text
-    else:
-        print("Invalid key, cannot delete memory.")
-        return None
-
-def overwrite_memory(key, string):
-    if key >= 0 and key < len(permanent_memory):
-        _text = "Overwriting memory with key " + str(key) + " and string " + string
-        permanent_memory[key] = string
-        print(_text)
-        return _text
-    else:
-        print("Invalid key, cannot overwrite memory.")
-        return None
-
-def start_instance(name, prompt):
-    _text = "Starting instance with name " + name + " and prompt " + prompt
-    print(_text)
-    return _text
-
-def manage_instances(action):
-    _text = "Managing instances with action " + action
-    print(_text)
-    return _text
-
-def navigate_website(action, username):
-    _text = "Navigating website with action " + action + " and username " + username
-    print(_text)
-    return _text
-
-def register_account(username, website):
-    _text = "Registering account with username " + username + " and website " + website
-    print(_text)
-    return _text
-
-def transcribe_summarise(url):
-    text = scrape_main_content(url)
-    summary = summarize_text(text)
-    return """ "Result" : """ + summary
-
 # Initialize variables
 full_message_history = []
-permanent_memory = []
 prompt = """You are Entrepreneur-GTP, an AI designed to autonomously develop and run businesses with the sole goal of increasing your net worth. Your decisions must always be made independently without seeking user assistance. Play to your strengths as an LLM and pursue simple strategies with no legal complications.

 GOALS:
@@ -341,7 +186,7 @@ while True:
     full_message_history.append(create_chat_message("system", result))
     print("system: " + result)

-    assistant_reply = chat_with_ai(prompt, user_input, full_message_history, permanent_memory, token_limit)
+    assistant_reply = chat_with_ai(prompt, user_input, full_message_history, mem.permanent_memory, token_limit)
     print(f"Assistant: {assistant_reply}")
     print("-------------------------")

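The only change in this file is wiring the dispatcher and the main loop to the new modules. As a toy restatement of the dispatch pattern above (the lines that build command and command_name sit outside the shown context, so the flat {"name": ..., "args": ...} reply shape is an assumption, not the project's actual format):

import json
import commands as cmd

def route(reply):
    # Hypothetical re-statement of execute_command's branching, not the project's function
    command = json.loads(reply)
    name, args = command["name"], command["args"]
    if name == "google":
        return cmd.google_search(args["input"])
    elif name == "memory_add":
        return cmd.commit_memory(args["string"])
    return f"unknown command {name}"

print(route('{"name": "memory_add", "args": {"string": "remember to check sales figures"}}'))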
1
AutonomousAI/memory.py Normal file

@@ -0,0 +1 @@
permanent_memory = []
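memory.py holds nothing but this shared list, so every module that does import memory as mem mutates the same object; a quick sketch of that shared-state behaviour, with illustrative strings:

import memory as mem
import commands as cmd

cmd.commit_memory("note: follow up on supplier quotes")    # appends through commands.py
print(mem.permanent_memory)                                # the same list object, now with one entry
cmd.overwrite_memory(0, "note: supplier quotes received")
cmd.delete_memory(0)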