package docker import ( "encoding/json" "fmt" "math/rand" "net" "net/http" "os" "os/exec" "path/filepath" "regexp" "strings" "time" "golang.org/x/mod/semver" "golang.org/x/sync/errgroup" ) type TestConfig struct { TestDir string KubeconfigFile string Token string K3sImage string DBType string SkipStart bool Servers []DockerNode Agents []DockerNode ServerYaml string AgentYaml string } type DockerNode struct { Name string IP string Port int // Not filled by agent nodes URL string // Not filled by agent nodes } // NewTestConfig initializes the test environment and returns the configuration // If k3sImage == "rancher/systemd-node", then the systemd-node container and the local k3s binary // will be used to start the server. This is useful for scenarios where the server needs to be restarted. // k3s version and tag information should be extracted from the version.sh script // and supplied as an argument to the function/test func NewTestConfig(k3sImage string) (*TestConfig, error) { config := &TestConfig{ K3sImage: k3sImage, } // Create temporary directory tempDir, err := os.MkdirTemp("", "k3s-test-") if err != nil { return nil, fmt.Errorf("failed to create temp directory: %v", err) } config.TestDir = tempDir // Create required directories if err := os.MkdirAll(filepath.Join(config.TestDir, "logs"), 0755); err != nil { return nil, fmt.Errorf("failed to create logs directory: %v", err) } // Generate random secret config.Token = fmt.Sprintf("%012d", rand.Int63n(1000000000000)) return config, nil } // portFree checks if a port is in use and returns true if it is free func portFree(port int) bool { listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) if err != nil { return false } listener.Close() return true } // getPort finds an available port func getPort() int { var port int for i := 0; i < 100; i++ { port = 10000 + rand.Intn(50000) if portFree(port) { return port } } return -1 } // ProvisionServers starts the required number of k3s servers // and updates the kubeconfig file with the first cp server details func (config *TestConfig) ProvisionServers(numOfServers int) error { for i := 0; i < numOfServers; i++ { // If a server i already exists, skip. This is useful for scenarios where // the first server is started seperate from the rest of the servers if config.Servers != nil && i < len(config.Servers) { continue } testID := filepath.Base(config.TestDir) name := fmt.Sprintf("server-%d-%s", i, strings.ToLower(testID)) port := getPort() if port == -1 { return fmt.Errorf("failed to find an available port") } var joinServer string var dbConnect string var err error if config.DBType == "" && numOfServers > 1 { config.DBType = "etcd" } else if config.DBType == "" { config.DBType = "sqlite" } if i == 0 { dbConnect, err = config.setupDatabase(true) if err != nil { return err } } else { dbConnect, err = config.setupDatabase(false) if err != nil { return err } if config.Servers[0].URL == "" { return fmt.Errorf("first server URL is empty") } joinServer = fmt.Sprintf("--server %s", config.Servers[0].URL) } newServer := DockerNode{ Name: name, Port: port, } var skipStart string if config.SkipStart { skipStart = "INSTALL_K3S_SKIP_START=true" } // If we need restarts, we use the systemd-node container, volume mount the k3s binary // and start the server using the install script if config.K3sImage == "rancher/systemd-node" { dRun := strings.Join([]string{"docker run -d", "--name", name, "--hostname", name, "--privileged", "-p", fmt.Sprintf("127.0.0.1:%d:6443", port), "--memory", "2048m", "-e", fmt.Sprintf("K3S_TOKEN=%s", config.Token), "-e", "K3S_DEBUG=true", "-e", "GOCOVERDIR=/tmp/k3s-cov", "-v", "/sys/fs/bpf:/sys/fs/bpf", "-v", "/lib/modules:/lib/modules", "-v", "/var/run/docker.sock:/var/run/docker.sock", "-v", "/var/lib/docker:/var/lib/docker", "--mount", "type=bind,source=$(pwd)/../../../dist/artifacts/k3s,target=/usr/local/bin/k3s", fmt.Sprintf("%s:v0.0.5", config.K3sImage), "/usr/lib/systemd/systemd --unit=noop.target --show-status=true"}, " ") if out, err := RunCommand(dRun); err != nil { return fmt.Errorf("failed to start systemd container: %s: %v", out, err) } time.Sleep(5 * time.Second) cmd := "mkdir -p /tmp/k3s-cov" if out, err := newServer.RunCmdOnNode(cmd); err != nil { return fmt.Errorf("failed to create coverage directory: %s: %v", out, err) } // Create empty config.yaml for later use cmd = "mkdir -p /etc/rancher/k3s; touch /etc/rancher/k3s/config.yaml" if out, err := newServer.RunCmdOnNode(cmd); err != nil { return fmt.Errorf("failed to create empty config.yaml: %s: %v", out, err) } // Write the raw YAML directly to the config.yaml on the systemd-node container if config.ServerYaml != "" { cmd = fmt.Sprintf("echo '%s' > /etc/rancher/k3s/config.yaml", config.ServerYaml) if out, err := newServer.RunCmdOnNode(cmd); err != nil { return fmt.Errorf("failed to write server yaml: %s: %v", out, err) } } cmd = fmt.Sprintf("curl -sfL https://get.k3s.io | INSTALL_K3S_EXEC='%s' %s INSTALL_K3S_SKIP_DOWNLOAD=true sh -", dbConnect+" "+joinServer+" "+os.Getenv(fmt.Sprintf("SERVER_%d_ARGS", i)), skipStart) if _, err := newServer.RunCmdOnNode(cmd); err != nil { // Attempt to dump the last few lines of the journalctl logs logs, _ := newServer.DumpServiceLogs(10) return fmt.Errorf("failed to start server: %s: %v", logs, err) } } else { // Write the server yaml to the testing directory and mount it into the container var yamlMount string if config.ServerYaml != "" { if err := os.WriteFile(filepath.Join(config.TestDir, fmt.Sprintf("server-%d.yaml", i)), []byte(config.ServerYaml), 0644); err != nil { return fmt.Errorf("failed to write server yaml: %v", err) } yamlMount = fmt.Sprintf("--mount type=bind,src=%s,dst=/etc/rancher/k3s/config.yaml", filepath.Join(config.TestDir, fmt.Sprintf("server-%d.yaml", i))) } // Assemble all the Docker args dRun := strings.Join([]string{"docker run -d", "--name", name, "--hostname", name, "--privileged", "-p", fmt.Sprintf("127.0.0.1:%d:6443", port), "-p", "6443", "-e", fmt.Sprintf("K3S_TOKEN=%s", config.Token), "-e", "K3S_DEBUG=true", "-e", "GOCOVERDIR=/tmp/", os.Getenv("SERVER_DOCKER_ARGS"), os.Getenv(fmt.Sprintf("SERVER_%d_DOCKER_ARGS", i)), os.Getenv("REGISTRY_CLUSTER_ARGS"), yamlMount, config.K3sImage, "server", dbConnect, joinServer, os.Getenv(fmt.Sprintf("SERVER_%d_ARGS", i))}, " ") if out, err := RunCommand(dRun); err != nil { return fmt.Errorf("failed to run server container: %s: %v", out, err) } } // Get the IP address of the container ipOutput, err := RunCommand("docker inspect --format \"{{ .NetworkSettings.IPAddress }}\" " + name) if err != nil { return err } ip := strings.TrimSpace(ipOutput) url := fmt.Sprintf("https://%s:6443", ip) newServer.URL = url newServer.IP = ip config.Servers = append(config.Servers, newServer) fmt.Printf("Started %s @ %s\n", name, url) // Sleep for a bit to allow the first server to start if i == 0 && numOfServers > 1 { time.Sleep(10 * time.Second) } } if config.SkipStart { return nil } // Wait for kubeconfig to be available time.Sleep(5 * time.Second) return config.CopyAndModifyKubeconfig() } // setupDatabase will start the configured database if startDB is true, // and return the correct flag to join the configured database func (config *TestConfig) setupDatabase(startDB bool) (string, error) { joinFlag := "" startCmd := "" switch config.DBType { case "mysql": startCmd = "docker run -d --name mysql -e MYSQL_ROOT_PASSWORD=docker -p 3306:3306 mysql:8.4" joinFlag = "--datastore-endpoint='mysql://root:docker@tcp(172.17.0.1:3306)/k3s'" case "postgres": startCmd = "docker run -d --name postgres -e POSTGRES_PASSWORD=docker -p 5432:5432 postgres:16-alpine" joinFlag = "--datastore-endpoint='postgres://postgres:docker@tcp(172.17.0.1:5432)/k3s'" case "etcd": if startDB { joinFlag = "--cluster-init" } case "sqlite": break default: return "", fmt.Errorf("unsupported database type: %s", config.DBType) } if startDB && startCmd != "" { if out, err := RunCommand(startCmd); err != nil { return "", fmt.Errorf("failed to start %s container: %s: %v", config.DBType, out, err) } // Wait for DB to start time.Sleep(10 * time.Second) } return joinFlag, nil } func (config *TestConfig) ProvisionAgents(numOfAgents int) error { if err := checkVersionSkew(config); err != nil { return err } testID := filepath.Base(config.TestDir) k3sURL := getEnvOrDefault("K3S_URL", config.Servers[0].URL) var g errgroup.Group for i := 0; i < numOfAgents; i++ { i := i // capture loop variable g.Go(func() error { name := fmt.Sprintf("agent-%d-%s", i, strings.ToLower(testID)) agentInstanceArgs := fmt.Sprintf("AGENT_%d_ARGS", i) newAgent := DockerNode{ Name: name, } var skipStart string if config.SkipStart { skipStart = "INSTALL_K3S_SKIP_START=true" } if config.K3sImage == "rancher/systemd-node" { dRun := strings.Join([]string{"docker run -d", "--name", name, "--hostname", name, "--privileged", "--memory", "2048m", "-e", fmt.Sprintf("K3S_TOKEN=%s", config.Token), "-e", fmt.Sprintf("K3S_URL=%s", k3sURL), "-v", "/sys/fs/bpf:/sys/fs/bpf", "-v", "/lib/modules:/lib/modules", "-v", "/var/run/docker.sock:/var/run/docker.sock", "-v", "/var/lib/docker:/var/lib/docker", "--mount", "type=bind,source=$(pwd)/../../../dist/artifacts/k3s,target=/usr/local/bin/k3s", fmt.Sprintf("%s:v0.0.5", config.K3sImage), "/usr/lib/systemd/systemd --unit=noop.target --show-status=true"}, " ") if out, err := RunCommand(dRun); err != nil { return fmt.Errorf("failed to start systemd container: %s: %v", out, err) } time.Sleep(5 * time.Second) // Create empty config.yaml for later use cmd := "mkdir -p /etc/rancher/k3s; touch /etc/rancher/k3s/config.yaml" if out, err := newAgent.RunCmdOnNode(cmd); err != nil { return fmt.Errorf("failed to create empty config.yaml: %s: %v", out, err) } // Write the raw YAML directly to the config.yaml on the systemd-node container if config.AgentYaml != "" { cmd = fmt.Sprintf("echo '%s' > /etc/rancher/k3s/config.yaml", config.AgentYaml) if out, err := newAgent.RunCmdOnNode(cmd); err != nil { return fmt.Errorf("failed to write server yaml: %s: %v", out, err) } } sCmd := fmt.Sprintf("curl -sfL https://get.k3s.io | INSTALL_K3S_EXEC='agent %s' %s INSTALL_K3S_SKIP_DOWNLOAD=true sh -", os.Getenv(agentInstanceArgs), skipStart) if _, err := newAgent.RunCmdOnNode(sCmd); err != nil { // Attempt to dump the last few lines of the journalctl logs logs, _ := newAgent.DumpServiceLogs(10) return fmt.Errorf("failed to start server: %s: %v", logs, err) } } else { // Assemble all the Docker args dRun := strings.Join([]string{"docker run -d", "--name", name, "--hostname", name, "--privileged", "-e", fmt.Sprintf("K3S_TOKEN=%s", config.Token), "-e", fmt.Sprintf("K3S_URL=%s", k3sURL), "-e", "GOCOVERDIR=/tmp/", os.Getenv("AGENT_DOCKER_ARGS"), os.Getenv(fmt.Sprintf("AGENT_%d_DOCKER_ARGS", i)), os.Getenv("REGISTRY_CLUSTER_ARGS"), config.K3sImage, "agent", os.Getenv("ARGS"), os.Getenv(agentInstanceArgs)}, " ") if out, err := RunCommand(dRun); err != nil { return fmt.Errorf("failed to run agent container: %s: %v", out, err) } } // Get the IP address of the container ipOutput, err := RunCommand("docker inspect --format \"{{ .NetworkSettings.IPAddress }}\" " + name) if err != nil { return err } ip := strings.TrimSpace(ipOutput) newAgent.IP = ip config.Agents = append(config.Agents, newAgent) fmt.Printf("Started %s\n", name) return nil }) } if err := g.Wait(); err != nil { return err } return nil } func (config *TestConfig) RemoveNode(nodeName string) error { cmd := fmt.Sprintf("docker stop %s", nodeName) if _, err := RunCommand(cmd); err != nil { return fmt.Errorf("failed to stop node %s: %v", nodeName, err) } cmd = fmt.Sprintf("docker rm -v %s", nodeName) if _, err := RunCommand(cmd); err != nil { return fmt.Errorf("failed to remove node %s: %v", nodeName, err) } fmt.Println("Stopped and removed", nodeName) return nil } // Returns a list of all server names func (config *TestConfig) GetServerNames() []string { var serverNames []string for _, server := range config.Servers { serverNames = append(serverNames, server.Name) } return serverNames } // Returns a list of all agent names func (config *TestConfig) GetAgentNames() []string { var agentNames []string for _, agent := range config.Agents { agentNames = append(agentNames, agent.Name) } return agentNames } // Returns a list of all node names func (config *TestConfig) GetNodeNames() []string { var nodeNames []string nodeNames = append(nodeNames, config.GetServerNames()...) nodeNames = append(nodeNames, config.GetAgentNames()...) return nodeNames } func (config *TestConfig) Cleanup() error { errs := make([]error, 0) // Stop and remove all servers for _, server := range config.Servers { if err := config.RemoveNode(server.Name); err != nil { errs = append(errs, err) } } config.Servers = nil // Stop and remove all agents for _, agent := range config.Agents { if err := config.RemoveNode(agent.Name); err != nil { errs = append(errs, err) } } config.Agents = nil // Remove volumes created by the agent/server containers cmd := fmt.Sprintf("docker volume ls -q | grep -F %s | xargs -r docker volume rm", strings.ToLower(filepath.Base(config.TestDir))) if _, err := RunCommand(cmd); err != nil { errs = append(errs, fmt.Errorf("failed to remove volumes: %v", err)) } // Stop DB if it was started if config.DBType == "mysql" || config.DBType == "postgres" { cmd := fmt.Sprintf("docker stop %s", config.DBType) if _, err := RunCommand(cmd); err != nil { errs = append(errs, fmt.Errorf("failed to stop %s: %v", config.DBType, err)) } cmd = fmt.Sprintf("docker rm -v %s", config.DBType) if _, err := RunCommand(cmd); err != nil { errs = append(errs, fmt.Errorf("failed to remove %s: %v", config.DBType, err)) } } // Error out if we hit any issues if len(errs) > 0 { return fmt.Errorf("cleanup failed: %v", errs) } if config.TestDir != "" { return os.RemoveAll(config.TestDir) } return nil } // CopyAndModifyKubeconfig copies out kubeconfig from first control-plane server // and updates the port to match the external port func (config *TestConfig) CopyAndModifyKubeconfig() error { if len(config.Servers) == 0 { return fmt.Errorf("no servers available to copy kubeconfig") } serverID := 0 for i := range config.Servers { server_args := os.Getenv(fmt.Sprintf("SERVER_%d_ARGS", i)) if !strings.Contains(server_args, "--disable-apiserver") { serverID = i break } } // Try twice with a 10s delay between attempts, this is flaky on the arm drone runner var err error var cmd string for i := 1; i <= 2; i++ { cmd = fmt.Sprintf("docker cp %s:/etc/rancher/k3s/k3s.yaml %s/kubeconfig.yaml", config.Servers[serverID].Name, config.TestDir) _, err = RunCommand(cmd) if err != nil { fmt.Printf("Failed to copy kubeconfig, attempt %d: %v\n", i, err) time.Sleep(10 * time.Second) } else { break } } if err != nil { return fmt.Errorf("failed to copy kubeconfig: %v", err) } cmd = fmt.Sprintf("sed -i -e \"s/:6443/:%d/g\" %s/kubeconfig.yaml", config.Servers[serverID].Port, config.TestDir) if _, err := RunCommand(cmd); err != nil { return fmt.Errorf("failed to update kubeconfig: %v", err) } config.KubeconfigFile = filepath.Join(config.TestDir, "kubeconfig.yaml") fmt.Println("Kubeconfig file: ", config.KubeconfigFile) return nil } // RunCmdOnNode runs a command on a docker container func (node DockerNode) RunCmdOnNode(cmd string) (string, error) { dCmd := fmt.Sprintf("docker exec %s /bin/sh -c \"%s\"", node.Name, cmd) out, err := RunCommand(dCmd) if err != nil { return out, fmt.Errorf("%v: on node %s: %s", err, node.Name, out) } return out, nil } // RunCommand Runs command on the host. func RunCommand(cmd string) (string, error) { c := exec.Command("bash", "-c", cmd) out, err := c.CombinedOutput() if err != nil { return string(out), fmt.Errorf("failed to run command: %s, %v", cmd, err) } return string(out), err } func checkVersionSkew(config *TestConfig) error { if len(config.Agents) > 0 { serverImage := getEnvOrDefault("K3S_IMAGE_SERVER", config.K3sImage) agentImage := getEnvOrDefault("K3S_IMAGE_AGENT", config.K3sImage) if semver.Compare(semver.MajorMinor(agentImage), semver.MajorMinor(serverImage)) > 0 { return fmt.Errorf("agent version cannot be higher than server - not supported by Kubernetes version skew policy") } } return nil } func getEnvOrDefault(key, defaultValue string) string { if value := os.Getenv(key); value != "" { return value } return defaultValue } // VerifyValidVersion checks for invalid version strings func VerifyValidVersion(node DockerNode, binary string) error { output, err := node.RunCmdOnNode(binary + " version") if err != nil { return err } lines := strings.Split(output, "\n") // Check for invalid version strings re := regexp.MustCompile(`(?i).*(dev|head|unknown|fail|refuse|\+[^"]*\.).*`) for _, line := range lines { if re.MatchString(line) { return fmt.Errorf("invalid version string found in %s: %s", binary, line) } } return nil } // Dump the journalctl logs for the k3s service func (node DockerNode) DumpServiceLogs(lines int) (string, error) { var cmd string if strings.Contains(node.Name, "agent") { cmd = fmt.Sprintf("journalctl -u k3s-agent -n %d", lines) } else { cmd = fmt.Sprintf("journalctl -u k3s -n %d", lines) } res, err := node.RunCmdOnNode(cmd) if strings.Contains(res, "No entries") { return "", fmt.Errorf("no logs found") } return res, err } // Returns the latest version from the update channel func GetVersionFromChannel(upgradeChannel string) (string, error) { url := fmt.Sprintf("https://update.k3s.io/v1-release/channels/%s", upgradeChannel) client := &http.Client{ CheckRedirect: func(req *http.Request, via []*http.Request) error { return http.ErrUseLastResponse }, } resp, err := client.Get(url) if err != nil { return "", fmt.Errorf("failed to get URL: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusFound { return "", fmt.Errorf("unexpected status code: %d", resp.StatusCode) } finalURL := resp.Header.Get("Location") if finalURL == "" { return "", fmt.Errorf("location header not set") } version := finalURL[strings.LastIndex(finalURL, "/")+1:] version = strings.Replace(version, "+", "-", 1) return version, nil } // TODO the below functions are replicated from e2e test utils. Consider combining into commmon package func (config TestConfig) DeployWorkload(workload string) (string, error) { resourceDir := "../resources" files, err := os.ReadDir(resourceDir) if err != nil { err = fmt.Errorf("%s : Unable to read resource manifest file for %s", err, workload) return "", err } fmt.Println("\nDeploying", workload) for _, f := range files { filename := filepath.Join(resourceDir, f.Name()) if strings.TrimSpace(f.Name()) == workload { cmd := "kubectl apply -f " + filename + " --kubeconfig=" + config.KubeconfigFile return RunCommand(cmd) } } return "", nil } type svcExternalIP struct { IP string `json:"ip"` IPMode string `json:"ipMode"` } // FetchExternalIPs fetches the external IPs of a service func FetchExternalIPs(kubeconfig string, servicename string) ([]string, error) { var externalIPs []string cmd := "kubectl get svc " + servicename + " -o jsonpath='{.status.loadBalancer.ingress}' --kubeconfig=" + kubeconfig output, err := RunCommand(cmd) if err != nil { return externalIPs, err } var svcExternalIPs []svcExternalIP err = json.Unmarshal([]byte(output), &svcExternalIPs) if err != nil { return externalIPs, fmt.Errorf("error unmarshalling JSON: %v", err) } // Iterate over externalIPs and append each IP to the ips slice for _, ipEntry := range svcExternalIPs { externalIPs = append(externalIPs, ipEntry.IP) } return externalIPs, nil } // RestartCluster restarts the k3s service on each node given func RestartCluster(nodes []DockerNode) error { for _, node := range nodes { cmd := "systemctl restart k3s* --all" if _, err := node.RunCmdOnNode(cmd); err != nil { return err } } return nil }