From c07cf8a7b8cd480c53dec60664aa0dcdcd9995e1 Mon Sep 17 00:00:00 2001 From: Swifty Date: Mon, 23 Sep 2024 11:31:48 +0200 Subject: [PATCH] fix(platform): Added updated graph meta to include runs (#8088) --- .../backend/backend/data/graph.py | 58 +++++++- .../backend/backend/server/rest_api.py | 8 +- autogpt_platform/backend/pyproject.toml | 35 +---- autogpt_platform/frontend/package.json | 3 +- .../frontend/sentry.client.config.ts | 2 + .../frontend/sentry.edge.config.ts | 2 + .../frontend/sentry.server.config.ts | 2 + autogpt_platform/frontend/src/app/page.tsx | 135 +++++------------- .../src/lib/autogpt-server-api/baseClient.ts | 34 ++++- .../src/lib/autogpt-server-api/types.ts | 14 ++ 10 files changed, 156 insertions(+), 137 deletions(-) diff --git a/autogpt_platform/backend/backend/data/graph.py b/autogpt_platform/backend/backend/data/graph.py index c058547ba..bbe9afc23 100644 --- a/autogpt_platform/backend/backend/data/graph.py +++ b/autogpt_platform/backend/backend/data/graph.py @@ -1,17 +1,20 @@ import asyncio import logging import uuid +from datetime import datetime, timezone from pathlib import Path from typing import Any, Literal import prisma.types -from prisma.models import AgentGraph, AgentNode, AgentNodeLink +from prisma.models import AgentGraph, AgentGraphExecution, AgentNode, AgentNodeLink +from prisma.types import AgentGraphInclude from pydantic import BaseModel, PrivateAttr from pydantic_core import PydanticUndefinedType from backend.blocks.basic import AgentInputBlock, AgentOutputBlock from backend.data.block import BlockInput, get_block, get_blocks from backend.data.db import BaseDbModel, transaction +from backend.data.execution import ExecutionStatus from backend.data.user import DEFAULT_USER_ID from backend.util import json @@ -77,16 +80,57 @@ class Node(BaseDbModel): return obj +class ExecutionMeta(BaseDbModel): + execution_id: str + started_at: datetime + ended_at: datetime + duration: float + total_run_time: float + status: ExecutionStatus + + @staticmethod + def from_agent_graph_execution(execution: AgentGraphExecution): + now = datetime.now(timezone.utc) + start_time = execution.startedAt or execution.createdAt + end_time = execution.updatedAt or now + duration = (end_time - start_time).total_seconds() + + total_run_time = 0 + if execution.AgentNodeExecutions: + for node_execution in execution.AgentNodeExecutions: + node_start = node_execution.startedTime or now + node_end = node_execution.endedTime or now + total_run_time += (node_end - node_start).total_seconds() + + return ExecutionMeta( + id=execution.id, + execution_id=execution.id, + started_at=start_time, + ended_at=end_time, + duration=duration, + total_run_time=total_run_time, + status=ExecutionStatus(execution.executionStatus), + ) + + class GraphMeta(BaseDbModel): version: int = 1 is_active: bool = True is_template: bool = False - name: str description: str + executions: list[ExecutionMeta] | None = None @staticmethod def from_db(graph: AgentGraph): + if graph.AgentGraphExecution: + executions = [ + ExecutionMeta.from_agent_graph_execution(execution) + for execution in graph.AgentGraphExecution + ] + else: + executions = None + return GraphMeta( id=graph.id, version=graph.version, @@ -94,6 +138,7 @@ class GraphMeta(BaseDbModel): is_template=graph.isTemplate, name=graph.name or "", description=graph.description or "", + executions=executions, ) @@ -337,6 +382,7 @@ async def get_node(node_id: str) -> Node: async def get_graphs_meta( + include_executions: bool = False, filter_by: Literal["active", "template"] | None = "active", user_id: str | None = None, ) -> list[GraphMeta]: @@ -345,6 +391,7 @@ async def get_graphs_meta( Default behaviour is to get all currently active graphs. Args: + include_executions: Whether to include executions in the graph metadata. filter_by: An optional filter to either select templates or active graphs. Returns: @@ -364,6 +411,13 @@ async def get_graphs_meta( where=where_clause, distinct=["id"], order={"version": "desc"}, + include=( + AgentGraphInclude( + AgentGraphExecution={"include": {"AgentNodeExecutions": True}} + ) + if include_executions + else None + ), ) if not graphs: diff --git a/autogpt_platform/backend/backend/server/rest_api.py b/autogpt_platform/backend/backend/server/rest_api.py index 9a765d15a..9f3afd2fb 100644 --- a/autogpt_platform/backend/backend/server/rest_api.py +++ b/autogpt_platform/backend/backend/server/rest_api.py @@ -326,9 +326,13 @@ class AgentServer(AppService): @classmethod async def get_graphs( - cls, user_id: Annotated[str, Depends(get_user_id)] + cls, + user_id: Annotated[str, Depends(get_user_id)], + with_runs: bool = False, ) -> list[graph_db.GraphMeta]: - return await graph_db.get_graphs_meta(filter_by="active", user_id=user_id) + return await graph_db.get_graphs_meta( + include_executions=with_runs, filter_by="active", user_id=user_id + ) @classmethod async def get_templates(cls) -> list[graph_db.GraphMeta]: diff --git a/autogpt_platform/backend/pyproject.toml b/autogpt_platform/backend/pyproject.toml index d014d7246..c7c3be200 100644 --- a/autogpt_platform/backend/pyproject.toml +++ b/autogpt_platform/backend/pyproject.toml @@ -6,6 +6,7 @@ authors = ["AutoGPT "] readme = "README.md" packages = [{ include = "backend" }] + [tool.poetry.dependencies] python = "^3.10" aio-pika = "^9.4.3" @@ -67,40 +68,6 @@ cli = "backend.cli:main" format = "linter:format" lint = "linter:lint" test = "run_tests:test" -# https://poethepoet.natn.io/index.html - -[tool.poe] -poetry_command = "" - -# poetry run poe xxx -[tool.poe.tasks] -test = "pytest" -build = ["test", "_dbuild"] - -# This might break your python install :) -install = ["build", "_dinstall"] - -# https://cx-freeze.readthedocs.io/en/stable/index.html -[tool.poe.tasks._dbuild] -cmd = "python setup.py build" - -[tool.poe.tasks.dist_app] -cmd = "python setup.py bdist_app" - -[tool.poe.tasks.dist_dmg] -cmd = "python setup.py bdist_dmg" - -[tool.poe.tasks.dist_msi] -cmd = "python setup.py bdist_msi" - -[tool.poe.tasks.dist_appimage] -cmd = "python setup.py bdist_appimage" - -[tool.poe.tasks.dist_deb] -cmd = "python setup.py bdist_deb" - -[tool.poe.tasks._dinstall] -cmd = "python setup.py install" [tool.pytest-watcher] now = false diff --git a/autogpt_platform/frontend/package.json b/autogpt_platform/frontend/package.json index c9b7e2dc7..5f9b5cad6 100644 --- a/autogpt_platform/frontend/package.json +++ b/autogpt_platform/frontend/package.json @@ -3,7 +3,8 @@ "version": "0.1.0", "private": true, "scripts": { - "dev": "next dev", + "dev": "export NODE_ENV=development && next dev", + "dev:test": "export NODE_ENV=test && next dev", "build": "next build", "start": "next start", "lint": "next lint", diff --git a/autogpt_platform/frontend/sentry.client.config.ts b/autogpt_platform/frontend/sentry.client.config.ts index aad53ebbb..f37d5cda2 100644 --- a/autogpt_platform/frontend/sentry.client.config.ts +++ b/autogpt_platform/frontend/sentry.client.config.ts @@ -7,6 +7,8 @@ import * as Sentry from "@sentry/nextjs"; Sentry.init({ dsn: "https://fe4e4aa4a283391808a5da396da20159@o4505260022104064.ingest.us.sentry.io/4507946746380288", + enabled: process.env.NODE_ENV !== "development", + // Add optional integrations for additional features integrations: [ Sentry.replayIntegration(), diff --git a/autogpt_platform/frontend/sentry.edge.config.ts b/autogpt_platform/frontend/sentry.edge.config.ts index 09903cfad..8a566e17b 100644 --- a/autogpt_platform/frontend/sentry.edge.config.ts +++ b/autogpt_platform/frontend/sentry.edge.config.ts @@ -8,6 +8,8 @@ import * as Sentry from "@sentry/nextjs"; Sentry.init({ dsn: "https://fe4e4aa4a283391808a5da396da20159@o4505260022104064.ingest.us.sentry.io/4507946746380288", + enabled: process.env.NODE_ENV !== "development", + // Define how likely traces are sampled. Adjust this value in production, or use tracesSampler for greater control. tracesSampleRate: 1, diff --git a/autogpt_platform/frontend/sentry.server.config.ts b/autogpt_platform/frontend/sentry.server.config.ts index db0cf3075..20f0df5a3 100644 --- a/autogpt_platform/frontend/sentry.server.config.ts +++ b/autogpt_platform/frontend/sentry.server.config.ts @@ -8,6 +8,8 @@ import * as Sentry from "@sentry/nextjs"; Sentry.init({ dsn: "https://fe4e4aa4a283391808a5da396da20159@o4505260022104064.ingest.us.sentry.io/4507946746380288", + enabled: process.env.NODE_ENV !== "development", + // Define how likely traces are sampled. Adjust this value in production, or use tracesSampler for greater control. tracesSampleRate: 1, diff --git a/autogpt_platform/frontend/src/app/page.tsx b/autogpt_platform/frontend/src/app/page.tsx index e76636fc6..792488316 100644 --- a/autogpt_platform/frontend/src/app/page.tsx +++ b/autogpt_platform/frontend/src/app/page.tsx @@ -2,8 +2,8 @@ import React, { useCallback, useEffect, useMemo, useState } from "react"; import AutoGPTServerAPI, { - GraphMeta, - NodeExecutionResult, + GraphMetaWithRuns, + ExecutionMeta, } from "@/lib/autogpt-server-api"; import { Card } from "@/components/ui/card"; @@ -17,62 +17,37 @@ import { } from "@/components/monitor"; const Monitor = () => { - const [flows, setFlows] = useState([]); + const [flows, setFlows] = useState([]); const [flowRuns, setFlowRuns] = useState([]); - const [selectedFlow, setSelectedFlow] = useState(null); + const [selectedFlow, setSelectedFlow] = useState( + null, + ); const [selectedRun, setSelectedRun] = useState(null); const api = useMemo(() => new AutoGPTServerAPI(), []); - const refreshFlowRuns = useCallback( - (flowID: string) => { - // Fetch flow run IDs - api.listGraphRunIDs(flowID).then((runIDs) => - runIDs.map((runID) => { - let run; - if ( - (run = flowRuns.find((fr) => fr.id == runID)) && - !["waiting", "running"].includes(run.status) - ) { - return; - } - - // Fetch flow run - api.getGraphExecutionInfo(flowID, runID).then((execInfo) => - setFlowRuns((flowRuns) => { - if (execInfo.length == 0) return flowRuns; - - const flowRunIndex = flowRuns.findIndex((fr) => fr.id == runID); - const flowRun = flowRunFromNodeExecutionResults(execInfo); - if (flowRunIndex > -1) { - flowRuns.splice(flowRunIndex, 1, flowRun); - } else { - flowRuns.push(flowRun); - } - return [...flowRuns]; - }), - ); - }), + const fetchAgents = useCallback(() => { + api.listGraphsWithRuns().then((agent) => { + setFlows(agent); + const flowRuns = agent.flatMap((graph) => + graph.executions != null + ? graph.executions.map((execution) => + flowRunFromExecutionMeta(graph, execution), + ) + : [], ); - }, - [api, flowRuns], - ); - - const fetchFlowsAndRuns = useCallback(() => { - api.listGraphs().then((flows) => { - setFlows(flows); - flows.map((flow) => refreshFlowRuns(flow.id)); + setFlowRuns(flowRuns); }); - }, [api, refreshFlowRuns]); + }, [api]); - useEffect(() => fetchFlowsAndRuns(), [fetchFlowsAndRuns]); useEffect(() => { - const intervalId = setInterval( - () => flows.map((f) => refreshFlowRuns(f.id)), - 5000, - ); + fetchAgents(); + }, [api, fetchAgents]); + + useEffect(() => { + const intervalId = setInterval(() => fetchAgents(), 5000); return () => clearInterval(intervalId); - }, [flows, refreshFlowRuns]); + }, [fetchAgents, flows]); const column1 = "md:col-span-2 xl:col-span-3 xxl:col-span-2"; const column2 = "md:col-span-3 lg:col-span-2 xl:col-span-3 space-y-4"; @@ -87,7 +62,9 @@ const Monitor = () => { selectedFlow={selectedFlow} onSelectFlow={(f) => { setSelectedRun(null); - setSelectedFlow(f.id == selectedFlow?.id ? null : f); + setSelectedFlow( + f.id == selectedFlow?.id ? null : (f as GraphMetaWithRuns), + ); }} /> { ); }; -function flowRunFromNodeExecutionResults( - nodeExecutionResults: NodeExecutionResult[], +function flowRunFromExecutionMeta( + graphMeta: GraphMetaWithRuns, + executionMeta: ExecutionMeta, ): FlowRun { - // Determine overall status - let status: "running" | "waiting" | "success" | "failed" = "success"; - for (const execution of nodeExecutionResults) { - if (execution.status === "FAILED") { - status = "failed"; - break; - } else if (["QUEUED", "RUNNING"].includes(execution.status)) { - status = "running"; - break; - } else if (execution.status === "INCOMPLETE") { - status = "waiting"; - } - } - - // Determine aggregate startTime, endTime, and totalRunTime - const now = Date.now(); - const startTime = Math.min( - ...nodeExecutionResults.map((ner) => ner.add_time.getTime()), - now, - ); - const endTime = ["success", "failed"].includes(status) - ? Math.max( - ...nodeExecutionResults.map((ner) => ner.end_time?.getTime() || 0), - startTime, - ) - : now; - const duration = (endTime - startTime) / 1000; // Convert to seconds - const totalRunTime = - nodeExecutionResults.reduce( - (cum, node) => - cum + - ((node.end_time?.getTime() ?? now) - - (node.start_time?.getTime() ?? now)), - 0, - ) / 1000; - return { - id: nodeExecutionResults[0].graph_exec_id, - graphID: nodeExecutionResults[0].graph_id, - graphVersion: nodeExecutionResults[0].graph_version, - status, - startTime, - endTime, - duration, - totalRunTime, - nodeExecutionResults: nodeExecutionResults, - }; + id: executionMeta.execution_id, + graphID: graphMeta.id, + graphVersion: graphMeta.version, + status: executionMeta.status, + startTime: executionMeta.started_at, + endTime: executionMeta.ended_at, + duration: executionMeta.duration, + totalRunTime: executionMeta.total_run_time, + } as FlowRun; } export default Monitor; diff --git a/autogpt_platform/frontend/src/lib/autogpt-server-api/baseClient.ts b/autogpt_platform/frontend/src/lib/autogpt-server-api/baseClient.ts index b7d53703c..f1293ed10 100644 --- a/autogpt_platform/frontend/src/lib/autogpt-server-api/baseClient.ts +++ b/autogpt_platform/frontend/src/lib/autogpt-server-api/baseClient.ts @@ -5,7 +5,9 @@ import { GraphCreatable, GraphUpdateable, GraphMeta, + GraphMetaWithRuns, GraphExecuteResponse, + ExecutionMeta, NodeExecutionResult, User, AnalyticsMetrics, @@ -45,7 +47,12 @@ export default class BaseAutoGPTServerAPI { } async listGraphs(): Promise { - return this._get("/graphs"); + return this._get(`/graphs`); + } + + async listGraphsWithRuns(): Promise { + let graphs = await this._get(`/graphs?with_runs=true`); + return graphs.map(parseGraphMetaWithRuns); } async listTemplates(): Promise { @@ -328,3 +335,28 @@ function parseNodeExecutionResultTimestamps(result: any): NodeExecutionResult { end_time: result.end_time ? new Date(result.end_time) : undefined, }; } + +function parseGraphMetaWithRuns(result: any): GraphMetaWithRuns { + return { + ...result, + executions: result.executions.map(parseExecutionMetaTimestamps), + }; +} + +function parseExecutionMetaTimestamps(result: any): ExecutionMeta { + let status: "running" | "waiting" | "success" | "failed" = "success"; + if (result.status === "FAILED") { + status = "failed"; + } else if (["QUEUED", "RUNNING"].includes(result.status)) { + status = "running"; + } else if (result.status === "INCOMPLETE") { + status = "waiting"; + } + + return { + ...result, + status, + started_at: new Date(result.started_at).getTime(), + ended_at: result.ended_at ? new Date(result.ended_at).getTime() : undefined, + }; +} diff --git a/autogpt_platform/frontend/src/lib/autogpt-server-api/types.ts b/autogpt_platform/frontend/src/lib/autogpt-server-api/types.ts index b5a326745..df9f691ce 100644 --- a/autogpt_platform/frontend/src/lib/autogpt-server-api/types.ts +++ b/autogpt_platform/frontend/src/lib/autogpt-server-api/types.ts @@ -139,6 +139,16 @@ export type LinkCreatable = Omit & { id?: string; }; +/* Mirror of autogpt_server/data/graph.py:ExecutionMeta */ +export type ExecutionMeta = { + execution_id: string; + started_at: number; + ended_at: number; + duration: number; + total_run_time: number; + status: "running" | "waiting" | "success" | "failed"; +}; + /* Mirror of backend/data/graph.py:GraphMeta */ export type GraphMeta = { id: string; @@ -149,6 +159,10 @@ export type GraphMeta = { description: string; }; +export type GraphMetaWithRuns = GraphMeta & { + executions: ExecutionMeta[]; +}; + /* Mirror of backend/data/graph.py:Graph */ export type Graph = GraphMeta & { nodes: Array;