From 211ebb8a69621f9a7a2acf7ff51aa43dde5b23ef Mon Sep 17 00:00:00 2001 From: Matt Rickard Date: Wed, 23 Aug 2017 11:33:00 -0700 Subject: [PATCH] Use kubernetes client in integration tests --- test/integration/addons_test.go | 67 +----- test/integration/cluster_dns_test.go | 49 ++-- test/integration/mount_test.go | 21 +- test/integration/persistence_test.go | 54 +---- .../testdata/busybox-mount-test.yaml | 2 + test/integration/testdata/busybox.yaml | 2 + test/integration/util/kubernetes.go | 219 ++++++++++++++++++ test/integration/util/util.go | 67 ++++-- 8 files changed, 317 insertions(+), 164 deletions(-) create mode 100644 test/integration/util/kubernetes.go diff --git a/test/integration/addons_test.go b/test/integration/addons_test.go index 8405a0b506..218856dd33 100644 --- a/test/integration/addons_test.go +++ b/test/integration/addons_test.go @@ -26,79 +26,32 @@ import ( "testing" "time" - "github.com/pkg/errors" - - "k8s.io/client-go/pkg/api" - "k8s.io/client-go/pkg/api/v1" - + "k8s.io/apimachinery/pkg/labels" "k8s.io/minikube/test/integration/util" ) -var ( - addonManagerCmd = []string{"get", "pods", "--namespace=kube-system"} - dashboardRcCmd = []string{"get", "rc", "kubernetes-dashboard", "--namespace=kube-system"} - dashboardSvcCmd = []string{"get", "svc", "kubernetes-dashboard", "--namespace=kube-system"} -) - func testAddons(t *testing.T) { t.Parallel() - kubectlRunner := util.NewKubectlRunner(t) - - checkAddon := func() error { - pods := v1.PodList{} - if err := kubectlRunner.RunCommandParseOutput(addonManagerCmd, &pods); err != nil { - return errors.Wrap(err, "Error parsing kubectl output") - } - - for _, p := range pods.Items { - if strings.HasPrefix(p.ObjectMeta.Name, "kube-addon-manager-") { - if p.Status.Phase == "Running" { - return nil - } - return fmt.Errorf("Pod is not Running. Status: %s", p.Status.Phase) - } - } - - return fmt.Errorf("Addon manager not found. Found pods: %v", pods) + client, err := util.GetClient() + if err != nil { + t.Fatalf("Could not get kubernetes client: %s", err) } - - if err := util.Retry(t, checkAddon, 5*time.Second, 60); err != nil { - t.Fatalf("Addon Manager pod is unhealthy: %s", err) + selector := labels.SelectorFromSet(labels.Set(map[string]string{"component": "kube-addon-manager"})) + if err := util.WaitForPodsWithLabelRunning(client, "kube-system", selector); err != nil { + t.Errorf("Error waiting for addon manager to be up") } } func testDashboard(t *testing.T) { t.Parallel() - kubectlRunner := util.NewKubectlRunner(t) minikubeRunner := util.MinikubeRunner{ BinaryPath: *binaryPath, Args: *args, - T: t} - - checkDashboard := func() error { - rc := api.ReplicationController{} - svc := api.Service{} - if err := kubectlRunner.RunCommandParseOutput(dashboardRcCmd, &rc); err != nil { - return err - } - - if err := kubectlRunner.RunCommandParseOutput(dashboardSvcCmd, &svc); err != nil { - return err - } - - if rc.Status.Replicas != rc.Status.FullyLabeledReplicas { - return fmt.Errorf("Not enough pods running. Expected %d, got %d.", rc.Status.Replicas, rc.Status.FullyLabeledReplicas) - } - - if svc.Spec.Ports[0].NodePort != 30000 { - return fmt.Errorf("Dashboard is exposed on wrong port, expected 30000, actual %d", svc.Spec.Ports[0].NodePort) - } - - return nil + T: t, } - if err := util.Retry(t, checkDashboard, 5*time.Second, 60); err != nil { - t.Fatalf("Dashboard is unhealthy: %s", err) + if err := util.WaitForDashboardRunning(t); err != nil { + t.Fatalf("waiting for dashboard to be up: %s", err) } dashboardURL := minikubeRunner.RunCommand("dashboard --url", true) diff --git a/test/integration/cluster_dns_test.go b/test/integration/cluster_dns_test.go index 7f3aa6dc5e..87f9cc73cd 100644 --- a/test/integration/cluster_dns_test.go +++ b/test/integration/cluster_dns_test.go @@ -19,58 +19,39 @@ limitations under the License. package integration import ( - "fmt" "path/filepath" "strings" "testing" - "time" - "k8s.io/kubernetes/pkg/api" "k8s.io/minikube/test/integration/util" ) func testClusterDNS(t *testing.T) { t.Parallel() + if err := util.WaitForDNSRunning(t); err != nil { + t.Fatalf("Waiting for DNS to be running: %s", err) + } + kubectlRunner := util.NewKubectlRunner(t) podName := "busybox" podPath, _ := filepath.Abs("testdata/busybox.yaml") defer kubectlRunner.RunCommand([]string{"delete", "-f", podPath}) - setupTest := func() error { - if _, err := kubectlRunner.RunCommand([]string{"create", "-f", podPath}); err != nil { - return err - } - return nil + if _, err := kubectlRunner.RunCommand([]string{"create", "-f", podPath}); err != nil { + t.Fatalf("creating busybox pod: %s", err) } - if err := util.Retry(t, setupTest, 2*time.Second, 20); err != nil { - t.Fatal("Error setting up DNS test.") + if err := util.WaitForBusyboxRunning(t, "default"); err != nil { + t.Fatalf("Waiting for busybox pod to be up: %s", err) } - dnsTest := func() error { - p := &api.Pod{} - for p.Status.Phase != "Running" { - var err error - p, err = kubectlRunner.GetPod(podName, "default") - if err != nil { - return err - } - } - - dnsByteArr, err := kubectlRunner.RunCommand([]string{"exec", podName, - "nslookup", "kubernetes"}) - dnsOutput := string(dnsByteArr) - if err != nil { - return err - } - - if !strings.Contains(dnsOutput, "10.0.0.1") || !strings.Contains(dnsOutput, "10.0.0.10") { - return fmt.Errorf("DNS lookup failed, could not find both 10.0.0.1 and 10.0.0.10. Output: %s", dnsOutput) - } - return nil + dnsByteArr, err := kubectlRunner.RunCommand([]string{"exec", podName, + "nslookup", "kubernetes"}) + if err != nil { + t.Fatalf("running nslookup in pod:%s", err) } - - if err := util.Retry(t, dnsTest, 5*time.Second, 20); err != nil { - t.Fatal("DNS lookup failed with error:", err) + dnsOutput := string(dnsByteArr) + if !strings.Contains(dnsOutput, "10.0.0.1") || !strings.Contains(dnsOutput, "10.0.0.10") { + t.Errorf("DNS lookup failed, could not find both 10.0.0.1 and 10.0.0.10. Output: %s", dnsOutput) } } diff --git a/test/integration/mount_test.go b/test/integration/mount_test.go index e28e22bc80..8394e593f2 100644 --- a/test/integration/mount_test.go +++ b/test/integration/mount_test.go @@ -27,7 +27,7 @@ import ( "testing" "time" - "k8s.io/kubernetes/pkg/api" + "k8s.io/apimachinery/pkg/labels" "k8s.io/minikube/test/integration/util" ) @@ -79,17 +79,16 @@ func testMounting(t *testing.T) { t.Fatal("mountTest failed with error:", err) } + client, err := util.GetClient() + if err != nil { + t.Fatalf("getting kubernetes client: %s", err) + } + selector := labels.SelectorFromSet(labels.Set(map[string]string{"integration-test": "busybox-mount"})) + if err := util.WaitForPodsWithLabelRunning(client, "default", selector); err != nil { + t.Fatalf("Error waiting for busybox mount pod to be up: %s", err) + } + mountTest := func() error { - - // Wait for the pod to actually startup. - p := &api.Pod{} - for p.Status.Phase != "Running" { - p, err = kubectlRunner.GetPod(podName, "default") - if err != nil { - return err - } - } - path := filepath.Join(tempDir, "frompod") out, err := ioutil.ReadFile(path) if err != nil { diff --git a/test/integration/persistence_test.go b/test/integration/persistence_test.go index 2206716031..8d9246f820 100644 --- a/test/integration/persistence_test.go +++ b/test/integration/persistence_test.go @@ -19,13 +19,11 @@ limitations under the License. package integration import ( - "fmt" "path/filepath" "testing" "time" "github.com/docker/machine/libmachine/state" - "k8s.io/kubernetes/pkg/api" "k8s.io/minikube/test/integration/util" ) @@ -34,7 +32,6 @@ func TestPersistence(t *testing.T) { minikubeRunner.EnsureRunning() kubectlRunner := util.NewKubectlRunner(t) - podName := "busybox" podPath, _ := filepath.Abs("testdata/busybox.yaml") podNamespace := kubectlRunner.CreateRandomNamespace() @@ -45,42 +42,19 @@ func TestPersistence(t *testing.T) { t.Fatalf("Error creating test pod: %s", err) } - checkPod := func() error { - p, err := kubectlRunner.GetPod(podName, podNamespace) - if err != nil { - return err + verify := func(t *testing.T) { + if err := util.WaitForDashboardRunning(t); err != nil { + t.Fatalf("waiting for dashboard to be up: %s", err) } - if kubectlRunner.IsPodReady(p) { - return nil + + if err := util.WaitForBusyboxRunning(t, podNamespace); err != nil { + t.Fatalf("waiting for busybox to be up: %s", err) } - return fmt.Errorf("Pod %s is not ready yet.", podName) + } - if err := util.Retry(t, checkPod, 6*time.Second, 20); err != nil { - t.Fatalf("Error checking the status of pod %s. Err: %s", podName, err) - } - - checkDashboard := func() error { - pods := api.PodList{} - cmd := []string{"get", "pods", "--namespace=kube-system", "--selector=app=kubernetes-dashboard"} - if err := kubectlRunner.RunCommandParseOutput(cmd, &pods); err != nil { - return err - } - if len(pods.Items) < 1 { - return fmt.Errorf("No pods found matching query: %v", cmd) - } - db := pods.Items[0] - if kubectlRunner.IsPodReady(&db) { - return nil - } - return fmt.Errorf("Dashboard pod is not ready yet.") - } - - // Make sure the dashboard is running before we stop the VM. - // On slow networks it can take several minutes to pull the addon-manager then the dashboard image. - if err := util.Retry(t, checkDashboard, 20*time.Second, 25); err != nil { - t.Fatalf("Dashboard pod is not healthy: %s", err) - } + // Make sure everything is up before we stop. + verify(t) // Now restart minikube and make sure the pod is still there. // minikubeRunner.RunCommand("stop", true) @@ -97,12 +71,6 @@ func TestPersistence(t *testing.T) { minikubeRunner.Start() minikubeRunner.CheckStatus(state.Running.String()) - if err := util.Retry(t, checkPod, 3*time.Second, 5); err != nil { - t.Fatalf("Error checking the status of pod %s. Err: %s", podName, err) - } - - // Now make sure it's still running after. - if err := util.Retry(t, checkDashboard, 3*time.Second, 5); err != nil { - t.Fatalf("Dashboard pod is not healthy: %s", err) - } + // Make sure the same things come up after we've restarted. + verify(t) } diff --git a/test/integration/testdata/busybox-mount-test.yaml b/test/integration/testdata/busybox-mount-test.yaml index afee16cd81..06ec5b5325 100644 --- a/test/integration/testdata/busybox-mount-test.yaml +++ b/test/integration/testdata/busybox-mount-test.yaml @@ -2,6 +2,8 @@ apiVersion: v1 kind: Pod metadata: name: busybox-mount + labels: + integration-test: busybox-mount spec: containers: - image: busybox:glibc diff --git a/test/integration/testdata/busybox.yaml b/test/integration/testdata/busybox.yaml index b4e0d6fadd..a200d158dc 100644 --- a/test/integration/testdata/busybox.yaml +++ b/test/integration/testdata/busybox.yaml @@ -2,6 +2,8 @@ apiVersion: v1 kind: Pod metadata: name: busybox + labels: + integration-test: busybox spec: containers: - image: busybox:glibc diff --git a/test/integration/util/kubernetes.go b/test/integration/util/kubernetes.go new file mode 100644 index 0000000000..ee6ab583a4 --- /dev/null +++ b/test/integration/util/kubernetes.go @@ -0,0 +1,219 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "fmt" + "testing" + "time" + + "github.com/pkg/errors" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + + apierrs "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/pkg/api/v1" + + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/clientcmd" +) + +type PodStore struct { + cache.Store + stopCh chan struct{} + Reflector *cache.Reflector +} + +func (s *PodStore) List() []*v1.Pod { + objects := s.Store.List() + pods := make([]*v1.Pod, 0) + for _, o := range objects { + pods = append(pods, o.(*v1.Pod)) + } + return pods +} + +func (s *PodStore) Stop() { + close(s.stopCh) +} + +func GetClient() (kubernetes.Interface, error) { + loadingRules := clientcmd.NewDefaultClientConfigLoadingRules() + configOverrides := &clientcmd.ConfigOverrides{} + kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, configOverrides) + config, err := kubeConfig.ClientConfig() + if err != nil { + return nil, fmt.Errorf("Error creating kubeConfig: %s", err) + } + client, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, errors.Wrap(err, "Error creating new client from kubeConfig.ClientConfig()") + } + return client, nil +} + +func NewPodStore(c kubernetes.Interface, namespace string, label labels.Selector, field fields.Selector) *PodStore { + lw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + options.LabelSelector = label.String() + options.FieldSelector = field.String() + obj, err := c.Core().Pods(namespace).List(options) + return runtime.Object(obj), err + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + options.LabelSelector = label.String() + options.FieldSelector = field.String() + return c.Core().Pods(namespace).Watch(options) + }, + } + store := cache.NewStore(cache.MetaNamespaceKeyFunc) + stopCh := make(chan struct{}) + reflector := cache.NewReflector(lw, &v1.Pod{}, store, 0) + reflector.RunUntil(stopCh) + return &PodStore{Store: store, stopCh: stopCh, Reflector: reflector} +} + +func StartPods(c kubernetes.Interface, namespace string, pod v1.Pod, waitForRunning bool) error { + pod.ObjectMeta.Labels["name"] = pod.Name + if waitForRunning { + label := labels.SelectorFromSet(labels.Set(map[string]string{"name": pod.Name})) + err := WaitForPodsWithLabelRunning(c, namespace, label) + if err != nil { + return fmt.Errorf("Error waiting for pod %s to be running: %v", pod.Name, err) + } + } + return nil +} + +// Wait up to 10 minutes for all matching pods to become Running and at least one +// matching pod exists. +func WaitForPodsWithLabelRunning(c kubernetes.Interface, ns string, label labels.Selector) error { + running := false + PodStore := NewPodStore(c, ns, label, fields.Everything()) + defer PodStore.Stop() +waitLoop: + for start := time.Now(); time.Since(start) < 10*time.Minute; time.Sleep(250 * time.Millisecond) { + pods := PodStore.List() + if len(pods) == 0 { + continue waitLoop + } + for _, p := range pods { + if p.Status.Phase != v1.PodRunning { + continue waitLoop + } + } + running = true + break + } + if !running { + return fmt.Errorf("Timeout while waiting for pods with labels %q to be running", label.String()) + } + return nil +} + +// WaitForRCToStabilize waits till the RC has a matching generation/replica count between spec and status. +func WaitForRCToStabilize(t *testing.T, c kubernetes.Interface, ns, name string, timeout time.Duration) error { + options := metav1.ListOptions{FieldSelector: fields.Set{ + "metadata.name": name, + "metadata.namespace": ns, + }.AsSelector().String()} + w, err := c.Core().ReplicationControllers(ns).Watch(options) + if err != nil { + return err + } + _, err = watch.Until(timeout, w, func(event watch.Event) (bool, error) { + switch event.Type { + case watch.Deleted: + return false, apierrs.NewNotFound(schema.GroupResource{Resource: "replicationcontrollers"}, "") + } + switch rc := event.Object.(type) { + case *v1.ReplicationController: + if rc.Name == name && rc.Namespace == ns && + rc.Generation <= rc.Status.ObservedGeneration && + *(rc.Spec.Replicas) == rc.Status.Replicas { + return true, nil + } + t.Logf("Waiting for rc %s to stabilize, generation %v observed generation %v spec.replicas %d status.replicas %d", + name, rc.Generation, rc.Status.ObservedGeneration, *(rc.Spec.Replicas), rc.Status.Replicas) + } + return false, nil + }) + return err +} + +// WaitForService waits until the service appears (exist == true), or disappears (exist == false) +func WaitForService(t *testing.T, c kubernetes.Interface, namespace, name string, exist bool, interval, timeout time.Duration) error { + err := wait.PollImmediate(interval, timeout, func() (bool, error) { + _, err := c.Core().Services(namespace).Get(name, metav1.GetOptions{}) + switch { + case err == nil: + t.Logf("Service %s in namespace %s found.", name, namespace) + return exist, nil + case apierrs.IsNotFound(err): + t.Logf("Service %s in namespace %s disappeared.", name, namespace) + return !exist, nil + case !IsRetryableAPIError(err): + t.Logf("Non-retryable failure while getting service.") + return false, err + default: + t.Logf("Get service %s in namespace %s failed: %v", name, namespace, err) + return false, nil + } + }) + if err != nil { + stateMsg := map[bool]string{true: "to appear", false: "to disappear"} + return fmt.Errorf("error waiting for service %s/%s %s: %v", namespace, name, stateMsg[exist], err) + } + return nil +} + +//WaitForServiceEndpointsNum waits until the amount of endpoints that implement service to expectNum. +func WaitForServiceEndpointsNum(t *testing.T, c kubernetes.Interface, namespace, serviceName string, expectNum int, interval, timeout time.Duration) error { + return wait.Poll(interval, timeout, func() (bool, error) { + t.Logf("Waiting for amount of service:%s endpoints to be %d", serviceName, expectNum) + list, err := c.Core().Endpoints(namespace).List(metav1.ListOptions{}) + if err != nil { + return false, err + } + + for _, e := range list.Items { + if e.Name == serviceName && countEndpointsNum(&e) == expectNum { + return true, nil + } + } + return false, nil + }) +} + +func countEndpointsNum(e *v1.Endpoints) int { + num := 0 + for _, sub := range e.Subsets { + num += len(sub.Addresses) + } + return num +} + +func IsRetryableAPIError(err error) bool { + return apierrs.IsTimeout(err) || apierrs.IsServerTimeout(err) || apierrs.IsTooManyRequests(err) || apierrs.IsInternalError(err) +} diff --git a/test/integration/util/util.go b/test/integration/util/util.go index 4da4dac6cd..c49ce5d4fe 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -29,7 +29,9 @@ import ( "testing" "time" - "k8s.io/kubernetes/pkg/api" + "github.com/pkg/errors" + + "k8s.io/apimachinery/pkg/labels" commonutil "k8s.io/minikube/pkg/util" ) @@ -41,20 +43,6 @@ type MinikubeRunner struct { Args string } -func (k *KubectlRunner) IsPodReady(p *api.Pod) bool { - for _, cond := range p.Status.Conditions { - if cond.Type == "Ready" { - if cond.Status == "True" { - return true - } - k.T.Logf("Pod %s not ready. Ready: %s. Reason: %s", p.Name, cond.Status, cond.Reason) - return false /**/ - } - } - k.T.Logf("Unable to find ready pod condition: %v", p.Status.Conditions) - return false -} - func (m *MinikubeRunner) RunCommand(command string, checkError bool) string { commandArr := strings.Split(command, " ") path, _ := filepath.Abs(m.BinaryPath) @@ -202,10 +190,51 @@ func (k *KubectlRunner) DeleteNamespace(namespace string) error { return err } -func (k *KubectlRunner) GetPod(name, namespace string) (*api.Pod, error) { - p := &api.Pod{} - err := k.RunCommandParseOutput([]string{"get", "pod", name, "--namespace=" + namespace}, p) - return p, err +func WaitForBusyboxRunning(t *testing.T, namespace string) error { + client, err := GetClient() + if err != nil { + return errors.Wrap(err, "getting kubernetes client") + } + selector := labels.SelectorFromSet(labels.Set(map[string]string{"integration-test": "busybox"})) + return WaitForPodsWithLabelRunning(client, namespace, selector) +} + +func WaitForDNSRunning(t *testing.T) error { + client, err := GetClient() + if err != nil { + return errors.Wrap(err, "getting kubernetes client") + } + + selector := labels.SelectorFromSet(labels.Set(map[string]string{"k8s-app": "kube-dns"})) + if err := WaitForPodsWithLabelRunning(client, "kube-system", selector); err != nil { + return errors.Wrap(err, "waiting for kube-dns pods") + } + + if err := WaitForService(t, client, "kube-system", "kube-dns", true, time.Millisecond*500, time.Minute*10); err != nil { + t.Errorf("Error waiting for kube-dns service to be up") + } + + return nil +} + +func WaitForDashboardRunning(t *testing.T) error { + client, err := GetClient() + if err != nil { + return errors.Wrap(err, "getting kubernetes client") + } + if err := WaitForRCToStabilize(t, client, "kube-system", "kubernetes-dashboard", time.Minute*10); err != nil { + return errors.Wrap(err, "waiting for dashboard RC to stabilize") + } + + if err := WaitForService(t, client, "kube-system", "kubernetes-dashboard", true, time.Millisecond*500, time.Minute*10); err != nil { + return errors.Wrap(err, "waiting for dashboard service to be up") + } + + if err := WaitForServiceEndpointsNum(t, client, "kube-system", "kubernetes-dashboard", 1, time.Second*3, time.Minute*10); err != nil { + return errors.Wrap(err, "waiting for one dashboard endpoint to be up") + } + + return nil } func Retry(t *testing.T, callback func() error, d time.Duration, attempts int) (err error) {