diff --git a/autogpt_platform/backend/backend/blocks/agent.py b/autogpt_platform/backend/backend/blocks/agent.py new file mode 100644 index 000000000..ec5c2efd6 --- /dev/null +++ b/autogpt_platform/backend/backend/blocks/agent.py @@ -0,0 +1,100 @@ +import logging + +from autogpt_libs.utils.cache import thread_cached + +from backend.data.block import ( + Block, + BlockCategory, + BlockInput, + BlockOutput, + BlockSchema, + BlockType, + get_block, +) +from backend.data.execution import ExecutionStatus +from backend.data.model import SchemaField + +logger = logging.getLogger(__name__) + + +@thread_cached +def get_executor_manager_client(): + from backend.executor import ExecutionManager + from backend.util.service import get_service_client + + return get_service_client(ExecutionManager) + + +@thread_cached +def get_event_bus(): + from backend.data.queue import RedisExecutionEventBus + + return RedisExecutionEventBus() + + +class AgentExecutorBlock(Block): + class Input(BlockSchema): + user_id: str = SchemaField(description="User ID") + graph_id: str = SchemaField(description="Graph ID") + graph_version: int = SchemaField(description="Graph Version") + + data: BlockInput = SchemaField(description="Input data for the graph") + input_schema: dict = SchemaField(description="Input schema for the graph") + output_schema: dict = SchemaField(description="Output schema for the graph") + + class Output(BlockSchema): + pass + + def __init__(self): + super().__init__( + id="e189baac-8c20-45a1-94a7-55177ea42565", + description="Executes an existing agent inside your agent", + input_schema=AgentExecutorBlock.Input, + output_schema=AgentExecutorBlock.Output, + block_type=BlockType.AGENT, + categories={BlockCategory.AGENT}, + ) + + def run(self, input_data: Input, **kwargs) -> BlockOutput: + executor_manager = get_executor_manager_client() + event_bus = get_event_bus() + + graph_exec = executor_manager.add_execution( + graph_id=input_data.graph_id, + graph_version=input_data.graph_version, + user_id=input_data.user_id, + data=input_data.data, + ) + log_id = f"Graph #{input_data.graph_id}-V{input_data.graph_version}, exec-id: {graph_exec.graph_exec_id}" + logger.info(f"Starting execution of {log_id}") + + for event in event_bus.listen( + graph_id=graph_exec.graph_id, graph_exec_id=graph_exec.graph_exec_id + ): + logger.info( + f"Execution {log_id} produced input {event.input_data} output {event.output_data}" + ) + + if not event.node_id: + if event.status in [ExecutionStatus.COMPLETED, ExecutionStatus.FAILED]: + logger.info(f"Execution {log_id} ended with status {event.status}") + break + else: + continue + + if not event.block_id: + logger.warning(f"{log_id} received event without block_id {event}") + continue + + block = get_block(event.block_id) + if not block or block.block_type != BlockType.OUTPUT: + continue + + output_name = event.input_data.get("name") + if not output_name: + logger.warning(f"{log_id} produced an output with no name {event}") + continue + + for output_data in event.output_data.get("output", []): + logger.info(f"Execution {log_id} produced {output_name}: {output_data}") + yield output_name, output_data diff --git a/autogpt_platform/backend/backend/blocks/basic.py b/autogpt_platform/backend/backend/blocks/basic.py index 6e8a2906d..fcfd94f68 100644 --- a/autogpt_platform/backend/backend/blocks/basic.py +++ b/autogpt_platform/backend/backend/blocks/basic.py @@ -233,7 +233,9 @@ class AgentOutputBlock(Block): ) name: str = SchemaField(description="The name of the output.") title: str | None = SchemaField( - description="The title of the input.", default=None, advanced=True + description="The title of the output.", + default=None, + advanced=True, ) description: str | None = SchemaField( description="The description of the output.", @@ -262,7 +264,7 @@ class AgentOutputBlock(Block): def __init__(self): super().__init__( id="363ae599-353e-4804-937e-b2ee3cef3da4", - description=("Stores the output of the graph for users to see."), + description="Stores the output of the graph for users to see.", input_schema=AgentOutputBlock.Input, output_schema=AgentOutputBlock.Output, test_input=[ diff --git a/autogpt_platform/backend/backend/data/block.py b/autogpt_platform/backend/backend/data/block.py index e89013b3b..05e20ea0b 100644 --- a/autogpt_platform/backend/backend/data/block.py +++ b/autogpt_platform/backend/backend/data/block.py @@ -34,6 +34,7 @@ class BlockType(Enum): INPUT = "Input" OUTPUT = "Output" NOTE = "Note" + AGENT = "Agent" class BlockCategory(Enum): @@ -48,6 +49,7 @@ class BlockCategory(Enum): COMMUNICATION = "Block that interacts with communication platforms." DEVELOPER_TOOLS = "Developer tools such as GitHub blocks." DATA = "Block that interacts with structured data." + AGENT = "Block that interacts with other agents." def dict(self) -> dict[str, str]: return {"category": self.name, "description": self.value} @@ -299,7 +301,9 @@ class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]): ): if output_name == "error": raise RuntimeError(output_data) - if error := self.output_schema.validate_field(output_name, output_data): + if self.block_type == BlockType.STANDARD and ( + error := self.output_schema.validate_field(output_name, output_data) + ): raise ValueError(f"Block produced an invalid output data: {error}") yield output_name, output_data diff --git a/autogpt_platform/backend/backend/data/execution.py b/autogpt_platform/backend/backend/data/execution.py index 5b7d34a3b..18ee9946a 100644 --- a/autogpt_platform/backend/backend/data/execution.py +++ b/autogpt_platform/backend/backend/data/execution.py @@ -64,6 +64,7 @@ class ExecutionResult(BaseModel): graph_exec_id: str node_exec_id: str node_id: str + block_id: str status: ExecutionStatus input_data: BlockInput output_data: CompletedBlockOutput @@ -72,6 +73,26 @@ class ExecutionResult(BaseModel): start_time: datetime | None end_time: datetime | None + @staticmethod + def from_graph(graph: AgentGraphExecution): + return ExecutionResult( + graph_id=graph.agentGraphId, + graph_version=graph.agentGraphVersion, + graph_exec_id=graph.id, + node_exec_id="", + node_id="", + block_id="", + status=graph.executionStatus, + # TODO: Populate input_data & output_data from AgentNodeExecutions + # Input & Output comes AgentInputBlock & AgentOutputBlock. + input_data={}, + output_data={}, + add_time=graph.createdAt, + queue_time=graph.createdAt, + start_time=graph.startedAt, + end_time=graph.updatedAt, + ) + @staticmethod def from_db(execution: AgentNodeExecution): if execution.executionData: @@ -93,9 +114,10 @@ class ExecutionResult(BaseModel): graph_id=graph_execution.agentGraphId if graph_execution else "", graph_version=graph_execution.agentGraphVersion if graph_execution else 0, graph_exec_id=execution.agentGraphExecutionId, + block_id=execution.AgentNode.agentBlockId if execution.AgentNode else "", node_exec_id=execution.id, node_id=execution.agentNodeId, - status=ExecutionStatus(execution.executionStatus), + status=execution.executionStatus, input_data=input_data, output_data=output_data, add_time=execution.addedTime, @@ -248,15 +270,20 @@ async def update_graph_execution_start_time(graph_exec_id: str): async def update_graph_execution_stats( graph_exec_id: str, stats: dict[str, Any], -): +) -> ExecutionResult: + status = ExecutionStatus.FAILED if stats.get("error") else ExecutionStatus.COMPLETED - await AgentGraphExecution.prisma().update( + res = await AgentGraphExecution.prisma().update( where={"id": graph_exec_id}, data={ "executionStatus": status, "stats": json.dumps(stats), }, ) + if not res: + raise ValueError(f"Execution {graph_exec_id} not found.") + + return ExecutionResult.from_graph(res) async def update_node_execution_stats(node_exec_id: str, stats: dict[str, Any]): diff --git a/autogpt_platform/backend/backend/data/graph.py b/autogpt_platform/backend/backend/data/graph.py index d49020826..d97d246ea 100644 --- a/autogpt_platform/backend/backend/data/graph.py +++ b/autogpt_platform/backend/backend/data/graph.py @@ -9,6 +9,7 @@ from prisma.models import AgentGraph, AgentGraphExecution, AgentNode, AgentNodeL from prisma.types import AgentGraphWhereInput from pydantic.fields import computed_field +from backend.blocks.agent import AgentExecutorBlock from backend.blocks.basic import AgentInputBlock, AgentOutputBlock from backend.data.block import BlockInput, BlockType, get_block, get_blocks from backend.data.db import BaseDbModel, transaction @@ -174,24 +175,35 @@ class Graph(BaseDbModel): if node.id not in outbound_nodes or node.id in input_nodes ] - def reassign_ids(self, reassign_graph_id: bool = False): + def reassign_ids(self, user_id: str, reassign_graph_id: bool = False): """ Reassigns all IDs in the graph to new UUIDs. This method can be used before storing a new graph to the database. """ - self.validate_graph() + # Reassign Graph ID id_map = {node.id: str(uuid.uuid4()) for node in self.nodes} if reassign_graph_id: self.id = str(uuid.uuid4()) + # Reassign Node IDs for node in self.nodes: node.id = id_map[node.id] + # Reassign Link IDs for link in self.links: link.source_id = id_map[link.source_id] link.sink_id = id_map[link.sink_id] + # Reassign User IDs for agent blocks + for node in self.nodes: + if node.block_id != AgentExecutorBlock().id: + continue + node.input_default["user_id"] = user_id + node.input_default.setdefault("data", {}) + + self.validate_graph() + def validate_graph(self, for_run: bool = False): def sanitize(name): return name.split("_#_")[0].split("_@_")[0].split("_$_")[0] @@ -215,6 +227,7 @@ class Graph(BaseDbModel): for_run # Skip input completion validation, unless when executing. or block.block_type == BlockType.INPUT or block.block_type == BlockType.OUTPUT + or block.block_type == BlockType.AGENT ): raise ValueError( f"Node {block.name} #{node.id} required input missing: `{name}`" @@ -248,18 +261,26 @@ class Graph(BaseDbModel): ) sanitized_name = sanitize(name) + vals = node.input_default if i == 0: - fields = f"Valid output fields: {block.output_schema.get_fields()}" + fields = ( + block.output_schema.get_fields() + if block.block_type != BlockType.AGENT + else vals.get("output_schema", {}).get("properties", {}).keys() + ) else: - fields = f"Valid input fields: {block.input_schema.get_fields()}" + fields = ( + block.input_schema.get_fields() + if block.block_type != BlockType.AGENT + else vals.get("input_schema", {}).get("properties", {}).keys() + ) if sanitized_name not in fields: - raise ValueError(f"{suffix}, `{name}` invalid, {fields}") + fields_msg = f"Allowed fields: {fields}" + raise ValueError(f"{suffix}, `{name}` invalid, {fields_msg}") if is_static_output_block(link.source_id): link.is_static = True # Each value block output should be static. - # TODO: Add type compatibility check here. - @staticmethod def from_db(graph: AgentGraph, hide_credentials: bool = False): executions = [ diff --git a/autogpt_platform/backend/backend/executor/database.py b/autogpt_platform/backend/backend/executor/database.py index ecc9a12fc..db6bd4942 100644 --- a/autogpt_platform/backend/backend/executor/database.py +++ b/autogpt_platform/backend/backend/executor/database.py @@ -41,8 +41,8 @@ class DatabaseManager(AppService): return Config().database_api_port @expose - def send_execution_update(self, execution_result_dict: dict[Any, Any]): - self.event_queue.publish(ExecutionResult(**execution_result_dict)) + def send_execution_update(self, execution_result: ExecutionResult): + self.event_queue.publish(execution_result) @staticmethod def exposed_run_and_wait( diff --git a/autogpt_platform/backend/backend/executor/manager.py b/autogpt_platform/backend/backend/executor/manager.py index 46319bf99..b87054482 100644 --- a/autogpt_platform/backend/backend/executor/manager.py +++ b/autogpt_platform/backend/backend/executor/manager.py @@ -125,7 +125,7 @@ def execute_node( def update_execution(status: ExecutionStatus) -> ExecutionResult: exec_update = db_client.update_execution_status(node_exec_id, status) - db_client.send_execution_update(exec_update.model_dump()) + db_client.send_execution_update(exec_update) return exec_update node = db_client.get_node(node_id) @@ -251,7 +251,7 @@ def _enqueue_next_nodes( exec_update = db_client.update_execution_status( node_exec_id, ExecutionStatus.QUEUED, data ) - db_client.send_execution_update(exec_update.model_dump()) + db_client.send_execution_update(exec_update) return NodeExecution( user_id=user_id, graph_exec_id=graph_exec_id, @@ -572,10 +572,11 @@ class Executor: exec_stats["walltime"] = timing_info.wall_time exec_stats["cputime"] = timing_info.cpu_time exec_stats["error"] = str(error) if error else None - cls.db_client.update_graph_execution_stats( + result = cls.db_client.update_graph_execution_stats( graph_exec_id=graph_exec.graph_exec_id, stats=exec_stats, ) + cls.db_client.send_execution_update(result) @classmethod @time_measured @@ -729,7 +730,7 @@ class ExecutionManager(AppService): ) self.active_graph_runs[graph_exec_id] = (future, cancel_event) future.add_done_callback( - lambda _: self.active_graph_runs.pop(graph_exec_id) + lambda _: self.active_graph_runs.pop(graph_exec_id, None) ) def cleanup(self): @@ -744,11 +745,17 @@ class ExecutionManager(AppService): @expose def add_execution( - self, graph_id: str, data: BlockInput, user_id: str - ) -> dict[str, Any]: - graph: Graph | None = self.db_client.get_graph(graph_id, user_id=user_id) + self, + graph_id: str, + data: BlockInput, + user_id: str, + graph_version: int | None = None, + ) -> GraphExecution: + graph: Graph | None = self.db_client.get_graph( + graph_id=graph_id, user_id=user_id, version=graph_version + ) if not graph: - raise Exception(f"Graph #{graph_id} not found.") + raise ValueError(f"Graph #{graph_id} not found.") graph.validate_graph(for_run=True) self._validate_node_input_credentials(graph, user_id) @@ -770,7 +777,7 @@ class ExecutionManager(AppService): input_data, error = validate_exec(node, input_data) if input_data is None: - raise Exception(error) + raise ValueError(error) else: nodes_input.append((node.id, input_data)) @@ -796,7 +803,7 @@ class ExecutionManager(AppService): exec_update = self.db_client.update_execution_status( node_exec.node_exec_id, ExecutionStatus.QUEUED, node_exec.input_data ) - self.db_client.send_execution_update(exec_update.model_dump()) + self.db_client.send_execution_update(exec_update) graph_exec = GraphExecution( user_id=user_id, @@ -806,7 +813,7 @@ class ExecutionManager(AppService): ) self.queue.add(graph_exec) - return graph_exec.model_dump() + return graph_exec @expose def cancel_execution(self, graph_exec_id: str) -> None: @@ -843,7 +850,7 @@ class ExecutionManager(AppService): exec_update = self.db_client.update_execution_status( node_exec.node_exec_id, ExecutionStatus.FAILED ) - self.db_client.send_execution_update(exec_update.model_dump()) + self.db_client.send_execution_update(exec_update) def _validate_node_input_credentials(self, graph: Graph, user_id: str): """Checks all credentials for all nodes of the graph""" diff --git a/autogpt_platform/backend/backend/server/routers/v1.py b/autogpt_platform/backend/backend/server/routers/v1.py index 62f3a9b1a..67b426896 100644 --- a/autogpt_platform/backend/backend/server/routers/v1.py +++ b/autogpt_platform/backend/backend/server/routers/v1.py @@ -209,7 +209,7 @@ async def update_graph( 400, detail="Changing is_template on an existing graph is forbidden" ) graph.is_active = not graph.is_template - graph.reassign_ids() + graph.reassign_ids(user_id=user_id) new_graph_version = await graph_db.create_graph(graph, user_id=user_id) @@ -265,7 +265,7 @@ async def execute_graph( graph_exec = execution_manager_client().add_execution( graph_id, node_input, user_id=user_id ) - return {"id": graph_exec["graph_exec_id"]} + return {"id": graph_exec.graph_exec_id} except Exception as e: msg = e.__str__().encode().decode("unicode_escape") raise HTTPException(status_code=400, detail=msg) @@ -403,7 +403,7 @@ async def do_create_graph( graph.is_template = is_template graph.is_active = not is_template - graph.reassign_ids(reassign_graph_id=True) + graph.reassign_ids(user_id=user_id, reassign_graph_id=True) return await graph_db.create_graph(graph, user_id=user_id) diff --git a/autogpt_platform/backend/test/server/test_con_manager.py b/autogpt_platform/backend/test/server/test_con_manager.py index 243004acf..80f9e08f5 100644 --- a/autogpt_platform/backend/test/server/test_con_manager.py +++ b/autogpt_platform/backend/test/server/test_con_manager.py @@ -72,6 +72,7 @@ async def test_send_execution_result( graph_exec_id="test_exec_id", node_exec_id="test_node_exec_id", node_id="test_node_id", + block_id="test_block_id", status=ExecutionStatus.COMPLETED, input_data={"input1": "value1"}, output_data={"output1": ["result1"]}, @@ -102,6 +103,7 @@ async def test_send_execution_result_no_subscribers( graph_exec_id="test_exec_id", node_exec_id="test_node_exec_id", node_id="test_node_id", + block_id="test_block_id", status=ExecutionStatus.COMPLETED, input_data={"input1": "value1"}, output_data={"output1": ["result1"]}, diff --git a/autogpt_platform/frontend/src/components/CustomNode.tsx b/autogpt_platform/frontend/src/components/CustomNode.tsx index 31a7e50d4..902135123 100644 --- a/autogpt_platform/frontend/src/components/CustomNode.tsx +++ b/autogpt_platform/frontend/src/components/CustomNode.tsx @@ -93,6 +93,12 @@ export function CustomNode({ const isInitialSetup = useRef(true); const flowContext = useContext(FlowContext); + if (data.uiType === BlockUIType.AGENT) { + // Display the graph's schema instead AgentExecutorBlock's schema. + data.inputSchema = data.hardcodedValues?.input_schema || {}; + data.outputSchema = data.hardcodedValues?.output_schema || {}; + } + if (!flowContext) { throw new Error("FlowContext consumer must be inside FlowEditor component"); } @@ -163,38 +169,6 @@ export function CustomNode({ if (!schema?.properties) return null; let keys = Object.entries(schema.properties); switch (nodeType) { - case BlockUIType.INPUT: - // For INPUT blocks, dont include connection handles - return keys.map(([propKey, propSchema]) => { - const isRequired = data.inputSchema.required?.includes(propKey); - const isConnected = isHandleConnected(propKey); - const isAdvanced = propSchema.advanced; - return ( - (isRequired || isAdvancedOpen || !isAdvanced) && ( -