feat(autogpt_builder): Add websocket support to replace polling (#7449)

feat(autogpt_builder): Add websocket support
pull/7455/head
Bently 2024-07-15 20:18:23 +01:00 committed by GitHub
parent 110e093e7b
commit d673bf741a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 79 additions and 17 deletions

View File

@ -81,7 +81,24 @@ const FlowEditor: React.FC<{ flowID?: string; className?: string }> = ({
const [agentName, setAgentName] = useState<string>('');
const apiUrl = process.env.AGPT_SERVER_URL!;
const api = new AutoGPTServerAPI(apiUrl);
const api = useMemo(() => new AutoGPTServerAPI(apiUrl), [apiUrl]);
useEffect(() => {
api.connectWebSocket()
.then(() => {
console.log('WebSocket connected');
api.onWebSocketMessage('execution_event', (data) => {
updateNodesWithExecutionData([data]);
});
})
.catch((error) => {
console.error('Failed to connect WebSocket:', error);
});
return () => {
api.disconnectWebSocket();
};
}, [api]);
useEffect(() => {
api.getBlocks()
@ -377,21 +394,8 @@ const FlowEditor: React.FC<{ flowID?: string; className?: string }> = ({
return;
}
const executeData = await api.executeFlow(newAgentId);
const runId = executeData.id;
const pollExecution = async () => {
const data = await api.getFlowExecutionInfo(newAgentId, runId);
updateNodesWithExecutionData(data);
if (data.every((node) => node.status === 'COMPLETED')) {
console.log('All nodes completed execution');
} else {
setTimeout(pollExecution, 1000);
}
};
pollExecution();
api.subscribeToExecution(newAgentId);
api.runGraph(newAgentId);
} catch (error) {
console.error('Error running agent:', error);

View File

@ -3,9 +3,13 @@ import { ObjectSchema } from "./types";
export default class AutoGPTServerAPI {
private baseUrl: string;
private wsUrl: string;
private socket: WebSocket | null = null;
private messageHandlers: { [key: string]: (data: any) => void } = {};
constructor(baseUrl: string = process.env.AGPT_SERVER_URL || "http://localhost:8000") {
this.baseUrl = baseUrl;
this.wsUrl = `ws://${new URL(this.baseUrl).host}/ws`;
}
async getBlocks(): Promise<Block[]> {
@ -228,6 +232,60 @@ export default class AutoGPTServerAPI {
throw error;
}
}
connectWebSocket(): Promise<void> {
return new Promise((resolve, reject) => {
this.socket = new WebSocket(this.wsUrl);
this.socket.onopen = () => {
console.log('WebSocket connection established');
resolve();
};
this.socket.onclose = (event) => {
console.log('WebSocket connection closed', event);
this.socket = null;
};
this.socket.onerror = (error) => {
console.error('WebSocket error:', error);
reject(error);
};
this.socket.onmessage = (event) => {
const message = JSON.parse(event.data);
if (this.messageHandlers[message.method]) {
this.messageHandlers[message.method](message.data);
}
};
});
}
disconnectWebSocket() {
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
this.socket.close();
}
}
sendWebSocketMessage(method: string, data: any) {
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
this.socket.send(JSON.stringify({ method, data }));
} else {
console.error('WebSocket is not connected');
}
}
onWebSocketMessage(method: string, handler: (data: any) => void) {
this.messageHandlers[method] = handler;
}
subscribeToExecution(graphId: string) {
this.sendWebSocketMessage('subscribe', { graph_id: graphId });
}
runGraph(graphId: string, data: any = {}) {
this.sendWebSocketMessage('run_graph', { graph_id: graphId, data });
}
}
/* Mirror of autogpt_server/data/block.py:Block */
@ -322,4 +380,4 @@ export type NodeExecutionResult = {
queue_time?: Date;
start_time?: Date;
end_time?: Date;
};
};