fix(platform): Update graph meta to include runs (#8088)

pull/8123/head^2
Swifty 2024-09-23 11:31:48 +02:00 committed by GitHub
parent fc51176a56
commit c07cf8a7b8
10 changed files with 156 additions and 137 deletions

View File

@@ -1,17 +1,20 @@
import asyncio
import logging
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Literal
import prisma.types
from prisma.models import AgentGraph, AgentNode, AgentNodeLink
from prisma.models import AgentGraph, AgentGraphExecution, AgentNode, AgentNodeLink
from prisma.types import AgentGraphInclude
from pydantic import BaseModel, PrivateAttr
from pydantic_core import PydanticUndefinedType
from backend.blocks.basic import AgentInputBlock, AgentOutputBlock
from backend.data.block import BlockInput, get_block, get_blocks
from backend.data.db import BaseDbModel, transaction
from backend.data.execution import ExecutionStatus
from backend.data.user import DEFAULT_USER_ID
from backend.util import json
@@ -77,16 +80,57 @@ class Node(BaseDbModel):
return obj
class ExecutionMeta(BaseDbModel):
execution_id: str
started_at: datetime
ended_at: datetime
duration: float
total_run_time: float
status: ExecutionStatus
@staticmethod
def from_agent_graph_execution(execution: AgentGraphExecution):
now = datetime.now(timezone.utc)
start_time = execution.startedAt or execution.createdAt
end_time = execution.updatedAt or now
duration = (end_time - start_time).total_seconds()
total_run_time = 0
if execution.AgentNodeExecutions:
for node_execution in execution.AgentNodeExecutions:
node_start = node_execution.startedTime or now
node_end = node_execution.endedTime or now
total_run_time += (node_end - node_start).total_seconds()
return ExecutionMeta(
id=execution.id,
execution_id=execution.id,
started_at=start_time,
ended_at=end_time,
duration=duration,
total_run_time=total_run_time,
status=ExecutionStatus(execution.executionStatus),
)
class GraphMeta(BaseDbModel):
version: int = 1
is_active: bool = True
is_template: bool = False
name: str
description: str
executions: list[ExecutionMeta] | None = None
@staticmethod
def from_db(graph: AgentGraph):
if graph.AgentGraphExecution:
executions = [
ExecutionMeta.from_agent_graph_execution(execution)
for execution in graph.AgentGraphExecution
]
else:
executions = None
return GraphMeta(
id=graph.id,
version=graph.version,
@@ -94,6 +138,7 @@ class GraphMeta(BaseDbModel):
is_template=graph.isTemplate,
name=graph.name or "",
description=graph.description or "",
executions=executions,
)
@@ -337,6 +382,7 @@ async def get_node(node_id: str) -> Node:
async def get_graphs_meta(
include_executions: bool = False,
filter_by: Literal["active", "template"] | None = "active",
user_id: str | None = None,
) -> list[GraphMeta]:
@@ -345,6 +391,7 @@ async def get_graphs_meta(
Default behaviour is to get all currently active graphs.
Args:
include_executions: Whether to include executions in the graph metadata.
filter_by: An optional filter to either select templates or active graphs.
Returns:
@@ -364,6 +411,13 @@ async def get_graphs_meta(
where=where_clause,
distinct=["id"],
order={"version": "desc"},
include=(
AgentGraphInclude(
AgentGraphExecution={"include": {"AgentNodeExecutions": True}}
)
if include_executions
else None
),
)
if not graphs:
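
Note on the two new timing fields: duration is the wall-clock span of the whole run, while total_run_time sums the per-node execution times, so concurrently running nodes can make total_run_time exceed duration. A minimal sketch with plain Date arithmetic (illustrative timestamps only, mirroring the Python logic above in TypeScript):

const start = new Date("2024-09-23T11:00:00Z");
const end = new Date(start.getTime() + 10_000);
const duration = (end.getTime() - start.getTime()) / 1000; // 10 s wall-clock

// Two nodes that each ran for 8 s, overlapping in time:
const nodeWindows: [Date, Date][] = [
  [start, new Date(start.getTime() + 8_000)],
  [new Date(start.getTime() + 1_000), new Date(start.getTime() + 9_000)],
];
const totalRunTime = nodeWindows.reduce(
  (sum, [s, e]) => sum + (e.getTime() - s.getTime()) / 1000,
  0,
); // 16 s of summed node run time, more than the 10 s duration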

View File

@@ -326,9 +326,13 @@ class AgentServer(AppService):
@classmethod
async def get_graphs(
cls, user_id: Annotated[str, Depends(get_user_id)]
cls,
user_id: Annotated[str, Depends(get_user_id)],
with_runs: bool = False,
) -> list[graph_db.GraphMeta]:
return await graph_db.get_graphs_meta(filter_by="active", user_id=user_id)
return await graph_db.get_graphs_meta(
include_executions=with_runs, filter_by="active", user_id=user_id
)
@classmethod
async def get_templates(cls) -> list[graph_db.GraphMeta]:
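
The new with_runs flag surfaces as a query parameter on the existing /graphs route. A hypothetical direct call, assuming a local server and bearer auth (host, port, and token are placeholders; only the path and parameter come from this diff):

// Assumes an ES-module context (top-level await) and a locally running server.
const res = await fetch("http://localhost:8000/graphs?with_runs=true", {
  headers: { Authorization: "Bearer <token>" }, // placeholder credentials
});
const graphs = await res.json(); // GraphMeta objects, each carrying an executions list when runs are requested
console.log(graphs.length);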

View File

@@ -6,6 +6,7 @@ authors = ["AutoGPT <info@agpt.co>"]
readme = "README.md"
packages = [{ include = "backend" }]
[tool.poetry.dependencies]
python = "^3.10"
aio-pika = "^9.4.3"
@@ -67,40 +68,6 @@ cli = "backend.cli:main"
format = "linter:format"
lint = "linter:lint"
test = "run_tests:test"
# https://poethepoet.natn.io/index.html
[tool.poe]
poetry_command = ""
# poetry run poe xxx
[tool.poe.tasks]
test = "pytest"
build = ["test", "_dbuild"]
# This might break your python install :)
install = ["build", "_dinstall"]
# https://cx-freeze.readthedocs.io/en/stable/index.html
[tool.poe.tasks._dbuild]
cmd = "python setup.py build"
[tool.poe.tasks.dist_app]
cmd = "python setup.py bdist_app"
[tool.poe.tasks.dist_dmg]
cmd = "python setup.py bdist_dmg"
[tool.poe.tasks.dist_msi]
cmd = "python setup.py bdist_msi"
[tool.poe.tasks.dist_appimage]
cmd = "python setup.py bdist_appimage"
[tool.poe.tasks.dist_deb]
cmd = "python setup.py bdist_deb"
[tool.poe.tasks._dinstall]
cmd = "python setup.py install"
[tool.pytest-watcher]
now = false

View File

@@ -3,7 +3,8 @@
"version": "0.1.0",
"private": true,
"scripts": {
"dev": "next dev",
"dev": "export NODE_ENV=development && next dev",
"dev:test": "export NODE_ENV=test && next dev",
"build": "next build",
"start": "next start",
"lint": "next lint",

View File

@@ -7,6 +7,8 @@ import * as Sentry from "@sentry/nextjs";
Sentry.init({
dsn: "https://fe4e4aa4a283391808a5da396da20159@o4505260022104064.ingest.us.sentry.io/4507946746380288",
enabled: process.env.NODE_ENV !== "development",
// Add optional integrations for additional features
integrations: [
Sentry.replayIntegration(),

View File

@@ -8,6 +8,8 @@ import * as Sentry from "@sentry/nextjs";
Sentry.init({
dsn: "https://fe4e4aa4a283391808a5da396da20159@o4505260022104064.ingest.us.sentry.io/4507946746380288",
enabled: process.env.NODE_ENV !== "development",
// Define how likely traces are sampled. Adjust this value in production, or use tracesSampler for greater control.
tracesSampleRate: 1,

View File

@@ -8,6 +8,8 @@ import * as Sentry from "@sentry/nextjs";
Sentry.init({
dsn: "https://fe4e4aa4a283391808a5da396da20159@o4505260022104064.ingest.us.sentry.io/4507946746380288",
enabled: process.env.NODE_ENV !== "development",
// Define how likely traces are sampled. Adjust this value in production, or use tracesSampler for greater control.
tracesSampleRate: 1,

View File

@@ -2,8 +2,8 @@
import React, { useCallback, useEffect, useMemo, useState } from "react";
import AutoGPTServerAPI, {
GraphMeta,
NodeExecutionResult,
GraphMetaWithRuns,
ExecutionMeta,
} from "@/lib/autogpt-server-api";
import { Card } from "@/components/ui/card";
@@ -17,62 +17,37 @@ import {
} from "@/components/monitor";
const Monitor = () => {
const [flows, setFlows] = useState<GraphMeta[]>([]);
const [flows, setFlows] = useState<GraphMetaWithRuns[]>([]);
const [flowRuns, setFlowRuns] = useState<FlowRun[]>([]);
const [selectedFlow, setSelectedFlow] = useState<GraphMeta | null>(null);
const [selectedFlow, setSelectedFlow] = useState<GraphMetaWithRuns | null>(
null,
);
const [selectedRun, setSelectedRun] = useState<FlowRun | null>(null);
const api = useMemo(() => new AutoGPTServerAPI(), []);
const refreshFlowRuns = useCallback(
(flowID: string) => {
// Fetch flow run IDs
api.listGraphRunIDs(flowID).then((runIDs) =>
runIDs.map((runID) => {
let run;
if (
(run = flowRuns.find((fr) => fr.id == runID)) &&
!["waiting", "running"].includes(run.status)
) {
return;
}
// Fetch flow run
api.getGraphExecutionInfo(flowID, runID).then((execInfo) =>
setFlowRuns((flowRuns) => {
if (execInfo.length == 0) return flowRuns;
const flowRunIndex = flowRuns.findIndex((fr) => fr.id == runID);
const flowRun = flowRunFromNodeExecutionResults(execInfo);
if (flowRunIndex > -1) {
flowRuns.splice(flowRunIndex, 1, flowRun);
} else {
flowRuns.push(flowRun);
}
return [...flowRuns];
}),
);
}),
const fetchAgents = useCallback(() => {
api.listGraphsWithRuns().then((agent) => {
setFlows(agent);
const flowRuns = agent.flatMap((graph) =>
graph.executions != null
? graph.executions.map((execution) =>
flowRunFromExecutionMeta(graph, execution),
)
: [],
);
},
[api, flowRuns],
);
const fetchFlowsAndRuns = useCallback(() => {
api.listGraphs().then((flows) => {
setFlows(flows);
flows.map((flow) => refreshFlowRuns(flow.id));
setFlowRuns(flowRuns);
});
}, [api, refreshFlowRuns]);
}, [api]);
useEffect(() => fetchFlowsAndRuns(), [fetchFlowsAndRuns]);
useEffect(() => {
const intervalId = setInterval(
() => flows.map((f) => refreshFlowRuns(f.id)),
5000,
);
fetchAgents();
}, [api, fetchAgents]);
useEffect(() => {
const intervalId = setInterval(() => fetchAgents(), 5000);
return () => clearInterval(intervalId);
}, [flows, refreshFlowRuns]);
}, [fetchAgents, flows]);
const column1 = "md:col-span-2 xl:col-span-3 xxl:col-span-2";
const column2 = "md:col-span-3 lg:col-span-2 xl:col-span-3 space-y-4";
@@ -87,7 +62,9 @@ const Monitor = () => {
selectedFlow={selectedFlow}
onSelectFlow={(f) => {
setSelectedRun(null);
setSelectedFlow(f.id == selectedFlow?.id ? null : f);
setSelectedFlow(
f.id == selectedFlow?.id ? null : (f as GraphMetaWithRuns),
);
}}
/>
<FlowRunsList
@@ -123,56 +100,20 @@ const Monitor = () => {
);
};
function flowRunFromNodeExecutionResults(
nodeExecutionResults: NodeExecutionResult[],
function flowRunFromExecutionMeta(
graphMeta: GraphMetaWithRuns,
executionMeta: ExecutionMeta,
): FlowRun {
// Determine overall status
let status: "running" | "waiting" | "success" | "failed" = "success";
for (const execution of nodeExecutionResults) {
if (execution.status === "FAILED") {
status = "failed";
break;
} else if (["QUEUED", "RUNNING"].includes(execution.status)) {
status = "running";
break;
} else if (execution.status === "INCOMPLETE") {
status = "waiting";
}
}
// Determine aggregate startTime, endTime, and totalRunTime
const now = Date.now();
const startTime = Math.min(
...nodeExecutionResults.map((ner) => ner.add_time.getTime()),
now,
);
const endTime = ["success", "failed"].includes(status)
? Math.max(
...nodeExecutionResults.map((ner) => ner.end_time?.getTime() || 0),
startTime,
)
: now;
const duration = (endTime - startTime) / 1000; // Convert to seconds
const totalRunTime =
nodeExecutionResults.reduce(
(cum, node) =>
cum +
((node.end_time?.getTime() ?? now) -
(node.start_time?.getTime() ?? now)),
0,
) / 1000;
return {
id: nodeExecutionResults[0].graph_exec_id,
graphID: nodeExecutionResults[0].graph_id,
graphVersion: nodeExecutionResults[0].graph_version,
status,
startTime,
endTime,
duration,
totalRunTime,
nodeExecutionResults: nodeExecutionResults,
};
id: executionMeta.execution_id,
graphID: graphMeta.id,
graphVersion: graphMeta.version,
status: executionMeta.status,
startTime: executionMeta.started_at,
endTime: executionMeta.ended_at,
duration: executionMeta.duration,
totalRunTime: executionMeta.total_run_time,
} as FlowRun;
}
export default Monitor;

View File

@@ -5,7 +5,9 @@ import {
GraphCreatable,
GraphUpdateable,
GraphMeta,
GraphMetaWithRuns,
GraphExecuteResponse,
ExecutionMeta,
NodeExecutionResult,
User,
AnalyticsMetrics,
@@ -45,7 +47,12 @@ export default class BaseAutoGPTServerAPI {
}
async listGraphs(): Promise<GraphMeta[]> {
return this._get("/graphs");
return this._get(`/graphs`);
}
async listGraphsWithRuns(): Promise<GraphMetaWithRuns[]> {
let graphs = await this._get(`/graphs?with_runs=true`);
return graphs.map(parseGraphMetaWithRuns);
}
async listTemplates(): Promise<GraphMeta[]> {
@@ -328,3 +335,28 @@ function parseNodeExecutionResultTimestamps(result: any): NodeExecutionResult {
end_time: result.end_time ? new Date(result.end_time) : undefined,
};
}
function parseGraphMetaWithRuns(result: any): GraphMetaWithRuns {
return {
...result,
// Graphs without runs serialize as executions: null; fall back to an empty list.
executions: (result.executions ?? []).map(parseExecutionMetaTimestamps),
};
}
function parseExecutionMetaTimestamps(result: any): ExecutionMeta {
let status: "running" | "waiting" | "success" | "failed" = "success";
if (result.status === "FAILED") {
status = "failed";
} else if (["QUEUED", "RUNNING"].includes(result.status)) {
status = "running";
} else if (result.status === "INCOMPLETE") {
status = "waiting";
}
return {
...result,
status,
started_at: new Date(result.started_at).getTime(),
ended_at: result.ended_at ? new Date(result.ended_at).getTime() : undefined,
};
}
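
Example use of the new client method, as a sketch assuming the default client configuration. parseExecutionMetaTimestamps above normalizes the backend's upper-case ExecutionStatus values into the frontend's lower-case status union and converts timestamps to epoch milliseconds:

// Assumes an ES-module context (top-level await).
import AutoGPTServerAPI from "@/lib/autogpt-server-api";

const api = new AutoGPTServerAPI();
const graphs = await api.listGraphsWithRuns();
for (const graph of graphs) {
  // executions is ExecutionMeta[] with parsed timestamps and normalized status
  console.log(graph.name, graph.executions.length);
}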

View File

@@ -139,6 +139,16 @@ export type LinkCreatable = Omit<Link, "id" | "is_static"> & {
id?: string;
};
/* Mirror of autogpt_server/data/graph.py:ExecutionMeta */
export type ExecutionMeta = {
execution_id: string;
started_at: number;
ended_at: number;
duration: number;
total_run_time: number;
status: "running" | "waiting" | "success" | "failed";
};
/* Mirror of backend/data/graph.py:GraphMeta */
export type GraphMeta = {
id: string;
@@ -149,6 +159,10 @@ export type GraphMeta = {
description: string;
};
export type GraphMetaWithRuns = GraphMeta & {
executions: ExecutionMeta[];
};
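
For reference, a value satisfying the new types (illustrative data only; the ID and numbers are made up):

const run: ExecutionMeta = {
  execution_id: "exec-123", // hypothetical ID
  started_at: Date.parse("2024-09-23T11:00:00Z"), // epoch milliseconds
  ended_at: Date.parse("2024-09-23T11:00:10Z"),
  duration: 10, // wall-clock seconds
  total_run_time: 16, // summed node-execution seconds
  status: "success",
};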
/* Mirror of backend/data/graph.py:Graph */
export type Graph = GraphMeta & {
nodes: Array<Node>;