diff --git a/rnd/autogpt_builder/src/components/Flow.tsx b/rnd/autogpt_builder/src/components/Flow.tsx index b17ee89b5..64d78ba30 100644 --- a/rnd/autogpt_builder/src/components/Flow.tsx +++ b/rnd/autogpt_builder/src/components/Flow.tsx @@ -81,7 +81,24 @@ const FlowEditor: React.FC<{ flowID?: string; className?: string }> = ({ const [agentName, setAgentName] = useState(''); const apiUrl = process.env.AGPT_SERVER_URL!; - const api = new AutoGPTServerAPI(apiUrl); + const api = useMemo(() => new AutoGPTServerAPI(apiUrl), [apiUrl]); + + useEffect(() => { + api.connectWebSocket() + .then(() => { + console.log('WebSocket connected'); + api.onWebSocketMessage('execution_event', (data) => { + updateNodesWithExecutionData([data]); + }); + }) + .catch((error) => { + console.error('Failed to connect WebSocket:', error); + }); + + return () => { + api.disconnectWebSocket(); + }; + }, [api]); useEffect(() => { api.getBlocks() @@ -377,21 +394,8 @@ const FlowEditor: React.FC<{ flowID?: string; className?: string }> = ({ return; } - const executeData = await api.executeFlow(newAgentId); - const runId = executeData.id; - - const pollExecution = async () => { - const data = await api.getFlowExecutionInfo(newAgentId, runId); - updateNodesWithExecutionData(data); - - if (data.every((node) => node.status === 'COMPLETED')) { - console.log('All nodes completed execution'); - } else { - setTimeout(pollExecution, 1000); - } - }; - - pollExecution(); + api.subscribeToExecution(newAgentId); + api.runGraph(newAgentId); } catch (error) { console.error('Error running agent:', error); diff --git a/rnd/autogpt_builder/src/lib/autogpt_server_api.ts b/rnd/autogpt_builder/src/lib/autogpt_server_api.ts index 5a9326a8e..3f9bee621 100644 --- a/rnd/autogpt_builder/src/lib/autogpt_server_api.ts +++ b/rnd/autogpt_builder/src/lib/autogpt_server_api.ts @@ -3,9 +3,13 @@ import { ObjectSchema } from "./types"; export default class AutoGPTServerAPI { private baseUrl: string; + private wsUrl: string; + private socket: WebSocket | null = null; + private messageHandlers: { [key: string]: (data: any) => void } = {}; constructor(baseUrl: string = process.env.AGPT_SERVER_URL || "http://localhost:8000") { this.baseUrl = baseUrl; + this.wsUrl = `ws://${new URL(this.baseUrl).host}/ws`; } async getBlocks(): Promise { @@ -228,6 +232,60 @@ export default class AutoGPTServerAPI { throw error; } } + + connectWebSocket(): Promise { + return new Promise((resolve, reject) => { + this.socket = new WebSocket(this.wsUrl); + + this.socket.onopen = () => { + console.log('WebSocket connection established'); + resolve(); + }; + + this.socket.onclose = (event) => { + console.log('WebSocket connection closed', event); + this.socket = null; + }; + + this.socket.onerror = (error) => { + console.error('WebSocket error:', error); + reject(error); + }; + + this.socket.onmessage = (event) => { + const message = JSON.parse(event.data); + if (this.messageHandlers[message.method]) { + this.messageHandlers[message.method](message.data); + } + }; + }); + } + + disconnectWebSocket() { + if (this.socket && this.socket.readyState === WebSocket.OPEN) { + this.socket.close(); + } + } + + sendWebSocketMessage(method: string, data: any) { + if (this.socket && this.socket.readyState === WebSocket.OPEN) { + this.socket.send(JSON.stringify({ method, data })); + } else { + console.error('WebSocket is not connected'); + } + } + + onWebSocketMessage(method: string, handler: (data: any) => void) { + this.messageHandlers[method] = handler; + } + + subscribeToExecution(graphId: string) { + this.sendWebSocketMessage('subscribe', { graph_id: graphId }); + } + + runGraph(graphId: string, data: any = {}) { + this.sendWebSocketMessage('run_graph', { graph_id: graphId, data }); + } } /* Mirror of autogpt_server/data/block.py:Block */ @@ -322,4 +380,4 @@ export type NodeExecutionResult = { queue_time?: Date; start_time?: Date; end_time?: Date; -}; +}; \ No newline at end of file