Use kubernetes client in integration tests

pull/1874/head
Matt Rickard 2017-08-23 11:33:00 -07:00
parent 0b5bd79d50
commit 211ebb8a69
8 changed files with 317 additions and 164 deletions

View File

@ -26,79 +26,32 @@ import (
"testing"
"time"
"github.com/pkg/errors"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/minikube/test/integration/util"
)
var (
addonManagerCmd = []string{"get", "pods", "--namespace=kube-system"}
dashboardRcCmd = []string{"get", "rc", "kubernetes-dashboard", "--namespace=kube-system"}
dashboardSvcCmd = []string{"get", "svc", "kubernetes-dashboard", "--namespace=kube-system"}
)
func testAddons(t *testing.T) {
t.Parallel()
kubectlRunner := util.NewKubectlRunner(t)
checkAddon := func() error {
pods := v1.PodList{}
if err := kubectlRunner.RunCommandParseOutput(addonManagerCmd, &pods); err != nil {
return errors.Wrap(err, "Error parsing kubectl output")
}
for _, p := range pods.Items {
if strings.HasPrefix(p.ObjectMeta.Name, "kube-addon-manager-") {
if p.Status.Phase == "Running" {
return nil
}
return fmt.Errorf("Pod is not Running. Status: %s", p.Status.Phase)
}
}
return fmt.Errorf("Addon manager not found. Found pods: %v", pods)
client, err := util.GetClient()
if err != nil {
t.Fatalf("Could not get kubernetes client: %s", err)
}
if err := util.Retry(t, checkAddon, 5*time.Second, 60); err != nil {
t.Fatalf("Addon Manager pod is unhealthy: %s", err)
selector := labels.SelectorFromSet(labels.Set(map[string]string{"component": "kube-addon-manager"}))
if err := util.WaitForPodsWithLabelRunning(client, "kube-system", selector); err != nil {
t.Errorf("Error waiting for addon manager to be up")
}
}
func testDashboard(t *testing.T) {
t.Parallel()
kubectlRunner := util.NewKubectlRunner(t)
minikubeRunner := util.MinikubeRunner{
BinaryPath: *binaryPath,
Args: *args,
T: t}
checkDashboard := func() error {
rc := api.ReplicationController{}
svc := api.Service{}
if err := kubectlRunner.RunCommandParseOutput(dashboardRcCmd, &rc); err != nil {
return err
}
if err := kubectlRunner.RunCommandParseOutput(dashboardSvcCmd, &svc); err != nil {
return err
}
if rc.Status.Replicas != rc.Status.FullyLabeledReplicas {
return fmt.Errorf("Not enough pods running. Expected %d, got %d.", rc.Status.Replicas, rc.Status.FullyLabeledReplicas)
}
if svc.Spec.Ports[0].NodePort != 30000 {
return fmt.Errorf("Dashboard is exposed on wrong port, expected 30000, actual %d", svc.Spec.Ports[0].NodePort)
}
return nil
T: t,
}
if err := util.Retry(t, checkDashboard, 5*time.Second, 60); err != nil {
t.Fatalf("Dashboard is unhealthy: %s", err)
if err := util.WaitForDashboardRunning(t); err != nil {
t.Fatalf("waiting for dashboard to be up: %s", err)
}
dashboardURL := minikubeRunner.RunCommand("dashboard --url", true)

View File

@ -19,58 +19,39 @@ limitations under the License.
package integration
import (
"fmt"
"path/filepath"
"strings"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/minikube/test/integration/util"
)
func testClusterDNS(t *testing.T) {
t.Parallel()
if err := util.WaitForDNSRunning(t); err != nil {
t.Fatalf("Waiting for DNS to be running: %s", err)
}
kubectlRunner := util.NewKubectlRunner(t)
podName := "busybox"
podPath, _ := filepath.Abs("testdata/busybox.yaml")
defer kubectlRunner.RunCommand([]string{"delete", "-f", podPath})
setupTest := func() error {
if _, err := kubectlRunner.RunCommand([]string{"create", "-f", podPath}); err != nil {
return err
}
return nil
if _, err := kubectlRunner.RunCommand([]string{"create", "-f", podPath}); err != nil {
t.Fatalf("creating busybox pod: %s", err)
}
if err := util.Retry(t, setupTest, 2*time.Second, 20); err != nil {
t.Fatal("Error setting up DNS test.")
if err := util.WaitForBusyboxRunning(t, "default"); err != nil {
t.Fatalf("Waiting for busybox pod to be up: %s", err)
}
dnsTest := func() error {
p := &api.Pod{}
for p.Status.Phase != "Running" {
var err error
p, err = kubectlRunner.GetPod(podName, "default")
if err != nil {
return err
}
}
dnsByteArr, err := kubectlRunner.RunCommand([]string{"exec", podName,
"nslookup", "kubernetes"})
dnsOutput := string(dnsByteArr)
if err != nil {
return err
}
if !strings.Contains(dnsOutput, "10.0.0.1") || !strings.Contains(dnsOutput, "10.0.0.10") {
return fmt.Errorf("DNS lookup failed, could not find both 10.0.0.1 and 10.0.0.10. Output: %s", dnsOutput)
}
return nil
dnsByteArr, err := kubectlRunner.RunCommand([]string{"exec", podName,
"nslookup", "kubernetes"})
if err != nil {
t.Fatalf("running nslookup in pod:%s", err)
}
if err := util.Retry(t, dnsTest, 5*time.Second, 20); err != nil {
t.Fatal("DNS lookup failed with error:", err)
dnsOutput := string(dnsByteArr)
if !strings.Contains(dnsOutput, "10.0.0.1") || !strings.Contains(dnsOutput, "10.0.0.10") {
t.Errorf("DNS lookup failed, could not find both 10.0.0.1 and 10.0.0.10. Output: %s", dnsOutput)
}
}

View File

@ -27,7 +27,7 @@ import (
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/minikube/test/integration/util"
)
@ -79,17 +79,16 @@ func testMounting(t *testing.T) {
t.Fatal("mountTest failed with error:", err)
}
client, err := util.GetClient()
if err != nil {
t.Fatalf("getting kubernetes client: %s", err)
}
selector := labels.SelectorFromSet(labels.Set(map[string]string{"integration-test": "busybox-mount"}))
if err := util.WaitForPodsWithLabelRunning(client, "default", selector); err != nil {
t.Fatalf("Error waiting for busybox mount pod to be up: %s", err)
}
mountTest := func() error {
// Wait for the pod to actually startup.
p := &api.Pod{}
for p.Status.Phase != "Running" {
p, err = kubectlRunner.GetPod(podName, "default")
if err != nil {
return err
}
}
path := filepath.Join(tempDir, "frompod")
out, err := ioutil.ReadFile(path)
if err != nil {

View File

@ -19,13 +19,11 @@ limitations under the License.
package integration
import (
"fmt"
"path/filepath"
"testing"
"time"
"github.com/docker/machine/libmachine/state"
"k8s.io/kubernetes/pkg/api"
"k8s.io/minikube/test/integration/util"
)
@ -34,7 +32,6 @@ func TestPersistence(t *testing.T) {
minikubeRunner.EnsureRunning()
kubectlRunner := util.NewKubectlRunner(t)
podName := "busybox"
podPath, _ := filepath.Abs("testdata/busybox.yaml")
podNamespace := kubectlRunner.CreateRandomNamespace()
@ -45,42 +42,19 @@ func TestPersistence(t *testing.T) {
t.Fatalf("Error creating test pod: %s", err)
}
checkPod := func() error {
p, err := kubectlRunner.GetPod(podName, podNamespace)
if err != nil {
return err
verify := func(t *testing.T) {
if err := util.WaitForDashboardRunning(t); err != nil {
t.Fatalf("waiting for dashboard to be up: %s", err)
}
if kubectlRunner.IsPodReady(p) {
return nil
if err := util.WaitForBusyboxRunning(t, podNamespace); err != nil {
t.Fatalf("waiting for busybox to be up: %s", err)
}
return fmt.Errorf("Pod %s is not ready yet.", podName)
}
if err := util.Retry(t, checkPod, 6*time.Second, 20); err != nil {
t.Fatalf("Error checking the status of pod %s. Err: %s", podName, err)
}
checkDashboard := func() error {
pods := api.PodList{}
cmd := []string{"get", "pods", "--namespace=kube-system", "--selector=app=kubernetes-dashboard"}
if err := kubectlRunner.RunCommandParseOutput(cmd, &pods); err != nil {
return err
}
if len(pods.Items) < 1 {
return fmt.Errorf("No pods found matching query: %v", cmd)
}
db := pods.Items[0]
if kubectlRunner.IsPodReady(&db) {
return nil
}
return fmt.Errorf("Dashboard pod is not ready yet.")
}
// Make sure the dashboard is running before we stop the VM.
// On slow networks it can take several minutes to pull the addon-manager then the dashboard image.
if err := util.Retry(t, checkDashboard, 20*time.Second, 25); err != nil {
t.Fatalf("Dashboard pod is not healthy: %s", err)
}
// Make sure everything is up before we stop.
verify(t)
// Now restart minikube and make sure the pod is still there.
// minikubeRunner.RunCommand("stop", true)
@ -97,12 +71,6 @@ func TestPersistence(t *testing.T) {
minikubeRunner.Start()
minikubeRunner.CheckStatus(state.Running.String())
if err := util.Retry(t, checkPod, 3*time.Second, 5); err != nil {
t.Fatalf("Error checking the status of pod %s. Err: %s", podName, err)
}
// Now make sure it's still running after.
if err := util.Retry(t, checkDashboard, 3*time.Second, 5); err != nil {
t.Fatalf("Dashboard pod is not healthy: %s", err)
}
// Make sure the same things come up after we've restarted.
verify(t)
}

View File

@ -2,6 +2,8 @@ apiVersion: v1
kind: Pod
metadata:
name: busybox-mount
labels:
integration-test: busybox-mount
spec:
containers:
- image: busybox:glibc

View File

@ -2,6 +2,8 @@ apiVersion: v1
kind: Pod
metadata:
name: busybox
labels:
integration-test: busybox
spec:
containers:
- image: busybox:glibc

View File

@ -0,0 +1,219 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"testing"
"time"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
)
// PodStore is a cached, continuously-updated view of a set of pods, backed
// by a reflector that list/watches the API server. Use NewPodStore to build
// one and Stop to tear down the watch.
type PodStore struct {
	cache.Store
	// stopCh terminates the reflector's watch loop when closed (see Stop).
	stopCh chan struct{}
	Reflector *cache.Reflector
}
// List returns a snapshot of every pod currently held in the store,
// converted from the store's untyped objects to *v1.Pod.
func (s *PodStore) List() []*v1.Pod {
	items := s.Store.List()
	result := make([]*v1.Pod, 0, len(items))
	for _, item := range items {
		result = append(result, item.(*v1.Pod))
	}
	return result
}
// Stop shuts down the store's reflector by closing its stop channel.
// NOTE(review): calling Stop twice would panic on the double close — confirm
// callers invoke it at most once (typically via defer).
func (s *PodStore) Stop() {
	close(s.stopCh)
}
// GetClient builds a Kubernetes clientset from the default kubeconfig
// loading rules (KUBECONFIG env var / ~/.kube/config), with no overrides.
// It returns an error if the kubeconfig cannot be resolved or the client
// cannot be constructed from it.
func GetClient() (kubernetes.Interface, error) {
	loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
	configOverrides := &clientcmd.ConfigOverrides{}
	kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, configOverrides)
	config, err := kubeConfig.ClientConfig()
	if err != nil {
		// Use errors.Wrap here for consistency with the rest of this file
		// (the original mixed fmt.Errorf and errors.Wrap).
		return nil, errors.Wrap(err, "Error creating kubeConfig")
	}
	client, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, errors.Wrap(err, "Error creating new client from kubeConfig.ClientConfig()")
	}
	return client, nil
}
// NewPodStore creates a PodStore that continuously mirrors the pods in
// namespace matching both the label and field selectors. It starts a
// reflector immediately; the caller must call Stop on the returned store
// to terminate the underlying list/watch.
func NewPodStore(c kubernetes.Interface, namespace string, label labels.Selector, field fields.Selector) *PodStore {
	// ListWatch applies the same selectors to both the initial list and the
	// subsequent watch so the cache only ever holds matching pods.
	lw := &cache.ListWatch{
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			options.LabelSelector = label.String()
			options.FieldSelector = field.String()
			obj, err := c.Core().Pods(namespace).List(options)
			return runtime.Object(obj), err
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			options.LabelSelector = label.String()
			options.FieldSelector = field.String()
			return c.Core().Pods(namespace).Watch(options)
		},
	}
	store := cache.NewStore(cache.MetaNamespaceKeyFunc)
	stopCh := make(chan struct{})
	// Resync period 0: rely purely on the watch stream, no periodic relist.
	reflector := cache.NewReflector(lw, &v1.Pod{}, store, 0)
	reflector.RunUntil(stopCh)
	return &PodStore{Store: store, stopCh: stopCh, Reflector: reflector}
}
// StartPods labels the given pod with its own name and, when waitForRunning
// is true, blocks until all pods in namespace carrying that name label are
// Running.
//
// NOTE(review): as visible here this function never creates the pod through
// the client; it only labels the local copy and waits — confirm callers
// create the pod separately.
func StartPods(c kubernetes.Interface, namespace string, pod v1.Pod, waitForRunning bool) error {
	// Guard against a nil Labels map: writing into a nil map panics, and a
	// freshly constructed v1.Pod has Labels == nil.
	if pod.ObjectMeta.Labels == nil {
		pod.ObjectMeta.Labels = map[string]string{}
	}
	pod.ObjectMeta.Labels["name"] = pod.Name
	if waitForRunning {
		label := labels.SelectorFromSet(labels.Set(map[string]string{"name": pod.Name}))
		err := WaitForPodsWithLabelRunning(c, namespace, label)
		if err != nil {
			return fmt.Errorf("Error waiting for pod %s to be running: %v", pod.Name, err)
		}
	}
	return nil
}
// WaitForPodsWithLabelRunning waits up to 10 minutes for all pods in ns
// matching label to become Running, and requires that at least one such pod
// exists. It polls a reflector-backed cache every 250ms and returns an error
// on timeout.
func WaitForPodsWithLabelRunning(c kubernetes.Interface, ns string, label labels.Selector) error {
	running := false
	// Renamed from the original `PodStore`, which shadowed the exported type
	// name; locals should be lowerCamelCase.
	podStore := NewPodStore(c, ns, label, fields.Everything())
	defer podStore.Stop()
waitLoop:
	for start := time.Now(); time.Since(start) < 10*time.Minute; time.Sleep(250 * time.Millisecond) {
		pods := podStore.List()
		// Zero matching pods does not count as success — keep waiting.
		if len(pods) == 0 {
			continue waitLoop
		}
		for _, p := range pods {
			if p.Status.Phase != v1.PodRunning {
				continue waitLoop
			}
		}
		running = true
		break
	}
	if !running {
		return fmt.Errorf("Timeout while waiting for pods with labels %q to be running", label.String())
	}
	return nil
}
// WaitForRCToStabilize waits till the RC has a matching generation/replica count between spec and status.
func WaitForRCToStabilize(t *testing.T, c kubernetes.Interface, ns, name string, timeout time.Duration) error {
	// Watch only the single RC identified by name and namespace.
	options := metav1.ListOptions{FieldSelector: fields.Set{
		"metadata.name":      name,
		"metadata.namespace": ns,
	}.AsSelector().String()}
	w, err := c.Core().ReplicationControllers(ns).Watch(options)
	if err != nil {
		return err
	}
	_, err = watch.Until(timeout, w, func(event watch.Event) (bool, error) {
		switch event.Type {
		case watch.Deleted:
			// The RC vanished while we were waiting; fail with NotFound.
			return false, apierrs.NewNotFound(schema.GroupResource{Resource: "replicationcontrollers"}, "")
		}
		switch rc := event.Object.(type) {
		case *v1.ReplicationController:
			// Stable when the controller has observed the latest spec
			// generation and the observed replica count matches the desired
			// count.
			if rc.Name == name && rc.Namespace == ns &&
				rc.Generation <= rc.Status.ObservedGeneration &&
				*(rc.Spec.Replicas) == rc.Status.Replicas {
				return true, nil
			}
			t.Logf("Waiting for rc %s to stabilize, generation %v observed generation %v spec.replicas %d status.replicas %d",
				name, rc.Generation, rc.Status.ObservedGeneration, *(rc.Spec.Replicas), rc.Status.Replicas)
		}
		return false, nil
	})
	return err
}
// WaitForService waits until the service appears (exist == true), or disappears (exist == false)
func WaitForService(t *testing.T, c kubernetes.Interface, namespace, name string, exist bool, interval, timeout time.Duration) error {
	err := wait.PollImmediate(interval, timeout, func() (bool, error) {
		_, err := c.Core().Services(namespace).Get(name, metav1.GetOptions{})
		switch {
		case err == nil:
			// Service found: done if we wanted it to appear, keep polling otherwise.
			t.Logf("Service %s in namespace %s found.", name, namespace)
			return exist, nil
		case apierrs.IsNotFound(err):
			// Service absent: done if we wanted it to disappear.
			t.Logf("Service %s in namespace %s disappeared.", name, namespace)
			return !exist, nil
		case !IsRetryableAPIError(err):
			// Permanent API failure: abort the poll immediately.
			t.Logf("Non-retryable failure while getting service.")
			return false, err
		default:
			// Transient failure (timeout, throttling, …): log and retry.
			t.Logf("Get service %s in namespace %s failed: %v", name, namespace, err)
			return false, nil
		}
	})
	if err != nil {
		stateMsg := map[bool]string{true: "to appear", false: "to disappear"}
		return fmt.Errorf("error waiting for service %s/%s %s: %v", namespace, name, stateMsg[exist], err)
	}
	return nil
}
// WaitForServiceEndpointsNum waits until the amount of endpoints that implement service to expectNum.
func WaitForServiceEndpointsNum(t *testing.T, c kubernetes.Interface, namespace, serviceName string, expectNum int, interval, timeout time.Duration) error {
	return wait.Poll(interval, timeout, func() (bool, error) {
		t.Logf("Waiting for amount of service:%s endpoints to be %d", serviceName, expectNum)
		// Lists all Endpoints in the namespace and scans for the one named
		// after the service, rather than Get-ing it directly.
		list, err := c.Core().Endpoints(namespace).List(metav1.ListOptions{})
		if err != nil {
			return false, err
		}
		for _, e := range list.Items {
			if e.Name == serviceName && countEndpointsNum(&e) == expectNum {
				return true, nil
			}
		}
		return false, nil
	})
}
// countEndpointsNum returns the total number of addresses across all
// subsets of the given Endpoints object.
func countEndpointsNum(e *v1.Endpoints) int {
	var total int
	for i := range e.Subsets {
		total += len(e.Subsets[i].Addresses)
	}
	return total
}
// IsRetryableAPIError reports whether err is a transient API-server failure
// (timeout, server timeout, throttling, or internal error) that is worth
// retrying, as opposed to a permanent error such as NotFound or Forbidden.
func IsRetryableAPIError(err error) bool {
	return apierrs.IsTimeout(err) || apierrs.IsServerTimeout(err) || apierrs.IsTooManyRequests(err) || apierrs.IsInternalError(err)
}

View File

@ -29,7 +29,9 @@ import (
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/labels"
commonutil "k8s.io/minikube/pkg/util"
)
@ -41,20 +43,6 @@ type MinikubeRunner struct {
Args string
}
func (k *KubectlRunner) IsPodReady(p *api.Pod) bool {
for _, cond := range p.Status.Conditions {
if cond.Type == "Ready" {
if cond.Status == "True" {
return true
}
k.T.Logf("Pod %s not ready. Ready: %s. Reason: %s", p.Name, cond.Status, cond.Reason)
return false /**/
}
}
k.T.Logf("Unable to find ready pod condition: %v", p.Status.Conditions)
return false
}
func (m *MinikubeRunner) RunCommand(command string, checkError bool) string {
commandArr := strings.Split(command, " ")
path, _ := filepath.Abs(m.BinaryPath)
@ -202,10 +190,51 @@ func (k *KubectlRunner) DeleteNamespace(namespace string) error {
return err
}
func (k *KubectlRunner) GetPod(name, namespace string) (*api.Pod, error) {
p := &api.Pod{}
err := k.RunCommandParseOutput([]string{"get", "pod", name, "--namespace=" + namespace}, p)
return p, err
// WaitForBusyboxRunning blocks until every pod labeled
// integration-test=busybox in the given namespace is Running.
func WaitForBusyboxRunning(t *testing.T, namespace string) error {
	c, err := GetClient()
	if err != nil {
		return errors.Wrap(err, "getting kubernetes client")
	}
	sel := labels.SelectorFromSet(labels.Set(map[string]string{"integration-test": "busybox"}))
	return WaitForPodsWithLabelRunning(c, namespace, sel)
}
// WaitForDNSRunning waits for the kube-dns pods in kube-system to be Running
// and for the kube-dns service to appear, returning an error if either wait
// fails.
func WaitForDNSRunning(t *testing.T) error {
	client, err := GetClient()
	if err != nil {
		return errors.Wrap(err, "getting kubernetes client")
	}
	selector := labels.SelectorFromSet(labels.Set(map[string]string{"k8s-app": "kube-dns"}))
	if err := WaitForPodsWithLabelRunning(client, "kube-system", selector); err != nil {
		return errors.Wrap(err, "waiting for kube-dns pods")
	}
	// The original logged this failure with t.Errorf but still returned nil,
	// silently swallowing the error; return it like every sibling helper does.
	if err := WaitForService(t, client, "kube-system", "kube-dns", true, time.Millisecond*500, time.Minute*10); err != nil {
		return errors.Wrap(err, "waiting for kube-dns service to be up")
	}
	return nil
}
// WaitForDashboardRunning waits, in order, for the kubernetes-dashboard RC
// to stabilize, its service to appear, and at least one service endpoint to
// come up — all in the kube-system namespace.
func WaitForDashboardRunning(t *testing.T) error {
	c, err := GetClient()
	if err != nil {
		return errors.Wrap(err, "getting kubernetes client")
	}
	const ns, name = "kube-system", "kubernetes-dashboard"
	if err := WaitForRCToStabilize(t, c, ns, name, time.Minute*10); err != nil {
		return errors.Wrap(err, "waiting for dashboard RC to stabilize")
	}
	if err := WaitForService(t, c, ns, name, true, time.Millisecond*500, time.Minute*10); err != nil {
		return errors.Wrap(err, "waiting for dashboard service to be up")
	}
	if err := WaitForServiceEndpointsNum(t, c, ns, name, 1, time.Second*3, time.Minute*10); err != nil {
		return errors.Wrap(err, "waiting for one dashboard endpoint to be up")
	}
	return nil
}
func Retry(t *testing.T, callback func() error, d time.Duration, attempts int) (err error) {