Merge pull request #5075 from medyagh/file_lock_write

Improve parallel run reliability by putting lock on files before writing
pull/5089/head
Medya Ghazizadeh 2019-08-14 12:37:47 -07:00 committed by GitHub
commit 14db2d7f1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 276 additions and 258 deletions

View File

@ -40,7 +40,7 @@ import (
"k8s.io/minikube/pkg/minikube/out"
"k8s.io/minikube/pkg/minikube/proxy"
"k8s.io/minikube/pkg/minikube/service"
"k8s.io/minikube/pkg/util"
"k8s.io/minikube/pkg/util/retry"
)
var (
@ -110,7 +110,8 @@ var dashboardCmd = &cobra.Command{
ns := "kube-system"
svc := "kubernetes-dashboard"
out.ErrT(out.Verifying, "Verifying dashboard health ...")
if err = util.RetryAfter(180, func() error { return service.CheckService(ns, svc) }, 1*time.Second); err != nil {
checkSVC := func() error { return service.CheckService(ns, svc) }
if err = retry.Expo(checkSVC, 1*time.Second, time.Minute*3); err != nil {
exit.WithCodeT(exit.Unavailable, "dashboard service is not running: {{.error}}", out.V{"error": err})
}
@ -122,7 +123,8 @@ var dashboardCmd = &cobra.Command{
url := dashboardURL(hostPort, ns, svc)
out.ErrT(out.Verifying, "Verifying proxy health ...")
if err = util.RetryAfter(60, func() error { return checkURL(url) }, 1*time.Second); err != nil {
chkURL := func() error { return checkURL(url) }
if err = retry.Expo(chkURL, 1*time.Second, 3*time.Minute); err != nil {
exit.WithCodeT(exit.Unavailable, "{{.url}} is not accessible: {{.error}}", out.V{"url": url, "error": err})
}
@ -219,7 +221,7 @@ func checkURL(url string) error {
return errors.Wrap(err, "checkURL")
}
if resp.StatusCode != http.StatusOK {
return &util.RetriableError{
return &retry.RetriableError{
Err: fmt.Errorf("unexpected response code: %d", resp.StatusCode),
}
}

View File

@ -79,7 +79,7 @@ func init() {
serviceCmd.Flags().BoolVar(&serviceURLMode, "url", false, "Display the kubernetes service URL in the CLI instead of opening it in the default browser")
serviceCmd.Flags().BoolVar(&https, "https", false, "Open the service URL with https instead of http")
serviceCmd.Flags().IntVar(&wait, "wait", constants.DefaultWait, "Amount of time to wait for a service in seconds")
serviceCmd.Flags().IntVar(&interval, "interval", constants.DefaultInterval, "The time interval for each check that wait performs in seconds")
serviceCmd.Flags().IntVar(&interval, "interval", constants.DefaultInterval, "The initial time interval for each check that wait performs in seconds")
serviceCmd.PersistentFlags().StringVar(&serviceURLFormat, "format", defaultServiceFormatTemplate, "Format to output service URL in. This format will be applied to each url individually and they will be printed one at a time.")

View File

@ -18,7 +18,6 @@ package cmd
import (
"fmt"
"io/ioutil"
"net"
"net/url"
"os"
@ -58,6 +57,8 @@ import (
"k8s.io/minikube/pkg/minikube/out"
"k8s.io/minikube/pkg/minikube/proxy"
pkgutil "k8s.io/minikube/pkg/util"
"k8s.io/minikube/pkg/util/lock"
"k8s.io/minikube/pkg/util/retry"
"k8s.io/minikube/pkg/version"
)
@ -756,7 +757,8 @@ func startHost(api libmachine.API, mc cfg.MachineConfig) (*host.Host, bool) {
}
return err
}
if err = pkgutil.RetryAfter(3, start, 2*time.Second); err != nil {
if err = retry.Expo(start, 2*time.Second, 3*time.Minute, 5); err != nil {
exit.WithError("Unable to start VM", err)
}
return host, exists
@ -912,7 +914,7 @@ func configureMounts() {
if err := mountCmd.Start(); err != nil {
exit.WithError("Error starting mount", err)
}
if err := ioutil.WriteFile(filepath.Join(constants.GetMinipath(), constants.MountProcessFileName), []byte(strconv.Itoa(mountCmd.Process.Pid)), 0644); err != nil {
if err := lock.WriteFile(filepath.Join(constants.GetMinipath(), constants.MountProcessFileName), []byte(strconv.Itoa(mountCmd.Process.Pid)), 0644); err != nil {
exit.WithError("Error writing mount pid", err)
}
}

View File

@ -30,7 +30,7 @@ import (
"k8s.io/minikube/pkg/minikube/kubeconfig"
"k8s.io/minikube/pkg/minikube/machine"
"k8s.io/minikube/pkg/minikube/out"
pkgutil "k8s.io/minikube/pkg/util"
"k8s.io/minikube/pkg/util/retry"
)
// stopCmd represents the stop command
@ -63,7 +63,8 @@ func runStop(cmd *cobra.Command, args []string) {
return err
}
}
if err := pkgutil.RetryAfter(3, stop, 2*time.Second); err != nil {
if err := retry.Expo(stop, 5*time.Second, 3*time.Minute, 5); err != nil {
exit.WithError("Unable to stop VM", err)
}

1
go.mod
View File

@ -33,6 +33,7 @@ require (
github.com/intel-go/cpuid v0.0.0-20181003105527-1a4a6f06a1c6 // indirect
github.com/jimmidyson/go-download v0.0.0-20161028105827-7f9a90c8c95b
github.com/johanneswuerbach/nfsexports v0.0.0-20181204082207-1aa528dcb345
github.com/juju/fslock v0.0.0-20160525022230-4d5c94c67b4b
github.com/libvirt/libvirt-go v3.4.0+incompatible
github.com/machine-drivers/docker-machine-driver-vmware v0.1.1
github.com/mattn/go-isatty v0.0.8

2
go.sum
View File

@ -275,6 +275,8 @@ github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCV
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jteeuwen/go-bindata v0.0.0-20151023091102-a0ff2567cfb7/go.mod h1:JVvhzYOiGBnFSYRyV00iY8q7/0PThjIYav1p9h5dmKs=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/juju/fslock v0.0.0-20160525022230-4d5c94c67b4b h1:FQ7+9fxhyp82ks9vAuyPzG0/vVbWwMwLJ+P6yJI5FN8=
github.com/juju/fslock v0.0.0-20160525022230-4d5c94c67b4b/go.mod h1:HMcgvsgd0Fjj4XXDkbjdmlbI505rUPBs6WBMYg2pXks=
github.com/kardianos/osext v0.0.0-20150410034420-8fef92e41e22/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8=
github.com/karrick/godirwalk v1.7.5/go.mod h1:2c9FRhkDxdIbgkOnCEvnSWs71Bhugbl46shStcFDJ34=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=

View File

@ -31,7 +31,7 @@ import (
"k8s.io/minikube/pkg/minikube/command"
"k8s.io/minikube/pkg/minikube/constants"
"k8s.io/minikube/pkg/minikube/cruntime"
"k8s.io/minikube/pkg/util"
"k8s.io/minikube/pkg/util/retry"
)
const driverName = constants.DriverNone
@ -239,7 +239,7 @@ func stopKubelet(exec command.Runner) error {
return nil
}
if err := util.RetryAfter(3, stop, 2*time.Second); err != nil {
if err := retry.Expo(stop, 2*time.Second, time.Minute*3, 5); err != nil {
return errors.Wrapf(err, "error stopping kubelet")
}

View File

@ -43,6 +43,7 @@ import (
"k8s.io/minikube/pkg/minikube/machine"
"k8s.io/minikube/pkg/minikube/out"
"k8s.io/minikube/pkg/util"
"k8s.io/minikube/pkg/util/retry"
)
// enum to differentiate kubeadm command line parameters from kubeadm config file parameters (see the
@ -233,13 +234,15 @@ func (k *Bootstrapper) StartCluster(k8s config.KubernetesConfig) error {
if version.LT(semver.MustParse("1.10.0-alpha.0")) {
// TODO(r2d4): get rid of global here
master = k8s.NodeName
if err := util.RetryAfter(200, unmarkMaster, time.Second*1); err != nil {
if err := retry.Expo(unmarkMaster, time.Millisecond*500, time.Second*113); err != nil {
return errors.Wrap(err, "timed out waiting to unmark master")
}
}
glog.Infof("Configuring cluster permissions ...")
if err := util.RetryAfter(100, elevateKubeSystemPrivileges, time.Millisecond*500); err != nil {
if err := retry.Expo(elevateKubeSystemPrivileges, time.Millisecond*500, 40*time.Second); err != nil {
return errors.Wrap(err, "timed out waiting to elevate kube-system RBAC privileges")
}

View File

@ -30,7 +30,7 @@ import (
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/minikube/pkg/minikube/constants"
"k8s.io/minikube/pkg/minikube/service"
"k8s.io/minikube/pkg/util"
"k8s.io/minikube/pkg/util/retry"
)
const (
@ -119,7 +119,7 @@ func elevateKubeSystemPrivileges() error {
if err != nil {
netErr, ok := err.(net.Error)
if ok && netErr.Timeout() {
return &util.RetriableError{Err: errors.Wrap(err, "creating clusterrolebinding")}
return &retry.RetriableError{Err: errors.Wrap(err, "creating clusterrolebinding")}
}
return errors.Wrap(err, "creating clusterrolebinding")
}

View File

@ -48,8 +48,8 @@ import (
"k8s.io/minikube/pkg/minikube/exit"
"k8s.io/minikube/pkg/minikube/out"
"k8s.io/minikube/pkg/minikube/registry"
"k8s.io/minikube/pkg/util"
pkgutil "k8s.io/minikube/pkg/util"
"k8s.io/minikube/pkg/util/retry"
)
// hostRunner is a minimal host.Host based interface for running commands
@ -169,7 +169,7 @@ func configureHost(h *host.Host, e *engine.Options) error {
if !localDriver(h.Driver.DriverName()) {
glog.Infof("Configuring auth for driver %s ...", h.Driver.DriverName())
if err := h.ConfigureAuth(); err != nil {
return &util.RetriableError{Err: errors.Wrap(err, "Error configuring auth on host")}
return &retry.RetriableError{Err: errors.Wrap(err, "Error configuring auth on host")}
}
return ensureSyncedGuestClock(h)
}
@ -264,7 +264,7 @@ func StopHost(api libmachine.API) error {
if ok && alreadyInStateError.State == state.Stopped {
return nil
}
return &util.RetriableError{Err: errors.Wrapf(err, "Stop: %s", cfg.GetMachineName())}
return &retry.RetriableError{Err: errors.Wrapf(err, "Stop: %s", cfg.GetMachineName())}
}
return nil
}

View File

@ -24,6 +24,7 @@ import (
"github.com/golang/glog"
"k8s.io/minikube/pkg/minikube/constants"
"k8s.io/minikube/pkg/util/lock"
)
// isValid checks if the profile has the essential info needed for a profile
@ -69,7 +70,7 @@ func CreateProfile(name string, cfg *Config, miniHome ...string) error {
// If no config file exists, don't worry about swapping paths
if _, err := os.Stat(path); os.IsNotExist(err) {
if err := ioutil.WriteFile(path, data, 0600); err != nil {
if err := lock.WriteFile(path, data, 0600); err != nil {
return err
}
return nil
@ -81,7 +82,7 @@ func CreateProfile(name string, cfg *Config, miniHome ...string) error {
}
defer os.Remove(tf.Name())
if err = ioutil.WriteFile(tf.Name(), data, 0600); err != nil {
if err = lock.WriteFile(tf.Name(), data, 0600); err != nil {
return err
}

View File

@ -31,6 +31,7 @@ import (
"github.com/golang-collections/collections/stack"
"github.com/pkg/errors"
"k8s.io/minikube/pkg/util/lock"
)
// blacklist is a list of strings to explicitly omit from translation files.
@ -474,7 +475,7 @@ func writeStringsToFiles(e *state, output string) error {
if err != nil {
return errors.Wrap(err, "marshalling translations")
}
err = ioutil.WriteFile(path, c, info.Mode())
err = lock.WriteFile(path, c, info.Mode())
if err != nil {
return errors.Wrap(err, "writing translation file")
}

View File

@ -32,6 +32,7 @@ import (
"k8s.io/client-go/tools/clientcmd/api/latest"
"k8s.io/minikube/pkg/minikube/constants"
pkgutil "k8s.io/minikube/pkg/util"
"k8s.io/minikube/pkg/util/lock"
)
// IsClusterInConfig verifies the ip stored in kubeconfig.
@ -186,9 +187,10 @@ func writeToFile(config runtime.Object, configPath ...string) error {
}
// write with restricted permissions
if err := ioutil.WriteFile(fPath, data, 0600); err != nil {
if err := lock.WriteFile(fPath, data, 0600); err != nil {
return errors.Wrapf(err, "Error writing file %s", fPath)
}
if err := pkgutil.MaybeChownDirRecursiveToMinikubeUser(dir); err != nil {
return errors.Wrapf(err, "Error recursively changing ownership for dir: %s", dir)
}

View File

@ -142,7 +142,6 @@ func TestUpdate(t *testing.T) {
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
t.Parallel()
tmpDir, err := ioutil.TempDir("", "")
if err != nil {
t.Fatalf("Error making temp directory %v", err)

View File

@ -32,6 +32,7 @@ import (
"k8s.io/minikube/pkg/minikube/config"
"k8s.io/minikube/pkg/minikube/constants"
"k8s.io/minikube/pkg/minikube/out"
"k8s.io/minikube/pkg/util/lock"
"k8s.io/minikube/pkg/version"
)
@ -132,7 +133,7 @@ func GetAllVersionsFromURL(url string) (Releases, error) {
}
func writeTimeToFile(path string, inputTime time.Time) error {
err := ioutil.WriteFile(path, []byte(inputTime.Format(timeLayout)), 0644)
err := lock.WriteFile(path, []byte(inputTime.Format(timeLayout)), 0644)
if err != nil {
return errors.Wrap(err, "Error writing current update time to file: ")
}

View File

@ -44,7 +44,7 @@ import (
"k8s.io/minikube/pkg/minikube/constants"
"k8s.io/minikube/pkg/minikube/out"
"k8s.io/minikube/pkg/minikube/proxy"
"k8s.io/minikube/pkg/util"
"k8s.io/minikube/pkg/util/retry"
)
// K8sClient represents a kubernetes client
@ -218,7 +218,7 @@ func CheckService(namespace string, service string) error {
svc, err := client.Services(namespace).Get(service, meta.GetOptions{})
if err != nil {
return &util.RetriableError{
return &retry.RetriableError{
Err: errors.Wrapf(err, "Error getting service %s", service),
}
}
@ -263,8 +263,9 @@ func WaitAndMaybeOpenService(api libmachine.API, namespace string, service strin
if interval == 0 {
interval = 1
}
attempts := wait/interval + 1
if err := util.RetryAfter(attempts, func() error { return CheckService(namespace, service) }, time.Duration(interval)*time.Second); err != nil {
chkSVC := func() error { return CheckService(namespace, service) }
if err := retry.Expo(chkSVC, time.Duration(interval)*time.Second, time.Duration(wait)*time.Second); err != nil {
return errors.Wrapf(err, "Could not find finalized endpoint being pointed to by %s", service)
}
@ -307,7 +308,7 @@ func WaitAndMaybeOpenService(api libmachine.API, namespace string, service strin
func GetServiceListByLabel(namespace string, key string, value string) (*core.ServiceList, error) {
client, err := K8s.GetCoreClient()
if err != nil {
return &core.ServiceList{}, &util.RetriableError{Err: err}
return &core.ServiceList{}, &retry.RetriableError{Err: err}
}
return getServiceListFromServicesByLabel(client.Services(namespace), key, value)
}
@ -316,7 +317,7 @@ func getServiceListFromServicesByLabel(services typed_core.ServiceInterface, key
selector := labels.SelectorFromSet(labels.Set(map[string]string{key: value}))
serviceList, err := services.List(meta.ListOptions{LabelSelector: selector.String()})
if err != nil {
return &core.ServiceList{}, &util.RetriableError{Err: err}
return &core.ServiceList{}, &retry.RetriableError{Err: err}
}
return serviceList, nil
@ -326,7 +327,7 @@ func getServiceListFromServicesByLabel(services typed_core.ServiceInterface, key
func CreateSecret(namespace, name string, dataValues map[string]string, labels map[string]string) error {
client, err := K8s.GetCoreClient()
if err != nil {
return &util.RetriableError{Err: err}
return &retry.RetriableError{Err: err}
}
secrets := client.Secrets(namespace)
secret, _ := secrets.Get(name, meta.GetOptions{})
@ -335,7 +336,7 @@ func CreateSecret(namespace, name string, dataValues map[string]string, labels m
if len(secret.Name) > 0 {
err = DeleteSecret(namespace, name)
if err != nil {
return &util.RetriableError{Err: err}
return &retry.RetriableError{Err: err}
}
}
@ -357,7 +358,7 @@ func CreateSecret(namespace, name string, dataValues map[string]string, labels m
_, err = secrets.Create(secretObj)
if err != nil {
return &util.RetriableError{Err: err}
return &retry.RetriableError{Err: err}
}
return nil
@ -367,13 +368,13 @@ func CreateSecret(namespace, name string, dataValues map[string]string, labels m
func DeleteSecret(namespace, name string) error {
client, err := K8s.GetCoreClient()
if err != nil {
return &util.RetriableError{Err: err}
return &retry.RetriableError{Err: err}
}
secrets := client.Secrets(namespace)
err = secrets.Delete(name, &meta.DeleteOptions{})
if err != nil {
return &util.RetriableError{Err: err}
return &retry.RetriableError{Err: err}
}
return nil

View File

@ -41,6 +41,7 @@ import (
"k8s.io/minikube/pkg/minikube/config"
"k8s.io/minikube/pkg/minikube/sshutil"
"k8s.io/minikube/pkg/util"
"k8s.io/minikube/pkg/util/retry"
)
// BuildrootProvisioner provisions the custom system based on Buildroot
@ -174,13 +175,14 @@ func (p *BuildrootProvisioner) Provision(swarmOptions swarm.Options, authOptions
log.Debugf("set auth options %+v", p.AuthOptions)
log.Debugf("setting up certificates")
configureAuth := func() error {
configAuth := func() error {
if err := configureAuth(p); err != nil {
return &util.RetriableError{Err: err}
return &retry.RetriableError{Err: err}
}
return nil
}
err := util.RetryAfter(5, configureAuth, time.Second*10)
err := retry.Expo(configAuth, time.Second, 2*time.Minute)
if err != nil {
log.Debugf("Error configuring auth during provisioning %v", err)
return err

View File

@ -114,7 +114,7 @@ func convertKind(e reflect.Value, v string) error {
func convertInt(e reflect.Value, v string) error {
i, err := strconv.Atoi(v)
if err != nil {
return fmt.Errorf("Error converting input %s to an integer: %v", v, err)
return fmt.Errorf("error converting input %s to an integer: %v", v, err)
}
e.SetInt(int64(i))
return nil
@ -128,7 +128,7 @@ func convertString(e reflect.Value, v string) error {
func convertFloat(e reflect.Value, v string) error {
f, err := strconv.ParseFloat(v, 64)
if err != nil {
return fmt.Errorf("Error converting input %s to a float: %v", v, err)
return fmt.Errorf("error converting input %s to a float: %v", v, err)
}
e.SetFloat(f)
return nil
@ -137,7 +137,7 @@ func convertFloat(e reflect.Value, v string) error {
func convertBool(e reflect.Value, v string) error {
b, err := strconv.ParseBool(v)
if err != nil {
return fmt.Errorf("Error converting input %s to a bool: %v", v, err)
return fmt.Errorf("error converting input %s to a bool: %v", v, err)
}
e.SetBool(b)
return nil
@ -146,7 +146,7 @@ func convertBool(e reflect.Value, v string) error {
func convertIP(e reflect.Value, v string) error {
ip := net.ParseIP(v)
if ip == nil {
return fmt.Errorf("Error converting input %s to an IP", v)
return fmt.Errorf("error converting input %s to an IP", v)
}
e.Set(reflect.ValueOf(ip))
return nil
@ -155,7 +155,7 @@ func convertIP(e reflect.Value, v string) error {
func convertCIDR(e reflect.Value, v string) error {
_, cidr, err := net.ParseCIDR(v)
if err != nil {
return fmt.Errorf("Error converting input %s to a CIDR: %v", v, err)
return fmt.Errorf("error converting input %s to a CIDR: %v", v, err)
}
e.Set(reflect.ValueOf(*cidr))
return nil
@ -164,7 +164,7 @@ func convertCIDR(e reflect.Value, v string) error {
func convertPortRange(e reflect.Value, v string) error {
pr, err := utilnet.ParsePortRange(v)
if err != nil {
return fmt.Errorf("Error converting input %s to PortRange: %v", v, err)
return fmt.Errorf("error converting input %s to PortRange: %v", v, err)
}
e.Set(reflect.ValueOf(*pr))
return nil
@ -173,7 +173,7 @@ func convertPortRange(e reflect.Value, v string) error {
func convertDuration(e reflect.Value, v string) error {
dur, err := time.ParseDuration(v)
if err != nil {
return fmt.Errorf("Error converting input %s to Duration: %v", v, err)
return fmt.Errorf("error converting input %s to Duration: %v", v, err)
}
e.Set(reflect.ValueOf(dur))
return nil

View File

@ -31,6 +31,7 @@ import (
"time"
"github.com/pkg/errors"
"k8s.io/minikube/pkg/util/lock"
)
// GenerateCACert generates a CA certificate and RSA key for a common name
@ -151,14 +152,14 @@ func writeCertsAndKeys(template *x509.Certificate, certPath string, signeeKey *r
if err := os.MkdirAll(filepath.Dir(certPath), os.FileMode(0755)); err != nil {
return errors.Wrap(err, "Error creating certificate directory")
}
if err := ioutil.WriteFile(certPath, certBuffer.Bytes(), os.FileMode(0644)); err != nil {
if err := lock.WriteFile(certPath, certBuffer.Bytes(), os.FileMode(0644)); err != nil {
return errors.Wrap(err, "Error writing certificate to cert path")
}
if err := os.MkdirAll(filepath.Dir(keyPath), os.FileMode(0755)); err != nil {
return errors.Wrap(err, "Error creating key directory")
}
if err := ioutil.WriteFile(keyPath, keyBuffer.Bytes(), os.FileMode(0600)); err != nil {
if err := lock.WriteFile(keyPath, keyBuffer.Bytes(), os.FileMode(0600)); err != nil {
return errors.Wrap(err, "Error writing key file")
}

View File

@ -82,12 +82,12 @@ func GetClient(kubectlContext ...string) (kubernetes.Interface, error) {
kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, configOverrides)
config, err := kubeConfig.ClientConfig()
if err != nil {
return nil, fmt.Errorf("Error creating kubeConfig: %v", err)
return nil, fmt.Errorf("error creating kubeConfig: %v", err)
}
config = proxy.UpdateTransport(config)
client, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, errors.Wrap(err, "Error creating new client from kubeConfig.ClientConfig()")
return nil, errors.Wrap(err, "error creating new client from kubeConfig.ClientConfig()")
}
return client, nil
}
@ -121,7 +121,7 @@ func StartPods(c kubernetes.Interface, namespace string, pod core.Pod, waitForRu
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": pod.Name}))
err := WaitForPodsWithLabelRunning(c, namespace, label)
if err != nil {
return fmt.Errorf("Error waiting for pod %s to be running: %v", pod.Name, err)
return fmt.Errorf("error waiting for pod %s to be running: %v", pod.Name, err)
}
}
return nil

61
pkg/util/lock/lock.go Normal file
View File

@ -0,0 +1,61 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package lock
import (
"io/ioutil"
"os"
"time"
"github.com/golang/glog"
"github.com/juju/fslock"
"github.com/pkg/errors"
"k8s.io/minikube/pkg/util/retry"
)
// WriteWithLock decorates ioutil.WriteFile with a file lock and retry
func WriteFile(filename string, data []byte, perm os.FileMode) (err error) {
lock := fslock.New(filename)
glog.Infof("attempting to write to file %q with filemode %v", filename, perm)
getLock := func() error {
lockErr := lock.TryLock()
if lockErr != nil {
glog.Warningf("temporary error : %v", lockErr.Error())
return errors.Wrapf(lockErr, "falied to acquire lock for %s > ", filename)
}
return nil
}
defer func() { // release the lock
err = lock.Unlock()
if err != nil {
err = errors.Wrapf(err, "error releasing lock for file: %s", filename)
}
}()
err = retry.Expo(getLock, 500*time.Millisecond, 13*time.Second)
if err != nil {
return errors.Wrapf(err, "error acquiring lock for %s", filename)
}
if err = ioutil.WriteFile(filename, data, perm); err != nil {
return errors.Wrapf(err, "error writing file %s", filename)
}
return err
}

51
pkg/util/retry/retry.go Normal file
View File

@ -0,0 +1,51 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package retry
import (
"time"
"github.com/cenkalti/backoff"
)
const defaultMaxRetries = 113
// Expo is expontential backoff retry.
// initInterval is the initial waiting time to start with.
// maxTime is the max time allowed to spend on the all the retries.
// maxRetries is the optional max number of retries allowed with default of 13.
func Expo(callback func() error, initInterval time.Duration, maxTime time.Duration, maxRetries ...uint64) error {
maxRetry := uint64(defaultMaxRetries) // max number of times to retry
if maxRetries != nil {
maxRetry = maxRetries[0]
}
b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = maxTime
b.InitialInterval = initInterval
b.RandomizationFactor = 0.5
b.Multiplier = 1.5
bm := backoff.WithMaxRetries(b, maxRetry)
return backoff.Retry(callback, bm)
}
// RetriableError is an error that can be tried again
type RetriableError struct {
Err error
}
func (r RetriableError) Error() string { return "Temporary Error: " + r.Err.Error() }

View File

@ -0,0 +1,53 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package retry
import (
"errors"
"testing"
)
// Returns a function that will return n errors, then return successfully forever.
func errorGenerator(n int, retryable bool) func() error {
errorCount := 0
return func() (err error) {
if errorCount < n {
errorCount++
e := errors.New("Error")
if retryable {
return &RetriableError{Err: e}
}
return e
}
return nil
}
}
func TestErrorGenerator(t *testing.T) {
errors := 3
f := errorGenerator(errors, false)
for i := 0; i < errors-1; i++ {
if err := f(); err == nil {
t.Fatalf("Error should have been thrown at iteration %v", i)
}
}
if err := f(); err == nil {
t.Fatalf("Error should not have been thrown this call!")
}
}

View File

@ -29,7 +29,6 @@ import (
"time"
units "github.com/docker/go-units"
"github.com/golang/glog"
"github.com/pkg/errors"
"k8s.io/minikube/pkg/minikube/exit"
"k8s.io/minikube/pkg/minikube/out"
@ -45,13 +44,6 @@ const (
downloadURL = "https://storage.googleapis.com/minikube/releases/%s/minikube-%s-amd64%s"
)
// RetriableError is an error that can be tried again
type RetriableError struct {
Err error
}
func (r RetriableError) Error() string { return "Temporary Error: " + r.Err.Error() }
// CalculateSizeInMB returns the number of MB in the human readable string
func CalculateSizeInMB(humanReadableSize string) int {
_, err := strconv.ParseInt(humanReadableSize, 10, 64)
@ -106,33 +98,6 @@ func CanReadFile(path string) bool {
return true
}
// Retry retries a number of attempts
func Retry(attempts int, callback func() error) (err error) {
return RetryAfter(attempts, callback, 0)
}
// RetryAfter retries a number of attempts, after a delay
func RetryAfter(attempts int, callback func() error, d time.Duration) (err error) {
m := MultiError{}
for i := 0; i < attempts; i++ {
if i > 0 {
glog.V(1).Infof("retry loop %d", i)
}
err = callback()
if err == nil {
return nil
}
m.Collect(err)
if _, ok := err.(*RetriableError); !ok {
glog.Infof("non-retriable error: %v", err)
return m.ToError()
}
glog.V(2).Infof("error: %v - sleeping %s", err, d)
time.Sleep(d)
}
return m.ToError()
}
// GetBinaryDownloadURL returns a suitable URL for the platform
func GetBinaryDownloadURL(version, platform string) string {
switch platform {
@ -143,31 +108,6 @@ func GetBinaryDownloadURL(version, platform string) string {
}
}
// MultiError holds multiple errors
type MultiError struct {
Errors []error
}
// Collect adds the error
func (m *MultiError) Collect(err error) {
if err != nil {
m.Errors = append(m.Errors, err)
}
}
// ToError converts all errors into one
func (m MultiError) ToError() error {
if len(m.Errors) == 0 {
return nil
}
errStrings := []string{}
for _, err := range m.Errors {
errStrings = append(errStrings, err.Error())
}
return errors.New(strings.Join(errStrings, "\n"))
}
// IsDirectory checks if path is a directory
func IsDirectory(path string) (bool, error) {
fileInfo, err := os.Stat(path)

View File

@ -22,88 +22,8 @@ import (
"strings"
"sync"
"testing"
"github.com/pkg/errors"
)
// Returns a function that will return n errors, then return successfully forever.
func errorGenerator(n int, retryable bool) func() error {
errorCount := 0
return func() (err error) {
if errorCount < n {
errorCount++
e := errors.New("Error")
if retryable {
return &RetriableError{Err: e}
}
return e
}
return nil
}
}
func TestErrorGenerator(t *testing.T) {
errors := 3
f := errorGenerator(errors, false)
for i := 0; i < errors-1; i++ {
if err := f(); err == nil {
t.Fatalf("Error should have been thrown at iteration %v", i)
}
}
if err := f(); err == nil {
t.Fatalf("Error should not have been thrown this call!")
}
}
func TestRetry(t *testing.T) {
f := errorGenerator(4, true)
if err := Retry(5, f); err != nil {
t.Fatalf("Error should not have been raised by retry.")
}
f = errorGenerator(5, true)
if err := Retry(4, f); err == nil {
t.Fatalf("Error should have been raised by retry.")
}
}
func TestRetryNotRetriableError(t *testing.T) {
f := errorGenerator(4, false)
if err := Retry(5, f); err == nil {
t.Fatalf("Error should have been raised by retry.")
}
f = errorGenerator(5, false)
if err := Retry(4, f); err == nil {
t.Fatalf("Error should have been raised by retry.")
}
f = errorGenerator(0, false)
if err := Retry(5, f); err != nil {
t.Fatalf("Error should not have been raised by retry.")
}
}
func TestMultiError(t *testing.T) {
m := MultiError{}
m.Collect(errors.New("Error 1"))
m.Collect(errors.New("Error 2"))
err := m.ToError()
expected := `Error 1
Error 2`
if err.Error() != expected {
t.Fatalf("%s != %s", err.Error(), expected)
}
m = MultiError{}
if err := m.ToError(); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
func TestGetBinaryDownloadURL(t *testing.T) {
testData := []struct {
version string

View File

@ -31,7 +31,7 @@ import (
"github.com/pkg/errors"
"k8s.io/minikube/pkg/minikube/constants"
pkgutil "k8s.io/minikube/pkg/util"
"k8s.io/minikube/test/integration/util"
"k8s.io/minikube/pkg/util/retry"
)
// Note this test runs before all because filename is alphabetically first
@ -78,7 +78,7 @@ func downloadMinikubeBinary(t *testing.T, dest string, version string) error {
return getter.GetFile(dest, url)
}
if err := util.RetryX(download, 13*time.Second, 5*time.Minute); err != nil {
if err := retry.Expo(download, 3*time.Second, 3*time.Minute); err != nil {
return errors.Wrap(err, "Failed to get latest release binary")
}
if runtime.GOOS != "windows" {

View File

@ -36,6 +36,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
commonutil "k8s.io/minikube/pkg/util"
pkgutil "k8s.io/minikube/pkg/util"
"k8s.io/minikube/pkg/util/retry"
"k8s.io/minikube/test/integration/util"
)
@ -157,7 +158,7 @@ func testIngressController(t *testing.T) {
return nil
}
if err := util.Retry(t, checkIngress, 2*time.Second, 5); err != nil {
if err := retry.Expo(checkIngress, 500*time.Millisecond, time.Minute); err != nil {
t.Fatalf(err.Error())
}
@ -183,7 +184,7 @@ func testServicesList(t *testing.T) {
}
return nil
}
if err := util.Retry(t, checkServices, 2*time.Second, 5); err != nil {
if err := retry.Expo(checkServices, 500*time.Millisecond, time.Minute); err != nil {
t.Fatalf(err.Error())
}
}
@ -232,10 +233,9 @@ func testRegistry(t *testing.T) {
return nil
}
if err := util.Retry(t, checkExternalAccess, 2*time.Second, 5); err != nil {
if err := retry.Expo(checkExternalAccess, 500*time.Millisecond, 2*time.Minute); err != nil {
t.Fatalf(err.Error())
}
t.Log("checking registry access from inside cluster")
kr := util.NewKubectlRunner(t, p)
// TODO: Fix this
@ -252,12 +252,12 @@ func testRegistry(t *testing.T) {
internalCheckOutput := string(out)
expectedStr := "200"
if !strings.Contains(internalCheckOutput, expectedStr) {
t.Fatalf("ExpectedStr internalCheckOutput to be: %s. Output was: %s", expectedStr, internalCheckOutput)
t.Errorf("ExpectedStr internalCheckOutput to be: %s. Output was: %s", expectedStr, internalCheckOutput)
}
defer func() {
if _, err := kr.RunCommand([]string{"delete", "pod", "registry-test"}); err != nil {
t.Fatalf("failed to delete pod registry-test")
t.Errorf("failed to delete pod registry-test")
}
}()
mk.RunCommand("addons disable registry", true)

View File

@ -27,6 +27,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
pkgutil "k8s.io/minikube/pkg/util"
"k8s.io/minikube/pkg/util/retry"
"k8s.io/minikube/test/integration/util"
)
@ -52,7 +53,8 @@ func testClusterDNS(t *testing.T) {
out, err = kr.RunCommand([]string{"exec", busybox, "nslookup", "kubernetes.default"})
return err
}
if err := util.Retry(t, nslookup, 3*time.Second, 60); err != nil {
if err = retry.Expo(nslookup, 500*time.Millisecond, time.Minute); err != nil {
t.Fatalf(err.Error())
}

View File

@ -24,7 +24,7 @@ import (
"testing"
"time"
"k8s.io/minikube/test/integration/util"
"k8s.io/minikube/pkg/util/retry"
)
// Assert that docker-env subcommand outputs usable information for "docker ps"
@ -61,7 +61,7 @@ func testClusterEnv(t *testing.T) {
}
return nil
}
if err := util.Retry(t, dockerPs, 3*time.Second, 5); err != nil {
if err := retry.Expo(dockerPs, 500*time.Millisecond, time.Minute); err != nil {
t.Fatalf("Error running command: %s. Error: %v Output: %s", "docker ps", err, output)
}
}

View File

@ -24,6 +24,7 @@ import (
"time"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/minikube/pkg/util/retry"
"k8s.io/minikube/test/integration/util"
)
@ -54,7 +55,7 @@ func testClusterStatus(t *testing.T) {
return nil
}
if err := util.Retry(t, healthy, 1*time.Second, 5); err != nil {
if err := retry.Expo(healthy, 500*time.Millisecond, time.Minute); err != nil {
t.Fatalf("Cluster is not healthy: %v", err)
}
}

View File

@ -30,6 +30,8 @@ import (
"k8s.io/apimachinery/pkg/labels"
pkgutil "k8s.io/minikube/pkg/util"
"k8s.io/minikube/pkg/util/lock"
"k8s.io/minikube/pkg/util/retry"
"k8s.io/minikube/test/integration/util"
)
@ -77,6 +79,7 @@ func testMounting(t *testing.T) {
}
return nil
}
defer func() {
t.Logf("Deleting pod from: %s", podPath)
if out, err := kr.RunCommand([]string{"delete", "-f", podPath}); err != nil {
@ -84,7 +87,7 @@ func testMounting(t *testing.T) {
}
}()
if err := util.Retry(t, setupTest, 5*time.Second, 40); err != nil {
if err = retry.Expo(setupTest, 500*time.Millisecond, 4*time.Minute); err != nil {
t.Fatal("mountTest failed with error:", err)
}
@ -100,7 +103,8 @@ func testMounting(t *testing.T) {
return nil
}
if err := util.Retry(t, mountTest, 5*time.Second, 40); err != nil {
if err = retry.Expo(mountTest, 500*time.Millisecond, 4*time.Minute); err != nil {
t.Fatalf("mountTest failed with error: %v", err)
}
@ -119,7 +123,7 @@ func getMountCmd(mk util.MinikubeRunner, mountDir string) string {
func writeFilesFromHost(mountedDir string, files []string, content string) error {
for _, file := range files {
path := filepath.Join(mountedDir, file)
err := ioutil.WriteFile(path, []byte(content), 0644)
err := lock.WriteFile(path, []byte(content), 0644)
if err != nil {
return fmt.Errorf("unexpected error while writing file %s: %v", path, err)
}

View File

@ -30,6 +30,7 @@ import (
storage "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/labels"
commonutil "k8s.io/minikube/pkg/util"
"k8s.io/minikube/pkg/util/retry"
"k8s.io/minikube/test/integration/util"
)
@ -65,7 +66,7 @@ func testProvisioning(t *testing.T) {
return fmt.Errorf("no default StorageClass yet")
}
if err := util.Retry(t, checkStorageClass, 10*time.Second, 10); err != nil {
if err := retry.Expo(checkStorageClass, time.Second, 90*time.Second); err != nil {
t.Fatalf("no default storage class after retry: %v", err)
}
@ -84,7 +85,7 @@ func testProvisioning(t *testing.T) {
return nil
}
if err := util.Retry(t, checkPodRunning, 2*time.Second, 5); err != nil {
if err := retry.Expo(checkPodRunning, 2*time.Second, 2*time.Minute); err != nil {
t.Fatalf("Check storage-provisioner pod running failed with error: %v", err)
}
@ -107,7 +108,7 @@ func testProvisioning(t *testing.T) {
return fmt.Errorf("PV not attached to PVC: %v", pvc)
}
if err := util.Retry(t, checkStorage, 2*time.Second, 5); err != nil {
if err := retry.Expo(checkStorage, 2*time.Second, 2*time.Minute); err != nil {
t.Fatalf("PV Creation failed with error: %v", err)
}

View File

@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/minikube/pkg/minikube/tunnel"
commonutil "k8s.io/minikube/pkg/util"
"k8s.io/minikube/pkg/util/retry"
"k8s.io/minikube/test/integration/util"
)
@ -146,16 +147,16 @@ func getResponseBody(address string) (string, error) {
var resp *http.Response
var err error
request := func() error {
req := func() error {
resp, err = httpClient.Get(fmt.Sprintf("http://%s", address))
if err != nil {
retriable := &commonutil.RetriableError{Err: err}
retriable := &retry.RetriableError{Err: err}
return retriable
}
return nil
}
if err = commonutil.RetryAfter(5, request, 1*time.Second); err != nil {
if err = retry.Expo(req, time.Millisecond*500, 2*time.Minute, 6); err != nil {
return "", err
}

View File

@ -27,7 +27,7 @@ import (
"github.com/docker/machine/libmachine/state"
"k8s.io/minikube/pkg/minikube/constants"
"k8s.io/minikube/test/integration/util"
"k8s.io/minikube/pkg/util/retry"
)
func TestStartStop(t *testing.T) {
@ -105,7 +105,7 @@ func TestStartStop(t *testing.T) {
return mk.CheckStatusNoFail(state.Stopped.String())
}
err = util.RetryX(stop, 10*time.Second, 2*time.Minute)
err = retry.Expo(stop, 10*time.Second, 5*time.Minute)
mk.CheckStatus(state.Stopped.String())
stdout, stderr, err = mk.Start(tc.args...)

View File

@ -21,7 +21,6 @@ import (
"testing"
"time"
"github.com/cenkalti/backoff"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/labels"
commonutil "k8s.io/minikube/pkg/util"
@ -37,41 +36,6 @@ func WaitForBusyboxRunning(t *testing.T, namespace string, miniProfile string) e
return commonutil.WaitForPodsWithLabelRunning(client, namespace, selector)
}
// Retry tries the callback for a number of attempts, with a delay between attempts
func Retry(t *testing.T, callback func() error, d time.Duration, attempts int) (err error) {
for i := 0; i < attempts; i++ {
err = callback()
if err == nil {
return nil
}
time.Sleep(d)
}
return err
}
// Retry2 tries the callback for a number of attempts, with a delay without *testing.T
func Retry2(callback func() error, d time.Duration, attempts int) (err error) {
for i := 0; i < attempts; i++ {
err = callback()
if err == nil {
return nil
}
time.Sleep(d)
}
return err
}
// RetryX is expontential backoff retry
func RetryX(callback func() error, initInterv time.Duration, maxTime time.Duration) error {
b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = maxTime
b.InitialInterval = initInterv
b.RandomizationFactor = 0.5
b.Multiplier = 1.5
b.Reset()
return backoff.Retry(callback, b)
}
// Logf writes logs to stdout if -v is set.
func Logf(str string, args ...interface{}) {
if !testing.Verbose() {

View File

@ -25,7 +25,7 @@ import (
"testing"
"time"
commonutil "k8s.io/minikube/pkg/util"
"k8s.io/minikube/pkg/util/retry"
)
const kubectlBinary = "kubectl"
@ -79,14 +79,14 @@ func (k *KubectlRunner) RunCommand(args []string, useKubeContext ...bool) (stdou
cmd := exec.Command(k.BinaryPath, args...)
stdout, err = cmd.CombinedOutput()
if err != nil {
retriable := &commonutil.RetriableError{Err: fmt.Errorf("error running command %s: %v. Stdout: \n %s", args, err, stdout)}
retriable := &retry.RetriableError{Err: fmt.Errorf("error running command %s: %v. Stdout: \n %s", args, err, stdout)}
k.T.Log(retriable)
return retriable
}
return nil
}
err = commonutil.RetryAfter(3, inner, 2*time.Second)
err = retry.Expo(inner, time.Millisecond*500, 1*time.Minute, 5)
return stdout, err
}

View File

@ -33,6 +33,7 @@ import (
"github.com/pkg/errors"
"k8s.io/minikube/pkg/minikube/assets"
commonutil "k8s.io/minikube/pkg/util"
"k8s.io/minikube/pkg/util/retry"
)
// MinikubeRunner runs a command
@ -234,7 +235,7 @@ func (m *MinikubeRunner) Start(opts ...string) (stdout string, stderr string, er
stdout, stderr, err = m.RunCommandRetriable(cmd)
return err
}
err = RetryX(s, 10*time.Second, m.TimeOutStart)
err = retry.Expo(s, 10*time.Second, m.TimeOutStart)
return stdout, stderr, err
}
@ -282,7 +283,7 @@ func (m *MinikubeRunner) Status() (status string, stderr string, err error) {
status = strings.TrimRight(status, "\n")
return err
}
err = RetryX(s, 15*time.Second, 1*time.Minute)
err = retry.Expo(s, 3*time.Second, 2*time.Minute)
if err != nil && (status == state.None.String() || status == state.Stopped.String()) {
err = nil // because https://github.com/kubernetes/minikube/issues/4932
}

View File

@ -28,7 +28,7 @@ import (
"github.com/docker/machine/libmachine/state"
"github.com/pkg/errors"
"k8s.io/minikube/pkg/minikube/constants"
"k8s.io/minikube/test/integration/util"
"k8s.io/minikube/pkg/util/retry"
)
func fileExists(fname string) error {
@ -38,12 +38,12 @@ func fileExists(fname string) error {
return err
}
if info.IsDir() {
return fmt.Errorf("Error expect file got dir")
return fmt.Errorf("error expect file got dir")
}
return nil
}
if err := util.Retry2(check, 1*time.Second, 3); err != nil {
if err := retry.Expo(check, 1*time.Second, 3); err != nil {
return errors.Wrap(err, fmt.Sprintf("Failed check if file (%q) exists,", fname))
}
return nil