fix(k8s) keep tunnel alive for websocket connection EE-1690 (#5677)
* fix(k8s) EE-1690 keep tunnel alive for websocket connection * fix(k8s) EE-1690 fix comment Co-authored-by: Simon Meng <simon.meng@portainer.io>pull/5650/head
parent
483559af09
commit
33118babdd
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -42,6 +43,55 @@ func NewService(dataStore portainer.DataStore, shutdownCtx context.Context) *Ser
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// pingAgent ping the given agent so that the agent can keep the tunnel alive
|
||||||
|
func (service *Service) pingAgent(endpointID portainer.EndpointID) error{
|
||||||
|
tunnel := service.GetTunnelDetails(endpointID)
|
||||||
|
requestURL := fmt.Sprintf("http://127.0.0.1:%d/ping", tunnel.Port)
|
||||||
|
req, err := http.NewRequest(http.MethodHead, requestURL, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
httpClient := &http.Client{
|
||||||
|
Timeout: 3 * time.Second,
|
||||||
|
}
|
||||||
|
_, err = httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// KeepTunnelAlive keeps the tunnel of the given environment for maxAlive duration, or until ctx is done
|
||||||
|
func (service *Service) KeepTunnelAlive(endpointID portainer.EndpointID, ctx context.Context, maxAlive time.Duration) {
|
||||||
|
go func() {
|
||||||
|
log.Printf("[DEBUG] [chisel,KeepTunnelAlive] [endpoint_id: %d] [message: start for %.0f minutes]\n", endpointID, maxAlive.Minutes())
|
||||||
|
maxAliveTicker := time.NewTicker(maxAlive)
|
||||||
|
defer maxAliveTicker.Stop()
|
||||||
|
pingTicker := time.NewTicker(tunnelCleanupInterval)
|
||||||
|
defer pingTicker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-pingTicker.C:
|
||||||
|
service.SetTunnelStatusToActive(endpointID)
|
||||||
|
err := service.pingAgent(endpointID)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[DEBUG] [chisel,KeepTunnelAlive] [endpoint_id: %d] [warning: ping agent err=%s]\n", endpointID, err)
|
||||||
|
}
|
||||||
|
case <-maxAliveTicker.C:
|
||||||
|
log.Printf("[DEBUG] [chisel,KeepTunnelAlive] [endpoint_id: %d] [message: stop as %.0f minutes timeout]\n", endpointID, maxAlive.Minutes())
|
||||||
|
return
|
||||||
|
case <-ctx.Done():
|
||||||
|
err := ctx.Err()
|
||||||
|
log.Printf("[DEBUG] [chisel,KeepTunnelAlive] [endpoint_id: %d] [message: stop as err=%s]\n", endpointID, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
// StartTunnelServer starts a tunnel server on the specified addr and port.
|
// StartTunnelServer starts a tunnel server on the specified addr and port.
|
||||||
// It uses a seed to generate a new private/public key pair. If the seed cannot
|
// It uses a seed to generate a new private/public key pair. If the seed cannot
|
||||||
// be found inside the database, it will generate a new one randomly and persist it.
|
// be found inside the database, it will generate a new one randomly and persist it.
|
||||||
|
|
|
@ -35,6 +35,9 @@ func (handler *Handler) proxyEdgeAgentWebsocketRequest(w http.ResponseWriter, r
|
||||||
}
|
}
|
||||||
|
|
||||||
handler.ReverseTunnelService.SetTunnelStatusToActive(params.endpoint.ID)
|
handler.ReverseTunnelService.SetTunnelStatusToActive(params.endpoint.ID)
|
||||||
|
|
||||||
|
handler.ReverseTunnelService.KeepTunnelAlive(params.endpoint.ID, r.Context(), portainer.WebSocketKeepAlive)
|
||||||
|
|
||||||
proxy.ServeHTTP(w, r)
|
proxy.ServeHTTP(w, r)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -21,9 +21,7 @@ const shellPodImage = "portainer/kubectl-shell"
|
||||||
// - The shell pod will be automatically removed after a specified max life (prevent zombie pods)
|
// - The shell pod will be automatically removed after a specified max life (prevent zombie pods)
|
||||||
// - The shell pod will be automatically removed if request is cancelled (or client closes websocket connection)
|
// - The shell pod will be automatically removed if request is cancelled (or client closes websocket connection)
|
||||||
func (kcl *KubeClient) CreateUserShellPod(ctx context.Context, serviceAccountName string) (*portainer.KubernetesShellPod, error) {
|
func (kcl *KubeClient) CreateUserShellPod(ctx context.Context, serviceAccountName string) (*portainer.KubernetesShellPod, error) {
|
||||||
// Schedule the pod for automatic removal
|
maxPodKeepAliveSecondsStr := fmt.Sprintf("%d", int(portainer.WebSocketKeepAlive.Seconds()))
|
||||||
maxPodKeepAlive := 1 * time.Hour
|
|
||||||
maxPodKeepAliveSecondsStr := fmt.Sprintf("%d", int(maxPodKeepAlive.Seconds()))
|
|
||||||
|
|
||||||
podPrefix := userShellPodPrefix(serviceAccountName)
|
podPrefix := userShellPodPrefix(serviceAccountName)
|
||||||
|
|
||||||
|
@ -81,7 +79,7 @@ func (kcl *KubeClient) CreateUserShellPod(ctx context.Context, serviceAccountNam
|
||||||
// Handle pod lifecycle/cleanup - terminate pod after maxPodKeepAlive or upon request (long-lived) cancellation
|
// Handle pod lifecycle/cleanup - terminate pod after maxPodKeepAlive or upon request (long-lived) cancellation
|
||||||
go func() {
|
go func() {
|
||||||
select {
|
select {
|
||||||
case <-time.After(maxPodKeepAlive):
|
case <-time.After(portainer.WebSocketKeepAlive):
|
||||||
log.Println("[DEBUG] [internal,kubernetes/pod] [message: pod removal schedule duration exceeded]")
|
log.Println("[DEBUG] [internal,kubernetes/pod] [message: pod removal schedule duration exceeded]")
|
||||||
kcl.cli.CoreV1().Pods(portainerNamespace).Delete(shellPod.Name, nil)
|
kcl.cli.CoreV1().Pods(portainerNamespace).Delete(shellPod.Name, nil)
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
|
|
@ -1324,6 +1324,7 @@ type (
|
||||||
SetTunnelStatusToActive(endpointID EndpointID)
|
SetTunnelStatusToActive(endpointID EndpointID)
|
||||||
SetTunnelStatusToRequired(endpointID EndpointID) error
|
SetTunnelStatusToRequired(endpointID EndpointID) error
|
||||||
SetTunnelStatusToIdle(endpointID EndpointID)
|
SetTunnelStatusToIdle(endpointID EndpointID)
|
||||||
|
KeepTunnelAlive(endpointID EndpointID, ctx context.Context, maxKeepAlive time.Duration)
|
||||||
GetTunnelDetails(endpointID EndpointID) *TunnelDetails
|
GetTunnelDetails(endpointID EndpointID) *TunnelDetails
|
||||||
AddEdgeJob(endpointID EndpointID, edgeJob *EdgeJob)
|
AddEdgeJob(endpointID EndpointID, edgeJob *EdgeJob)
|
||||||
RemoveEdgeJob(edgeJobID EdgeJobID)
|
RemoveEdgeJob(edgeJobID EdgeJobID)
|
||||||
|
@ -1493,6 +1494,8 @@ const (
|
||||||
DefaultUserSessionTimeout = "8h"
|
DefaultUserSessionTimeout = "8h"
|
||||||
// DefaultUserSessionTimeout represents the default timeout after which the user session is cleared
|
// DefaultUserSessionTimeout represents the default timeout after which the user session is cleared
|
||||||
DefaultKubeconfigExpiry = "0"
|
DefaultKubeconfigExpiry = "0"
|
||||||
|
// WebSocketKeepAlive web socket keep alive for edge environments
|
||||||
|
WebSocketKeepAlive = 1 * time.Hour
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|
Loading…
Reference in New Issue