diff --git a/api/chisel/schedules.go b/api/chisel/schedules.go index 0afa587bd..1ae4db571 100644 --- a/api/chisel/schedules.go +++ b/api/chisel/schedules.go @@ -44,3 +44,20 @@ func (service *Service) RemoveEdgeJob(edgeJobID portainer.EdgeJobID) { service.mu.Unlock() } + +func (service *Service) RemoveEdgeJobFromEndpoint(endpointID portainer.EndpointID, edgeJobID portainer.EdgeJobID) { + service.mu.Lock() + tunnel := service.getTunnelDetails(endpointID) + + n := 0 + for _, edgeJob := range tunnel.Jobs { + if edgeJob.ID != edgeJobID { + tunnel.Jobs[n] = edgeJob + n++ + } + } + + tunnel.Jobs = tunnel.Jobs[:n] + + service.mu.Unlock() +} diff --git a/api/http/handler/edgegroups/edgegroup_create.go b/api/http/handler/edgegroups/edgegroup_create.go index 5866bf751..2d095cc03 100644 --- a/api/http/handler/edgegroups/edgegroup_create.go +++ b/api/http/handler/edgegroups/edgegroup_create.go @@ -23,13 +23,13 @@ type edgeGroupCreatePayload struct { func (payload *edgeGroupCreatePayload) Validate(r *http.Request) error { if govalidator.IsNull(payload.Name) { - return errors.New("Invalid Edge group name") + return errors.New("invalid Edge group name") } if payload.Dynamic && (payload.TagIDs == nil || len(payload.TagIDs) == 0) { - return errors.New("TagIDs is mandatory for a dynamic Edge group") + return errors.New("tagIDs is mandatory for a dynamic Edge group") } if !payload.Dynamic && (payload.Endpoints == nil || len(payload.Endpoints) == 0) { - return errors.New("Environment is mandatory for a static Edge group") + return errors.New("environment is mandatory for a static Edge group") } return nil } @@ -61,7 +61,7 @@ func (handler *Handler) edgeGroupCreate(w http.ResponseWriter, r *http.Request) for _, edgeGroup := range edgeGroups { if edgeGroup.Name == payload.Name { - return httperror.BadRequest("Edge group name must be unique", errors.New("Edge group name must be unique")) + return httperror.BadRequest("Edge group name must be unique", errors.New("edge group name must be unique")) } } diff --git a/api/http/handler/edgegroups/edgegroup_delete.go b/api/http/handler/edgegroups/edgegroup_delete.go index de159ab20..9ec4b3b26 100644 --- a/api/http/handler/edgegroups/edgegroup_delete.go +++ b/api/http/handler/edgegroups/edgegroup_delete.go @@ -42,7 +42,20 @@ func (handler *Handler) edgeGroupDelete(w http.ResponseWriter, r *http.Request) for _, edgeStack := range edgeStacks { for _, groupID := range edgeStack.EdgeGroups { if groupID == portainer.EdgeGroupID(edgeGroupID) { - return httperror.Forbidden("Edge group is used by an Edge stack", errors.New("Edge group is used by an Edge stack")) + return httperror.NewError(http.StatusConflict, "Edge group is used by an Edge stack", errors.New("edge group is used by an Edge stack")) + } + } + } + + edgeJobs, err := handler.DataStore.EdgeJob().EdgeJobs() + if err != nil { + return httperror.InternalServerError("Unable to retrieve Edge jobs from the database", err) + } + + for _, edgeJob := range edgeJobs { + for _, groupID := range edgeJob.EdgeGroups { + if groupID == portainer.EdgeGroupID(edgeGroupID) { + return httperror.NewError(http.StatusConflict, "Edge group is used by an Edge job", errors.New("edge group is used by an Edge job")) } } } diff --git a/api/http/handler/edgegroups/edgegroup_list.go b/api/http/handler/edgegroups/edgegroup_list.go index e16787e67..33df22ffb 100644 --- a/api/http/handler/edgegroups/edgegroup_list.go +++ b/api/http/handler/edgegroups/edgegroup_list.go @@ -8,11 +8,13 @@ import ( "github.com/portainer/libhttp/response" portainer "github.com/portainer/portainer/api" "github.com/portainer/portainer/api/dataservices" + "github.com/portainer/portainer/api/internal/slices" ) type decoratedEdgeGroup struct { portainer.EdgeGroup HasEdgeStack bool `json:"HasEdgeStack"` + HasEdgeGroup bool `json:"HasEdgeGroup"` EndpointTypes []portainer.EndpointType } @@ -46,8 +48,21 @@ func (handler *Handler) edgeGroupList(w http.ResponseWriter, r *http.Request) *h } } + edgeJobs, err := handler.DataStore.EdgeJob().EdgeJobs() + if err != nil { + return httperror.InternalServerError("Unable to retrieve Edge jobs from the database", err) + } + decoratedEdgeGroups := []decoratedEdgeGroup{} for _, orgEdgeGroup := range edgeGroups { + usedByEdgeJob := false + for _, edgeJob := range edgeJobs { + if slices.Contains(edgeJob.EdgeGroups, portainer.EdgeGroupID(orgEdgeGroup.ID)) { + usedByEdgeJob = true + break + } + } + edgeGroup := decoratedEdgeGroup{ EdgeGroup: orgEdgeGroup, EndpointTypes: []portainer.EndpointType{}, @@ -63,13 +78,15 @@ func (handler *Handler) edgeGroupList(w http.ResponseWriter, r *http.Request) *h endpointTypes, err := getEndpointTypes(handler.DataStore.Endpoint(), edgeGroup.Endpoints) if err != nil { - return httperror.InternalServerError("Unable to retrieve endpoint types for Edge group", err) + return httperror.InternalServerError("Unable to retrieve environment types for Edge group", err) } edgeGroup.EndpointTypes = endpointTypes edgeGroup.HasEdgeStack = usedEdgeGroups[edgeGroup.ID] + edgeGroup.HasEdgeGroup = usedByEdgeJob + decoratedEdgeGroups = append(decoratedEdgeGroups, edgeGroup) } diff --git a/api/http/handler/edgegroups/edgegroup_update.go b/api/http/handler/edgegroups/edgegroup_update.go index 9676066f2..944dc885e 100644 --- a/api/http/handler/edgegroups/edgegroup_update.go +++ b/api/http/handler/edgegroups/edgegroup_update.go @@ -10,6 +10,7 @@ import ( portainer "github.com/portainer/portainer/api" "github.com/portainer/portainer/api/internal/edge" "github.com/portainer/portainer/api/internal/endpointutils" + "github.com/portainer/portainer/api/internal/slices" "github.com/asaskevich/govalidator" ) @@ -24,13 +25,13 @@ type edgeGroupUpdatePayload struct { func (payload *edgeGroupUpdatePayload) Validate(r *http.Request) error { if govalidator.IsNull(payload.Name) { - return errors.New("Invalid Edge group name") + return errors.New("invalid Edge group name") } if payload.Dynamic && (payload.TagIDs == nil || len(payload.TagIDs) == 0) { - return errors.New("TagIDs is mandatory for a dynamic Edge group") + return errors.New("tagIDs is mandatory for a dynamic Edge group") } if !payload.Dynamic && (payload.Endpoints == nil || len(payload.Endpoints) == 0) { - return errors.New("Environments is mandatory for a static Edge group") + return errors.New("environments is mandatory for a static Edge group") } return nil } @@ -75,7 +76,7 @@ func (handler *Handler) edgeGroupUpdate(w http.ResponseWriter, r *http.Request) } for _, edgeGroup := range edgeGroups { if edgeGroup.Name == payload.Name && edgeGroup.ID != portainer.EdgeGroupID(edgeGroupID) { - return httperror.BadRequest("Edge group name must be unique", errors.New("Edge group name must be unique")) + return httperror.BadRequest("Edge group name must be unique", errors.New("edge group name must be unique")) } } @@ -123,17 +124,45 @@ func (handler *Handler) edgeGroupUpdate(w http.ResponseWriter, r *http.Request) newRelatedEndpoints := edge.EdgeGroupRelatedEndpoints(edgeGroup, endpoints, endpointGroups) endpointsToUpdate := append(newRelatedEndpoints, oldRelatedEndpoints...) + edgeJobs, err := handler.DataStore.EdgeJob().EdgeJobs() + if err != nil { + return httperror.InternalServerError("Unable to fetch Edge jobs", err) + } + for _, endpointID := range endpointsToUpdate { - err = handler.updateEndpoint(endpointID) + err = handler.updateEndpointStacks(endpointID) if err != nil { return httperror.InternalServerError("Unable to persist Environment relation changes inside the database", err) } + + endpoint, err := handler.DataStore.Endpoint().Endpoint(endpointID) + if err != nil { + return httperror.InternalServerError("Unable to get Environment from database", err) + } + + if !endpointutils.IsEdgeEndpoint(endpoint) { + continue + } + + var operation string + if slices.Contains(newRelatedEndpoints, endpointID) { + operation = "add" + } else if slices.Contains(oldRelatedEndpoints, endpointID) { + operation = "remove" + } else { + continue + } + + err = handler.updateEndpointEdgeJobs(edgeGroup.ID, endpointID, edgeJobs, operation) + if err != nil { + return httperror.InternalServerError("Unable to persist Environment Edge Jobs changes inside the database", err) + } } return response.JSON(w, edgeGroup) } -func (handler *Handler) updateEndpoint(endpointID portainer.EndpointID) error { +func (handler *Handler) updateEndpointStacks(endpointID portainer.EndpointID) error { relation, err := handler.DataStore.EndpointRelation().EndpointRelation(endpointID) if err != nil { return err @@ -170,3 +199,20 @@ func (handler *Handler) updateEndpoint(endpointID portainer.EndpointID) error { return handler.DataStore.EndpointRelation().UpdateEndpointRelation(endpoint.ID, relation) } + +func (handler *Handler) updateEndpointEdgeJobs(edgeGroupID portainer.EdgeGroupID, endpointID portainer.EndpointID, edgeJobs []portainer.EdgeJob, operation string) error { + for _, edgeJob := range edgeJobs { + if !slices.Contains(edgeJob.EdgeGroups, edgeGroupID) { + continue + } + + switch operation { + case "add": + handler.ReverseTunnelService.AddEdgeJob(endpointID, &edgeJob) + case "remove": + handler.ReverseTunnelService.RemoveEdgeJobFromEndpoint(endpointID, edgeJob.ID) + } + } + + return nil +} diff --git a/api/http/handler/edgegroups/handler.go b/api/http/handler/edgegroups/handler.go index fabd3da9e..5b950a581 100644 --- a/api/http/handler/edgegroups/handler.go +++ b/api/http/handler/edgegroups/handler.go @@ -5,6 +5,7 @@ import ( "github.com/gorilla/mux" httperror "github.com/portainer/libhttp/error" + portainer "github.com/portainer/portainer/api" "github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/http/security" ) @@ -12,7 +13,8 @@ import ( // Handler is the HTTP handler used to handle environment(endpoint) group operations. type Handler struct { *mux.Router - DataStore dataservices.DataStore + DataStore dataservices.DataStore + ReverseTunnelService portainer.ReverseTunnelService } // NewHandler creates a handler to manage environment(endpoint) group operations. diff --git a/api/http/handler/edgejobs/edgejob_create.go b/api/http/handler/edgejobs/edgejob_create.go index 4363493c1..e2525f855 100644 --- a/api/http/handler/edgejobs/edgejob_create.go +++ b/api/http/handler/edgejobs/edgejob_create.go @@ -12,7 +12,9 @@ import ( "github.com/portainer/libhttp/request" "github.com/portainer/libhttp/response" portainer "github.com/portainer/portainer/api" + "github.com/portainer/portainer/api/internal/edge" "github.com/portainer/portainer/api/internal/endpointutils" + "github.com/portainer/portainer/api/internal/maps" ) // @id EdgeJobCreate @@ -41,7 +43,7 @@ func (handler *Handler) edgeJobCreate(w http.ResponseWriter, r *http.Request) *h case "file": return handler.createEdgeJobFromFile(w, r) default: - return httperror.BadRequest("Invalid query parameter: method. Valid values are: file or string", errors.New(request.ErrInvalidQueryParameter)) + return httperror.BadRequest("Invalid query parameter: method. Valid values are: file or string", errors.New(strings.ToLower(request.ErrInvalidQueryParameter))) } } @@ -50,28 +52,29 @@ type edgeJobCreateFromFileContentPayload struct { CronExpression string Recurring bool Endpoints []portainer.EndpointID + EdgeGroups []portainer.EdgeGroupID FileContent string } func (payload *edgeJobCreateFromFileContentPayload) Validate(r *http.Request) error { if govalidator.IsNull(payload.Name) { - return errors.New("Invalid Edge job name") + return errors.New("invalid Edge job name") } if !govalidator.Matches(payload.Name, `^[a-zA-Z0-9][a-zA-Z0-9_.-]*$`) { - return errors.New("Invalid Edge job name format. Allowed characters are: [a-zA-Z0-9_.-]") + return errors.New("invalid Edge job name format. Allowed characters are: [a-zA-Z0-9_.-]") } if govalidator.IsNull(payload.CronExpression) { - return errors.New("Invalid cron expression") + return errors.New("invalid cron expression") } - if payload.Endpoints == nil || len(payload.Endpoints) == 0 { - return errors.New("Invalid environment payload") + if (payload.Endpoints == nil || len(payload.Endpoints) == 0) && (payload.EdgeGroups == nil || len(payload.EdgeGroups) == 0) { + return errors.New("no environments or groups have been provided") } if govalidator.IsNull(payload.FileContent) { - return errors.New("Invalid script file content") + return errors.New("invalid script file content") } return nil @@ -86,7 +89,16 @@ func (handler *Handler) createEdgeJobFromFileContent(w http.ResponseWriter, r *h edgeJob := handler.createEdgeJobObjectFromFileContentPayload(&payload) - err = handler.addAndPersistEdgeJob(edgeJob, []byte(payload.FileContent)) + var endpoints []portainer.EndpointID + if len(edgeJob.EdgeGroups) > 0 { + endpoints, err = edge.GetEndpointsFromEdgeGroups(payload.EdgeGroups, handler.DataStore) + if err != nil { + return httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err) + } + } + + err = handler.addAndPersistEdgeJob(edgeJob, []byte(payload.FileContent), endpoints) + if err != nil { return httperror.InternalServerError("Unable to schedule Edge job", err) } @@ -99,36 +111,48 @@ type edgeJobCreateFromFilePayload struct { CronExpression string Recurring bool Endpoints []portainer.EndpointID + EdgeGroups []portainer.EdgeGroupID File []byte } func (payload *edgeJobCreateFromFilePayload) Validate(r *http.Request) error { name, err := request.RetrieveMultiPartFormValue(r, "Name", false) if err != nil { - return errors.New("Invalid Edge job name") + return errors.New("invalid Edge job name") } if !govalidator.Matches(name, `^[a-zA-Z0-9][a-zA-Z0-9_.-]+$`) { - return errors.New("Invalid Edge job name format. Allowed characters are: [a-zA-Z0-9_.-]") + return errors.New("invalid Edge job name format. Allowed characters are: [a-zA-Z0-9_.-]") } payload.Name = name cronExpression, err := request.RetrieveMultiPartFormValue(r, "CronExpression", false) if err != nil { - return errors.New("Invalid cron expression") + return errors.New("invalid cron expression") } payload.CronExpression = cronExpression var endpoints []portainer.EndpointID - err = request.RetrieveMultiPartFormJSONValue(r, "Endpoints", &endpoints, false) + err = request.RetrieveMultiPartFormJSONValue(r, "Endpoints", &endpoints, true) if err != nil { - return errors.New("Invalid environments") + return errors.New("invalid environments") } payload.Endpoints = endpoints + var edgeGroups []portainer.EdgeGroupID + err = request.RetrieveMultiPartFormJSONValue(r, "EdgeGroups", &edgeGroups, true) + if err != nil { + return errors.New("invalid edge groups") + } + payload.EdgeGroups = edgeGroups + + if (payload.Endpoints == nil || len(payload.Endpoints) == 0) && (payload.EdgeGroups == nil || len(payload.EdgeGroups) == 0) { + return errors.New("no environments or groups have been provided") + } + file, _, err := request.RetrieveMultiPartFormFile(r, "file") if err != nil { - return errors.New("Invalid script file. Ensure that the file is uploaded correctly") + return errors.New("invalid script file. Ensure that the file is uploaded correctly") } payload.File = file @@ -144,7 +168,16 @@ func (handler *Handler) createEdgeJobFromFile(w http.ResponseWriter, r *http.Req edgeJob := handler.createEdgeJobObjectFromFilePayload(payload) - err = handler.addAndPersistEdgeJob(edgeJob, payload.File) + var endpoints []portainer.EndpointID + if len(edgeJob.EdgeGroups) > 0 { + endpoints, err = edge.GetEndpointsFromEdgeGroups(payload.EdgeGroups, handler.DataStore) + if err != nil { + return httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err) + } + } + + err = handler.addAndPersistEdgeJob(edgeJob, payload.File, endpoints) + if err != nil { return httperror.InternalServerError("Unable to schedule Edge job", err) } @@ -155,16 +188,18 @@ func (handler *Handler) createEdgeJobFromFile(w http.ResponseWriter, r *http.Req func (handler *Handler) createEdgeJobObjectFromFilePayload(payload *edgeJobCreateFromFilePayload) *portainer.EdgeJob { edgeJobIdentifier := portainer.EdgeJobID(handler.DataStore.EdgeJob().GetNextIdentifier()) - endpoints := convertEndpointsToMetaObject(payload.Endpoints) + endpoints := handler.convertEndpointsToMetaObject(payload.Endpoints) edgeJob := &portainer.EdgeJob{ - ID: edgeJobIdentifier, - Name: payload.Name, - CronExpression: payload.CronExpression, - Recurring: payload.Recurring, - Created: time.Now().Unix(), - Endpoints: endpoints, - Version: 1, + ID: edgeJobIdentifier, + Name: payload.Name, + CronExpression: payload.CronExpression, + Recurring: payload.Recurring, + Created: time.Now().Unix(), + Endpoints: endpoints, + EdgeGroups: payload.EdgeGroups, + Version: 1, + GroupLogsCollection: map[portainer.EndpointID]portainer.EdgeJobEndpointMeta{}, } return edgeJob @@ -173,22 +208,24 @@ func (handler *Handler) createEdgeJobObjectFromFilePayload(payload *edgeJobCreat func (handler *Handler) createEdgeJobObjectFromFileContentPayload(payload *edgeJobCreateFromFileContentPayload) *portainer.EdgeJob { edgeJobIdentifier := portainer.EdgeJobID(handler.DataStore.EdgeJob().GetNextIdentifier()) - endpoints := convertEndpointsToMetaObject(payload.Endpoints) + endpoints := handler.convertEndpointsToMetaObject(payload.Endpoints) edgeJob := &portainer.EdgeJob{ - ID: edgeJobIdentifier, - Name: payload.Name, - CronExpression: payload.CronExpression, - Recurring: payload.Recurring, - Created: time.Now().Unix(), - Endpoints: endpoints, - Version: 1, + ID: edgeJobIdentifier, + Name: payload.Name, + CronExpression: payload.CronExpression, + Recurring: payload.Recurring, + Created: time.Now().Unix(), + Endpoints: endpoints, + EdgeGroups: payload.EdgeGroups, + Version: 1, + GroupLogsCollection: map[portainer.EndpointID]portainer.EdgeJobEndpointMeta{}, } return edgeJob } -func (handler *Handler) addAndPersistEdgeJob(edgeJob *portainer.EdgeJob, file []byte) error { +func (handler *Handler) addAndPersistEdgeJob(edgeJob *portainer.EdgeJob, file []byte, endpointsFromGroups []portainer.EndpointID) error { edgeCronExpression := strings.Split(edgeJob.CronExpression, " ") if len(edgeCronExpression) == 6 { edgeCronExpression = edgeCronExpression[1:] @@ -206,29 +243,39 @@ func (handler *Handler) addAndPersistEdgeJob(edgeJob *portainer.EdgeJob, file [] } } - if len(edgeJob.Endpoints) == 0 { - return errors.New("Environments are mandatory for an Edge job") - } - scriptPath, err := handler.FileService.StoreEdgeJobFileFromBytes(strconv.Itoa(int(edgeJob.ID)), file) if err != nil { return err } edgeJob.ScriptPath = scriptPath - for endpointID := range edgeJob.Endpoints { + var endpointsMap map[portainer.EndpointID]portainer.EdgeJobEndpointMeta + if len(endpointsFromGroups) > 0 { + endpointsMap = handler.convertEndpointsToMetaObject(endpointsFromGroups) + + for ID := range endpointsMap { + endpoint, err := handler.DataStore.Endpoint().Endpoint(ID) + if err != nil { + return err + } + + if !endpointutils.IsEdgeEndpoint(endpoint) { + delete(endpointsMap, ID) + } + } + + maps.Copy(endpointsMap, edgeJob.Endpoints) + } else { + endpointsMap = edgeJob.Endpoints + } + + if len(endpointsMap) == 0 { + return errors.New("environments or edge groups are mandatory for an Edge job") + } + + for endpointID := range endpointsMap { handler.ReverseTunnelService.AddEdgeJob(endpointID, edgeJob) } return handler.DataStore.EdgeJob().Create(edgeJob.ID, edgeJob) } - -func convertEndpointsToMetaObject(endpoints []portainer.EndpointID) map[portainer.EndpointID]portainer.EdgeJobEndpointMeta { - endpointsMap := map[portainer.EndpointID]portainer.EdgeJobEndpointMeta{} - - for _, endpointID := range endpoints { - endpointsMap[endpointID] = portainer.EdgeJobEndpointMeta{} - } - - return endpointsMap -} diff --git a/api/http/handler/edgejobs/edgejob_delete.go b/api/http/handler/edgejobs/edgejob_delete.go index 4084b66f5..63bfb7100 100644 --- a/api/http/handler/edgejobs/edgejob_delete.go +++ b/api/http/handler/edgejobs/edgejob_delete.go @@ -8,6 +8,8 @@ import ( "github.com/portainer/libhttp/request" "github.com/portainer/libhttp/response" portainer "github.com/portainer/portainer/api" + "github.com/portainer/portainer/api/internal/edge" + "github.com/portainer/portainer/api/internal/maps" ) // @id EdgeJobDelete @@ -43,6 +45,23 @@ func (handler *Handler) edgeJobDelete(w http.ResponseWriter, r *http.Request) *h handler.ReverseTunnelService.RemoveEdgeJob(edgeJob.ID) + var endpointsMap map[portainer.EndpointID]portainer.EdgeJobEndpointMeta + if len(edgeJob.EdgeGroups) > 0 { + endpoints, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, handler.DataStore) + if err != nil { + return httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err) + } + + endpointsMap = handler.convertEndpointsToMetaObject(endpoints) + maps.Copy(endpointsMap, edgeJob.Endpoints) + } else { + endpointsMap = edgeJob.Endpoints + } + + for endpointID := range endpointsMap { + handler.ReverseTunnelService.RemoveEdgeJobFromEndpoint(endpointID, edgeJob.ID) + } + err = handler.DataStore.EdgeJob().DeleteEdgeJob(edgeJob.ID) if err != nil { return httperror.InternalServerError("Unable to remove the Edge job from the database", err) diff --git a/api/http/handler/edgejobs/edgejob_tasklogs_clear.go b/api/http/handler/edgejobs/edgejob_tasklogs_clear.go index 1a79c0922..e4d3cdced 100644 --- a/api/http/handler/edgejobs/edgejob_tasklogs_clear.go +++ b/api/http/handler/edgejobs/edgejob_tasklogs_clear.go @@ -8,6 +8,8 @@ import ( "github.com/portainer/libhttp/request" "github.com/portainer/libhttp/response" portainer "github.com/portainer/portainer/api" + "github.com/portainer/portainer/api/internal/edge" + "github.com/portainer/portainer/api/internal/slices" ) // @id EdgeJobTasksClear @@ -43,11 +45,22 @@ func (handler *Handler) edgeJobTasksClear(w http.ResponseWriter, r *http.Request } endpointID := portainer.EndpointID(taskID) + endpointsFromGroups, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, handler.DataStore) + if err != nil { + return httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err) + } - meta := edgeJob.Endpoints[endpointID] - meta.CollectLogs = false - meta.LogsStatus = portainer.EdgeJobLogsStatusIdle - edgeJob.Endpoints[endpointID] = meta + if slices.Contains(endpointsFromGroups, endpointID) { + edgeJob.GroupLogsCollection[endpointID] = portainer.EdgeJobEndpointMeta{ + CollectLogs: false, + LogsStatus: portainer.EdgeJobLogsStatusIdle, + } + } else { + meta := edgeJob.Endpoints[endpointID] + meta.CollectLogs = false + meta.LogsStatus = portainer.EdgeJobLogsStatusIdle + edgeJob.Endpoints[endpointID] = meta + } err = handler.FileService.ClearEdgeJobTaskLogs(strconv.Itoa(edgeJobID), strconv.Itoa(taskID)) if err != nil { diff --git a/api/http/handler/edgejobs/edgejob_tasklogs_collect.go b/api/http/handler/edgejobs/edgejob_tasklogs_collect.go index 5b88e5b05..e2281a933 100644 --- a/api/http/handler/edgejobs/edgejob_tasklogs_collect.go +++ b/api/http/handler/edgejobs/edgejob_tasklogs_collect.go @@ -7,6 +7,8 @@ import ( "github.com/portainer/libhttp/request" "github.com/portainer/libhttp/response" portainer "github.com/portainer/portainer/api" + "github.com/portainer/portainer/api/internal/edge" + "github.com/portainer/portainer/api/internal/slices" ) // @id EdgeJobTasksCollect @@ -42,13 +44,22 @@ func (handler *Handler) edgeJobTasksCollect(w http.ResponseWriter, r *http.Reque } endpointID := portainer.EndpointID(taskID) + endpointsFromGroups, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, handler.DataStore) + if err != nil { + return httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err) + } - meta := edgeJob.Endpoints[endpointID] - meta.CollectLogs = true - meta.LogsStatus = portainer.EdgeJobLogsStatusPending - edgeJob.Endpoints[endpointID] = meta - - handler.ReverseTunnelService.AddEdgeJob(endpointID, edgeJob) + if slices.Contains(endpointsFromGroups, endpointID) { + edgeJob.GroupLogsCollection[endpointID] = portainer.EdgeJobEndpointMeta{ + CollectLogs: true, + LogsStatus: portainer.EdgeJobLogsStatusPending, + } + } else { + meta := edgeJob.Endpoints[endpointID] + meta.CollectLogs = true + meta.LogsStatus = portainer.EdgeJobLogsStatusPending + edgeJob.Endpoints[endpointID] = meta + } err = handler.DataStore.EdgeJob().UpdateEdgeJob(edgeJob.ID, edgeJob) if err != nil { diff --git a/api/http/handler/edgejobs/edgejob_tasks_list.go b/api/http/handler/edgejobs/edgejob_tasks_list.go index 3057b4e7e..7bb7545bb 100644 --- a/api/http/handler/edgejobs/edgejob_tasks_list.go +++ b/api/http/handler/edgejobs/edgejob_tasks_list.go @@ -8,6 +8,8 @@ import ( "github.com/portainer/libhttp/request" "github.com/portainer/libhttp/response" portainer "github.com/portainer/portainer/api" + "github.com/portainer/portainer/api/internal/edge" + "github.com/portainer/portainer/api/internal/maps" ) type taskContainer struct { @@ -44,8 +46,20 @@ func (handler *Handler) edgeJobTasksList(w http.ResponseWriter, r *http.Request) tasks := make([]taskContainer, 0) - for endpointID, meta := range edgeJob.Endpoints { + endpointsMap := map[portainer.EndpointID]portainer.EdgeJobEndpointMeta{} + if len(edgeJob.EdgeGroups) > 0 { + endpoints, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, handler.DataStore) + if err != nil { + return httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err) + } + endpointsMap = handler.convertEndpointsToMetaObject(endpoints) + maps.Copy(endpointsMap, edgeJob.GroupLogsCollection) + } + + maps.Copy(endpointsMap, edgeJob.Endpoints) + + for endpointID, meta := range endpointsMap { cronTask := taskContainer{ ID: fmt.Sprintf("edgejob_task_%d_%d", edgeJob.ID, endpointID), EndpointID: endpointID, diff --git a/api/http/handler/edgejobs/edgejob_update.go b/api/http/handler/edgejobs/edgejob_update.go index 0fd035470..4a75e52ff 100644 --- a/api/http/handler/edgejobs/edgejob_update.go +++ b/api/http/handler/edgejobs/edgejob_update.go @@ -9,6 +9,10 @@ import ( "github.com/portainer/libhttp/request" "github.com/portainer/libhttp/response" portainer "github.com/portainer/portainer/api" + "github.com/portainer/portainer/api/internal/edge" + "github.com/portainer/portainer/api/internal/endpointutils" + "github.com/portainer/portainer/api/internal/maps" + "github.com/portainer/portainer/api/internal/slices" "github.com/asaskevich/govalidator" ) @@ -18,12 +22,13 @@ type edgeJobUpdatePayload struct { CronExpression *string Recurring *bool Endpoints []portainer.EndpointID + EdgeGroups []portainer.EdgeGroupID FileContent *string } func (payload *edgeJobUpdatePayload) Validate(r *http.Request) error { if payload.Name != nil && !govalidator.Matches(*payload.Name, `^[a-zA-Z0-9][a-zA-Z0-9_.-]+$`) { - return errors.New("Invalid Edge job name format. Allowed characters are: [a-zA-Z0-9_.-]") + return errors.New("invalid Edge job name format. Allowed characters are: [a-zA-Z0-9_.-]") } return nil } @@ -80,16 +85,26 @@ func (handler *Handler) updateEdgeSchedule(edgeJob *portainer.EdgeJob, payload * edgeJob.Name = *payload.Name } + endpointsToAdd := map[portainer.EndpointID]bool{} + endpointsToRemove := map[portainer.EndpointID]bool{} + if payload.Endpoints != nil { endpointsMap := map[portainer.EndpointID]portainer.EdgeJobEndpointMeta{} + newEndpoints := endpointutils.EndpointSet(payload.Endpoints) + for endpointID := range edgeJob.Endpoints { + if !newEndpoints[endpointID] { + endpointsToRemove[endpointID] = true + } + } + for _, endpointID := range payload.Endpoints { endpoint, err := handler.DataStore.Endpoint().Endpoint(endpointID) if err != nil { return err } - if endpoint.Type != portainer.EdgeAgentOnDockerEnvironment && endpoint.Type != portainer.EdgeAgentOnKubernetesEnvironment { + if !endpointutils.IsEdgeEndpoint(endpoint) { continue } @@ -97,12 +112,73 @@ func (handler *Handler) updateEdgeSchedule(edgeJob *portainer.EdgeJob, payload * endpointsMap[endpointID] = meta } else { endpointsMap[endpointID] = portainer.EdgeJobEndpointMeta{} + endpointsToAdd[endpointID] = true } } edgeJob.Endpoints = endpointsMap } + if len(payload.EdgeGroups) == 0 && len(edgeJob.EdgeGroups) > 0 { + endpoints, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, handler.DataStore) + if err != nil { + return errors.New("unable to get endpoints from edge groups") + } + + for _, endpointID := range endpoints { + endpointsToRemove[portainer.EndpointID(endpointID)] = true + } + + edgeJob.EdgeGroups = nil + } + + edgeGroupsToAdd := []portainer.EdgeGroupID{} + edgeGroupsToRemove := []portainer.EdgeGroupID{} + endpointsFromGroupsToAddMap := map[portainer.EndpointID]portainer.EdgeJobEndpointMeta{} + + if len(payload.EdgeGroups) > 0 { + for _, edgeGroupID := range payload.EdgeGroups { + _, err := handler.DataStore.EdgeGroup().EdgeGroup(edgeGroupID) + if err != nil { + return err + } + + if !slices.Contains(edgeJob.EdgeGroups, edgeGroupID) { + edgeGroupsToAdd = append(edgeGroupsToAdd, edgeGroupID) + } + } + + endpointsFromGroupsToAdd, err := edge.GetEndpointsFromEdgeGroups(edgeGroupsToAdd, handler.DataStore) + if err != nil { + return errors.New("unable to get endpoints from edge groups") + } + endpointsFromGroupsToAddMap = handler.convertEndpointsToMetaObject(endpointsFromGroupsToAdd) + + for endpointID := range endpointsFromGroupsToAddMap { + endpointsToAdd[endpointID] = true + } + + newEdgeGroups := edge.EdgeGroupSet(payload.EdgeGroups) + for _, edgeGroupID := range edgeJob.EdgeGroups { + if !newEdgeGroups[edgeGroupID] { + edgeGroupsToRemove = append(edgeGroupsToRemove, edgeGroupID) + } + } + + endpointsFromGroupsToRemove, err := edge.GetEndpointsFromEdgeGroups(edgeGroupsToRemove, handler.DataStore) + if err != nil { + return errors.New("unable to get endpoints from edge groups") + } + + endpointsToRemoveMap := handler.convertEndpointsToMetaObject(endpointsFromGroupsToRemove) + + for endpointID := range endpointsToRemoveMap { + endpointsToRemove[endpointID] = true + } + + edgeJob.EdgeGroups = payload.EdgeGroups + } + updateVersion := false if payload.CronExpression != nil && *payload.CronExpression != edgeJob.CronExpression { edgeJob.CronExpression = *payload.CronExpression @@ -133,9 +209,15 @@ func (handler *Handler) updateEdgeSchedule(edgeJob *portainer.EdgeJob, payload * edgeJob.Version++ } - for endpointID := range edgeJob.Endpoints { + maps.Copy(endpointsFromGroupsToAddMap, edgeJob.Endpoints) + + for endpointID := range endpointsFromGroupsToAddMap { handler.ReverseTunnelService.AddEdgeJob(endpointID, edgeJob) } + for endpointID := range endpointsToRemove { + handler.ReverseTunnelService.RemoveEdgeJobFromEndpoint(endpointID, edgeJob.ID) + } + return nil } diff --git a/api/http/handler/edgejobs/handler.go b/api/http/handler/edgejobs/handler.go index 1c066f3e1..e2def868e 100644 --- a/api/http/handler/edgejobs/handler.go +++ b/api/http/handler/edgejobs/handler.go @@ -46,3 +46,13 @@ func NewHandler(bouncer *security.RequestBouncer) *Handler { bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeJobTasksClear)))).Methods(http.MethodDelete) return h } + +func (handler *Handler) convertEndpointsToMetaObject(endpoints []portainer.EndpointID) map[portainer.EndpointID]portainer.EdgeJobEndpointMeta { + endpointsMap := map[portainer.EndpointID]portainer.EdgeJobEndpointMeta{} + + for _, endpointID := range endpoints { + endpointsMap[endpointID] = portainer.EdgeJobEndpointMeta{} + } + + return endpointsMap +} diff --git a/api/http/handler/endpointedge/endpoint_edgejob_logs.go b/api/http/handler/endpointedge/endpoint_edgejob_logs.go index 73498d710..1a38e8928 100644 --- a/api/http/handler/endpointedge/endpoint_edgejob_logs.go +++ b/api/http/handler/endpointedge/endpoint_edgejob_logs.go @@ -65,10 +65,12 @@ func (handler *Handler) endpointEdgeJobsLogs(w http.ResponseWriter, r *http.Requ return httperror.InternalServerError("Unable to save task log to the filesystem", err) } - meta := edgeJob.Endpoints[endpoint.ID] - meta.CollectLogs = false - meta.LogsStatus = portainer.EdgeJobLogsStatusCollected - edgeJob.Endpoints[endpoint.ID] = meta + meta := portainer.EdgeJobEndpointMeta{CollectLogs: false, LogsStatus: portainer.EdgeJobLogsStatusCollected} + if _, ok := edgeJob.GroupLogsCollection[endpoint.ID]; ok { + edgeJob.GroupLogsCollection[endpoint.ID] = meta + } else { + edgeJob.Endpoints[endpoint.ID] = meta + } err = handler.DataStore.EdgeJob().UpdateEdgeJob(edgeJob.ID, edgeJob) diff --git a/api/http/handler/endpointedge/endpoint_edgestatus_inspect.go b/api/http/handler/endpointedge/endpoint_edgestatus_inspect.go index 28ae738a9..9c15cbce8 100644 --- a/api/http/handler/endpointedge/endpoint_edgestatus_inspect.go +++ b/api/http/handler/endpointedge/endpoint_edgestatus_inspect.go @@ -158,10 +158,17 @@ func parseAgentPlatform(r *http.Request) (portainer.EndpointType, error) { func (handler *Handler) buildSchedules(endpointID portainer.EndpointID, tunnel portainer.TunnelDetails) ([]edgeJobResponse, *httperror.HandlerError) { schedules := []edgeJobResponse{} for _, job := range tunnel.Jobs { + var collectLogs bool + if _, ok := job.GroupLogsCollection[endpointID]; ok { + collectLogs = job.GroupLogsCollection[endpointID].CollectLogs + } else { + collectLogs = job.Endpoints[endpointID].CollectLogs + } + schedule := edgeJobResponse{ ID: job.ID, CronExpression: job.CronExpression, - CollectLogs: job.Endpoints[endpointID].CollectLogs, + CollectLogs: collectLogs, Version: job.Version, } diff --git a/api/http/server.go b/api/http/server.go index 73b3e8a81..504f9d824 100644 --- a/api/http/server.go +++ b/api/http/server.go @@ -150,6 +150,7 @@ func (server *Server) Start() error { var edgeGroupsHandler = edgegroups.NewHandler(requestBouncer) edgeGroupsHandler.DataStore = server.DataStore + edgeGroupsHandler.ReverseTunnelService = server.ReverseTunnelService var edgeJobsHandler = edgejobs.NewHandler(requestBouncer) edgeJobsHandler.DataStore = server.DataStore diff --git a/api/internal/edge/edgegroup.go b/api/internal/edge/edgegroup.go index 5120927c2..69ef8abba 100644 --- a/api/internal/edge/edgegroup.go +++ b/api/internal/edge/edgegroup.go @@ -2,6 +2,7 @@ package edge import ( portainer "github.com/portainer/portainer/api" + "github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/internal/endpointutils" "github.com/portainer/portainer/api/internal/tag" ) @@ -34,6 +35,40 @@ func EdgeGroupRelatedEndpoints(edgeGroup *portainer.EdgeGroup, endpoints []porta return endpointIDs } +func EdgeGroupSet(edgeGroupIDs []portainer.EdgeGroupID) map[portainer.EdgeGroupID]bool { + set := map[portainer.EdgeGroupID]bool{} + + for _, edgeGroupID := range edgeGroupIDs { + set[edgeGroupID] = true + } + + return set +} + +func GetEndpointsFromEdgeGroups(edgeGroupIDs []portainer.EdgeGroupID, datastore dataservices.DataStore) ([]portainer.EndpointID, error) { + endpoints, err := datastore.Endpoint().Endpoints() + if err != nil { + return nil, err + } + + endpointGroups, err := datastore.EndpointGroup().EndpointGroups() + if err != nil { + return nil, err + } + + var response []portainer.EndpointID + for _, edgeGroupID := range edgeGroupIDs { + edgeGroup, err := datastore.EdgeGroup().EdgeGroup(edgeGroupID) + if err != nil { + return nil, err + } + + response = append(response, EdgeGroupRelatedEndpoints(edgeGroup, endpoints, endpointGroups)...) + } + + return response, nil +} + // edgeGroupRelatedToEndpoint returns true is edgeGroup is associated with environment(endpoint) func edgeGroupRelatedToEndpoint(edgeGroup *portainer.EdgeGroup, endpoint *portainer.Endpoint, endpointGroup *portainer.EndpointGroup) bool { if !edgeGroup.Dynamic { diff --git a/api/internal/maps/maps.go b/api/internal/maps/maps.go new file mode 100644 index 000000000..b3e2a5695 --- /dev/null +++ b/api/internal/maps/maps.go @@ -0,0 +1,34 @@ +package maps + +import "strings" + +// Get a key from a nested map. Not support array for the moment +func Get(mapObj map[string]interface{}, path string, key string) interface{} { + if path == "" { + return mapObj[key] + } + paths := strings.Split(path, ".") + v := mapObj + for _, p := range paths { + if p == "" { + continue + } + value, ok := v[p].(map[string]interface{}) + if ok { + v = value + } else { + return "" + } + } + return v[key] +} + +// Copy copies all key/value pairs in src adding them to dst. +// When a key in src is already present in dst, +// the value in dst will be overwritten by the value associated +// with the key in src. +func Copy[M ~map[K]V, K comparable, V any](dst, src M) { + for k, v := range src { + dst[k] = v + } +} diff --git a/api/internal/maps/maps_test.go b/api/internal/maps/maps_test.go new file mode 100644 index 000000000..16db64072 --- /dev/null +++ b/api/internal/maps/maps_test.go @@ -0,0 +1,38 @@ +package maps + +import ( + "encoding/json" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestGet(t *testing.T) { + t.Run("xx", func(t *testing.T) { + jsonStr := "{\"data\":{\"yesterday\":{\"sunrise\":\"06:19\"}}}" + data := make(map[string]interface{}) + err := json.Unmarshal([]byte(jsonStr), &data) + if err != nil { + fmt.Printf("error: %s", err) + return + } + result := Get(data, "data.yesterday", "sunrise") + fmt.Printf("result: %s", result) + expected := "06:19" + assert.Equal(t, expected, result) + }) + t.Run("xx", func(t *testing.T) { + jsonStr := "{\"data\":{\"yesterday\": \"hahaha\"}}" + data := make(map[string]interface{}) + err := json.Unmarshal([]byte(jsonStr), &data) + if err != nil { + fmt.Printf("error: %s", err) + return + } + result := Get(data, "data.yesterday", "sunrise") + fmt.Printf("result: %s", result) + expected := "" + assert.Equal(t, expected, result) + }) +} diff --git a/api/portainer.go b/api/portainer.go index 773d5662e..0f4ceb3ad 100644 --- a/api/portainer.go +++ b/api/portainer.go @@ -238,10 +238,14 @@ type ( Created int64 `json:"Created"` CronExpression string `json:"CronExpression"` Endpoints map[EndpointID]EdgeJobEndpointMeta `json:"Endpoints"` + EdgeGroups []EdgeGroupID `json:"EdgeGroups"` Name string `json:"Name"` ScriptPath string `json:"ScriptPath"` Recurring bool `json:"Recurring"` Version int `json:"Version"` + + // Field used for log collection of Endpoints belonging to EdgeGroups + GroupLogsCollection map[EndpointID]EdgeJobEndpointMeta } // EdgeJobEndpointMeta represents a meta data object for an Edge job and Environment(Endpoint) relation @@ -1433,6 +1437,7 @@ type ( GetActiveTunnel(endpoint *Endpoint) (TunnelDetails, error) AddEdgeJob(endpointID EndpointID, edgeJob *EdgeJob) RemoveEdgeJob(edgeJobID EdgeJobID) + RemoveEdgeJobFromEndpoint(endpointID EndpointID, edgeJobID EdgeJobID) } // Server defines the interface to serve the API diff --git a/app/edge/components/edge-job-form/edgeJobForm.html b/app/edge/components/edge-job-form/edgeJobForm.html index 60c7c949a..46b5bbc0a 100644 --- a/app/edge/components/edge-job-form/edgeJobForm.html +++ b/app/edge/components/edge-job-form/edgeJobForm.html @@ -214,6 +214,17 @@ + +