feat(edgejobs): support edge groups when using edge jobs EE-3873 (#8099)

pull/8230/head
matias-portainer 2022-12-19 18:54:51 -03:00 committed by GitHub
parent 9732d1b5d8
commit e1b474d04f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 544 additions and 83 deletions

View File

@ -44,3 +44,20 @@ func (service *Service) RemoveEdgeJob(edgeJobID portainer.EdgeJobID) {
service.mu.Unlock()
}
func (service *Service) RemoveEdgeJobFromEndpoint(endpointID portainer.EndpointID, edgeJobID portainer.EdgeJobID) {
service.mu.Lock()
tunnel := service.getTunnelDetails(endpointID)
n := 0
for _, edgeJob := range tunnel.Jobs {
if edgeJob.ID != edgeJobID {
tunnel.Jobs[n] = edgeJob
n++
}
}
tunnel.Jobs = tunnel.Jobs[:n]
service.mu.Unlock()
}

View File

@ -23,13 +23,13 @@ type edgeGroupCreatePayload struct {
func (payload *edgeGroupCreatePayload) Validate(r *http.Request) error {
if govalidator.IsNull(payload.Name) {
return errors.New("Invalid Edge group name")
return errors.New("invalid Edge group name")
}
if payload.Dynamic && (payload.TagIDs == nil || len(payload.TagIDs) == 0) {
return errors.New("TagIDs is mandatory for a dynamic Edge group")
return errors.New("tagIDs is mandatory for a dynamic Edge group")
}
if !payload.Dynamic && (payload.Endpoints == nil || len(payload.Endpoints) == 0) {
return errors.New("Environment is mandatory for a static Edge group")
return errors.New("environment is mandatory for a static Edge group")
}
return nil
}
@ -61,7 +61,7 @@ func (handler *Handler) edgeGroupCreate(w http.ResponseWriter, r *http.Request)
for _, edgeGroup := range edgeGroups {
if edgeGroup.Name == payload.Name {
return httperror.BadRequest("Edge group name must be unique", errors.New("Edge group name must be unique"))
return httperror.BadRequest("Edge group name must be unique", errors.New("edge group name must be unique"))
}
}

View File

@ -42,7 +42,20 @@ func (handler *Handler) edgeGroupDelete(w http.ResponseWriter, r *http.Request)
for _, edgeStack := range edgeStacks {
for _, groupID := range edgeStack.EdgeGroups {
if groupID == portainer.EdgeGroupID(edgeGroupID) {
return httperror.Forbidden("Edge group is used by an Edge stack", errors.New("Edge group is used by an Edge stack"))
return httperror.NewError(http.StatusConflict, "Edge group is used by an Edge stack", errors.New("edge group is used by an Edge stack"))
}
}
}
edgeJobs, err := handler.DataStore.EdgeJob().EdgeJobs()
if err != nil {
return httperror.InternalServerError("Unable to retrieve Edge jobs from the database", err)
}
for _, edgeJob := range edgeJobs {
for _, groupID := range edgeJob.EdgeGroups {
if groupID == portainer.EdgeGroupID(edgeGroupID) {
return httperror.NewError(http.StatusConflict, "Edge group is used by an Edge job", errors.New("edge group is used by an Edge job"))
}
}
}

View File

@ -8,11 +8,13 @@ import (
"github.com/portainer/libhttp/response"
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/internal/slices"
)
type decoratedEdgeGroup struct {
portainer.EdgeGroup
HasEdgeStack bool `json:"HasEdgeStack"`
HasEdgeGroup bool `json:"HasEdgeGroup"`
EndpointTypes []portainer.EndpointType
}
@ -46,8 +48,21 @@ func (handler *Handler) edgeGroupList(w http.ResponseWriter, r *http.Request) *h
}
}
edgeJobs, err := handler.DataStore.EdgeJob().EdgeJobs()
if err != nil {
return httperror.InternalServerError("Unable to retrieve Edge jobs from the database", err)
}
decoratedEdgeGroups := []decoratedEdgeGroup{}
for _, orgEdgeGroup := range edgeGroups {
usedByEdgeJob := false
for _, edgeJob := range edgeJobs {
if slices.Contains(edgeJob.EdgeGroups, portainer.EdgeGroupID(orgEdgeGroup.ID)) {
usedByEdgeJob = true
break
}
}
edgeGroup := decoratedEdgeGroup{
EdgeGroup: orgEdgeGroup,
EndpointTypes: []portainer.EndpointType{},
@ -63,13 +78,15 @@ func (handler *Handler) edgeGroupList(w http.ResponseWriter, r *http.Request) *h
endpointTypes, err := getEndpointTypes(handler.DataStore.Endpoint(), edgeGroup.Endpoints)
if err != nil {
return httperror.InternalServerError("Unable to retrieve endpoint types for Edge group", err)
return httperror.InternalServerError("Unable to retrieve environment types for Edge group", err)
}
edgeGroup.EndpointTypes = endpointTypes
edgeGroup.HasEdgeStack = usedEdgeGroups[edgeGroup.ID]
edgeGroup.HasEdgeGroup = usedByEdgeJob
decoratedEdgeGroups = append(decoratedEdgeGroups, edgeGroup)
}

View File

@ -10,6 +10,7 @@ import (
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/internal/edge"
"github.com/portainer/portainer/api/internal/endpointutils"
"github.com/portainer/portainer/api/internal/slices"
"github.com/asaskevich/govalidator"
)
@ -24,13 +25,13 @@ type edgeGroupUpdatePayload struct {
func (payload *edgeGroupUpdatePayload) Validate(r *http.Request) error {
if govalidator.IsNull(payload.Name) {
return errors.New("Invalid Edge group name")
return errors.New("invalid Edge group name")
}
if payload.Dynamic && (payload.TagIDs == nil || len(payload.TagIDs) == 0) {
return errors.New("TagIDs is mandatory for a dynamic Edge group")
return errors.New("tagIDs is mandatory for a dynamic Edge group")
}
if !payload.Dynamic && (payload.Endpoints == nil || len(payload.Endpoints) == 0) {
return errors.New("Environments is mandatory for a static Edge group")
return errors.New("environments is mandatory for a static Edge group")
}
return nil
}
@ -75,7 +76,7 @@ func (handler *Handler) edgeGroupUpdate(w http.ResponseWriter, r *http.Request)
}
for _, edgeGroup := range edgeGroups {
if edgeGroup.Name == payload.Name && edgeGroup.ID != portainer.EdgeGroupID(edgeGroupID) {
return httperror.BadRequest("Edge group name must be unique", errors.New("Edge group name must be unique"))
return httperror.BadRequest("Edge group name must be unique", errors.New("edge group name must be unique"))
}
}
@ -123,17 +124,45 @@ func (handler *Handler) edgeGroupUpdate(w http.ResponseWriter, r *http.Request)
newRelatedEndpoints := edge.EdgeGroupRelatedEndpoints(edgeGroup, endpoints, endpointGroups)
endpointsToUpdate := append(newRelatedEndpoints, oldRelatedEndpoints...)
edgeJobs, err := handler.DataStore.EdgeJob().EdgeJobs()
if err != nil {
return httperror.InternalServerError("Unable to fetch Edge jobs", err)
}
for _, endpointID := range endpointsToUpdate {
err = handler.updateEndpoint(endpointID)
err = handler.updateEndpointStacks(endpointID)
if err != nil {
return httperror.InternalServerError("Unable to persist Environment relation changes inside the database", err)
}
endpoint, err := handler.DataStore.Endpoint().Endpoint(endpointID)
if err != nil {
return httperror.InternalServerError("Unable to get Environment from database", err)
}
if !endpointutils.IsEdgeEndpoint(endpoint) {
continue
}
var operation string
if slices.Contains(newRelatedEndpoints, endpointID) {
operation = "add"
} else if slices.Contains(oldRelatedEndpoints, endpointID) {
operation = "remove"
} else {
continue
}
err = handler.updateEndpointEdgeJobs(edgeGroup.ID, endpointID, edgeJobs, operation)
if err != nil {
return httperror.InternalServerError("Unable to persist Environment Edge Jobs changes inside the database", err)
}
}
return response.JSON(w, edgeGroup)
}
func (handler *Handler) updateEndpoint(endpointID portainer.EndpointID) error {
func (handler *Handler) updateEndpointStacks(endpointID portainer.EndpointID) error {
relation, err := handler.DataStore.EndpointRelation().EndpointRelation(endpointID)
if err != nil {
return err
@ -170,3 +199,20 @@ func (handler *Handler) updateEndpoint(endpointID portainer.EndpointID) error {
return handler.DataStore.EndpointRelation().UpdateEndpointRelation(endpoint.ID, relation)
}
func (handler *Handler) updateEndpointEdgeJobs(edgeGroupID portainer.EdgeGroupID, endpointID portainer.EndpointID, edgeJobs []portainer.EdgeJob, operation string) error {
for _, edgeJob := range edgeJobs {
if !slices.Contains(edgeJob.EdgeGroups, edgeGroupID) {
continue
}
switch operation {
case "add":
handler.ReverseTunnelService.AddEdgeJob(endpointID, &edgeJob)
case "remove":
handler.ReverseTunnelService.RemoveEdgeJobFromEndpoint(endpointID, edgeJob.ID)
}
}
return nil
}

View File

@ -5,6 +5,7 @@ import (
"github.com/gorilla/mux"
httperror "github.com/portainer/libhttp/error"
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/http/security"
)
@ -12,7 +13,8 @@ import (
// Handler is the HTTP handler used to handle environment(endpoint) group operations.
type Handler struct {
*mux.Router
DataStore dataservices.DataStore
DataStore dataservices.DataStore
ReverseTunnelService portainer.ReverseTunnelService
}
// NewHandler creates a handler to manage environment(endpoint) group operations.

View File

@ -12,7 +12,9 @@ import (
"github.com/portainer/libhttp/request"
"github.com/portainer/libhttp/response"
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/internal/edge"
"github.com/portainer/portainer/api/internal/endpointutils"
"github.com/portainer/portainer/api/internal/maps"
)
// @id EdgeJobCreate
@ -41,7 +43,7 @@ func (handler *Handler) edgeJobCreate(w http.ResponseWriter, r *http.Request) *h
case "file":
return handler.createEdgeJobFromFile(w, r)
default:
return httperror.BadRequest("Invalid query parameter: method. Valid values are: file or string", errors.New(request.ErrInvalidQueryParameter))
return httperror.BadRequest("Invalid query parameter: method. Valid values are: file or string", errors.New(strings.ToLower(request.ErrInvalidQueryParameter)))
}
}
@ -50,28 +52,29 @@ type edgeJobCreateFromFileContentPayload struct {
CronExpression string
Recurring bool
Endpoints []portainer.EndpointID
EdgeGroups []portainer.EdgeGroupID
FileContent string
}
func (payload *edgeJobCreateFromFileContentPayload) Validate(r *http.Request) error {
if govalidator.IsNull(payload.Name) {
return errors.New("Invalid Edge job name")
return errors.New("invalid Edge job name")
}
if !govalidator.Matches(payload.Name, `^[a-zA-Z0-9][a-zA-Z0-9_.-]*$`) {
return errors.New("Invalid Edge job name format. Allowed characters are: [a-zA-Z0-9_.-]")
return errors.New("invalid Edge job name format. Allowed characters are: [a-zA-Z0-9_.-]")
}
if govalidator.IsNull(payload.CronExpression) {
return errors.New("Invalid cron expression")
return errors.New("invalid cron expression")
}
if payload.Endpoints == nil || len(payload.Endpoints) == 0 {
return errors.New("Invalid environment payload")
if (payload.Endpoints == nil || len(payload.Endpoints) == 0) && (payload.EdgeGroups == nil || len(payload.EdgeGroups) == 0) {
return errors.New("no environments or groups have been provided")
}
if govalidator.IsNull(payload.FileContent) {
return errors.New("Invalid script file content")
return errors.New("invalid script file content")
}
return nil
@ -86,7 +89,16 @@ func (handler *Handler) createEdgeJobFromFileContent(w http.ResponseWriter, r *h
edgeJob := handler.createEdgeJobObjectFromFileContentPayload(&payload)
err = handler.addAndPersistEdgeJob(edgeJob, []byte(payload.FileContent))
var endpoints []portainer.EndpointID
if len(edgeJob.EdgeGroups) > 0 {
endpoints, err = edge.GetEndpointsFromEdgeGroups(payload.EdgeGroups, handler.DataStore)
if err != nil {
return httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err)
}
}
err = handler.addAndPersistEdgeJob(edgeJob, []byte(payload.FileContent), endpoints)
if err != nil {
return httperror.InternalServerError("Unable to schedule Edge job", err)
}
@ -99,36 +111,48 @@ type edgeJobCreateFromFilePayload struct {
CronExpression string
Recurring bool
Endpoints []portainer.EndpointID
EdgeGroups []portainer.EdgeGroupID
File []byte
}
func (payload *edgeJobCreateFromFilePayload) Validate(r *http.Request) error {
name, err := request.RetrieveMultiPartFormValue(r, "Name", false)
if err != nil {
return errors.New("Invalid Edge job name")
return errors.New("invalid Edge job name")
}
if !govalidator.Matches(name, `^[a-zA-Z0-9][a-zA-Z0-9_.-]+$`) {
return errors.New("Invalid Edge job name format. Allowed characters are: [a-zA-Z0-9_.-]")
return errors.New("invalid Edge job name format. Allowed characters are: [a-zA-Z0-9_.-]")
}
payload.Name = name
cronExpression, err := request.RetrieveMultiPartFormValue(r, "CronExpression", false)
if err != nil {
return errors.New("Invalid cron expression")
return errors.New("invalid cron expression")
}
payload.CronExpression = cronExpression
var endpoints []portainer.EndpointID
err = request.RetrieveMultiPartFormJSONValue(r, "Endpoints", &endpoints, false)
err = request.RetrieveMultiPartFormJSONValue(r, "Endpoints", &endpoints, true)
if err != nil {
return errors.New("Invalid environments")
return errors.New("invalid environments")
}
payload.Endpoints = endpoints
var edgeGroups []portainer.EdgeGroupID
err = request.RetrieveMultiPartFormJSONValue(r, "EdgeGroups", &edgeGroups, true)
if err != nil {
return errors.New("invalid edge groups")
}
payload.EdgeGroups = edgeGroups
if (payload.Endpoints == nil || len(payload.Endpoints) == 0) && (payload.EdgeGroups == nil || len(payload.EdgeGroups) == 0) {
return errors.New("no environments or groups have been provided")
}
file, _, err := request.RetrieveMultiPartFormFile(r, "file")
if err != nil {
return errors.New("Invalid script file. Ensure that the file is uploaded correctly")
return errors.New("invalid script file. Ensure that the file is uploaded correctly")
}
payload.File = file
@ -144,7 +168,16 @@ func (handler *Handler) createEdgeJobFromFile(w http.ResponseWriter, r *http.Req
edgeJob := handler.createEdgeJobObjectFromFilePayload(payload)
err = handler.addAndPersistEdgeJob(edgeJob, payload.File)
var endpoints []portainer.EndpointID
if len(edgeJob.EdgeGroups) > 0 {
endpoints, err = edge.GetEndpointsFromEdgeGroups(payload.EdgeGroups, handler.DataStore)
if err != nil {
return httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err)
}
}
err = handler.addAndPersistEdgeJob(edgeJob, payload.File, endpoints)
if err != nil {
return httperror.InternalServerError("Unable to schedule Edge job", err)
}
@ -155,16 +188,18 @@ func (handler *Handler) createEdgeJobFromFile(w http.ResponseWriter, r *http.Req
func (handler *Handler) createEdgeJobObjectFromFilePayload(payload *edgeJobCreateFromFilePayload) *portainer.EdgeJob {
edgeJobIdentifier := portainer.EdgeJobID(handler.DataStore.EdgeJob().GetNextIdentifier())
endpoints := convertEndpointsToMetaObject(payload.Endpoints)
endpoints := handler.convertEndpointsToMetaObject(payload.Endpoints)
edgeJob := &portainer.EdgeJob{
ID: edgeJobIdentifier,
Name: payload.Name,
CronExpression: payload.CronExpression,
Recurring: payload.Recurring,
Created: time.Now().Unix(),
Endpoints: endpoints,
Version: 1,
ID: edgeJobIdentifier,
Name: payload.Name,
CronExpression: payload.CronExpression,
Recurring: payload.Recurring,
Created: time.Now().Unix(),
Endpoints: endpoints,
EdgeGroups: payload.EdgeGroups,
Version: 1,
GroupLogsCollection: map[portainer.EndpointID]portainer.EdgeJobEndpointMeta{},
}
return edgeJob
@ -173,22 +208,24 @@ func (handler *Handler) createEdgeJobObjectFromFilePayload(payload *edgeJobCreat
func (handler *Handler) createEdgeJobObjectFromFileContentPayload(payload *edgeJobCreateFromFileContentPayload) *portainer.EdgeJob {
edgeJobIdentifier := portainer.EdgeJobID(handler.DataStore.EdgeJob().GetNextIdentifier())
endpoints := convertEndpointsToMetaObject(payload.Endpoints)
endpoints := handler.convertEndpointsToMetaObject(payload.Endpoints)
edgeJob := &portainer.EdgeJob{
ID: edgeJobIdentifier,
Name: payload.Name,
CronExpression: payload.CronExpression,
Recurring: payload.Recurring,
Created: time.Now().Unix(),
Endpoints: endpoints,
Version: 1,
ID: edgeJobIdentifier,
Name: payload.Name,
CronExpression: payload.CronExpression,
Recurring: payload.Recurring,
Created: time.Now().Unix(),
Endpoints: endpoints,
EdgeGroups: payload.EdgeGroups,
Version: 1,
GroupLogsCollection: map[portainer.EndpointID]portainer.EdgeJobEndpointMeta{},
}
return edgeJob
}
func (handler *Handler) addAndPersistEdgeJob(edgeJob *portainer.EdgeJob, file []byte) error {
func (handler *Handler) addAndPersistEdgeJob(edgeJob *portainer.EdgeJob, file []byte, endpointsFromGroups []portainer.EndpointID) error {
edgeCronExpression := strings.Split(edgeJob.CronExpression, " ")
if len(edgeCronExpression) == 6 {
edgeCronExpression = edgeCronExpression[1:]
@ -206,29 +243,39 @@ func (handler *Handler) addAndPersistEdgeJob(edgeJob *portainer.EdgeJob, file []
}
}
if len(edgeJob.Endpoints) == 0 {
return errors.New("Environments are mandatory for an Edge job")
}
scriptPath, err := handler.FileService.StoreEdgeJobFileFromBytes(strconv.Itoa(int(edgeJob.ID)), file)
if err != nil {
return err
}
edgeJob.ScriptPath = scriptPath
for endpointID := range edgeJob.Endpoints {
var endpointsMap map[portainer.EndpointID]portainer.EdgeJobEndpointMeta
if len(endpointsFromGroups) > 0 {
endpointsMap = handler.convertEndpointsToMetaObject(endpointsFromGroups)
for ID := range endpointsMap {
endpoint, err := handler.DataStore.Endpoint().Endpoint(ID)
if err != nil {
return err
}
if !endpointutils.IsEdgeEndpoint(endpoint) {
delete(endpointsMap, ID)
}
}
maps.Copy(endpointsMap, edgeJob.Endpoints)
} else {
endpointsMap = edgeJob.Endpoints
}
if len(endpointsMap) == 0 {
return errors.New("environments or edge groups are mandatory for an Edge job")
}
for endpointID := range endpointsMap {
handler.ReverseTunnelService.AddEdgeJob(endpointID, edgeJob)
}
return handler.DataStore.EdgeJob().Create(edgeJob.ID, edgeJob)
}
func convertEndpointsToMetaObject(endpoints []portainer.EndpointID) map[portainer.EndpointID]portainer.EdgeJobEndpointMeta {
endpointsMap := map[portainer.EndpointID]portainer.EdgeJobEndpointMeta{}
for _, endpointID := range endpoints {
endpointsMap[endpointID] = portainer.EdgeJobEndpointMeta{}
}
return endpointsMap
}

View File

@ -8,6 +8,8 @@ import (
"github.com/portainer/libhttp/request"
"github.com/portainer/libhttp/response"
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/internal/edge"
"github.com/portainer/portainer/api/internal/maps"
)
// @id EdgeJobDelete
@ -43,6 +45,23 @@ func (handler *Handler) edgeJobDelete(w http.ResponseWriter, r *http.Request) *h
handler.ReverseTunnelService.RemoveEdgeJob(edgeJob.ID)
var endpointsMap map[portainer.EndpointID]portainer.EdgeJobEndpointMeta
if len(edgeJob.EdgeGroups) > 0 {
endpoints, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, handler.DataStore)
if err != nil {
return httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err)
}
endpointsMap = handler.convertEndpointsToMetaObject(endpoints)
maps.Copy(endpointsMap, edgeJob.Endpoints)
} else {
endpointsMap = edgeJob.Endpoints
}
for endpointID := range endpointsMap {
handler.ReverseTunnelService.RemoveEdgeJobFromEndpoint(endpointID, edgeJob.ID)
}
err = handler.DataStore.EdgeJob().DeleteEdgeJob(edgeJob.ID)
if err != nil {
return httperror.InternalServerError("Unable to remove the Edge job from the database", err)

View File

@ -8,6 +8,8 @@ import (
"github.com/portainer/libhttp/request"
"github.com/portainer/libhttp/response"
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/internal/edge"
"github.com/portainer/portainer/api/internal/slices"
)
// @id EdgeJobTasksClear
@ -43,11 +45,22 @@ func (handler *Handler) edgeJobTasksClear(w http.ResponseWriter, r *http.Request
}
endpointID := portainer.EndpointID(taskID)
endpointsFromGroups, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, handler.DataStore)
if err != nil {
return httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err)
}
meta := edgeJob.Endpoints[endpointID]
meta.CollectLogs = false
meta.LogsStatus = portainer.EdgeJobLogsStatusIdle
edgeJob.Endpoints[endpointID] = meta
if slices.Contains(endpointsFromGroups, endpointID) {
edgeJob.GroupLogsCollection[endpointID] = portainer.EdgeJobEndpointMeta{
CollectLogs: false,
LogsStatus: portainer.EdgeJobLogsStatusIdle,
}
} else {
meta := edgeJob.Endpoints[endpointID]
meta.CollectLogs = false
meta.LogsStatus = portainer.EdgeJobLogsStatusIdle
edgeJob.Endpoints[endpointID] = meta
}
err = handler.FileService.ClearEdgeJobTaskLogs(strconv.Itoa(edgeJobID), strconv.Itoa(taskID))
if err != nil {

View File

@ -7,6 +7,8 @@ import (
"github.com/portainer/libhttp/request"
"github.com/portainer/libhttp/response"
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/internal/edge"
"github.com/portainer/portainer/api/internal/slices"
)
// @id EdgeJobTasksCollect
@ -42,13 +44,22 @@ func (handler *Handler) edgeJobTasksCollect(w http.ResponseWriter, r *http.Reque
}
endpointID := portainer.EndpointID(taskID)
endpointsFromGroups, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, handler.DataStore)
if err != nil {
return httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err)
}
meta := edgeJob.Endpoints[endpointID]
meta.CollectLogs = true
meta.LogsStatus = portainer.EdgeJobLogsStatusPending
edgeJob.Endpoints[endpointID] = meta
handler.ReverseTunnelService.AddEdgeJob(endpointID, edgeJob)
if slices.Contains(endpointsFromGroups, endpointID) {
edgeJob.GroupLogsCollection[endpointID] = portainer.EdgeJobEndpointMeta{
CollectLogs: true,
LogsStatus: portainer.EdgeJobLogsStatusPending,
}
} else {
meta := edgeJob.Endpoints[endpointID]
meta.CollectLogs = true
meta.LogsStatus = portainer.EdgeJobLogsStatusPending
edgeJob.Endpoints[endpointID] = meta
}
err = handler.DataStore.EdgeJob().UpdateEdgeJob(edgeJob.ID, edgeJob)
if err != nil {

View File

@ -8,6 +8,8 @@ import (
"github.com/portainer/libhttp/request"
"github.com/portainer/libhttp/response"
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/internal/edge"
"github.com/portainer/portainer/api/internal/maps"
)
type taskContainer struct {
@ -44,8 +46,20 @@ func (handler *Handler) edgeJobTasksList(w http.ResponseWriter, r *http.Request)
tasks := make([]taskContainer, 0)
for endpointID, meta := range edgeJob.Endpoints {
endpointsMap := map[portainer.EndpointID]portainer.EdgeJobEndpointMeta{}
if len(edgeJob.EdgeGroups) > 0 {
endpoints, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, handler.DataStore)
if err != nil {
return httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err)
}
endpointsMap = handler.convertEndpointsToMetaObject(endpoints)
maps.Copy(endpointsMap, edgeJob.GroupLogsCollection)
}
maps.Copy(endpointsMap, edgeJob.Endpoints)
for endpointID, meta := range endpointsMap {
cronTask := taskContainer{
ID: fmt.Sprintf("edgejob_task_%d_%d", edgeJob.ID, endpointID),
EndpointID: endpointID,

View File

@ -9,6 +9,10 @@ import (
"github.com/portainer/libhttp/request"
"github.com/portainer/libhttp/response"
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/internal/edge"
"github.com/portainer/portainer/api/internal/endpointutils"
"github.com/portainer/portainer/api/internal/maps"
"github.com/portainer/portainer/api/internal/slices"
"github.com/asaskevich/govalidator"
)
@ -18,12 +22,13 @@ type edgeJobUpdatePayload struct {
CronExpression *string
Recurring *bool
Endpoints []portainer.EndpointID
EdgeGroups []portainer.EdgeGroupID
FileContent *string
}
func (payload *edgeJobUpdatePayload) Validate(r *http.Request) error {
if payload.Name != nil && !govalidator.Matches(*payload.Name, `^[a-zA-Z0-9][a-zA-Z0-9_.-]+$`) {
return errors.New("Invalid Edge job name format. Allowed characters are: [a-zA-Z0-9_.-]")
return errors.New("invalid Edge job name format. Allowed characters are: [a-zA-Z0-9_.-]")
}
return nil
}
@ -80,16 +85,26 @@ func (handler *Handler) updateEdgeSchedule(edgeJob *portainer.EdgeJob, payload *
edgeJob.Name = *payload.Name
}
endpointsToAdd := map[portainer.EndpointID]bool{}
endpointsToRemove := map[portainer.EndpointID]bool{}
if payload.Endpoints != nil {
endpointsMap := map[portainer.EndpointID]portainer.EdgeJobEndpointMeta{}
newEndpoints := endpointutils.EndpointSet(payload.Endpoints)
for endpointID := range edgeJob.Endpoints {
if !newEndpoints[endpointID] {
endpointsToRemove[endpointID] = true
}
}
for _, endpointID := range payload.Endpoints {
endpoint, err := handler.DataStore.Endpoint().Endpoint(endpointID)
if err != nil {
return err
}
if endpoint.Type != portainer.EdgeAgentOnDockerEnvironment && endpoint.Type != portainer.EdgeAgentOnKubernetesEnvironment {
if !endpointutils.IsEdgeEndpoint(endpoint) {
continue
}
@ -97,12 +112,73 @@ func (handler *Handler) updateEdgeSchedule(edgeJob *portainer.EdgeJob, payload *
endpointsMap[endpointID] = meta
} else {
endpointsMap[endpointID] = portainer.EdgeJobEndpointMeta{}
endpointsToAdd[endpointID] = true
}
}
edgeJob.Endpoints = endpointsMap
}
if len(payload.EdgeGroups) == 0 && len(edgeJob.EdgeGroups) > 0 {
endpoints, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, handler.DataStore)
if err != nil {
return errors.New("unable to get endpoints from edge groups")
}
for _, endpointID := range endpoints {
endpointsToRemove[portainer.EndpointID(endpointID)] = true
}
edgeJob.EdgeGroups = nil
}
edgeGroupsToAdd := []portainer.EdgeGroupID{}
edgeGroupsToRemove := []portainer.EdgeGroupID{}
endpointsFromGroupsToAddMap := map[portainer.EndpointID]portainer.EdgeJobEndpointMeta{}
if len(payload.EdgeGroups) > 0 {
for _, edgeGroupID := range payload.EdgeGroups {
_, err := handler.DataStore.EdgeGroup().EdgeGroup(edgeGroupID)
if err != nil {
return err
}
if !slices.Contains(edgeJob.EdgeGroups, edgeGroupID) {
edgeGroupsToAdd = append(edgeGroupsToAdd, edgeGroupID)
}
}
endpointsFromGroupsToAdd, err := edge.GetEndpointsFromEdgeGroups(edgeGroupsToAdd, handler.DataStore)
if err != nil {
return errors.New("unable to get endpoints from edge groups")
}
endpointsFromGroupsToAddMap = handler.convertEndpointsToMetaObject(endpointsFromGroupsToAdd)
for endpointID := range endpointsFromGroupsToAddMap {
endpointsToAdd[endpointID] = true
}
newEdgeGroups := edge.EdgeGroupSet(payload.EdgeGroups)
for _, edgeGroupID := range edgeJob.EdgeGroups {
if !newEdgeGroups[edgeGroupID] {
edgeGroupsToRemove = append(edgeGroupsToRemove, edgeGroupID)
}
}
endpointsFromGroupsToRemove, err := edge.GetEndpointsFromEdgeGroups(edgeGroupsToRemove, handler.DataStore)
if err != nil {
return errors.New("unable to get endpoints from edge groups")
}
endpointsToRemoveMap := handler.convertEndpointsToMetaObject(endpointsFromGroupsToRemove)
for endpointID := range endpointsToRemoveMap {
endpointsToRemove[endpointID] = true
}
edgeJob.EdgeGroups = payload.EdgeGroups
}
updateVersion := false
if payload.CronExpression != nil && *payload.CronExpression != edgeJob.CronExpression {
edgeJob.CronExpression = *payload.CronExpression
@ -133,9 +209,15 @@ func (handler *Handler) updateEdgeSchedule(edgeJob *portainer.EdgeJob, payload *
edgeJob.Version++
}
for endpointID := range edgeJob.Endpoints {
maps.Copy(endpointsFromGroupsToAddMap, edgeJob.Endpoints)
for endpointID := range endpointsFromGroupsToAddMap {
handler.ReverseTunnelService.AddEdgeJob(endpointID, edgeJob)
}
for endpointID := range endpointsToRemove {
handler.ReverseTunnelService.RemoveEdgeJobFromEndpoint(endpointID, edgeJob.ID)
}
return nil
}

View File

@ -46,3 +46,13 @@ func NewHandler(bouncer *security.RequestBouncer) *Handler {
bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeJobTasksClear)))).Methods(http.MethodDelete)
return h
}
func (handler *Handler) convertEndpointsToMetaObject(endpoints []portainer.EndpointID) map[portainer.EndpointID]portainer.EdgeJobEndpointMeta {
endpointsMap := map[portainer.EndpointID]portainer.EdgeJobEndpointMeta{}
for _, endpointID := range endpoints {
endpointsMap[endpointID] = portainer.EdgeJobEndpointMeta{}
}
return endpointsMap
}

View File

@ -65,10 +65,12 @@ func (handler *Handler) endpointEdgeJobsLogs(w http.ResponseWriter, r *http.Requ
return httperror.InternalServerError("Unable to save task log to the filesystem", err)
}
meta := edgeJob.Endpoints[endpoint.ID]
meta.CollectLogs = false
meta.LogsStatus = portainer.EdgeJobLogsStatusCollected
edgeJob.Endpoints[endpoint.ID] = meta
meta := portainer.EdgeJobEndpointMeta{CollectLogs: false, LogsStatus: portainer.EdgeJobLogsStatusCollected}
if _, ok := edgeJob.GroupLogsCollection[endpoint.ID]; ok {
edgeJob.GroupLogsCollection[endpoint.ID] = meta
} else {
edgeJob.Endpoints[endpoint.ID] = meta
}
err = handler.DataStore.EdgeJob().UpdateEdgeJob(edgeJob.ID, edgeJob)

View File

@ -158,10 +158,17 @@ func parseAgentPlatform(r *http.Request) (portainer.EndpointType, error) {
func (handler *Handler) buildSchedules(endpointID portainer.EndpointID, tunnel portainer.TunnelDetails) ([]edgeJobResponse, *httperror.HandlerError) {
schedules := []edgeJobResponse{}
for _, job := range tunnel.Jobs {
var collectLogs bool
if _, ok := job.GroupLogsCollection[endpointID]; ok {
collectLogs = job.GroupLogsCollection[endpointID].CollectLogs
} else {
collectLogs = job.Endpoints[endpointID].CollectLogs
}
schedule := edgeJobResponse{
ID: job.ID,
CronExpression: job.CronExpression,
CollectLogs: job.Endpoints[endpointID].CollectLogs,
CollectLogs: collectLogs,
Version: job.Version,
}

View File

@ -150,6 +150,7 @@ func (server *Server) Start() error {
var edgeGroupsHandler = edgegroups.NewHandler(requestBouncer)
edgeGroupsHandler.DataStore = server.DataStore
edgeGroupsHandler.ReverseTunnelService = server.ReverseTunnelService
var edgeJobsHandler = edgejobs.NewHandler(requestBouncer)
edgeJobsHandler.DataStore = server.DataStore

View File

@ -2,6 +2,7 @@ package edge
import (
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/internal/endpointutils"
"github.com/portainer/portainer/api/internal/tag"
)
@ -34,6 +35,40 @@ func EdgeGroupRelatedEndpoints(edgeGroup *portainer.EdgeGroup, endpoints []porta
return endpointIDs
}
func EdgeGroupSet(edgeGroupIDs []portainer.EdgeGroupID) map[portainer.EdgeGroupID]bool {
set := map[portainer.EdgeGroupID]bool{}
for _, edgeGroupID := range edgeGroupIDs {
set[edgeGroupID] = true
}
return set
}
func GetEndpointsFromEdgeGroups(edgeGroupIDs []portainer.EdgeGroupID, datastore dataservices.DataStore) ([]portainer.EndpointID, error) {
endpoints, err := datastore.Endpoint().Endpoints()
if err != nil {
return nil, err
}
endpointGroups, err := datastore.EndpointGroup().EndpointGroups()
if err != nil {
return nil, err
}
var response []portainer.EndpointID
for _, edgeGroupID := range edgeGroupIDs {
edgeGroup, err := datastore.EdgeGroup().EdgeGroup(edgeGroupID)
if err != nil {
return nil, err
}
response = append(response, EdgeGroupRelatedEndpoints(edgeGroup, endpoints, endpointGroups)...)
}
return response, nil
}
// edgeGroupRelatedToEndpoint returns true is edgeGroup is associated with environment(endpoint)
func edgeGroupRelatedToEndpoint(edgeGroup *portainer.EdgeGroup, endpoint *portainer.Endpoint, endpointGroup *portainer.EndpointGroup) bool {
if !edgeGroup.Dynamic {

34
api/internal/maps/maps.go Normal file
View File

@ -0,0 +1,34 @@
package maps
import "strings"
// Get a key from a nested map. Not support array for the moment
func Get(mapObj map[string]interface{}, path string, key string) interface{} {
if path == "" {
return mapObj[key]
}
paths := strings.Split(path, ".")
v := mapObj
for _, p := range paths {
if p == "" {
continue
}
value, ok := v[p].(map[string]interface{})
if ok {
v = value
} else {
return ""
}
}
return v[key]
}
// Copy copies all key/value pairs in src adding them to dst.
// When a key in src is already present in dst,
// the value in dst will be overwritten by the value associated
// with the key in src.
func Copy[M ~map[K]V, K comparable, V any](dst, src M) {
for k, v := range src {
dst[k] = v
}
}

View File

@ -0,0 +1,38 @@
package maps
import (
"encoding/json"
"fmt"
"testing"
"github.com/stretchr/testify/assert"
)
func TestGet(t *testing.T) {
t.Run("xx", func(t *testing.T) {
jsonStr := "{\"data\":{\"yesterday\":{\"sunrise\":\"06:19\"}}}"
data := make(map[string]interface{})
err := json.Unmarshal([]byte(jsonStr), &data)
if err != nil {
fmt.Printf("error: %s", err)
return
}
result := Get(data, "data.yesterday", "sunrise")
fmt.Printf("result: %s", result)
expected := "06:19"
assert.Equal(t, expected, result)
})
t.Run("xx", func(t *testing.T) {
jsonStr := "{\"data\":{\"yesterday\": \"hahaha\"}}"
data := make(map[string]interface{})
err := json.Unmarshal([]byte(jsonStr), &data)
if err != nil {
fmt.Printf("error: %s", err)
return
}
result := Get(data, "data.yesterday", "sunrise")
fmt.Printf("result: %s", result)
expected := ""
assert.Equal(t, expected, result)
})
}

View File

@ -238,10 +238,14 @@ type (
Created int64 `json:"Created"`
CronExpression string `json:"CronExpression"`
Endpoints map[EndpointID]EdgeJobEndpointMeta `json:"Endpoints"`
EdgeGroups []EdgeGroupID `json:"EdgeGroups"`
Name string `json:"Name"`
ScriptPath string `json:"ScriptPath"`
Recurring bool `json:"Recurring"`
Version int `json:"Version"`
// Field used for log collection of Endpoints belonging to EdgeGroups
GroupLogsCollection map[EndpointID]EdgeJobEndpointMeta
}
// EdgeJobEndpointMeta represents a meta data object for an Edge job and Environment(Endpoint) relation
@ -1433,6 +1437,7 @@ type (
GetActiveTunnel(endpoint *Endpoint) (TunnelDetails, error)
AddEdgeJob(endpointID EndpointID, edgeJob *EdgeJob)
RemoveEdgeJob(edgeJobID EdgeJobID)
RemoveEdgeJobFromEndpoint(endpointID EndpointID, edgeJobID EdgeJobID)
}
// Server defines the interface to serve the API

View File

@ -214,6 +214,17 @@
</div>
</div>
<!-- !upload -->
<div class="col-sm-12 form-section-title"> Edge Groups </div>
<div class="form-group" ng-if="$ctrl.edgeGroups">
<div class="col-sm-12">
<edge-groups-selector ng-if="!$ctrl.noGroups" value="$ctrl.model.EdgeGroups" on-change="($ctrl.onChangeGroups)" items="$ctrl.edgeGroups"></edge-groups-selector>
</div>
<div ng-if="$ctrl.noGroups" class="col-sm-12 small text-muted">
No Edge groups are available. Head over to the <a ui-sref="edge.groups">Edge groups view</a> to create one.
</div>
</div>
<div class="col-sm-12 form-section-title"> Target environments </div>
<!-- node-selection -->
<associated-endpoints-selector
@ -233,7 +244,7 @@
type="button"
class="btn btn-primary btn-sm"
ng-disabled="$ctrl.actionInProgress || !edgeJobForm.$valid
|| $ctrl.model.Endpoints.length === 0
|| ($ctrl.model.Endpoints.length === 0 && $ctrl.model.EdgeGroups.length === 0)
|| ($ctrl.formValues.method === 'upload' && !$ctrl.model.File)
|| ($ctrl.formValues.method === 'editor' && !$ctrl.model.FileContent)
"

View File

@ -3,7 +3,7 @@ import moment from 'moment';
export class EdgeJobFormController {
/* @ngInject */
constructor() {
constructor($async, $scope, EdgeGroupService, Notifications) {
this.state = {
formValidationError: '',
};
@ -34,10 +34,17 @@ export class EdgeJobFormController {
this.cronRegex =
/(@(annually|yearly|monthly|weekly|daily|hourly|reboot))|(@every (\d+(ns|us|µs|ms|s|m|h))+)|((((\d+,)+\d+|(\d+(\/|-)\d+)|\d+|\*) ){4,6}((\d+,)+\d+|(\d+(\/|-)\d+)|\d+|\*))/;
this.$async = $async;
this.$scope = $scope;
this.action = this.action.bind(this);
this.editorUpdate = this.editorUpdate.bind(this);
this.associateEndpoint = this.associateEndpoint.bind(this);
this.dissociateEndpoint = this.dissociateEndpoint.bind(this);
this.onChangeGroups = this.onChangeGroups.bind(this);
this.EdgeGroupService = EdgeGroupService;
this.Notifications = Notifications;
}
onChangeModel(model) {
@ -50,6 +57,12 @@ export class EdgeJobFormController {
};
}
onChangeGroups(groups) {
return this.$scope.$evalAsync(() => {
this.model.EdgeGroups = groups ? groups : [];
});
}
action() {
this.state.formValidationError = '';
@ -89,8 +102,18 @@ export class EdgeJobFormController {
this.model.Endpoints = _.filter(this.model.Endpoints, (id) => id !== endpoint.Id);
}
async getEdgeGroups() {
try {
this.edgeGroups = await this.EdgeGroupService.groups();
this.noGroups = this.edgeGroups.length === 0;
} catch (err) {
this.Notifications.error('Failure', err, 'Unable to retrieve Edge groups');
}
}
$onInit() {
this.onChangeModel(this.model);
this.getEdgeGroups();
}
}

View File

@ -9,6 +9,7 @@ angular.module('portainer.edge').component('edgeJobForm', {
model: '=',
groups: '<',
tags: '<',
edgeGroups: '<',
addLabelAction: '<',
removeLabelAction: '<',
formAction: '<',

View File

@ -74,11 +74,17 @@
>
<td>
<span class="md-checkbox">
<input id="select_{{ $index }}" type="checkbox" ng-model="item.Checked" ng-disabled="item.HasEdgeStack" ng-click="$ctrl.selectItem(item, $event)" />
<input
id="select_{{ $index }}"
type="checkbox"
ng-model="item.Checked"
ng-disabled="item.HasEdgeStack || item.HasEdgeGroup"
ng-click="$ctrl.selectItem(item, $event)"
/>
<label for="select_{{ $index }}"></label>
</span>
<a ui-sref="edge.groups.edit({groupId: item.Id})">{{ item.Name }}</a>
<span ng-if="item.HasEdgeStack" class="label label-info image-tag space-left">in use</span>
<span ng-if="item.HasEdgeStack || item.HasEdgeGroup" class="label label-info image-tag space-left">in use</span>
</td>
<td>{{ item.Endpoints.length }}</td>
<td>{{ item.Dynamic ? 'Dynamic' : 'Static' }}</td>

View File

@ -8,6 +8,7 @@
model="$ctrl.model"
groups="$ctrl.groups"
tags="$ctrl.tags"
edge-groups="$ctrl.edgeGroups"
form-action="$ctrl.create"
form-action-label="Create edge job"
action-in-progress="$ctrl.state.actionInProgress"

View File

@ -13,6 +13,7 @@ export class CreateEdgeJobViewController {
Endpoints: [],
FileContent: '',
File: null,
EdgeGroups: [],
};
this.$async = $async;

View File

@ -16,6 +16,7 @@
endpoints="endpoints"
groups="$ctrl.groups"
tags="$ctrl.tags"
edge-groups="$ctrl.edgeGroups"
form-action="$ctrl.update"
form-action-label="Update Edge job"
action-in-progress="$ctrl.state.actionInProgress"

View File

@ -153,6 +153,8 @@ export class EdgeJobController {
this.groups = groups;
this.tags = tags;
this.edgeJob.EdgeGroups = this.edgeJob.EdgeGroups ? this.edgeJob.EdgeGroups : [];
if (results.length > 0) {
const endpointIds = _.map(results, (result) => result.EndpointId);
const endpoints = await getEnvironments({ query: { endpointIds } });

View File

@ -5,6 +5,7 @@ export function ScheduleCreateRequest(model) {
this.Endpoints = model.Endpoints;
this.FileContent = model.FileContent;
this.File = model.File;
this.EdgeGroups = model.EdgeGroups;
}
export function ScheduleUpdateRequest(model) {
@ -14,4 +15,5 @@ export function ScheduleUpdateRequest(model) {
this.CronExpression = model.CronExpression;
this.Endpoints = model.Endpoints;
this.FileContent = model.FileContent;
this.EdgeGroups = model.EdgeGroups;
}