fix(tunnels): make the tunnels more robust EE-7042 (#11877)
parent
aaab2fa9d8
commit
c5a1d7e051
|
@ -5,6 +5,17 @@ import (
|
||||||
"github.com/portainer/portainer/api/internal/edge/cache"
|
"github.com/portainer/portainer/api/internal/edge/cache"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// EdgeJobs retrieves the edge jobs for the given environment
|
||||||
|
func (service *Service) EdgeJobs(endpointID portainer.EndpointID) []portainer.EdgeJob {
|
||||||
|
service.mu.RLock()
|
||||||
|
defer service.mu.RUnlock()
|
||||||
|
|
||||||
|
return append(
|
||||||
|
make([]portainer.EdgeJob, 0, len(service.edgeJobs[endpointID])),
|
||||||
|
service.edgeJobs[endpointID]...,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
// AddEdgeJob register an EdgeJob inside the tunnel details associated to an environment(endpoint).
|
// AddEdgeJob register an EdgeJob inside the tunnel details associated to an environment(endpoint).
|
||||||
func (service *Service) AddEdgeJob(endpoint *portainer.Endpoint, edgeJob *portainer.EdgeJob) {
|
func (service *Service) AddEdgeJob(endpoint *portainer.Endpoint, edgeJob *portainer.EdgeJob) {
|
||||||
if endpoint.Edge.AsyncMode {
|
if endpoint.Edge.AsyncMode {
|
||||||
|
@ -12,10 +23,10 @@ func (service *Service) AddEdgeJob(endpoint *portainer.Endpoint, edgeJob *portai
|
||||||
}
|
}
|
||||||
|
|
||||||
service.mu.Lock()
|
service.mu.Lock()
|
||||||
tunnel := service.getTunnelDetails(endpoint.ID)
|
defer service.mu.Unlock()
|
||||||
|
|
||||||
existingJobIndex := -1
|
existingJobIndex := -1
|
||||||
for idx, existingJob := range tunnel.Jobs {
|
for idx, existingJob := range service.edgeJobs[endpoint.ID] {
|
||||||
if existingJob.ID == edgeJob.ID {
|
if existingJob.ID == edgeJob.ID {
|
||||||
existingJobIndex = idx
|
existingJobIndex = idx
|
||||||
|
|
||||||
|
@ -24,30 +35,28 @@ func (service *Service) AddEdgeJob(endpoint *portainer.Endpoint, edgeJob *portai
|
||||||
}
|
}
|
||||||
|
|
||||||
if existingJobIndex == -1 {
|
if existingJobIndex == -1 {
|
||||||
tunnel.Jobs = append(tunnel.Jobs, *edgeJob)
|
service.edgeJobs[endpoint.ID] = append(service.edgeJobs[endpoint.ID], *edgeJob)
|
||||||
} else {
|
} else {
|
||||||
tunnel.Jobs[existingJobIndex] = *edgeJob
|
service.edgeJobs[endpoint.ID][existingJobIndex] = *edgeJob
|
||||||
}
|
}
|
||||||
|
|
||||||
cache.Del(endpoint.ID)
|
cache.Del(endpoint.ID)
|
||||||
|
|
||||||
service.mu.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// RemoveEdgeJob will remove the specified Edge job from each tunnel it was registered with.
|
// RemoveEdgeJob will remove the specified Edge job from each tunnel it was registered with.
|
||||||
func (service *Service) RemoveEdgeJob(edgeJobID portainer.EdgeJobID) {
|
func (service *Service) RemoveEdgeJob(edgeJobID portainer.EdgeJobID) {
|
||||||
service.mu.Lock()
|
service.mu.Lock()
|
||||||
|
|
||||||
for endpointID, tunnel := range service.tunnelDetailsMap {
|
for endpointID := range service.edgeJobs {
|
||||||
n := 0
|
n := 0
|
||||||
for _, edgeJob := range tunnel.Jobs {
|
for _, edgeJob := range service.edgeJobs[endpointID] {
|
||||||
if edgeJob.ID != edgeJobID {
|
if edgeJob.ID != edgeJobID {
|
||||||
tunnel.Jobs[n] = edgeJob
|
service.edgeJobs[endpointID][n] = edgeJob
|
||||||
n++
|
n++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tunnel.Jobs = tunnel.Jobs[:n]
|
service.edgeJobs[endpointID] = service.edgeJobs[endpointID][:n]
|
||||||
|
|
||||||
cache.Del(endpointID)
|
cache.Del(endpointID)
|
||||||
}
|
}
|
||||||
|
@ -57,19 +66,17 @@ func (service *Service) RemoveEdgeJob(edgeJobID portainer.EdgeJobID) {
|
||||||
|
|
||||||
func (service *Service) RemoveEdgeJobFromEndpoint(endpointID portainer.EndpointID, edgeJobID portainer.EdgeJobID) {
|
func (service *Service) RemoveEdgeJobFromEndpoint(endpointID portainer.EndpointID, edgeJobID portainer.EdgeJobID) {
|
||||||
service.mu.Lock()
|
service.mu.Lock()
|
||||||
tunnel := service.getTunnelDetails(endpointID)
|
defer service.mu.Unlock()
|
||||||
|
|
||||||
n := 0
|
n := 0
|
||||||
for _, edgeJob := range tunnel.Jobs {
|
for _, edgeJob := range service.edgeJobs[endpointID] {
|
||||||
if edgeJob.ID != edgeJobID {
|
if edgeJob.ID != edgeJobID {
|
||||||
tunnel.Jobs[n] = edgeJob
|
service.edgeJobs[endpointID][n] = edgeJob
|
||||||
n++
|
n++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tunnel.Jobs = tunnel.Jobs[:n]
|
service.edgeJobs[endpointID] = service.edgeJobs[endpointID][:n]
|
||||||
|
|
||||||
cache.Del(endpointID)
|
cache.Del(endpointID)
|
||||||
|
|
||||||
service.mu.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,6 @@ import (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
tunnelCleanupInterval = 10 * time.Second
|
tunnelCleanupInterval = 10 * time.Second
|
||||||
requiredTimeout = 15 * time.Second
|
|
||||||
activeTimeout = 4*time.Minute + 30*time.Second
|
activeTimeout = 4*time.Minute + 30*time.Second
|
||||||
pingTimeout = 3 * time.Second
|
pingTimeout = 3 * time.Second
|
||||||
)
|
)
|
||||||
|
@ -28,32 +27,54 @@ const (
|
||||||
// It is used to start a reverse tunnel server and to manage the connection status of each tunnel
|
// It is used to start a reverse tunnel server and to manage the connection status of each tunnel
|
||||||
// connected to the tunnel server.
|
// connected to the tunnel server.
|
||||||
type Service struct {
|
type Service struct {
|
||||||
serverFingerprint string
|
serverFingerprint string
|
||||||
serverPort string
|
serverPort string
|
||||||
tunnelDetailsMap map[portainer.EndpointID]*portainer.TunnelDetails
|
activeTunnels map[portainer.EndpointID]*portainer.TunnelDetails
|
||||||
dataStore dataservices.DataStore
|
edgeJobs map[portainer.EndpointID][]portainer.EdgeJob
|
||||||
snapshotService portainer.SnapshotService
|
dataStore dataservices.DataStore
|
||||||
chiselServer *chserver.Server
|
snapshotService portainer.SnapshotService
|
||||||
shutdownCtx context.Context
|
chiselServer *chserver.Server
|
||||||
ProxyManager *proxy.Manager
|
shutdownCtx context.Context
|
||||||
mu sync.Mutex
|
ProxyManager *proxy.Manager
|
||||||
fileService portainer.FileService
|
mu sync.RWMutex
|
||||||
|
fileService portainer.FileService
|
||||||
|
defaultCheckinInterval int
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewService returns a pointer to a new instance of Service
|
// NewService returns a pointer to a new instance of Service
|
||||||
func NewService(dataStore dataservices.DataStore, shutdownCtx context.Context, fileService portainer.FileService) *Service {
|
func NewService(dataStore dataservices.DataStore, shutdownCtx context.Context, fileService portainer.FileService) *Service {
|
||||||
|
defaultCheckinInterval := portainer.DefaultEdgeAgentCheckinIntervalInSeconds
|
||||||
|
|
||||||
|
settings, err := dataStore.Settings().Settings()
|
||||||
|
if err == nil {
|
||||||
|
defaultCheckinInterval = settings.EdgeAgentCheckinInterval
|
||||||
|
} else {
|
||||||
|
log.Error().Err(err).Msg("unable to retrieve the settings from the database")
|
||||||
|
}
|
||||||
|
|
||||||
return &Service{
|
return &Service{
|
||||||
tunnelDetailsMap: make(map[portainer.EndpointID]*portainer.TunnelDetails),
|
activeTunnels: make(map[portainer.EndpointID]*portainer.TunnelDetails),
|
||||||
dataStore: dataStore,
|
edgeJobs: make(map[portainer.EndpointID][]portainer.EdgeJob),
|
||||||
shutdownCtx: shutdownCtx,
|
dataStore: dataStore,
|
||||||
fileService: fileService,
|
shutdownCtx: shutdownCtx,
|
||||||
|
fileService: fileService,
|
||||||
|
defaultCheckinInterval: defaultCheckinInterval,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// pingAgent ping the given agent so that the agent can keep the tunnel alive
|
// pingAgent ping the given agent so that the agent can keep the tunnel alive
|
||||||
func (service *Service) pingAgent(endpointID portainer.EndpointID) error {
|
func (service *Service) pingAgent(endpointID portainer.EndpointID) error {
|
||||||
tunnel := service.GetTunnelDetails(endpointID)
|
endpoint, err := service.dataStore.Endpoint().Endpoint(endpointID)
|
||||||
requestURL := fmt.Sprintf("http://127.0.0.1:%d/ping", tunnel.Port)
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
tunnelAddr, err := service.TunnelAddr(endpoint)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
requestURL := fmt.Sprintf("http://%s/ping", tunnelAddr)
|
||||||
req, err := http.NewRequest(http.MethodHead, requestURL, nil)
|
req, err := http.NewRequest(http.MethodHead, requestURL, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -76,47 +97,49 @@ func (service *Service) pingAgent(endpointID portainer.EndpointID) error {
|
||||||
|
|
||||||
// KeepTunnelAlive keeps the tunnel of the given environment for maxAlive duration, or until ctx is done
|
// KeepTunnelAlive keeps the tunnel of the given environment for maxAlive duration, or until ctx is done
|
||||||
func (service *Service) KeepTunnelAlive(endpointID portainer.EndpointID, ctx context.Context, maxAlive time.Duration) {
|
func (service *Service) KeepTunnelAlive(endpointID portainer.EndpointID, ctx context.Context, maxAlive time.Duration) {
|
||||||
go func() {
|
go service.keepTunnelAlive(endpointID, ctx, maxAlive)
|
||||||
log.Debug().
|
}
|
||||||
Int("endpoint_id", int(endpointID)).
|
|
||||||
Float64("max_alive_minutes", maxAlive.Minutes()).
|
|
||||||
Msg("KeepTunnelAlive: start")
|
|
||||||
|
|
||||||
maxAliveTicker := time.NewTicker(maxAlive)
|
func (service *Service) keepTunnelAlive(endpointID portainer.EndpointID, ctx context.Context, maxAlive time.Duration) {
|
||||||
defer maxAliveTicker.Stop()
|
log.Debug().
|
||||||
|
Int("endpoint_id", int(endpointID)).
|
||||||
|
Float64("max_alive_minutes", maxAlive.Minutes()).
|
||||||
|
Msg("KeepTunnelAlive: start")
|
||||||
|
|
||||||
pingTicker := time.NewTicker(tunnelCleanupInterval)
|
maxAliveTicker := time.NewTicker(maxAlive)
|
||||||
defer pingTicker.Stop()
|
defer maxAliveTicker.Stop()
|
||||||
|
|
||||||
for {
|
pingTicker := time.NewTicker(tunnelCleanupInterval)
|
||||||
select {
|
defer pingTicker.Stop()
|
||||||
case <-pingTicker.C:
|
|
||||||
service.SetTunnelStatusToActive(endpointID)
|
|
||||||
err := service.pingAgent(endpointID)
|
|
||||||
if err != nil {
|
|
||||||
log.Debug().
|
|
||||||
Int("endpoint_id", int(endpointID)).
|
|
||||||
Err(err).
|
|
||||||
Msg("KeepTunnelAlive: ping agent")
|
|
||||||
}
|
|
||||||
case <-maxAliveTicker.C:
|
|
||||||
log.Debug().
|
|
||||||
Int("endpoint_id", int(endpointID)).
|
|
||||||
Float64("timeout_minutes", maxAlive.Minutes()).
|
|
||||||
Msg("KeepTunnelAlive: tunnel keep alive timeout")
|
|
||||||
|
|
||||||
return
|
for {
|
||||||
case <-ctx.Done():
|
select {
|
||||||
err := ctx.Err()
|
case <-pingTicker.C:
|
||||||
|
service.UpdateLastActivity(endpointID)
|
||||||
|
|
||||||
|
if err := service.pingAgent(endpointID); err != nil {
|
||||||
log.Debug().
|
log.Debug().
|
||||||
Int("endpoint_id", int(endpointID)).
|
Int("endpoint_id", int(endpointID)).
|
||||||
Err(err).
|
Err(err).
|
||||||
Msg("KeepTunnelAlive: tunnel stop")
|
Msg("KeepTunnelAlive: ping agent")
|
||||||
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
case <-maxAliveTicker.C:
|
||||||
|
log.Debug().
|
||||||
|
Int("endpoint_id", int(endpointID)).
|
||||||
|
Float64("timeout_minutes", maxAlive.Minutes()).
|
||||||
|
Msg("KeepTunnelAlive: tunnel keep alive timeout")
|
||||||
|
|
||||||
|
return
|
||||||
|
case <-ctx.Done():
|
||||||
|
err := ctx.Err()
|
||||||
|
log.Debug().
|
||||||
|
Int("endpoint_id", int(endpointID)).
|
||||||
|
Err(err).
|
||||||
|
Msg("KeepTunnelAlive: tunnel stop")
|
||||||
|
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}()
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// StartTunnelServer starts a tunnel server on the specified addr and port.
|
// StartTunnelServer starts a tunnel server on the specified addr and port.
|
||||||
|
@ -126,7 +149,6 @@ func (service *Service) KeepTunnelAlive(endpointID portainer.EndpointID, ctx con
|
||||||
// The snapshotter is used in the tunnel status verification process.
|
// The snapshotter is used in the tunnel status verification process.
|
||||||
func (service *Service) StartTunnelServer(addr, port string, snapshotService portainer.SnapshotService) error {
|
func (service *Service) StartTunnelServer(addr, port string, snapshotService portainer.SnapshotService) error {
|
||||||
privateKeyFile, err := service.retrievePrivateKeyFile()
|
privateKeyFile, err := service.retrievePrivateKeyFile()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -144,21 +166,21 @@ func (service *Service) StartTunnelServer(addr, port string, snapshotService por
|
||||||
service.serverFingerprint = chiselServer.GetFingerprint()
|
service.serverFingerprint = chiselServer.GetFingerprint()
|
||||||
service.serverPort = port
|
service.serverPort = port
|
||||||
|
|
||||||
err = chiselServer.Start(addr, port)
|
if err := chiselServer.Start(addr, port); err != nil {
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
service.chiselServer = chiselServer
|
service.chiselServer = chiselServer
|
||||||
|
|
||||||
// TODO: work-around Chisel default behavior.
|
// TODO: work-around Chisel default behavior.
|
||||||
// By default, Chisel will allow anyone to connect if no user exists.
|
// By default, Chisel will allow anyone to connect if no user exists.
|
||||||
username, password := generateRandomCredentials()
|
username, password := generateRandomCredentials()
|
||||||
err = service.chiselServer.AddUser(username, password, "127.0.0.1")
|
if err = service.chiselServer.AddUser(username, password, "127.0.0.1"); err != nil {
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
service.snapshotService = snapshotService
|
service.snapshotService = snapshotService
|
||||||
|
|
||||||
go service.startTunnelVerificationLoop()
|
go service.startTunnelVerificationLoop()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -172,37 +194,39 @@ func (service *Service) StopTunnelServer() error {
|
||||||
func (service *Service) retrievePrivateKeyFile() (string, error) {
|
func (service *Service) retrievePrivateKeyFile() (string, error) {
|
||||||
privateKeyFile := service.fileService.GetDefaultChiselPrivateKeyPath()
|
privateKeyFile := service.fileService.GetDefaultChiselPrivateKeyPath()
|
||||||
|
|
||||||
exist, _ := service.fileService.FileExists(privateKeyFile)
|
if exists, _ := service.fileService.FileExists(privateKeyFile); exists {
|
||||||
if !exist {
|
|
||||||
log.Debug().
|
|
||||||
Str("private-key", privateKeyFile).
|
|
||||||
Msg("Chisel private key file does not exist")
|
|
||||||
|
|
||||||
privateKey, err := ccrypto.GenerateKey("")
|
|
||||||
if err != nil {
|
|
||||||
log.Error().
|
|
||||||
Err(err).
|
|
||||||
Msg("Failed to generate chisel private key")
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = service.fileService.StoreChiselPrivateKey(privateKey)
|
|
||||||
if err != nil {
|
|
||||||
log.Error().
|
|
||||||
Err(err).
|
|
||||||
Msg("Failed to save Chisel private key to disk")
|
|
||||||
return "", err
|
|
||||||
} else {
|
|
||||||
log.Info().
|
|
||||||
Str("private-key", privateKeyFile).
|
|
||||||
Msg("Generated a new Chisel private key file")
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log.Info().
|
log.Info().
|
||||||
Str("private-key", privateKeyFile).
|
Str("private-key", privateKeyFile).
|
||||||
Msg("Found Chisel private key file on disk")
|
Msg("found Chisel private key file on disk")
|
||||||
|
|
||||||
|
return privateKeyFile, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Debug().
|
||||||
|
Str("private-key", privateKeyFile).
|
||||||
|
Msg("chisel private key file does not exist")
|
||||||
|
|
||||||
|
privateKey, err := ccrypto.GenerateKey("")
|
||||||
|
if err != nil {
|
||||||
|
log.Error().
|
||||||
|
Err(err).
|
||||||
|
Msg("failed to generate chisel private key")
|
||||||
|
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = service.fileService.StoreChiselPrivateKey(privateKey); err != nil {
|
||||||
|
log.Error().
|
||||||
|
Err(err).
|
||||||
|
Msg("failed to save Chisel private key to disk")
|
||||||
|
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info().
|
||||||
|
Str("private-key", privateKeyFile).
|
||||||
|
Msg("generated a new Chisel private key file")
|
||||||
|
|
||||||
return privateKeyFile, nil
|
return privateKeyFile, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -230,63 +254,45 @@ func (service *Service) startTunnelVerificationLoop() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// checkTunnels finds the first tunnel that has not had any activity recently
|
||||||
|
// and attempts to take a snapshot, then closes it and returns
|
||||||
func (service *Service) checkTunnels() {
|
func (service *Service) checkTunnels() {
|
||||||
tunnels := make(map[portainer.EndpointID]portainer.TunnelDetails)
|
service.mu.RLock()
|
||||||
|
|
||||||
service.mu.Lock()
|
for endpointID, tunnel := range service.activeTunnels {
|
||||||
for key, tunnel := range service.tunnelDetailsMap {
|
|
||||||
if tunnel.LastActivity.IsZero() || tunnel.Status == portainer.EdgeAgentIdle {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if tunnel.Status == portainer.EdgeAgentManagementRequired && time.Since(tunnel.LastActivity) < requiredTimeout {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if tunnel.Status == portainer.EdgeAgentActive && time.Since(tunnel.LastActivity) < activeTimeout {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
tunnels[key] = *tunnel
|
|
||||||
}
|
|
||||||
service.mu.Unlock()
|
|
||||||
|
|
||||||
for endpointID, tunnel := range tunnels {
|
|
||||||
elapsed := time.Since(tunnel.LastActivity)
|
elapsed := time.Since(tunnel.LastActivity)
|
||||||
log.Debug().
|
log.Debug().
|
||||||
Int("endpoint_id", int(endpointID)).
|
Int("endpoint_id", int(endpointID)).
|
||||||
Str("status", tunnel.Status).
|
Float64("last_activity_seconds", elapsed.Seconds()).
|
||||||
Float64("status_time_seconds", elapsed.Seconds()).
|
|
||||||
Msg("environment tunnel monitoring")
|
Msg("environment tunnel monitoring")
|
||||||
|
|
||||||
if tunnel.Status == portainer.EdgeAgentManagementRequired && elapsed > requiredTimeout {
|
if tunnel.Status == portainer.EdgeAgentManagementRequired && elapsed < activeTimeout {
|
||||||
log.Debug().
|
continue
|
||||||
Int("endpoint_id", int(endpointID)).
|
|
||||||
Str("status", tunnel.Status).
|
|
||||||
Float64("status_time_seconds", elapsed.Seconds()).
|
|
||||||
Float64("timeout_seconds", requiredTimeout.Seconds()).
|
|
||||||
Msg("REQUIRED state timeout exceeded")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if tunnel.Status == portainer.EdgeAgentActive && elapsed > activeTimeout {
|
tunnelPort := tunnel.Port
|
||||||
log.Debug().
|
|
||||||
Int("endpoint_id", int(endpointID)).
|
|
||||||
Str("status", tunnel.Status).
|
|
||||||
Float64("status_time_seconds", elapsed.Seconds()).
|
|
||||||
Float64("timeout_seconds", activeTimeout.Seconds()).
|
|
||||||
Msg("ACTIVE state timeout exceeded")
|
|
||||||
|
|
||||||
err := service.snapshotEnvironment(endpointID, tunnel.Port)
|
service.mu.RUnlock()
|
||||||
if err != nil {
|
|
||||||
log.Error().
|
log.Debug().
|
||||||
Int("endpoint_id", int(endpointID)).
|
Int("endpoint_id", int(endpointID)).
|
||||||
Err(err).
|
Float64("last_activity_seconds", elapsed.Seconds()).
|
||||||
Msg("unable to snapshot Edge environment")
|
Float64("timeout_seconds", activeTimeout.Seconds()).
|
||||||
}
|
Msg("last activity timeout exceeded")
|
||||||
|
|
||||||
|
if err := service.snapshotEnvironment(endpointID, tunnelPort); err != nil {
|
||||||
|
log.Error().
|
||||||
|
Int("endpoint_id", int(endpointID)).
|
||||||
|
Err(err).
|
||||||
|
Msg("unable to snapshot Edge environment")
|
||||||
}
|
}
|
||||||
|
|
||||||
service.SetTunnelStatusToIdle(portainer.EndpointID(endpointID))
|
service.close(portainer.EndpointID(endpointID))
|
||||||
|
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
service.mu.RUnlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (service *Service) snapshotEnvironment(endpointID portainer.EndpointID, tunnelPort int) error {
|
func (service *Service) snapshotEnvironment(endpointID portainer.EndpointID, tunnelPort int) error {
|
||||||
|
|
|
@ -8,14 +8,20 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
portainer "github.com/portainer/portainer/api"
|
portainer "github.com/portainer/portainer/api"
|
||||||
|
"github.com/portainer/portainer/api/datastore"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestPingAgentPanic(t *testing.T) {
|
func TestPingAgentPanic(t *testing.T) {
|
||||||
endpointID := portainer.EndpointID(1)
|
endpoint := &portainer.Endpoint{
|
||||||
|
ID: 1,
|
||||||
|
Type: portainer.EdgeAgentOnDockerEnvironment,
|
||||||
|
}
|
||||||
|
|
||||||
s := NewService(nil, nil, nil)
|
_, store := datastore.MustNewTestStore(t, true, true)
|
||||||
|
|
||||||
|
s := NewService(store, nil, nil)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
require.Nil(t, recover())
|
require.Nil(t, recover())
|
||||||
|
@ -36,10 +42,10 @@ func TestPingAgentPanic(t *testing.T) {
|
||||||
errCh <- srv.Serve(ln)
|
errCh <- srv.Serve(ln)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
s.getTunnelDetails(endpointID)
|
s.Open(endpoint)
|
||||||
s.tunnelDetailsMap[endpointID].Port = ln.Addr().(*net.TCPAddr).Port
|
s.activeTunnels[endpoint.ID].Port = ln.Addr().(*net.TCPAddr).Port
|
||||||
|
|
||||||
require.Error(t, s.pingAgent(endpointID))
|
require.Error(t, s.pingAgent(endpoint.ID))
|
||||||
require.NoError(t, srv.Shutdown(context.Background()))
|
require.NoError(t, srv.Shutdown(context.Background()))
|
||||||
require.ErrorIs(t, <-errCh, http.ErrServerClosed)
|
require.ErrorIs(t, <-errCh, http.ErrServerClosed)
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,14 +5,18 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"net"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
portainer "github.com/portainer/portainer/api"
|
portainer "github.com/portainer/portainer/api"
|
||||||
|
"github.com/portainer/portainer/api/internal/edge"
|
||||||
"github.com/portainer/portainer/api/internal/edge/cache"
|
"github.com/portainer/portainer/api/internal/edge/cache"
|
||||||
|
"github.com/portainer/portainer/api/internal/endpointutils"
|
||||||
"github.com/portainer/portainer/pkg/libcrypto"
|
"github.com/portainer/portainer/pkg/libcrypto"
|
||||||
|
|
||||||
"github.com/dchest/uniuri"
|
"github.com/dchest/uniuri"
|
||||||
|
"github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -20,18 +24,181 @@ const (
|
||||||
maxAvailablePort = 65535
|
maxAvailablePort = 65535
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Open will mark the tunnel as REQUIRED so the agent opens it
|
||||||
|
func (s *Service) Open(endpoint *portainer.Endpoint) error {
|
||||||
|
if !endpointutils.IsEdgeEndpoint(endpoint) {
|
||||||
|
return errors.New("cannot open a tunnel for non-edge environments")
|
||||||
|
}
|
||||||
|
|
||||||
|
if endpoint.Edge.AsyncMode {
|
||||||
|
return errors.New("cannot open a tunnel for async edge environments")
|
||||||
|
}
|
||||||
|
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
if _, ok := s.activeTunnels[endpoint.ID]; ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
defer cache.Del(endpoint.ID)
|
||||||
|
|
||||||
|
tun := &portainer.TunnelDetails{
|
||||||
|
Status: portainer.EdgeAgentManagementRequired,
|
||||||
|
Port: s.getUnusedPort(),
|
||||||
|
LastActivity: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
username, password := generateRandomCredentials()
|
||||||
|
|
||||||
|
if s.chiselServer != nil {
|
||||||
|
authorizedRemote := fmt.Sprintf("^R:0.0.0.0:%d$", tun.Port)
|
||||||
|
|
||||||
|
if err := s.chiselServer.AddUser(username, password, authorizedRemote); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
credentials, err := encryptCredentials(username, password, endpoint.EdgeID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
tun.Credentials = credentials
|
||||||
|
|
||||||
|
s.activeTunnels[endpoint.ID] = tun
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// close removes the tunnel from the map so the agent will close it
|
||||||
|
func (s *Service) close(endpointID portainer.EndpointID) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
tun, ok := s.activeTunnels[endpointID]
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(tun.Credentials) > 0 && s.chiselServer != nil {
|
||||||
|
user, _, _ := strings.Cut(tun.Credentials, ":")
|
||||||
|
s.chiselServer.DeleteUser(user)
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.ProxyManager != nil {
|
||||||
|
s.ProxyManager.DeleteEndpointProxy(endpointID)
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(s.activeTunnels, endpointID)
|
||||||
|
|
||||||
|
cache.Del(endpointID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Config returns the tunnel details needed for the agent to connect
|
||||||
|
func (s *Service) Config(endpointID portainer.EndpointID) portainer.TunnelDetails {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
|
||||||
|
if tun, ok := s.activeTunnels[endpointID]; ok {
|
||||||
|
return *tun
|
||||||
|
}
|
||||||
|
|
||||||
|
return portainer.TunnelDetails{Status: portainer.EdgeAgentIdle}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TunnelAddr returns the address of the local tunnel, including the port, it
|
||||||
|
// will block until the tunnel is ready
|
||||||
|
func (s *Service) TunnelAddr(endpoint *portainer.Endpoint) (string, error) {
|
||||||
|
if err := s.Open(endpoint); err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
tun := s.Config(endpoint.ID)
|
||||||
|
checkinInterval := time.Duration(s.tryEffectiveCheckinInterval(endpoint)) * time.Second
|
||||||
|
|
||||||
|
for t0 := time.Now(); ; {
|
||||||
|
if time.Since(t0) > 2*checkinInterval {
|
||||||
|
s.close(endpoint.ID)
|
||||||
|
|
||||||
|
return "", errors.New("unable to open the tunnel")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the tunnel is established
|
||||||
|
conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: tun.Port})
|
||||||
|
if err != nil {
|
||||||
|
time.Sleep(checkinInterval / 100)
|
||||||
|
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
conn.Close()
|
||||||
|
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
s.UpdateLastActivity(endpoint.ID)
|
||||||
|
|
||||||
|
return fmt.Sprintf("127.0.0.1:%d", tun.Port), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// tryEffectiveCheckinInterval avoids a potential deadlock by returning a
|
||||||
|
// previous known value after a timeout
|
||||||
|
func (s *Service) tryEffectiveCheckinInterval(endpoint *portainer.Endpoint) int {
|
||||||
|
ch := make(chan int, 1)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
ch <- edge.EffectiveCheckinInterval(s.dataStore, endpoint)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-time.After(50 * time.Millisecond):
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
|
||||||
|
return s.defaultCheckinInterval
|
||||||
|
case i := <-ch:
|
||||||
|
s.mu.Lock()
|
||||||
|
s.defaultCheckinInterval = i
|
||||||
|
s.mu.Unlock()
|
||||||
|
|
||||||
|
return i
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateLastActivity sets the current timestamp to avoid the tunnel timeout
|
||||||
|
func (s *Service) UpdateLastActivity(endpointID portainer.EndpointID) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
if tun, ok := s.activeTunnels[endpointID]; ok {
|
||||||
|
tun.LastActivity = time.Now()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// NOTE: it needs to be called with the lock acquired
|
// NOTE: it needs to be called with the lock acquired
|
||||||
// getUnusedPort is used to generate an unused random port in the dynamic port range.
|
// getUnusedPort is used to generate an unused random port in the dynamic port range.
|
||||||
// Dynamic ports (also called private ports) are 49152 to 65535.
|
// Dynamic ports (also called private ports) are 49152 to 65535.
|
||||||
func (service *Service) getUnusedPort() int {
|
func (service *Service) getUnusedPort() int {
|
||||||
port := randomInt(minAvailablePort, maxAvailablePort)
|
port := randomInt(minAvailablePort, maxAvailablePort)
|
||||||
|
|
||||||
for _, tunnel := range service.tunnelDetailsMap {
|
for _, tunnel := range service.activeTunnels {
|
||||||
if tunnel.Port == port {
|
if tunnel.Port == port {
|
||||||
return service.getUnusedPort()
|
return service.getUnusedPort()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: port})
|
||||||
|
if err == nil {
|
||||||
|
conn.Close()
|
||||||
|
|
||||||
|
log.Debug().
|
||||||
|
Int("port", port).
|
||||||
|
Msg("selected port is in use, trying a different one")
|
||||||
|
|
||||||
|
return service.getUnusedPort()
|
||||||
|
}
|
||||||
|
|
||||||
return port
|
return port
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -39,152 +206,10 @@ func randomInt(min, max int) int {
|
||||||
return min + rand.Intn(max-min)
|
return min + rand.Intn(max-min)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NOTE: it needs to be called with the lock acquired
|
|
||||||
func (service *Service) getTunnelDetails(endpointID portainer.EndpointID) *portainer.TunnelDetails {
|
|
||||||
|
|
||||||
if tunnel, ok := service.tunnelDetailsMap[endpointID]; ok {
|
|
||||||
return tunnel
|
|
||||||
}
|
|
||||||
|
|
||||||
tunnel := &portainer.TunnelDetails{
|
|
||||||
Status: portainer.EdgeAgentIdle,
|
|
||||||
}
|
|
||||||
|
|
||||||
service.tunnelDetailsMap[endpointID] = tunnel
|
|
||||||
|
|
||||||
cache.Del(endpointID)
|
|
||||||
|
|
||||||
return tunnel
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetTunnelDetails returns information about the tunnel associated to an environment(endpoint).
|
|
||||||
func (service *Service) GetTunnelDetails(endpointID portainer.EndpointID) portainer.TunnelDetails {
|
|
||||||
service.mu.Lock()
|
|
||||||
defer service.mu.Unlock()
|
|
||||||
|
|
||||||
return *service.getTunnelDetails(endpointID)
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetActiveTunnel retrieves an active tunnel which allows communicating with edge agent
|
|
||||||
func (service *Service) GetActiveTunnel(endpoint *portainer.Endpoint) (portainer.TunnelDetails, error) {
|
|
||||||
if endpoint.Edge.AsyncMode {
|
|
||||||
return portainer.TunnelDetails{}, errors.New("cannot open tunnel on async endpoint")
|
|
||||||
}
|
|
||||||
|
|
||||||
tunnel := service.GetTunnelDetails(endpoint.ID)
|
|
||||||
|
|
||||||
if tunnel.Status == portainer.EdgeAgentActive {
|
|
||||||
// update the LastActivity
|
|
||||||
service.SetTunnelStatusToActive(endpoint.ID)
|
|
||||||
}
|
|
||||||
|
|
||||||
if tunnel.Status == portainer.EdgeAgentIdle || tunnel.Status == portainer.EdgeAgentManagementRequired {
|
|
||||||
err := service.SetTunnelStatusToRequired(endpoint.ID)
|
|
||||||
if err != nil {
|
|
||||||
return portainer.TunnelDetails{}, fmt.Errorf("failed opening tunnel to endpoint: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if endpoint.EdgeCheckinInterval == 0 {
|
|
||||||
settings, err := service.dataStore.Settings().Settings()
|
|
||||||
if err != nil {
|
|
||||||
return portainer.TunnelDetails{}, fmt.Errorf("failed fetching settings from db: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
endpoint.EdgeCheckinInterval = settings.EdgeAgentCheckinInterval
|
|
||||||
}
|
|
||||||
|
|
||||||
time.Sleep(2 * time.Duration(endpoint.EdgeCheckinInterval) * time.Second)
|
|
||||||
}
|
|
||||||
|
|
||||||
return service.GetTunnelDetails(endpoint.ID), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetTunnelStatusToActive update the status of the tunnel associated to the specified environment(endpoint).
|
|
||||||
// It sets the status to ACTIVE.
|
|
||||||
func (service *Service) SetTunnelStatusToActive(endpointID portainer.EndpointID) {
|
|
||||||
service.mu.Lock()
|
|
||||||
tunnel := service.getTunnelDetails(endpointID)
|
|
||||||
tunnel.Status = portainer.EdgeAgentActive
|
|
||||||
tunnel.Credentials = ""
|
|
||||||
tunnel.LastActivity = time.Now()
|
|
||||||
service.mu.Unlock()
|
|
||||||
|
|
||||||
cache.Del(endpointID)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetTunnelStatusToIdle update the status of the tunnel associated to the specified environment(endpoint).
|
|
||||||
// It sets the status to IDLE.
|
|
||||||
// It removes any existing credentials associated to the tunnel.
|
|
||||||
func (service *Service) SetTunnelStatusToIdle(endpointID portainer.EndpointID) {
|
|
||||||
service.mu.Lock()
|
|
||||||
|
|
||||||
tunnel := service.getTunnelDetails(endpointID)
|
|
||||||
tunnel.Status = portainer.EdgeAgentIdle
|
|
||||||
tunnel.Port = 0
|
|
||||||
tunnel.LastActivity = time.Now()
|
|
||||||
|
|
||||||
credentials := tunnel.Credentials
|
|
||||||
if credentials != "" {
|
|
||||||
tunnel.Credentials = ""
|
|
||||||
|
|
||||||
if service.chiselServer != nil {
|
|
||||||
service.chiselServer.DeleteUser(strings.Split(credentials, ":")[0])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
service.ProxyManager.DeleteEndpointProxy(endpointID)
|
|
||||||
|
|
||||||
service.mu.Unlock()
|
|
||||||
|
|
||||||
cache.Del(endpointID)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetTunnelStatusToRequired update the status of the tunnel associated to the specified environment(endpoint).
|
|
||||||
// It sets the status to REQUIRED.
|
|
||||||
// If no port is currently associated to the tunnel, it will associate a random unused port to the tunnel
|
|
||||||
// and generate temporary credentials that can be used to establish a reverse tunnel on that port.
|
|
||||||
// Credentials are encrypted using the Edge ID associated to the environment(endpoint).
|
|
||||||
func (service *Service) SetTunnelStatusToRequired(endpointID portainer.EndpointID) error {
|
|
||||||
defer cache.Del(endpointID)
|
|
||||||
|
|
||||||
tunnel := service.getTunnelDetails(endpointID)
|
|
||||||
|
|
||||||
service.mu.Lock()
|
|
||||||
defer service.mu.Unlock()
|
|
||||||
|
|
||||||
if tunnel.Port == 0 {
|
|
||||||
endpoint, err := service.dataStore.Endpoint().Endpoint(endpointID)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
tunnel.Status = portainer.EdgeAgentManagementRequired
|
|
||||||
tunnel.Port = service.getUnusedPort()
|
|
||||||
tunnel.LastActivity = time.Now()
|
|
||||||
|
|
||||||
username, password := generateRandomCredentials()
|
|
||||||
authorizedRemote := fmt.Sprintf("^R:0.0.0.0:%d$", tunnel.Port)
|
|
||||||
|
|
||||||
if service.chiselServer != nil {
|
|
||||||
err = service.chiselServer.AddUser(username, password, authorizedRemote)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
credentials, err := encryptCredentials(username, password, endpoint.EdgeID)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
tunnel.Credentials = credentials
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func generateRandomCredentials() (string, string) {
|
func generateRandomCredentials() (string, string) {
|
||||||
username := uniuri.NewLen(8)
|
username := uniuri.NewLen(8)
|
||||||
password := uniuri.NewLen(8)
|
password := uniuri.NewLen(8)
|
||||||
|
|
||||||
return username, password
|
return username, password
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,7 +3,6 @@ package client
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"io"
|
"io"
|
||||||
"maps"
|
"maps"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -50,12 +49,12 @@ func (factory *ClientFactory) CreateClient(endpoint *portainer.Endpoint, nodeNam
|
||||||
case portainer.AgentOnDockerEnvironment:
|
case portainer.AgentOnDockerEnvironment:
|
||||||
return createAgentClient(endpoint, endpoint.URL, factory.signatureService, nodeName, timeout)
|
return createAgentClient(endpoint, endpoint.URL, factory.signatureService, nodeName, timeout)
|
||||||
case portainer.EdgeAgentOnDockerEnvironment:
|
case portainer.EdgeAgentOnDockerEnvironment:
|
||||||
tunnel, err := factory.reverseTunnelService.GetActiveTunnel(endpoint)
|
tunnelAddr, err := factory.reverseTunnelService.TunnelAddr(endpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
endpointURL := fmt.Sprintf("http://127.0.0.1:%d", tunnel.Port)
|
endpointURL := "http://" + tunnelAddr
|
||||||
|
|
||||||
return createAgentClient(endpoint, endpointURL, factory.signatureService, nodeName, timeout)
|
return createAgentClient(endpoint, endpointURL, factory.signatureService, nodeName, timeout)
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,6 @@ package exec
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"path"
|
"path"
|
||||||
|
@ -186,11 +185,11 @@ func (manager *SwarmStackManager) prepareDockerCommandAndArgs(binaryPath, config
|
||||||
|
|
||||||
endpointURL := endpoint.URL
|
endpointURL := endpoint.URL
|
||||||
if endpoint.Type == portainer.EdgeAgentOnDockerEnvironment {
|
if endpoint.Type == portainer.EdgeAgentOnDockerEnvironment {
|
||||||
tunnel, err := manager.reverseTunnelService.GetActiveTunnel(endpoint)
|
tunnelAddr, err := manager.reverseTunnelService.TunnelAddr(endpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", nil, err
|
return "", nil, err
|
||||||
}
|
}
|
||||||
endpointURL = fmt.Sprintf("tcp://127.0.0.1:%d", tunnel.Port)
|
endpointURL = "tcp://" + tunnelAddr
|
||||||
}
|
}
|
||||||
|
|
||||||
args = append(args, "-H", endpointURL)
|
args = append(args, "-H", endpointURL)
|
||||||
|
|
|
@ -15,6 +15,7 @@ import (
|
||||||
|
|
||||||
portainer "github.com/portainer/portainer/api"
|
portainer "github.com/portainer/portainer/api"
|
||||||
"github.com/portainer/portainer/api/dataservices"
|
"github.com/portainer/portainer/api/dataservices"
|
||||||
|
"github.com/portainer/portainer/api/internal/edge"
|
||||||
"github.com/portainer/portainer/api/internal/edge/cache"
|
"github.com/portainer/portainer/api/internal/edge/cache"
|
||||||
httperror "github.com/portainer/portainer/pkg/libhttp/error"
|
httperror "github.com/portainer/portainer/pkg/libhttp/error"
|
||||||
"github.com/portainer/portainer/pkg/libhttp/request"
|
"github.com/portainer/portainer/pkg/libhttp/request"
|
||||||
|
@ -134,7 +135,7 @@ func (handler *Handler) inspectStatus(tx dataservices.DataStoreTx, r *http.Reque
|
||||||
|
|
||||||
// Take an initial snapshot
|
// Take an initial snapshot
|
||||||
if endpoint.LastCheckInDate == 0 {
|
if endpoint.LastCheckInDate == 0 {
|
||||||
handler.ReverseTunnelService.SetTunnelStatusToRequired(endpoint.ID)
|
handler.ReverseTunnelService.Open(endpoint)
|
||||||
}
|
}
|
||||||
|
|
||||||
agentPlatform, agentPlatformErr := parseAgentPlatform(r)
|
agentPlatform, agentPlatformErr := parseAgentPlatform(r)
|
||||||
|
@ -153,34 +154,21 @@ func (handler *Handler) inspectStatus(tx dataservices.DataStoreTx, r *http.Reque
|
||||||
return nil, httperror.InternalServerError("Unable to persist environment changes inside the database", err)
|
return nil, httperror.InternalServerError("Unable to persist environment changes inside the database", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
checkinInterval := endpoint.EdgeCheckinInterval
|
tunnel := handler.ReverseTunnelService.Config(endpoint.ID)
|
||||||
if endpoint.EdgeCheckinInterval == 0 {
|
|
||||||
settings, err := tx.Settings().Settings()
|
|
||||||
if err != nil {
|
|
||||||
return nil, httperror.InternalServerError("Unable to retrieve settings from the database", err)
|
|
||||||
}
|
|
||||||
checkinInterval = settings.EdgeAgentCheckinInterval
|
|
||||||
}
|
|
||||||
|
|
||||||
tunnel := handler.ReverseTunnelService.GetTunnelDetails(endpoint.ID)
|
|
||||||
|
|
||||||
statusResponse := endpointEdgeStatusInspectResponse{
|
statusResponse := endpointEdgeStatusInspectResponse{
|
||||||
Status: tunnel.Status,
|
Status: tunnel.Status,
|
||||||
Port: tunnel.Port,
|
Port: tunnel.Port,
|
||||||
CheckinInterval: checkinInterval,
|
CheckinInterval: edge.EffectiveCheckinInterval(tx, endpoint),
|
||||||
Credentials: tunnel.Credentials,
|
Credentials: tunnel.Credentials,
|
||||||
}
|
}
|
||||||
|
|
||||||
schedules, handlerErr := handler.buildSchedules(endpoint.ID, tunnel)
|
schedules, handlerErr := handler.buildSchedules(endpoint.ID)
|
||||||
if handlerErr != nil {
|
if handlerErr != nil {
|
||||||
return nil, handlerErr
|
return nil, handlerErr
|
||||||
}
|
}
|
||||||
statusResponse.Schedules = schedules
|
statusResponse.Schedules = schedules
|
||||||
|
|
||||||
if tunnel.Status == portainer.EdgeAgentManagementRequired {
|
|
||||||
handler.ReverseTunnelService.SetTunnelStatusToActive(endpoint.ID)
|
|
||||||
}
|
|
||||||
|
|
||||||
edgeStacksStatus, handlerErr := handler.buildEdgeStacks(tx, endpoint.ID)
|
edgeStacksStatus, handlerErr := handler.buildEdgeStacks(tx, endpoint.ID)
|
||||||
if handlerErr != nil {
|
if handlerErr != nil {
|
||||||
return nil, handlerErr
|
return nil, handlerErr
|
||||||
|
@ -213,9 +201,9 @@ func parseAgentPlatform(r *http.Request) (portainer.EndpointType, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (handler *Handler) buildSchedules(endpointID portainer.EndpointID, tunnel portainer.TunnelDetails) ([]edgeJobResponse, *httperror.HandlerError) {
|
func (handler *Handler) buildSchedules(endpointID portainer.EndpointID) ([]edgeJobResponse, *httperror.HandlerError) {
|
||||||
schedules := []edgeJobResponse{}
|
schedules := []edgeJobResponse{}
|
||||||
for _, job := range tunnel.Jobs {
|
for _, job := range handler.ReverseTunnelService.EdgeJobs(endpointID) {
|
||||||
var collectLogs bool
|
var collectLogs bool
|
||||||
if _, ok := job.GroupLogsCollection[endpointID]; ok {
|
if _, ok := job.GroupLogsCollection[endpointID]; ok {
|
||||||
collectLogs = job.GroupLogsCollection[endpointID].CollectLogs
|
collectLogs = job.GroupLogsCollection[endpointID].CollectLogs
|
||||||
|
|
|
@ -34,7 +34,7 @@ func (handler *Handler) proxyRequestsToDockerAPI(w http.ResponseWriter, r *http.
|
||||||
return httperror.InternalServerError("No Edge agent registered with the environment", errors.New("No agent available"))
|
return httperror.InternalServerError("No Edge agent registered with the environment", errors.New("No agent available"))
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := handler.ReverseTunnelService.GetActiveTunnel(endpoint)
|
_, err := handler.ReverseTunnelService.TunnelAddr(endpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return httperror.InternalServerError("Unable to get the active tunnel", err)
|
return httperror.InternalServerError("Unable to get the active tunnel", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,7 +34,7 @@ func (handler *Handler) proxyRequestsToKubernetesAPI(w http.ResponseWriter, r *h
|
||||||
return httperror.InternalServerError("No Edge agent registered with the environment", errors.New("No agent available"))
|
return httperror.InternalServerError("No Edge agent registered with the environment", errors.New("No agent available"))
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := handler.ReverseTunnelService.GetActiveTunnel(endpoint)
|
_, err := handler.ReverseTunnelService.TunnelAddr(endpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return httperror.InternalServerError("Unable to get the active tunnel", err)
|
return httperror.InternalServerError("Unable to get the active tunnel", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,8 +59,6 @@ func (handler *Handler) endpointAssociationDelete(w http.ResponseWriter, r *http
|
||||||
return httperror.InternalServerError("Failed persisting environment in database", err)
|
return httperror.InternalServerError("Failed persisting environment in database", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
handler.ReverseTunnelService.SetTunnelStatusToIdle(endpoint.ID)
|
|
||||||
|
|
||||||
return response.Empty(w)
|
return response.Empty(w)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,12 +18,12 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func (handler *Handler) proxyEdgeAgentWebsocketRequest(w http.ResponseWriter, r *http.Request, params *webSocketRequestParams) error {
|
func (handler *Handler) proxyEdgeAgentWebsocketRequest(w http.ResponseWriter, r *http.Request, params *webSocketRequestParams) error {
|
||||||
tunnel, err := handler.ReverseTunnelService.GetActiveTunnel(params.endpoint)
|
tunnelAddr, err := handler.ReverseTunnelService.TunnelAddr(params.endpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
agentURL, err := url.Parse(fmt.Sprintf("http://127.0.0.1:%d", tunnel.Port))
|
agentURL, err := url.Parse("http://" + tunnelAddr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -93,7 +93,7 @@ func (handler *Handler) doProxyWebsocketRequest(
|
||||||
}
|
}
|
||||||
|
|
||||||
if isEdge {
|
if isEdge {
|
||||||
handler.ReverseTunnelService.SetTunnelStatusToActive(params.endpoint.ID)
|
handler.ReverseTunnelService.UpdateLastActivity(params.endpoint.ID)
|
||||||
handler.ReverseTunnelService.KeepTunnelAlive(params.endpoint.ID, r.Context(), portainer.WebSocketKeepAlive)
|
handler.ReverseTunnelService.KeepTunnelAlive(params.endpoint.ID, r.Context(), portainer.WebSocketKeepAlive)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,12 +26,12 @@ func (factory *ProxyFactory) NewAgentProxy(endpoint *portainer.Endpoint) (*Proxy
|
||||||
urlString := endpoint.URL
|
urlString := endpoint.URL
|
||||||
|
|
||||||
if endpointutils.IsEdgeEndpoint(endpoint) {
|
if endpointutils.IsEdgeEndpoint(endpoint) {
|
||||||
tunnel, err := factory.reverseTunnelService.GetActiveTunnel(endpoint)
|
tunnelAddr, err := factory.reverseTunnelService.TunnelAddr(endpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "failed starting tunnel")
|
return nil, errors.Wrap(err, "failed starting tunnel")
|
||||||
}
|
}
|
||||||
|
|
||||||
urlString = fmt.Sprintf("http://127.0.0.1:%d", tunnel.Port)
|
urlString = "http://" + tunnelAddr
|
||||||
}
|
}
|
||||||
|
|
||||||
endpointURL, err := url.ParseURL(urlString)
|
endpointURL, err := url.ParseURL(urlString)
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package factory
|
package factory
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -35,8 +34,11 @@ func (factory *ProxyFactory) newDockerLocalProxy(endpoint *portainer.Endpoint) (
|
||||||
func (factory *ProxyFactory) newDockerHTTPProxy(endpoint *portainer.Endpoint) (http.Handler, error) {
|
func (factory *ProxyFactory) newDockerHTTPProxy(endpoint *portainer.Endpoint) (http.Handler, error) {
|
||||||
rawURL := endpoint.URL
|
rawURL := endpoint.URL
|
||||||
if endpoint.Type == portainer.EdgeAgentOnDockerEnvironment {
|
if endpoint.Type == portainer.EdgeAgentOnDockerEnvironment {
|
||||||
tunnel := factory.reverseTunnelService.GetTunnelDetails(endpoint.ID)
|
tunnelAddr, err := factory.reverseTunnelService.TunnelAddr(endpoint)
|
||||||
rawURL = fmt.Sprintf("http://127.0.0.1:%d", tunnel.Port)
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
rawURL = "http://" + tunnelAddr
|
||||||
}
|
}
|
||||||
|
|
||||||
endpointURL, err := url.ParseURL(rawURL)
|
endpointURL, err := url.ParseURL(rawURL)
|
||||||
|
|
|
@ -138,9 +138,7 @@ func (transport *Transport) executeDockerRequest(request *http.Request) (*http.R
|
||||||
}
|
}
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
transport.reverseTunnelService.SetTunnelStatusToActive(transport.endpoint.ID)
|
transport.reverseTunnelService.UpdateLastActivity(transport.endpoint.ID)
|
||||||
} else {
|
|
||||||
transport.reverseTunnelService.SetTunnelStatusToIdle(transport.endpoint.ID)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return response, err
|
return response, err
|
||||||
|
|
|
@ -51,8 +51,11 @@ func (factory *ProxyFactory) newKubernetesLocalProxy(endpoint *portainer.Endpoin
|
||||||
}
|
}
|
||||||
|
|
||||||
func (factory *ProxyFactory) newKubernetesEdgeHTTPProxy(endpoint *portainer.Endpoint) (http.Handler, error) {
|
func (factory *ProxyFactory) newKubernetesEdgeHTTPProxy(endpoint *portainer.Endpoint) (http.Handler, error) {
|
||||||
tunnel := factory.reverseTunnelService.GetTunnelDetails(endpoint.ID)
|
tunnelAddr, err := factory.reverseTunnelService.TunnelAddr(endpoint)
|
||||||
rawURL := fmt.Sprintf("http://127.0.0.1:%d", tunnel.Port)
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
rawURL := "http://" + tunnelAddr
|
||||||
|
|
||||||
endpointURL, err := url.Parse(rawURL)
|
endpointURL, err := url.Parse(rawURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -59,9 +59,7 @@ func (transport *edgeTransport) RoundTrip(request *http.Request) (*http.Response
|
||||||
response, err := transport.baseTransport.RoundTrip(request)
|
response, err := transport.baseTransport.RoundTrip(request)
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
transport.reverseTunnelService.SetTunnelStatusToActive(transport.endpoint.ID)
|
transport.reverseTunnelService.UpdateLastActivity(transport.endpoint.ID)
|
||||||
} else {
|
|
||||||
transport.reverseTunnelService.SetTunnelStatusToIdle(transport.endpoint.ID)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return response, err
|
return response, err
|
||||||
|
|
|
@ -1,6 +1,9 @@
|
||||||
package edge
|
package edge
|
||||||
|
|
||||||
import portainer "github.com/portainer/portainer/api"
|
import (
|
||||||
|
portainer "github.com/portainer/portainer/api"
|
||||||
|
"github.com/portainer/portainer/api/dataservices"
|
||||||
|
)
|
||||||
|
|
||||||
// EndpointRelatedEdgeStacks returns a list of Edge stacks related to this Environment(Endpoint)
|
// EndpointRelatedEdgeStacks returns a list of Edge stacks related to this Environment(Endpoint)
|
||||||
func EndpointRelatedEdgeStacks(endpoint *portainer.Endpoint, endpointGroup *portainer.EndpointGroup, edgeGroups []portainer.EdgeGroup, edgeStacks []portainer.EdgeStack) []portainer.EdgeStackID {
|
func EndpointRelatedEdgeStacks(endpoint *portainer.Endpoint, endpointGroup *portainer.EndpointGroup, edgeGroups []portainer.EdgeGroup, edgeStacks []portainer.EdgeStack) []portainer.EdgeStackID {
|
||||||
|
@ -24,3 +27,15 @@ func EndpointRelatedEdgeStacks(endpoint *portainer.Endpoint, endpointGroup *port
|
||||||
|
|
||||||
return relatedEdgeStacks
|
return relatedEdgeStacks
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func EffectiveCheckinInterval(tx dataservices.DataStoreTx, endpoint *portainer.Endpoint) int {
|
||||||
|
if endpoint.EdgeCheckinInterval != 0 {
|
||||||
|
return endpoint.EdgeCheckinInterval
|
||||||
|
}
|
||||||
|
|
||||||
|
if settings, err := tx.Settings().Settings(); err == nil {
|
||||||
|
return settings.EdgeAgentCheckinInterval
|
||||||
|
}
|
||||||
|
|
||||||
|
return portainer.DefaultEdgeAgentCheckinIntervalInSeconds
|
||||||
|
}
|
||||||
|
|
|
@ -57,8 +57,6 @@ func NewService(
|
||||||
// NewBackgroundSnapshotter queues snapshots of existing edge environments that
|
// NewBackgroundSnapshotter queues snapshots of existing edge environments that
|
||||||
// do not have one already
|
// do not have one already
|
||||||
func NewBackgroundSnapshotter(dataStore dataservices.DataStore, tunnelService portainer.ReverseTunnelService) {
|
func NewBackgroundSnapshotter(dataStore dataservices.DataStore, tunnelService portainer.ReverseTunnelService) {
|
||||||
var endpointIDs []portainer.EndpointID
|
|
||||||
|
|
||||||
err := dataStore.ViewTx(func(tx dataservices.DataStoreTx) error {
|
err := dataStore.ViewTx(func(tx dataservices.DataStoreTx) error {
|
||||||
endpoints, err := tx.Endpoint().Endpoints()
|
endpoints, err := tx.Endpoint().Endpoints()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -73,7 +71,7 @@ func NewBackgroundSnapshotter(dataStore dataservices.DataStore, tunnelService po
|
||||||
s, err := tx.Snapshot().Read(e.ID)
|
s, err := tx.Snapshot().Read(e.ID)
|
||||||
if dataservices.IsErrObjectNotFound(err) ||
|
if dataservices.IsErrObjectNotFound(err) ||
|
||||||
(err == nil && s.Docker == nil && s.Kubernetes == nil) {
|
(err == nil && s.Docker == nil && s.Kubernetes == nil) {
|
||||||
endpointIDs = append(endpointIDs, e.ID)
|
tunnelService.Open(&e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -83,11 +81,6 @@ func NewBackgroundSnapshotter(dataStore dataservices.DataStore, tunnelService po
|
||||||
log.Error().Err(err).Msg("background snapshotter failure")
|
log.Error().Err(err).Msg("background snapshotter failure")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, endpointID := range endpointIDs {
|
|
||||||
tunnelService.SetTunnelStatusToActive(endpointID)
|
|
||||||
time.Sleep(10 * time.Second)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseSnapshotFrequency(snapshotInterval string, dataStore dataservices.DataStore) (float64, error) {
|
func parseSnapshotFrequency(snapshotInterval string, dataStore dataservices.DataStore) (float64, error) {
|
||||||
|
|
|
@ -249,11 +249,11 @@ func (factory *ClientFactory) buildAgentConfig(endpoint *portainer.Endpoint) (*r
|
||||||
}
|
}
|
||||||
|
|
||||||
func (factory *ClientFactory) buildEdgeConfig(endpoint *portainer.Endpoint) (*rest.Config, error) {
|
func (factory *ClientFactory) buildEdgeConfig(endpoint *portainer.Endpoint) (*rest.Config, error) {
|
||||||
tunnel, err := factory.reverseTunnelService.GetActiveTunnel(endpoint)
|
tunnelAddr, err := factory.reverseTunnelService.TunnelAddr(endpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "failed activating tunnel")
|
return nil, errors.Wrap(err, "failed activating tunnel")
|
||||||
}
|
}
|
||||||
endpointURL := fmt.Sprintf("http://127.0.0.1:%d/kubernetes", tunnel.Port)
|
endpointURL := fmt.Sprintf("http://%s/kubernetes", tunnelAddr)
|
||||||
|
|
||||||
config, err := clientcmd.BuildConfigFromFlags(endpointURL, "")
|
config, err := clientcmd.BuildConfigFromFlags(endpointURL, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -1296,7 +1296,6 @@ type (
|
||||||
Status string
|
Status string
|
||||||
LastActivity time.Time
|
LastActivity time.Time
|
||||||
Port int
|
Port int
|
||||||
Jobs []EdgeJob
|
|
||||||
Credentials string
|
Credentials string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1557,13 +1556,13 @@ type (
|
||||||
ReverseTunnelService interface {
|
ReverseTunnelService interface {
|
||||||
StartTunnelServer(addr, port string, snapshotService SnapshotService) error
|
StartTunnelServer(addr, port string, snapshotService SnapshotService) error
|
||||||
StopTunnelServer() error
|
StopTunnelServer() error
|
||||||
GenerateEdgeKey(url, host string, endpointIdentifier int) string
|
GenerateEdgeKey(apiURL, tunnelAddr string, endpointIdentifier int) string
|
||||||
SetTunnelStatusToActive(endpointID EndpointID)
|
Open(endpoint *Endpoint) error
|
||||||
SetTunnelStatusToRequired(endpointID EndpointID) error
|
Config(endpointID EndpointID) TunnelDetails
|
||||||
SetTunnelStatusToIdle(endpointID EndpointID)
|
TunnelAddr(endpoint *Endpoint) (string, error)
|
||||||
|
UpdateLastActivity(endpointID EndpointID)
|
||||||
KeepTunnelAlive(endpointID EndpointID, ctx context.Context, maxKeepAlive time.Duration)
|
KeepTunnelAlive(endpointID EndpointID, ctx context.Context, maxKeepAlive time.Duration)
|
||||||
GetTunnelDetails(endpointID EndpointID) TunnelDetails
|
EdgeJobs(endpointId EndpointID) []EdgeJob
|
||||||
GetActiveTunnel(endpoint *Endpoint) (TunnelDetails, error)
|
|
||||||
AddEdgeJob(endpoint *Endpoint, edgeJob *EdgeJob)
|
AddEdgeJob(endpoint *Endpoint, edgeJob *EdgeJob)
|
||||||
RemoveEdgeJob(edgeJobID EdgeJobID)
|
RemoveEdgeJob(edgeJobID EdgeJobID)
|
||||||
RemoveEdgeJobFromEndpoint(endpointID EndpointID, edgeJobID EdgeJobID)
|
RemoveEdgeJobFromEndpoint(endpointID EndpointID, edgeJobID EdgeJobID)
|
||||||
|
@ -1878,8 +1877,6 @@ const (
|
||||||
EdgeAgentIdle string = "IDLE"
|
EdgeAgentIdle string = "IDLE"
|
||||||
// EdgeAgentManagementRequired represents a required state for a tunnel connected to an Edge environment(endpoint)
|
// EdgeAgentManagementRequired represents a required state for a tunnel connected to an Edge environment(endpoint)
|
||||||
EdgeAgentManagementRequired string = "REQUIRED"
|
EdgeAgentManagementRequired string = "REQUIRED"
|
||||||
// EdgeAgentActive represents an active state for a tunnel connected to an Edge environment(endpoint)
|
|
||||||
EdgeAgentActive string = "ACTIVE"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// represents an authorization type
|
// represents an authorization type
|
||||||
|
|
Loading…
Reference in New Issue