fix(edgestacks): decouple the EdgeStackStatusUpdateCoordinator so it can be used by other packages BE-11572 (#340)
parent
ebffc340d9
commit
20fa7e508d
|
@ -77,7 +77,11 @@ func (handler *Handler) edgeStackStatusUpdate(w http.ResponseWriter, r *http.Req
|
|||
return httperror.Forbidden("Permission denied to access environment", fmt.Errorf("unauthorized edge endpoint operation: %w. Environment name: %s", err, endpoint.Name))
|
||||
}
|
||||
|
||||
stack, err := handler.stackCoordinator.UpdateStatus(r, endpoint, portainer.EdgeStackID(stackID), payload)
|
||||
updateFn := func(stack *portainer.EdgeStack) (*portainer.EdgeStack, error) {
|
||||
return handler.updateEdgeStackStatus(stack, endpoint, r, stack.ID, payload)
|
||||
}
|
||||
|
||||
stack, err := handler.stackCoordinator.UpdateStatus(r, portainer.EdgeStackID(stackID), updateFn)
|
||||
if err != nil {
|
||||
var httpErr *httperror.HandlerError
|
||||
if errors.As(err, &httpErr) {
|
||||
|
|
|
@ -12,11 +12,9 @@ import (
|
|||
)
|
||||
|
||||
type statusRequest struct {
|
||||
r *http.Request
|
||||
respCh chan statusResponse
|
||||
endpoint *portainer.Endpoint
|
||||
stackID portainer.EdgeStackID
|
||||
payload updateStatusPayload
|
||||
updateFn statusUpdateFn
|
||||
}
|
||||
|
||||
type statusResponse struct {
|
||||
|
@ -24,24 +22,19 @@ type statusResponse struct {
|
|||
Error error
|
||||
}
|
||||
|
||||
type statusUpdateFn func(stack *portainer.EdgeStack, endpoint *portainer.Endpoint, r *http.Request, stackID portainer.EdgeStackID, payload updateStatusPayload) (*portainer.EdgeStack, error)
|
||||
type statusUpdateFn func(*portainer.EdgeStack) (*portainer.EdgeStack, error)
|
||||
|
||||
type EdgeStackStatusUpdateCoordinator struct {
|
||||
updateCh chan statusRequest
|
||||
dataStore dataservices.DataStore
|
||||
statusUpdateFn statusUpdateFn
|
||||
updateCh chan statusRequest
|
||||
dataStore dataservices.DataStore
|
||||
}
|
||||
|
||||
var errAnotherStackUpdateInProgress = errors.New("another stack update is in progress")
|
||||
|
||||
func NewEdgeStackStatusUpdateCoordinator(
|
||||
dataStore dataservices.DataStore,
|
||||
statusUpdateFn statusUpdateFn,
|
||||
) *EdgeStackStatusUpdateCoordinator {
|
||||
func NewEdgeStackStatusUpdateCoordinator(dataStore dataservices.DataStore) *EdgeStackStatusUpdateCoordinator {
|
||||
return &EdgeStackStatusUpdateCoordinator{
|
||||
updateCh: make(chan statusRequest),
|
||||
dataStore: dataStore,
|
||||
statusUpdateFn: statusUpdateFn,
|
||||
updateCh: make(chan statusRequest),
|
||||
dataStore: dataStore,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -69,7 +62,7 @@ func (c *EdgeStackStatusUpdateCoordinator) loop() {
|
|||
|
||||
// 2. Mutate the edge stack opportunistically until there are no more pending updates
|
||||
for {
|
||||
stack, err = c.statusUpdateFn(stack, u.endpoint, u.r, stack.ID, u.payload)
|
||||
stack, err = u.updateFn(stack)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -135,23 +128,14 @@ func (c *EdgeStackStatusUpdateCoordinator) getNextUpdate(stackID portainer.EdgeS
|
|||
}
|
||||
}
|
||||
|
||||
func (c *EdgeStackStatusUpdateCoordinator) UpdateStatus(
|
||||
r *http.Request,
|
||||
endpoint *portainer.Endpoint,
|
||||
stackID portainer.EdgeStackID,
|
||||
payload updateStatusPayload) (
|
||||
*portainer.EdgeStack,
|
||||
error,
|
||||
) {
|
||||
func (c *EdgeStackStatusUpdateCoordinator) UpdateStatus(r *http.Request, stackID portainer.EdgeStackID, updateFn statusUpdateFn) (*portainer.EdgeStack, error) {
|
||||
respCh := make(chan statusResponse)
|
||||
defer close(respCh)
|
||||
|
||||
msg := statusRequest{
|
||||
respCh: respCh,
|
||||
r: r,
|
||||
endpoint: endpoint,
|
||||
stackID: stackID,
|
||||
payload: payload,
|
||||
updateFn: updateFn,
|
||||
}
|
||||
|
||||
select {
|
||||
|
|
|
@ -51,10 +51,14 @@ func setupHandler(t *testing.T) (*Handler, string) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
coord := NewEdgeStackStatusUpdateCoordinator(store)
|
||||
go coord.Start()
|
||||
|
||||
handler := NewHandler(
|
||||
security.NewRequestBouncer(store, jwtService, apiKeyService),
|
||||
store,
|
||||
edgestacks.NewService(store),
|
||||
coord,
|
||||
)
|
||||
|
||||
handler.FileService = fs
|
||||
|
|
|
@ -26,18 +26,15 @@ type Handler struct {
|
|||
}
|
||||
|
||||
// NewHandler creates a handler to manage environment(endpoint) group operations.
|
||||
func NewHandler(bouncer security.BouncerService, dataStore dataservices.DataStore, edgeStacksService *edgestackservice.Service) *Handler {
|
||||
func NewHandler(bouncer security.BouncerService, dataStore dataservices.DataStore, edgeStacksService *edgestackservice.Service, stackCoordinator *EdgeStackStatusUpdateCoordinator) *Handler {
|
||||
h := &Handler{
|
||||
Router: mux.NewRouter(),
|
||||
requestBouncer: bouncer,
|
||||
DataStore: dataStore,
|
||||
edgeStacksService: edgeStacksService,
|
||||
stackCoordinator: stackCoordinator,
|
||||
}
|
||||
|
||||
h.stackCoordinator = NewEdgeStackStatusUpdateCoordinator(dataStore, h.updateEdgeStackStatus)
|
||||
|
||||
go h.stackCoordinator.Start()
|
||||
|
||||
h.Handle("/edge_stacks/create/{method}",
|
||||
bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeStackCreate)))).Methods(http.MethodPost)
|
||||
h.Handle("/edge_stacks",
|
||||
|
|
|
@ -161,7 +161,10 @@ func (server *Server) Start() error {
|
|||
edgeJobsHandler.FileService = server.FileService
|
||||
edgeJobsHandler.ReverseTunnelService = server.ReverseTunnelService
|
||||
|
||||
var edgeStacksHandler = edgestacks.NewHandler(requestBouncer, server.DataStore, server.EdgeStacksService)
|
||||
edgeStackCoordinator := edgestacks.NewEdgeStackStatusUpdateCoordinator(server.DataStore)
|
||||
go edgeStackCoordinator.Start()
|
||||
|
||||
var edgeStacksHandler = edgestacks.NewHandler(requestBouncer, server.DataStore, server.EdgeStacksService, edgeStackCoordinator)
|
||||
edgeStacksHandler.FileService = server.FileService
|
||||
edgeStacksHandler.GitService = server.GitService
|
||||
edgeStacksHandler.KubernetesDeployer = server.KubernetesDeployer
|
||||
|
|
Loading…
Reference in New Issue