tweak(rnd): Post infra change cleanup - fix process creation lifecycle (#7981)

pull/7932/head^2
Zamil Majdy 2024-09-05 13:41:24 -05:00 committed by GitHub
parent d62b940baf
commit e5eb42d84a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 75 additions and 47 deletions

View File

@ -43,4 +43,4 @@ FROM server_base as server
ENV PORT=8000
ENV DATABASE_URL=""
CMD ["poetry", "run", "app"]
CMD ["poetry", "run", "rest"]

View File

@ -1,61 +1,44 @@
from multiprocessing import freeze_support, set_start_method
from typing import TYPE_CHECKING
import Pyro5.api as pyro
from tenacity import retry, stop_after_attempt, wait_exponential
from .util.logging import configure_logging
if TYPE_CHECKING:
from autogpt_server.util.process import AppProcess
@retry(stop=stop_after_attempt(30), wait=wait_exponential(multiplier=1, min=1, max=30))
def wait_for_nameserver():
pyro.locate_ns(host="localhost", port=9090)
print("NameServer is ready")
def run_processes(processes: list["AppProcess"], **kwargs):
def run_processes(*processes: "AppProcess", **kwargs):
"""
Execute all processes in the app. The last process is run in the foreground.
"""
try:
# Start NameServer first
processes[0].start(background=True, **kwargs)
configure_logging()
# Wait for NameServer to be ready
wait_for_nameserver()
# Start other processes
for process in processes[1:-1]:
for process in processes[:-1]:
process.start(background=True, **kwargs)
# Run the last process in the foreground
processes[-1].start(background=False, **kwargs)
except Exception as e:
finally:
for process in processes:
process.stop()
raise e
def main(**kwargs):
set_start_method("spawn", force=True)
freeze_support()
configure_logging()
"""
Run all the processes required for the AutoGPT-server (REST and WebSocket APIs).
"""
from autogpt_server.executor import ExecutionManager, ExecutionScheduler
from autogpt_server.server import AgentServer
from autogpt_server.server import AgentServer, WebsocketServer
from autogpt_server.util.service import PyroNameServer
run_processes(
[
PyroNameServer(),
ExecutionManager(),
ExecutionScheduler(),
AgentServer(),
],
**kwargs
PyroNameServer(),
ExecutionManager(),
ExecutionScheduler(),
WebsocketServer(),
AgentServer(),
**kwargs,
)

View File

@ -0,0 +1,20 @@
from autogpt_server.app import run_processes
from autogpt_server.executor import ExecutionManager, ExecutionScheduler
from autogpt_server.server import AgentServer
from autogpt_server.util.service import PyroNameServer
def main():
"""
Run all the processes required for the AutoGPT-server REST API.
"""
run_processes(
PyroNameServer(),
ExecutionManager(),
ExecutionScheduler(),
AgentServer(),
)
if __name__ == "__main__":
main()

View File

@ -1,3 +1,4 @@
from .rest_api import AgentServer
from .ws_api import WebsocketServer
__all__ = ["AgentServer"]
__all__ = ["AgentServer", "WebsocketServer"]

View File

@ -1,6 +1,7 @@
import asyncio
import logging
import uvicorn
from autogpt_libs.auth import parse_jwt_token
from fastapi import Depends, FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
@ -9,6 +10,7 @@ from autogpt_server.data.queue import AsyncRedisEventQueue
from autogpt_server.data.user import DEFAULT_USER_ID
from autogpt_server.server.conn_manager import ConnectionManager
from autogpt_server.server.model import ExecutionSubscription, Methods, WsMessage
from autogpt_server.util.service import AppProcess
from autogpt_server.util.settings import Settings
settings = Settings()
@ -166,3 +168,8 @@ async def websocket_router(
except WebSocketDisconnect:
manager.disconnect(websocket)
logging.info("Client Disconnected")
class WebsocketServer(AppProcess):
def run(self):
uvicorn.run(app, host="0.0.0.0", port=8001)

View File

@ -1,7 +1,7 @@
import os
import sys
from abc import ABC, abstractmethod
from multiprocessing import Process, set_start_method
from multiprocessing import Process
from typing import Optional
@ -11,7 +11,6 @@ class AppProcess(ABC):
"""
process: Optional[Process] = None
set_start_method("spawn", force=True)
@abstractmethod
def run(self):
@ -20,6 +19,12 @@ class AppProcess(ABC):
"""
pass
def health_check(self):
"""
A method to check the health of the process.
"""
pass
def execute_run_command(self, silent):
try:
if silent:
@ -61,6 +66,7 @@ class AppProcess(ABC):
**proc_args,
)
self.process.start()
self.health_check()
return self.process.pid or 0
def stop(self):

View File

@ -53,6 +53,14 @@ class PyroNameServer(AppProcess):
except KeyboardInterrupt:
print("Shutting down NameServer")
@conn_retry
def _wait_for_ns(self):
pyro.locate_ns(host="localhost", port=9090)
print("NameServer is ready")
def health_check(self):
self._wait_for_ns()
class AppService(AppProcess):
shared_event_loop: asyncio.AbstractEventLoop

View File

@ -0,0 +1,13 @@
from autogpt_server.app import run_processes
from autogpt_server.server.ws_api import WebsocketServer
def main():
"""
Run all the processes required for the AutoGPT-server WebSocket API.
"""
run_processes(WebsocketServer())
if __name__ == "__main__":
main()

View File

@ -1,11 +0,0 @@
import uvicorn
from autogpt_server.server.ws_api import app
def main():
uvicorn.run(app, host="0.0.0.0", port=8001)
if __name__ == "__main__":
main()

View File

@ -64,7 +64,8 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry.scripts]
app = "autogpt_server.app:main"
ws = "autogpt_server.ws_app:main"
rest = "autogpt_server.rest:main"
ws = "autogpt_server.ws:main"
cli = "autogpt_server.cli:main"
format = "linter:format"
lint = "linter:lint"