314 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
			
		
		
	
	
			314 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
/*
Copyright 2016 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
 | 
						|
 | 
						|
package util
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"fmt"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/golang/glog"
 | 
						|
	"github.com/pkg/errors"
 | 
						|
	apps "k8s.io/api/apps/v1"
 | 
						|
	core "k8s.io/api/core/v1"
 | 
						|
	apierr "k8s.io/apimachinery/pkg/api/errors"
 | 
						|
	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
						|
	"k8s.io/apimachinery/pkg/fields"
 | 
						|
	"k8s.io/apimachinery/pkg/labels"
 | 
						|
	"k8s.io/apimachinery/pkg/runtime"
 | 
						|
	"k8s.io/apimachinery/pkg/runtime/schema"
 | 
						|
	"k8s.io/apimachinery/pkg/util/wait"
 | 
						|
	"k8s.io/apimachinery/pkg/watch"
 | 
						|
	"k8s.io/client-go/kubernetes"
 | 
						|
	"k8s.io/client-go/tools/cache"
 | 
						|
	"k8s.io/client-go/tools/clientcmd"
 | 
						|
	watchtools "k8s.io/client-go/tools/watch"
 | 
						|
	"k8s.io/kubernetes/cmd/kubeadm/app/constants"
 | 
						|
	"k8s.io/minikube/pkg/minikube/proxy"
 | 
						|
)
 | 
						|
 | 
						|
// Default upper bounds used by the wait helpers in this package.
var (
	// ReasonableMutateTime bounds how long to wait for a basic object mutation
	// (a deletion, for example) to become visible.
	ReasonableMutateTime = 2 * time.Minute
	// ReasonableStartTime bounds how long to wait for pods to start.
	ReasonableStartTime = 5 * time.Minute
)
 | 
						|
 | 
						|
// PodStore stores pods
 | 
						|
type PodStore struct {
 | 
						|
	cache.Store
 | 
						|
	stopCh    chan struct{}
 | 
						|
	Reflector *cache.Reflector
 | 
						|
}
 | 
						|
 | 
						|
// List lists the pods
 | 
						|
func (s *PodStore) List() []*core.Pod {
 | 
						|
	objects := s.Store.List()
 | 
						|
	pods := make([]*core.Pod, 0)
 | 
						|
	for _, o := range objects {
 | 
						|
		pods = append(pods, o.(*core.Pod))
 | 
						|
	}
 | 
						|
	return pods
 | 
						|
}
 | 
						|
 | 
						|
// Stop stops the pods
 | 
						|
func (s *PodStore) Stop() {
 | 
						|
	close(s.stopCh)
 | 
						|
}
 | 
						|
 | 
						|
// GetClient gets the client from config
 | 
						|
func GetClient(kubectlContext ...string) (kubernetes.Interface, error) {
 | 
						|
	loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
 | 
						|
	configOverrides := &clientcmd.ConfigOverrides{}
 | 
						|
	if kubectlContext != nil {
 | 
						|
		configOverrides = &clientcmd.ConfigOverrides{
 | 
						|
			CurrentContext: kubectlContext[0],
 | 
						|
		}
 | 
						|
	}
 | 
						|
	kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, configOverrides)
 | 
						|
	config, err := kubeConfig.ClientConfig()
 | 
						|
	if err != nil {
 | 
						|
		return nil, fmt.Errorf("Error creating kubeConfig: %v", err)
 | 
						|
	}
 | 
						|
	config = proxy.UpdateTransport(config)
 | 
						|
	client, err := kubernetes.NewForConfig(config)
 | 
						|
	if err != nil {
 | 
						|
		return nil, errors.Wrap(err, "Error creating new client from kubeConfig.ClientConfig()")
 | 
						|
	}
 | 
						|
	return client, nil
 | 
						|
}
 | 
						|
 | 
						|
// NewPodStore creates a new PodStore
 | 
						|
func NewPodStore(c kubernetes.Interface, namespace string, label fmt.Stringer, field fmt.Stringer) *PodStore {
 | 
						|
	lw := &cache.ListWatch{
 | 
						|
		ListFunc: func(options meta.ListOptions) (runtime.Object, error) {
 | 
						|
			options.LabelSelector = label.String()
 | 
						|
			options.FieldSelector = field.String()
 | 
						|
			obj, err := c.CoreV1().Pods(namespace).List(options)
 | 
						|
			return runtime.Object(obj), err
 | 
						|
		},
 | 
						|
		WatchFunc: func(options meta.ListOptions) (watch.Interface, error) {
 | 
						|
			options.LabelSelector = label.String()
 | 
						|
			options.FieldSelector = field.String()
 | 
						|
			return c.CoreV1().Pods(namespace).Watch(options)
 | 
						|
		},
 | 
						|
	}
 | 
						|
	store := cache.NewStore(cache.MetaNamespaceKeyFunc)
 | 
						|
	stopCh := make(chan struct{})
 | 
						|
	reflector := cache.NewReflector(lw, &core.Pod{}, store, 0)
 | 
						|
	go reflector.Run(stopCh)
 | 
						|
	return &PodStore{Store: store, stopCh: stopCh, Reflector: reflector}
 | 
						|
}
 | 
						|
 | 
						|
// StartPods starts all pods
 | 
						|
func StartPods(c kubernetes.Interface, namespace string, pod core.Pod, waitForRunning bool) error {
 | 
						|
	pod.ObjectMeta.Labels["name"] = pod.Name
 | 
						|
	if waitForRunning {
 | 
						|
		label := labels.SelectorFromSet(labels.Set(map[string]string{"name": pod.Name}))
 | 
						|
		err := WaitForPodsWithLabelRunning(c, namespace, label)
 | 
						|
		if err != nil {
 | 
						|
			return fmt.Errorf("Error waiting for pod %s to be running: %v", pod.Name, err)
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// WaitForPodsWithLabelRunning waits for all matching pods to become Running and at least one matching pod exists.
 | 
						|
func WaitForPodsWithLabelRunning(c kubernetes.Interface, ns string, label labels.Selector) error {
 | 
						|
	glog.Infof("Waiting for pod with label %q in ns %q ...", ns, label)
 | 
						|
	lastKnownPodNumber := -1
 | 
						|
	return wait.PollImmediate(constants.APICallRetryInterval, ReasonableStartTime, func() (bool, error) {
 | 
						|
		listOpts := meta.ListOptions{LabelSelector: label.String()}
 | 
						|
		pods, err := c.CoreV1().Pods(ns).List(listOpts)
 | 
						|
		if err != nil {
 | 
						|
			glog.Infof("error getting Pods with label selector %q [%v]\n", label.String(), err)
 | 
						|
			return false, nil
 | 
						|
		}
 | 
						|
 | 
						|
		if lastKnownPodNumber != len(pods.Items) {
 | 
						|
			glog.Infof("Found %d Pods for label selector %s\n", len(pods.Items), label.String())
 | 
						|
			lastKnownPodNumber = len(pods.Items)
 | 
						|
		}
 | 
						|
 | 
						|
		if len(pods.Items) == 0 {
 | 
						|
			return false, nil
 | 
						|
		}
 | 
						|
 | 
						|
		for _, pod := range pods.Items {
 | 
						|
			if pod.Status.Phase != core.PodRunning {
 | 
						|
				return false, nil
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		return true, nil
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// WaitForPodDelete waits for a pod to be deleted
 | 
						|
func WaitForPodDelete(c kubernetes.Interface, ns string, label fmt.Stringer) error {
 | 
						|
	return wait.PollImmediate(constants.APICallRetryInterval, ReasonableMutateTime, func() (bool, error) {
 | 
						|
		listOpts := meta.ListOptions{LabelSelector: label.String()}
 | 
						|
		pods, err := c.CoreV1().Pods(ns).List(listOpts)
 | 
						|
		if err != nil {
 | 
						|
			glog.Infof("error getting Pods with label selector %q [%v]\n", label.String(), err)
 | 
						|
			return false, nil
 | 
						|
		}
 | 
						|
		return len(pods.Items) == 0, nil
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// WaitForEvent waits for the given event to appear
 | 
						|
func WaitForEvent(c kubernetes.Interface, ns string, reason string) error {
 | 
						|
	return wait.PollImmediate(constants.APICallRetryInterval, ReasonableMutateTime, func() (bool, error) {
 | 
						|
		events, err := c.EventsV1beta1().Events("default").List(meta.ListOptions{})
 | 
						|
		if err != nil {
 | 
						|
			glog.Infof("error getting events: %v", err)
 | 
						|
			return false, nil
 | 
						|
		}
 | 
						|
		for _, e := range events.Items {
 | 
						|
			if e.Reason == reason {
 | 
						|
				return true, nil
 | 
						|
			}
 | 
						|
		}
 | 
						|
		return false, nil
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// WaitForRCToStabilize waits till the RC has a matching generation/replica count between spec and status.
 | 
						|
func WaitForRCToStabilize(c kubernetes.Interface, ns, name string, timeout time.Duration) error {
 | 
						|
	options := meta.ListOptions{FieldSelector: fields.Set{
 | 
						|
		"metadata.name":      name,
 | 
						|
		"metadata.namespace": ns,
 | 
						|
	}.AsSelector().String()}
 | 
						|
	w, err := c.CoreV1().ReplicationControllers(ns).Watch(options)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
 | 
						|
	defer cancel()
 | 
						|
 | 
						|
	_, err = watchtools.UntilWithoutRetry(ctx, w, func(event watch.Event) (bool, error) {
 | 
						|
		if event.Type == watch.Deleted {
 | 
						|
			return false, apierr.NewNotFound(schema.GroupResource{Resource: "replicationcontrollers"}, "")
 | 
						|
		}
 | 
						|
 | 
						|
		rc, ok := event.Object.(*core.ReplicationController)
 | 
						|
		if ok {
 | 
						|
			if rc.Name == name && rc.Namespace == ns &&
 | 
						|
				rc.Generation <= rc.Status.ObservedGeneration &&
 | 
						|
				*(rc.Spec.Replicas) == rc.Status.Replicas {
 | 
						|
				return true, nil
 | 
						|
			}
 | 
						|
			glog.Infof("Waiting for rc %s to stabilize, generation %v observed generation %v spec.replicas %d status.replicas %d",
 | 
						|
				name, rc.Generation, rc.Status.ObservedGeneration, *(rc.Spec.Replicas), rc.Status.Replicas)
 | 
						|
		}
 | 
						|
		return false, nil
 | 
						|
	})
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
// WaitForDeploymentToStabilize waits till the Deployment has a matching generation/replica count between spec and status.
 | 
						|
func WaitForDeploymentToStabilize(c kubernetes.Interface, ns, name string, timeout time.Duration) error {
 | 
						|
	options := meta.ListOptions{FieldSelector: fields.Set{
 | 
						|
		"metadata.name":      name,
 | 
						|
		"metadata.namespace": ns,
 | 
						|
	}.AsSelector().String()}
 | 
						|
	w, err := c.AppsV1().Deployments(ns).Watch(options)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
 | 
						|
	defer cancel()
 | 
						|
 | 
						|
	_, err = watchtools.UntilWithoutRetry(ctx, w, func(event watch.Event) (bool, error) {
 | 
						|
		if event.Type == watch.Deleted {
 | 
						|
			return false, apierr.NewNotFound(schema.GroupResource{Resource: "deployments"}, "")
 | 
						|
		}
 | 
						|
		dp, ok := event.Object.(*apps.Deployment)
 | 
						|
		if ok {
 | 
						|
			if dp.Name == name && dp.Namespace == ns &&
 | 
						|
				dp.Generation <= dp.Status.ObservedGeneration &&
 | 
						|
				*(dp.Spec.Replicas) == dp.Status.Replicas {
 | 
						|
				return true, nil
 | 
						|
			}
 | 
						|
			glog.Infof("Waiting for deployment %s to stabilize, generation %v observed generation %v spec.replicas %d status.replicas %d",
 | 
						|
				name, dp.Generation, dp.Status.ObservedGeneration, *(dp.Spec.Replicas), dp.Status.Replicas)
 | 
						|
		}
 | 
						|
		return false, nil
 | 
						|
	})
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
// WaitForService waits until the service appears (exist == true), or disappears (exist == false)
 | 
						|
func WaitForService(c kubernetes.Interface, namespace, name string, exist bool, interval, timeout time.Duration) error {
 | 
						|
	err := wait.PollImmediate(interval, timeout, func() (bool, error) {
 | 
						|
		_, err := c.CoreV1().Services(namespace).Get(name, meta.GetOptions{})
 | 
						|
		switch {
 | 
						|
		case err == nil:
 | 
						|
			glog.Infof("Service %s in namespace %s found.", name, namespace)
 | 
						|
			return exist, nil
 | 
						|
		case apierr.IsNotFound(err):
 | 
						|
			glog.Infof("Service %s in namespace %s disappeared.", name, namespace)
 | 
						|
			return !exist, nil
 | 
						|
		case !IsRetryableAPIError(err):
 | 
						|
			glog.Info("Non-retryable failure while getting service.")
 | 
						|
			return false, err
 | 
						|
		default:
 | 
						|
			glog.Infof("Get service %s in namespace %s failed: %v", name, namespace, err)
 | 
						|
			return false, nil
 | 
						|
		}
 | 
						|
	})
 | 
						|
	if err != nil {
 | 
						|
		stateMsg := map[bool]string{true: "to appear", false: "to disappear"}
 | 
						|
		return fmt.Errorf("error waiting for service %s/%s %s: %v", namespace, name, stateMsg[exist], err)
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// WaitForServiceEndpointsNum waits until the amount of endpoints that implement service to expectNum.
 | 
						|
func WaitForServiceEndpointsNum(c kubernetes.Interface, namespace, serviceName string, expectNum int, interval, timeout time.Duration) error {
 | 
						|
	return wait.Poll(interval, timeout, func() (bool, error) {
 | 
						|
		glog.Infof("Waiting for amount of service:%s endpoints to be %d", serviceName, expectNum)
 | 
						|
		list, err := c.CoreV1().Endpoints(namespace).List(meta.ListOptions{})
 | 
						|
		if err != nil {
 | 
						|
			return false, err
 | 
						|
		}
 | 
						|
 | 
						|
		for _, e := range list.Items {
 | 
						|
			if e.Name == serviceName && countEndpointsNum(&e) == expectNum {
 | 
						|
				return true, nil
 | 
						|
			}
 | 
						|
		}
 | 
						|
		return false, nil
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
func countEndpointsNum(e *core.Endpoints) int {
 | 
						|
	num := 0
 | 
						|
	for _, sub := range e.Subsets {
 | 
						|
		num += len(sub.Addresses)
 | 
						|
	}
 | 
						|
	return num
 | 
						|
}
 | 
						|
 | 
						|
// IsRetryableAPIError returns if this error is retryable or not
 | 
						|
func IsRetryableAPIError(err error) bool {
 | 
						|
	return apierr.IsTimeout(err) || apierr.IsServerTimeout(err) || apierr.IsTooManyRequests(err) || apierr.IsInternalError(err)
 | 
						|
}
 |