Upgrade to k8s v1.6.3

pull/1479/head
Matt Rickard 2017-05-11 13:28:10 -07:00
parent 85582c469c
commit 2bdca814a2
79 changed files with 2936 additions and 1948 deletions

Godeps/Godeps.json generated

File diff suppressed because it is too large

vendor/github.com/renstrom/dedent/.travis.yml generated vendored Normal file

@ -0,0 +1,11 @@
language: go
go:
- 1.0
- 1.1
- 1.2
- 1.3
- 1.4
- 1.5
sudo: false

vendor/github.com/renstrom/dedent/LICENSE generated vendored Normal file

@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) 2015 Peter Renström
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

vendor/github.com/renstrom/dedent/README.md generated vendored Normal file

@ -0,0 +1,50 @@
# Dedent
[![Build Status](https://travis-ci.org/renstrom/dedent.svg?branch=master)](https://travis-ci.org/renstrom/dedent)
[![Godoc](https://img.shields.io/badge/godoc-reference-blue.svg?style=flat)](https://godoc.org/github.com/renstrom/dedent)
Removes common leading whitespace from multiline strings. Inspired by [`textwrap.dedent`](https://docs.python.org/3/library/textwrap.html#textwrap.dedent) in Python.
## Usage / example
Imagine the following snippet that prints a multiline string. You want the indentation to look nice both in the code and in the actual output.
```go
package main

import (
	"fmt"

	"github.com/renstrom/dedent"
)

func main() {
	s := `
		Lorem ipsum dolor sit amet,
		consectetur adipiscing elit.
		Curabitur justo tellus, facilisis nec efficitur dictum,
		fermentum vitae ligula. Sed eu convallis sapien.`
	fmt.Println(dedent.Dedent(s))
	fmt.Println("-------------")
	fmt.Println(s)
}
```
To illustrate the difference, here's the output:
```bash
$ go run main.go

Lorem ipsum dolor sit amet,
consectetur adipiscing elit.
Curabitur justo tellus, facilisis nec efficitur dictum,
fermentum vitae ligula. Sed eu convallis sapien.
-------------

		Lorem ipsum dolor sit amet,
		consectetur adipiscing elit.
		Curabitur justo tellus, facilisis nec efficitur dictum,
		fermentum vitae ligula. Sed eu convallis sapien.
```
## License
MIT

vendor/github.com/renstrom/dedent/dedent.go generated vendored Normal file

@ -0,0 +1,49 @@
package dedent

import (
	"regexp"
	"strings"
)

var (
	whitespaceOnly    = regexp.MustCompile("(?m)^[ \t]+$")
	leadingWhitespace = regexp.MustCompile("(?m)(^[ \t]*)(?:[^ \t\n])")
)

// Dedent removes any common leading whitespace from every line in text.
//
// This can be used to make multiline strings line up with the left edge of
// the display, while still presenting them in the source code in indented
// form.
func Dedent(text string) string {
	var margin string

	text = whitespaceOnly.ReplaceAllString(text, "")
	indents := leadingWhitespace.FindAllStringSubmatch(text, -1)

	// Look for the longest leading string of spaces and tabs common to all
	// lines.
	for i, indent := range indents {
		if i == 0 {
			margin = indent[1]
		} else if strings.HasPrefix(indent[1], margin) {
			// Current line more deeply indented than previous winner:
			// no change (previous winner is still on top).
			continue
		} else if strings.HasPrefix(margin, indent[1]) {
			// Current line consistent with and no deeper than previous winner:
			// it's the new winner.
			margin = indent[1]
		} else {
			// Current line and previous winner have no common whitespace:
			// there is no margin.
			margin = ""
			break
		}
	}

	if margin != "" {
		text = regexp.MustCompile("(?m)^"+margin).ReplaceAllString(text, "")
	}
	return text
}
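To make the margin rules above concrete, here is a small illustration (assuming the vendored import path) where one line is indented more deeply than the common margin:

```go
package main

import (
	"fmt"

	"github.com/renstrom/dedent"
)

func main() {
	// The common margin is two spaces; the deeper third line keeps its
	// extra indentation once the margin is stripped.
	s := "\n  first\n  second\n    indented child"
	fmt.Printf("%q\n", dedent.Dedent(s))
	// Output: "\nfirst\nsecond\n  indented child"
}
```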


@ -86,6 +86,9 @@ func toYAML(v interface{}) (string, error) {
// different values in any key. All keys are required to be strings. Since patches of the
// same Type have congruent keys, this is valid for multiple patch types. This method
// supports JSON merge patch semantics.
//
// NOTE: Numbers with different types (e.g. int(0) vs int64(0)) will be detected as conflicts.
// Make sure the unmarshaling of left and right are consistent (e.g. use the same library).
func HasConflicts(left, right interface{}) (bool, error) {
switch typedLeft := left.(type) {
case map[string]interface{}:
@ -94,9 +97,11 @@ func HasConflicts(left, right interface{}) (bool, error) {
for key, leftValue := range typedLeft {
rightValue, ok := typedRight[key]
if !ok {
return false, nil
continue
}
if conflict, err := HasConflicts(leftValue, rightValue); err != nil || conflict {
return conflict, err
}
return HasConflicts(leftValue, rightValue)
}
return false, nil
@ -111,7 +116,9 @@ func HasConflicts(left, right interface{}) (bool, error) {
}
for i := range typedLeft {
return HasConflicts(typedLeft[i], typedRight[i])
if conflict, err := HasConflicts(typedLeft[i], typedRight[i]); err != nil || conflict {
return conflict, err
}
}
return false, nil
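The NOTE above is easy to trip over in practice. A minimal sketch (assuming `k8s.io/apimachinery/pkg/util/mergepatch` is importable, as this diff indicates) showing two structurally identical maps that still count as a conflict because their values carry different Go number types:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/mergepatch"
)

func main() {
	// encoding/json unmarshals JSON numbers into float64...
	left := map[string]interface{}{"replicas": float64(0)}
	// ...while a hand-built map (or another decoder) may hold an int.
	right := map[string]interface{}{"replicas": int(0)}

	conflict, err := mergepatch.HasConflicts(left, right)
	fmt.Println(conflict, err) // true <nil>: same value, different types
}
```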


@ -476,6 +476,7 @@ func StrategicMergePatch(original, patch []byte, dataStruct interface{}) ([]byte
// StrategicMergePatch applies a strategic merge patch. The original and patch documents
// must be JSONMap. A patch can be created from an original and modified document by
// calling CreateTwoWayMergeMapPatch.
// Warning: the original and patch JSONMap objects are mutated by this function and should not be reused.
func StrategicMergeMapPatch(original, patch JSONMap, dataStruct interface{}) (JSONMap, error) {
t, err := getTagStructType(dataStruct)
if err != nil {


@ -86,21 +86,21 @@ func strategicPatchObject(
patchJS []byte,
objToUpdate runtime.Object,
versionedObj runtime.Object,
) (originalObjMap map[string]interface{}, patchMap map[string]interface{}, retErr error) {
originalObjMap = make(map[string]interface{})
) error {
originalObjMap := make(map[string]interface{})
if err := unstructured.DefaultConverter.ToUnstructured(originalObject, &originalObjMap); err != nil {
return nil, nil, err
return err
}
patchMap = make(map[string]interface{})
patchMap := make(map[string]interface{})
if err := json.Unmarshal(patchJS, &patchMap); err != nil {
return nil, nil, err
return err
}
if err := applyPatchToObject(codec, defaulter, originalObjMap, patchMap, objToUpdate, versionedObj); err != nil {
return nil, nil, err
return err
}
return
return nil
}
// applyPatchToObject applies a strategic merge patch of <patchMap> to


@ -18,7 +18,6 @@ package handlers
import (
"encoding/hex"
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
@ -39,6 +38,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/mergepatch"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/strategicpatch"
@ -104,8 +104,8 @@ func (scope *RequestScope) err(err error, w http.ResponseWriter, req *http.Reque
// may be used to deserialize an options object to pass to the getter.
type getterFunc func(ctx request.Context, name string, req *restful.Request) (runtime.Object, error)
// maxRetryWhenPatchConflicts is the maximum number of conflict retries during a patch operation before returning failure
const maxRetryWhenPatchConflicts = 5
// MaxRetryWhenPatchConflicts is the maximum number of conflict retries during a patch operation before returning failure
const MaxRetryWhenPatchConflicts = 5
// getResourceHandler is an HTTP handler function for get requests. It delegates to the
// passed-in getterFunc to perform the actual get.
@ -570,7 +570,7 @@ func patchResource(
originalObjJS []byte
originalPatchedObjJS []byte
originalObjMap map[string]interface{}
originalPatchMap map[string]interface{}
getOriginalPatchMap func() (map[string]interface{}, error)
lastConflictErr error
originalResourceVersion string
)
@ -610,6 +610,26 @@ func patchResource(
return nil, err
}
originalObjJS, originalPatchedObjJS = originalJS, patchedJS
// Make a getter that can return a fresh strategic patch map if needed for conflict retries
// We have to rebuild it each time we need it, because the map gets mutated when being applied
var originalPatchBytes []byte
getOriginalPatchMap = func() (map[string]interface{}, error) {
if originalPatchBytes == nil {
// Compute once
originalPatchBytes, err = strategicpatch.CreateTwoWayMergePatch(originalObjJS, originalPatchedObjJS, versionedObj)
if err != nil {
return nil, err
}
}
// Return a fresh map every time
originalPatchMap := make(map[string]interface{})
if err := json.Unmarshal(originalPatchBytes, &originalPatchMap); err != nil {
return nil, err
}
return originalPatchMap, nil
}
case types.StrategicMergePatchType:
// Since the patch is applied on versioned objects, we need to convert the
// current object to versioned representation first.
@ -621,8 +641,12 @@ func patchResource(
if err != nil {
return nil, err
}
originalMap, patchMap, err := strategicPatchObject(codec, defaulter, currentVersionedObject, patchJS, versionedObjToUpdate, versionedObj)
if err != nil {
// Capture the original object map and patch for possible retries.
originalMap := make(map[string]interface{})
if err := unstructured.DefaultConverter.ToUnstructured(currentVersionedObject, &originalMap); err != nil {
return nil, err
}
if err := strategicPatchObject(codec, defaulter, currentVersionedObject, patchJS, versionedObjToUpdate, versionedObj); err != nil {
return nil, err
}
// Convert the object back to unversioned.
@ -632,8 +656,17 @@ func patchResource(
return nil, err
}
objToUpdate = unversionedObjToUpdate
// Store unstructured representations for possible retries.
originalObjMap, originalPatchMap = originalMap, patchMap
// Store unstructured representation for possible retries.
originalObjMap = originalMap
// Make a getter that can return a fresh strategic patch map if needed for conflict retries
// We have to rebuild it each time we need it, because the map gets mutated when being applied
getOriginalPatchMap = func() (map[string]interface{}, error) {
patchMap := make(map[string]interface{})
if err := json.Unmarshal(patchJS, &patchMap); err != nil {
return nil, err
}
return patchMap, nil
}
}
if err := checkName(objToUpdate, name, namespace, namer); err != nil {
return nil, err
@ -669,17 +702,6 @@ func patchResource(
return nil, err
}
} else {
if originalPatchMap == nil {
// Compute original patch, if we already didn't do this in previous retries.
originalPatch, err := strategicpatch.CreateTwoWayMergePatch(originalObjJS, originalPatchedObjJS, versionedObj)
if err != nil {
return nil, err
}
originalPatchMap = make(map[string]interface{})
if err := json.Unmarshal(originalPatch, &originalPatchMap); err != nil {
return nil, err
}
}
// Compute current patch.
currentObjJS, err := runtime.Encode(codec, currentObject)
if err != nil {
@ -695,6 +717,12 @@ func patchResource(
}
}
// Get a fresh copy of the original strategic patch each time through, since applying it mutates the map
originalPatchMap, err := getOriginalPatchMap()
if err != nil {
return nil, err
}
hasConflicts, err := mergepatch.HasConflicts(originalPatchMap, currentPatchMap)
if err != nil {
return nil, err
@ -742,7 +770,7 @@ func patchResource(
return finishRequest(timeout, func() (runtime.Object, error) {
updateObject, _, updateErr := patcher.Update(ctx, name, updatedObjectInfo)
for i := 0; i < maxRetryWhenPatchConflicts && (errors.IsConflict(updateErr)); i++ {
for i := 0; i < MaxRetryWhenPatchConflicts && (errors.IsConflict(updateErr)); i++ {
lastConflictErr = updateErr
updateObject, _, updateErr = patcher.Update(ctx, name, updatedObjectInfo)
}
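Both getters exist because applying a strategic merge patch mutates the patch map in place, so a conflict retry that reused the first map would apply an already-consumed patch. A minimal standalone sketch of the same rebuild-on-demand pattern, using only the standard library:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// getFreshMap mirrors the getters above: re-unmarshal the stored bytes on
// every call so each retry starts from an unmutated patch map.
func getFreshMap(patchJS []byte) (map[string]interface{}, error) {
	m := make(map[string]interface{})
	if err := json.Unmarshal(patchJS, &m); err != nil {
		return nil, err
	}
	return m, nil
}

func main() {
	patchJS := []byte(`{"spec":{"replicas":3}}`)

	m1, _ := getFreshMap(patchJS)
	// Simulate the mutation that applying a strategic merge patch performs.
	delete(m1["spec"].(map[string]interface{}), "replicas")

	m2, _ := getFreshMap(patchJS)
	fmt.Println(m1, m2) // map[spec:map[]] map[spec:map[replicas:3]]
}
```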


@ -20,6 +20,7 @@ import (
"bufio"
"net"
"net/http"
"regexp"
"strconv"
"strings"
"time"
@ -58,6 +59,7 @@ var (
},
[]string{"verb", "resource"},
)
kubectlExeRegexp = regexp.MustCompile(`^.*((?i:kubectl\.exe))`)
)
// Register all metrics.
@ -109,9 +111,12 @@ func InstrumentRouteFunc(verb, resource string, routeFunc restful.RouteFunction)
}
func cleanUserAgent(ua string) string {
// We collapse all "web browser"-type user agents into one "browser" to reduce metric cardinality.
if strings.HasPrefix(ua, "Mozilla/") {
return "Browser"
}
// If an old "kubectl.exe" has passed us its full path, we discard the path portion.
ua = kubectlExeRegexp.ReplaceAllString(ua, "$1")
return ua
}
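For reference, this is what the new regexp does to an old Windows kubectl User-Agent that leaked its full executable path (the path below is invented):

```go
package main

import (
	"fmt"
	"regexp"
)

var kubectlExeRegexp = regexp.MustCompile(`^.*((?i:kubectl\.exe))`)

func main() {
	// An invented User-Agent from an old Windows kubectl.
	ua := `C:\Users\jane\bin\kubectl.exe/v1.6.3 (windows/amd64) kubernetes/2bdca81`
	// Everything up to and including "kubectl.exe" is replaced by the
	// capture group, discarding the path portion.
	fmt.Println(kubectlExeRegexp.ReplaceAllString(ua, "$1"))
	// Output: kubectl.exe/v1.6.3 (windows/amd64) kubernetes/2bdca81
}
```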


@ -95,6 +95,8 @@ type Config struct {
EnableContentionProfiling bool
EnableMetrics bool
DisabledPostStartHooks sets.String
// Version will enable the /version endpoint if non-nil
Version *version.Info
// AuditWriter is the destination for audit logs. If nil, they will not be written.
@ -197,6 +199,7 @@ func NewConfig() *Config {
RequestContextMapper: apirequest.NewRequestContextMapper(),
BuildHandlerChainsFunc: DefaultBuildHandlerChain,
LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix),
DisabledPostStartHooks: sets.NewString(),
HealthzChecks: []healthz.HealthzChecker{healthz.PingHealthz},
EnableIndex: true,
EnableProfiling: true,
@ -395,8 +398,10 @@ func (c completedConfig) New() (*GenericAPIServer, error) {
swaggerConfig: c.SwaggerConfig,
openAPIConfig: c.OpenAPIConfig,
postStartHooks: map[string]postStartHookEntry{},
healthzChecks: c.HealthzChecks,
postStartHooks: map[string]postStartHookEntry{},
disabledPostStartHooks: c.DisabledPostStartHooks,
healthzChecks: c.HealthzChecks,
}
s.HandlerContainer = mux.NewAPIContainer(http.NewServeMux(), c.Serializer)


@ -138,10 +138,11 @@ type GenericAPIServer struct {
// PostStartHooks are each called after the server has started listening, in a separate go func for each
// with no guarantee of ordering between them. The map key is a name used for error reporting.
// It may kill the process with a panic if it wishes to by returning an error
postStartHookLock sync.Mutex
postStartHooks map[string]postStartHookEntry
postStartHooksCalled bool
// It may kill the process with a panic if it wishes to by returning an error.
postStartHookLock sync.Mutex
postStartHooks map[string]postStartHookEntry
postStartHooksCalled bool
disabledPostStartHooks sets.String
// healthz checks
healthzLock sync.Mutex


@ -65,6 +65,9 @@ func (s *GenericAPIServer) AddPostStartHook(name string, hook PostStartHookFunc)
if hook == nil {
return nil
}
if s.disabledPostStartHooks.Has(name) {
return nil
}
s.postStartHookLock.Lock()
defer s.postStartHookLock.Unlock()
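The new `disabledPostStartHooks` set is consulted here: any hook registered under a disabled name becomes a silent no-op. A sketch of the intended wiring, with the package path and hook name assumed from the kube-apiserver change later in this commit:

```go
package main

import (
	"fmt"

	genericapiserver "k8s.io/apiserver/pkg/server"
)

func main() {
	c := genericapiserver.NewConfig()
	// Assumed hook name (rbacrest.PostStartHookName in the wiring below);
	// AddPostStartHook will now skip registrations under this name.
	c.DisabledPostStartHooks.Insert("rbac/bootstrap-roles")
	fmt.Println(c.DisabledPostStartHooks.List())
}
```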


@ -276,10 +276,10 @@ const (
AffinityAnnotationKey string = "scheduler.alpha.kubernetes.io/affinity"
)
// Tries to add a toleration to annotations list. Returns true if something was updated
// false otherwise.
func AddOrUpdateTolerationInPod(pod *Pod, toleration *Toleration) (bool, error) {
podTolerations := pod.Spec.Tolerations
// AddOrUpdateTolerationInPodSpec tries to add a toleration to the toleration list in PodSpec.
// Returns true if something was updated, false otherwise.
func AddOrUpdateTolerationInPodSpec(spec *PodSpec, toleration *Toleration) (bool, error) {
podTolerations := spec.Tolerations
var newTolerations []Toleration
updated := false
@ -300,10 +300,16 @@ func AddOrUpdateTolerationInPod(pod *Pod, toleration *Toleration) (bool, error)
newTolerations = append(newTolerations, *toleration)
}
pod.Spec.Tolerations = newTolerations
spec.Tolerations = newTolerations
return true, nil
}
// AddOrUpdateTolerationInPod tries to add a toleration to the pod's toleration list.
// Returns true if something was updated, false otherwise.
func AddOrUpdateTolerationInPod(pod *Pod, toleration *Toleration) (bool, error) {
return AddOrUpdateTolerationInPodSpec(&pod.Spec, toleration)
}
// MatchToleration checks if the toleration matches tolerationToMatch. Tolerations are unique by <key,effect,operator,value>,
// if the two tolerations have same <key,effect,operator,value> combination, regard as they match.
// TODO: uniqueness check for tolerations in api validations.
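A hypothetical use of the refactored helpers (import path assumed to be the internal API package):

```go
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
)

func main() {
	pod := &api.Pod{}
	toleration := &api.Toleration{
		Key:      "node-role.kubernetes.io/master",
		Operator: api.TolerationOpExists,
		Effect:   api.TaintEffectNoSchedule,
	}
	// Delegates to AddOrUpdateTolerationInPodSpec(&pod.Spec, toleration).
	updated, err := api.AddOrUpdateTolerationInPod(pod, toleration)
	fmt.Println(updated, err, pod.Spec.Tolerations)
}
```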


@ -233,6 +233,7 @@ type CronJobSpec struct {
StartingDeadlineSeconds *int64
// ConcurrencyPolicy specifies how to treat concurrent executions of a Job.
// Defaults to Allow.
// +optional
ConcurrencyPolicy ConcurrencyPolicy


@ -72,6 +72,7 @@ message CronJobSpec {
optional int64 startingDeadlineSeconds = 2;
// ConcurrencyPolicy specifies how to treat concurrent executions of a Job.
// Defaults to Allow.
// +optional
optional string concurrencyPolicy = 3;


@ -94,6 +94,7 @@ type CronJobSpec struct {
StartingDeadlineSeconds *int64 `json:"startingDeadlineSeconds,omitempty" protobuf:"varint,2,opt,name=startingDeadlineSeconds"`
// ConcurrencyPolicy specifies how to treat concurrent executions of a Job.
// Defaults to Allow.
// +optional
ConcurrencyPolicy ConcurrencyPolicy `json:"concurrencyPolicy,omitempty" protobuf:"bytes,3,opt,name=concurrencyPolicy,casttype=ConcurrencyPolicy"`


@ -52,7 +52,7 @@ var map_CronJobSpec = map[string]string{
"": "CronJobSpec describes how the job execution will look like and when it will actually run.",
"schedule": "Schedule contains the schedule in Cron format, see https://en.wikipedia.org/wiki/Cron.",
"startingDeadlineSeconds": "Optional deadline in seconds for starting the job if it misses scheduled time for any reason. Missed jobs executions will be counted as failed ones.",
"concurrencyPolicy": "ConcurrencyPolicy specifies how to treat concurrent executions of a Job.",
"concurrencyPolicy": "ConcurrencyPolicy specifies how to treat concurrent executions of a Job. Defaults to Allow.",
"suspend": "Suspend flag tells the controller to suspend subsequent executions, it does not apply to already started executions. Defaults to false.",
"jobTemplate": "JobTemplate is the object that describes the job that will be created when executing a CronJob.",
"successfulJobsHistoryLimit": "The number of successful finished jobs to retain. This is a pointer to distinguish between explicit zero and not specified.",


@ -51,7 +51,7 @@ var (
// semantic version is a git hash, but the version itself is no
// longer the direct output of "git describe", but a slight
// translation to be semver compliant.
gitVersion string = "v1.6.0-rc.1+$Format:%h$"
gitVersion string = "v1.6.3-beta.0+$Format:%h$"
gitCommit string = "$Format:%H$" // sha1 from git, output of $(git rev-parse HEAD)
gitTreeState string = "not a git tree" // state of git tree, either "clean" or "dirty"


@ -22,7 +22,7 @@ import (
"net"
"net/http"
"os"
"path"
"path/filepath"
gruntime "runtime"
"strings"
"time"
@ -255,19 +255,51 @@ func SetKubernetesDefaults(config *Config) error {
return nil
}
// DefaultKubernetesUserAgent returns the default user agent that clients can use.
// adjustCommit returns sufficient significant figures of the commit's git hash.
func adjustCommit(c string) string {
if len(c) == 0 {
return "unknown"
}
if len(c) > 7 {
return c[:7]
}
return c
}
// adjustVersion strips "alpha", "beta", etc. from version in form
// major.minor.patch-[alpha|beta|etc].
func adjustVersion(v string) string {
if len(v) == 0 {
return "unknown"
}
seg := strings.SplitN(v, "-", 2)
return seg[0]
}
// adjustCommand returns the last component of the
// OS-specific command path for use in User-Agent.
func adjustCommand(p string) string {
// Unlikely, but better than returning "".
if len(p) == 0 {
return "unknown"
}
return filepath.Base(p)
}
// buildUserAgent builds a User-Agent string from given args.
func buildUserAgent(command, version, os, arch, commit string) string {
return fmt.Sprintf(
"%s/%s (%s/%s) kubernetes/%s", command, version, os, arch, commit)
}
// DefaultKubernetesUserAgent returns a User-Agent string built from static global vars.
func DefaultKubernetesUserAgent() string {
commit := version.Get().GitCommit
if len(commit) > 7 {
commit = commit[:7]
}
if len(commit) == 0 {
commit = "unknown"
}
version := version.Get().GitVersion
seg := strings.SplitN(version, "-", 2)
version = seg[0]
return fmt.Sprintf("%s/%s (%s/%s) kubernetes/%s", path.Base(os.Args[0]), version, gruntime.GOOS, gruntime.GOARCH, commit)
return buildUserAgent(
adjustCommand(os.Args[0]),
adjustVersion(version.Get().GitVersion),
gruntime.GOOS,
gruntime.GOARCH,
adjustCommit(version.Get().GitCommit))
}
// InClusterConfig returns a config object which uses the service account
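Worked through with illustrative inputs, the helpers compose like this:

```go
package main

import "fmt"

func main() {
	// adjustCommand("/usr/local/bin/kubectl")       -> "kubectl"
	// adjustVersion("v1.6.3-beta.0")                -> "v1.6.3"
	// adjustCommit("2bdca814a2...") (first 7 chars) -> "2bdca81"
	fmt.Printf("%s/%s (%s/%s) kubernetes/%s\n",
		"kubectl", "v1.6.3", "linux", "amd64", "2bdca81")
	// kubectl/v1.6.3 (linux/amd64) kubernetes/2bdca81
}
```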


@ -482,13 +482,13 @@ func (config *inClusterClientConfig) Namespace() (string, bool, error) {
// This way assumes you've set the POD_NAMESPACE environment variable using the downward API.
// This check has to be done first for backwards compatibility with the way InClusterConfig was originally set up
if ns := os.Getenv("POD_NAMESPACE"); ns != "" {
return ns, true, nil
return ns, false, nil
}
// Fall back to the namespace associated with the service account token, if available
if data, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err == nil {
if ns := strings.TrimSpace(string(data)); len(ns) > 0 {
return ns, true, nil
return ns, false, nil
}
}


@ -22,6 +22,7 @@ import (
"github.com/golang/glog"
"k8s.io/client-go/pkg/api"
restclient "k8s.io/client-go/rest"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)
@ -134,12 +135,26 @@ func (config *DeferredLoadingClientConfig) Namespace() (string, bool, error) {
return "", false, err
}
ns, ok, err := mergedKubeConfig.Namespace()
ns, overridden, err := mergedKubeConfig.Namespace()
// if we get an error and it is not empty config, or if the merged config defined an explicit namespace, or
// if in-cluster config is not possible, return immediately
if (err != nil && !IsEmptyConfig(err)) || ok || !config.icc.Possible() {
if (err != nil && !IsEmptyConfig(err)) || overridden || !config.icc.Possible() {
// return on any error except empty config
return ns, ok, err
return ns, overridden, err
}
if len(ns) > 0 {
// if we got a non-default namespace from the kubeconfig, use it
if ns != api.NamespaceDefault {
return ns, false, nil
}
// if we got a default namespace, determine whether it was explicit or implicit
if raw, err := mergedKubeConfig.RawConfig(); err == nil {
if context := raw.Contexts[raw.CurrentContext]; context != nil && len(context.Namespace) > 0 {
return ns, false, nil
}
}
}
glog.V(4).Infof("Using in-cluster namespace")


@ -31,9 +31,11 @@ go_library(
"//pkg/kubeapiserver:go_default_library",
"//pkg/kubeapiserver/admission:go_default_library",
"//pkg/kubeapiserver/authenticator:go_default_library",
"//pkg/kubeapiserver/authorizer/modes:go_default_library",
"//pkg/master:go_default_library",
"//pkg/master/tunneler:go_default_library",
"//pkg/registry/cachesize:go_default_library",
"//pkg/registry/rbac/rest:go_default_library",
"//pkg/version:go_default_library",
"//plugin/pkg/admission/admit:go_default_library",
"//plugin/pkg/admission/alwayspullimages:go_default_library",


@ -63,9 +63,11 @@ import (
"k8s.io/kubernetes/pkg/kubeapiserver"
kubeadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
kubeauthenticator "k8s.io/kubernetes/pkg/kubeapiserver/authenticator"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
"k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/pkg/master/tunneler"
"k8s.io/kubernetes/pkg/registry/cachesize"
rbacrest "k8s.io/kubernetes/pkg/registry/rbac/rest"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/plugin/pkg/auth/authenticator/token/bootstrap"
)
@ -324,6 +326,9 @@ func BuildMasterConfig(s *options.ServerRunOptions) (*master.Config, informers.S
if err != nil {
return nil, nil, fmt.Errorf("invalid authentication config: %v", err)
}
if !sets.NewString(s.Authorization.Modes()...).Has(modes.ModeRBAC) {
genericConfig.DisabledPostStartHooks.Insert(rbacrest.PostStartHookName)
}
authorizationConfig := s.Authorization.ToAuthorizationConfig(sharedInformers)
apiAuthorizer, err := authorizationConfig.New()


@ -0,0 +1,28 @@
package(default_visibility = ["//visibility:public"])

licenses(["notice"])

load(
    "@io_bazel_rules_go//go:def.bzl",
    "go_library",
)

go_library(
    name = "go_default_library",
    srcs = ["constants.go"],
    tags = ["automanaged"],
    deps = ["//vendor:k8s.io/client-go/pkg/api/v1"],
)

filegroup(
    name = "package-srcs",
    srcs = glob(["**"]),
    tags = ["automanaged"],
    visibility = ["//visibility:private"],
)

filegroup(
    name = "all-srcs",
    srcs = [":package-srcs"],
    tags = ["automanaged"],
)


@ -0,0 +1,110 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package constants

import (
	"path"
	"time"

	"k8s.io/client-go/pkg/api/v1"
)

const (
	// KubernetesDir is the directory kubernetes owns for storing various configuration files
	KubernetesDir = "/etc/kubernetes"

	CACertAndKeyBaseName = "ca"
	CACertName           = "ca.crt"
	CAKeyName            = "ca.key"

	APIServerCertAndKeyBaseName = "apiserver"
	APIServerCertName           = "apiserver.crt"
	APIServerKeyName            = "apiserver.key"

	APIServerKubeletClientCertAndKeyBaseName = "apiserver-kubelet-client"
	APIServerKubeletClientCertName           = "apiserver-kubelet-client.crt"
	APIServerKubeletClientKeyName            = "apiserver-kubelet-client.key"

	ServiceAccountKeyBaseName    = "sa"
	ServiceAccountPublicKeyName  = "sa.pub"
	ServiceAccountPrivateKeyName = "sa.key"

	FrontProxyCACertAndKeyBaseName = "front-proxy-ca"
	FrontProxyCACertName           = "front-proxy-ca.crt"
	FrontProxyCAKeyName            = "front-proxy-ca.key"

	FrontProxyClientCertAndKeyBaseName = "front-proxy-client"
	FrontProxyClientCertName           = "front-proxy-client.crt"
	FrontProxyClientKeyName            = "front-proxy-client.key"

	AdminKubeConfigFileName             = "admin.conf"
	KubeletKubeConfigFileName           = "kubelet.conf"
	ControllerManagerKubeConfigFileName = "controller-manager.conf"
	SchedulerKubeConfigFileName         = "scheduler.conf"

	// Important: a "v"-prefix shouldn't exist here; semver doesn't allow that
	MinimumControlPlaneVersion = "1.6.0-beta.3"

	// Some well-known users and groups in the core Kubernetes authorization system
	ControllerManagerUser = "system:kube-controller-manager"
	SchedulerUser         = "system:kube-scheduler"
	MastersGroup          = "system:masters"
	NodesGroup            = "system:nodes"

	// Constants for what we name our ServiceAccounts with limited access to the cluster in case of RBAC
	KubeDNSServiceAccountName   = "kube-dns"
	KubeProxyServiceAccountName = "kube-proxy"

	// APICallRetryInterval defines how long kubeadm should wait before retrying a failed API operation
	APICallRetryInterval = 500 * time.Millisecond
	// DiscoveryRetryInterval specifies how long kubeadm should wait before retrying to connect to the master when doing discovery
	DiscoveryRetryInterval = 5 * time.Second

	// Minimum amount of nodes the Service subnet should allow.
	// We need at least ten, because the DNS service is always at the tenth cluster IP.
	MinimumAddressesInServiceSubnet = 10

	// DefaultTokenDuration specifies the default amount of time that a bootstrap token will be valid
	// Default behaviour is "never expire" == 0
	DefaultTokenDuration = 0

	// LabelNodeRoleMaster specifies that a node is a master
	// It's copied over to kubeadm until it's merged in core: https://github.com/kubernetes/kubernetes/pull/39112
	LabelNodeRoleMaster = "node-role.kubernetes.io/master"

	// MinExternalEtcdVersion indicates minimum external etcd version which kubeadm supports
	MinExternalEtcdVersion = "3.0.14"

	// DefaultAdmissionControl specifies the default admission control options that will be used
	DefaultAdmissionControl = "NamespaceLifecycle,LimitRanger,ServiceAccount,PersistentVolumeLabel,DefaultStorageClass,ResourceQuota,DefaultTolerationSeconds"
)

var (
	// MasterToleration is the toleration to apply on the PodSpec for being able to run that Pod on the master
	MasterToleration = v1.Toleration{
		Key:    LabelNodeRoleMaster,
		Effect: v1.TaintEffectNoSchedule,
	}

	AuthorizationPolicyPath        = path.Join(KubernetesDir, "abac_policy.json")
	AuthorizationWebhookConfigPath = path.Join(KubernetesDir, "webhook_authz.conf")

	// DefaultTokenUsages specifies the default functions a token will get
	DefaultTokenUsages = []string{"signing", "authentication"}
)
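A small usage sketch (assuming this package lives at `k8s.io/kubernetes/cmd/kubeadm/app/constants`):

```go
package main

import (
	"fmt"
	"path/filepath"

	kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
)

func main() {
	// e.g. "/etc/kubernetes/scheduler.conf"
	fmt.Println(filepath.Join(kubeadmconstants.KubernetesDir,
		kubeadmconstants.SchedulerKubeConfigFileName))
}
```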


@ -276,10 +276,10 @@ const (
AffinityAnnotationKey string = "scheduler.alpha.kubernetes.io/affinity"
)
// Tries to add a toleration to annotations list. Returns true if something was updated
// false otherwise.
func AddOrUpdateTolerationInPod(pod *Pod, toleration *Toleration) (bool, error) {
podTolerations := pod.Spec.Tolerations
// AddOrUpdateTolerationInPodSpec tries to add a toleration to the toleration list in PodSpec.
// Returns true if something was updated, false otherwise.
func AddOrUpdateTolerationInPodSpec(spec *PodSpec, toleration *Toleration) (bool, error) {
podTolerations := spec.Tolerations
var newTolerations []Toleration
updated := false
@ -300,10 +300,16 @@ func AddOrUpdateTolerationInPod(pod *Pod, toleration *Toleration) (bool, error)
newTolerations = append(newTolerations, *toleration)
}
pod.Spec.Tolerations = newTolerations
spec.Tolerations = newTolerations
return true, nil
}
// AddOrUpdateTolerationInPod tries to add a toleration to the pod's toleration list.
// Returns true if something was updated, false otherwise.
func AddOrUpdateTolerationInPod(pod *Pod, toleration *Toleration) (bool, error) {
return AddOrUpdateTolerationInPodSpec(&pod.Spec, toleration)
}
// MatchToleration checks if the toleration matches tolerationToMatch. Tolerations are unique by <key,effect,operator,value>,
// if the two tolerations have same <key,effect,operator,value> combination, regard as they match.
// TODO: uniqueness check for tolerations in api validations.


@ -233,6 +233,7 @@ type CronJobSpec struct {
StartingDeadlineSeconds *int64
// ConcurrencyPolicy specifies how to treat concurrent executions of a Job.
// Defaults to Allow.
// +optional
ConcurrencyPolicy ConcurrencyPolicy


@ -72,6 +72,7 @@ message CronJobSpec {
optional int64 startingDeadlineSeconds = 2;
// ConcurrencyPolicy specifies how to treat concurrent executions of a Job.
// Defaults to Allow.
// +optional
optional string concurrencyPolicy = 3;


@ -94,6 +94,7 @@ type CronJobSpec struct {
StartingDeadlineSeconds *int64 `json:"startingDeadlineSeconds,omitempty" protobuf:"varint,2,opt,name=startingDeadlineSeconds"`
// ConcurrencyPolicy specifies how to treat concurrent executions of a Job.
// Defaults to Allow.
// +optional
ConcurrencyPolicy ConcurrencyPolicy `json:"concurrencyPolicy,omitempty" protobuf:"bytes,3,opt,name=concurrencyPolicy,casttype=ConcurrencyPolicy"`


@ -52,7 +52,7 @@ var map_CronJobSpec = map[string]string{
"": "CronJobSpec describes how the job execution will look like and when it will actually run.",
"schedule": "Schedule contains the schedule in Cron format, see https://en.wikipedia.org/wiki/Cron.",
"startingDeadlineSeconds": "Optional deadline in seconds for starting the job if it misses scheduled time for any reason. Missed jobs executions will be counted as failed ones.",
"concurrencyPolicy": "ConcurrencyPolicy specifies how to treat concurrent executions of a Job.",
"concurrencyPolicy": "ConcurrencyPolicy specifies how to treat concurrent executions of a Job. Defaults to Allow.",
"suspend": "Suspend flag tells the controller to suspend subsequent executions, it does not apply to already started executions. Defaults to false.",
"jobTemplate": "JobTemplate is the object that describes the job that will be created when executing a CronJob.",
"successfulJobsHistoryLimit": "The number of successful finished jobs to retain. This is a pointer to distinguish between explicit zero and not specified.",


@ -20,8 +20,8 @@ import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
// IsDefaultStorageClassAnnotation represents a StorageClass annotation that
// marks a class as the default StorageClass
//TODO: Update IsDefaultStorageClassannotation and remove Beta when no longer used
const IsDefaultStorageClassAnnotation = "storageclass.beta.kubernetes.io/is-default-class"
//TODO: remove Beta when no longer used
const IsDefaultStorageClassAnnotation = "storageclass.kubernetes.io/is-default-class"
const BetaIsDefaultStorageClassAnnotation = "storageclass.beta.kubernetes.io/is-default-class"
// IsDefaultAnnotationText returns a pretty Yes/No String if


@ -392,6 +392,10 @@ type CloudConfig struct {
// on a different aws account, on a different cloud provider or on-premise.
// If the flag is set also the KubernetesClusterTag must be provided
VPC string
// SubnetID enables using a specific subnet to use for ELB's
SubnetID string
// RouteTableID enables using a specific RouteTable
RouteTableID string
// KubernetesClusterTag is the legacy cluster id we'll use to identify our cluster resources
KubernetesClusterTag string
@ -817,13 +821,14 @@ func newAWSCloud(config io.Reader, awsServices Services) (*Cloud, error) {
deviceAllocators: make(map[types.NodeName]DeviceAllocator),
}
if cfg.Global.VPC != "" && cfg.Global.KubernetesClusterTag != "" {
if cfg.Global.VPC != "" && cfg.Global.SubnetID != "" && (cfg.Global.KubernetesClusterTag != "" || cfg.Global.KubernetesClusterID != "") {
// When the master is running on a different AWS account, cloud provider or on-premise
// build up a dummy instance and use the VPC from the nodes account
glog.Info("Master is configured to run on a AWS account, different cloud provider or on-premise")
glog.Info("Master is configured to run on a different AWS account, different cloud provider or on-premise")
awsCloud.selfAWSInstance = &awsInstance{
nodeName: "master-dummy",
vpcID: cfg.Global.VPC,
subnetID: cfg.Global.SubnetID,
}
awsCloud.vpcID = cfg.Global.VPC
} else {
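With the new fields, a `--cloud-config` for a master running outside the nodes' account might look like the sketch below (gcfg/INI format as the AWS provider parses it; every ID is invented for illustration):

```go
package main

import "fmt"

// exampleAWSCloudConfig exercises the new SubnetID and RouteTableID fields.
const exampleAWSCloudConfig = `
[Global]
VPC = vpc-0a1b2c3d
SubnetID = subnet-4e5f6a7b
RouteTableID = rtb-8c9d0e1f
KubernetesClusterTag = my-cluster
`

func main() { fmt.Print(exampleAWSCloudConfig) }
```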


@ -29,17 +29,27 @@ func (c *Cloud) findRouteTable(clusterName string) (*ec2.RouteTable, error) {
// This should be unnecessary (we already filter on TagNameKubernetesCluster,
// and something is broken if cluster name doesn't match, but anyway...
// TODO: All clouds should be cluster-aware by default
request := &ec2.DescribeRouteTablesInput{Filters: c.tagging.addFilters(nil)}
response, err := c.ec2.DescribeRouteTables(request)
if err != nil {
return nil, err
}
var tables []*ec2.RouteTable
for _, table := range response {
if c.tagging.hasClusterTag(table.Tags) {
tables = append(tables, table)
if c.cfg.Global.RouteTableID != "" {
request := &ec2.DescribeRouteTablesInput{Filters: []*ec2.Filter{newEc2Filter("route-table-id", c.cfg.Global.RouteTableID)}}
response, err := c.ec2.DescribeRouteTables(request)
if err != nil {
return nil, err
}
tables = response
} else {
request := &ec2.DescribeRouteTablesInput{Filters: c.tagging.addFilters(nil)}
response, err := c.ec2.DescribeRouteTables(request)
if err != nil {
return nil, err
}
for _, table := range response {
if c.tagging.hasClusterTag(table.Tags) {
tables = append(tables, table)
}
}
}


@ -28,6 +28,7 @@ go_library(
"//pkg/api/v1:go_default_library",
"//pkg/api/v1/service:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/version:go_default_library",
"//pkg/volume:go_default_library",
"//vendor:github.com/Azure/azure-sdk-for-go/arm/compute",
"//vendor:github.com/Azure/azure-sdk-for-go/arm/network",


@ -17,14 +17,17 @@ limitations under the License.
package azure
import (
"fmt"
"io"
"io/ioutil"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/version"
"github.com/Azure/azure-sdk-for-go/arm/compute"
"github.com/Azure/azure-sdk-for-go/arm/network"
"github.com/Azure/azure-sdk-for-go/arm/storage"
"github.com/Azure/go-autorest/autorest"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/ghodss/yaml"
"time"
@ -125,38 +128,54 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) {
az.SubnetsClient = network.NewSubnetsClient(az.SubscriptionID)
az.SubnetsClient.BaseURI = az.Environment.ResourceManagerEndpoint
az.SubnetsClient.Authorizer = servicePrincipalToken
az.SubnetsClient.PollingDelay = 5 * time.Second
configureUserAgent(&az.SubnetsClient.Client)
az.RouteTablesClient = network.NewRouteTablesClient(az.SubscriptionID)
az.RouteTablesClient.BaseURI = az.Environment.ResourceManagerEndpoint
az.RouteTablesClient.Authorizer = servicePrincipalToken
az.RouteTablesClient.PollingDelay = 5 * time.Second
configureUserAgent(&az.RouteTablesClient.Client)
az.RoutesClient = network.NewRoutesClient(az.SubscriptionID)
az.RoutesClient.BaseURI = az.Environment.ResourceManagerEndpoint
az.RoutesClient.Authorizer = servicePrincipalToken
az.RoutesClient.PollingDelay = 5 * time.Second
configureUserAgent(&az.RoutesClient.Client)
az.InterfacesClient = network.NewInterfacesClient(az.SubscriptionID)
az.InterfacesClient.BaseURI = az.Environment.ResourceManagerEndpoint
az.InterfacesClient.Authorizer = servicePrincipalToken
az.InterfacesClient.PollingDelay = 5 * time.Second
configureUserAgent(&az.InterfacesClient.Client)
az.LoadBalancerClient = network.NewLoadBalancersClient(az.SubscriptionID)
az.LoadBalancerClient.BaseURI = az.Environment.ResourceManagerEndpoint
az.LoadBalancerClient.Authorizer = servicePrincipalToken
az.LoadBalancerClient.PollingDelay = 5 * time.Second
configureUserAgent(&az.LoadBalancerClient.Client)
az.VirtualMachinesClient = compute.NewVirtualMachinesClient(az.SubscriptionID)
az.VirtualMachinesClient.BaseURI = az.Environment.ResourceManagerEndpoint
az.VirtualMachinesClient.Authorizer = servicePrincipalToken
az.VirtualMachinesClient.PollingDelay = 5 * time.Second
configureUserAgent(&az.VirtualMachinesClient.Client)
az.PublicIPAddressesClient = network.NewPublicIPAddressesClient(az.SubscriptionID)
az.PublicIPAddressesClient.BaseURI = az.Environment.ResourceManagerEndpoint
az.PublicIPAddressesClient.Authorizer = servicePrincipalToken
az.PublicIPAddressesClient.PollingDelay = 5 * time.Second
configureUserAgent(&az.PublicIPAddressesClient.Client)
az.SecurityGroupsClient = network.NewSecurityGroupsClient(az.SubscriptionID)
az.SecurityGroupsClient.BaseURI = az.Environment.ResourceManagerEndpoint
az.SecurityGroupsClient.Authorizer = servicePrincipalToken
az.SecurityGroupsClient.PollingDelay = 5 * time.Second
configureUserAgent(&az.SecurityGroupsClient.Client)
az.StorageAccountClient = storage.NewAccountsClientWithBaseURI(az.Environment.ResourceManagerEndpoint, az.SubscriptionID)
az.StorageAccountClient.Authorizer = servicePrincipalToken
return &az, nil
}
@ -194,3 +213,8 @@ func (az *Cloud) ScrubDNS(nameservers, searches []string) (nsOut, srchOut []stri
func (az *Cloud) ProviderName() string {
return CloudProviderName
}
func configureUserAgent(client *autorest.Client) {
k8sVersion := version.Get().GitVersion
client.UserAgent = fmt.Sprintf("%s; %s", client.UserAgent, k8sVersion)
}
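The effect of `configureUserAgent`, sketched with an invented SDK User-Agent string:

```go
package main

import "fmt"

func main() {
	// Mirrors configureUserAgent above: the Kubernetes git version is
	// appended to whatever User-Agent the Azure SDK client already set.
	sdkUA := "Azure-SDK-for-Go/v7.0.1 arm-network/2016-09-01" // invented
	k8sVersion := "v1.6.3"                                    // version.Get().GitVersion
	fmt.Printf("%s; %s\n", sdkUA, k8sVersion)
	// Azure-SDK-for-Go/v7.0.1 arm-network/2016-09-01; v1.6.3
}
```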


@ -24,9 +24,11 @@ import (
"net/url"
"path"
"path/filepath"
"regexp"
"runtime"
"strings"
"sync"
"time"
"gopkg.in/gcfg.v1"
@ -49,23 +51,33 @@ import (
)
const (
ProviderName = "vsphere"
ActivePowerState = "poweredOn"
SCSIControllerType = "scsi"
LSILogicControllerType = "lsiLogic"
BusLogicControllerType = "busLogic"
PVSCSIControllerType = "pvscsi"
LSILogicSASControllerType = "lsiLogic-sas"
SCSIControllerLimit = 4
SCSIControllerDeviceLimit = 15
SCSIDeviceSlots = 16
SCSIReservedSlot = 7
ThinDiskType = "thin"
PreallocatedDiskType = "preallocated"
EagerZeroedThickDiskType = "eagerZeroedThick"
ZeroedThickDiskType = "zeroedThick"
VolDir = "kubevols"
RoundTripperDefaultCount = 3
ProviderName = "vsphere"
ActivePowerState = "poweredOn"
SCSIControllerType = "scsi"
LSILogicControllerType = "lsiLogic"
BusLogicControllerType = "busLogic"
PVSCSIControllerType = "pvscsi"
LSILogicSASControllerType = "lsiLogic-sas"
SCSIControllerLimit = 4
SCSIControllerDeviceLimit = 15
SCSIDeviceSlots = 16
SCSIReservedSlot = 7
ThinDiskType = "thin"
PreallocatedDiskType = "preallocated"
EagerZeroedThickDiskType = "eagerZeroedThick"
ZeroedThickDiskType = "zeroedThick"
VolDir = "kubevols"
RoundTripperDefaultCount = 3
DummyVMPrefixName = "vsphere-k8s"
VSANDatastoreType = "vsan"
MAC_OUI_VC = "00:50:56"
MAC_OUI_ESX = "00:0c:29"
DiskNotFoundErrMsg = "No vSphere disk ID found"
NoDiskUUIDFoundErrMsg = "No disk UUID found"
NoDevicesFoundErrMsg = "No devices found"
NonSupportedControllerTypeErrMsg = "Disk is attached to non-supported controller type"
FileAlreadyExistErrMsg = "File requested already exist"
CleanUpDummyVMRoutine_Interval = 5
)
// Controller types that are currently supported for hot attach of disks
@ -85,14 +97,17 @@ var diskFormatValidType = map[string]string{
}
var DiskformatValidOptions = generateDiskFormatValidOptions()
var cleanUpRoutineInitialized = false
var ErrNoDiskUUIDFound = errors.New("No disk UUID found")
var ErrNoDiskIDFound = errors.New("No vSphere disk ID found")
var ErrNoDevicesFound = errors.New("No devices found")
var ErrNonSupportedControllerType = errors.New("Disk is attached to non-supported controller type")
var ErrFileAlreadyExist = errors.New("File requested already exist")
var ErrNoDiskUUIDFound = errors.New(NoDiskUUIDFoundErrMsg)
var ErrNoDiskIDFound = errors.New(DiskNotFoundErrMsg)
var ErrNoDevicesFound = errors.New(NoDevicesFoundErrMsg)
var ErrNonSupportedControllerType = errors.New(NonSupportedControllerTypeErrMsg)
var ErrFileAlreadyExist = errors.New(FileAlreadyExistErrMsg)
var clientLock sync.Mutex
var cleanUpRoutineInitLock sync.Mutex
var cleanUpDummyVMLock sync.RWMutex
// VSphere is an implementation of cloud provider Interface for VSphere.
type VSphere struct {
@ -166,11 +181,12 @@ type Volumes interface {
// VolumeOptions specifies capacity, tags, name and diskFormat for a volume.
type VolumeOptions struct {
CapacityKB int
Tags map[string]string
Name string
DiskFormat string
Datastore string
CapacityKB int
Tags map[string]string
Name string
DiskFormat string
Datastore string
StorageProfileData string
}
// Generates Valid Options for Diskformat
@ -687,6 +703,8 @@ func cleanUpController(ctx context.Context, newSCSIController types.BaseVirtualD
// Attaches given virtual disk volume to the compute running kubelet.
func (vs *VSphere) AttachDisk(vmDiskPath string, nodeName k8stypes.NodeName) (diskID string, diskUUID string, err error) {
var newSCSIController types.BaseVirtualDevice
// Create context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -722,50 +740,24 @@ func (vs *VSphere) AttachDisk(vmDiskPath string, nodeName k8stypes.NodeName) (di
var diskControllerType = vs.cfg.Disk.SCSIControllerType
// find SCSI controller of particular type from VM devices
allSCSIControllers := getSCSIControllers(vmDevices)
scsiControllersOfRequiredType := getSCSIControllersOfType(vmDevices, diskControllerType)
scsiController := getAvailableSCSIController(scsiControllersOfRequiredType)
var newSCSICreated = false
var newSCSIController types.BaseVirtualDevice
// creating a scsi controller as there is none found of controller type defined
newSCSICreated := false
if scsiController == nil {
if len(allSCSIControllers) >= SCSIControllerLimit {
// we reached the maximum number of controllers we can attach
return "", "", fmt.Errorf("SCSI Controller Limit of %d has been reached, cannot create another SCSI controller", SCSIControllerLimit)
}
glog.V(1).Infof("Creating a SCSI controller of %v type", diskControllerType)
newSCSIController, err := vmDevices.CreateSCSIController(diskControllerType)
newSCSIController, err = createAndAttachSCSIControllerToVM(ctx, vm, diskControllerType)
if err != nil {
k8runtime.HandleError(fmt.Errorf("error creating new SCSI controller: %v", err))
return "", "", err
}
configNewSCSIController := newSCSIController.(types.BaseVirtualSCSIController).GetVirtualSCSIController()
hotAndRemove := true
configNewSCSIController.HotAddRemove = &hotAndRemove
configNewSCSIController.SharedBus = types.VirtualSCSISharing(types.VirtualSCSISharingNoSharing)
// add the scsi controller to virtual machine
err = vm.AddDevice(context.TODO(), newSCSIController)
if err != nil {
glog.V(1).Infof("cannot add SCSI controller to vm - %v", err)
// attempt clean up of scsi controller
if vmDevices, err := vm.Device(ctx); err == nil {
cleanUpController(ctx, newSCSIController, vmDevices, vm)
}
glog.Errorf("Failed to create SCSI controller for VM :%q with err: %+v", vm.Name(), err)
return "", "", err
}
// verify scsi controller in virtual machine
vmDevices, err = vm.Device(ctx)
vmDevices, err := vm.Device(ctx)
if err != nil {
// cannot cleanup if there is no device list
return "", "", err
}
// Get VM device list
_, vmDevices, _, err := getVirtualMachineDevices(ctx, vs.cfg, vs.client, vSphereInstance)
_, vmDevices, _, err = getVirtualMachineDevices(ctx, vs.cfg, vs.client, vSphereInstance)
if err != nil {
glog.Errorf("cannot get vmDevices for VM err=%s", err)
return "", "", fmt.Errorf("cannot get vmDevices for VM err=%s", err)
@ -798,7 +790,7 @@ func (vs *VSphere) AttachDisk(vmDiskPath string, nodeName k8stypes.NodeName) (di
glog.Errorf("Failed while searching for datastore %+q. err %s", datastorePathObj.Datastore, err)
return "", "", err
}
vmDiskPath = removeClusterFromVDiskPath(vmDiskPath)
disk := vmDevices.CreateDisk(scsiController, ds.Reference(), vmDiskPath)
unitNumber, err := getNextUnitNumber(vmDevices, scsiController)
if err != nil {
@ -1045,6 +1037,7 @@ func checkDiskAttached(volPath string, vmdevices object.VirtualDeviceList, dc *o
// Returns the object key that denotes the controller object to which vmdk is attached.
func getVirtualDiskControllerKey(volPath string, vmDevices object.VirtualDeviceList, dc *object.Datacenter, client *govmomi.Client) (int32, error) {
volPath = removeClusterFromVDiskPath(volPath)
volumeUUID, err := getVirtualDiskUUIDByPath(volPath, dc, client)
if err != nil {
@ -1175,7 +1168,7 @@ func (vs *VSphere) DetachDisk(volPath string, nodeName k8stypes.NodeName) error
if err != nil {
return err
}
volPath = removeClusterFromVDiskPath(volPath)
diskID, err := getVirtualDiskID(volPath, vmDevices, dc, vs.client)
if err != nil {
glog.Warningf("disk ID not found for %v ", volPath)
@ -1200,8 +1193,8 @@ func (vs *VSphere) DetachDisk(volPath string, nodeName k8stypes.NodeName) error
// CreateVolume creates a volume of given size (in KiB).
func (vs *VSphere) CreateVolume(volumeOptions *VolumeOptions) (volumePath string, err error) {
var diskFormat string
var datastore string
var destVolPath string
// Default datastore is the datastore in the vSphere config file that is used to initialize the vSphere cloud provider.
if volumeOptions.Datastore == "" {
@ -1220,8 +1213,6 @@ func (vs *VSphere) CreateVolume(volumeOptions *VolumeOptions) (volumePath string
" Valid options are %s.", volumeOptions.DiskFormat, DiskformatValidOptions)
}
diskFormat = diskFormatValidType[volumeOptions.DiskFormat]
// Create context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -1246,43 +1237,105 @@ func (vs *VSphere) CreateVolume(volumeOptions *VolumeOptions) (volumePath string
return "", err
}
// vmdks will be created inside kubevols directory
kubeVolsPath := filepath.Clean(ds.Path(VolDir)) + "/"
err = makeDirectoryInDatastore(vs.client, dc, kubeVolsPath, false)
if err != nil && err != ErrFileAlreadyExist {
glog.Errorf("Cannot create dir %#v. err %s", kubeVolsPath, err)
return "", err
}
glog.V(4).Infof("Created dir with path as %+q", kubeVolsPath)
// Create a disk with the VSAN storage capabilities specified in the volumeOptions.StorageProfileData.
// This is achieved by following steps:
// 1. Create dummy VM if not already present.
// 2. Add a new disk to the VM by performing VM reconfigure.
// 3. Detach the new disk from the dummy VM.
// 4. Delete the dummy VM.
if volumeOptions.StorageProfileData != "" {
// Check if the datastore is VSAN if any capability requirements are specified.
// VSphere cloud provider now only supports VSAN capabilities requirements
ok, err := checkIfDatastoreTypeIsVSAN(vs.client, ds)
if err != nil {
return "", fmt.Errorf("Failed while determining whether the datastore: %q"+
" is VSAN or not.", datastore)
}
if !ok {
return "", fmt.Errorf("The specified datastore: %q is not a VSAN datastore."+
" The policy parameters will work only with VSAN Datastore."+
" So, please specify a valid VSAN datastore in Storage class definition.", datastore)
}
vmDiskPath := kubeVolsPath + volumeOptions.Name + ".vmdk"
// Acquire a read lock to ensure multiple PVC requests can be processed simultaneously.
cleanUpDummyVMLock.RLock()
defer cleanUpDummyVMLock.RUnlock()
// Create a virtual disk manager
virtualDiskManager := object.NewVirtualDiskManager(vs.client.Client)
// Create a new background routine that will delete any dummy VM's that are left stale.
// This routine will get executed for every 5 minutes and gets initiated only once in its entire lifetime.
cleanUpRoutineInitLock.Lock()
if !cleanUpRoutineInitialized {
go vs.cleanUpDummyVMs(DummyVMPrefixName)
cleanUpRoutineInitialized = true
}
cleanUpRoutineInitLock.Unlock()
// Create specification for new virtual disk
vmDiskSpec := &types.FileBackedVirtualDiskSpec{
VirtualDiskSpec: types.VirtualDiskSpec{
AdapterType: LSILogicControllerType,
DiskType: diskFormat,
},
CapacityKb: int64(volumeOptions.CapacityKB),
// Check if the VM exists in kubernetes cluster folder.
// The kubernetes cluster folder - vs.cfg.Global.WorkingDir is where all the nodes in the kubernetes cluster are created.
dummyVMFullName := DummyVMPrefixName + "-" + volumeOptions.Name
vmRegex := vs.cfg.Global.WorkingDir + dummyVMFullName
dummyVM, err := f.VirtualMachine(ctx, vmRegex)
if err != nil {
// 1. Create a dummy VM and return the VM reference.
dummyVM, err = vs.createDummyVM(ctx, dc, ds, dummyVMFullName)
if err != nil {
return "", err
}
}
// 2. Reconfigure the VM to attach the disk with the VSAN policy configured.
vmDiskPath, err := vs.createVirtualDiskWithPolicy(ctx, dc, ds, dummyVM, volumeOptions)
fileAlreadyExist := false
if err != nil {
vmDiskPath = filepath.Clean(ds.Path(VolDir)) + "/" + volumeOptions.Name + ".vmdk"
errorMessage := fmt.Sprintf("Cannot complete the operation because the file or folder %s already exists", vmDiskPath)
if errorMessage == err.Error() {
//Skip error and continue to detach the disk as the disk was already created on the datastore.
fileAlreadyExist = true
glog.V(1).Infof("File: %v already exists", vmDiskPath)
} else {
glog.Errorf("Failed to attach the disk to VM: %q with err: %+v", dummyVMFullName, err)
return "", err
}
}
dummyVMNodeName := vmNameToNodeName(dummyVMFullName)
// 3. Detach the disk from the dummy VM.
err = vs.DetachDisk(vmDiskPath, dummyVMNodeName)
if err != nil {
if DiskNotFoundErrMsg == err.Error() && fileAlreadyExist {
// Skip error if disk was already detached from the dummy VM but still present on the datastore.
glog.V(1).Infof("File: %v is already detached", vmDiskPath)
} else {
glog.Errorf("Failed to detach the disk: %q from VM: %q with err: %+v", vmDiskPath, dummyVMFullName, err)
return "", fmt.Errorf("Failed to create the volume: %q with err: %+v", volumeOptions.Name, err)
}
}
// 4. Delete the dummy VM
err = deleteVM(ctx, dummyVM)
if err != nil {
return "", fmt.Errorf("Failed to destroy the vm: %q with err: %+v", dummyVMFullName, err)
}
destVolPath = vmDiskPath
} else {
// Create a virtual disk directly if no VSAN storage capabilities are specified by the user.
destVolPath, err = createVirtualDisk(ctx, vs.client, dc, ds, volumeOptions)
if err != nil {
return "", fmt.Errorf("Failed to create the virtual disk having name: %+q with err: %+v", destVolPath, err)
}
}
// Create virtual disk
task, err := virtualDiskManager.CreateVirtualDisk(ctx, vmDiskPath, dc, vmDiskSpec)
if err != nil {
return "", err
if filepath.Base(datastore) != datastore {
// If Datastore is within cluster, add cluster path to the destVolPath
destVolPath = strings.Replace(destVolPath, filepath.Base(datastore), datastore, 1)
}
err = task.Wait(ctx)
if err != nil {
return "", err
}
return vmDiskPath, nil
glog.V(1).Infof("VM Disk path is %+q", destVolPath)
return destVolPath, nil
}
// DeleteVolume deletes a volume given volume name.
// Also, deletes the folder where the volume resides.
func (vs *VSphere) DeleteVolume(vmDiskPath string) error {
// Create context
ctx, cancel := context.WithCancel(context.Background())
@ -1308,7 +1361,24 @@ func (vs *VSphere) DeleteVolume(vmDiskPath string) error {
if filepath.Ext(vmDiskPath) != ".vmdk" {
vmDiskPath += ".vmdk"
}
// Get the vmDisk Name
diskNameWithExt := path.Base(vmDiskPath)
diskName := strings.TrimSuffix(diskNameWithExt, filepath.Ext(diskNameWithExt))
// Search for the dummyVM if present and delete it.
dummyVMFullName := DummyVMPrefixName + "-" + diskName
vmRegex := vs.cfg.Global.WorkingDir + dummyVMFullName
dummyVM, err := f.VirtualMachine(ctx, vmRegex)
if err == nil {
err = deleteVM(ctx, dummyVM)
if err != nil {
return fmt.Errorf("Failed to destroy the vm: %q with err: %+v", dummyVMFullName, err)
}
}
// Delete virtual disk
vmDiskPath = removeClusterFromVDiskPath(vmDiskPath)
task, err := virtualDiskManager.DeleteVirtualDisk(ctx, vmDiskPath, dc)
if err != nil {
return err
@ -1356,6 +1426,341 @@ func (vs *VSphere) NodeExists(c *govmomi.Client, nodeName k8stypes.NodeName) (bo
return false, nil
}
// A background routine which will be responsible for deleting stale dummy VM's.
func (vs *VSphere) cleanUpDummyVMs(dummyVMPrefix string) {
// Create context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for {
time.Sleep(CleanUpDummyVMRoutine_Interval * time.Minute)
// Ensure client is logged in and session is valid
err := vSphereLogin(ctx, vs)
if err != nil {
glog.V(4).Infof("[cleanUpDummyVMs] Unable to login to vSphere with err: %+v", err)
continue
}
// Create a new finder
f := find.NewFinder(vs.client.Client, true)
// Fetch and set data center
dc, err := f.Datacenter(ctx, vs.cfg.Global.Datacenter)
if err != nil {
glog.V(4).Infof("[cleanUpDummyVMs] Unable to fetch the datacenter: %q with err: %+v", vs.cfg.Global.Datacenter, err)
continue
}
f.SetDatacenter(dc)
// Get the folder reference for global working directory where the dummy VM needs to be created.
vmFolder, err := getFolder(ctx, vs.client, vs.cfg.Global.Datacenter, vs.cfg.Global.WorkingDir)
if err != nil {
glog.V(4).Infof("[cleanUpDummyVMs] Unable to get the kubernetes folder: %q reference with err: %+v", vs.cfg.Global.WorkingDir, err)
continue
}
// A write lock is acquired to make sure the cleanUp routine doesn't delete any VM's created by ongoing PVC requests.
cleanUpDummyVMLock.Lock()
dummyVMRefList, err := getDummyVMList(ctx, vs.client, vmFolder, dummyVMPrefix)
if err != nil {
glog.V(4).Infof("[cleanUpDummyVMs] Unable to get dummy VM list in the kubernetes cluster: %q reference with err: %+v", vs.cfg.Global.WorkingDir, err)
cleanUpDummyVMLock.Unlock()
continue
}
for _, dummyVMRef := range dummyVMRefList {
err = deleteVM(ctx, dummyVMRef)
if err != nil {
glog.V(4).Infof("[cleanUpDummyVMs] Unable to delete dummy VM: %q with err: %+v", dummyVMRef.Name(), err)
continue
}
}
cleanUpDummyVMLock.Unlock()
}
}
// Get the dummy VM list from the kubernetes working directory.
func getDummyVMList(ctx context.Context, c *govmomi.Client, vmFolder *object.Folder, dummyVMPrefix string) ([]*object.VirtualMachine, error) {
vmFolders, err := vmFolder.Children(ctx)
if err != nil {
glog.V(4).Infof("Unable to retrieve the virtual machines from the kubernetes cluster: %+v", vmFolder)
return nil, err
}
var dummyVMRefList []*object.VirtualMachine
pc := property.DefaultCollector(c.Client)
for _, vmFolder := range vmFolders {
if vmFolder.Reference().Type == "VirtualMachine" {
var vmRefs []types.ManagedObjectReference
var vmMorefs []mo.VirtualMachine
vmRefs = append(vmRefs, vmFolder.Reference())
err = pc.Retrieve(ctx, vmRefs, []string{"name"}, &vmMorefs)
if err != nil {
return nil, err
}
if strings.HasPrefix(vmMorefs[0].Name, dummyVMPrefix) {
dummyVMRefList = append(dummyVMRefList, object.NewVirtualMachine(c.Client, vmRefs[0]))
}
}
}
return dummyVMRefList, nil
}
func (vs *VSphere) createDummyVM(ctx context.Context, datacenter *object.Datacenter, datastore *object.Datastore, vmName string) (*object.VirtualMachine, error) {
// Create a virtual machine config spec with 1 SCSI adapter.
virtualMachineConfigSpec := types.VirtualMachineConfigSpec{
Name: vmName,
Files: &types.VirtualMachineFileInfo{
VmPathName: "[" + datastore.Name() + "]",
},
NumCPUs: 1,
MemoryMB: 4,
DeviceChange: []types.BaseVirtualDeviceConfigSpec{
&types.VirtualDeviceConfigSpec{
Operation: types.VirtualDeviceConfigSpecOperationAdd,
Device: &types.ParaVirtualSCSIController{
VirtualSCSIController: types.VirtualSCSIController{
SharedBus: types.VirtualSCSISharingNoSharing,
VirtualController: types.VirtualController{
BusNumber: 0,
VirtualDevice: types.VirtualDevice{
Key: 1000,
},
},
},
},
},
},
}
// Get the resource pool for the current node. This is where the dummy VM will be created.
resourcePool, err := vs.getCurrentNodeResourcePool(ctx, datacenter)
if err != nil {
return nil, err
}
// Get the folder reference for global working directory where the dummy VM needs to be created.
vmFolder, err := getFolder(ctx, vs.client, vs.cfg.Global.Datacenter, vs.cfg.Global.WorkingDir)
if err != nil {
return nil, fmt.Errorf("Failed to get the folder reference for %q with err: %+v", vs.cfg.Global.WorkingDir, err)
}
task, err := vmFolder.CreateVM(ctx, virtualMachineConfigSpec, resourcePool, nil)
if err != nil {
return nil, err
}
dummyVMTaskInfo, err := task.WaitForResult(ctx, nil)
if err != nil {
return nil, err
}
vmRef := dummyVMTaskInfo.Result.(object.Reference)
dummyVM := object.NewVirtualMachine(vs.client.Client, vmRef.Reference())
return dummyVM, nil
}
func (vs *VSphere) getCurrentNodeResourcePool(ctx context.Context, datacenter *object.Datacenter) (*object.ResourcePool, error) {
// Create a new finder
f := find.NewFinder(vs.client.Client, true)
f.SetDatacenter(datacenter)
vmRegex := vs.cfg.Global.WorkingDir + vs.localInstanceID
currentVM, err := f.VirtualMachine(ctx, vmRegex)
if err != nil {
return nil, err
}
currentVMHost, err := currentVM.HostSystem(ctx)
if err != nil {
return nil, err
}
// Get the resource pool for the current node.
// We create the dummy VM in the same resource pool as current node.
resourcePool, err := currentVMHost.ResourcePool(ctx)
if err != nil {
return nil, err
}
return resourcePool, nil
}
// createVirtualDiskWithPolicy creates a virtual disk with the storage policy applied to it.
// A call to this function is made only when a user specifies VSAN storage capabilities in the storage class definition.
func (vs *VSphere) createVirtualDiskWithPolicy(ctx context.Context, datacenter *object.Datacenter, datastore *object.Datastore, virtualMachine *object.VirtualMachine, volumeOptions *VolumeOptions) (string, error) {
diskFormat := diskFormatValidType[volumeOptions.DiskFormat]
vmDevices, err := virtualMachine.Device(ctx)
if err != nil {
return "", err
}
var diskControllerType = vs.cfg.Disk.SCSIControllerType
// find SCSI controller of particular type from VM devices
scsiControllersOfRequiredType := getSCSIControllersOfType(vmDevices, diskControllerType)
if len(scsiControllersOfRequiredType) == 0 {
return "", fmt.Errorf("no SCSI controller of type %q found on the VM", diskControllerType)
}
scsiController := scsiControllersOfRequiredType[0]
kubeVolsPath := filepath.Clean(datastore.Path(VolDir)) + "/"
// Create a kubevols directory in the datastore if one doesn't exist.
err = makeDirectoryInDatastore(vs.client, datacenter, kubeVolsPath, false)
if err != nil && err != ErrFileAlreadyExist {
glog.Errorf("Cannot create dir %#v. err %s", kubeVolsPath, err)
return "", err
}
glog.V(4).Infof("Created dir with path as %+q", kubeVolsPath)
vmDiskPath := kubeVolsPath + volumeOptions.Name + ".vmdk"
disk := vmDevices.CreateDisk(scsiController, datastore.Reference(), vmDiskPath)
unitNumber, err := getNextUnitNumber(vmDevices, scsiController)
if err != nil {
glog.Errorf("cannot attach disk to VM, limit reached - %v.", err)
return "", err
}
*disk.UnitNumber = unitNumber
disk.CapacityInKB = int64(volumeOptions.CapacityKB)
backing := disk.Backing.(*types.VirtualDiskFlatVer2BackingInfo)
backing.DiskMode = string(types.VirtualDiskModeIndependent_persistent)
switch diskFormat {
case ThinDiskType:
backing.ThinProvisioned = types.NewBool(true)
case EagerZeroedThickDiskType:
backing.EagerlyScrub = types.NewBool(true)
default:
backing.ThinProvisioned = types.NewBool(false)
}
// Reconfigure VM
virtualMachineConfigSpec := types.VirtualMachineConfigSpec{}
deviceConfigSpec := &types.VirtualDeviceConfigSpec{
Device: disk,
Operation: types.VirtualDeviceConfigSpecOperationAdd,
FileOperation: types.VirtualDeviceConfigSpecFileOperationCreate,
}
storageProfileSpec := &types.VirtualMachineDefinedProfileSpec{
ProfileId: "",
ProfileData: &types.VirtualMachineProfileRawData{
ExtensionKey: "com.vmware.vim.sps",
ObjectData: volumeOptions.StorageProfileData,
},
}
deviceConfigSpec.Profile = append(deviceConfigSpec.Profile, storageProfileSpec)
virtualMachineConfigSpec.DeviceChange = append(virtualMachineConfigSpec.DeviceChange, deviceConfigSpec)
task, err := virtualMachine.Reconfigure(ctx, virtualMachineConfigSpec)
if err != nil {
glog.Errorf("Failed to reconfigure the VM with the disk with err - %v.", err)
return "", err
}
err = task.Wait(ctx)
if err != nil {
glog.Errorf("Failed to reconfigure the VM with the disk with err - %v.", err)
return "", err
}
return vmDiskPath, nil
}
// createAndAttachSCSIControllerToVM creates and attaches a SCSI controller when none of the required type is found.
func createAndAttachSCSIControllerToVM(ctx context.Context, vm *object.VirtualMachine, diskControllerType string) (types.BaseVirtualDevice, error) {
// Get VM device list
vmDevices, err := vm.Device(ctx)
if err != nil {
return nil, err
}
allSCSIControllers := getSCSIControllers(vmDevices)
if len(allSCSIControllers) >= SCSIControllerLimit {
// we reached the maximum number of controllers we can attach
return nil, fmt.Errorf("SCSI Controller Limit of %d has been reached, cannot create another SCSI controller", SCSIControllerLimit)
}
newSCSIController, err := vmDevices.CreateSCSIController(diskControllerType)
if err != nil {
k8runtime.HandleError(fmt.Errorf("error creating new SCSI controller: %v", err))
return nil, err
}
configNewSCSIController := newSCSIController.(types.BaseVirtualSCSIController).GetVirtualSCSIController()
hotAndRemove := true
configNewSCSIController.HotAddRemove = &hotAndRemove
configNewSCSIController.SharedBus = types.VirtualSCSISharing(types.VirtualSCSISharingNoSharing)
// add the scsi controller to virtual machine
err = vm.AddDevice(ctx, newSCSIController)
if err != nil {
glog.V(1).Infof("cannot add SCSI controller to vm - %v", err)
// attempt clean up of scsi controller
if vmDevices, err := vm.Device(ctx); err == nil {
cleanUpController(ctx, newSCSIController, vmDevices, vm)
}
return nil, err
}
return newSCSIController, nil
}
// Create a virtual disk.
func createVirtualDisk(ctx context.Context, c *govmomi.Client, dc *object.Datacenter, ds *object.Datastore, volumeOptions *VolumeOptions) (string, error) {
kubeVolsPath := filepath.Clean(ds.Path(VolDir)) + "/"
// Create a kubevols directory in the datastore if one doesn't exist.
err := makeDirectoryInDatastore(c, dc, kubeVolsPath, false)
if err != nil && err != ErrFileAlreadyExist {
glog.Errorf("Cannot create dir %#v. err %s", kubeVolsPath, err)
return "", err
}
glog.V(4).Infof("Created dir with path as %+q", kubeVolsPath)
vmDiskPath := kubeVolsPath + volumeOptions.Name + ".vmdk"
diskFormat := diskFormatValidType[volumeOptions.DiskFormat]
// Create a virtual disk manager
virtualDiskManager := object.NewVirtualDiskManager(c.Client)
// Create specification for new virtual disk
vmDiskSpec := &types.FileBackedVirtualDiskSpec{
VirtualDiskSpec: types.VirtualDiskSpec{
AdapterType: LSILogicControllerType,
DiskType: diskFormat,
},
CapacityKb: int64(volumeOptions.CapacityKB),
}
// Create virtual disk
task, err := virtualDiskManager.CreateVirtualDisk(ctx, vmDiskPath, dc, vmDiskSpec)
if err != nil {
return "", err
}
return vmDiskPath, task.Wait(ctx)
}
// Check if the provided datastore is VSAN
func checkIfDatastoreTypeIsVSAN(c *govmomi.Client, datastore *object.Datastore) (bool, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pc := property.DefaultCollector(c.Client)
// Convert datastores into list of references
var dsRefs []types.ManagedObjectReference
dsRefs = append(dsRefs, datastore.Reference())
// Retrieve summary property for the given datastore
var dsMorefs []mo.Datastore
err := pc.Retrieve(ctx, dsRefs, []string{"summary"}, &dsMorefs)
if err != nil {
return false, err
}
for _, ds := range dsMorefs {
if ds.Summary.Type == VSANDatastoreType {
return true, nil
}
}
return false, nil
}
// Creates a folder using the specified name.
// If the intermediate level folders do not exist,
// and the parameter createParents is true,
@ -1378,3 +1783,70 @@ func makeDirectoryInDatastore(c *govmomi.Client, dc *object.Datacenter, path str
return err
}
// Get the folder for a given VM
func getFolder(ctx context.Context, c *govmomi.Client, datacenterName string, folderName string) (*object.Folder, error) {
f := find.NewFinder(c.Client, true)
// Fetch and set data center
dc, err := f.Datacenter(ctx, datacenterName)
if err != nil {
return nil, err
}
f.SetDatacenter(dc)
folderName = strings.TrimSuffix(folderName, "/")
dcFolders, err := dc.Folders(ctx)
if err != nil {
return nil, err
}
vmFolders, err := dcFolders.VmFolder.Children(ctx)
if err != nil {
return nil, err
}
var vmFolderRefs []types.ManagedObjectReference
for _, vmFolder := range vmFolders {
vmFolderRefs = append(vmFolderRefs, vmFolder.Reference())
}
// Get only references of type folder.
var folderRefs []types.ManagedObjectReference
for _, vmFolder := range vmFolderRefs {
if vmFolder.Type == "Folder" {
folderRefs = append(folderRefs, vmFolder)
}
}
// Find the specific folder reference matching the folder name.
var resultFolder *object.Folder
pc := property.DefaultCollector(c.Client)
for _, folderRef := range folderRefs {
var refs []types.ManagedObjectReference
var folderMorefs []mo.Folder
refs = append(refs, folderRef)
err = pc.Retrieve(ctx, refs, []string{"name"}, &folderMorefs)
if err != nil {
return nil, err
}
for _, fref := range folderMorefs {
if fref.Name == folderName {
resultFolder = object.NewFolder(c.Client, folderRef)
}
}
}
return resultFolder, nil
}
// Delete the VM.
func deleteVM(ctx context.Context, vm *object.VirtualMachine) error {
destroyTask, err := vm.Destroy(ctx)
if err != nil {
return err
}
return destroyTask.Wait(ctx)
}
// Remove the cluster or folder path from the vDiskPath.
// For vDiskPath [DatastoreCluster/sharedVmfs-0] kubevols/e2e-vmdk-1234.vmdk, the return value is [sharedVmfs-0] kubevols/e2e-vmdk-1234.vmdk.
// For vDiskPath [sharedVmfs-0] kubevols/e2e-vmdk-1234.vmdk, the return value remains the same: [sharedVmfs-0] kubevols/e2e-vmdk-1234.vmdk.
func removeClusterFromVDiskPath(vDiskPath string) string {
datastore := regexp.MustCompile(`\[(.*?)\]`).FindStringSubmatch(vDiskPath)[1]
if filepath.Base(datastore) != datastore {
vDiskPath = strings.Replace(vDiskPath, datastore, filepath.Base(datastore), 1)
}
return vDiskPath
}
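
The rewrite above is driven entirely by the `[datastore]` prefix, so it can be exercised in isolation. A minimal standalone sketch (function body copied from the diff, inputs taken from the doc comment):

```go
package main

import (
	"fmt"
	"path/filepath"
	"regexp"
	"strings"
)

// Copied from the vSphere cloud provider above so the example is self-contained.
func removeClusterFromVDiskPath(vDiskPath string) string {
	datastore := regexp.MustCompile(`\[(.*?)\]`).FindStringSubmatch(vDiskPath)[1]
	if filepath.Base(datastore) != datastore {
		vDiskPath = strings.Replace(vDiskPath, datastore, filepath.Base(datastore), 1)
	}
	return vDiskPath
}

func main() {
	// The datastore-cluster prefix is stripped down to the datastore name.
	fmt.Println(removeClusterFromVDiskPath("[DatastoreCluster/sharedVmfs-0] kubevols/e2e-vmdk-1234.vmdk"))
	// Plain datastore paths pass through unchanged.
	fmt.Println(removeClusterFromVDiskPath("[sharedVmfs-0] kubevols/e2e-vmdk-1234.vmdk"))
}
```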

View File

@ -27,10 +27,30 @@ import (
)
// GetPodTemplateWithGeneration returns a copy of the provided template with an additional
// label which contains hash of provided template
// label which contains hash of provided template and sets default daemon tolerations.
func GetPodTemplateWithGeneration(template v1.PodTemplateSpec, generation int64) v1.PodTemplateSpec {
obj, _ := api.Scheme.DeepCopy(template)
newTemplate := obj.(v1.PodTemplateSpec)
// DaemonSet pods shouldn't be deleted by NodeController in case of node problems.
// Add infinite toleration for taint notReady:NoExecute here
// to survive taint-based eviction enforced by NodeController
// when node turns not ready.
v1.AddOrUpdateTolerationInPodSpec(&newTemplate.Spec, &v1.Toleration{
Key: metav1.TaintNodeNotReady,
Operator: v1.TolerationOpExists,
Effect: v1.TaintEffectNoExecute,
})
// DaemonSet pods shouldn't be deleted by NodeController in case of node problems.
// Add infinite toleration for taint unreachable:NoExecute here
// to survive taint-based eviction enforced by NodeController
// when node turns unreachable.
v1.AddOrUpdateTolerationInPodSpec(&newTemplate.Spec, &v1.Toleration{
Key: metav1.TaintNodeUnreachable,
Operator: v1.TolerationOpExists,
Effect: v1.TaintEffectNoExecute,
})
templateGenerationStr := fmt.Sprint(generation)
newTemplate.ObjectMeta.Labels = labelsutil.CloneAndAddLabel(
template.ObjectMeta.Labels,

View File

@ -52,7 +52,11 @@ import (
const (
// maxRetries is the number of times a deployment will be retried before it is dropped out of the queue.
maxRetries = 5
// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the times
// a deployment is going to be requeued:
//
// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
maxRetries = 15
)
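
The delay sequence in the comment is just the base delay doubling once per retry. A quick sketch that reproduces it, assuming the 5ms base stated above (the rate limiter itself lives in k8s.io/client-go/util/workqueue):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// baseDelay * 2^retries: 5ms, 10ms, 20ms, ..., 40.96s (~41s), 81.92s (~82s).
	base := 5 * time.Millisecond
	for retries := 0; retries < 15; retries++ {
		fmt.Println(base << uint(retries))
	}
}
```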
// controllerKind contains the schema.GroupVersionKind for this controller type.

View File

@ -1059,7 +1059,8 @@ func NewRSNewReplicas(deployment *extensions.Deployment, allRSs []*extensions.Re
// IsSaturated checks if the new replica set is saturated by comparing its size with its deployment size.
// Both the deployment and the replica set have to believe this replica set can own all of the desired
// replicas in the deployment and the annotation helps in achieving that.
// replicas in the deployment and the annotation helps in achieving that. All pods of the ReplicaSet
// need to be available.
func IsSaturated(deployment *extensions.Deployment, rs *extensions.ReplicaSet) bool {
if rs == nil {
return false
@ -1069,7 +1070,9 @@ func IsSaturated(deployment *extensions.Deployment, rs *extensions.ReplicaSet) b
if err != nil {
return false
}
return *(rs.Spec.Replicas) == *(deployment.Spec.Replicas) && int32(desired) == *(deployment.Spec.Replicas)
return *(rs.Spec.Replicas) == *(deployment.Spec.Replicas) &&
int32(desired) == *(deployment.Spec.Replicas) &&
rs.Status.AvailableReplicas == *(deployment.Spec.Replicas)
}
// WaitForObservedDeployment polls for deployment to be updated so that deployment.Status.ObservedGeneration >= desiredGeneration.

View File

@ -36,8 +36,9 @@ import (
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
extensionslisters "k8s.io/kubernetes/pkg/client/listers/extensions/v1beta1"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/util/node"
nodepkg "k8s.io/kubernetes/pkg/util/node"
utilversion "k8s.io/kubernetes/pkg/util/version"
"github.com/golang/glog"
@ -102,12 +103,12 @@ func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, n
// setPodTerminationReason attempts to set a reason and message in the pod status, updates it in the apiserver,
// and returns an error if it encounters one.
func setPodTerminationReason(kubeClient clientset.Interface, pod *v1.Pod, nodeName string) (*v1.Pod, error) {
if pod.Status.Reason == node.NodeUnreachablePodReason {
if pod.Status.Reason == nodepkg.NodeUnreachablePodReason {
return pod, nil
}
pod.Status.Reason = node.NodeUnreachablePodReason
pod.Status.Message = fmt.Sprintf(node.NodeUnreachablePodMessage, nodeName, pod.Name)
pod.Status.Reason = nodepkg.NodeUnreachablePodReason
pod.Status.Message = fmt.Sprintf(nodepkg.NodeUnreachablePodMessage, nodeName, pod.Name)
var updatedPod *v1.Pod
var err error
@ -286,3 +287,32 @@ func recordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, new_st
// and event is recorded or neither should happen, see issue #6055.
recorder.Eventf(ref, v1.EventTypeNormal, new_status, "Node %s status is now: %s", node.Name, new_status)
}
// Returns true in case of success and false otherwise
func swapNodeControllerTaint(kubeClient clientset.Interface, taintToAdd, taintToRemove *v1.Taint, node *v1.Node) bool {
taintToAdd.TimeAdded = metav1.Now()
err := controller.AddOrUpdateTaintOnNode(kubeClient, node.Name, taintToAdd)
if err != nil {
utilruntime.HandleError(
fmt.Errorf(
"unable to taint %v unresponsive Node %q: %v",
taintToAdd.Key,
node.Name,
err))
return false
}
glog.V(4).Infof("Added %v Taint to Node %v", taintToAdd, node.Name)
err = controller.RemoveTaintOffNode(kubeClient, node.Name, taintToRemove, node)
if err != nil {
utilruntime.HandleError(
fmt.Errorf(
"unable to remove %v unneeded taint from unresponsive Node %q: %v",
taintToRemove.Key,
node.Name,
err))
return false
}
glog.V(4).Infof("Made sure that Node %v has no %v Taint", node.Name, taintToRemove)
return true
}
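
swapNodeControllerTaint takes the two NoExecute taint templates defined elsewhere in the node controller; they are not part of this hunk, but judging by the DaemonSet tolerations added earlier in this commit they plausibly look like the sketch below (an assumption, not the verbatim definitions):

```go
package node

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/api/v1"
)

// Assumed shapes of the templates referenced by swapNodeControllerTaint;
// TimeAdded is stamped at swap time, so it is omitted here.
var (
	UnreachableTaintTemplate = &v1.Taint{
		Key:    metav1.TaintNodeUnreachable,
		Effect: v1.TaintEffectNoExecute,
	}
	NotReadyTaintTemplate = &v1.Taint{
		Key:    metav1.TaintNodeNotReady,
		Effect: v1.TaintEffectNoExecute,
	}
)
```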

View File

@ -478,6 +478,74 @@ func NewNodeController(
return nc, nil
}
func (nc *NodeController) doEvictionPass() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
for k := range nc.zonePodEvictor {
// Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
nc.zonePodEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
node, err := nc.nodeLister.Get(value.Value)
if apierrors.IsNotFound(err) {
glog.Warningf("Node %v no longer present in nodeLister!", value.Value)
} else if err != nil {
glog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
} else {
zone := utilnode.GetZoneKey(node)
EvictionsNumber.WithLabelValues(zone).Inc()
}
nodeUid, _ := value.UID.(string)
remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nodeUid, nc.daemonSetStore)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
return false, 0
}
if remaining {
glog.Infof("Pods awaiting deletion due to NodeController eviction")
}
return true, 0
})
}
}
func (nc *NodeController) doTaintingPass() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
for k := range nc.zoneNotReadyOrUnreachableTainer {
// Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
nc.zoneNotReadyOrUnreachableTainer[k].Try(func(value TimedValue) (bool, time.Duration) {
node, err := nc.nodeLister.Get(value.Value)
if apierrors.IsNotFound(err) {
glog.Warningf("Node %v no longer present in nodeLister!", value.Value)
return true, 0
} else if err != nil {
glog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
// retry in 50 milliseconds
return false, 50 * time.Millisecond
} else {
zone := utilnode.GetZoneKey(node)
EvictionsNumber.WithLabelValues(zone).Inc()
}
_, condition := v1.GetNodeCondition(&node.Status, v1.NodeReady)
// Because we want to mimic NodeStatus.Condition["Ready"] we make "unreachable" and "not ready" taints mutually exclusive.
taintToAdd := v1.Taint{}
oppositeTaint := v1.Taint{}
if condition.Status == v1.ConditionFalse {
taintToAdd = *NotReadyTaintTemplate
oppositeTaint = *UnreachableTaintTemplate
} else if condition.Status == v1.ConditionUnknown {
taintToAdd = *UnreachableTaintTemplate
oppositeTaint = *NotReadyTaintTemplate
} else {
// It seems that the Node is ready again, so there's no need to taint it.
glog.V(4).Infof("Node %v was in a taint queue, but it's ready now. Ignoring taint request.", value.Value)
return true, 0
}
return swapNodeControllerTaint(nc.kubeClient, &taintToAdd, &oppositeTaint, node), 0
})
}
}
// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *NodeController) Run() {
go func() {
@ -502,101 +570,12 @@ func (nc *NodeController) Run() {
if nc.useTaintBasedEvictions {
// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
go wait.Until(func() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
for k := range nc.zoneNotReadyOrUnreachableTainer {
// Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
nc.zoneNotReadyOrUnreachableTainer[k].Try(func(value TimedValue) (bool, time.Duration) {
node, err := nc.nodeLister.Get(value.Value)
if apierrors.IsNotFound(err) {
glog.Warningf("Node %v no longer present in nodeLister!", value.Value)
return true, 0
} else if err != nil {
glog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
// retry in 50 millisecond
return false, 50 * time.Millisecond
} else {
zone := utilnode.GetZoneKey(node)
EvictionsNumber.WithLabelValues(zone).Inc()
}
_, condition := v1.GetNodeCondition(&node.Status, v1.NodeReady)
// Because we want to mimic NodeStatus.Condition["Ready"] we make "unreachable" and "not ready" taints mutually exclusive.
taintToAdd := v1.Taint{}
oppositeTaint := v1.Taint{}
if condition.Status == v1.ConditionFalse {
taintToAdd = *NotReadyTaintTemplate
oppositeTaint = *UnreachableTaintTemplate
} else if condition.Status == v1.ConditionUnknown {
taintToAdd = *UnreachableTaintTemplate
oppositeTaint = *NotReadyTaintTemplate
} else {
// It seems that the Node is ready again, so there's no need to taint it.
glog.V(4).Infof("Node %v was in a taint queue, but it's ready now. Ignoring taint request.", value.Value)
return true, 0
}
taintToAdd.TimeAdded = metav1.Now()
err = controller.AddOrUpdateTaintOnNode(nc.kubeClient, value.Value, &taintToAdd)
if err != nil {
utilruntime.HandleError(
fmt.Errorf(
"unable to taint %v unresponsive Node %q: %v",
taintToAdd.Key,
value.Value,
err))
return false, 0
} else {
glog.V(4).Info("Added %v Taint to Node %v", taintToAdd, value.Value)
}
err = controller.RemoveTaintOffNode(nc.kubeClient, value.Value, &oppositeTaint, node)
if err != nil {
utilruntime.HandleError(
fmt.Errorf(
"unable to remove %v unneeded taint from unresponsive Node %q: %v",
oppositeTaint.Key,
value.Value,
err))
return false, 0
} else {
glog.V(4).Info("Made sure that Node %v has no %v Taint", value.Value, oppositeTaint)
}
return true, 0
})
}
}, nodeEvictionPeriod, wait.NeverStop)
go wait.Until(nc.doTaintingPass, nodeEvictionPeriod, wait.NeverStop)
} else {
// Managing eviction of nodes:
// When we delete pods off a node, if the node was not empty at the time we then
// queue an eviction watcher. If we hit an error, retry deletion.
go wait.Until(func() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
for k := range nc.zonePodEvictor {
// Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
nc.zonePodEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
node, err := nc.nodeLister.Get(value.Value)
if apierrors.IsNotFound(err) {
glog.Warningf("Node %v no longer present in nodeLister!", value.Value)
} else if err != nil {
glog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
} else {
zone := utilnode.GetZoneKey(node)
EvictionsNumber.WithLabelValues(zone).Inc()
}
nodeUid, _ := value.UID.(string)
remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nodeUid, nc.daemonSetStore)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
return false, 0
}
if remaining {
glog.Infof("Pods awaiting deletion due to NodeController eviction")
}
return true, 0
})
}
}, nodeEvictionPeriod, wait.NeverStop)
go wait.Until(nc.doEvictionPass, nodeEvictionPeriod, wait.NeverStop)
}
}()
}
@ -685,7 +664,13 @@ func (nc *NodeController) monitorNodeStatus() error {
// Check eviction timeout against decisionTimestamp
if observedReadyCondition.Status == v1.ConditionFalse {
if nc.useTaintBasedEvictions {
if nc.markNodeForTainting(node) {
// We want to update the taint straight away if Node is already tainted with the UnreachableTaint
if v1.TaintExists(node.Spec.Taints, UnreachableTaintTemplate) {
taintToAdd := *NotReadyTaintTemplate
if !swapNodeControllerTaint(nc.kubeClient, &taintToAdd, UnreachableTaintTemplate, node) {
glog.Errorf("Failed to instantly swap UnreachableTaint to NotReadyTaint. Will try again in the next cycle.")
}
} else if nc.markNodeForTainting(node) {
glog.V(2).Infof("Node %v is NotReady as of %v. Adding it to the Taint queue.",
node.Name,
decisionTimestamp,
@ -706,7 +691,13 @@ func (nc *NodeController) monitorNodeStatus() error {
}
if observedReadyCondition.Status == v1.ConditionUnknown {
if nc.useTaintBasedEvictions {
if nc.markNodeForTainting(node) {
// We want to update the taint straight away if Node is already tainted with the NotReadyTaint
if v1.TaintExists(node.Spec.Taints, NotReadyTaintTemplate) {
taintToAdd := *UnreachableTaintTemplate
if !swapNodeControllerTaint(nc.kubeClient, &taintToAdd, NotReadyTaintTemplate, node) {
glog.Errorf("Failed to instantly swap UnreachableTaint to NotReadyTaint. Will try again in the next cycle.")
}
} else if nc.markNodeForTainting(node) {
glog.V(2).Infof("Node %v is unresponsive as of %v. Adding it to the Taint queue.",
node.Name,
decisionTimestamp,

View File

@ -16,6 +16,7 @@ go_library(
],
tags = ["automanaged"],
deps = [
"//cmd/kubeadm/app/constants:go_default_library",
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",

View File

@ -34,6 +34,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
@ -617,10 +618,16 @@ func getNodeConditionPredicate() corelisters.NodeConditionPredicate {
return func(node *v1.Node) bool {
// We add the master to the node list, but it's unschedulable. So we use this to filter
// the master.
// TODO: Use a node annotation to indicate the master
if node.Spec.Unschedulable {
return false
}
// As of 1.6, we will taint the master, but not necessarily mark it unschedulable.
// Recognize nodes labeled as master, and filter them also, as we were doing previously.
if _, hasMasterRoleLabel := node.Labels[constants.LabelNodeRoleMaster]; hasMasterRoleLabel {
return false
}
// If we have no info, don't accept
if len(node.Status.Conditions) == 0 {
return false

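Since 1.6 taints the master rather than necessarily marking it unschedulable, the predicate now filters on the role label as well. A small sketch of the resulting behavior; the label key is assumed to be the node-role.kubernetes.io/master label kubeadm applies, since constants.LabelNodeRoleMaster itself is not shown in this diff:

```go
package main

import "fmt"

// Assumed value of constants.LabelNodeRoleMaster.
const labelNodeRoleMaster = "node-role.kubernetes.io/master"

func eligibleForLoadBalancer(unschedulable bool, labels map[string]string) bool {
	if unschedulable {
		return false // pre-1.6 style master filtering
	}
	if _, isMaster := labels[labelNodeRoleMaster]; isMaster {
		return false // 1.6 style: schedulable but labeled as master
	}
	return true
}

func main() {
	fmt.Println(eligibleForLoadBalancer(false, map[string]string{labelNodeRoleMaster: ""})) // false
	fmt.Println(eligibleForLoadBalancer(false, nil))                                        // true
}
```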
View File

@ -132,6 +132,13 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p
if !isCreated(replicas[i]) {
return ssc.podControl.CreateStatefulPod(set, replicas[i])
}
// If we find a Pod that is currently terminating, we must wait until graceful deletion
// completes before we continue to make progress.
if isTerminating(replicas[i]) {
glog.V(2).Infof("StatefulSet %s is waiting for Pod %s to Terminate",
set.Name, replicas[i].Name)
return nil
}
// If we have a Pod that has been created but is not running and ready we can not make progress.
// We must ensure that, for each Pod we create, all of its predecessors, with respect to its
// ordinal, are Running and Ready.

View File

@ -221,14 +221,14 @@ func isFailed(pod *v1.Pod) bool {
return pod.Status.Phase == v1.PodFailed
}
// isTerminated returns true if pod's deletion Timestamp has been set
func isTerminated(pod *v1.Pod) bool {
// isTerminating returns true if pod's DeletionTimestamp has been set
func isTerminating(pod *v1.Pod) bool {
return pod.DeletionTimestamp != nil
}
// isHealthy returns true if pod is running and ready and has not been terminated
func isHealthy(pod *v1.Pod) bool {
return isRunningAndReady(pod) && !isTerminated(pod)
return isRunningAndReady(pod) && !isTerminating(pod)
}
// newControllerRef returns a ControllerRef pointing to a given StatefulSet.

View File

@ -12227,7 +12227,7 @@ func GetOpenAPIDefinitions(ref openapi.ReferenceCallback) map[string]openapi.Ope
},
"concurrencyPolicy": {
SchemaProps: spec.SchemaProps{
Description: "ConcurrencyPolicy specifies how to treat concurrent executions of a Job.",
Description: "ConcurrencyPolicy specifies how to treat concurrent executions of a Job. Defaults to Allow.",
Type: []string{"string"},
Format: "",
},

View File

@ -75,14 +75,17 @@ func (s *BuiltInAuthorizationOptions) AddFlags(fs *pflag.FlagSet) {
}
func (s *BuiltInAuthorizationOptions) ToAuthorizationConfig(informerFactory informers.SharedInformerFactory) authorizer.AuthorizationConfig {
func (s *BuiltInAuthorizationOptions) Modes() []string {
modes := []string{}
if len(s.Mode) > 0 {
modes = strings.Split(s.Mode, ",")
}
return modes
}
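The empty-string guard in Modes() matters because strings.Split("", ",") yields [""], not an empty slice. A standalone sketch of the same logic:

```go
package main

import (
	"fmt"
	"strings"
)

// Mirrors Modes() above: an unset --authorization-mode flag must yield an
// empty slice rather than []string{""}.
func modesFromFlag(mode string) []string {
	modes := []string{}
	if len(mode) > 0 {
		modes = strings.Split(mode, ",")
	}
	return modes
}

func main() {
	fmt.Printf("%q\n", modesFromFlag("RBAC,Webhook")) // ["RBAC" "Webhook"]
	fmt.Printf("%q\n", modesFromFlag(""))             // []
}
```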
func (s *BuiltInAuthorizationOptions) ToAuthorizationConfig(informerFactory informers.SharedInformerFactory) authorizer.AuthorizationConfig {
return authorizer.AuthorizationConfig{
AuthorizationModes: modes,
AuthorizationModes: s.Modes(),
PolicyFile: s.PolicyFile,
WebhookConfigFile: s.WebhookConfigFile,
WebhookCacheAuthorizedTTL: s.WebhookCacheAuthorizedTTL,

View File

@ -85,7 +85,7 @@ func (m *containerManager) doWork() {
glog.Errorf("Unable to get docker version: %v", err)
return
}
version, err := utilversion.ParseSemantic(v.Version)
version, err := utilversion.ParseGeneric(v.Version)
if err != nil {
glog.Errorf("Unable to parse docker version %q: %v", v.Version, err)
return

View File

@ -388,6 +388,9 @@ func (ds *dockerService) getDockerAPIVersion() (*semver.Version, error) {
} else {
dv, err = ds.getDockerVersion()
}
if err != nil {
return nil, err
}
apiVersion, err := semver.Parse(dv.APIVersion)
if err != nil {

View File

@ -163,7 +163,6 @@ func modifyHostNetworkOptionForContainer(hostNetwork bool, sandboxID string, hc
hc.NetworkMode = dockercontainer.NetworkMode(sandboxNSMode)
hc.IpcMode = dockercontainer.IpcMode(sandboxNSMode)
hc.UTSMode = ""
hc.PidMode = ""
if hostNetwork {
hc.UTSMode = namespaceModeHost

View File

@ -371,23 +371,22 @@ func (m *managerImpl) reclaimNodeLevelResources(resourceToReclaim v1.ResourceNam
for _, nodeReclaimFunc := range nodeReclaimFuncs {
// attempt to reclaim the pressured resource.
reclaimed, err := nodeReclaimFunc()
if err == nil {
// update our local observations based on the amount reported to have been reclaimed.
// note: this is optimistic, other things could have been still consuming the pressured resource in the interim.
signal := resourceToSignal[resourceToReclaim]
value, ok := observations[signal]
if !ok {
glog.Errorf("eviction manager: unable to find value associated with signal %v", signal)
continue
}
value.available.Add(*reclaimed)
if err != nil {
glog.Warningf("eviction manager: unexpected error when attempting to reduce %v pressure: %v", resourceToReclaim, err)
}
// update our local observations based on the amount reported to have been reclaimed.
// note: this is optimistic, other things could have been still consuming the pressured resource in the interim.
signal := resourceToSignal[resourceToReclaim]
value, ok := observations[signal]
if !ok {
glog.Errorf("eviction manager: unable to find value associated with signal %v", signal)
continue
}
value.available.Add(*reclaimed)
// evaluate all current thresholds to see if with adjusted observations, we think we have met min reclaim goals
if len(thresholdsMet(m.thresholdsMet, observations, true)) == 0 {
return true
}
} else {
glog.Errorf("eviction manager: unexpected error when attempting to reduce %v pressure: %v", resourceToReclaim, err)
// evaluate all current thresholds to see if with adjusted observations, we think we have met min reclaim goals
if len(thresholdsMet(m.thresholdsMet, observations, true)) == 0 {
return true
}
}
return false

View File

@ -76,7 +76,8 @@ type NodeProvider interface {
// ImageGC is responsible for performing garbage collection of unused images.
type ImageGC interface {
// DeleteUnusedImages deletes unused images and returns the number of bytes freed, or an error.
// DeleteUnusedImages deletes unused images and returns the number of bytes freed, and an error.
// This returns the bytes freed even if an error is returned.
DeleteUnusedImages() (int64, error)
}
@ -118,6 +119,8 @@ type thresholdsObservedAt map[evictionapi.Threshold]time.Time
type nodeConditionsObservedAt map[v1.NodeConditionType]time.Time
// nodeReclaimFunc is a function that knows how to reclaim a resource from the node without impacting pods.
// Returns the quantity of resources reclaimed and an error, if applicable.
// nodeReclaimFunc returns the resources reclaimed even if an error occurs.
type nodeReclaimFunc func() (*resource.Quantity, error)
// nodeReclaimFuncs is an ordered list of nodeReclaimFunc

View File

@ -2053,7 +2053,7 @@ func (kl *Kubelet) updateRuntimeUp() {
}
// Only check specific conditions when runtime integration type is cri,
// because the old integration doesn't populate any runtime condition.
if kl.kubeletConfiguration.EnableCRI {
if kl.kubeletConfiguration.EnableCRI && kl.kubeletConfiguration.ContainerRuntime != "rkt" {
if s == nil {
glog.Errorf("Container runtime status is nil")
return

View File

@ -135,7 +135,28 @@ func makeMounts(pod *v1.Pod, podDir string, container *v1.Container, hostName, h
return nil, err
}
if mount.SubPath != "" {
fileinfo, err := os.Lstat(hostPath)
if err != nil {
return nil, err
}
perm := fileinfo.Mode()
hostPath = filepath.Join(hostPath, mount.SubPath)
// Create the sub path now because if it's auto-created later when referenced, it may have an
// incorrect ownership and mode. For example, the sub path directory must have at least g+rwx
// when the pod specifies an fsGroup, and if the directory is not created here, Docker will
// later auto-create it with the incorrect mode 0750
if err := os.MkdirAll(hostPath, perm); err != nil {
glog.Errorf("failed to mkdir:%s", hostPath)
return nil, err
}
// chmod the sub path because umask may have prevented us from making the sub path with the same
// permissions as the mounter path
if err := os.Chmod(hostPath, perm); err != nil {
return nil, err
}
}
// Docker Volume Mounts fail on Windows if it is not of the form C:/

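The explicit Chmod after MkdirAll above is needed because the process umask masks the mode passed to MkdirAll. A small demonstration under an assumed 022 umask (the scratch path is illustrative):

```go
package main

import (
	"fmt"
	"os"
)

func main() {
	const path = "/tmp/subpath-demo" // assumed scratch location
	// With a 022 umask, MkdirAll(path, 0770) actually creates mode 0750
	// (on first creation; MkdirAll is a no-op if the path already exists).
	if err := os.MkdirAll(path, 0770); err != nil {
		panic(err)
	}
	fi, err := os.Lstat(path)
	if err != nil {
		panic(err)
	}
	fmt.Printf("after MkdirAll: %v\n", fi.Mode().Perm())
	// Chmod applies the mode bits directly, bypassing the umask.
	if err := os.Chmod(path, 0770); err != nil {
		panic(err)
	}
	fi, _ = os.Lstat(path)
	fmt.Printf("after Chmod:    %v\n", fi.Mode().Perm())
}
```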
View File

@ -357,10 +357,10 @@ func getTerminationMessage(status *runtimeapi.ContainerStatus, terminationMessag
// readLastStringFromContainerLogs attempts to read up to the max log length from the end of the CRI log represented
// by path. It reads up to max log lines.
func readLastStringFromContainerLogs(path string) string {
func (m *kubeGenericRuntimeManager) readLastStringFromContainerLogs(path string) string {
value := int64(kubecontainer.MaxContainerTerminationMessageLogLines)
buf, _ := circbuf.NewBuffer(kubecontainer.MaxContainerTerminationMessageLogLength)
if err := ReadLogs(path, &v1.PodLogOptions{TailLines: &value}, buf, buf); err != nil {
if err := m.ReadLogs(path, "", &v1.PodLogOptions{TailLines: &value}, buf, buf); err != nil {
return fmt.Sprintf("Error on reading termination message from logs: %v", err)
}
return buf.String()
@ -414,7 +414,7 @@ func (m *kubeGenericRuntimeManager) getPodContainerStatuses(uid kubetypes.UID, n
tMessage, checkLogs := getTerminationMessage(status, annotatedInfo.TerminationMessagePath, fallbackToLogs)
if checkLogs {
path := buildFullContainerLogsPath(uid, labeledInfo.ContainerName, annotatedInfo.RestartCount)
tMessage = readLastStringFromContainerLogs(path)
tMessage = m.readLastStringFromContainerLogs(path)
}
// Use the termination message written by the application is not empty
if len(tMessage) != 0 {
@ -688,7 +688,7 @@ func (m *kubeGenericRuntimeManager) GetContainerLogs(pod *v1.Pod, containerID ku
labeledInfo := getContainerInfoFromLabels(status.Labels)
annotatedInfo := getContainerInfoFromAnnotations(status.Annotations)
path := buildFullContainerLogsPath(pod.UID, labeledInfo.ContainerName, annotatedInfo.RestartCount)
return ReadLogs(path, logOptions, stdout, stderr)
return m.ReadLogs(path, containerID.ID, logOptions, stdout, stderr)
}
// GetExec gets the endpoint the runtime will serve the exec request from.

View File

@ -32,6 +32,7 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api/v1"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/util/tail"
)
@ -54,6 +55,11 @@ const (
timeFormat = time.RFC3339Nano
// blockSize is the block size used in tail.
blockSize = 1024
// stateCheckPeriod is the period to check container state while following
// the container log. Kubelet should not keep following the log when the
// container is not running.
stateCheckPeriod = 5 * time.Second
)
var (
@ -110,7 +116,9 @@ func newLogOptions(apiOpts *v1.PodLogOptions, now time.Time) *logOptions {
}
// ReadLogs reads the container log and redirects it into stdout and stderr.
func ReadLogs(path string, apiOpts *v1.PodLogOptions, stdout, stderr io.Writer) error {
// Note that containerID is only needed when following the log; otherwise,
// just pass in an empty string "".
func (m *kubeGenericRuntimeManager) ReadLogs(path, containerID string, apiOpts *v1.PodLogOptions, stdout, stderr io.Writer) error {
f, err := os.Open(path)
if err != nil {
return fmt.Errorf("failed to open log file %q: %v", path, err)
@ -166,8 +174,8 @@ func ReadLogs(path string, apiOpts *v1.PodLogOptions, stdout, stderr io.Writer)
}
}
// Wait until the next log change.
if err := waitLogs(watcher); err != nil {
return fmt.Errorf("failed to wait logs for log file %q: %v", path, err)
if found, err := m.waitLogs(containerID, watcher); !found {
return err
}
continue
}
@ -196,6 +204,41 @@ func ReadLogs(path string, apiOpts *v1.PodLogOptions, stdout, stderr io.Writer)
}
}
// waitLogs waits for the next log write. It returns a boolean and an error. The boolean
// indicates whether new logs were found; the error is any error that occurred while waiting for new logs.
func (m *kubeGenericRuntimeManager) waitLogs(id string, w *fsnotify.Watcher) (bool, error) {
errRetry := 5
for {
select {
case e := <-w.Events:
switch e.Op {
case fsnotify.Write:
return true, nil
default:
glog.Errorf("Unexpected fsnotify event: %v, retrying...", e)
}
case err := <-w.Errors:
glog.Errorf("Fsnotify watch error: %v, %d error retries remaining", err, errRetry)
if errRetry == 0 {
return false, err
}
errRetry--
case <-time.After(stateCheckPeriod):
s, err := m.runtimeService.ContainerStatus(id)
if err != nil {
return false, err
}
// Only keep following the container log while the container is running.
if s.State != runtimeapi.ContainerState_CONTAINER_RUNNING {
glog.Errorf("Container %q is not running (state=%q)", id, s.State)
// Do not return error because it's normal that the container stops
// during waiting.
return false, nil
}
}
}
}
// parseFunc is a function parsing one log line to the internal log type.
// Notice that the caller must make sure logMessage is not nil.
type parseFunc func([]byte, *logMessage) error
@ -267,28 +310,6 @@ func getParseFunc(log []byte) (parseFunc, error) {
return nil, fmt.Errorf("unsupported log format: %q", log)
}
// waitLogs wait for the next log write.
func waitLogs(w *fsnotify.Watcher) error {
errRetry := 5
for {
select {
case e := <-w.Events:
switch e.Op {
case fsnotify.Write:
return nil
default:
glog.Errorf("Unexpected fsnotify event: %v, retrying...", e)
}
case err := <-w.Errors:
glog.Errorf("Fsnotify watch error: %v, %d error retries remaining", err, errRetry)
if errRetry == 0 {
return err
}
errRetry--
}
}
}
// logWriter controls the writing into the stream based on the log options.
type logWriter struct {
stdout io.Writer

View File

@ -41,24 +41,24 @@ func (m *kubeGenericRuntimeManager) determineEffectiveSecurityContext(pod *v1.Po
}
// set namespace options and supplemental groups.
podSc := pod.Spec.SecurityContext
if podSc == nil {
return synthesized
}
synthesized.NamespaceOptions = &runtimeapi.NamespaceOption{
HostNetwork: pod.Spec.HostNetwork,
HostIpc: pod.Spec.HostIPC,
HostPid: pod.Spec.HostPID,
}
if podSc.FSGroup != nil {
synthesized.SupplementalGroups = append(synthesized.SupplementalGroups, *podSc.FSGroup)
podSc := pod.Spec.SecurityContext
if podSc != nil {
if podSc.FSGroup != nil {
synthesized.SupplementalGroups = append(synthesized.SupplementalGroups, *podSc.FSGroup)
}
if podSc.SupplementalGroups != nil {
synthesized.SupplementalGroups = append(synthesized.SupplementalGroups, podSc.SupplementalGroups...)
}
}
if groups := m.runtimeHelper.GetExtraSupplementalGroupsForPod(pod); len(groups) > 0 {
synthesized.SupplementalGroups = append(synthesized.SupplementalGroups, groups...)
}
if podSc.SupplementalGroups != nil {
synthesized.SupplementalGroups = append(synthesized.SupplementalGroups, podSc.SupplementalGroups...)
}
return synthesized
}

View File

@ -209,7 +209,11 @@ func (r *RemoteRuntimeService) StartContainer(containerID string) error {
// StopContainer stops a running container with a grace period (i.e., timeout).
func (r *RemoteRuntimeService) StopContainer(containerID string, timeout int64) error {
ctx, cancel := getContextWithTimeout(r.timeout)
ctx, cancel := getContextWithTimeout(time.Duration(timeout) * time.Second)
if timeout == 0 {
// Use default timeout if stop timeout is 0.
ctx, cancel = getContextWithTimeout(r.timeout)
}
defer cancel()
_, err := r.runtimeClient.StopContainer(ctx, &runtimeapi.StopContainerRequest{

View File

@ -11,20 +11,17 @@ load(
go_library(
name = "go_default_library",
srcs = [
"api.go",
"doc.go",
"healthcheck.go",
"http.go",
"listener.go",
"worker.go",
],
tags = ["automanaged"],
deps = [
"//vendor:github.com/golang/glog",
"//vendor:github.com/renstrom/dedent",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/util/sets",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/client-go/tools/cache",
"//vendor:k8s.io/client-go/pkg/api",
"//vendor:k8s.io/client-go/pkg/api/v1",
"//vendor:k8s.io/client-go/tools/record",
],
)
@ -34,6 +31,7 @@ go_test(
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//vendor:github.com/davecgh/go-spew/spew",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/util/sets",
],

View File

@ -1,65 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package healthcheck
import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
)
// All public API Methods for this package
// UpdateEndpoints Update the set of local endpoints for a service
func UpdateEndpoints(serviceName types.NamespacedName, endpointUids sets.String) {
req := &proxyMutationRequest{
serviceName: serviceName,
endpointUids: &endpointUids,
}
healthchecker.mutationRequestChannel <- req
}
func updateServiceListener(serviceName types.NamespacedName, listenPort int, add bool) bool {
responseChannel := make(chan bool)
req := &proxyListenerRequest{
serviceName: serviceName,
listenPort: uint16(listenPort),
add: add,
responseChannel: responseChannel,
}
healthchecker.listenerRequestChannel <- req
return <-responseChannel
}
// AddServiceListener Request addition of a listener for a service's health check
func AddServiceListener(serviceName types.NamespacedName, listenPort int) bool {
return updateServiceListener(serviceName, listenPort, true)
}
// DeleteServiceListener Request deletion of a listener for a service's health check
func DeleteServiceListener(serviceName types.NamespacedName, listenPort int) bool {
return updateServiceListener(serviceName, listenPort, false)
}
// Run Start the healthchecker main loop
func Run() {
healthchecker = proxyHealthCheckFactory()
// Wrap with a wait.Forever to handle panics.
go wait.Forever(func() {
healthchecker.handlerLoop()
}, 0)
}

View File

@ -14,5 +14,5 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// Package healthcheck LoadBalancer Healthcheck responder library for kubernetes network proxies
// Package healthcheck provides tools for serving kube-proxy healthchecks.
package healthcheck

View File

@ -20,108 +20,216 @@ import (
"fmt"
"net"
"net/http"
"strings"
"sync"
"github.com/golang/glog"
"github.com/renstrom/dedent"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/pkg/api"
clientv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/record"
)
// proxyMutationRequest: Message to request addition/deletion of endpoints for a service
type proxyMutationRequest struct {
serviceName types.NamespacedName
endpointUids *sets.String
// Server serves HTTP endpoints for each service name, with results
// based on the endpoints. If there are 0 endpoints for a service, it returns a
// 503 "Service Unavailable" error (telling LBs not to use this node). If there
// are 1 or more endpoints, it returns a 200 "OK".
type Server interface {
// Make the new set of services be active. Services that were open before
// will be closed. Services that are new will be opened. Services that
// existed and are in the new set will be left alone. The value of the map
// is the healthcheck-port to listen on.
SyncServices(newServices map[types.NamespacedName]uint16) error
// Make the new set of endpoints be active. Endpoints for services that do
// not exist will be dropped. The value of the map is the number of
// endpoints the service has on this node.
SyncEndpoints(newEndpoints map[types.NamespacedName]int) error
}
// proxyListenerRequest: Message to request addition/deletion of a service responder on a listening port
type proxyListenerRequest struct {
serviceName types.NamespacedName
listenPort uint16
add bool
responseChannel chan bool
// Listener allows for testing of Server. If the Listener argument
// to NewServer() is nil, the real net.Listen function will be used.
type Listener interface {
// Listen is very much like net.Listen, except the first arg (network) is
// fixed to be "tcp".
Listen(addr string) (net.Listener, error)
}
// serviceEndpointsList: A list of endpoints for a service
type serviceEndpointsList struct {
serviceName types.NamespacedName
endpoints *sets.String
// HTTPServerFactory allows for testing of Server. If the
// HTTPServerFactory argument to NewServer() is nil, the real
// http.Server type will be used.
type HTTPServerFactory interface {
// New creates an instance of a type satisfying HTTPServer. This is
// designed to include http.Server.
New(addr string, handler http.Handler) HTTPServer
}
// serviceResponder: Contains net/http datastructures necessary for responding to each Service's health check on its aux nodePort
type serviceResponder struct {
serviceName types.NamespacedName
listenPort uint16
listener *net.Listener
server *http.Server
// HTTPServer allows for testing of Server.
type HTTPServer interface {
// Serve mirrors http.Server.Serve, so that http.Server satisfies this interface.
Serve(listener net.Listener) error
}
// proxyHC: Handler structure for health check, endpoint add/delete and service listener add/delete requests
type proxyHC struct {
serviceEndpointsMap cache.ThreadSafeStore
serviceResponderMap map[types.NamespacedName]serviceResponder
mutationRequestChannel chan *proxyMutationRequest
listenerRequestChannel chan *proxyListenerRequest
}
// handleHealthCheckRequest - received a health check request - lookup and respond to HC.
func (h *proxyHC) handleHealthCheckRequest(rw http.ResponseWriter, serviceName string) {
s, ok := h.serviceEndpointsMap.Get(serviceName)
if !ok {
glog.V(4).Infof("Service %s not found or has no local endpoints", serviceName)
sendHealthCheckResponse(rw, http.StatusServiceUnavailable, "No Service Endpoints Found")
return
// NewServer allocates a new healthcheck server manager. If either
// of the injected arguments is nil, defaults will be used.
func NewServer(hostname string, recorder record.EventRecorder, listener Listener, httpServerFactory HTTPServerFactory) Server {
if listener == nil {
listener = stdNetListener{}
}
numEndpoints := len(*s.(*serviceEndpointsList).endpoints)
if numEndpoints > 0 {
sendHealthCheckResponse(rw, http.StatusOK, fmt.Sprintf("%d Service Endpoints found", numEndpoints))
return
if httpServerFactory == nil {
httpServerFactory = stdHTTPServerFactory{}
}
return &server{
hostname: hostname,
recorder: recorder,
listener: listener,
httpFactory: httpServerFactory,
services: map[types.NamespacedName]*hcInstance{},
}
sendHealthCheckResponse(rw, http.StatusServiceUnavailable, "0 local Endpoints are alive")
}
// handleMutationRequest - receive requests to mutate the table entry for a service
func (h *proxyHC) handleMutationRequest(req *proxyMutationRequest) {
numEndpoints := len(*req.endpointUids)
glog.V(4).Infof("LB service health check mutation request Service: %s - %d Endpoints %v",
req.serviceName, numEndpoints, (*req.endpointUids).List())
if numEndpoints == 0 {
if _, ok := h.serviceEndpointsMap.Get(req.serviceName.String()); ok {
glog.V(4).Infof("Deleting endpoints map for service %s, all local endpoints gone", req.serviceName.String())
h.serviceEndpointsMap.Delete(req.serviceName.String())
}
return
// Implement Listener in terms of net.Listen.
type stdNetListener struct{}
func (stdNetListener) Listen(addr string) (net.Listener, error) {
return net.Listen("tcp", addr)
}
var _ Listener = stdNetListener{}
// Implement HTTPServerFactory in terms of http.Server.
type stdHTTPServerFactory struct{}
func (stdHTTPServerFactory) New(addr string, handler http.Handler) HTTPServer {
return &http.Server{
Addr: addr,
Handler: handler,
}
var entry *serviceEndpointsList
e, exists := h.serviceEndpointsMap.Get(req.serviceName.String())
if exists {
entry = e.(*serviceEndpointsList)
if entry.endpoints.Equal(*req.endpointUids) {
return
}
// Compute differences just for printing logs about additions and removals
deletedEndpoints := entry.endpoints.Difference(*req.endpointUids)
newEndpoints := req.endpointUids.Difference(*entry.endpoints)
for _, e := range newEndpoints.List() {
glog.V(4).Infof("Adding local endpoint %s to LB health check for service %s",
e, req.serviceName.String())
}
for _, d := range deletedEndpoints.List() {
glog.V(4).Infof("Deleted endpoint %s from service %s LB health check (%d endpoints left)",
d, req.serviceName.String(), len(*entry.endpoints))
}
var _ HTTPServerFactory = stdHTTPServerFactory{}
type server struct {
hostname string
recorder record.EventRecorder // can be nil
listener Listener
httpFactory HTTPServerFactory
lock sync.Mutex
services map[types.NamespacedName]*hcInstance
}
func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) error {
hcs.lock.Lock()
defer hcs.lock.Unlock()
// Remove any that are not needed any more.
for nsn, svc := range hcs.services {
if port, found := newServices[nsn]; !found || port != svc.port {
glog.V(2).Infof("Closing healthcheck %q on port %d", nsn.String(), svc.port)
if err := svc.listener.Close(); err != nil {
glog.Errorf("Close(%v): %v", svc.listener.Addr(), err)
}
delete(hcs.services, nsn)
}
}
entry = &serviceEndpointsList{serviceName: req.serviceName, endpoints: req.endpointUids}
h.serviceEndpointsMap.Add(req.serviceName.String(), entry)
// Add any that are needed.
for nsn, port := range newServices {
if hcs.services[nsn] != nil {
glog.V(3).Infof("Existing healthcheck %q on port %d", nsn.String(), port)
continue
}
glog.V(2).Infof("Opening healthcheck %q on port %d", nsn.String(), port)
svc := &hcInstance{port: port}
addr := fmt.Sprintf(":%d", port)
svc.server = hcs.httpFactory.New(addr, hcHandler{name: nsn, hcs: hcs})
var err error
svc.listener, err = hcs.listener.Listen(addr)
if err != nil {
msg := fmt.Sprintf("node %s failed to start healthcheck %q on port %d: %v", hcs.hostname, nsn.String(), port, err)
if hcs.recorder != nil {
hcs.recorder.Eventf(
&clientv1.ObjectReference{
Kind: "Service",
Namespace: nsn.Namespace,
Name: nsn.Name,
UID: types.UID(nsn.String()),
}, api.EventTypeWarning, "FailedToStartHealthcheck", msg)
}
glog.Error(msg)
continue
}
hcs.services[nsn] = svc
go func(nsn types.NamespacedName, svc *hcInstance) {
// Serve() will exit when the listener is closed.
glog.V(3).Infof("Starting goroutine for healthcheck %q on port %d", nsn.String(), svc.port)
if err := svc.server.Serve(svc.listener); err != nil {
glog.V(3).Infof("Healthcheck %q closed: %v", nsn.String(), err)
return
}
glog.V(3).Infof("Healthcheck %q closed", nsn.String())
}(nsn, svc)
}
return nil
}
// proxyHealthCheckRequest - Factory method to instantiate the health check handler
func proxyHealthCheckFactory() *proxyHC {
glog.V(2).Infof("Initializing kube-proxy health checker")
phc := &proxyHC{
serviceEndpointsMap: cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{}),
serviceResponderMap: make(map[types.NamespacedName]serviceResponder),
mutationRequestChannel: make(chan *proxyMutationRequest, 1024),
listenerRequestChannel: make(chan *proxyListenerRequest, 1024),
}
return phc
type hcInstance struct {
port uint16
listener net.Listener
server HTTPServer
endpoints int // number of local endpoints for a service
}
type hcHandler struct {
name types.NamespacedName
hcs *server
}
var _ http.Handler = hcHandler{}
func (h hcHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
h.hcs.lock.Lock()
count := h.hcs.services[h.name].endpoints
h.hcs.lock.Unlock()
resp.Header().Set("Content-Type", "application/json")
if count == 0 {
resp.WriteHeader(http.StatusServiceUnavailable)
} else {
resp.WriteHeader(http.StatusOK)
}
fmt.Fprint(resp, strings.Trim(dedent.Dedent(fmt.Sprintf(`
{
"service": {
"namespace": %q,
"name": %q
},
"localEndpoints": %d
}
`, h.name.Namespace, h.name.Name, count)), "\n"))
}
func (hcs *server) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error {
hcs.lock.Lock()
defer hcs.lock.Unlock()
for nsn, count := range newEndpoints {
if hcs.services[nsn] == nil {
glog.V(3).Infof("Not saving endpoints for unknown healthcheck %q", nsn.String())
continue
}
glog.V(3).Infof("Reporting %d endpoints for healthcheck %q", count, nsn.String())
hcs.services[nsn].endpoints = count
}
for nsn, hci := range hcs.services {
if _, found := newEndpoints[nsn]; !found {
hci.endpoints = 0
}
}
return nil
}
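
Putting the new Server interface together, a hypothetical caller might look like the sketch below; the hostname, port, and package import path are assumptions, and nil listener/factory/recorder arguments select the real net and net/http defaults:

```go
package main

import (
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/kubernetes/pkg/proxy/healthcheck"
)

func main() {
	hc := healthcheck.NewServer("node-1", nil, nil, nil)
	web := types.NamespacedName{Namespace: "default", Name: "web"}

	// Open a listener on the service's healthcheck node port.
	_ = hc.SyncServices(map[types.NamespacedName]uint16{web: 30303})
	// Report two local endpoints; GETs on :30303 now answer 200 instead of 503.
	_ = hc.SyncEndpoints(map[types.NamespacedName]int{web: 2})
}
```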

View File

@ -1,46 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package healthcheck
import (
"fmt"
"net/http"
"github.com/golang/glog"
)
// A healthCheckHandler serves http requests on /healthz on the service health check node port,
// and responds to every request with either:
// 200 OK and the count of endpoints for the given service that are local to this node.
// or
// 503 Service Unavailable If the count is zero or the service does not exist
type healthCheckHandler struct {
svcNsName string
}
// HTTP Utility function to send the required statusCode and error text to a http.ResponseWriter object
func sendHealthCheckResponse(rw http.ResponseWriter, statusCode int, error string) {
rw.Header().Set("Content-Type", "text/plain")
rw.WriteHeader(statusCode)
fmt.Fprint(rw, error)
}
// ServeHTTP: Interface callback method for net.Listener Handlers
func (h healthCheckHandler) ServeHTTP(response http.ResponseWriter, req *http.Request) {
glog.V(4).Infof("Received HC Request Service %s from Cloud Load Balancer", h.svcNsName)
healthchecker.handleHealthCheckRequest(response, h.svcNsName)
}

View File

@ -1,77 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package healthcheck
// Create/Delete dynamic listeners on the required nodePorts
import (
"fmt"
"net"
"net/http"
"github.com/golang/glog"
)
// handleServiceListenerRequest: receive requests to add/remove service health check listening ports
func (h *proxyHC) handleServiceListenerRequest(req *proxyListenerRequest) bool {
sr, serviceFound := h.serviceResponderMap[req.serviceName]
if !req.add {
if !serviceFound {
return false
}
glog.Infof("Deleting HealthCheckListenPort for service %s port %d",
req.serviceName, req.listenPort)
delete(h.serviceResponderMap, req.serviceName)
(*sr.listener).Close()
return true
} else if serviceFound {
if req.listenPort == sr.listenPort {
// Addition requested but responder for service already exists and port is unchanged
return true
}
// Addition requested but responder for service already exists but the listen port has changed
glog.Infof("HealthCheckListenPort for service %s changed from %d to %d - closing old listening port",
req.serviceName, sr.listenPort, req.listenPort)
delete(h.serviceResponderMap, req.serviceName)
(*sr.listener).Close()
}
// Create a service responder object and start listening and serving on the provided port
glog.V(2).Infof("Adding health check listener for service %s on nodePort %d", req.serviceName, req.listenPort)
server := http.Server{
Addr: fmt.Sprintf(":%d", req.listenPort),
Handler: healthCheckHandler{svcNsName: req.serviceName.String()},
}
listener, err := net.Listen("tcp", server.Addr)
if err != nil {
glog.Warningf("FAILED to listen on address %s (%s)\n", server.Addr, err)
return false
}
h.serviceResponderMap[req.serviceName] = serviceResponder{serviceName: req.serviceName,
listenPort: req.listenPort,
listener: &listener,
server: &server}
go func() {
// Anonymous goroutine to block on Serve for this listen port - Serve will exit when the listener is closed
glog.V(3).Infof("Goroutine blocking on serving health checks for %s on port %d", req.serviceName, req.listenPort)
if err := server.Serve(listener); err != nil {
glog.V(3).Infof("Proxy HealthCheck listen socket %d for service %s closed with error %s\n", req.listenPort, req.serviceName, err)
return
}
glog.V(3).Infof("Proxy HealthCheck listen socket %d for service %s closed\n", req.listenPort, req.serviceName)
}()
return true
}
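
Underneath, the add/close logic is the standard net/http idiom: bind an explicit `net.Listener` per node port, serve on a goroutine, and close the listener to make `Serve` return. A hedged, self-contained sketch (port and handler are illustrative):

```go
package main

import (
	"fmt"
	"net"
	"net/http"
	"time"
)

func main() {
	// An explicit listener, so it can be closed later to tear down the responder.
	listener, err := net.Listen("tcp", ":30123")
	if err != nil {
		fmt.Println("FAILED to listen:", err)
		return
	}
	server := &http.Server{Handler: http.HandlerFunc(
		func(rw http.ResponseWriter, _ *http.Request) { fmt.Fprintln(rw, "ok") },
	)}
	go func() {
		// Serve blocks until the listener is closed, mirroring the
		// anonymous goroutine in handleServiceListenerRequest.
		if err := server.Serve(listener); err != nil {
			fmt.Println("serve exited:", err)
		}
	}()
	time.Sleep(time.Second)
	listener.Close() // delete, or a changed node port, closes the old listener
	time.Sleep(100 * time.Millisecond)
}
```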

View File

@ -1,53 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package healthcheck provides a LoadBalancer healthcheck responder library for Kubernetes network proxies
package healthcheck
import (
"time"
"github.com/golang/glog"
)
var healthchecker *proxyHC
// handlerLoop serializes all requests to prevent concurrent access to the maps
func (h *proxyHC) handlerLoop() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
select {
case req := <-h.mutationRequestChannel:
h.handleMutationRequest(req)
case req := <-h.listenerRequestChannel:
req.responseChannel <- h.handleServiceListenerRequest(req)
case <-ticker.C:
go h.sync()
}
}
}
func (h *proxyHC) sync() {
glog.V(4).Infof("%d Health Check Listeners", len(h.serviceResponderMap))
glog.V(4).Infof("%d Services registered for health checking", len(h.serviceEndpointsMap.List()))
for _, svc := range h.serviceEndpointsMap.ListKeys() {
if e, ok := h.serviceEndpointsMap.Get(svc); ok {
endpointList := e.(*serviceEndpointsList)
glog.V(4).Infof("Service %s has %d local endpoints", svc, endpointList.endpoints.Len())
}
}
}
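
The loop is the classic single-goroutine serialization pattern: every mutation arrives on a channel and is applied by one goroutine, so the maps need no mutex, while a ticker drives periodic housekeeping. A toy sketch of the same shape (channel payloads and state are hypothetical):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	state := map[string]int{} // touched only by the loop below
	mutations := make(chan func())
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()

	go func() { mutations <- func() { state["default/my-svc"] = 3 } }()

	for i := 0; i < 3; i++ {
		select {
		case mutate := <-mutations:
			mutate() // serialized: no concurrent map access
		case <-ticker.C:
			fmt.Printf("%d services registered for health checking\n", len(state))
		}
	}
}
```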

View File

@ -50,7 +50,6 @@ go_test(
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/util/intstr",
"//vendor:k8s.io/apimachinery/pkg/util/sets",
],
)

View File

@ -213,7 +213,7 @@ type Proxier struct {
nodeIP net.IP
portMapper portOpener
recorder record.EventRecorder
healthChecker healthChecker
healthChecker healthcheck.Server
}
type localPort struct {
@ -245,17 +245,6 @@ func (l *listenPortOpener) OpenLocalPort(lp *localPort) (closeable, error) {
return openLocalPort(lp)
}
type healthChecker interface {
UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String)
}
// TODO: the healthcheck pkg should offer a type
type globalHealthChecker struct{}
func (globalHealthChecker) UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String) {
healthcheck.UpdateEndpoints(serviceName, endpointUIDs)
}
// Proxier implements ProxyProvider
var _ proxy.ProxyProvider = &Proxier{}
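
The replacement `healthcheck.Server` used below bundles both sides of this. Its surface, inferred only from the calls that appear later in this diff (`NewServer`, `SyncServices`, `SyncEndpoints`) rather than from the package itself, is roughly:

```go
package healthcheck

import "k8s.io/apimachinery/pkg/types"

// Server as inferred from usage in this diff; the real type in
// pkg/proxy/healthcheck may expose more than this.
type Server interface {
	// SyncServices reconciles the desired health check node ports,
	// keyed by service namespace/name.
	SyncServices(newServices map[types.NamespacedName]uint16) error
	// SyncEndpoints reconciles the local endpoint count per service.
	SyncEndpoints(newEndpoints map[types.NamespacedName]int) error
}
```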
@ -309,8 +298,7 @@ func NewProxier(ipt utiliptables.Interface,
glog.Warningf("clusterCIDR not specified, unable to distinguish between internal and external traffic")
}
healthChecker := globalHealthChecker{}
go healthcheck.Run()
healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
var throttle flowcontrol.RateLimiter
// Defaulting back to not limit sync rate when minSyncPeriod is 0.
@ -444,18 +432,12 @@ func (proxier *Proxier) SyncLoop() {
}
}
type healthCheckPort struct {
namespace types.NamespacedName
nodeport int
}
// Accepts a list of Services and the existing service map. Returns the new
// service map, a list of healthcheck ports to add to or remove from the health
// checking listener service, and a set of stale UDP services.
func buildServiceMap(allServices []api.Service, oldServiceMap proxyServiceMap) (proxyServiceMap, []healthCheckPort, []healthCheckPort, sets.String) {
// service map, a map of healthcheck ports, and a set of stale UDP
// services.
func buildServiceMap(allServices []api.Service, oldServiceMap proxyServiceMap) (proxyServiceMap, map[types.NamespacedName]uint16, sets.String) {
newServiceMap := make(proxyServiceMap)
healthCheckAdd := make([]healthCheckPort, 0)
healthCheckDel := make([]healthCheckPort, 0)
hcPorts := make(map[types.NamespacedName]uint16)
for i := range allServices {
service := &allServices[i]
@ -492,12 +474,8 @@ func buildServiceMap(allServices []api.Service, oldServiceMap proxyServiceMap) (
glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
}
if !exists || !equal {
if info.onlyNodeLocalEndpoints && info.healthCheckNodePort > 0 {
healthCheckAdd = append(healthCheckAdd, healthCheckPort{serviceName.NamespacedName, info.healthCheckNodePort})
} else {
healthCheckDel = append(healthCheckDel, healthCheckPort{serviceName.NamespacedName, 0})
}
if info.onlyNodeLocalEndpoints {
hcPorts[svcName] = uint16(info.healthCheckNodePort)
}
newServiceMap[serviceName] = info
@ -505,6 +483,13 @@ func buildServiceMap(allServices []api.Service, oldServiceMap proxyServiceMap) (
}
}
for nsn, port := range hcPorts {
if port == 0 {
glog.Errorf("Service %q has no healthcheck nodeport", nsn)
delete(hcPorts, nsn)
}
}
staleUDPServices := sets.NewString()
// Remove serviceports missing from the update.
for name, info := range oldServiceMap {
@ -513,13 +498,10 @@ func buildServiceMap(allServices []api.Service, oldServiceMap proxyServiceMap) (
if info.protocol == api.ProtocolUDP {
staleUDPServices.Insert(info.clusterIP.String())
}
if info.onlyNodeLocalEndpoints && info.healthCheckNodePort > 0 {
healthCheckDel = append(healthCheckDel, healthCheckPort{name.NamespacedName, info.healthCheckNodePort})
}
}
}
return newServiceMap, healthCheckAdd, healthCheckDel, staleUDPServices
return newServiceMap, hcPorts, staleUDPServices
}
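
In other words, the paired add/del slices collapse into one desired-state map that the checker reconciles wholesale. A toy illustration of the new shape and the port-0 guard, with hypothetical service names:

```go
package main

import "fmt"

func main() {
	// Hypothetical desired state: namespace/name -> health check node port.
	hcPorts := map[string]uint16{
		"default/my-lb":    30061,
		"default/other-lb": 0, // OnlyLocal service with no nodeport allocated
	}
	// Mirrors the guard in buildServiceMap: entries with port 0 are dropped.
	for nsn, port := range hcPorts {
		if port == 0 {
			fmt.Printf("Service %q has no healthcheck nodeport\n", nsn)
			delete(hcPorts, nsn)
		}
	}
	fmt.Println(hcPorts) // map[default/my-lb:30061]
}
```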
// OnServiceUpdate tracks the active set of service proxies.
@ -533,19 +515,11 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
defer proxier.mu.Unlock()
proxier.haveReceivedServiceUpdate = true
newServiceMap, hcAdd, hcDel, staleUDPServices := buildServiceMap(allServices, proxier.serviceMap)
for _, hc := range hcAdd {
glog.V(4).Infof("Adding health check for %+v, port %v", hc.namespace, hc.nodeport)
// Turn on healthcheck responder to listen on the health check nodePort
// FIXME: handle failures from adding the service
healthcheck.AddServiceListener(hc.namespace, hc.nodeport)
}
for _, hc := range hcDel {
// Remove ServiceListener health check nodePorts from the health checker
// TODO - Stats
glog.V(4).Infof("Deleting health check for %+v, port %v", hc.namespace, hc.nodeport)
// FIXME: handle failures from deleting the service
healthcheck.DeleteServiceListener(hc.namespace, hc.nodeport)
newServiceMap, hcPorts, staleUDPServices := buildServiceMap(allServices, proxier.serviceMap)
// update healthcheck ports
if err := proxier.healthChecker.SyncServices(hcPorts); err != nil {
glog.Errorf("Error syncing healtcheck ports: %v", err)
}
if len(newServiceMap) != len(proxier.serviceMap) || !reflect.DeepEqual(newServiceMap, proxier.serviceMap) {
@ -568,7 +542,13 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
proxier.allEndpoints = allEndpoints
// TODO: once service has made this same transform, move this into proxier.syncProxyRules()
newMap, staleConnections := updateEndpoints(proxier.allEndpoints, proxier.endpointsMap, proxier.hostname, proxier.healthChecker)
newMap, hcEndpoints, staleConnections := updateEndpoints(proxier.allEndpoints, proxier.endpointsMap, proxier.hostname)
// update healthcheck endpoints
if err := proxier.healthChecker.SyncEndpoints(hcEndpoints); err != nil {
glog.Errorf("Error syncing healthcheck endoints: %v", err)
}
if len(newMap) != len(proxier.endpointsMap) || !reflect.DeepEqual(newMap, proxier.endpointsMap) {
proxier.endpointsMap = newMap
proxier.syncProxyRules()
@ -580,11 +560,11 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
}
// Convert a slice of api.Endpoints objects into a map of service-port -> endpoints.
func updateEndpoints(allEndpoints []api.Endpoints, curMap proxyEndpointMap, hostname string,
healthChecker healthChecker) (newMap proxyEndpointMap, staleSet map[endpointServicePair]bool) {
func updateEndpoints(allEndpoints []api.Endpoints, curMap proxyEndpointMap, hostname string) (newMap proxyEndpointMap, hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) {
// return values
newMap = make(proxyEndpointMap)
hcEndpoints = make(map[types.NamespacedName]int)
staleSet = make(map[endpointServicePair]bool)
// Update endpoints for services.
@ -610,19 +590,30 @@ func updateEndpoints(allEndpoints []api.Endpoints, curMap proxyEndpointMap, host
}
}
// Update service health check
allSvcPorts := make(map[proxy.ServicePortName]bool)
for svcPort := range curMap {
allSvcPorts[svcPort] = true
}
for svcPort := range newMap {
allSvcPorts[svcPort] = true
}
for svcPort := range allSvcPorts {
updateHealthCheckEntries(svcPort.NamespacedName, newMap[svcPort], healthChecker)
if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) {
return
}
return newMap, staleSet
// accumulate local IPs per service, ignoring ports
localIPs := map[types.NamespacedName]sets.String{}
for svcPort := range newMap {
for _, ep := range newMap[svcPort] {
if ep.isLocal {
nsn := svcPort.NamespacedName
if localIPs[nsn] == nil {
localIPs[nsn] = sets.NewString()
}
ip := strings.Split(ep.endpoint, ":")[0] // just the IP part
localIPs[nsn].Insert(ip)
}
}
}
// produce a count per service
for nsn, ips := range localIPs {
hcEndpoints[nsn] = len(ips)
}
return newMap, hcEndpoints, staleSet
}
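
The accumulation step is worth seeing in isolation: endpoints arrive as `ip:port` strings per service port, and what the health checker wants is the number of distinct local IPs per service. A self-contained sketch with hypothetical data (real keys are ServicePortName structs, not strings):

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Hypothetical local endpoints, "ip:port" strings keyed by service port name.
	endpoints := map[string][]string{
		"default/my-svc:http":  {"10.0.0.1:8080", "10.0.0.2:8080"},
		"default/my-svc:https": {"10.0.0.1:8443"},
	}
	// Accumulate unique local IPs per service, ignoring ports, as above.
	localIPs := map[string]map[string]bool{}
	for svcPort, eps := range endpoints {
		svc := strings.SplitN(svcPort, ":", 2)[0] // strip the port name
		for _, ep := range eps {
			ip := strings.Split(ep, ":")[0] // just the IP part
			if localIPs[svc] == nil {
				localIPs[svc] = map[string]bool{}
			}
			localIPs[svc][ip] = true
		}
	}
	// Produce a count per service.
	for svc, ips := range localIPs {
		fmt.Printf("%s -> %d local endpoints\n", svc, len(ips))
	}
}
```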
// Gather information about all the endpoint state for a given api.Endpoints.
@ -668,23 +659,6 @@ func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string,
}
}
// updateHealthCheckEntries - send the new set of local endpoints to the health checker
func updateHealthCheckEntries(name types.NamespacedName, endpoints []*endpointsInfo, healthChecker healthChecker) {
if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) {
return
}
// Use a set instead of a slice to provide deduplication
epSet := sets.NewString()
for _, portInfo := range endpoints {
if portInfo.isLocal {
// kube-proxy health check only needs local endpoints
epSet.Insert(fmt.Sprintf("%s/%s", name.Namespace, name.Name))
}
}
healthChecker.UpdateEndpoints(name, epSet)
}
// portProtoHash takes the ServicePortName and protocol for a service
// returns the associated 16 character hash. This is computed by hashing (sha256)
// then encoding to base32 and truncating to 16 chars. We do this because IPTables
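
The comment above trails off where the diff context ends, but it describes the recipe fully enough to sketch: sha256 the name, base32-encode, keep the first 16 characters (iptables chain names are length-limited, which is the usual reason for truncating). The exact input string the real function hashes may differ:

```go
package main

import (
	"crypto/sha256"
	"encoding/base32"
	"fmt"
)

// portProtoHash: a sketch of the scheme the comment describes.
func portProtoHash(servicePortName, protocol string) string {
	hash := sha256.Sum256([]byte(servicePortName + protocol))
	return base32.StdEncoding.EncodeToString(hash[:])[:16]
}

func main() {
	// Short, fixed-length identifier suitable for an iptables chain name.
	fmt.Println(portProtoHash("default/my-svc:http", "tcp"))
}
```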

View File

@ -55,6 +55,8 @@ import (
"k8s.io/kubernetes/plugin/pkg/auth/authorizer/rbac/bootstrappolicy"
)
const PostStartHookName = "rbac/bootstrap-roles"
type RESTStorageProvider struct {
Authorizer authorizer.Authorizer
}
@ -123,7 +125,7 @@ func (p RESTStorageProvider) storage(version schema.GroupVersion, apiResourceCon
}
func (p RESTStorageProvider) PostStartHook() (string, genericapiserver.PostStartHookFunc, error) {
return "rbac/bootstrap-roles", PostStartHook, nil
return PostStartHookName, PostStartHook, nil
}
func PostStartHook(hookContext genericapiserver.PostStartHookContext) error {

View File

@ -51,7 +51,7 @@ var (
// semantic version is a git hash, but the version itself is no
// longer the direct output of "git describe", but a slight
// translation to be semver compliant.
gitVersion string = "v1.6.0+$Format:%h$"
gitVersion string = "v1.6.3+$Format:%h$"
gitCommit string = "$Format:%H$" // sha1 from git, output of $(git rev-parse HEAD)
gitTreeState string = "not a git tree" // state of git tree, either "clean" or "dirty"

View File

@ -151,7 +151,7 @@ func (plugin *photonPersistentDiskPlugin) ConstructVolumeSpec(volumeSpecName, mo
// Abstract interface to disk operations.
type pdManager interface {
// Creates a volume
CreateVolume(provisioner *photonPersistentDiskProvisioner) (pdID string, volumeSizeGB int, err error)
CreateVolume(provisioner *photonPersistentDiskProvisioner) (pdID string, volumeSizeGB int, fstype string, err error)
// Deletes a volume
DeleteVolume(deleter *photonPersistentDiskDeleter) error
}
@ -342,11 +342,15 @@ func (plugin *photonPersistentDiskPlugin) newProvisionerInternal(options volume.
}
func (p *photonPersistentDiskProvisioner) Provision() (*v1.PersistentVolume, error) {
pdID, sizeGB, err := p.manager.CreateVolume(p)
pdID, sizeGB, fstype, err := p.manager.CreateVolume(p)
if err != nil {
return nil, err
}
if fstype == "" {
fstype = "ext4"
}
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: p.options.PVName,
@ -364,7 +368,7 @@ func (p *photonPersistentDiskProvisioner) Provision() (*v1.PersistentVolume, err
PersistentVolumeSource: v1.PersistentVolumeSource{
PhotonPersistentDisk: &v1.PhotonPersistentDiskVolumeSource{
PdID: pdID,
FSType: "ext4",
FSType: fstype,
},
},
},

View File

@ -80,11 +80,11 @@ func verifyDevicePath(path string) (string, error) {
}
// CreateVolume creates a Photon Controller persistent disk.
func (util *PhotonDiskUtil) CreateVolume(p *photonPersistentDiskProvisioner) (pdID string, capacityGB int, err error) {
func (util *PhotonDiskUtil) CreateVolume(p *photonPersistentDiskProvisioner) (pdID string, capacityGB int, fstype string, err error) {
cloud, err := getCloudProvider(p.plugin.host.GetCloudProvider())
if err != nil {
glog.Errorf("Photon Controller Util: CreateVolume failed to get cloud provider. Error [%v]", err)
return "", 0, err
return "", 0, "", err
}
capacity := p.options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
@ -102,20 +102,23 @@ func (util *PhotonDiskUtil) CreateVolume(p *photonPersistentDiskProvisioner) (pd
switch strings.ToLower(parameter) {
case "flavor":
volumeOptions.Flavor = value
case "fstype":
fstype = value
glog.V(4).Infof("Photon Controller Util: Setting fstype to %s", fstype)
default:
glog.Errorf("Photon Controller Util: invalid option %s for volume plugin %s.", parameter, p.plugin.GetPluginName())
return "", 0, fmt.Errorf("Photon Controller Util: invalid option %s for volume plugin %s.", parameter, p.plugin.GetPluginName())
return "", 0, "", fmt.Errorf("Photon Controller Util: invalid option %s for volume plugin %s.", parameter, p.plugin.GetPluginName())
}
}
pdID, err = cloud.CreateDisk(volumeOptions)
if err != nil {
glog.Errorf("Photon Controller Util: failed to CreateDisk. Error [%v]", err)
return "", 0, err
return "", 0, "", err
}
glog.V(4).Infof("Successfully created Photon Controller persistent disk %s", name)
return pdID, volSizeGB, nil
return pdID, volSizeGB, "", nil
}
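
The flow is the usual StorageClass-parameter switch: recognized keys are consumed, `fstype` is captured here and defaulted to ext4 by the provisioner. A trimmed-down, hypothetical version of the pattern:

```go
package main

import (
	"fmt"
	"strings"
)

// parseParams mimics the switch above: recognized keys are consumed,
// fstype is captured, anything else is an error.
func parseParams(params map[string]string) (flavor, fstype string, err error) {
	for parameter, value := range params {
		switch strings.ToLower(parameter) {
		case "flavor":
			flavor = value
		case "fstype":
			fstype = value
		default:
			return "", "", fmt.Errorf("invalid option %s", parameter)
		}
	}
	if fstype == "" {
		fstype = "ext4" // default applied by the provisioner
	}
	return flavor, fstype, nil
}

func main() {
	fmt.Println(parseParams(map[string]string{"fstype": "xfs"}))
}
```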
// DeleteVolume deletes a Photon Controller persistent disk.

View File

@ -152,10 +152,9 @@ func GetClassForVolume(kubeClient clientset.Interface, pv *v1.PersistentVolume)
if kubeClient == nil {
return nil, fmt.Errorf("Cannot get kube client")
}
// TODO: replace with a real attribute after beta
className, found := pv.Annotations["volume.beta.kubernetes.io/storage-class"]
if !found {
return nil, fmt.Errorf("Volume has no class annotation")
className := v1.GetPersistentVolumeClass(pv)
if className == "" {
return nil, fmt.Errorf("Volume has no storage class")
}
class, err := kubeClient.StorageV1beta1().StorageClasses().Get(className, metav1.GetOptions{})
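
The helper picks the class from the object itself rather than a hard-coded annotation lookup. A hedged sketch of what `v1.GetPersistentVolumeClass` amounts to in this release — treat the precedence as approximate, with the beta annotation still honored ahead of the newer `spec.storageClassName` field:

```go
package main

import "fmt"

// Approximate behavior of v1.GetPersistentVolumeClass (a hedged sketch):
// honor the legacy beta annotation first, then fall back to the spec field.
func getPersistentVolumeClass(annotations map[string]string, specClassName string) string {
	if class, found := annotations["volume.beta.kubernetes.io/storage-class"]; found {
		return class
	}
	return specClassName
}

func main() {
	fmt.Println(getPersistentVolumeClass(nil, "fast")) // "fast"
}
```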

View File

@ -152,7 +152,7 @@ func (plugin *vsphereVolumePlugin) ConstructVolumeSpec(volumeName, mountPath str
// Abstract interface to disk operations.
type vdManager interface {
// Creates a volume
CreateVolume(provisioner *vsphereVolumeProvisioner) (vmDiskPath string, volumeSizeGB int, err error)
CreateVolume(provisioner *vsphereVolumeProvisioner) (vmDiskPath string, volumeSizeGB int, fstype string, err error)
// Deletes a volume
DeleteVolume(deleter *vsphereVolumeDeleter) error
}
@ -188,6 +188,7 @@ type vsphereVolumeMounter struct {
func (b *vsphereVolumeMounter) GetAttributes() volume.Attributes {
return volume.Attributes{
SupportsSELinux: true,
Managed: true,
}
}
@ -343,11 +344,15 @@ func (plugin *vsphereVolumePlugin) newProvisionerInternal(options volume.VolumeO
}
func (v *vsphereVolumeProvisioner) Provision() (*v1.PersistentVolume, error) {
vmDiskPath, sizeKB, err := v.manager.CreateVolume(v)
vmDiskPath, sizeKB, fstype, err := v.manager.CreateVolume(v)
if err != nil {
return nil, err
}
if fstype == "" {
fstype = "ext4"
}
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: v.options.PVName,
@ -365,7 +370,7 @@ func (v *vsphereVolumeProvisioner) Provision() (*v1.PersistentVolume, error) {
PersistentVolumeSource: v1.PersistentVolumeSource{
VsphereVolume: &v1.VsphereVirtualDiskVolumeSource{
VolumePath: vmDiskPath,
FSType: "ext4",
FSType: fstype,
},
},
},

View File

@ -19,6 +19,7 @@ package vsphere_volume
import (
"errors"
"fmt"
"strconv"
"strings"
"time"
@ -35,6 +36,27 @@ const (
checkSleepDuration = time.Second
diskByIDPath = "/dev/disk/by-id/"
diskSCSIPrefix = "wwn-0x"
Fstype = "fstype"
diskformat = "diskformat"
datastore = "datastore"
HostFailuresToTolerateCapability = "hostfailurestotolerate"
ForceProvisioningCapability = "forceprovisioning"
CacheReservationCapability = "cachereservation"
DiskStripesCapability = "diskstripes"
ObjectSpaceReservationCapability = "objectspacereservation"
IopsLimitCapability = "iopslimit"
HostFailuresToTolerateCapabilityMin = 0
HostFailuresToTolerateCapabilityMax = 3
ForceProvisioningCapabilityMin = 0
ForceProvisioningCapabilityMax = 1
CacheReservationCapabilityMin = 0
CacheReservationCapabilityMax = 100
DiskStripesCapabilityMin = 1
DiskStripesCapabilityMax = 12
ObjectSpaceReservationCapabilityMin = 0
ObjectSpaceReservationCapabilityMax = 100
IopsLimitCapabilityMin = 0
)
var ErrProbeVolume = errors.New("Error scanning attached volumes")
@ -52,10 +74,11 @@ func verifyDevicePath(path string) (string, error) {
}
// CreateVolume creates a vSphere volume.
func (util *VsphereDiskUtil) CreateVolume(v *vsphereVolumeProvisioner) (vmDiskPath string, volumeSizeKB int, err error) {
func (util *VsphereDiskUtil) CreateVolume(v *vsphereVolumeProvisioner) (vmDiskPath string, volumeSizeKB int, fstype string, err error) {
cloud, err := getCloudProvider(v.plugin.host.GetCloudProvider())
if err != nil {
return "", 0, err
return "", 0, "", err
}
capacity := v.options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
@ -73,27 +96,43 @@ func (util *VsphereDiskUtil) CreateVolume(v *vsphereVolumeProvisioner) (vmDiskPa
// the values to the cloud provider.
for parameter, value := range v.options.Parameters {
switch strings.ToLower(parameter) {
case "diskformat":
case diskformat:
volumeOptions.DiskFormat = value
case "datastore":
case datastore:
volumeOptions.Datastore = value
case Fstype:
fstype = value
glog.V(4).Infof("Setting fstype as %q", fstype)
case HostFailuresToTolerateCapability, ForceProvisioningCapability,
CacheReservationCapability, DiskStripesCapability,
ObjectSpaceReservationCapability, IopsLimitCapability:
capabilityData, err := validateVSANCapability(strings.ToLower(parameter), value)
if err != nil {
return "", 0, "", err
} else {
volumeOptions.StorageProfileData += capabilityData
}
default:
return "", 0, fmt.Errorf("invalid option %q for volume plugin %s", parameter, v.plugin.GetPluginName())
return "", 0, "", fmt.Errorf("invalid option %q for volume plugin %s", parameter, v.plugin.GetPluginName())
}
}
if volumeOptions.StorageProfileData != "" {
volumeOptions.StorageProfileData = "(" + volumeOptions.StorageProfileData + ")"
}
glog.V(1).Infof("StorageProfileData in vsphere volume %q", volumeOptions.StorageProfileData)
// TODO: implement PVC.Selector parsing
if v.options.PVC.Spec.Selector != nil {
return "", 0, fmt.Errorf("claim.Spec.Selector is not supported for dynamic provisioning on vSphere")
return "", 0, "", fmt.Errorf("claim.Spec.Selector is not supported for dynamic provisioning on vSphere")
}
vmDiskPath, err = cloud.CreateVolume(volumeOptions)
if err != nil {
glog.V(2).Infof("Error creating vsphere volume: %v", err)
return "", 0, err
return "", 0, "", err
}
glog.V(2).Infof("Successfully created vsphere volume %s", name)
return vmDiskPath, volSizeKB, nil
return vmDiskPath, volSizeKB, fstype, nil
}
// DeleteVolume deletes a vSphere volume.
@ -132,3 +171,71 @@ func getCloudProvider(cloud cloudprovider.Interface) (*vsphere.VSphere, error) {
}
return vs, nil
}
// Validate the capability requirement for the user specified policy attributes.
func validateVSANCapability(capabilityName string, capabilityValue string) (string, error) {
var capabilityData string
capabilityIntVal, ok := verifyCapabilityValueIsInteger(capabilityValue)
if !ok {
return "", fmt.Errorf("Invalid value for %s. The capabilityValue: %s must be a valid integer value", capabilityName, capabilityValue)
}
switch strings.ToLower(capabilityName) {
case HostFailuresToTolerateCapability:
if capabilityIntVal >= HostFailuresToTolerateCapabilityMin && capabilityIntVal <= HostFailuresToTolerateCapabilityMax {
capabilityData = " (\"hostFailuresToTolerate\" i" + capabilityValue + ")"
} else {
return "", fmt.Errorf(`Invalid value for hostFailuresToTolerate.
The default value is %d, minimum value is %d and maximum value is %d.`,
1, HostFailuresToTolerateCapabilityMin, HostFailuresToTolerateCapabilityMax)
}
case ForceProvisioningCapability:
if capabilityIntVal >= ForceProvisioningCapabilityMin && capabilityIntVal <= ForceProvisioningCapabilityMax {
capabilityData = " (\"forceProvisioning\" i" + capabilityValue + ")"
} else {
return "", fmt.Errorf(`Invalid value for forceProvisioning.
The value can be either %d or %d.`,
ForceProvisioningCapabilityMin, ForceProvisioningCapabilityMax)
}
case CacheReservationCapability:
if capabilityIntVal >= CacheReservationCapabilityMin && capabilityIntVal <= CacheReservationCapabilityMax {
capabilityData = " (\"cacheReservation\" i" + strconv.Itoa(capabilityIntVal*10000) + ")"
} else {
return "", fmt.Errorf(`Invalid value for cacheReservation.
The minimum percentage is %d and maximum percentage is %d.`,
CacheReservationCapabilityMin, CacheReservationCapabilityMax)
}
case DiskStripesCapability:
if capabilityIntVal >= DiskStripesCapabilityMin && capabilityIntVal <= DiskStripesCapabilityMax {
capabilityData = " (\"stripeWidth\" i" + capabilityValue + ")"
} else {
return "", fmt.Errorf(`Invalid value for diskStripes.
The minimum value is %d and maximum value is %d.`,
DiskStripesCapabilityMin, DiskStripesCapabilityMax)
}
case ObjectSpaceReservationCapability:
if capabilityIntVal >= ObjectSpaceReservationCapabilityMin && capabilityIntVal <= ObjectSpaceReservationCapabilityMax {
capabilityData = " (\"proportionalCapacity\" i" + capabilityValue + ")"
} else {
return "", fmt.Errorf(`Invalid value for ObjectSpaceReservation.
The minimum percentage is %d and maximum percentage is %d.`,
ObjectSpaceReservationCapabilityMin, ObjectSpaceReservationCapabilityMax)
}
case IopsLimitCapability:
if capabilityIntVal >= IopsLimitCapabilityMin {
capabilityData = " (\"iopsLimit\" i" + capabilityValue + ")"
} else {
return "", fmt.Errorf(`Invalid value for iopsLimit.
The value should be greater than %d.`, IopsLimitCapabilityMin)
}
}
return capabilityData, nil
}
// Verify that the capability value is a valid integer.
func verifyCapabilityValueIsInteger(capabilityValue string) (int, bool) {
i, err := strconv.Atoi(capabilityValue)
if err != nil {
return -1, false
}
return i, true
}
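
Concretely, each recognized capability renders into a `("key" i<value>)` fragment that gets concatenated into `StorageProfileData`. A usage example, assuming the two functions above are in scope in the same package (illustrative only):

```go
func exampleVSANCapabilities() {
	data, err := validateVSANCapability("hostfailurestotolerate", "2")
	fmt.Println(data, err) //  ("hostFailuresToTolerate" i2) <nil>

	data, err = validateVSANCapability("cachereservation", "25")
	fmt.Println(data, err) //  ("cacheReservation" i250000) <nil> — percent scaled by 10000
}
```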

View File

@ -43,12 +43,14 @@ go_test(
"//pkg/api:go_default_library",
"//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
"//pkg/client/informers/informers_generated/internalversion:go_default_library",
"//pkg/client/listers/core/internalversion:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/api/errors",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apiserver/pkg/admission",
"//vendor:k8s.io/client-go/tools/cache",
],
)

View File

@ -287,18 +287,20 @@ func (s *serviceAccount) getReferencedServiceAccountToken(serviceAccount *api.Se
// getServiceAccountTokens returns all ServiceAccountToken secrets for the given ServiceAccount
func (s *serviceAccount) getServiceAccountTokens(serviceAccount *api.ServiceAccount) ([]*api.Secret, error) {
tokens, err := s.secretLister.Secrets(serviceAccount.Namespace).List(labels.Everything())
secrets, err := s.secretLister.Secrets(serviceAccount.Namespace).List(labels.Everything())
if err != nil {
return nil, err
}
for _, token := range tokens {
if token.Type != api.SecretTypeServiceAccountToken {
tokens := []*api.Secret{}
for _, secret := range secrets {
if secret.Type != api.SecretTypeServiceAccountToken {
continue
}
if serviceaccount.InternalIsServiceAccountToken(token, serviceAccount) {
tokens = append(tokens, token)
if serviceaccount.InternalIsServiceAccountToken(secret, serviceAccount) {
tokens = append(tokens, secret)
}
}
return tokens, nil
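
The rename matters: the old code listed into `tokens` and then appended matches back onto the very slice it was ranging over, so callers got every secret in the namespace plus duplicates of the real tokens. A toy reproduction of the shape of the bug, with hypothetical data:

```go
package main

import "fmt"

func main() {
	secrets := []string{"token-a", "docker-cfg", "token-b"}
	isToken := func(s string) bool { return s == "token-a" || s == "token-b" }

	// Old shape: append matches onto the slice being iterated. The range
	// header is captured once, so iteration is safe — but the result is wrong.
	buggy := secrets
	for _, s := range secrets {
		if isToken(s) {
			buggy = append(buggy, s)
		}
	}
	fmt.Println(buggy) // [token-a docker-cfg token-b token-a token-b]

	// Fixed shape: accumulate into a fresh slice.
	fixed := []string{}
	for _, s := range secrets {
		if isToken(s) {
			fixed = append(fixed, s)
		}
	}
	fmt.Println(fixed) // [token-a token-b]
}
```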

View File

@ -300,8 +300,8 @@ func ClusterRoles() []rbac.ClusterRole {
eventsRule(),
rbac.NewRule("create").Groups(legacyGroup).Resources("endpoints", "secrets", "serviceaccounts").RuleOrDie(),
rbac.NewRule("delete").Groups(legacyGroup).Resources("secrets").RuleOrDie(),
rbac.NewRule("get").Groups(legacyGroup).Resources("endpoints", "namespaces", "serviceaccounts").RuleOrDie(),
rbac.NewRule("update").Groups(legacyGroup).Resources("endpoints", "serviceaccounts").RuleOrDie(),
rbac.NewRule("get").Groups(legacyGroup).Resources("endpoints", "namespaces", "secrets", "serviceaccounts").RuleOrDie(),
rbac.NewRule("update").Groups(legacyGroup).Resources("endpoints", "secrets", "serviceaccounts").RuleOrDie(),
// Needed to check API access. These creates are non-mutating
rbac.NewRule("create").Groups(authenticationGroup).Resources("tokenreviews").RuleOrDie(),

View File

@ -1074,14 +1074,13 @@ func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*scheduler
continue
}
for _, term := range getPodAntiAffinityTerms(affinity.PodAntiAffinity) {
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(pod, &term)
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(existingPod, &term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
catchError(err)
return
}
match := priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector)
if match {
if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
nodeResult = append(nodeResult, matchingPodAntiAffinityTerm{term: &term, node: node})
}
}
@ -1109,8 +1108,7 @@ func (c *PodAffinityChecker) getMatchingAntiAffinityTerms(pod *v1.Pod, allPods [
if err != nil {
return nil, err
}
match := priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector)
if match {
if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
result = append(result, matchingPodAntiAffinityTerm{term: &term, node: existingPodNode})
}
}
@ -1128,17 +1126,17 @@ func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, meta
} else {
allPods, err := c.podLister.List(labels.Everything())
if err != nil {
glog.V(10).Infof("Failed to get all pods, %+v", err)
glog.Errorf("Failed to get all pods, %+v", err)
return false
}
if matchingTerms, err = c.getMatchingAntiAffinityTerms(pod, allPods); err != nil {
glog.V(10).Infof("Failed to get all terms that pod %+v matches, err: %+v", podName(pod), err)
glog.Errorf("Failed to get all terms that pod %+v matches, err: %+v", podName(pod), err)
return false
}
}
for _, term := range matchingTerms {
if len(term.term.TopologyKey) == 0 {
glog.V(10).Infof("Empty topologyKey is not allowed except for PreferredDuringScheduling pod anti-affinity")
glog.Error("Empty topologyKey is not allowed except for PreferredDuringScheduling pod anti-affinity")
return false
}
if priorityutil.NodesHaveSameTopologyKey(node, term.node, term.term.TopologyKey) {
@ -1167,7 +1165,7 @@ func (c *PodAffinityChecker) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod, node
for _, term := range getPodAffinityTerms(affinity.PodAffinity) {
termMatches, matchingPodExists, err := c.anyPodMatchesPodAffinityTerm(pod, allPods, node, &term)
if err != nil {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v,because of PodAffinityTerm %v, err: %v",
glog.Errorf("Cannot schedule pod %+v onto node %v,because of PodAffinityTerm %v, err: %v",
podName(pod), node.Name, term, err)
return false
}
@ -1183,7 +1181,7 @@ func (c *PodAffinityChecker) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod, node
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(pod, &term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
glog.V(10).Infof("Cannot parse selector on term %v for pod %v. Details %v",
glog.Errorf("Cannot parse selector on term %v for pod %v. Details %v",
term, podName(pod), err)
return false
}
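
For reference, the namespace-and-selector check these hunks call into amounts to the following (a hedged sketch, not the exact `priorityutil` helper):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
)

// A hedged sketch: the candidate pod must be in one of the term's
// namespaces and its labels must satisfy the term's selector.
func podMatchesTermsNamespaceAndSelector(podNamespace string, podLabels map[string]string,
	namespaces map[string]bool, selector labels.Selector) bool {
	if !namespaces[podNamespace] {
		return false
	}
	return selector.Matches(labels.Set(podLabels))
}

func main() {
	sel := labels.SelectorFromSet(labels.Set{"app": "web"})
	ok := podMatchesTermsNamespaceAndSelector(
		"default", map[string]string{"app": "web"},
		map[string]bool{"default": true}, sel)
	fmt.Println(ok) // true
}
```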