feat(endpointedge): add support for transactions EE-5327 (#8961)

pull/8539/head^2
andres-portainer 2023-05-18 14:58:33 -03:00 committed by GitHub
parent 881fa01eb2
commit db93e5880f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 125 additions and 23 deletions

View File

@ -31,6 +31,13 @@ func NewService(connection portainer.Connection) (*Service, error) {
}, nil }, nil
} }
func (service *Service) Tx(tx portainer.Transaction) ServiceTx {
return ServiceTx{
service: service,
tx: tx,
}
}
// Settings retrieve the settings object. // Settings retrieve the settings object.
func (service *Service) Settings() (*portainer.Settings, error) { func (service *Service) Settings() (*portainer.Settings, error) {
var settings portainer.Settings var settings portainer.Settings

View File

@ -0,0 +1,31 @@
package settings
import (
portainer "github.com/portainer/portainer/api"
)
type ServiceTx struct {
service *Service
tx portainer.Transaction
}
func (service ServiceTx) BucketName() string {
return BucketName
}
// Settings retrieve the settings object.
func (service ServiceTx) Settings() (*portainer.Settings, error) {
var settings portainer.Settings
err := service.tx.GetObject(BucketName, []byte(settingsKey), &settings)
if err != nil {
return nil, err
}
return &settings, nil
}
// UpdateSettings persists a Settings object.
func (service ServiceTx) UpdateSettings(settings *portainer.Settings) error {
return service.tx.UpdateObject(BucketName, []byte(settingsKey), settings)
}

View File

@ -44,7 +44,7 @@ func (tx *StoreTx) FDOProfile() dataservices.FDOProfileService {
func (tx *StoreTx) HelmUserRepository() dataservices.HelmUserRepositoryService { return nil } func (tx *StoreTx) HelmUserRepository() dataservices.HelmUserRepositoryService { return nil }
func (tx *StoreTx) Registry() dataservices.RegistryService { func (tx *StoreTx) Registry() dataservices.RegistryService {
return nil return tx.store.RegistryService.Tx(tx.tx)
} }
func (tx *StoreTx) ResourceControl() dataservices.ResourceControlService { func (tx *StoreTx) ResourceControl() dataservices.ResourceControlService {
@ -56,7 +56,10 @@ func (tx *StoreTx) Role() dataservices.RoleService {
} }
func (tx *StoreTx) APIKeyRepository() dataservices.APIKeyRepository { return nil } func (tx *StoreTx) APIKeyRepository() dataservices.APIKeyRepository { return nil }
func (tx *StoreTx) Settings() dataservices.SettingsService { return nil }
func (tx *StoreTx) Settings() dataservices.SettingsService {
return tx.store.SettingsService.Tx(tx.tx)
}
func (tx *StoreTx) Snapshot() dataservices.SnapshotService { func (tx *StoreTx) Snapshot() dataservices.SnapshotService {
return tx.store.SnapshotService.Tx(tx.tx) return tx.store.SnapshotService.Tx(tx.tx)

View File

@ -1,6 +1,7 @@
package endpointedge package endpointedge
import ( import (
"errors"
"net/http" "net/http"
"strconv" "strconv"
@ -8,7 +9,9 @@ import (
"github.com/portainer/libhttp/request" "github.com/portainer/libhttp/request"
"github.com/portainer/libhttp/response" "github.com/portainer/libhttp/response"
portainer "github.com/portainer/portainer/api" portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/http/middlewares" "github.com/portainer/portainer/api/http/middlewares"
"github.com/portainer/portainer/pkg/featureflags"
) )
type logsPayload struct { type logsPayload struct {
@ -53,14 +56,42 @@ func (handler *Handler) endpointEdgeJobsLogs(w http.ResponseWriter, r *http.Requ
return httperror.BadRequest("Invalid request payload", err) return httperror.BadRequest("Invalid request payload", err)
} }
edgeJob, err := handler.DataStore.EdgeJob().EdgeJob(portainer.EdgeJobID(edgeJobID)) if featureflags.IsEnabled(portainer.FeatureNoTx) {
if handler.DataStore.IsErrObjectNotFound(err) { err = handler.getEdgeJobLobs(handler.DataStore, endpoint.ID, portainer.EdgeJobID(edgeJobID), payload)
} else {
err = handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error {
return handler.getEdgeJobLobs(tx, endpoint.ID, portainer.EdgeJobID(edgeJobID), payload)
})
}
if err != nil {
var httpErr *httperror.HandlerError
if errors.As(err, &httpErr) {
return httpErr
}
return httperror.InternalServerError("Unexpected error", err)
}
return response.JSON(w, nil)
}
func (handler *Handler) getEdgeJobLobs(tx dataservices.DataStoreTx, endpointID portainer.EndpointID, edgeJobID portainer.EdgeJobID, payload logsPayload) error {
endpoint, err := tx.Endpoint().Endpoint(endpointID)
if tx.IsErrObjectNotFound(err) {
return httperror.NotFound("Unable to find an environment with the specified identifier inside the database", err)
} else if err != nil {
return httperror.InternalServerError("Unable to find an environment with the specified identifier inside the database", err)
}
edgeJob, err := tx.EdgeJob().EdgeJob(portainer.EdgeJobID(edgeJobID))
if tx.IsErrObjectNotFound(err) {
return httperror.NotFound("Unable to find an edge job with the specified identifier inside the database", err) return httperror.NotFound("Unable to find an edge job with the specified identifier inside the database", err)
} else if err != nil { } else if err != nil {
return httperror.InternalServerError("Unable to find an edge job with the specified identifier inside the database", err) return httperror.InternalServerError("Unable to find an edge job with the specified identifier inside the database", err)
} }
err = handler.FileService.StoreEdgeJobTaskLogFileFromBytes(strconv.Itoa(edgeJobID), strconv.Itoa(int(endpoint.ID)), []byte(payload.FileContent)) err = handler.FileService.StoreEdgeJobTaskLogFileFromBytes(strconv.Itoa(int(edgeJobID)), strconv.Itoa(int(endpointID)), []byte(payload.FileContent))
if err != nil { if err != nil {
return httperror.InternalServerError("Unable to save task log to the filesystem", err) return httperror.InternalServerError("Unable to save task log to the filesystem", err)
} }
@ -72,7 +103,7 @@ func (handler *Handler) endpointEdgeJobsLogs(w http.ResponseWriter, r *http.Requ
edgeJob.Endpoints[endpoint.ID] = meta edgeJob.Endpoints[endpoint.ID] = meta
} }
err = handler.DataStore.EdgeJob().UpdateEdgeJob(edgeJob.ID, edgeJob) err = tx.EdgeJob().UpdateEdgeJob(edgeJob.ID, edgeJob)
handler.ReverseTunnelService.AddEdgeJob(endpoint, edgeJob) handler.ReverseTunnelService.AddEdgeJob(endpoint, edgeJob)
@ -80,5 +111,5 @@ func (handler *Handler) endpointEdgeJobsLogs(w http.ResponseWriter, r *http.Requ
return httperror.InternalServerError("Unable to persist edge job changes to the database", err) return httperror.InternalServerError("Unable to persist edge job changes to the database", err)
} }
return response.JSON(w, nil) return nil
} }

View File

@ -17,7 +17,9 @@ import (
"github.com/portainer/libhttp/request" "github.com/portainer/libhttp/request"
"github.com/portainer/libhttp/response" "github.com/portainer/libhttp/response"
portainer "github.com/portainer/portainer/api" portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/internal/edge/cache" "github.com/portainer/portainer/api/internal/edge/cache"
"github.com/portainer/portainer/pkg/featureflags"
) )
type stackStatusResponse struct { type stackStatusResponse struct {
@ -96,6 +98,34 @@ func (handler *Handler) endpointEdgeStatusInspect(w http.ResponseWriter, r *http
return httperror.Forbidden("Permission denied to access environment", err) return httperror.Forbidden("Permission denied to access environment", err)
} }
var statusResponse *endpointEdgeStatusInspectResponse
if featureflags.IsEnabled(portainer.FeatureNoTx) {
statusResponse, err = handler.inspectStatus(handler.DataStore, r, portainer.EndpointID(endpointID))
} else {
err = handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error {
statusResponse, err = handler.inspectStatus(tx, r, portainer.EndpointID(endpointID))
return err
})
}
if err != nil {
var httpErr *httperror.HandlerError
if errors.As(err, &httpErr) {
return httpErr
}
return httperror.InternalServerError("Unexpected error", err)
}
return cacheResponse(w, endpoint.ID, *statusResponse)
}
func (handler *Handler) inspectStatus(tx dataservices.DataStoreTx, r *http.Request, endpointID portainer.EndpointID) (*endpointEdgeStatusInspectResponse, error) {
endpoint, err := tx.Endpoint().Endpoint(endpointID)
if err != nil {
return nil, err
}
if endpoint.EdgeID == "" { if endpoint.EdgeID == "" {
edgeIdentifier := r.Header.Get(portainer.PortainerAgentEdgeIDHeader) edgeIdentifier := r.Header.Get(portainer.PortainerAgentEdgeIDHeader)
endpoint.EdgeID = edgeIdentifier endpoint.EdgeID = edgeIdentifier
@ -103,7 +133,7 @@ func (handler *Handler) endpointEdgeStatusInspect(w http.ResponseWriter, r *http
agentPlatform, agentPlatformErr := parseAgentPlatform(r) agentPlatform, agentPlatformErr := parseAgentPlatform(r)
if agentPlatformErr != nil { if agentPlatformErr != nil {
return httperror.BadRequest("agent platform header is not valid", err) return nil, httperror.BadRequest("agent platform header is not valid", err)
} }
endpoint.Type = agentPlatform endpoint.Type = agentPlatform
@ -112,21 +142,21 @@ func (handler *Handler) endpointEdgeStatusInspect(w http.ResponseWriter, r *http
endpoint.LastCheckInDate = time.Now().Unix() endpoint.LastCheckInDate = time.Now().Unix()
err = handler.DataStore.Endpoint().UpdateEndpoint(endpoint.ID, endpoint) err = tx.Endpoint().UpdateEndpoint(endpoint.ID, endpoint)
if err != nil { if err != nil {
return httperror.InternalServerError("Unable to Unable to persist environment changes inside the database", err) return nil, httperror.InternalServerError("Unable to Unable to persist environment changes inside the database", err)
} }
err = handler.requestBouncer.TrustedEdgeEnvironmentAccess(endpoint) err = handler.requestBouncer.TrustedEdgeEnvironmentAccess(tx, endpoint)
if err != nil { if err != nil {
return httperror.Forbidden("Permission denied to access environment", err) return nil, httperror.Forbidden("Permission denied to access environment", err)
} }
checkinInterval := endpoint.EdgeCheckinInterval checkinInterval := endpoint.EdgeCheckinInterval
if endpoint.EdgeCheckinInterval == 0 { if endpoint.EdgeCheckinInterval == 0 {
settings, err := handler.DataStore.Settings().Settings() settings, err := tx.Settings().Settings()
if err != nil { if err != nil {
return httperror.InternalServerError("Unable to retrieve settings from the database", err) return nil, httperror.InternalServerError("Unable to retrieve settings from the database", err)
} }
checkinInterval = settings.EdgeAgentCheckinInterval checkinInterval = settings.EdgeAgentCheckinInterval
} }
@ -142,7 +172,7 @@ func (handler *Handler) endpointEdgeStatusInspect(w http.ResponseWriter, r *http
schedules, handlerErr := handler.buildSchedules(endpoint.ID, tunnel) schedules, handlerErr := handler.buildSchedules(endpoint.ID, tunnel)
if handlerErr != nil { if handlerErr != nil {
return handlerErr return nil, handlerErr
} }
statusResponse.Schedules = schedules statusResponse.Schedules = schedules
@ -150,13 +180,13 @@ func (handler *Handler) endpointEdgeStatusInspect(w http.ResponseWriter, r *http
handler.ReverseTunnelService.SetTunnelStatusToActive(endpoint.ID) handler.ReverseTunnelService.SetTunnelStatusToActive(endpoint.ID)
} }
edgeStacksStatus, handlerErr := handler.buildEdgeStacks(endpoint.ID) edgeStacksStatus, handlerErr := handler.buildEdgeStacks(tx, endpoint.ID)
if handlerErr != nil { if handlerErr != nil {
return handlerErr return nil, handlerErr
} }
statusResponse.Stacks = edgeStacksStatus statusResponse.Stacks = edgeStacksStatus
return cacheResponse(w, endpoint.ID, statusResponse) return &statusResponse, nil
} }
func parseAgentPlatform(r *http.Request) (portainer.EndpointType, error) { func parseAgentPlatform(r *http.Request) (portainer.EndpointType, error) {
@ -210,15 +240,15 @@ func (handler *Handler) buildSchedules(endpointID portainer.EndpointID, tunnel p
return schedules, nil return schedules, nil
} }
func (handler *Handler) buildEdgeStacks(endpointID portainer.EndpointID) ([]stackStatusResponse, *httperror.HandlerError) { func (handler *Handler) buildEdgeStacks(tx dataservices.DataStoreTx, endpointID portainer.EndpointID) ([]stackStatusResponse, *httperror.HandlerError) {
relation, err := handler.DataStore.EndpointRelation().EndpointRelation(endpointID) relation, err := tx.EndpointRelation().EndpointRelation(endpointID)
if err != nil { if err != nil {
return nil, httperror.InternalServerError("Unable to retrieve relation object from the database", err) return nil, httperror.InternalServerError("Unable to retrieve relation object from the database", err)
} }
edgeStacksStatus := []stackStatusResponse{} edgeStacksStatus := []stackStatusResponse{}
for stackID := range relation.EdgeStacks { for stackID := range relation.EdgeStacks {
version, ok := handler.DataStore.EdgeStack().EdgeStackVersion(stackID) version, ok := tx.EdgeStack().EdgeStackVersion(stackID)
if !ok { if !ok {
return nil, httperror.InternalServerError("Unable to retrieve edge stack from the database", err) return nil, httperror.InternalServerError("Unable to retrieve edge stack from the database", err)
} }

View File

@ -151,12 +151,12 @@ func (bouncer *RequestBouncer) AuthorizedEdgeEndpointOperation(r *http.Request,
// TrustedEdgeEnvironmentAccess defines a security check for Edge environments, checks if // TrustedEdgeEnvironmentAccess defines a security check for Edge environments, checks if
// the request is coming from a trusted Edge environment // the request is coming from a trusted Edge environment
func (bouncer *RequestBouncer) TrustedEdgeEnvironmentAccess(endpoint *portainer.Endpoint) error { func (bouncer *RequestBouncer) TrustedEdgeEnvironmentAccess(tx dataservices.DataStoreTx, endpoint *portainer.Endpoint) error {
if endpoint.UserTrusted { if endpoint.UserTrusted {
return nil return nil
} }
settings, err := bouncer.dataStore.Settings().Settings() settings, err := tx.Settings().Settings()
if err != nil { if err != nil {
return errors.WithMessage(err, "could not retrieve the settings") return errors.WithMessage(err, "could not retrieve the settings")
} }