From 20fa7e508d0ca34ddcefa229d1c06be28923f341 Mon Sep 17 00:00:00 2001 From: andres-portainer <91705312+andres-portainer@users.noreply.github.com> Date: Thu, 23 Jan 2025 17:10:46 -0300 Subject: [PATCH] fix(edgestacks): decouple the EdgeStackStatusUpdateCoordinator so it can be used by other packages BE-11572 (#340) --- .../edgestacks/edgestack_status_update.go | 6 +++- .../edgestack_status_update_coordinator.go | 36 ++++++------------- api/http/handler/edgestacks/edgestack_test.go | 4 +++ api/http/handler/edgestacks/handler.go | 7 ++-- api/http/server.go | 5 ++- 5 files changed, 25 insertions(+), 33 deletions(-) diff --git a/api/http/handler/edgestacks/edgestack_status_update.go b/api/http/handler/edgestacks/edgestack_status_update.go index f3f903385..fc8ea8018 100644 --- a/api/http/handler/edgestacks/edgestack_status_update.go +++ b/api/http/handler/edgestacks/edgestack_status_update.go @@ -77,7 +77,11 @@ func (handler *Handler) edgeStackStatusUpdate(w http.ResponseWriter, r *http.Req return httperror.Forbidden("Permission denied to access environment", fmt.Errorf("unauthorized edge endpoint operation: %w. Environment name: %s", err, endpoint.Name)) } - stack, err := handler.stackCoordinator.UpdateStatus(r, endpoint, portainer.EdgeStackID(stackID), payload) + updateFn := func(stack *portainer.EdgeStack) (*portainer.EdgeStack, error) { + return handler.updateEdgeStackStatus(stack, endpoint, r, stack.ID, payload) + } + + stack, err := handler.stackCoordinator.UpdateStatus(r, portainer.EdgeStackID(stackID), updateFn) if err != nil { var httpErr *httperror.HandlerError if errors.As(err, &httpErr) { diff --git a/api/http/handler/edgestacks/edgestack_status_update_coordinator.go b/api/http/handler/edgestacks/edgestack_status_update_coordinator.go index 7e557e93f..f81e2fc3e 100644 --- a/api/http/handler/edgestacks/edgestack_status_update_coordinator.go +++ b/api/http/handler/edgestacks/edgestack_status_update_coordinator.go @@ -12,11 +12,9 @@ import ( ) type statusRequest struct { - r *http.Request respCh chan statusResponse - endpoint *portainer.Endpoint stackID portainer.EdgeStackID - payload updateStatusPayload + updateFn statusUpdateFn } type statusResponse struct { @@ -24,24 +22,19 @@ type statusResponse struct { Error error } -type statusUpdateFn func(stack *portainer.EdgeStack, endpoint *portainer.Endpoint, r *http.Request, stackID portainer.EdgeStackID, payload updateStatusPayload) (*portainer.EdgeStack, error) +type statusUpdateFn func(*portainer.EdgeStack) (*portainer.EdgeStack, error) type EdgeStackStatusUpdateCoordinator struct { - updateCh chan statusRequest - dataStore dataservices.DataStore - statusUpdateFn statusUpdateFn + updateCh chan statusRequest + dataStore dataservices.DataStore } var errAnotherStackUpdateInProgress = errors.New("another stack update is in progress") -func NewEdgeStackStatusUpdateCoordinator( - dataStore dataservices.DataStore, - statusUpdateFn statusUpdateFn, -) *EdgeStackStatusUpdateCoordinator { +func NewEdgeStackStatusUpdateCoordinator(dataStore dataservices.DataStore) *EdgeStackStatusUpdateCoordinator { return &EdgeStackStatusUpdateCoordinator{ - updateCh: make(chan statusRequest), - dataStore: dataStore, - statusUpdateFn: statusUpdateFn, + updateCh: make(chan statusRequest), + dataStore: dataStore, } } @@ -69,7 +62,7 @@ func (c *EdgeStackStatusUpdateCoordinator) loop() { // 2. Mutate the edge stack opportunistically until there are no more pending updates for { - stack, err = c.statusUpdateFn(stack, u.endpoint, u.r, stack.ID, u.payload) + stack, err = u.updateFn(stack) if err != nil { return err } @@ -135,23 +128,14 @@ func (c *EdgeStackStatusUpdateCoordinator) getNextUpdate(stackID portainer.EdgeS } } -func (c *EdgeStackStatusUpdateCoordinator) UpdateStatus( - r *http.Request, - endpoint *portainer.Endpoint, - stackID portainer.EdgeStackID, - payload updateStatusPayload) ( - *portainer.EdgeStack, - error, -) { +func (c *EdgeStackStatusUpdateCoordinator) UpdateStatus(r *http.Request, stackID portainer.EdgeStackID, updateFn statusUpdateFn) (*portainer.EdgeStack, error) { respCh := make(chan statusResponse) defer close(respCh) msg := statusRequest{ respCh: respCh, - r: r, - endpoint: endpoint, stackID: stackID, - payload: payload, + updateFn: updateFn, } select { diff --git a/api/http/handler/edgestacks/edgestack_test.go b/api/http/handler/edgestacks/edgestack_test.go index 496f2de53..ce1e9b659 100644 --- a/api/http/handler/edgestacks/edgestack_test.go +++ b/api/http/handler/edgestacks/edgestack_test.go @@ -51,10 +51,14 @@ func setupHandler(t *testing.T) (*Handler, string) { t.Fatal(err) } + coord := NewEdgeStackStatusUpdateCoordinator(store) + go coord.Start() + handler := NewHandler( security.NewRequestBouncer(store, jwtService, apiKeyService), store, edgestacks.NewService(store), + coord, ) handler.FileService = fs diff --git a/api/http/handler/edgestacks/handler.go b/api/http/handler/edgestacks/handler.go index 6a4a51e3e..290524cb0 100644 --- a/api/http/handler/edgestacks/handler.go +++ b/api/http/handler/edgestacks/handler.go @@ -26,18 +26,15 @@ type Handler struct { } // NewHandler creates a handler to manage environment(endpoint) group operations. -func NewHandler(bouncer security.BouncerService, dataStore dataservices.DataStore, edgeStacksService *edgestackservice.Service) *Handler { +func NewHandler(bouncer security.BouncerService, dataStore dataservices.DataStore, edgeStacksService *edgestackservice.Service, stackCoordinator *EdgeStackStatusUpdateCoordinator) *Handler { h := &Handler{ Router: mux.NewRouter(), requestBouncer: bouncer, DataStore: dataStore, edgeStacksService: edgeStacksService, + stackCoordinator: stackCoordinator, } - h.stackCoordinator = NewEdgeStackStatusUpdateCoordinator(dataStore, h.updateEdgeStackStatus) - - go h.stackCoordinator.Start() - h.Handle("/edge_stacks/create/{method}", bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeStackCreate)))).Methods(http.MethodPost) h.Handle("/edge_stacks", diff --git a/api/http/server.go b/api/http/server.go index 9fc55757f..ec86e6220 100644 --- a/api/http/server.go +++ b/api/http/server.go @@ -161,7 +161,10 @@ func (server *Server) Start() error { edgeJobsHandler.FileService = server.FileService edgeJobsHandler.ReverseTunnelService = server.ReverseTunnelService - var edgeStacksHandler = edgestacks.NewHandler(requestBouncer, server.DataStore, server.EdgeStacksService) + edgeStackCoordinator := edgestacks.NewEdgeStackStatusUpdateCoordinator(server.DataStore) + go edgeStackCoordinator.Start() + + var edgeStacksHandler = edgestacks.NewHandler(requestBouncer, server.DataStore, server.EdgeStacksService, edgeStackCoordinator) edgeStacksHandler.FileService = server.FileService edgeStacksHandler.GitService = server.GitService edgeStacksHandler.KubernetesDeployer = server.KubernetesDeployer