From c5a1d7e05115f4aa60bdef352f60ced5d946c3e7 Mon Sep 17 00:00:00 2001 From: andres-portainer <91705312+andres-portainer@users.noreply.github.com> Date: Tue, 28 May 2024 16:42:56 -0300 Subject: [PATCH] fix(tunnels): make the tunnels more robust EE-7042 (#11877) --- api/chisel/schedules.go | 39 ++- api/chisel/service.go | 256 +++++++------- api/chisel/service_test.go | 16 +- api/chisel/tunnel.go | 313 ++++++++++-------- api/docker/client/client.go | 5 +- api/exec/swarm_stack.go | 5 +- .../endpointedge_status_inspect.go | 26 +- .../handler/endpointproxy/proxy_docker.go | 2 +- .../handler/endpointproxy/proxy_kubernetes.go | 2 +- .../endpoints/endpoint_association_delete.go | 2 - api/http/handler/websocket/proxy.go | 6 +- api/http/proxy/factory/agent.go | 4 +- api/http/proxy/factory/docker.go | 8 +- api/http/proxy/factory/docker/transport.go | 4 +- api/http/proxy/factory/kubernetes.go | 7 +- .../factory/kubernetes/edge_transport.go | 4 +- api/internal/edge/endpoint.go | 17 +- api/internal/snapshot/snapshot.go | 9 +- api/kubernetes/cli/client.go | 4 +- api/portainer.go | 15 +- 20 files changed, 389 insertions(+), 355 deletions(-) diff --git a/api/chisel/schedules.go b/api/chisel/schedules.go index b99174a70..4b0203a33 100644 --- a/api/chisel/schedules.go +++ b/api/chisel/schedules.go @@ -5,6 +5,17 @@ import ( "github.com/portainer/portainer/api/internal/edge/cache" ) +// EdgeJobs retrieves the edge jobs for the given environment +func (service *Service) EdgeJobs(endpointID portainer.EndpointID) []portainer.EdgeJob { + service.mu.RLock() + defer service.mu.RUnlock() + + return append( + make([]portainer.EdgeJob, 0, len(service.edgeJobs[endpointID])), + service.edgeJobs[endpointID]..., + ) +} + // AddEdgeJob register an EdgeJob inside the tunnel details associated to an environment(endpoint). func (service *Service) AddEdgeJob(endpoint *portainer.Endpoint, edgeJob *portainer.EdgeJob) { if endpoint.Edge.AsyncMode { @@ -12,10 +23,10 @@ func (service *Service) AddEdgeJob(endpoint *portainer.Endpoint, edgeJob *portai } service.mu.Lock() - tunnel := service.getTunnelDetails(endpoint.ID) + defer service.mu.Unlock() existingJobIndex := -1 - for idx, existingJob := range tunnel.Jobs { + for idx, existingJob := range service.edgeJobs[endpoint.ID] { if existingJob.ID == edgeJob.ID { existingJobIndex = idx @@ -24,30 +35,28 @@ func (service *Service) AddEdgeJob(endpoint *portainer.Endpoint, edgeJob *portai } if existingJobIndex == -1 { - tunnel.Jobs = append(tunnel.Jobs, *edgeJob) + service.edgeJobs[endpoint.ID] = append(service.edgeJobs[endpoint.ID], *edgeJob) } else { - tunnel.Jobs[existingJobIndex] = *edgeJob + service.edgeJobs[endpoint.ID][existingJobIndex] = *edgeJob } cache.Del(endpoint.ID) - - service.mu.Unlock() } // RemoveEdgeJob will remove the specified Edge job from each tunnel it was registered with. 
func (service *Service) RemoveEdgeJob(edgeJobID portainer.EdgeJobID) { service.mu.Lock() - for endpointID, tunnel := range service.tunnelDetailsMap { + for endpointID := range service.edgeJobs { n := 0 - for _, edgeJob := range tunnel.Jobs { + for _, edgeJob := range service.edgeJobs[endpointID] { if edgeJob.ID != edgeJobID { - tunnel.Jobs[n] = edgeJob + service.edgeJobs[endpointID][n] = edgeJob n++ } } - tunnel.Jobs = tunnel.Jobs[:n] + service.edgeJobs[endpointID] = service.edgeJobs[endpointID][:n] cache.Del(endpointID) } @@ -57,19 +66,17 @@ func (service *Service) RemoveEdgeJob(edgeJobID portainer.EdgeJobID) { func (service *Service) RemoveEdgeJobFromEndpoint(endpointID portainer.EndpointID, edgeJobID portainer.EdgeJobID) { service.mu.Lock() - tunnel := service.getTunnelDetails(endpointID) + defer service.mu.Unlock() n := 0 - for _, edgeJob := range tunnel.Jobs { + for _, edgeJob := range service.edgeJobs[endpointID] { if edgeJob.ID != edgeJobID { - tunnel.Jobs[n] = edgeJob + service.edgeJobs[endpointID][n] = edgeJob n++ } } - tunnel.Jobs = tunnel.Jobs[:n] + service.edgeJobs[endpointID] = service.edgeJobs[endpointID][:n] cache.Del(endpointID) - - service.mu.Unlock() } diff --git a/api/chisel/service.go b/api/chisel/service.go index 374b7d075..7aafdb0bd 100644 --- a/api/chisel/service.go +++ b/api/chisel/service.go @@ -19,7 +19,6 @@ import ( const ( tunnelCleanupInterval = 10 * time.Second - requiredTimeout = 15 * time.Second activeTimeout = 4*time.Minute + 30*time.Second pingTimeout = 3 * time.Second ) @@ -28,32 +27,54 @@ const ( // It is used to start a reverse tunnel server and to manage the connection status of each tunnel // connected to the tunnel server. type Service struct { - serverFingerprint string - serverPort string - tunnelDetailsMap map[portainer.EndpointID]*portainer.TunnelDetails - dataStore dataservices.DataStore - snapshotService portainer.SnapshotService - chiselServer *chserver.Server - shutdownCtx context.Context - ProxyManager *proxy.Manager - mu sync.Mutex - fileService portainer.FileService + serverFingerprint string + serverPort string + activeTunnels map[portainer.EndpointID]*portainer.TunnelDetails + edgeJobs map[portainer.EndpointID][]portainer.EdgeJob + dataStore dataservices.DataStore + snapshotService portainer.SnapshotService + chiselServer *chserver.Server + shutdownCtx context.Context + ProxyManager *proxy.Manager + mu sync.RWMutex + fileService portainer.FileService + defaultCheckinInterval int } // NewService returns a pointer to a new instance of Service func NewService(dataStore dataservices.DataStore, shutdownCtx context.Context, fileService portainer.FileService) *Service { + defaultCheckinInterval := portainer.DefaultEdgeAgentCheckinIntervalInSeconds + + settings, err := dataStore.Settings().Settings() + if err == nil { + defaultCheckinInterval = settings.EdgeAgentCheckinInterval + } else { + log.Error().Err(err).Msg("unable to retrieve the settings from the database") + } + return &Service{ - tunnelDetailsMap: make(map[portainer.EndpointID]*portainer.TunnelDetails), - dataStore: dataStore, - shutdownCtx: shutdownCtx, - fileService: fileService, + activeTunnels: make(map[portainer.EndpointID]*portainer.TunnelDetails), + edgeJobs: make(map[portainer.EndpointID][]portainer.EdgeJob), + dataStore: dataStore, + shutdownCtx: shutdownCtx, + fileService: fileService, + defaultCheckinInterval: defaultCheckinInterval, } } // pingAgent ping the given agent so that the agent can keep the tunnel alive func (service *Service) pingAgent(endpointID 
portainer.EndpointID) error { - tunnel := service.GetTunnelDetails(endpointID) - requestURL := fmt.Sprintf("http://127.0.0.1:%d/ping", tunnel.Port) + endpoint, err := service.dataStore.Endpoint().Endpoint(endpointID) + if err != nil { + return err + } + + tunnelAddr, err := service.TunnelAddr(endpoint) + if err != nil { + return err + } + + requestURL := fmt.Sprintf("http://%s/ping", tunnelAddr) req, err := http.NewRequest(http.MethodHead, requestURL, nil) if err != nil { return err @@ -76,47 +97,49 @@ func (service *Service) pingAgent(endpointID portainer.EndpointID) error { // KeepTunnelAlive keeps the tunnel of the given environment for maxAlive duration, or until ctx is done func (service *Service) KeepTunnelAlive(endpointID portainer.EndpointID, ctx context.Context, maxAlive time.Duration) { - go func() { - log.Debug(). - Int("endpoint_id", int(endpointID)). - Float64("max_alive_minutes", maxAlive.Minutes()). - Msg("KeepTunnelAlive: start") + go service.keepTunnelAlive(endpointID, ctx, maxAlive) +} - maxAliveTicker := time.NewTicker(maxAlive) - defer maxAliveTicker.Stop() +func (service *Service) keepTunnelAlive(endpointID portainer.EndpointID, ctx context.Context, maxAlive time.Duration) { + log.Debug(). + Int("endpoint_id", int(endpointID)). + Float64("max_alive_minutes", maxAlive.Minutes()). + Msg("KeepTunnelAlive: start") - pingTicker := time.NewTicker(tunnelCleanupInterval) - defer pingTicker.Stop() + maxAliveTicker := time.NewTicker(maxAlive) + defer maxAliveTicker.Stop() - for { - select { - case <-pingTicker.C: - service.SetTunnelStatusToActive(endpointID) - err := service.pingAgent(endpointID) - if err != nil { - log.Debug(). - Int("endpoint_id", int(endpointID)). - Err(err). - Msg("KeepTunnelAlive: ping agent") - } - case <-maxAliveTicker.C: - log.Debug(). - Int("endpoint_id", int(endpointID)). - Float64("timeout_minutes", maxAlive.Minutes()). - Msg("KeepTunnelAlive: tunnel keep alive timeout") + pingTicker := time.NewTicker(tunnelCleanupInterval) + defer pingTicker.Stop() - return - case <-ctx.Done(): - err := ctx.Err() + for { + select { + case <-pingTicker.C: + service.UpdateLastActivity(endpointID) + + if err := service.pingAgent(endpointID); err != nil { log.Debug(). Int("endpoint_id", int(endpointID)). Err(err). - Msg("KeepTunnelAlive: tunnel stop") - - return + Msg("KeepTunnelAlive: ping agent") } + case <-maxAliveTicker.C: + log.Debug(). + Int("endpoint_id", int(endpointID)). + Float64("timeout_minutes", maxAlive.Minutes()). + Msg("KeepTunnelAlive: tunnel keep alive timeout") + + return + case <-ctx.Done(): + err := ctx.Err() + log.Debug(). + Int("endpoint_id", int(endpointID)). + Err(err). + Msg("KeepTunnelAlive: tunnel stop") + + return } - }() + } } // StartTunnelServer starts a tunnel server on the specified addr and port. @@ -126,7 +149,6 @@ func (service *Service) KeepTunnelAlive(endpointID portainer.EndpointID, ctx con // The snapshotter is used in the tunnel status verification process. 
func (service *Service) StartTunnelServer(addr, port string, snapshotService portainer.SnapshotService) error { privateKeyFile, err := service.retrievePrivateKeyFile() - if err != nil { return err } @@ -144,21 +166,21 @@ func (service *Service) StartTunnelServer(addr, port string, snapshotService por service.serverFingerprint = chiselServer.GetFingerprint() service.serverPort = port - err = chiselServer.Start(addr, port) - if err != nil { + if err := chiselServer.Start(addr, port); err != nil { return err } + service.chiselServer = chiselServer // TODO: work-around Chisel default behavior. // By default, Chisel will allow anyone to connect if no user exists. username, password := generateRandomCredentials() - err = service.chiselServer.AddUser(username, password, "127.0.0.1") - if err != nil { + if err = service.chiselServer.AddUser(username, password, "127.0.0.1"); err != nil { return err } service.snapshotService = snapshotService + go service.startTunnelVerificationLoop() return nil @@ -172,37 +194,39 @@ func (service *Service) StopTunnelServer() error { func (service *Service) retrievePrivateKeyFile() (string, error) { privateKeyFile := service.fileService.GetDefaultChiselPrivateKeyPath() - exist, _ := service.fileService.FileExists(privateKeyFile) - if !exist { - log.Debug(). - Str("private-key", privateKeyFile). - Msg("Chisel private key file does not exist") - - privateKey, err := ccrypto.GenerateKey("") - if err != nil { - log.Error(). - Err(err). - Msg("Failed to generate chisel private key") - return "", err - } - - err = service.fileService.StoreChiselPrivateKey(privateKey) - if err != nil { - log.Error(). - Err(err). - Msg("Failed to save Chisel private key to disk") - return "", err - } else { - log.Info(). - Str("private-key", privateKeyFile). - Msg("Generated a new Chisel private key file") - } - } else { + if exists, _ := service.fileService.FileExists(privateKeyFile); exists { log.Info(). Str("private-key", privateKeyFile). - Msg("Found Chisel private key file on disk") + Msg("found Chisel private key file on disk") + + return privateKeyFile, nil } + log.Debug(). + Str("private-key", privateKeyFile). + Msg("chisel private key file does not exist") + + privateKey, err := ccrypto.GenerateKey("") + if err != nil { + log.Error(). + Err(err). + Msg("failed to generate chisel private key") + + return "", err + } + + if err = service.fileService.StoreChiselPrivateKey(privateKey); err != nil { + log.Error(). + Err(err). + Msg("failed to save Chisel private key to disk") + + return "", err + } + + log.Info(). + Str("private-key", privateKeyFile). 
+ Msg("generated a new Chisel private key file") + return privateKeyFile, nil } @@ -230,63 +254,45 @@ func (service *Service) startTunnelVerificationLoop() { } } +// checkTunnels finds the first tunnel that has not had any activity recently +// and attempts to take a snapshot, then closes it and returns func (service *Service) checkTunnels() { - tunnels := make(map[portainer.EndpointID]portainer.TunnelDetails) + service.mu.RLock() - service.mu.Lock() - for key, tunnel := range service.tunnelDetailsMap { - if tunnel.LastActivity.IsZero() || tunnel.Status == portainer.EdgeAgentIdle { - continue - } - - if tunnel.Status == portainer.EdgeAgentManagementRequired && time.Since(tunnel.LastActivity) < requiredTimeout { - continue - } - - if tunnel.Status == portainer.EdgeAgentActive && time.Since(tunnel.LastActivity) < activeTimeout { - continue - } - - tunnels[key] = *tunnel - } - service.mu.Unlock() - - for endpointID, tunnel := range tunnels { + for endpointID, tunnel := range service.activeTunnels { elapsed := time.Since(tunnel.LastActivity) log.Debug(). Int("endpoint_id", int(endpointID)). - Str("status", tunnel.Status). - Float64("status_time_seconds", elapsed.Seconds()). + Float64("last_activity_seconds", elapsed.Seconds()). Msg("environment tunnel monitoring") - if tunnel.Status == portainer.EdgeAgentManagementRequired && elapsed > requiredTimeout { - log.Debug(). - Int("endpoint_id", int(endpointID)). - Str("status", tunnel.Status). - Float64("status_time_seconds", elapsed.Seconds()). - Float64("timeout_seconds", requiredTimeout.Seconds()). - Msg("REQUIRED state timeout exceeded") + if tunnel.Status == portainer.EdgeAgentManagementRequired && elapsed < activeTimeout { + continue } - if tunnel.Status == portainer.EdgeAgentActive && elapsed > activeTimeout { - log.Debug(). - Int("endpoint_id", int(endpointID)). - Str("status", tunnel.Status). - Float64("status_time_seconds", elapsed.Seconds()). - Float64("timeout_seconds", activeTimeout.Seconds()). - Msg("ACTIVE state timeout exceeded") + tunnelPort := tunnel.Port - err := service.snapshotEnvironment(endpointID, tunnel.Port) - if err != nil { - log.Error(). - Int("endpoint_id", int(endpointID)). - Err(err). - Msg("unable to snapshot Edge environment") - } + service.mu.RUnlock() + + log.Debug(). + Int("endpoint_id", int(endpointID)). + Float64("last_activity_seconds", elapsed.Seconds()). + Float64("timeout_seconds", activeTimeout.Seconds()). + Msg("last activity timeout exceeded") + + if err := service.snapshotEnvironment(endpointID, tunnelPort); err != nil { + log.Error(). + Int("endpoint_id", int(endpointID)). + Err(err). 
+ Msg("unable to snapshot Edge environment") } - service.SetTunnelStatusToIdle(portainer.EndpointID(endpointID)) + service.close(portainer.EndpointID(endpointID)) + + return } + + service.mu.RUnlock() } func (service *Service) snapshotEnvironment(endpointID portainer.EndpointID, tunnelPort int) error { diff --git a/api/chisel/service_test.go b/api/chisel/service_test.go index a3e1da313..c2f41d4ef 100644 --- a/api/chisel/service_test.go +++ b/api/chisel/service_test.go @@ -8,14 +8,20 @@ import ( "time" portainer "github.com/portainer/portainer/api" + "github.com/portainer/portainer/api/datastore" "github.com/stretchr/testify/require" ) func TestPingAgentPanic(t *testing.T) { - endpointID := portainer.EndpointID(1) + endpoint := &portainer.Endpoint{ + ID: 1, + Type: portainer.EdgeAgentOnDockerEnvironment, + } - s := NewService(nil, nil, nil) + _, store := datastore.MustNewTestStore(t, true, true) + + s := NewService(store, nil, nil) defer func() { require.Nil(t, recover()) @@ -36,10 +42,10 @@ func TestPingAgentPanic(t *testing.T) { errCh <- srv.Serve(ln) }() - s.getTunnelDetails(endpointID) - s.tunnelDetailsMap[endpointID].Port = ln.Addr().(*net.TCPAddr).Port + s.Open(endpoint) + s.activeTunnels[endpoint.ID].Port = ln.Addr().(*net.TCPAddr).Port - require.Error(t, s.pingAgent(endpointID)) + require.Error(t, s.pingAgent(endpoint.ID)) require.NoError(t, srv.Shutdown(context.Background())) require.ErrorIs(t, <-errCh, http.ErrServerClosed) } diff --git a/api/chisel/tunnel.go b/api/chisel/tunnel.go index 7a38ba359..826e090cd 100644 --- a/api/chisel/tunnel.go +++ b/api/chisel/tunnel.go @@ -5,14 +5,18 @@ import ( "errors" "fmt" "math/rand" + "net" "strings" "time" portainer "github.com/portainer/portainer/api" + "github.com/portainer/portainer/api/internal/edge" "github.com/portainer/portainer/api/internal/edge/cache" + "github.com/portainer/portainer/api/internal/endpointutils" "github.com/portainer/portainer/pkg/libcrypto" "github.com/dchest/uniuri" + "github.com/rs/zerolog/log" ) const ( @@ -20,18 +24,181 @@ const ( maxAvailablePort = 65535 ) +// Open will mark the tunnel as REQUIRED so the agent opens it +func (s *Service) Open(endpoint *portainer.Endpoint) error { + if !endpointutils.IsEdgeEndpoint(endpoint) { + return errors.New("cannot open a tunnel for non-edge environments") + } + + if endpoint.Edge.AsyncMode { + return errors.New("cannot open a tunnel for async edge environments") + } + + s.mu.Lock() + defer s.mu.Unlock() + + if _, ok := s.activeTunnels[endpoint.ID]; ok { + return nil + } + + defer cache.Del(endpoint.ID) + + tun := &portainer.TunnelDetails{ + Status: portainer.EdgeAgentManagementRequired, + Port: s.getUnusedPort(), + LastActivity: time.Now(), + } + + username, password := generateRandomCredentials() + + if s.chiselServer != nil { + authorizedRemote := fmt.Sprintf("^R:0.0.0.0:%d$", tun.Port) + + if err := s.chiselServer.AddUser(username, password, authorizedRemote); err != nil { + return err + } + } + + credentials, err := encryptCredentials(username, password, endpoint.EdgeID) + if err != nil { + return err + } + + tun.Credentials = credentials + + s.activeTunnels[endpoint.ID] = tun + + return nil +} + +// close removes the tunnel from the map so the agent will close it +func (s *Service) close(endpointID portainer.EndpointID) { + s.mu.Lock() + defer s.mu.Unlock() + + tun, ok := s.activeTunnels[endpointID] + if !ok { + return + } + + if len(tun.Credentials) > 0 && s.chiselServer != nil { + user, _, _ := strings.Cut(tun.Credentials, ":") + s.chiselServer.DeleteUser(user) + 
} + + if s.ProxyManager != nil { + s.ProxyManager.DeleteEndpointProxy(endpointID) + } + + delete(s.activeTunnels, endpointID) + + cache.Del(endpointID) +} + +// Config returns the tunnel details needed for the agent to connect +func (s *Service) Config(endpointID portainer.EndpointID) portainer.TunnelDetails { + s.mu.RLock() + defer s.mu.RUnlock() + + if tun, ok := s.activeTunnels[endpointID]; ok { + return *tun + } + + return portainer.TunnelDetails{Status: portainer.EdgeAgentIdle} +} + +// TunnelAddr returns the address of the local tunnel, including the port, it +// will block until the tunnel is ready +func (s *Service) TunnelAddr(endpoint *portainer.Endpoint) (string, error) { + if err := s.Open(endpoint); err != nil { + return "", err + } + + tun := s.Config(endpoint.ID) + checkinInterval := time.Duration(s.tryEffectiveCheckinInterval(endpoint)) * time.Second + + for t0 := time.Now(); ; { + if time.Since(t0) > 2*checkinInterval { + s.close(endpoint.ID) + + return "", errors.New("unable to open the tunnel") + } + + // Check if the tunnel is established + conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: tun.Port}) + if err != nil { + time.Sleep(checkinInterval / 100) + + continue + } + + conn.Close() + + break + } + + s.UpdateLastActivity(endpoint.ID) + + return fmt.Sprintf("127.0.0.1:%d", tun.Port), nil +} + +// tryEffectiveCheckinInterval avoids a potential deadlock by returning a +// previous known value after a timeout +func (s *Service) tryEffectiveCheckinInterval(endpoint *portainer.Endpoint) int { + ch := make(chan int, 1) + + go func() { + ch <- edge.EffectiveCheckinInterval(s.dataStore, endpoint) + }() + + select { + case <-time.After(50 * time.Millisecond): + s.mu.RLock() + defer s.mu.RUnlock() + + return s.defaultCheckinInterval + case i := <-ch: + s.mu.Lock() + s.defaultCheckinInterval = i + s.mu.Unlock() + + return i + } +} + +// UpdateLastActivity sets the current timestamp to avoid the tunnel timeout +func (s *Service) UpdateLastActivity(endpointID portainer.EndpointID) { + s.mu.Lock() + defer s.mu.Unlock() + + if tun, ok := s.activeTunnels[endpointID]; ok { + tun.LastActivity = time.Now() + } +} + // NOTE: it needs to be called with the lock acquired // getUnusedPort is used to generate an unused random port in the dynamic port range. // Dynamic ports (also called private ports) are 49152 to 65535. func (service *Service) getUnusedPort() int { port := randomInt(minAvailablePort, maxAvailablePort) - for _, tunnel := range service.tunnelDetailsMap { + for _, tunnel := range service.activeTunnels { if tunnel.Port == port { return service.getUnusedPort() } } + conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: port}) + if err == nil { + conn.Close() + + log.Debug(). + Int("port", port). + Msg("selected port is in use, trying a different one") + + return service.getUnusedPort() + } + return port } @@ -39,152 +206,10 @@ func randomInt(min, max int) int { return min + rand.Intn(max-min) } -// NOTE: it needs to be called with the lock acquired -func (service *Service) getTunnelDetails(endpointID portainer.EndpointID) *portainer.TunnelDetails { - - if tunnel, ok := service.tunnelDetailsMap[endpointID]; ok { - return tunnel - } - - tunnel := &portainer.TunnelDetails{ - Status: portainer.EdgeAgentIdle, - } - - service.tunnelDetailsMap[endpointID] = tunnel - - cache.Del(endpointID) - - return tunnel -} - -// GetTunnelDetails returns information about the tunnel associated to an environment(endpoint). 
-func (service *Service) GetTunnelDetails(endpointID portainer.EndpointID) portainer.TunnelDetails { - service.mu.Lock() - defer service.mu.Unlock() - - return *service.getTunnelDetails(endpointID) -} - -// GetActiveTunnel retrieves an active tunnel which allows communicating with edge agent -func (service *Service) GetActiveTunnel(endpoint *portainer.Endpoint) (portainer.TunnelDetails, error) { - if endpoint.Edge.AsyncMode { - return portainer.TunnelDetails{}, errors.New("cannot open tunnel on async endpoint") - } - - tunnel := service.GetTunnelDetails(endpoint.ID) - - if tunnel.Status == portainer.EdgeAgentActive { - // update the LastActivity - service.SetTunnelStatusToActive(endpoint.ID) - } - - if tunnel.Status == portainer.EdgeAgentIdle || tunnel.Status == portainer.EdgeAgentManagementRequired { - err := service.SetTunnelStatusToRequired(endpoint.ID) - if err != nil { - return portainer.TunnelDetails{}, fmt.Errorf("failed opening tunnel to endpoint: %w", err) - } - - if endpoint.EdgeCheckinInterval == 0 { - settings, err := service.dataStore.Settings().Settings() - if err != nil { - return portainer.TunnelDetails{}, fmt.Errorf("failed fetching settings from db: %w", err) - } - - endpoint.EdgeCheckinInterval = settings.EdgeAgentCheckinInterval - } - - time.Sleep(2 * time.Duration(endpoint.EdgeCheckinInterval) * time.Second) - } - - return service.GetTunnelDetails(endpoint.ID), nil -} - -// SetTunnelStatusToActive update the status of the tunnel associated to the specified environment(endpoint). -// It sets the status to ACTIVE. -func (service *Service) SetTunnelStatusToActive(endpointID portainer.EndpointID) { - service.mu.Lock() - tunnel := service.getTunnelDetails(endpointID) - tunnel.Status = portainer.EdgeAgentActive - tunnel.Credentials = "" - tunnel.LastActivity = time.Now() - service.mu.Unlock() - - cache.Del(endpointID) -} - -// SetTunnelStatusToIdle update the status of the tunnel associated to the specified environment(endpoint). -// It sets the status to IDLE. -// It removes any existing credentials associated to the tunnel. -func (service *Service) SetTunnelStatusToIdle(endpointID portainer.EndpointID) { - service.mu.Lock() - - tunnel := service.getTunnelDetails(endpointID) - tunnel.Status = portainer.EdgeAgentIdle - tunnel.Port = 0 - tunnel.LastActivity = time.Now() - - credentials := tunnel.Credentials - if credentials != "" { - tunnel.Credentials = "" - - if service.chiselServer != nil { - service.chiselServer.DeleteUser(strings.Split(credentials, ":")[0]) - } - } - - service.ProxyManager.DeleteEndpointProxy(endpointID) - - service.mu.Unlock() - - cache.Del(endpointID) -} - -// SetTunnelStatusToRequired update the status of the tunnel associated to the specified environment(endpoint). -// It sets the status to REQUIRED. -// If no port is currently associated to the tunnel, it will associate a random unused port to the tunnel -// and generate temporary credentials that can be used to establish a reverse tunnel on that port. -// Credentials are encrypted using the Edge ID associated to the environment(endpoint). 
-func (service *Service) SetTunnelStatusToRequired(endpointID portainer.EndpointID) error { - defer cache.Del(endpointID) - - tunnel := service.getTunnelDetails(endpointID) - - service.mu.Lock() - defer service.mu.Unlock() - - if tunnel.Port == 0 { - endpoint, err := service.dataStore.Endpoint().Endpoint(endpointID) - if err != nil { - return err - } - - tunnel.Status = portainer.EdgeAgentManagementRequired - tunnel.Port = service.getUnusedPort() - tunnel.LastActivity = time.Now() - - username, password := generateRandomCredentials() - authorizedRemote := fmt.Sprintf("^R:0.0.0.0:%d$", tunnel.Port) - - if service.chiselServer != nil { - err = service.chiselServer.AddUser(username, password, authorizedRemote) - if err != nil { - return err - } - } - - credentials, err := encryptCredentials(username, password, endpoint.EdgeID) - if err != nil { - return err - } - tunnel.Credentials = credentials - } - - return nil -} - func generateRandomCredentials() (string, string) { username := uniuri.NewLen(8) password := uniuri.NewLen(8) + return username, password } diff --git a/api/docker/client/client.go b/api/docker/client/client.go index 71a9ae233..065a40382 100644 --- a/api/docker/client/client.go +++ b/api/docker/client/client.go @@ -3,7 +3,6 @@ package client import ( "bytes" "errors" - "fmt" "io" "maps" "net/http" @@ -50,12 +49,12 @@ func (factory *ClientFactory) CreateClient(endpoint *portainer.Endpoint, nodeNam case portainer.AgentOnDockerEnvironment: return createAgentClient(endpoint, endpoint.URL, factory.signatureService, nodeName, timeout) case portainer.EdgeAgentOnDockerEnvironment: - tunnel, err := factory.reverseTunnelService.GetActiveTunnel(endpoint) + tunnelAddr, err := factory.reverseTunnelService.TunnelAddr(endpoint) if err != nil { return nil, err } - endpointURL := fmt.Sprintf("http://127.0.0.1:%d", tunnel.Port) + endpointURL := "http://" + tunnelAddr return createAgentClient(endpoint, endpointURL, factory.signatureService, nodeName, timeout) } diff --git a/api/exec/swarm_stack.go b/api/exec/swarm_stack.go index 5745098ee..36d99d2b9 100644 --- a/api/exec/swarm_stack.go +++ b/api/exec/swarm_stack.go @@ -3,7 +3,6 @@ package exec import ( "bytes" "errors" - "fmt" "os" "os/exec" "path" @@ -186,11 +185,11 @@ func (manager *SwarmStackManager) prepareDockerCommandAndArgs(binaryPath, config endpointURL := endpoint.URL if endpoint.Type == portainer.EdgeAgentOnDockerEnvironment { - tunnel, err := manager.reverseTunnelService.GetActiveTunnel(endpoint) + tunnelAddr, err := manager.reverseTunnelService.TunnelAddr(endpoint) if err != nil { return "", nil, err } - endpointURL = fmt.Sprintf("tcp://127.0.0.1:%d", tunnel.Port) + endpointURL = "tcp://" + tunnelAddr } args = append(args, "-H", endpointURL) diff --git a/api/http/handler/endpointedge/endpointedge_status_inspect.go b/api/http/handler/endpointedge/endpointedge_status_inspect.go index a6f8e21fc..1978bdd15 100644 --- a/api/http/handler/endpointedge/endpointedge_status_inspect.go +++ b/api/http/handler/endpointedge/endpointedge_status_inspect.go @@ -15,6 +15,7 @@ import ( portainer "github.com/portainer/portainer/api" "github.com/portainer/portainer/api/dataservices" + "github.com/portainer/portainer/api/internal/edge" "github.com/portainer/portainer/api/internal/edge/cache" httperror "github.com/portainer/portainer/pkg/libhttp/error" "github.com/portainer/portainer/pkg/libhttp/request" @@ -134,7 +135,7 @@ func (handler *Handler) inspectStatus(tx dataservices.DataStoreTx, r *http.Reque // Take an initial snapshot if 
endpoint.LastCheckInDate == 0 { - handler.ReverseTunnelService.SetTunnelStatusToRequired(endpoint.ID) + handler.ReverseTunnelService.Open(endpoint) } agentPlatform, agentPlatformErr := parseAgentPlatform(r) @@ -153,34 +154,21 @@ func (handler *Handler) inspectStatus(tx dataservices.DataStoreTx, r *http.Reque return nil, httperror.InternalServerError("Unable to persist environment changes inside the database", err) } - checkinInterval := endpoint.EdgeCheckinInterval - if endpoint.EdgeCheckinInterval == 0 { - settings, err := tx.Settings().Settings() - if err != nil { - return nil, httperror.InternalServerError("Unable to retrieve settings from the database", err) - } - checkinInterval = settings.EdgeAgentCheckinInterval - } - - tunnel := handler.ReverseTunnelService.GetTunnelDetails(endpoint.ID) + tunnel := handler.ReverseTunnelService.Config(endpoint.ID) statusResponse := endpointEdgeStatusInspectResponse{ Status: tunnel.Status, Port: tunnel.Port, - CheckinInterval: checkinInterval, + CheckinInterval: edge.EffectiveCheckinInterval(tx, endpoint), Credentials: tunnel.Credentials, } - schedules, handlerErr := handler.buildSchedules(endpoint.ID, tunnel) + schedules, handlerErr := handler.buildSchedules(endpoint.ID) if handlerErr != nil { return nil, handlerErr } statusResponse.Schedules = schedules - if tunnel.Status == portainer.EdgeAgentManagementRequired { - handler.ReverseTunnelService.SetTunnelStatusToActive(endpoint.ID) - } - edgeStacksStatus, handlerErr := handler.buildEdgeStacks(tx, endpoint.ID) if handlerErr != nil { return nil, handlerErr @@ -213,9 +201,9 @@ func parseAgentPlatform(r *http.Request) (portainer.EndpointType, error) { } } -func (handler *Handler) buildSchedules(endpointID portainer.EndpointID, tunnel portainer.TunnelDetails) ([]edgeJobResponse, *httperror.HandlerError) { +func (handler *Handler) buildSchedules(endpointID portainer.EndpointID) ([]edgeJobResponse, *httperror.HandlerError) { schedules := []edgeJobResponse{} - for _, job := range tunnel.Jobs { + for _, job := range handler.ReverseTunnelService.EdgeJobs(endpointID) { var collectLogs bool if _, ok := job.GroupLogsCollection[endpointID]; ok { collectLogs = job.GroupLogsCollection[endpointID].CollectLogs diff --git a/api/http/handler/endpointproxy/proxy_docker.go b/api/http/handler/endpointproxy/proxy_docker.go index db603880d..30782a981 100644 --- a/api/http/handler/endpointproxy/proxy_docker.go +++ b/api/http/handler/endpointproxy/proxy_docker.go @@ -34,7 +34,7 @@ func (handler *Handler) proxyRequestsToDockerAPI(w http.ResponseWriter, r *http. 
return httperror.InternalServerError("No Edge agent registered with the environment", errors.New("No agent available")) } - _, err := handler.ReverseTunnelService.GetActiveTunnel(endpoint) + _, err := handler.ReverseTunnelService.TunnelAddr(endpoint) if err != nil { return httperror.InternalServerError("Unable to get the active tunnel", err) } diff --git a/api/http/handler/endpointproxy/proxy_kubernetes.go b/api/http/handler/endpointproxy/proxy_kubernetes.go index 80643d94b..ddf54d6ff 100644 --- a/api/http/handler/endpointproxy/proxy_kubernetes.go +++ b/api/http/handler/endpointproxy/proxy_kubernetes.go @@ -34,7 +34,7 @@ func (handler *Handler) proxyRequestsToKubernetesAPI(w http.ResponseWriter, r *h return httperror.InternalServerError("No Edge agent registered with the environment", errors.New("No agent available")) } - _, err := handler.ReverseTunnelService.GetActiveTunnel(endpoint) + _, err := handler.ReverseTunnelService.TunnelAddr(endpoint) if err != nil { return httperror.InternalServerError("Unable to get the active tunnel", err) } diff --git a/api/http/handler/endpoints/endpoint_association_delete.go b/api/http/handler/endpoints/endpoint_association_delete.go index 2fd889102..55cad593c 100644 --- a/api/http/handler/endpoints/endpoint_association_delete.go +++ b/api/http/handler/endpoints/endpoint_association_delete.go @@ -59,8 +59,6 @@ func (handler *Handler) endpointAssociationDelete(w http.ResponseWriter, r *http return httperror.InternalServerError("Failed persisting environment in database", err) } - handler.ReverseTunnelService.SetTunnelStatusToIdle(endpoint.ID) - return response.Empty(w) } diff --git a/api/http/handler/websocket/proxy.go b/api/http/handler/websocket/proxy.go index f6ebf0888..3d6858ff3 100644 --- a/api/http/handler/websocket/proxy.go +++ b/api/http/handler/websocket/proxy.go @@ -18,12 +18,12 @@ import ( ) func (handler *Handler) proxyEdgeAgentWebsocketRequest(w http.ResponseWriter, r *http.Request, params *webSocketRequestParams) error { - tunnel, err := handler.ReverseTunnelService.GetActiveTunnel(params.endpoint) + tunnelAddr, err := handler.ReverseTunnelService.TunnelAddr(params.endpoint) if err != nil { return err } - agentURL, err := url.Parse(fmt.Sprintf("http://127.0.0.1:%d", tunnel.Port)) + agentURL, err := url.Parse("http://" + tunnelAddr) if err != nil { return err } @@ -93,7 +93,7 @@ func (handler *Handler) doProxyWebsocketRequest( } if isEdge { - handler.ReverseTunnelService.SetTunnelStatusToActive(params.endpoint.ID) + handler.ReverseTunnelService.UpdateLastActivity(params.endpoint.ID) handler.ReverseTunnelService.KeepTunnelAlive(params.endpoint.ID, r.Context(), portainer.WebSocketKeepAlive) } diff --git a/api/http/proxy/factory/agent.go b/api/http/proxy/factory/agent.go index df74dc3dd..06b0e8370 100644 --- a/api/http/proxy/factory/agent.go +++ b/api/http/proxy/factory/agent.go @@ -26,12 +26,12 @@ func (factory *ProxyFactory) NewAgentProxy(endpoint *portainer.Endpoint) (*Proxy urlString := endpoint.URL if endpointutils.IsEdgeEndpoint(endpoint) { - tunnel, err := factory.reverseTunnelService.GetActiveTunnel(endpoint) + tunnelAddr, err := factory.reverseTunnelService.TunnelAddr(endpoint) if err != nil { return nil, errors.Wrap(err, "failed starting tunnel") } - urlString = fmt.Sprintf("http://127.0.0.1:%d", tunnel.Port) + urlString = "http://" + tunnelAddr } endpointURL, err := url.ParseURL(urlString) diff --git a/api/http/proxy/factory/docker.go b/api/http/proxy/factory/docker.go index ca9dce35e..0e040cac5 100644 --- 
a/api/http/proxy/factory/docker.go +++ b/api/http/proxy/factory/docker.go @@ -1,7 +1,6 @@ package factory import ( - "fmt" "io" "net/http" "strings" @@ -35,8 +34,11 @@ func (factory *ProxyFactory) newDockerLocalProxy(endpoint *portainer.Endpoint) ( func (factory *ProxyFactory) newDockerHTTPProxy(endpoint *portainer.Endpoint) (http.Handler, error) { rawURL := endpoint.URL if endpoint.Type == portainer.EdgeAgentOnDockerEnvironment { - tunnel := factory.reverseTunnelService.GetTunnelDetails(endpoint.ID) - rawURL = fmt.Sprintf("http://127.0.0.1:%d", tunnel.Port) + tunnelAddr, err := factory.reverseTunnelService.TunnelAddr(endpoint) + if err != nil { + return nil, err + } + rawURL = "http://" + tunnelAddr } endpointURL, err := url.ParseURL(rawURL) diff --git a/api/http/proxy/factory/docker/transport.go b/api/http/proxy/factory/docker/transport.go index 2548abe00..97e9ddb00 100644 --- a/api/http/proxy/factory/docker/transport.go +++ b/api/http/proxy/factory/docker/transport.go @@ -138,9 +138,7 @@ func (transport *Transport) executeDockerRequest(request *http.Request) (*http.R } if err == nil { - transport.reverseTunnelService.SetTunnelStatusToActive(transport.endpoint.ID) - } else { - transport.reverseTunnelService.SetTunnelStatusToIdle(transport.endpoint.ID) + transport.reverseTunnelService.UpdateLastActivity(transport.endpoint.ID) } return response, err diff --git a/api/http/proxy/factory/kubernetes.go b/api/http/proxy/factory/kubernetes.go index e676b9ea4..d36b7f609 100644 --- a/api/http/proxy/factory/kubernetes.go +++ b/api/http/proxy/factory/kubernetes.go @@ -51,8 +51,11 @@ func (factory *ProxyFactory) newKubernetesLocalProxy(endpoint *portainer.Endpoin } func (factory *ProxyFactory) newKubernetesEdgeHTTPProxy(endpoint *portainer.Endpoint) (http.Handler, error) { - tunnel := factory.reverseTunnelService.GetTunnelDetails(endpoint.ID) - rawURL := fmt.Sprintf("http://127.0.0.1:%d", tunnel.Port) + tunnelAddr, err := factory.reverseTunnelService.TunnelAddr(endpoint) + if err != nil { + return nil, err + } + rawURL := "http://" + tunnelAddr endpointURL, err := url.Parse(rawURL) if err != nil { diff --git a/api/http/proxy/factory/kubernetes/edge_transport.go b/api/http/proxy/factory/kubernetes/edge_transport.go index 6d227d589..4eed6934a 100644 --- a/api/http/proxy/factory/kubernetes/edge_transport.go +++ b/api/http/proxy/factory/kubernetes/edge_transport.go @@ -59,9 +59,7 @@ func (transport *edgeTransport) RoundTrip(request *http.Request) (*http.Response response, err := transport.baseTransport.RoundTrip(request) if err == nil { - transport.reverseTunnelService.SetTunnelStatusToActive(transport.endpoint.ID) - } else { - transport.reverseTunnelService.SetTunnelStatusToIdle(transport.endpoint.ID) + transport.reverseTunnelService.UpdateLastActivity(transport.endpoint.ID) } return response, err diff --git a/api/internal/edge/endpoint.go b/api/internal/edge/endpoint.go index 843561107..090fe8ef9 100644 --- a/api/internal/edge/endpoint.go +++ b/api/internal/edge/endpoint.go @@ -1,6 +1,9 @@ package edge -import portainer "github.com/portainer/portainer/api" +import ( + portainer "github.com/portainer/portainer/api" + "github.com/portainer/portainer/api/dataservices" +) // EndpointRelatedEdgeStacks returns a list of Edge stacks related to this Environment(Endpoint) func EndpointRelatedEdgeStacks(endpoint *portainer.Endpoint, endpointGroup *portainer.EndpointGroup, edgeGroups []portainer.EdgeGroup, edgeStacks []portainer.EdgeStack) []portainer.EdgeStackID { @@ -24,3 +27,15 @@ func 
EndpointRelatedEdgeStacks(endpoint *portainer.Endpoint, endpointGroup *port return relatedEdgeStacks } + +func EffectiveCheckinInterval(tx dataservices.DataStoreTx, endpoint *portainer.Endpoint) int { + if endpoint.EdgeCheckinInterval != 0 { + return endpoint.EdgeCheckinInterval + } + + if settings, err := tx.Settings().Settings(); err == nil { + return settings.EdgeAgentCheckinInterval + } + + return portainer.DefaultEdgeAgentCheckinIntervalInSeconds +} diff --git a/api/internal/snapshot/snapshot.go b/api/internal/snapshot/snapshot.go index 6d57abbcc..e38d5366a 100644 --- a/api/internal/snapshot/snapshot.go +++ b/api/internal/snapshot/snapshot.go @@ -57,8 +57,6 @@ func NewService( // NewBackgroundSnapshotter queues snapshots of existing edge environments that // do not have one already func NewBackgroundSnapshotter(dataStore dataservices.DataStore, tunnelService portainer.ReverseTunnelService) { - var endpointIDs []portainer.EndpointID - err := dataStore.ViewTx(func(tx dataservices.DataStoreTx) error { endpoints, err := tx.Endpoint().Endpoints() if err != nil { @@ -73,7 +71,7 @@ func NewBackgroundSnapshotter(dataStore dataservices.DataStore, tunnelService po s, err := tx.Snapshot().Read(e.ID) if dataservices.IsErrObjectNotFound(err) || (err == nil && s.Docker == nil && s.Kubernetes == nil) { - endpointIDs = append(endpointIDs, e.ID) + tunnelService.Open(&e) } } @@ -83,11 +81,6 @@ func NewBackgroundSnapshotter(dataStore dataservices.DataStore, tunnelService po log.Error().Err(err).Msg("background snapshotter failure") return } - - for _, endpointID := range endpointIDs { - tunnelService.SetTunnelStatusToActive(endpointID) - time.Sleep(10 * time.Second) - } } func parseSnapshotFrequency(snapshotInterval string, dataStore dataservices.DataStore) (float64, error) { diff --git a/api/kubernetes/cli/client.go b/api/kubernetes/cli/client.go index 8fd62c286..6b86c588d 100644 --- a/api/kubernetes/cli/client.go +++ b/api/kubernetes/cli/client.go @@ -249,11 +249,11 @@ func (factory *ClientFactory) buildAgentConfig(endpoint *portainer.Endpoint) (*r } func (factory *ClientFactory) buildEdgeConfig(endpoint *portainer.Endpoint) (*rest.Config, error) { - tunnel, err := factory.reverseTunnelService.GetActiveTunnel(endpoint) + tunnelAddr, err := factory.reverseTunnelService.TunnelAddr(endpoint) if err != nil { return nil, errors.Wrap(err, "failed activating tunnel") } - endpointURL := fmt.Sprintf("http://127.0.0.1:%d/kubernetes", tunnel.Port) + endpointURL := fmt.Sprintf("http://%s/kubernetes", tunnelAddr) config, err := clientcmd.BuildConfigFromFlags(endpointURL, "") if err != nil { diff --git a/api/portainer.go b/api/portainer.go index 847d6b5e2..e0694a6c2 100644 --- a/api/portainer.go +++ b/api/portainer.go @@ -1296,7 +1296,6 @@ type ( Status string LastActivity time.Time Port int - Jobs []EdgeJob Credentials string } @@ -1557,13 +1556,13 @@ type ( ReverseTunnelService interface { StartTunnelServer(addr, port string, snapshotService SnapshotService) error StopTunnelServer() error - GenerateEdgeKey(url, host string, endpointIdentifier int) string - SetTunnelStatusToActive(endpointID EndpointID) - SetTunnelStatusToRequired(endpointID EndpointID) error - SetTunnelStatusToIdle(endpointID EndpointID) + GenerateEdgeKey(apiURL, tunnelAddr string, endpointIdentifier int) string + Open(endpoint *Endpoint) error + Config(endpointID EndpointID) TunnelDetails + TunnelAddr(endpoint *Endpoint) (string, error) + UpdateLastActivity(endpointID EndpointID) KeepTunnelAlive(endpointID EndpointID, ctx context.Context, 
maxKeepAlive time.Duration) - GetTunnelDetails(endpointID EndpointID) TunnelDetails - GetActiveTunnel(endpoint *Endpoint) (TunnelDetails, error) + EdgeJobs(endpointId EndpointID) []EdgeJob AddEdgeJob(endpoint *Endpoint, edgeJob *EdgeJob) RemoveEdgeJob(edgeJobID EdgeJobID) RemoveEdgeJobFromEndpoint(endpointID EndpointID, edgeJobID EdgeJobID) @@ -1878,8 +1877,6 @@ const ( EdgeAgentIdle string = "IDLE" // EdgeAgentManagementRequired represents a required state for a tunnel connected to an Edge environment(endpoint) EdgeAgentManagementRequired string = "REQUIRED" - // EdgeAgentActive represents an active state for a tunnel connected to an Edge environment(endpoint) - EdgeAgentActive string = "ACTIVE" ) // represents an authorization type
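
Taken together, the interface changes above replace the old status-driven tunnel API (GetActiveTunnel, SetTunnelStatusToActive/Idle/Required) with Open, TunnelAddr, Config and UpdateLastActivity. A minimal caller sketch in Go, mirroring the pattern this patch applies in api/docker/client/client.go and the proxy factories — the package and helper name (agentBaseURL) are illustrative only, not part of the patch:

package example

import (
	portainer "github.com/portainer/portainer/api"
)

// agentBaseURL resolves the HTTP base URL of an Edge agent through the reverse
// tunnel. svc is any implementation of portainer.ReverseTunnelService.
func agentBaseURL(svc portainer.ReverseTunnelService, endpoint *portainer.Endpoint) (string, error) {
	// TunnelAddr opens the tunnel if needed and blocks until it is reachable,
	// returning an address such as "127.0.0.1:<port>".
	tunnelAddr, err := svc.TunnelAddr(endpoint)
	if err != nil {
		return "", err
	}

	// Callers that keep using the tunnel refresh its last-activity timestamp so
	// the verification loop does not snapshot and close it prematurely.
	svc.UpdateLastActivity(endpoint.ID)

	return "http://" + tunnelAddr, nil
}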