feat(builder): Add service-level creds access for agent blocks (#7373)

### Background

Credentials for blocks could only be defined through the block input. The scope of this change is providing system-wide credentials that become the default values for these input blocks.

### Changes 🏗️

* Add system-wide credential support for agent blocks via `BlockFieldSecret`.
* Update llmcall & reddit block to adopt `BlockFieldSecret`.
pull/7381/head
Zamil Majdy 2024-07-12 12:41:55 +04:00 committed by GitHub
parent 58af7f9466
commit 1089551869
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 142 additions and 82 deletions

View File

@ -2,9 +2,8 @@ import logging
from enum import Enum
import openai
from pydantic import BaseModel
from autogpt_server.data.block import Block, BlockOutput, BlockSchema
from autogpt_server.data.block import Block, BlockOutput, BlockSchema, BlockFieldSecret
from autogpt_server.util import json
logger = logging.getLogger(__name__)
@ -14,17 +13,13 @@ class LlmModel(str, Enum):
openai_gpt4 = "gpt-4-turbo"
class LlmConfig(BaseModel):
model: LlmModel
api_key: str
class LlmCallBlock(Block):
class Input(BlockSchema):
config: LlmConfig
expected_format: dict[str, str]
prompt: str
api_key: BlockFieldSecret = BlockFieldSecret(key="openai_api_key")
sys_prompt: str = ""
usr_prompt: str = ""
expected_format: dict[str, str] = {}
model: LlmModel = LlmModel.openai_gpt4
retry: int = 3
class Output(BlockSchema):
@ -37,18 +32,15 @@ class LlmCallBlock(Block):
input_schema=LlmCallBlock.Input,
output_schema=LlmCallBlock.Output,
test_input={
"config": {
"model": "gpt-4-turbo",
"api_key": "fake-api",
},
"model": "gpt-4-turbo",
"api_key": "fake-api",
"expected_format": {
"key1": "value1",
"key2": "value2",
},
"sys_prompt": "System prompt",
"usr_prompt": "User prompt",
"prompt": "User prompt",
},
test_output=("response", {"key1": "key1Value","key2": "key2Value"}),
test_output=("response", {"key1": "key1Value", "key2": "key2Value"}),
test_mock={"llm_call": lambda *args, **kwargs: json.dumps({
"key1": "key1Value",
"key2": "key2Value",
@ -56,36 +48,40 @@ class LlmCallBlock(Block):
)
@staticmethod
def llm_call(api_key: str, model: LlmModel, prompt: list[dict], json: bool) -> str:
    """Execute one OpenAI chat-completion request and return its text content.

    Returns an empty string when the model yields no message content.
    NOTE(review): the ``json`` parameter shadows the imported ``json`` module
    inside this function; callers pass it as a keyword (``json=...``), so
    renaming it here would break them.
    """
    openai.api_key = api_key
    # Only request strict JSON output when the caller asked for it.
    response_format = {"type": "json_object"} if json else None
    response = openai.chat.completions.create(
        model=model,
        messages=prompt,  # type: ignore
        response_format=response_format,
    )
    return response.choices[0].message.content or ""
def run(self, input_data: Input) -> BlockOutput:
expected_format = [f'"{k}": "{v}"' for k, v in
input_data.expected_format.items()]
format_prompt = ",\n ".join(expected_format)
sys_prompt = f"""
|{input_data.sys_prompt}
|
|Reply in json format:
|{{
| {format_prompt}
|}}
"""
usr_prompt = f"""
|{input_data.usr_prompt}
"""
prompt = []
def trim_prompt(s: str) -> str:
lines = s.strip().split("\n")
return "\n".join([line.strip().lstrip("|") for line in lines])
if input_data.sys_prompt:
prompt.append({"role": "system", "content": input_data.sys_prompt})
if input_data.expected_format:
expected_format = [f'"{k}": "{v}"' for k, v in
input_data.expected_format.items()]
format_prompt = ",\n ".join(expected_format)
sys_prompt = f"""
|Reply in json format:
|{{
| {format_prompt}
|}}
"""
prompt.append({"role": "system", "content": trim_prompt(sys_prompt)})
prompt.append({"role": "user", "content": input_data.prompt})
def parse_response(resp: str) -> tuple[dict[str, str], str | None]:
try:
parsed = json.loads(resp)
@ -96,24 +92,24 @@ class LlmCallBlock(Block):
except Exception as e:
return {}, f"JSON decode error: {e}"
prompt = [
{"role": "system", "content": trim_prompt(sys_prompt)},
{"role": "user", "content": trim_prompt(usr_prompt)},
]
logger.warning(f"LLM request: {prompt}")
retry_prompt = ""
for retry_count in range(input_data.retry):
response_text = self.llm_call(
input_data.config.api_key,
input_data.config.model,
prompt
api_key=input_data.api_key.get(),
model=input_data.model,
prompt=prompt,
json=bool(input_data.expected_format),
)
logger.warning(f"LLM attempt-{retry_count} response: {response_text}")
parsed_dict, parsed_error = parse_response(response_text)
if not parsed_error:
yield "response", {k: str(v) for k, v in parsed_dict.items()}
if input_data.expected_format:
parsed_dict, parsed_error = parse_response(response_text)
if not parsed_error:
yield "response", {k: str(v) for k, v in parsed_dict.items()}
return
else:
yield "response", {"response": response_text}
return
retry_prompt = f"""

View File

@ -7,15 +7,15 @@ from typing import Any
from pydantic import BaseModel, Field
from typing import Iterator
from autogpt_server.data.block import Block, BlockOutput, BlockSchema
from autogpt_server.data.block import Block, BlockOutput, BlockSchema, BlockFieldSecret
from autogpt_server.util.mock import MockObject
class RedditCredentials(BaseModel):
    """Credentials for the Reddit API.

    Each secret field defaults to the server-side secret registered under the
    matching key, so users only need to override them per-block when they want
    credentials different from the system-wide ones.
    """

    client_id: BlockFieldSecret = BlockFieldSecret(key="reddit_client_id")
    client_secret: BlockFieldSecret = BlockFieldSecret(key="reddit_client_secret")
    username: BlockFieldSecret = BlockFieldSecret(key="reddit_username")
    password: BlockFieldSecret = BlockFieldSecret(key="reddit_password")
    # Optional custom user agent; None lets the caller decide.
    user_agent: str | None = None
@ -28,11 +28,11 @@ class RedditPost(BaseModel):
def get_praw(creds: RedditCredentials) -> praw.Reddit:
client = praw.Reddit(
client_id=creds.client_id,
client_secret=creds.client_secret,
client_id=creds.client_id.get(),
client_secret=creds.client_secret.get(),
username=creds.username.get(),
password=creds.password.get(),
user_agent=creds.user_agent,
username=creds.username,
password=creds.password,
)
me = client.user.me()
if not me:
@ -43,8 +43,11 @@ def get_praw(creds: RedditCredentials) -> praw.Reddit:
class RedditGetPostsBlock(Block):
class Input(BlockSchema):
creds: RedditCredentials = Field(description="Reddit credentials")
subreddit: str = Field(description="Subreddit name")
creds: RedditCredentials = Field(
description="Reddit credentials",
default=RedditCredentials(),
)
last_minutes: int | None = Field(
description="Post time to stop minutes ago while fetching posts",
default=None
@ -81,8 +84,8 @@ class RedditGetPostsBlock(Block):
test_output=[
("post", RedditPost(
id="id1", subreddit="subreddit", title="title1", body="body1")),
("post", RedditPost(
id="id2", subreddit="subreddit", title="title2", body="body2")),
("post", RedditPost(
id="id2", subreddit="subreddit", title="title2", body="body2")),
],
test_mock={
"get_posts": lambda _: [
@ -92,7 +95,7 @@ class RedditGetPostsBlock(Block):
]
}
)
@staticmethod
def get_posts(input_data: Input) -> Iterator[praw.reddit.Submission]:
client = get_praw(input_data.creds)
@ -101,7 +104,8 @@ class RedditGetPostsBlock(Block):
def run(self, input_data: Input) -> BlockOutput:
for post in self.get_posts(input_data):
if input_data.last_minutes and post.created_utc < datetime.now(tz=timezone.utc) - \
if input_data.last_minutes and post.created_utc < datetime.now(
tz=timezone.utc) - \
timedelta(minutes=input_data.last_minutes):
break

View File

@ -138,7 +138,7 @@ def reddit(
import requests
from autogpt_server.data.graph import Graph, Link, Node
from autogpt_server.blocks.ai import LlmConfig, LlmCallBlock, LlmModel
from autogpt_server.blocks.ai import LlmCallBlock, LlmModel
from autogpt_server.blocks.reddit import (
RedditCredentials,
RedditGetPostsBlock,
@ -153,10 +153,7 @@ def reddit(
password=password,
user_agent=user_agent,
)
openai_creds = LlmConfig(
model=LlmModel.openai_gpt4,
api_key="TODO_FILL_OUT_THIS",
)
openai_api_key = "TODO_FILL_OUT_THIS"
# Hardcoded inputs
reddit_get_post_input = {
@ -179,7 +176,7 @@ The product you are marketing is: Auto-GPT an autonomous AI agent utilizing GPT
You reply the post that you find it relevant to be replied with marketing text.
Make sure to only comment on a relevant post.
""",
"config": openai_creds,
"api_key": openai_api_key,
"expected_format": {
"post_id": "str, the reddit post id",
"is_relevant": "bool, whether the post is relevant for marketing",
@ -219,7 +216,7 @@ Make sure to only comment on a relevant post.
# Links
links = [
Link(reddit_get_post_node.id, text_formatter_node.id, "post", "named_texts"),
Link(text_formatter_node.id, llm_call_node.id, "output", "usr_prompt"),
Link(text_formatter_node.id, llm_call_node.id, "output", "prompt"),
Link(llm_call_node.id, text_matcher_node.id, "response", "data"),
Link(llm_call_node.id, text_matcher_node.id, "response_#_is_relevant", "text"),
Link(

View File

@ -4,15 +4,69 @@ from typing import Any, ClassVar, Generator, Generic, Type, TypeVar, cast
import jsonref
import jsonschema
from prisma.models import AgentBlock
from pydantic import BaseModel
from pydantic import BaseModel, GetCoreSchemaHandler
from pydantic_core import CoreSchema, core_schema
from autogpt_server.util import json
from autogpt_server.util.settings import Secrets
BlockInput = dict[str, Any]
BlockData = tuple[str, Any]
BlockOutput = Generator[BlockData, None, None]
class BlockFieldSecret:
    """A block input field whose value is backed by a server-side secret.

    When no explicit value is given, the value is looked up from the shared
    ``Secrets`` settings object by ``key``. The raw value is never exposed via
    ``repr``/``str`` or pydantic serialization — those all show a placeholder;
    use :meth:`get` to read the actual secret.
    """

    # Placeholder emitted anywhere the secret would otherwise be shown.
    STR: ClassVar[str] = "<secret>"
    # Shared server-side secrets store, read once at class-definition time.
    SECRETS: ClassVar[Secrets] = Secrets()

    def __init__(self, value=None, key=None):
        # An explicit (truthy) value wins; otherwise fall back to the secret
        # registered under `key`.
        self._value = value or self.__get_secret(key)
        if self._value is None:
            raise ValueError(f"Secret {key} not found.")

    def __repr__(self):
        return BlockFieldSecret.STR

    def __str__(self):
        return BlockFieldSecret.STR

    @staticmethod
    def __get_secret(key: str | None):
        # Unknown or missing keys resolve to None so __init__ can raise.
        if not key or not hasattr(BlockFieldSecret.SECRETS, key):
            return None
        return getattr(BlockFieldSecret.SECRETS, key)

    def get(self):
        """Return the actual secret value as a string."""
        return str(self._value)

    @classmethod
    def parse_value(cls, value: Any) -> "BlockFieldSecret":
        """Pydantic validator: accept an existing instance or wrap a raw value."""
        if isinstance(value, BlockFieldSecret):
            return value
        return BlockFieldSecret(value=value)

    @classmethod
    def __get_pydantic_json_schema__(
        cls, source_type: Any, handler: GetCoreSchemaHandler
    ) -> dict[str, Any]:
        # Present the field as an opaque string in generated JSON schemas.
        return {
            "type": "string",
            "title": "BlockFieldSecret",
            "description": "A secret field",
        }

    @classmethod
    def __get_pydantic_core_schema__(
        cls, source_type: Any, handler: GetCoreSchemaHandler
    ) -> CoreSchema:
        # Validate identically for JSON and Python inputs; always serialize to
        # the redaction placeholder so secrets never leak through dumps.
        validator = core_schema.no_info_plain_validator_function(cls.parse_value)
        return core_schema.json_or_python_schema(
            json_schema=validator,
            python_schema=validator,
            serialization=core_schema.plain_serializer_function_ser_schema(
                lambda val: BlockFieldSecret.STR
            ),
        )
class BlockSchema(BaseModel):
cached_jsonschema: ClassVar[dict[str, Any]] = {}
@ -25,10 +79,17 @@ class BlockSchema(BaseModel):
def ref_to_dict(obj):
if isinstance(obj, dict):
# OpenAPI <3.1 does not support sibling fields that has a $ref key
# So sometimes, the schema has an "allOf"/"anyOf"/"oneOf" with 1 item.
keys = {"allOf", "anyOf", "oneOf"}
one_key = next((k for k in keys if k in obj and len(obj[k]) == 1), None)
if one_key:
obj.update(obj[one_key][0])
return {
key: ref_to_dict(value)
for key, value in obj.items()
if not key.startswith("$")
if not key.startswith("$") and key != one_key
}
elif isinstance(obj, list):
return [ref_to_dict(item) for item in obj]
@ -92,10 +153,10 @@ class EmptySchema(BlockSchema):
class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]):
def __init__(
self,
id: str = "",
input_schema: Type[BlockSchemaInputType] = EmptySchema,
output_schema: Type[BlockSchemaOutputType] = EmptySchema,
self,
id: str = "",
input_schema: Type[BlockSchemaInputType] = EmptySchema,
output_schema: Type[BlockSchemaOutputType] = EmptySchema,
test_input: BlockInput | list[BlockInput] | None = None,
test_output: BlockData | list[BlockData] | None = None,
test_mock: dict[str, Any] | None = None,

View File

@ -69,8 +69,13 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
class Secrets(UpdateTrackingModel["Secrets"], BaseSettings):
"""Secrets for the server."""
database_password: str = ""
openai_api_key: str = Field(default="no_key", description="OpenAI API key")
reddit_client_id: str = Field(default="", description="Reddit client ID")
reddit_client_secret: str = Field(default="", description="Reddit client secret")
reddit_username: str = Field(default="", description="Reddit username")
reddit_password: str = Field(default="", description="Reddit password")
# Add more secret fields as needed
model_config = SettingsConfigDict(
@ -87,7 +92,7 @@ class Settings(BaseModel):
def save(self) -> None:
# Save updated config to JSON file
if self.config._updated_fields:
if self.config.updated_fields:
config_to_save = self.config.get_updates()
config_path = os.path.join(get_data_path(), "config.json")
if os.path.exists(config_path):

View File

@ -2,7 +2,7 @@ import time
from autogpt_server.data import block, db
from autogpt_server.data.graph import Graph, Link, Node, create_graph
from autogpt_server.data.execution import ExecutionStatus
from autogpt_server.blocks.ai import LlmConfig, LlmCallBlock, LlmModel
from autogpt_server.blocks.ai import LlmCallBlock, LlmModel
from autogpt_server.blocks.reddit import (
RedditCredentials,
RedditGetPostsBlock,
@ -27,10 +27,7 @@ async def create_test_graph() -> Graph:
password="TODO_FILL_OUT_THIS",
user_agent="TODO_FILL_OUT_THIS",
)
openai_creds = LlmConfig(
model=LlmModel.openai_gpt4,
api_key="TODO_FILL_OUT_THIS",
)
openai_api_key = "TODO_FILL_OUT_THIS"
# Hardcoded inputs
reddit_get_post_input = {
@ -53,7 +50,7 @@ The product you are marketing is: Auto-GPT an autonomous AI agent utilizing GPT
You reply the post that you find it relevant to be replied with marketing text.
Make sure to only comment on a relevant post.
""",
"config": openai_creds,
"api_key": openai_api_key,
"expected_format": {
"post_id": "str, the reddit post id",
"is_relevant": "bool, whether the post is relevant for marketing",
@ -96,7 +93,7 @@ Make sure to only comment on a relevant post.
# Links
links = [
Link(reddit_get_post_node.id, text_formatter_node.id, "post", "named_texts"),
Link(text_formatter_node.id, llm_call_node.id, "output", "usr_prompt"),
Link(text_formatter_node.id, llm_call_node.id, "output", "prompt"),
Link(llm_call_node.id, text_matcher_node.id, "response", "data"),
Link(llm_call_node.id, text_matcher_node.id, "response_#_is_relevant", "text"),
Link(text_matcher_node.id, reddit_comment_node.id, "positive_#_post_id",