Upgrade to k8s v1.5

pull/908/head
Matt Rickard 2016-12-12 18:39:38 -08:00
parent 093ab51fbf
commit 9c3da934a7
104 changed files with 5217 additions and 4916 deletions

Godeps/Godeps.json (generated, 2273): file diff suppressed because it is too large.


@ -27,7 +27,7 @@ minikube start
--insecure-registry stringSlice Insecure Docker registries to pass to the Docker daemon
--iso-url string Location of the minikube iso (default "https://storage.googleapis.com/minikube/minikube-0.7.iso")
--kubernetes-version string The kubernetes version that the minikube VM will use (ex: v1.2.3)
OR a URI which contains a localkube binary (ex: https://storage.googleapis.com/minikube/k8sReleases/v1.3.0/localkube-linux-amd64) (default "v1.5.0-beta.1")
OR a URI which contains a localkube binary (ex: https://storage.googleapis.com/minikube/k8sReleases/v1.3.0/localkube-linux-amd64) (default "v1.5.0")
--kvm-network string The KVM network name. (only supported with KVM driver) (default "default")
--memory int Amount of RAM allocated to the minikube VM (default 2048)
--network-plugin string The name of the network plugin


@ -369,11 +369,30 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl
// Find the list of namespaced resources via discovery that the namespace controller must manage
namespaceKubeClient := client("namespace-controller")
namespaceClientPool := dynamic.NewClientPool(restclient.AddUserAgent(kubeconfig, "namespace-controller"), restMapper, dynamic.LegacyAPIPathResolverFunc)
groupVersionResources, err := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources()
// TODO: consider using a list-watch + cache here rather than polling
var gvrFn func() ([]unversioned.GroupVersionResource, error)
rsrcs, err := namespaceKubeClient.Discovery().ServerResources()
if err != nil {
glog.Fatalf("Failed to get supported resources from server: %v", err)
glog.Fatalf("Failed to get group version resources: %v", err)
}
namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, groupVersionResources, s.NamespaceSyncPeriod.Duration, api.FinalizerKubernetes)
for _, rsrcList := range rsrcs {
for ix := range rsrcList.APIResources {
rsrc := &rsrcList.APIResources[ix]
if rsrc.Kind == "ThirdPartyResource" {
gvrFn = namespaceKubeClient.Discovery().ServerPreferredNamespacedResources
}
}
}
if gvrFn == nil {
gvr, err := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources()
if err != nil {
glog.Fatalf("Failed to get resources: %v", err)
}
gvrFn = func() ([]unversioned.GroupVersionResource, error) {
return gvr, nil
}
}
namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, gvrFn, s.NamespaceSyncPeriod.Duration, api.FinalizerKubernetes)
go namespaceController.Run(int(s.ConcurrentNamespaceSyncs), wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
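The hunk above swaps the namespace controller's static resource list for a function (gvrFn): when a ThirdPartyResource is registered, discovery is re-run on every sync; otherwise a one-shot result is wrapped in a closure. A minimal sketch of that closure pattern, with a simplified stand-in for unversioned.GroupVersionResource so it compiles on its own:

```go
package main

import "fmt"

// GroupVersionResource is a simplified stand-in for unversioned.GroupVersionResource.
type GroupVersionResource struct{ Group, Version, Resource string }

// gvrFunc mirrors the func() ([]unversioned.GroupVersionResource, error) type the
// controller now accepts instead of a fixed slice, so the resource set can be
// re-evaluated whenever ThirdPartyResources appear or disappear at runtime.
type gvrFunc func() ([]GroupVersionResource, error)

// staticGVRs wraps a one-shot discovery result, matching the fallback branch in
// the diff where no ThirdPartyResource was found.
func staticGVRs(gvrs []GroupVersionResource) gvrFunc {
	return func() ([]GroupVersionResource, error) { return gvrs, nil }
}

func main() {
	fn := staticGVRs([]GroupVersionResource{{Group: "", Version: "v1", Resource: "pods"}})
	gvrs, err := fn()
	fmt.Println(gvrs, err) // [{ v1 pods}] <nil>
}
```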


@ -98,7 +98,7 @@ func isSysFSWritable() (bool, error) {
for _, mountPoint := range mountPoints {
const sysfsDevice = "sysfs"
if mountPoint.Device != sysfsDevice {
if mountPoint.Type != sysfsDevice {
continue
}
// Check whether sysfs is 'rw'


@ -246,6 +246,7 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&s.EvictionPressureTransitionPeriod.Duration, "eviction-pressure-transition-period", s.EvictionPressureTransitionPeriod.Duration, "Duration for which the kubelet has to wait before transitioning out of an eviction pressure condition.")
fs.Int32Var(&s.EvictionMaxPodGracePeriod, "eviction-max-pod-grace-period", s.EvictionMaxPodGracePeriod, "Maximum allowed grace period (in seconds) to use when terminating pods in response to a soft eviction threshold being met. If negative, defer to pod specified value.")
fs.StringVar(&s.EvictionMinimumReclaim, "eviction-minimum-reclaim", s.EvictionMinimumReclaim, "A set of minimum reclaims (e.g. imagefs.available=2Gi) that describes the minimum amount of resource the kubelet will reclaim when performing a pod eviction if that resource is under pressure.")
fs.BoolVar(&s.ExperimentalKernelMemcgNotification, "experimental-kernel-memcg-notification", s.ExperimentalKernelMemcgNotification, "If enabled, the kubelet will integrate with the kernel memcg notification to determine if memory eviction thresholds are crossed rather than polling.")
fs.Int32Var(&s.PodsPerCore, "pods-per-core", s.PodsPerCore, "Number of Pods per core that can run on this Kubelet. The total number of Pods on this Kubelet cannot exceed max-pods, so max-pods will be used if this calculation results in a larger number of Pods allowed on the Kubelet. A value of 0 disables this limit.")
fs.BoolVar(&s.ProtectKernelDefaults, "protect-kernel-defaults", s.ProtectKernelDefaults, "Default kubelet behaviour for kernel tuning. If set, kubelet errors if any of kernel tunables is different than kubelet defaults.")


@ -80,7 +80,15 @@ func BeforeDelete(strategy RESTDeleteStrategy, ctx api.Context, obj runtime.Obje
// if we are already being deleted, we may only shorten the deletion grace period
// this means the object was gracefully deleted previously but deletionGracePeriodSeconds was not set,
// so we force deletion immediately
if objectMeta.DeletionGracePeriodSeconds == nil {
// IMPORTANT:
// The deletion operation happens in two phases.
// 1. Update to set DeletionGracePeriodSeconds and DeletionTimestamp
// 2. Delete the object from storage.
// If the update succeeds, but the delete fails (network error, internal storage error, etc.),
// a resource was previously left in a state that was non-recoverable. We
// check if the existing stored resource has a grace period as 0 and if so
// attempt to delete immediately in order to recover from this scenario.
if objectMeta.DeletionGracePeriodSeconds == nil || *objectMeta.DeletionGracePeriodSeconds == 0 {
return false, false, nil
}
// only a shorter grace period may be provided by a user
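A tiny sketch of the widened check above (this branch is only reached once the object already carries a DeletionTimestamp): both a missing grace period and a stored grace period of 0 now short-circuit to immediate deletion, which recovers objects stranded when the phase-2 storage delete failed. The helper name is hypothetical, used only for illustration:

```go
package main

import "fmt"

// shortCircuitDelete mirrors the new condition in BeforeDelete. It assumes the
// object is already being deleted: nil means no grace period was ever recorded,
// 0 means a prior delete set the grace period but the storage delete failed;
// either way, delete immediately rather than redo graceful-deletion bookkeeping.
func shortCircuitDelete(deletionGracePeriodSeconds *int64) bool {
	return deletionGracePeriodSeconds == nil || *deletionGracePeriodSeconds == 0
}

func main() {
	zero, thirty := int64(0), int64(30)
	fmt.Println(shortCircuitDelete(nil), shortCircuitDelete(&zero), shortCircuitDelete(&thirty))
	// true true false
}
```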


@ -289,6 +289,10 @@ type StorageMetadata interface {
// ProducesMIMETypes returns a list of the MIME types the specified HTTP verb (GET, POST, DELETE,
// PATCH) can respond with.
ProducesMIMETypes(verb string) []string
// ProducesObject returns an object that the specified HTTP verb responds with. It will overwrite the storage object if
// it is not nil. Only the type of the returned object matters; the value will be ignored.
ProducesObject(verb string) interface{}
}
// ConnectRequest is an object passed to admission control for Connect operations
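The new ProducesObject method lets a storage implementation document a different response type per verb; the API installer (further down in this diff) falls back to the default versioned object when it returns nil. A hedged sketch of an implementation against the trimmed interface; LogResult and logStorage are hypothetical names used only for illustration:

```go
package main

import "fmt"

// StorageMetadata is trimmed to the two methods shown in the diff.
type StorageMetadata interface {
	ProducesMIMETypes(verb string) []string
	ProducesObject(verb string) interface{}
}

// LogResult is a hypothetical response type; only its Go type matters to the
// API installer, the value itself is ignored.
type LogResult struct{ Lines []string }

type logStorage struct{}

func (logStorage) ProducesMIMETypes(verb string) []string { return []string{"text/plain"} }

// GET responses are documented as LogResult; returning nil for other verbs
// lets the installer fall back to the default versioned object.
func (logStorage) ProducesObject(verb string) interface{} {
	if verb == "GET" {
		return &LogResult{}
	}
	return nil
}

func main() {
	var m StorageMetadata = logStorage{}
	fmt.Printf("GET produces %T\n", m.ProducesObject("GET"))
}
```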


@ -1514,7 +1514,7 @@ message NodeSpec {
optional string providerID = 3;
// Unschedulable controls node schedulability of new pods. By default, node is schedulable.
// More info: http://releases.k8s.io/HEAD/docs/admin/node.md#manual-node-administration"`
// More info: http://releases.k8s.io/HEAD/docs/admin/node.md#manual-node-administration"
// +optional
optional bool unschedulable = 4;
}


@ -2818,7 +2818,7 @@ type NodeSpec struct {
// +optional
ProviderID string `json:"providerID,omitempty" protobuf:"bytes,3,opt,name=providerID"`
// Unschedulable controls node schedulability of new pods. By default, node is schedulable.
// More info: http://releases.k8s.io/HEAD/docs/admin/node.md#manual-node-administration"`
// More info: http://releases.k8s.io/HEAD/docs/admin/node.md#manual-node-administration"
// +optional
Unschedulable bool `json:"unschedulable,omitempty" protobuf:"varint,4,opt,name=unschedulable"`
}


@ -901,7 +901,7 @@ var map_NodeSpec = map[string]string{
"podCIDR": "PodCIDR represents the pod IP range assigned to the node.",
"externalID": "External ID of the node assigned by some machine database (e.g. a cloud provider). Deprecated.",
"providerID": "ID of the node assigned by the cloud provider in the format: <ProviderName>://<ProviderSpecificNodeID>",
"unschedulable": "Unschedulable controls node schedulability of new pods. By default, node is schedulable. More info: http://releases.k8s.io/HEAD/docs/admin/node.md#manual-node-administration\"`",
"unschedulable": "Unschedulable controls node schedulability of new pods. By default, node is schedulable. More info: http://releases.k8s.io/HEAD/docs/admin/node.md#manual-node-administration\"",
}
func (NodeSpec) SwaggerDoc() map[string]string {


@ -2665,9 +2665,6 @@ func ValidateServiceUpdate(service, oldService *api.Service) field.ErrorList {
}
}
// TODO(freehan): allow user to update loadbalancerSourceRanges
allErrs = append(allErrs, ValidateImmutableField(service.Spec.LoadBalancerSourceRanges, oldService.Spec.LoadBalancerSourceRanges, field.NewPath("spec", "loadBalancerSourceRanges"))...)
allErrs = append(allErrs, validateServiceFields(service)...)
allErrs = append(allErrs, validateServiceAnnotations(service, oldService)...)
return allErrs


@ -50,6 +50,7 @@ func addKnownTypes(scheme *runtime.Scheme) error {
&HorizontalPodAutoscaler{},
&HorizontalPodAutoscalerList{},
&api.ListOptions{},
&api.DeleteOptions{},
)
return nil
}


@ -37,3 +37,16 @@ go_library(
"//vendor:github.com/ugorji/go/codec",
],
)
go_test(
name = "go_default_xtest",
srcs = ["defaults_test.go"],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/install:go_default_library",
"//pkg/apis/autoscaling/install:go_default_library",
"//pkg/apis/autoscaling/v1:go_default_library",
"//pkg/runtime:go_default_library",
],
)


@ -52,6 +52,7 @@ func addKnownTypes(scheme *runtime.Scheme) error {
&CronJob{},
&CronJobList{},
&api.ListOptions{},
&api.DeleteOptions{},
)
scheme.AddKnownTypeWithName(SchemeGroupVersion.WithKind("ScheduledJob"), &CronJob{})
scheme.AddKnownTypeWithName(SchemeGroupVersion.WithKind("ScheduledJobList"), &CronJobList{})

File diff suppressed because it is too large.


@ -424,6 +424,9 @@ type KubeletConfiguration struct {
// Comma-delimited list of minimum reclaims (e.g. imagefs.available=2Gi) that describes the minimum amount of resource the kubelet will reclaim when performing a pod eviction if that resource is under pressure.
// +optional
EvictionMinimumReclaim string `json:"evictionMinimumReclaim,omitempty"`
// If enabled, the kubelet will integrate with the kernel memcg notification to determine if memory eviction thresholds are crossed rather than polling.
// +optional
ExperimentalKernelMemcgNotification bool `json:"experimentalKernelMemcgNotification"`
// Maximum number of pods per core. Cannot exceed MaxPods
PodsPerCore int32 `json:"podsPerCore"`
// enableControllerAttachDetach enables the Attach/Detach controller to


@ -374,6 +374,9 @@ func SetDefaults_KubeletConfiguration(obj *KubeletConfiguration) {
if obj.EvictionPressureTransitionPeriod == zeroDuration {
obj.EvictionPressureTransitionPeriod = unversioned.Duration{Duration: 5 * time.Minute}
}
if obj.ExperimentalKernelMemcgNotification == nil {
obj.ExperimentalKernelMemcgNotification = boolVar(false)
}
if obj.SystemReserved == nil {
obj.SystemReserved = make(map[string]string)
}


@ -462,6 +462,8 @@ type KubeletConfiguration struct {
EvictionMaxPodGracePeriod int32 `json:"evictionMaxPodGracePeriod"`
// Comma-delimited list of minimum reclaims (e.g. imagefs.available=2Gi) that describes the minimum amount of resource the kubelet will reclaim when performing a pod eviction if that resource is under pressure.
EvictionMinimumReclaim string `json:"evictionMinimumReclaim"`
// If enabled, the kubelet will integrate with the kernel memcg notification to determine if memory eviction thresholds are crossed rather than polling.
ExperimentalKernelMemcgNotification *bool `json:"experimentalKernelMemcgNotification"`
// Maximum number of pods per core. Cannot exceed MaxPods
PodsPerCore int32 `json:"podsPerCore"`
// enableControllerAttachDetach enables the Attach/Detach controller to


@ -387,6 +387,9 @@ func autoConvert_v1alpha1_KubeletConfiguration_To_componentconfig_KubeletConfigu
out.EvictionPressureTransitionPeriod = in.EvictionPressureTransitionPeriod
out.EvictionMaxPodGracePeriod = in.EvictionMaxPodGracePeriod
out.EvictionMinimumReclaim = in.EvictionMinimumReclaim
if err := api.Convert_Pointer_bool_To_bool(&in.ExperimentalKernelMemcgNotification, &out.ExperimentalKernelMemcgNotification, s); err != nil {
return err
}
out.PodsPerCore = in.PodsPerCore
if err := api.Convert_Pointer_bool_To_bool(&in.EnableControllerAttachDetach, &out.EnableControllerAttachDetach, s); err != nil {
return err
@ -556,6 +559,9 @@ func autoConvert_componentconfig_KubeletConfiguration_To_v1alpha1_KubeletConfigu
out.EvictionPressureTransitionPeriod = in.EvictionPressureTransitionPeriod
out.EvictionMaxPodGracePeriod = in.EvictionMaxPodGracePeriod
out.EvictionMinimumReclaim = in.EvictionMinimumReclaim
if err := api.Convert_bool_To_Pointer_bool(&in.ExperimentalKernelMemcgNotification, &out.ExperimentalKernelMemcgNotification, s); err != nil {
return err
}
out.PodsPerCore = in.PodsPerCore
if err := api.Convert_bool_To_Pointer_bool(&in.EnableControllerAttachDetach, &out.EnableControllerAttachDetach, s); err != nil {
return err


@ -403,6 +403,13 @@ func DeepCopy_v1alpha1_KubeletConfiguration(in interface{}, out interface{}, c *
out.EvictionPressureTransitionPeriod = in.EvictionPressureTransitionPeriod
out.EvictionMaxPodGracePeriod = in.EvictionMaxPodGracePeriod
out.EvictionMinimumReclaim = in.EvictionMinimumReclaim
if in.ExperimentalKernelMemcgNotification != nil {
in, out := &in.ExperimentalKernelMemcgNotification, &out.ExperimentalKernelMemcgNotification
*out = new(bool)
**out = **in
} else {
out.ExperimentalKernelMemcgNotification = nil
}
out.PodsPerCore = in.PodsPerCore
if in.EnableControllerAttachDetach != nil {
in, out := &in.EnableControllerAttachDetach, &out.EnableControllerAttachDetach


@ -358,6 +358,7 @@ func DeepCopy_componentconfig_KubeletConfiguration(in interface{}, out interface
out.EvictionPressureTransitionPeriod = in.EvictionPressureTransitionPeriod
out.EvictionMaxPodGracePeriod = in.EvictionMaxPodGracePeriod
out.EvictionMinimumReclaim = in.EvictionMinimumReclaim
out.ExperimentalKernelMemcgNotification = in.ExperimentalKernelMemcgNotification
out.PodsPerCore = in.PodsPerCore
out.EnableControllerAttachDetach = in.EnableControllerAttachDetach
if in.SystemReserved != nil {


@ -124,7 +124,6 @@ type ThirdPartyResource struct {
Description string `json:"description,omitempty"`
// Versions are versions for this third party object
// +optional
Versions []APIVersion `json:"versions,omitempty"`
}
@ -143,7 +142,6 @@ type ThirdPartyResourceList struct {
// TODO: we should consider merge this struct with GroupVersion in unversioned.go
type APIVersion struct {
// Name of this version (e.g. 'v1').
// +optional
Name string `json:"name,omitempty"`
}


@ -68,6 +68,9 @@ func ValidateThirdPartyResource(obj *extensions.ThirdPartyResource) field.ErrorL
allErrs = append(allErrs, apivalidation.ValidateObjectMeta(&obj.ObjectMeta, false, ValidateThirdPartyResourceName, field.NewPath("metadata"))...)
versions := sets.String{}
if len(obj.Versions) == 0 {
allErrs = append(allErrs, field.Required(field.NewPath("versions"), "must specify at least one version"))
}
for ix := range obj.Versions {
version := &obj.Versions[ix]
if len(version.Name) == 0 {


@ -197,7 +197,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
if err != nil {
return nil, err
}
versionedObject := indirectArbitraryPointer(versionedPtr)
defaultVersionedObject := indirectArbitraryPointer(versionedPtr)
kind := fqKindToRegister.Kind
hasSubresource := len(subresource) > 0
@ -503,6 +503,10 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
Kind: fqKindToRegister,
}
for _, action := range actions {
versionedObject := storageMeta.ProducesObject(action.Verb)
if versionedObject == nil {
versionedObject = defaultVersionedObject
}
reqScope.Namer = action.Namer
namespaced := ""
if apiResource.Namespaced {
@ -1022,6 +1026,10 @@ func (defaultStorageMetadata) ProducesMIMETypes(verb string) []string {
return nil
}
func (defaultStorageMetadata) ProducesObject(verb string) interface{} {
return nil
}
// splitSubresource checks if the given storage path is the path of a subresource and returns
// the resource and subresource components.
func splitSubresource(path string) (string, string, error) {


@ -591,11 +591,11 @@ func patchResource(
if err != nil {
return nil, err
}
currentPatch, err := strategicpatch.CreateStrategicMergePatch(originalObjJS, currentObjectJS, versionedObj, strategicpatch.SMPatchVersionLatest)
currentPatch, err := strategicpatch.CreateStrategicMergePatch(originalObjJS, currentObjectJS, versionedObj)
if err != nil {
return nil, err
}
originalPatch, err := strategicpatch.CreateStrategicMergePatch(originalObjJS, originalPatchedObjJS, versionedObj, strategicpatch.SMPatchVersionLatest)
originalPatch, err := strategicpatch.CreateStrategicMergePatch(originalObjJS, originalPatchedObjJS, versionedObj)
if err != nil {
return nil, err
}


@ -259,12 +259,16 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
r.setLastSyncResourceVersion(resourceVersion)
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
go func() {
for {
select {
case <-resyncCh:
case <-stopCh:
return
case <-cancelCh:
return
}
glog.V(4).Infof("%s: forcing resync", r.name)
if err := r.store.Resync(); err != nil {
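The cancelCh added above, closed via defer, guarantees the resync goroutine exits when this particular ListAndWatch call returns, even if the caller's stop channel stays open. A minimal sketch of the pattern (the function body and timings are simplified stand-ins for illustration):

```go
package main

import (
	"fmt"
	"time"
)

// listAndWatch sketches the per-call cancel channel: close it on return so the
// helper goroutine cannot outlive this invocation.
func listAndWatch(stopCh <-chan struct{}) {
	cancelCh := make(chan struct{})
	defer close(cancelCh)

	go func() {
		ticker := time.NewTicker(10 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				fmt.Println("forcing resync")
			case <-stopCh:
				return
			case <-cancelCh:
				return // exit when this ListAndWatch call returns
			}
		}
	}()

	time.Sleep(30 * time.Millisecond) // stand-in for the watch loop
}

func main() {
	stop := make(chan struct{})
	listAndWatch(stop)
	time.Sleep(20 * time.Millisecond) // the goroutine has exited; no further resyncs
}
```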


@ -244,9 +244,7 @@ func (e *eventLogger) eventObserve(newEvent *api.Event) (*api.Event, []byte, err
newData, _ := json.Marshal(event)
oldData, _ := json.Marshal(eventCopy2)
// TODO: need to figure out if we need to let eventObserve() use the new behavior of StrategicMergePatch.
// Currently default to old behavior now. Ref: issue #35936
patch, err = strategicpatch.CreateStrategicMergePatch(oldData, newData, event, strategicpatch.SMPatchVersion_1_0)
patch, err = strategicpatch.CreateStrategicMergePatch(oldData, newData, event)
}
// record our new observation


@ -213,9 +213,11 @@ func (d *DiscoveryClient) serverPreferredResources(namespaced bool) ([]unversion
const maxRetries = 2
var failedGroups map[unversioned.GroupVersion]error
var results []unversioned.GroupVersionResource
var resources map[unversioned.GroupResource]string
RetrieveGroups:
for i := 0; i < maxRetries; i++ {
results = []unversioned.GroupVersionResource{}
resources = map[unversioned.GroupResource]string{}
failedGroups = make(map[unversioned.GroupVersion]error)
serverGroupList, err := d.ServerGroups()
if err != nil {
@ -223,25 +225,40 @@ RetrieveGroups:
}
for _, apiGroup := range serverGroupList.Groups {
preferredVersion := apiGroup.PreferredVersion
groupVersion := unversioned.GroupVersion{Group: apiGroup.Name, Version: preferredVersion.Version}
apiResourceList, err := d.ServerResourcesForGroupVersion(preferredVersion.GroupVersion)
if err != nil {
if i < maxRetries-1 {
continue RetrieveGroups
}
failedGroups[groupVersion] = err
continue
}
for _, apiResource := range apiResourceList.APIResources {
// ignore the root scoped resources if "namespaced" is true.
if namespaced && !apiResource.Namespaced {
versions := apiGroup.Versions
for _, version := range versions {
groupVersion := unversioned.GroupVersion{Group: apiGroup.Name, Version: version.Version}
apiResourceList, err := d.ServerResourcesForGroupVersion(version.GroupVersion)
if err != nil {
if i < maxRetries-1 {
continue RetrieveGroups
}
failedGroups[groupVersion] = err
continue
}
if strings.Contains(apiResource.Name, "/") {
continue
for _, apiResource := range apiResourceList.APIResources {
// ignore the root scoped resources if "namespaced" is true.
if namespaced && !apiResource.Namespaced {
continue
}
if strings.Contains(apiResource.Name, "/") {
continue
}
gvr := groupVersion.WithResource(apiResource.Name)
if _, ok := resources[gvr.GroupResource()]; ok {
if gvr.Version != apiGroup.PreferredVersion.Version {
continue
}
// remove previous entry, because it will be replaced with a preferred one
for i := range results {
if results[i].GroupResource() == gvr.GroupResource() {
results = append(results[:i], results[i+1:]...)
}
}
}
resources[gvr.GroupResource()] = gvr.Version
results = append(results, gvr)
}
results = append(results, groupVersion.WithResource(apiResource.Name))
}
}
if len(failedGroups) == 0 {
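The reworked loop above walks every advertised version of a group (not just the preferred one) and uses the resources map so each group/resource appears in the result exactly once, with the preferred version winning on conflict. A simplified sketch of that dedup bookkeeping, with stand-in types so it is self-contained:

```go
package main

import "fmt"

// Simplified stand-ins for the unversioned types used in the diff.
type GroupResource struct{ Group, Resource string }
type GroupVersionResource struct{ Group, Version, Resource string }

func (g GroupVersionResource) GroupResource() GroupResource {
	return GroupResource{Group: g.Group, Resource: g.Resource}
}

// dedupe keeps one entry per group/resource, letting the group's preferred
// version replace an earlier non-preferred one.
func dedupe(gvrs []GroupVersionResource, preferredVersion map[string]string) []GroupVersionResource {
	index := map[GroupResource]int{} // group/resource -> position in results
	var results []GroupVersionResource
	for _, gvr := range gvrs {
		gr := gvr.GroupResource()
		if i, ok := index[gr]; ok {
			if gvr.Version == preferredVersion[gvr.Group] {
				results[i] = gvr // preferred version replaces the earlier entry
			}
			continue
		}
		index[gr] = len(results)
		results = append(results, gvr)
	}
	return results
}

func main() {
	out := dedupe([]GroupVersionResource{
		{Group: "batch", Version: "v2alpha1", Resource: "jobs"},
		{Group: "batch", Version: "v1", Resource: "jobs"},
	}, map[string]string{"batch": "v1"})
	fmt.Println(out) // [{batch v1 jobs}]
}
```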


@ -151,9 +151,12 @@ func getPrimaryIPConfig(nic network.Interface) (*network.InterfaceIPConfiguratio
return &((*nic.Properties.IPConfigurations)[0]), nil
}
// we're here because we either have multiple ipconfigs and can't determine the primary:
// https://github.com/Azure/azure-rest-api-specs/issues/305
// or somehow we had zero ipconfigs
for _, ref := range *nic.Properties.IPConfigurations {
if *ref.Properties.Primary {
return &ref, nil
}
}
return nil, fmt.Errorf("failed to determine the primary ipconfig. nicname=%q", *nic.Name)
}


@ -520,11 +520,11 @@ func (r RealPodControl) DeletePod(namespace string, podID string, object runtime
if err != nil {
return fmt.Errorf("object does not have ObjectMeta, %v", err)
}
glog.V(2).Infof("Controller %v deleting pod %v/%v", accessor.GetName(), namespace, podID)
if err := r.KubeClient.Core().Pods(namespace).Delete(podID, nil); err != nil {
r.Recorder.Eventf(object, api.EventTypeWarning, FailedDeletePodReason, "Error deleting: %v", err)
return fmt.Errorf("unable to delete pods: %v", err)
} else {
glog.V(4).Infof("Controller %v deleted pod %v", accessor.GetName(), podID)
r.Recorder.Eventf(object, api.EventTypeNormal, SuccessfulDeletePodReason, "Deleted pod: %v", podID)
}
return nil


@ -231,6 +231,7 @@ func SyncOne(sj batch.CronJob, js []batch.Job, now time.Time, jc jobControlInter
}
errList := []error{}
for _, pod := range podList.Items {
glog.V(2).Infof("CronJob controller is deleting Pod %v/%v", pod.Namespace, pod.Name)
if err := pc.DeletePod(pod.Namespace, pod.Name); err != nil {
// ignores the error when the pod isn't found
if !errors.IsNotFound(err) {


@ -35,6 +35,16 @@ import (
"github.com/golang/glog"
)
const (
// namespaceDeletionGracePeriod is the time period to wait before processing a received namespace event.
// This allows time for the following to occur:
// * lifecycle admission plugins on HA apiservers to also observe a namespace
// deletion and prevent new objects from being created in the terminating namespace
// * non-leader etcd servers to observe last-minute object creations in a namespace
// so this controller's cleanup can actually clean up all objects
namespaceDeletionGracePeriod = 5 * time.Second
)
// NamespaceController is responsible for performing actions dependent upon a namespace phase
type NamespaceController struct {
// client that purges namespace content, must have list/delete privileges on all content
@ -47,8 +57,8 @@ type NamespaceController struct {
controller *cache.Controller
// namespaces that have been queued up for processing by workers
queue workqueue.RateLimitingInterface
// list of preferred group versions and their corresponding resource set for namespace deletion
groupVersionResources []unversioned.GroupVersionResource
// function that returns the list of preferred group versions and their corresponding resource set for namespace deletion
groupVersionResourcesFn func() ([]unversioned.GroupVersionResource, error)
// opCache is a cache to remember if a particular operation is not supported to aid dynamic client.
opCache *operationNotSupportedCache
// finalizerToken is the finalizer token managed by this controller
@ -59,7 +69,7 @@ type NamespaceController struct {
func NewNamespaceController(
kubeClient clientset.Interface,
clientPool dynamic.ClientPool,
groupVersionResources []unversioned.GroupVersionResource,
groupVersionResourcesFn func() ([]unversioned.GroupVersionResource, error),
resyncPeriod time.Duration,
finalizerToken api.FinalizerName) *NamespaceController {
@ -86,9 +96,9 @@ func NewNamespaceController(
kubeClient: kubeClient,
clientPool: clientPool,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "namespace"),
groupVersionResources: groupVersionResources,
opCache: opCache,
finalizerToken: finalizerToken,
groupVersionResourcesFn: groupVersionResourcesFn,
opCache: opCache,
finalizerToken: finalizerToken,
}
if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
@ -132,7 +142,9 @@ func (nm *NamespaceController) enqueueNamespace(obj interface{}) {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
return
}
nm.queue.Add(key)
// delay processing namespace events to allow HA api servers to observe namespace deletion,
// and HA etcd servers to observe last minute object creations inside the namespace
nm.queue.AddAfter(key, namespaceDeletionGracePeriod)
}
// worker processes the queue of namespace objects.
@ -191,7 +203,7 @@ func (nm *NamespaceController) syncNamespaceFromKey(key string) (err error) {
return err
}
namespace := obj.(*api.Namespace)
return syncNamespace(nm.kubeClient, nm.clientPool, nm.opCache, nm.groupVersionResources, namespace, nm.finalizerToken)
return syncNamespace(nm.kubeClient, nm.clientPool, nm.opCache, nm.groupVersionResourcesFn, namespace, nm.finalizerToken)
}
// Run starts observing the system with the specified number of workers.
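With AddAfter, a deleted namespace only becomes visible to workers once namespaceDeletionGracePeriod has elapsed, giving HA apiservers and non-leader etcd members time to observe the deletion first. A toy sketch of delayed enqueueing; the real controller uses the workqueue's rate-limiting delaying queue, so the channel-based queue and the short delay here are illustrative stand-ins:

```go
package main

import (
	"fmt"
	"time"
)

// delayingQueue is a minimal stand-in for a workqueue that supports AddAfter.
type delayingQueue struct{ items chan string }

// AddAfter makes the key available to consumers only after the delay.
func (q *delayingQueue) AddAfter(key string, d time.Duration) {
	time.AfterFunc(d, func() { q.items <- key })
}

func main() {
	q := &delayingQueue{items: make(chan string, 1)}
	start := time.Now()
	q.AddAfter("kube-system", 50*time.Millisecond) // stand-in for the 5s grace period
	key := <-q.items
	fmt.Printf("processed %q after %v\n", key, time.Since(start).Round(10*time.Millisecond))
}
```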


@ -371,7 +371,7 @@ func syncNamespace(
kubeClient clientset.Interface,
clientPool dynamic.ClientPool,
opCache *operationNotSupportedCache,
groupVersionResources []unversioned.GroupVersionResource,
groupVersionResourcesFn func() ([]unversioned.GroupVersionResource, error),
namespace *api.Namespace,
finalizerToken api.FinalizerName,
) error {
@ -422,6 +422,10 @@ func syncNamespace(
}
// there may still be content for us to remove
groupVersionResources, err := groupVersionResourcesFn()
if err != nil {
return err
}
estimate, err := deleteAllContent(kubeClient, clientPool, opCache, groupVersionResources, namespace.Name, *namespace.DeletionTimestamp)
if err != nil {
return err


@ -118,6 +118,7 @@ func setPodTerminationReason(kubeClient clientset.Interface, pod *api.Pod, nodeN
func forcefullyDeletePod(c clientset.Interface, pod *api.Pod) error {
var zero int64
glog.Infof("NodeController is force deleting Pod: %v:%v", pod.Namespace, pod.Name)
err := c.Core().Pods(pod.Namespace).Delete(pod.Name, &api.DeleteOptions{GracePeriodSeconds: &zero})
if err == nil {
glog.V(4).Infof("forceful deletion of %s succeeded", pod.Name)


@ -107,7 +107,7 @@ func (p *petSyncer) Sync(pet *pcb) error {
}
// if pet failed - we need to remove old one because of consistent naming
if exists && realPet.pod.Status.Phase == api.PodFailed {
glog.V(4).Infof("Delete evicted pod %v", realPet.pod.Name)
glog.V(2).Infof("Deleting evicted pod %v/%v", realPet.pod.Namespace, realPet.pod.Name)
if err := p.petClient.Delete(realPet); err != nil {
return err
}
@ -156,7 +156,7 @@ func (p *petSyncer) Delete(pet *pcb) error {
// The returned error will force a requeue.
p.blockingPet = realPet
if !p.isDying(realPet.pod) {
glog.V(4).Infof("StatefulSet %v deleting pet %v", pet.parent.Name, pet.pod.Name)
glog.V(2).Infof("StatefulSet %v deleting pet %v/%v", pet.parent.Name, pet.pod.Namespace, pet.pod.Name)
return p.petClient.Delete(pet)
}
glog.V(4).Infof("StatefulSet %v waiting on pet %v to die in %v", pet.parent.Name, realPet.pod.Name, realPet.pod.DeletionTimestamp)


@ -292,9 +292,12 @@ func (a *HorizontalController) reconcileAutoscaler(hpa *autoscaling.HorizontalPo
rescaleReason := ""
timestamp := time.Now()
rescale := true
if scale.Spec.Replicas == 0 {
// Autoscaling is disabled for this resource
desiredReplicas = 0
rescale = false
} else if currentReplicas > hpa.Spec.MaxReplicas {
rescaleReason = "Current number of replicas above Spec.MaxReplicas"
desiredReplicas = hpa.Spec.MaxReplicas
@ -360,9 +363,10 @@ func (a *HorizontalController) reconcileAutoscaler(hpa *autoscaling.HorizontalPo
if desiredReplicas > calculateScaleUpLimit(currentReplicas) {
desiredReplicas = calculateScaleUpLimit(currentReplicas)
}
rescale = shouldScale(hpa, currentReplicas, desiredReplicas, timestamp)
}
rescale := shouldScale(hpa, currentReplicas, desiredReplicas, timestamp)
if rescale {
scale.Spec.Replicas = desiredReplicas
_, err = a.scaleNamespacer.Scales(hpa.Namespace).Update(hpa.Spec.ScaleTargetRef.Kind, scale)


@ -68,6 +68,7 @@ func NewPodGC(kubeClient clientset.Interface, podInformer cache.SharedIndexInfor
kubeClient: kubeClient,
terminatedPodThreshold: terminatedPodThreshold,
deletePod: func(namespace, name string) error {
glog.Infof("PodGC is force deleting Pod: %v:%v", namespace, name)
return kubeClient.Core().Pods(namespace).Delete(name, api.NewDeleteOptions(0))
},
}


@ -342,8 +342,19 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
}
}
changedToReady := !api.IsPodReady(oldPod) && api.IsPodReady(curPod)
if curRS := rsc.getPodReplicaSet(curPod); curRS != nil {
rsc.enqueueReplicaSet(curRS)
// TODO: MinReadySeconds in the Pod will generate an Available condition to be added in
// the Pod status which in turn will trigger a requeue of the owning replica set thus
// having its status updated with the newly available replica. For now, we can fake the
// update by resyncing the controller MinReadySeconds after it is requeued because
// a Pod transitioned to Ready.
// Note that this still suffers from #29229, we are just moving the problem one level
// "closer" to kubelet (from the deployment to the replica set controller).
if changedToReady && curRS.Spec.MinReadySeconds > 0 {
rsc.enqueueReplicaSetAfter(curRS, time.Duration(curRS.Spec.MinReadySeconds)*time.Second)
}
}
}
@ -397,6 +408,23 @@ func (rsc *ReplicaSetController) enqueueReplicaSet(obj interface{}) {
rsc.queue.Add(key)
}
// obj could be an *extensions.ReplicaSet, or a DeletionFinalStateUnknown marker item.
func (rsc *ReplicaSetController) enqueueReplicaSetAfter(obj interface{}, after time.Duration) {
key, err := controller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
return
}
// TODO: Handle overlapping replica sets better. Either disallow them at admission time or
// deterministically avoid syncing replica sets that fight over pods. Currently, we only
// ensure that the same replica set is synced for a given pod. When we periodically relist
// all replica sets there will still be some replica instability. One way to handle this is
// by querying the store for all replica sets that this replica set overlaps, as well as all
// replica sets that overlap this ReplicaSet, and sorting them.
rsc.queue.AddAfter(key, after)
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (rsc *ReplicaSetController) worker() {


@ -401,8 +401,19 @@ func (rm *ReplicationManager) updatePod(old, cur interface{}) {
}
}
changedToReady := !api.IsPodReady(oldPod) && api.IsPodReady(curPod)
if curRC := rm.getPodController(curPod); curRC != nil {
rm.enqueueController(curRC)
// TODO: MinReadySeconds in the Pod will generate an Available condition to be added in
// the Pod status which in turn will trigger a requeue of the owning replication controller
// thus having its status updated with the newly available replica. For now, we can fake the
// update by resyncing the controller MinReadySeconds after it is requeued because a Pod
// transitioned to Ready.
// Note that this still suffers from #29229, we are just moving the problem one level
// "closer" to kubelet (from the deployment to the replication controller manager).
if changedToReady && curRC.Spec.MinReadySeconds > 0 {
rm.enqueueControllerAfter(curRC, time.Duration(curRC.Spec.MinReadySeconds)*time.Second)
}
}
}
@ -456,6 +467,23 @@ func (rm *ReplicationManager) enqueueController(obj interface{}) {
rm.queue.Add(key)
}
// obj could be an *v1.ReplicationController, or a DeletionFinalStateUnknown marker item.
func (rm *ReplicationManager) enqueueControllerAfter(obj interface{}, after time.Duration) {
key, err := controller.KeyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
return
}
// TODO: Handle overlapping controllers better. Either disallow them at admission time or
// deterministically avoid syncing controllers that fight over pods. Currently, we only
// ensure that the same controller is synced for a given pod. When we periodically relist
// all controllers there will still be some replica instability. One way to handle this is
// by querying the store for all controllers that this rc overlaps, as well as all
// controllers that overlap this rc, and sorting them.
rm.queue.AddAfter(key, after)
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (rm *ReplicationManager) worker() {


@ -430,6 +430,13 @@ func (s *ServiceController) needsUpdate(oldService *api.Service, newService *api
oldService.Spec.Type, newService.Spec.Type)
return true
}
if wantsLoadBalancer(newService) && !reflect.DeepEqual(oldService.Spec.LoadBalancerSourceRanges, newService.Spec.LoadBalancerSourceRanges) {
s.eventRecorder.Eventf(newService, api.EventTypeNormal, "LoadBalancerSourceRanges", "%v -> %v",
oldService.Spec.LoadBalancerSourceRanges, newService.Spec.LoadBalancerSourceRanges)
return true
}
if !portsEqualForLB(oldService, newService) || oldService.Spec.SessionAffinity != newService.Spec.SessionAffinity {
return true
}


@ -68,6 +68,7 @@ go_test(
"//pkg/runtime:go_default_library",
"//pkg/util/rand:go_default_library",
"//pkg/util/sets:go_default_library",
"//vendor:github.com/davecgh/go-spew/spew",
"//vendor:github.com/golang/glog",
],
)


@ -239,6 +239,23 @@ func (adc *attachDetachController) podDelete(obj interface{}) {
func (adc *attachDetachController) nodeAdd(obj interface{}) {
node, ok := obj.(*api.Node)
// TODO: investigate whether we can return early if nodeName is empty
// kubernetes/kubernetes/issues/37777
if node == nil || !ok {
return
}
nodeName := types.NodeName(node.Name)
adc.nodeUpdate(nil, obj)
// kubernetes/kubernetes/issues/37586
// This is to work around the case when a node add wipes out the
// attached volumes field. This function ensures that we sync with
// the actual status.
adc.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName)
}
func (adc *attachDetachController) nodeUpdate(oldObj, newObj interface{}) {
node, ok := newObj.(*api.Node)
// TODO: investigate whether we can return early if nodeName is empty
if node == nil || !ok {
return
}
@ -249,15 +266,9 @@ func (adc *attachDetachController) nodeAdd(obj interface{}) {
// detach controller. Add it to desired state of world.
adc.desiredStateOfWorld.AddNode(nodeName)
}
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
}
func (adc *attachDetachController) nodeUpdate(oldObj, newObj interface{}) {
// The flow for update is the same as add.
adc.nodeAdd(newObj)
}
func (adc *attachDetachController) nodeDelete(obj interface{}) {
node, ok := obj.(*api.Node)
if node == nil || !ok {


@ -116,6 +116,9 @@ type ActualStateOfWorld interface {
// since volumes should be removed from this list as soon a detach operation
// is considered, before the detach operation is triggered).
GetVolumesToReportAttached() map[types.NodeName][]api.AttachedVolume
// GetNodesToUpdateStatusFor returns the map of nodeNames to nodeToUpdateStatusFor
GetNodesToUpdateStatusFor() map[types.NodeName]nodeToUpdateStatusFor
}
// AttachedVolume represents a volume that is attached to a node.
@ -457,6 +460,7 @@ func (asw *actualStateOfWorld) updateNodeStatusUpdateNeeded(nodeName types.NodeN
"Failed to set statusUpdateNeeded to needed %t because nodeName=%q does not exist",
needed,
nodeName)
return
}
nodeToUpdate.statusUpdateNeeded = needed
@ -591,6 +595,10 @@ func (asw *actualStateOfWorld) GetVolumesToReportAttached() map[types.NodeName][
return volumesToReportAttached
}
func (asw *actualStateOfWorld) GetNodesToUpdateStatusFor() map[types.NodeName]nodeToUpdateStatusFor {
return asw.nodesToUpdateStatusFor
}
func getAttachedVolume(
attachedVolume *attachedVolume,
nodeAttachedTo *nodeAttachedTo) AttachedVolume {


@ -192,11 +192,11 @@ func (rc *reconciler) reconcile() {
if rc.actualStateOfWorld.VolumeNodeExists(
volumeToAttach.VolumeName, volumeToAttach.NodeName) {
// Volume/Node exists, touch it to reset detachRequestedTime
glog.V(1).Infof("Volume %q/Node %q is attached--touching.", volumeToAttach.VolumeName, volumeToAttach.NodeName)
glog.V(5).Infof("Volume %q/Node %q is attached--touching.", volumeToAttach.VolumeName, volumeToAttach.NodeName)
rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName)
} else {
// Volume/Node doesn't exist, spawn a goroutine to attach it
glog.V(1).Infof("Attempting to start AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
glog.V(5).Infof("Attempting to start AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
err := rc.attacherDetacher.AttachVolume(volumeToAttach.VolumeToAttach, rc.actualStateOfWorld)
if err == nil {
glog.Infof("Started AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)


@ -59,17 +59,15 @@ type nodeStatusUpdater struct {
}
func (nsu *nodeStatusUpdater) UpdateNodeStatuses() error {
smPatchVersion, err := strategicpatch.GetServerSupportedSMPatchVersion(nsu.kubeClient.Discovery())
if err != nil {
return err
}
// TODO: investigate right behavior if nodeName is empty
// kubernetes/kubernetes/issues/37777
nodesToUpdate := nsu.actualStateOfWorld.GetVolumesToReportAttached()
for nodeName, attachedVolumes := range nodesToUpdate {
nodeObj, exists, err := nsu.nodeInformer.GetStore().GetByKey(string(nodeName))
if nodeObj == nil || !exists || err != nil {
// If node does not exist, its status cannot be updated, log error and
// reset flag statusUpdateNeeded back to true to indicate this node status
// needs to be udpated again
// needs to be updated again
glog.V(2).Infof(
"Could not update node status. Failed to find node %q in NodeInformer cache. %v",
nodeName,
@ -112,7 +110,7 @@ func (nsu *nodeStatusUpdater) UpdateNodeStatuses() error {
}
patchBytes, err :=
strategicpatch.CreateStrategicMergePatch(oldData, newData, node, smPatchVersion)
strategicpatch.CreateStrategicMergePatch(oldData, newData, node)
if err != nil {
return fmt.Errorf(
"failed to CreateStrategicMergePatch for node %q. %v",
@ -123,7 +121,7 @@ func (nsu *nodeStatusUpdater) UpdateNodeStatuses() error {
_, err = nsu.kubeClient.Core().Nodes().PatchStatus(string(nodeName), patchBytes)
if err != nil {
// If update node status fails, reset flag statusUpdateNeeded back to true
// to indicate this node status needs to be udpated again
// to indicate this node status needs to be updated again
nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName)
return fmt.Errorf(
"failed to kubeClient.Core().Nodes().Patch for node %q. %v",


@ -2898,6 +2898,13 @@ var OpenAPIDefinitions *common.OpenAPIDefinitions = &common.OpenAPIDefinitions{
Format: "",
},
},
"experimentalKernelMemcgNotification": {
SchemaProps: spec.SchemaProps{
Description: "If enabled, the kubelet will integrate with the kernel memcg notification to determine if memory eviction thresholds are crossed rather than polling.",
Type: []string{"boolean"},
Format: "",
},
},
"podsPerCore": {
SchemaProps: spec.SchemaProps{
Description: "Maximum number of pods per core. Cannot exceed MaxPods",
@ -3011,7 +3018,7 @@ var OpenAPIDefinitions *common.OpenAPIDefinitions = &common.OpenAPIDefinitions{
},
},
},
Required: []string{"TypeMeta", "podManifestPath", "syncFrequency", "fileCheckFrequency", "httpCheckFrequency", "manifestURL", "manifestURLHeader", "enableServer", "address", "port", "readOnlyPort", "tlsCertFile", "tlsPrivateKeyFile", "certDirectory", "authentication", "authorization", "hostnameOverride", "podInfraContainerImage", "dockerEndpoint", "rootDirectory", "seccompProfileRoot", "allowPrivileged", "hostNetworkSources", "hostPIDSources", "hostIPCSources", "registryPullQPS", "registryBurst", "eventRecordQPS", "eventBurst", "enableDebuggingHandlers", "minimumGCAge", "maxPerPodContainerCount", "maxContainerCount", "cAdvisorPort", "healthzPort", "healthzBindAddress", "oomScoreAdj", "registerNode", "clusterDomain", "masterServiceNamespace", "clusterDNS", "streamingConnectionIdleTimeout", "nodeStatusUpdateFrequency", "imageMinimumGCAge", "imageGCHighThresholdPercent", "imageGCLowThresholdPercent", "lowDiskSpaceThresholdMB", "volumeStatsAggPeriod", "networkPluginName", "networkPluginMTU", "networkPluginDir", "cniConfDir", "cniBinDir", "volumePluginDir", "containerRuntime", "remoteRuntimeEndpoint", "remoteImageEndpoint", "experimentalMounterPath", "lockFilePath", "exitOnLockContention", "hairpinMode", "babysitDaemons", "maxPods", "nvidiaGPUs", "dockerExecHandlerName", "podCIDR", "resolvConf", "cpuCFSQuota", "containerized", "maxOpenFiles", "reconcileCIDR", "registerSchedulable", "contentType", "kubeAPIQPS", "kubeAPIBurst", "serializeImagePulls", "nodeLabels", "nonMasqueradeCIDR", "enableCustomMetrics", "podsPerCore", "enableControllerAttachDetach", "systemReserved", "kubeReserved", "protectKernelDefaults", "makeIPTablesUtilChains", "iptablesMasqueradeBit", "iptablesDropBit", "featureGates", "experimentalFailSwapOn", "ExperimentalCheckNodeCapabilitiesBeforeMount"},
Required: []string{"TypeMeta", "podManifestPath", "syncFrequency", "fileCheckFrequency", "httpCheckFrequency", "manifestURL", "manifestURLHeader", "enableServer", "address", "port", "readOnlyPort", "tlsCertFile", "tlsPrivateKeyFile", "certDirectory", "authentication", "authorization", "hostnameOverride", "podInfraContainerImage", "dockerEndpoint", "rootDirectory", "seccompProfileRoot", "allowPrivileged", "hostNetworkSources", "hostPIDSources", "hostIPCSources", "registryPullQPS", "registryBurst", "eventRecordQPS", "eventBurst", "enableDebuggingHandlers", "minimumGCAge", "maxPerPodContainerCount", "maxContainerCount", "cAdvisorPort", "healthzPort", "healthzBindAddress", "oomScoreAdj", "registerNode", "clusterDomain", "masterServiceNamespace", "clusterDNS", "streamingConnectionIdleTimeout", "nodeStatusUpdateFrequency", "imageMinimumGCAge", "imageGCHighThresholdPercent", "imageGCLowThresholdPercent", "lowDiskSpaceThresholdMB", "volumeStatsAggPeriod", "networkPluginName", "networkPluginMTU", "networkPluginDir", "cniConfDir", "cniBinDir", "volumePluginDir", "containerRuntime", "remoteRuntimeEndpoint", "remoteImageEndpoint", "lockFilePath", "exitOnLockContention", "hairpinMode", "babysitDaemons", "maxPods", "nvidiaGPUs", "dockerExecHandlerName", "podCIDR", "resolvConf", "cpuCFSQuota", "containerized", "maxOpenFiles", "reconcileCIDR", "registerSchedulable", "contentType", "kubeAPIQPS", "kubeAPIBurst", "serializeImagePulls", "nodeLabels", "nonMasqueradeCIDR", "enableCustomMetrics", "podsPerCore", "enableControllerAttachDetach", "systemReserved", "kubeReserved", "protectKernelDefaults", "makeIPTablesUtilChains", "iptablesMasqueradeBit", "iptablesDropBit", "featureGates"},
},
},
Dependencies: []string{
@ -3540,7 +3547,7 @@ var OpenAPIDefinitions *common.OpenAPIDefinitions = &common.OpenAPIDefinitions{
},
},
},
Required: []string{"type", "status", "lastUpdateTime", "lastTransitionTime", "reason", "message"},
Required: []string{"type", "status"},
},
},
Dependencies: []string{
@ -3678,7 +3685,7 @@ var OpenAPIDefinitions *common.OpenAPIDefinitions = &common.OpenAPIDefinitions{
},
},
},
Required: []string{"template", "progressDeadlineSeconds"},
Required: []string{"template"},
},
},
Dependencies: []string{
@ -3737,7 +3744,6 @@ var OpenAPIDefinitions *common.OpenAPIDefinitions = &common.OpenAPIDefinitions{
},
},
},
Required: []string{"conditions"},
},
},
Dependencies: []string{
@ -9751,7 +9757,7 @@ var OpenAPIDefinitions *common.OpenAPIDefinitions = &common.OpenAPIDefinitions{
},
"unschedulable": {
SchemaProps: spec.SchemaProps{
Description: "Unschedulable controls node schedulability of new pods. By default, node is schedulable. More info: http://releases.k8s.io/HEAD/docs/admin/node.md#manual-node-administration\"`",
Description: "Unschedulable controls node schedulability of new pods. By default, node is schedulable. More info: http://releases.k8s.io/HEAD/docs/admin/node.md#manual-node-administration\"",
Type: []string{"boolean"},
Format: "",
},
@ -10577,7 +10583,6 @@ var OpenAPIDefinitions *common.OpenAPIDefinitions = &common.OpenAPIDefinitions{
},
},
},
Required: []string{"photonPersistentDisk"},
},
},
Dependencies: []string{
@ -10685,7 +10690,7 @@ var OpenAPIDefinitions *common.OpenAPIDefinitions = &common.OpenAPIDefinitions{
},
},
},
Required: []string{"pdID", "fsType"},
Required: []string{"pdID"},
},
},
Dependencies: []string{},
@ -12324,15 +12329,8 @@ var OpenAPIDefinitions *common.OpenAPIDefinitions = &common.OpenAPIDefinitions{
AdditionalProperties: &spec.SchemaOrBool{
Schema: &spec.Schema{
SchemaProps: spec.SchemaProps{
Type: []string{"array"},
Items: &spec.SchemaOrArray{
Schema: &spec.Schema{
SchemaProps: spec.SchemaProps{
Type: []string{"integer"},
Format: "byte",
},
},
},
Type: []string{"string"},
Format: "byte",
},
},
},
@ -13198,7 +13196,6 @@ var OpenAPIDefinitions *common.OpenAPIDefinitions = &common.OpenAPIDefinitions{
},
},
},
Required: []string{"photonPersistentDisk"},
},
},
Dependencies: []string{
@ -14705,6 +14702,13 @@ var OpenAPIDefinitions *common.OpenAPIDefinitions = &common.OpenAPIDefinitions{
Format: "",
},
},
"experimentalKernelMemcgNotification": {
SchemaProps: spec.SchemaProps{
Description: "If enabled, the kubelet will integrate with the kernel memcg notification to determine if memory eviction thresholds are crossed rather than polling.",
Type: []string{"boolean"},
Format: "",
},
},
"podsPerCore": {
SchemaProps: spec.SchemaProps{
Description: "Maximum number of pods per core. Cannot exceed MaxPods",
@ -14818,7 +14822,7 @@ var OpenAPIDefinitions *common.OpenAPIDefinitions = &common.OpenAPIDefinitions{
},
},
},
Required: []string{"TypeMeta", "podManifestPath", "syncFrequency", "fileCheckFrequency", "httpCheckFrequency", "manifestURL", "manifestURLHeader", "enableServer", "address", "port", "readOnlyPort", "tlsCertFile", "tlsPrivateKeyFile", "certDirectory", "authentication", "authorization", "hostnameOverride", "podInfraContainerImage", "dockerEndpoint", "rootDirectory", "seccompProfileRoot", "allowPrivileged", "hostNetworkSources", "hostPIDSources", "hostIPCSources", "registryPullQPS", "registryBurst", "eventRecordQPS", "eventBurst", "enableDebuggingHandlers", "minimumGCAge", "maxPerPodContainerCount", "maxContainerCount", "cAdvisorPort", "healthzPort", "healthzBindAddress", "oomScoreAdj", "registerNode", "clusterDomain", "masterServiceNamespace", "clusterDNS", "streamingConnectionIdleTimeout", "nodeStatusUpdateFrequency", "imageMinimumGCAge", "imageGCHighThresholdPercent", "imageGCLowThresholdPercent", "lowDiskSpaceThresholdMB", "volumeStatsAggPeriod", "networkPluginName", "networkPluginDir", "cniConfDir", "cniBinDir", "networkPluginMTU", "volumePluginDir", "cloudProvider", "cloudConfigFile", "kubeletCgroups", "runtimeCgroups", "systemCgroups", "cgroupRoot", "containerRuntime", "remoteRuntimeEndpoint", "remoteImageEndpoint", "runtimeRequestTimeout", "rktPath", "experimentalMounterPath", "rktAPIEndpoint", "rktStage1Image", "lockFilePath", "exitOnLockContention", "hairpinMode", "babysitDaemons", "maxPods", "nvidiaGPUs", "dockerExecHandlerName", "podCIDR", "resolvConf", "cpuCFSQuota", "containerized", "maxOpenFiles", "reconcileCIDR", "registerSchedulable", "contentType", "kubeAPIQPS", "kubeAPIBurst", "serializeImagePulls", "outOfDiskTransitionFrequency", "nodeIP", "nodeLabels", "nonMasqueradeCIDR", "enableCustomMetrics", "evictionHard", "evictionSoft", "evictionSoftGracePeriod", "evictionPressureTransitionPeriod", "evictionMaxPodGracePeriod", "evictionMinimumReclaim", "podsPerCore", "enableControllerAttachDetach", "systemReserved", "kubeReserved", "protectKernelDefaults", "makeIPTablesUtilChains", "iptablesMasqueradeBit", "iptablesDropBit", "featureGates", "experimentalFailSwapOn", "ExperimentalCheckNodeCapabilitiesBeforeMount"},
Required: []string{"TypeMeta", "podManifestPath", "syncFrequency", "fileCheckFrequency", "httpCheckFrequency", "manifestURL", "manifestURLHeader", "enableServer", "address", "port", "readOnlyPort", "tlsCertFile", "tlsPrivateKeyFile", "certDirectory", "authentication", "authorization", "hostnameOverride", "podInfraContainerImage", "dockerEndpoint", "rootDirectory", "seccompProfileRoot", "allowPrivileged", "hostNetworkSources", "hostPIDSources", "hostIPCSources", "registryPullQPS", "registryBurst", "eventRecordQPS", "eventBurst", "enableDebuggingHandlers", "minimumGCAge", "maxPerPodContainerCount", "maxContainerCount", "cAdvisorPort", "healthzPort", "healthzBindAddress", "oomScoreAdj", "registerNode", "clusterDomain", "masterServiceNamespace", "clusterDNS", "streamingConnectionIdleTimeout", "nodeStatusUpdateFrequency", "imageMinimumGCAge", "imageGCHighThresholdPercent", "imageGCLowThresholdPercent", "lowDiskSpaceThresholdMB", "volumeStatsAggPeriod", "networkPluginName", "networkPluginDir", "cniConfDir", "cniBinDir", "networkPluginMTU", "volumePluginDir", "cloudProvider", "cloudConfigFile", "kubeletCgroups", "runtimeCgroups", "systemCgroups", "cgroupRoot", "containerRuntime", "remoteRuntimeEndpoint", "remoteImageEndpoint", "runtimeRequestTimeout", "rktPath", "rktAPIEndpoint", "rktStage1Image", "lockFilePath", "exitOnLockContention", "hairpinMode", "babysitDaemons", "maxPods", "nvidiaGPUs", "dockerExecHandlerName", "podCIDR", "resolvConf", "cpuCFSQuota", "containerized", "maxOpenFiles", "reconcileCIDR", "registerSchedulable", "contentType", "kubeAPIQPS", "kubeAPIBurst", "serializeImagePulls", "outOfDiskTransitionFrequency", "nodeIP", "nodeLabels", "nonMasqueradeCIDR", "enableCustomMetrics", "evictionHard", "evictionSoft", "evictionSoftGracePeriod", "evictionPressureTransitionPeriod", "evictionMaxPodGracePeriod", "evictionMinimumReclaim", "experimentalKernelMemcgNotification", "podsPerCore", "enableControllerAttachDetach", "systemReserved", "kubeReserved", "protectKernelDefaults", "makeIPTablesUtilChains", "iptablesMasqueradeBit", "iptablesDropBit"},
},
},
Dependencies: []string{
@ -15729,7 +15733,7 @@ var OpenAPIDefinitions *common.OpenAPIDefinitions = &common.OpenAPIDefinitions{
},
},
},
Required: []string{"type", "status", "lastUpdateTime", "lastTransitionTime", "reason", "message"},
Required: []string{"type", "status"},
},
},
Dependencies: []string{
@ -15870,7 +15874,7 @@ var OpenAPIDefinitions *common.OpenAPIDefinitions = &common.OpenAPIDefinitions{
},
},
},
Required: []string{"template", "progressDeadlineSeconds"},
Required: []string{"template"},
},
},
Dependencies: []string{
@ -15930,7 +15934,6 @@ var OpenAPIDefinitions *common.OpenAPIDefinitions = &common.OpenAPIDefinitions{
},
},
},
Required: []string{"conditions"},
},
},
Dependencies: []string{
@ -15978,7 +15981,6 @@ var OpenAPIDefinitions *common.OpenAPIDefinitions = &common.OpenAPIDefinitions{
},
},
},
Required: []string{"metadata", "deleteOptions"},
},
},
Dependencies: []string{
@ -16955,7 +16957,6 @@ var OpenAPIDefinitions *common.OpenAPIDefinitions = &common.OpenAPIDefinitions{
},
},
},
Required: []string{"metadata", "spec", "status"},
},
},
Dependencies: []string{
@ -16984,7 +16985,7 @@ var OpenAPIDefinitions *common.OpenAPIDefinitions = &common.OpenAPIDefinitions{
},
},
},
Required: []string{"metadata", "items"},
Required: []string{"items"},
},
},
Dependencies: []string{
@ -17008,7 +17009,6 @@ var OpenAPIDefinitions *common.OpenAPIDefinitions = &common.OpenAPIDefinitions{
},
},
},
Required: []string{"minAvailable", "selector"},
},
},
Dependencies: []string{


@ -665,7 +665,8 @@ type PodSandboxConfig struct {
// * runtime/default: the default profile for the container runtime
// * unconfined: unconfined profile, ie, no seccomp sandboxing
// * localhost/<profile-name>: the profile installed to the node's
// local seccomp profile root
// local seccomp profile root. Note that profile root is set in
// kubelet, and it is not passed in CRI yet, see https://issues.k8s.io/36997.
//
// 3. Sysctls
//


@ -255,7 +255,8 @@ message PodSandboxConfig {
// * runtime/default: the default profile for the container runtime
// * unconfined: unconfined profile, ie, no seccomp sandboxing
// * localhost/<profile-name>: the profile installed to the node's
// local seccomp profile root
// local seccomp profile root. Note that profile root is set in
// kubelet, and it is not passed in CRI yet, see https://issues.k8s.io/36997.
//
// 3. Sysctls
//


@ -25,6 +25,7 @@ go_library(
"//pkg/api/unversioned:go_default_library",
"//pkg/client/record:go_default_library",
"//pkg/kubelet/api/v1alpha1/stats:go_default_library",
"//pkg/kubelet/cm:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/qos:go_default_library",
"//pkg/kubelet/server/stats:go_default_library",


@ -24,7 +24,9 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/qos"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
@ -33,7 +35,7 @@ import (
"k8s.io/kubernetes/pkg/util/wait"
)
// managerImpl implements NodeStabilityManager
// managerImpl implements Manager
type managerImpl struct {
// used to track time
clock clock.Clock
@ -65,6 +67,8 @@ type managerImpl struct {
resourceToNodeReclaimFuncs map[api.ResourceName]nodeReclaimFuncs
// last observations from synchronize
lastObservations signalObservations
// notifiersInitialized indicates if the threshold notifiers have been initialized (i.e. synchronize() has been called once)
notifiersInitialized bool
}
// ensure it implements the required interface
@ -139,6 +143,39 @@ func (m *managerImpl) IsUnderDiskPressure() bool {
return hasNodeCondition(m.nodeConditions, api.NodeDiskPressure)
}
func startMemoryThresholdNotifier(thresholds []Threshold, observations signalObservations, hard bool, handler thresholdNotifierHandlerFunc) error {
for _, threshold := range thresholds {
if threshold.Signal != SignalMemoryAvailable || hard != isHardEvictionThreshold(threshold) {
continue
}
observed, found := observations[SignalMemoryAvailable]
if !found {
continue
}
cgroups, err := cm.GetCgroupSubsystems()
if err != nil {
return err
}
// TODO add support for eviction from --cgroup-root
cgpath, found := cgroups.MountPoints["memory"]
if !found || len(cgpath) == 0 {
return fmt.Errorf("memory cgroup mount point not found")
}
attribute := "memory.usage_in_bytes"
quantity := getThresholdQuantity(threshold.Value, observed.capacity)
usageThreshold := resource.NewQuantity(observed.capacity.Value(), resource.DecimalSI)
usageThreshold.Sub(*quantity)
description := fmt.Sprintf("<%s available", formatThresholdValue(threshold.Value))
memcgThresholdNotifier, err := NewMemCGThresholdNotifier(cgpath, attribute, usageThreshold.String(), description, handler)
if err != nil {
return err
}
go memcgThresholdNotifier.Start(wait.NeverStop)
return nil
}
return nil
}
// synchronize is the main control loop that enforces eviction thresholds.
func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc) {
// if we have nothing to do, just return
@ -166,8 +203,28 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act
return
}
// find the list of thresholds that are met independent of grace period
now := m.clock.Now()
// attempt to create a threshold notifier to improve eviction response time
if m.config.KernelMemcgNotification && !m.notifiersInitialized {
glog.Infof("eviction manager attempting to integrate with kernel memcg notification api")
m.notifiersInitialized = true
// start soft memory notification
err = startMemoryThresholdNotifier(m.config.Thresholds, observations, false, func(desc string) {
glog.Infof("soft memory eviction threshold crossed at %s", desc)
// TODO wait grace period for soft memory limit
m.synchronize(diskInfoProvider, podFunc)
})
if err != nil {
glog.Warningf("eviction manager: failed to create soft memory threshold notifier: %v", err)
}
// start hard memory notification
err = startMemoryThresholdNotifier(m.config.Thresholds, observations, true, func(desc string) {
glog.Infof("hard memory eviction threshold crossed at %s", desc)
m.synchronize(diskInfoProvider, podFunc)
})
if err != nil {
glog.Warningf("eviction manager: failed to create hard memory threshold notifier: %v", err)
}
}
// determine the set of thresholds met independent of grace period
thresholds = thresholdsMet(thresholds, observations, false)
@ -182,6 +239,7 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act
thresholds = thresholdsUpdatedStats(thresholds, observations, m.lastObservations)
// track when a threshold was first observed
now := m.clock.Now()
thresholdsFirstObservedAt := thresholdsFirstObservedAt(thresholds, m.thresholdsFirstObservedAt, now)
// the set of node conditions that are triggered by currently observed thresholds
@ -218,7 +276,7 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act
glog.Warningf("eviction manager: attempting to reclaim %v", resourceToReclaim)
// determine if this is a soft or hard eviction associated with the resource
softEviction := isSoftEviction(thresholds, resourceToReclaim)
softEviction := isSoftEvictionThresholds(thresholds, resourceToReclaim)
// record an event about the resources we are now attempting to reclaim via eviction
m.recorder.Eventf(m.nodeRef, api.EventTypeWarning, "EvictionThresholdMet", "Attempting to reclaim %s", resourceToReclaim)

View File

@ -848,18 +848,23 @@ func getStarvedResources(thresholds []Threshold) []api.ResourceName {
}
// isSoftEvictionThresholds returns true if the thresholds met for the starved resource are only soft thresholds
func isSoftEviction(thresholds []Threshold, starvedResource api.ResourceName) bool {
func isSoftEvictionThresholds(thresholds []Threshold, starvedResource api.ResourceName) bool {
for _, threshold := range thresholds {
if resourceToCheck := signalToResource[threshold.Signal]; resourceToCheck != starvedResource {
continue
}
if threshold.GracePeriod == time.Duration(0) {
if isHardEvictionThreshold(threshold) {
return false
}
}
return true
}
// isHardEvictionThreshold returns true if the given threshold has no grace period, i.e. eviction should happen immediately
func isHardEvictionThreshold(threshold Threshold) bool {
return threshold.GracePeriod == time.Duration(0)
}
// buildResourceToRankFunc returns ranking functions associated with resources
func buildResourceToRankFunc(withImageFs bool) map[api.ResourceName]rankFunc {
resourceToRankFunc := map[api.ResourceName]rankFunc{

View File

@ -0,0 +1,119 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package eviction
/*
#include <sys/eventfd.h>
*/
import "C"
import (
"fmt"
"syscall"
"github.com/golang/glog"
)
type memcgThresholdNotifier struct {
watchfd int
controlfd int
eventfd int
handler thresholdNotifierHandlerFunc
description string
}
var _ ThresholdNotifier = &memcgThresholdNotifier{}
// NewMemCGThresholdNotifier sends notifications when a cgroup threshold
// is crossed (in either direction) for a given cgroup attribute
func NewMemCGThresholdNotifier(path, attribute, threshold, description string, handler thresholdNotifierHandlerFunc) (ThresholdNotifier, error) {
watchfd, err := syscall.Open(fmt.Sprintf("%s/%s", path, attribute), syscall.O_RDONLY, 0)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
syscall.Close(watchfd)
}
}()
controlfd, err := syscall.Open(fmt.Sprintf("%s/cgroup.event_control", path), syscall.O_WRONLY, 0)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
syscall.Close(controlfd)
}
}()
efd, err := C.eventfd(0, C.EFD_CLOEXEC)
if err != nil {
return nil, err
}
eventfd := int(efd)
if eventfd < 0 {
err = fmt.Errorf("eventfd call failed")
return nil, err
}
defer func() {
if err != nil {
syscall.Close(eventfd)
}
}()
glog.V(2).Infof("eviction: setting notification threshold to %s", threshold)
config := fmt.Sprintf("%d %d %s", eventfd, watchfd, threshold)
_, err = syscall.Write(controlfd, []byte(config))
if err != nil {
return nil, err
}
return &memcgThresholdNotifier{
watchfd: watchfd,
controlfd: controlfd,
eventfd: eventfd,
handler: handler,
description: description,
}, nil
}
func getThresholdEvents(eventfd int, eventCh chan<- int) {
for {
buf := make([]byte, 8)
_, err := syscall.Read(eventfd, buf)
if err != nil {
return
}
eventCh <- 0
}
}
func (n *memcgThresholdNotifier) Start(stopCh <-chan struct{}) {
eventCh := make(chan int, 1)
go getThresholdEvents(n.eventfd, eventCh)
for {
select {
case <-stopCh:
glog.V(2).Infof("eviction: stopping threshold notifier")
syscall.Close(n.watchfd)
syscall.Close(n.controlfd)
syscall.Close(n.eventfd)
close(eventCh)
return
case <-eventCh:
glog.V(2).Infof("eviction: threshold crossed")
n.handler(n.description)
}
}
}

View File

@ -0,0 +1,27 @@
// +build !linux
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package eviction
import "fmt"
// NewMemCGThresholdNotifier sends notifications when a cgroup threshold
// is crossed (in either direction) for a given cgroup attribute
func NewMemCGThresholdNotifier(path, attribute, threshold, description string, handler thresholdNotifierHandlerFunc) (ThresholdNotifier, error) {
return nil, fmt.Errorf("threshold notification not supported")
}

View File

@ -69,6 +69,8 @@ type Config struct {
MaxPodGracePeriodSeconds int64
// Thresholds define the set of conditions monitored to trigger eviction.
Thresholds []Threshold
// KernelMemcgNotification if true will integrate with the kernel memcg notification to determine if memory thresholds are crossed.
KernelMemcgNotification bool
}
// ThresholdValue is a value holder that abstracts literal versus percentage based quantity
@ -161,3 +163,11 @@ type nodeReclaimFunc func() (*resource.Quantity, error)
// nodeReclaimFuncs is an ordered list of nodeReclaimFunc
type nodeReclaimFuncs []nodeReclaimFunc
// thresholdNotifierHandlerFunc is a function that takes action in response to a crossed threshold
type thresholdNotifierHandlerFunc func(thresholdDescription string)
// ThresholdNotifier notifies the user when an attribute crosses a threshold value
type ThresholdNotifier interface {
Start(stopCh <-chan struct{})
}
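// A minimal sketch, not part of this commit, of what a ThresholdNotifier implementation
// looks like from the eviction manager's point of view: Start blocks until stopCh is
// closed and invokes the handler with the notifier's description whenever the watched
// signal crosses the configured threshold. The event source below is hypothetical.
type sketchNotifier struct {
	description string
	events      <-chan struct{} // assumed to fire once per threshold crossing
	handler     thresholdNotifierHandlerFunc
}
func (n *sketchNotifier) Start(stopCh <-chan struct{}) {
	for {
		select {
		case <-stopCh:
			return
		case <-n.events:
			n.handler(n.description)
		}
	}
}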

View File

@ -353,6 +353,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
PressureTransitionPeriod: kubeCfg.EvictionPressureTransitionPeriod.Duration,
MaxPodGracePeriodSeconds: int64(kubeCfg.EvictionMaxPodGracePeriod),
Thresholds: thresholds,
KernelMemcgNotification: kubeCfg.ExperimentalKernelMemcgNotification,
}
reservation, err := ParseReservation(kubeCfg.KubeReserved, kubeCfg.SystemReserved)
@ -1850,8 +1851,8 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
}
case <-housekeepingCh:
if !kl.sourcesReady.AllReady() {
// If the sources aren't ready, skip housekeeping, as we may
// accidentally delete pods from unready sources.
// If the sources aren't ready or volume manager has not yet synced the states,
// skip housekeeping, as we may accidentally delete pods from unready sources.
glog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
} else {
glog.V(4).Infof("SyncLoop (housekeeping)")
@ -1910,22 +1911,32 @@ func (kl *Kubelet) HandlePodAdditions(pods []*api.Pod) {
start := kl.clock.Now()
sort.Sort(sliceutils.PodsByCreationTime(pods))
for _, pod := range pods {
existingPods := kl.podManager.GetPods()
// Always add the pod to the pod manager. Kubelet relies on the pod
// manager as the source of truth for the desired state. If a pod does
// not exist in the pod manager, it means that it has been deleted in
// the apiserver and no action (other than cleanup) is required.
kl.podManager.AddPod(pod)
if kubepod.IsMirrorPod(pod) {
kl.podManager.AddPod(pod)
kl.handleMirrorPod(pod, start)
continue
}
// Note that allPods excludes the new pod.
allPods := kl.podManager.GetPods()
// We failed pods that we rejected, so activePods include all admitted
// pods that are alive.
activePods := kl.filterOutTerminatedPods(allPods)
// Check if we can admit the pod; if not, reject it.
if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
kl.rejectPod(pod, reason, message)
continue
if !kl.podIsTerminated(pod) {
// Only go through the admission process if the pod is not
// terminated.
// We failed pods that we rejected, so activePods include all admitted
// pods that are alive.
activePods := kl.filterOutTerminatedPods(existingPods)
// Check if we can admit the pod; if not, reject it.
if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
kl.rejectPod(pod, reason, message)
continue
}
}
kl.podManager.AddPod(pod)
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
kl.probeManager.AddPod(pod)

View File

@ -298,7 +298,7 @@ func (kl *Kubelet) syncNodeStatus() {
// updateNodeStatus updates node status to master with retries.
func (kl *Kubelet) updateNodeStatus() error {
for i := 0; i < nodeStatusUpdateRetry; i++ {
if err := kl.tryUpdateNodeStatus(); err != nil {
if err := kl.tryUpdateNodeStatus(i); err != nil {
glog.Errorf("Error updating node status, will retry: %v", err)
} else {
return nil
@ -309,20 +309,23 @@ func (kl *Kubelet) updateNodeStatus() error {
// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0
// is set, this function will also confirm that cbr0 is configured correctly.
func (kl *Kubelet) tryUpdateNodeStatus() error {
func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
// In large clusters, GET and PUT operations on Node objects coming
// from here are the majority of load on apiserver and etcd.
// To reduce the load on etcd, we are serving GET operations from
// apiserver cache (the data might be slightly delayed but it doesn't
// seem to cause more conflict - the delays are pretty small).
// If it results in a conflict, all retries are served directly from etcd.
// TODO: Currently apiserver doesn't support serving GET operations
// from its cache. Thus we are hacking it by issuing LIST with
// field selector for the name of the node (field selectors with
// specified name are handled efficiently by apiserver). Once
// apiserver supports GET from cache, change it here.
opts := api.ListOptions{
FieldSelector: fields.Set{"metadata.name": string(kl.nodeName)}.AsSelector(),
ResourceVersion: "0",
FieldSelector: fields.Set{"metadata.name": string(kl.nodeName)}.AsSelector(),
}
if tryNumber == 0 {
opts.ResourceVersion = "0"
}
nodes, err := kl.kubeClient.Core().Nodes().List(opts)
if err != nil {
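// A compact sketch, not from this commit, of the resulting retry pattern: the first
// attempt reads the Node through the apiserver cache (ResourceVersion "0"), and any
// retry after a conflict performs a quorum read from etcd. listNode and updateNode are
// hypothetical stand-ins for the client calls above.
func updateNodeStatusWithRetries(retries int, listNode func(resourceVersion string) error, updateNode func() error) error {
	var lastErr error
	for i := 0; i < retries; i++ {
		rv := "" // quorum read served from etcd
		if i == 0 {
			rv = "0" // served from the apiserver cache; may be slightly stale
		}
		if err := listNode(rv); err != nil {
			lastErr = err
			continue
		}
		if err := updateNode(); err != nil {
			lastErr = err // e.g. a conflict caused by the stale first read
			continue
		}
		return nil
	}
	return lastErr
}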

View File

@ -25,7 +25,6 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/types"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/volume"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
@ -105,24 +104,12 @@ func (kl *Kubelet) cleanupOrphanedPodDirs(
glog.V(3).Infof("Orphaned pod %q found, but volumes are not cleaned up", uid)
continue
}
// Check whether volume is still mounted on disk. If so, do not delete directory
// If there are still volume directories, do not delete directory
volumePaths, err := kl.getPodVolumePathListFromDisk(uid)
if err != nil {
if err != nil || len(volumePaths) > 0 {
glog.Errorf("Orphaned pod %q found, but error %v occured during reading volume dir from disk", uid, err)
continue
} else if len(volumePaths) > 0 {
for _, path := range volumePaths {
notMount, err := mount.IsNotMountPoint(path)
if err == nil && notMount {
glog.V(2).Infof("Volume path %q is no longer mounted, remove it", path)
os.Remove(path)
} else {
glog.Errorf("Orphaned pod %q found, but it might still mounted with error %v", uid, err)
}
}
continue
}
glog.V(3).Infof("Orphaned pod %q found, removing", uid)
if err := os.RemoveAll(kl.getPodDir(uid)); err != nil {
glog.Errorf("Failed to remove orphaned pod %q dir; err: %v", uid, err)

View File

@ -370,7 +370,7 @@ func (h *handler) cleanupHostportMap(containerPortMap map[api.ContainerPort]targ
for containerPort := range containerPortMap {
hp := hostport{
port: containerPort.HostPort,
protocol: string(containerPort.Protocol),
protocol: strings.ToLower(string(containerPort.Protocol)),
}
currentHostports[hp] = true
}
@ -379,6 +379,7 @@ func (h *handler) cleanupHostportMap(containerPortMap map[api.ContainerPort]targ
for hp, socket := range h.hostPortMap {
if _, ok := currentHostports[hp]; !ok {
socket.Close()
glog.V(3).Infof("Closed local port %s", hp.String())
delete(h.hostPortMap, hp)
}
}

View File

@ -81,7 +81,7 @@ func (mc *basicMirrorClient) DeleteMirrorPod(podFullName string) error {
glog.Errorf("Failed to parse a pod full name %q", podFullName)
return err
}
glog.V(4).Infof("Deleting a mirror pod %q", podFullName)
glog.V(2).Infof("Deleting a mirror pod %q", podFullName)
// TODO(random-liu): Delete the mirror pod with uid precondition in mirror pod manager
if err := mc.apiserverClient.Core().Pods(namespace).Delete(name, api.NewDeleteOptions(0)); err != nil && !errors.IsNotFound(err) {
glog.Errorf("Failed deleting a mirror pod %q: %v", podFullName, err)

View File

@ -438,6 +438,7 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
deleteOptions := api.NewDeleteOptions(0)
// Use the pod UID as the precondition for deletion to prevent deleting a newly created pod with the same name and namespace.
deleteOptions.Preconditions = api.NewUIDPreconditions(string(pod.UID))
glog.V(2).Infof("Removing Pod %q from etcd", format.Pod(pod))
if err = m.kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, deleteOptions); err == nil {
glog.V(3).Infof("Pod %q fully terminated and removed from etcd", format.Pod(pod))
m.deletePodStatus(uid)

View File

@ -129,10 +129,18 @@ func (dswp *desiredStateOfWorldPopulator) populatorLoopFunc() func() {
}
}
func isPodTerminated(pod *api.Pod) bool {
return pod.Status.Phase == api.PodFailed || pod.Status.Phase == api.PodSucceeded
}
// Iterate through all pods and add to desired state of world if they don't
// exist but should
func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
for _, pod := range dswp.podManager.GetPods() {
if isPodTerminated(pod) {
// Do not (re)add volumes for terminated pods
continue
}
dswp.processPodVolumes(pod)
}
}
@ -144,9 +152,18 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
runningPodsFetched := false
for _, volumeToMount := range dswp.desiredStateOfWorld.GetVolumesToMount() {
if _, podExists :=
dswp.podManager.GetPodByUID(volumeToMount.Pod.UID); podExists {
continue
pod, podExists := dswp.podManager.GetPodByUID(volumeToMount.Pod.UID)
if podExists {
// Skip running pods
if !isPodTerminated(pod) {
continue
}
// Skip non-memory backed volumes belonging to terminated pods
volume := volumeToMount.VolumeSpec.Volume
if (volume.EmptyDir == nil || volume.EmptyDir.Medium != api.StorageMediumMemory) &&
volume.ConfigMap == nil && volume.Secret == nil {
continue
}
}
// Once a pod has been deleted from kubelet pod manager, do not delete

View File

@ -246,6 +246,9 @@ func (m *ThirdPartyResourceServer) InstallThirdPartyResource(rsrc *extensions.Th
if err != nil {
return err
}
if len(rsrc.Versions) == 0 {
return fmt.Errorf("ThirdPartyResource %s has no defined versions", rsrc.Name)
}
plural, _ := meta.KindToResource(unversioned.GroupVersionKind{
Group: group,
Version: rsrc.Versions[0].Name,

View File

@ -138,7 +138,7 @@ type serviceInfo struct {
nodePort int
loadBalancerStatus api.LoadBalancerStatus
sessionAffinityType api.ServiceAffinity
stickyMaxAgeSeconds int
stickyMaxAgeMinutes int
externalIPs []string
loadBalancerSourceRanges []string
onlyNodeLocalEndpoints bool
@ -155,7 +155,7 @@ type endpointsInfo struct {
func newServiceInfo(service proxy.ServicePortName) *serviceInfo {
return &serviceInfo{
sessionAffinityType: api.ServiceAffinityNone, // default
stickyMaxAgeSeconds: 180, // TODO: parameterize this in the API.
stickyMaxAgeMinutes: 180, // TODO: parameterize this in the API.
}
}
@ -388,6 +388,9 @@ func (proxier *Proxier) sameConfig(info *serviceInfo, service *api.Service, port
if info.onlyNodeLocalEndpoints != onlyNodeLocalEndpoints {
return false
}
if !reflect.DeepEqual(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges) {
return false
}
return true
}
@ -1165,7 +1168,7 @@ func (proxier *Proxier) syncProxyRules() {
"-A", string(svcChain),
"-m", "comment", "--comment", svcName.String(),
"-m", "recent", "--name", string(endpointChain),
"--rcheck", "--seconds", fmt.Sprintf("%d", svcInfo.stickyMaxAgeSeconds), "--reap",
"--rcheck", "--seconds", fmt.Sprintf("%d", svcInfo.stickyMaxAgeMinutes*60), "--reap",
"-j", string(endpointChain))
}
}

View File

@ -26,6 +26,7 @@ go_library(
"//pkg/api/validation:go_default_library",
"//pkg/apis/policy:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/policy/internalversion:go_default_library",
"//pkg/client/retry:go_default_library",
"//pkg/kubelet/client:go_default_library",
"//pkg/labels:go_default_library",
"//pkg/registry/cachesize:go_default_library",
@ -35,6 +36,7 @@ go_library(
"//pkg/registry/generic/registry:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/storage:go_default_library",
"//pkg/util/wait:go_default_library",
],
)

View File

@ -21,13 +21,16 @@ import (
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/rest"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/policy"
policyclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion"
"k8s.io/kubernetes/pkg/client/retry"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/wait"
)
const (
@ -40,6 +43,15 @@ const (
MaxDisruptedPodSize = 2000
)
// EvictionsRetry is the retry for a conflict where multiple clients
// are making changes to the same resource.
var EvictionsRetry = wait.Backoff{
Steps: 20,
Duration: 500 * time.Millisecond,
Factor: 1.0,
Jitter: 0.1,
}
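// A short sketch, not from this commit, of how a backoff like EvictionsRetry is consumed:
// retry.RetryOnConflict re-runs the closure only while it returns a Conflict error and
// reports wait.ErrWaitTimeout once the backoff's steps are exhausted (which Create below
// converts into a timeout error). The wrapper name is hypothetical.
func decrementWithRetry(update func() error) error {
	err := retry.RetryOnConflict(EvictionsRetry, update)
	if err == wait.ErrWaitTimeout {
		return fmt.Errorf("gave up after %d conflicting attempts", EvictionsRetry.Steps)
	}
	return err
}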
func newEvictionStorage(store *registry.Store, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) *EvictionREST {
return &EvictionREST{store: store, podDisruptionBudgetClient: podDisruptionBudgetClient}
}
@ -66,41 +78,58 @@ func (r *EvictionREST) Create(ctx api.Context, obj runtime.Object) (runtime.Obje
return nil, err
}
pod := obj.(*api.Pod)
pdbs, err := r.getPodDisruptionBudgets(ctx, pod)
var rtStatus *unversioned.Status
var pdbName string
err = retry.RetryOnConflict(EvictionsRetry, func() error {
pdbs, err := r.getPodDisruptionBudgets(ctx, pod)
if err != nil {
return err
}
if len(pdbs) > 1 {
rtStatus = &unversioned.Status{
Status: unversioned.StatusFailure,
Message: "This pod has more than one PodDisruptionBudget, which the eviction subresource does not support.",
Code: 500,
}
return nil
} else if len(pdbs) == 1 {
pdb := pdbs[0]
pdbName = pdb.Name
// Try to verify-and-decrement
// If it was false already, or if it becomes false during the course of our retries,
// raise an error marked as a 429.
ok, err := r.checkAndDecrement(pod.Namespace, pod.Name, pdb)
if err != nil {
return err
}
if !ok {
rtStatus = &unversioned.Status{
Status: unversioned.StatusFailure,
// TODO(mml): Include some more details about why the eviction is disallowed.
// Ideally any such text is generated by the DisruptionController (offline).
Message: "Cannot evict pod as it would violate the pod's disruption budget.",
Code: 429,
// TODO(mml): Add a Retry-After header. Once there are time-based
// budgets, we can sometimes compute a sensible suggested value. But
// even without that, we can give a suggestion (10 minutes?) that
// prevents well-behaved clients from hammering us.
}
}
}
return nil
})
if err == wait.ErrWaitTimeout {
err = errors.NewTimeoutError(fmt.Sprintf("couldn't update PodDisruptionBudget %q due to conflicts", pdbName), 10)
}
if err != nil {
return nil, err
}
if len(pdbs) > 1 {
return &unversioned.Status{
Status: unversioned.StatusFailure,
Message: "This pod has more than one PodDisruptionBudget, which the eviction subresource does not support.",
Code: 500,
}, nil
} else if len(pdbs) == 1 {
pdb := pdbs[0]
// Try to verify-and-decrement
// If it was false already, or if it becomes false during the course of our retries,
// raise an error marked as a 429.
ok, err := r.checkAndDecrement(pod.Namespace, pod.Name, pdb)
if err != nil {
return nil, err
}
if !ok {
return &unversioned.Status{
Status: unversioned.StatusFailure,
// TODO(mml): Include some more details about why the eviction is disallowed.
// Ideally any such text is generated by the DisruptionController (offline).
Message: "Cannot evict pod as it would violate the pod's disruption budget.",
Code: 429,
// TODO(mml): Add a Retry-After header. Once there are time-based
// budgets, we can sometimes compute a sensible suggested value. But
// even without that, we can give a suggestion (10 minutes?) that
// prevents well-behaved clients from hammering us.
}, nil
}
if rtStatus != nil {
return rtStatus, nil
}
// At this point there was either no PDB or we succeeded in decrementing
@ -115,15 +144,16 @@ func (r *EvictionREST) Create(ctx api.Context, obj runtime.Object) (runtime.Obje
return &unversioned.Status{Status: unversioned.StatusSuccess}, nil
}
// checkAndDecrement checks if the provided PodDisruptionBudget allows any disruption.
func (r *EvictionREST) checkAndDecrement(namespace string, podName string, pdb policy.PodDisruptionBudget) (ok bool, err error) {
if pdb.Status.ObservedGeneration != pdb.Generation {
if pdb.Status.ObservedGeneration < pdb.Generation {
return false, nil
}
if pdb.Status.PodDisruptionsAllowed < 0 {
return false, fmt.Errorf("pdb disruptions allowed is negative")
return false, errors.NewForbidden(policy.Resource("poddisruptionbudget"), pdb.Name, fmt.Errorf("pdb disruptions allowed is negative"))
}
if len(pdb.Status.DisruptedPods) > MaxDisruptedPodSize {
return false, fmt.Errorf("DisrputedPods map too big - too many evictions not confirmed by PDB controller")
return false, errors.NewForbidden(policy.Resource("poddisruptionbudget"), pdb.Name, fmt.Errorf("DisrputedPods map too big - too many evictions not confirmed by PDB controller"))
}
if pdb.Status.PodDisruptionsAllowed == 0 {
return false, nil
@ -144,18 +174,18 @@ func (r *EvictionREST) checkAndDecrement(namespace string, podName string, pdb p
return true, nil
}
// Returns any PDBs that match the pod.
// err is set if there's an error.
func (r *EvictionREST) getPodDisruptionBudgets(ctx api.Context, pod *api.Pod) (pdbs []policy.PodDisruptionBudget, err error) {
// getPodDisruptionBudgets returns any PDBs that match the pod or err if there's an error.
func (r *EvictionREST) getPodDisruptionBudgets(ctx api.Context, pod *api.Pod) ([]policy.PodDisruptionBudget, error) {
if len(pod.Labels) == 0 {
return
return nil, nil
}
pdbList, err := r.podDisruptionBudgetClient.PodDisruptionBudgets(pod.Namespace).List(api.ListOptions{})
if err != nil {
return
return nil, err
}
var pdbs []policy.PodDisruptionBudget
for _, pdb := range pdbList.Items {
if pdb.Namespace != pod.Namespace {
continue

View File

@ -55,6 +55,11 @@ func (r *LogREST) ProducesMIMETypes(verb string) []string {
}
}
// LogREST implements StorageMetadata, returning a string as the generating object
func (r *LogREST) ProducesObject(verb string) interface{} {
return ""
}
// Get retrieves a runtime.Object that will stream the contents of the pod log
func (r *LogREST) Get(ctx api.Context, name string, opts runtime.Object) (runtime.Object, error) {
logOpts, ok := opts.(*api.PodLogOptions)

View File

@ -105,7 +105,7 @@ func (c *Repair) runOnce() error {
// the service collection. The caching layer keeps per-collection RVs,
// and this is proper, since in theory the collections could be hosted
// in separate etcd (or even non-etcd) instances.
list, err := c.registry.ListServices(ctx, nil)
list, err := c.registry.ListServices(ctx, &api.ListOptions{})
if err != nil {
return fmt.Errorf("unable to refresh the service IP block: %v", err)
}

View File

@ -94,7 +94,7 @@ func (c *Repair) runOnce() error {
// the service collection. The caching layer keeps per-collection RVs,
// and this is proper, since in theory the collections could be hosted
// in separate etcd (or even non-etcd) instances.
list, err := c.registry.ListServices(ctx, nil)
list, err := c.registry.ListServices(ctx, &api.ListOptions{})
if err != nil {
return fmt.Errorf("unable to refresh the port block: %v", err)
}

View File

@ -202,7 +202,8 @@ func (e *Store) List(ctx api.Context, options *api.ListOptions) (runtime.Object,
// ListPredicate returns a list of all the items matching m.
func (e *Store) ListPredicate(ctx api.Context, p storage.SelectionPredicate, options *api.ListOptions) (runtime.Object, error) {
if options == nil {
options = &api.ListOptions{ResourceVersion: "0"}
// By default we should serve the request from etcd.
options = &api.ListOptions{ResourceVersion: ""}
}
list := e.NewListFunc()
if name, ok := p.MatchesSingle(); ok {

View File

@ -531,17 +531,23 @@ func (c *Cacher) dispatchEvents() {
func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
triggerValues, supported := c.triggerValues(event)
// TODO: For now we assume we have a given <timeout> budget for dispatching
// a single event. We should consider changing to the approach with:
// - budget has upper bound at <max_timeout>
// - we add <portion> to current timeout every second
timeout := time.Duration(250) * time.Millisecond
c.Lock()
defer c.Unlock()
// Iterate over "allWatchers" no matter what the trigger function is.
for _, watcher := range c.watchers.allWatchers {
watcher.add(event)
watcher.add(event, &timeout)
}
if supported {
// Iterate over watchers interested in the given values of the trigger.
for _, triggerValue := range triggerValues {
for _, watcher := range c.watchers.valueWatchers[triggerValue] {
watcher.add(event)
watcher.add(event, &timeout)
}
}
} else {
@ -554,7 +560,7 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
// Iterate over watchers interested in exact values for all values.
for _, watchers := range c.watchers.valueWatchers {
for _, watcher := range watchers {
watcher.add(event)
watcher.add(event, &timeout)
}
}
}
@ -728,7 +734,7 @@ func (c *cacheWatcher) stop() {
var timerPool sync.Pool
func (c *cacheWatcher) add(event *watchCacheEvent) {
func (c *cacheWatcher) add(event *watchCacheEvent, timeout *time.Duration) {
// Try to send the event immediately, without blocking.
select {
case c.input <- *event:
@ -736,20 +742,16 @@ func (c *cacheWatcher) add(event *watchCacheEvent) {
default:
}
// OK, block sending, but only for up to 5 seconds.
// OK, block sending, but only for up to <timeout>.
// cacheWatcher.add is called very often, so arrange
// to reuse timers instead of constantly allocating.
trace := util.NewTrace(
fmt.Sprintf("cacheWatcher %v: waiting for add (initial result size %v)",
reflect.TypeOf(event.Object).String(), len(c.result)))
defer trace.LogIfLong(50 * time.Millisecond)
startTime := time.Now()
const timeout = 5 * time.Second
t, ok := timerPool.Get().(*time.Timer)
if ok {
t.Reset(timeout)
t.Reset(*timeout)
} else {
t = time.NewTimer(timeout)
t = time.NewTimer(*timeout)
}
defer timerPool.Put(t)
@ -768,6 +770,10 @@ func (c *cacheWatcher) add(event *watchCacheEvent) {
c.forget(false)
c.stop()
}
if *timeout = *timeout - time.Since(startTime); *timeout < 0 {
*timeout = 0
}
}
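// A small sketch, not part of this commit, of the shared shrinking budget this change
// introduces: every blocking send consumes part of a single per-event timeout, so a few
// slow watchers cannot each stall dispatch for the full duration. The helper is hypothetical.
func dispatchWithBudget(sends []func(remaining time.Duration), budget time.Duration) {
	for _, send := range sends {
		start := time.Now()
		send(budget) // may block for at most the remaining budget
		if budget = budget - time.Since(start); budget < 0 {
			budget = 0
		}
	}
}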
// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!

View File

@ -20,7 +20,7 @@ go_library(
deps = [
"//pkg/util/clock:go_default_library",
"//pkg/util/integer:go_default_library",
"//pkg/util/ratelimit:go_default_library",
"//vendor:github.com/juju/ratelimit",
],
)
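// A usage sketch, not from this commit, of the github.com/juju/ratelimit Bucket that
// flowcontrol and the workqueue switch back to; the API is assumed from the vendored
// package: NewBucketWithRate builds a token bucket refilled at qps tokens per second,
// e.g. bucket := ratelimit.NewBucketWithRate(10.0, 100) for 10 qps with a burst of 100.
func rateLimitedCall(bucket *ratelimit.Bucket, call func()) {
	bucket.Wait(1) // blocks until one token is available
	call()
}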

View File

@ -19,7 +19,7 @@ package flowcontrol
import (
"sync"
"k8s.io/kubernetes/pkg/util/ratelimit"
"github.com/juju/ratelimit"
)
type RateLimiter interface {

View File

@ -374,6 +374,12 @@ func (runner *runner) checkRule(table Table, chain Chain, args ...string) (bool,
}
}
var hexnumRE = regexp.MustCompile("0x0+([0-9])")
func trimhex(s string) string {
return hexnumRE.ReplaceAllString(s, "0x$1")
}
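// A small illustration, not part of this commit: iptables-save zero-pads hex mark values
// while rule arguments usually do not, so both sides are normalized before comparison.
func exampleTrimhex() {
	fmt.Println(trimhex("0x00004000"))            // prints "0x4000"
	fmt.Println(trimhex("0x00008000/0x00008000")) // prints "0x8000/0x8000"
}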
// Executes the rule check without using the "-C" flag, instead parsing iptables-save.
// Present for compatibility with <1.4.11 versions of iptables. This is full
// of hack and half-measures. We should nix this ASAP.
@ -392,6 +398,7 @@ func (runner *runner) checkRuleWithoutCheck(table Table, chain Chain, args ...st
var argsCopy []string
for i := range args {
tmpField := strings.Trim(args[i], "\"")
tmpField = trimhex(tmpField)
argsCopy = append(argsCopy, strings.Fields(tmpField)...)
}
argset := sets.NewString(argsCopy...)
@ -409,6 +416,7 @@ func (runner *runner) checkRuleWithoutCheck(table Table, chain Chain, args ...st
// Just remove all quotes.
for i := range fields {
fields[i] = strings.Trim(fields[i], "\"")
fields[i] = trimhex(fields[i])
}
// TODO: This misses reorderings e.g. "-x foo ! -y bar" will match "! -x foo -y bar"

View File

@ -30,7 +30,8 @@ import (
const (
// Default mount command if mounter path is not specified
defaultMountCommand = "mount"
defaultMountCommand = "mount"
MountsInGlobalPDPath = "mounts"
)
type Interface interface {
@ -189,9 +190,15 @@ func getDeviceNameFromMount(mounter Interface, mountPath, pluginDir string) (str
glog.V(4).Infof("Directory %s is not mounted", mountPath)
return "", fmt.Errorf("directory %s is not mounted", mountPath)
}
basemountPath := path.Join(pluginDir, MountsInGlobalPDPath)
for _, ref := range refs {
if strings.HasPrefix(ref, pluginDir) {
return path.Base(ref), nil
if strings.HasPrefix(ref, basemountPath) {
volumeID, err := filepath.Rel(basemountPath, ref)
if err != nil {
glog.Errorf("Failed to get volume id from mount %s - %v", mountPath, err)
return "", err
}
return volumeID, nil
}
}
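// An illustration, not from this commit, of why filepath.Rel replaces path.Base here:
// a volume ID may itself contain path separators, and everything under
// <pluginDir>/mounts should be preserved. The paths below are hypothetical.
func exampleVolumeID() {
	base := "/var/lib/kubelet/plugins/example-plugin/mounts"
	ref := base + "/projects/p1/zones/z1/disks/disk-1"
	id, _ := filepath.Rel(base, ref)
	fmt.Println(id) // "projects/p1/zones/z1/disks/disk-1"; path.Base(ref) would return only "disk-1"
}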

View File

@ -74,9 +74,9 @@ func (mounter *Mounter) Mount(source string, target string, fstype string, optio
}
return doMount(mounterPath, source, target, fstype, bindRemountOpts)
}
// These filesystem types are expected to be supported by the mount utility on the host across all Linux distros.
var defaultMounterFsTypes = sets.NewString("tmpfs", "ext4", "ext3", "ext2")
if !defaultMounterFsTypes.Has(fstype) {
// The list of filesystems that require the containerized mounter on the GCI image cluster
fsTypesNeedMounter := sets.NewString("nfs", "glusterfs")
if fsTypesNeedMounter.Has(fstype) {
mounterPath = mounter.mounterPath
}
return doMount(mounterPath, source, target, fstype, options)

View File

@ -23,8 +23,6 @@ import (
"time"
)
var letters = []rune("abcdefghijklmnopqrstuvwxyz0123456789")
var numLetters = len(letters)
var rng = struct {
sync.Mutex
rand *rand.Rand
@ -72,12 +70,16 @@ func Perm(n int) []int {
return rng.rand.Perm(n)
}
// String generates a random alphanumeric string n characters long. This will
// panic if n is less than zero.
// We omit vowels from the set of available characters to reduce the chances
// of "bad words" being formed.
var alphanums = []rune("bcdfghjklmnpqrstvwxz0123456789")
// String generates a random alphanumeric string, without vowels, which is n
// characters long. This will panic if n is less than zero.
func String(length int) string {
b := make([]rune, length)
for i := range b {
b[i] = letters[Intn(numLetters)]
b[i] = alphanums[Intn(len(alphanums))]
}
return string(b)
}
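// A usage sketch, not part of this commit: suffixes produced this way are safe to embed
// in object names, and the vowel-free alphabet avoids accidental words.
func exampleSuffix() string {
	return "mypod-" + String(5) // e.g. "mypod-x7b2k"
}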

View File

@ -1,25 +0,0 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_binary",
"go_library",
"go_test",
"cgo_library",
)
go_library(
name = "go_default_library",
srcs = ["bucket.go"],
tags = ["automanaged"],
)
go_test(
name = "go_default_test",
srcs = ["bucket_test.go"],
library = "go_default_library",
tags = ["automanaged"],
deps = [],
)

View File

@ -1,170 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ratelimit
import (
"math"
"sync"
"time"
)
// Bucket models a token bucket
type Bucket struct {
unitsPerNano float64
nanosPerUnit float64
capacity int64
mutex sync.Mutex
available int64
lastRefill int64
// fractionalAvailable "buffers" any amounts that flowed into the bucket smaller than one unit
// This lets us retain precision even with pathological refill rates like (1E9 + 1) per second
fractionalAvailable float64
}
// NewBucketWithRate creates a new token bucket, with maximum capacity = initial capacity, and a refill rate of qps
// We use floats for refill calculations, which introduces the possibility of truncation and rounding errors.
// For "sensible" qps values though, is is acceptable: jbeda did some tests here https://play.golang.org/p/LSKUOGz2LG
func NewBucketWithRate(qps float64, capacity int64) *Bucket {
unitsPerNano := qps / 1E9
nanosPerUnit := 1E9 / qps
b := &Bucket{
unitsPerNano: unitsPerNano,
nanosPerUnit: nanosPerUnit,
capacity: capacity,
available: capacity,
lastRefill: time.Now().UnixNano(),
}
return b
}
// Take takes n units from the bucket, reducing the available quantity even below zero,
// but then returns the amount of time we should wait
func (b *Bucket) Take(n int64) time.Duration {
b.mutex.Lock()
defer b.mutex.Unlock()
var d time.Duration
if b.available >= n {
// Fast path when bucket has sufficient availability before refilling
} else {
b.refill()
if b.available < n {
deficit := n - b.available
d = time.Duration(int64(float64(deficit) * b.nanosPerUnit))
}
}
b.available -= n
return d
}
// TakeAvailable immediately takes whatever quantity is available, up to max
func (b *Bucket) TakeAvailable(max int64) int64 {
b.mutex.Lock()
defer b.mutex.Unlock()
var took int64
if b.available >= max {
// Fast path when bucket has sufficient availability before refilling
took = max
} else {
b.refill()
took = b.available
if took < 0 {
took = 0
} else if took > max {
took = max
}
}
if took > 0 {
b.available -= took
}
return took
}
// Wait combines a call to Take with a sleep call
func (b *Bucket) Wait(n int64) {
d := b.Take(n)
if d != 0 {
time.Sleep(d)
}
}
// Capacity returns the maximum capacity of the bucket
func (b *Bucket) Capacity() int64 {
return b.capacity
}
// Available returns the quantity available in the bucket (which may be negative), but does not take it.
// This function is for diagnostic / informational purposes only - the returned capacity may immediately
// be inaccurate if another thread is operating on the bucket concurrently.
func (b *Bucket) Available() int64 {
b.mutex.Lock()
defer b.mutex.Unlock()
b.refill()
return b.available
}
// refill replenishes the bucket based on elapsed time; mutex must be held
func (b *Bucket) refill() {
// Note that we really want a monotonic clock here, but go says no:
// https://github.com/golang/go/issues/12914
now := time.Now().UnixNano()
b.refillAtTimestamp(now)
}
// refillAtTimestamp is the logic of the refill function, for testing
func (b *Bucket) refillAtTimestamp(now int64) {
nanosSinceLastRefill := now - b.lastRefill
if nanosSinceLastRefill <= 0 {
// we really want monotonic
return
}
// Compute units that have flowed into bucket
refillFloat := (float64(nanosSinceLastRefill) * b.unitsPerNano) + b.fractionalAvailable
if refillFloat > float64(b.capacity) {
// float64 > MaxInt64 can be converted to negative int64; side step this
b.available = b.capacity
// Don't worry about the fractional units with huge refill rates
} else {
whole, fraction := math.Modf(refillFloat)
refill := int64(whole)
b.fractionalAvailable = fraction
if refill != 0 {
// Refill with overflow
b.available += refill
if b.available >= b.capacity {
b.available = b.capacity
b.fractionalAvailable = 0
}
}
}
b.lastRefill = now
}

View File

@ -15,7 +15,6 @@ go_library(
srcs = ["patch.go"],
tags = ["automanaged"],
deps = [
"//pkg/client/typed/discovery:go_default_library",
"//pkg/util/json:go_default_library",
"//third_party/forked/golang/json:go_default_library",
"//vendor:github.com/davecgh/go-spew/spew",

View File

@ -21,7 +21,6 @@ import (
"reflect"
"sort"
"k8s.io/kubernetes/pkg/client/typed/discovery"
"k8s.io/kubernetes/pkg/util/json"
forkedjson "k8s.io/kubernetes/third_party/forked/golang/json"
@ -39,20 +38,11 @@ import (
// Some of the content of this package was borrowed with minor adaptations from
// evanphx/json-patch and openshift/origin.
type StrategicMergePatchVersion string
const (
directiveMarker = "$patch"
deleteDirective = "delete"
replaceDirective = "replace"
mergeDirective = "merge"
mergePrimitivesListDirective = "mergeprimitiveslist"
// different versions of StrategicMergePatch
SMPatchVersion_1_0 StrategicMergePatchVersion = "v1.0.0"
SMPatchVersion_1_5 StrategicMergePatchVersion = "v1.5.0"
Unknown StrategicMergePatchVersion = "Unknown"
SMPatchVersionLatest = SMPatchVersion_1_5
directiveMarker = "$patch"
deleteDirective = "delete"
replaceDirective = "replace"
mergeDirective = "merge"
)
// IsPreconditionFailed returns true if the provided error indicates
@ -97,7 +87,6 @@ func IsConflict(err error) bool {
var errBadJSONDoc = fmt.Errorf("Invalid JSON document")
var errNoListOfLists = fmt.Errorf("Lists of lists are not supported")
var errNoElementsInSlice = fmt.Errorf("no elements in any of the given slices")
// The following code is adapted from github.com/openshift/origin/pkg/util/jsonmerge.
// Instead of defining a Delta that holds an original, a patch and a set of preconditions,
@ -144,15 +133,15 @@ func RequireMetadataKeyUnchanged(key string) PreconditionFunc {
}
// Deprecated: Use the synonym CreateTwoWayMergePatch, instead.
func CreateStrategicMergePatch(original, modified []byte, dataStruct interface{}, smPatchVersion StrategicMergePatchVersion) ([]byte, error) {
return CreateTwoWayMergePatch(original, modified, dataStruct, smPatchVersion)
func CreateStrategicMergePatch(original, modified []byte, dataStruct interface{}) ([]byte, error) {
return CreateTwoWayMergePatch(original, modified, dataStruct)
}
// CreateTwoWayMergePatch creates a patch that can be passed to StrategicMergePatch from an original
// document and a modified document, which are passed to the method as json encoded content. It will
// return a patch that yields the modified document when applied to the original document, or an error
// if either of the two documents is invalid.
func CreateTwoWayMergePatch(original, modified []byte, dataStruct interface{}, smPatchVersion StrategicMergePatchVersion, fns ...PreconditionFunc) ([]byte, error) {
func CreateTwoWayMergePatch(original, modified []byte, dataStruct interface{}, fns ...PreconditionFunc) ([]byte, error) {
originalMap := map[string]interface{}{}
if len(original) > 0 {
if err := json.Unmarshal(original, &originalMap); err != nil {
@ -172,7 +161,7 @@ func CreateTwoWayMergePatch(original, modified []byte, dataStruct interface{}, s
return nil, err
}
patchMap, err := diffMaps(originalMap, modifiedMap, t, false, false, smPatchVersion)
patchMap, err := diffMaps(originalMap, modifiedMap, t, false, false)
if err != nil {
return nil, err
}
@ -188,7 +177,7 @@ func CreateTwoWayMergePatch(original, modified []byte, dataStruct interface{}, s
}
// Returns a (recursive) strategic merge patch that yields modified when applied to original.
func diffMaps(original, modified map[string]interface{}, t reflect.Type, ignoreChangesAndAdditions, ignoreDeletions bool, smPatchVersion StrategicMergePatchVersion) (map[string]interface{}, error) {
func diffMaps(original, modified map[string]interface{}, t reflect.Type, ignoreChangesAndAdditions, ignoreDeletions bool) (map[string]interface{}, error) {
patch := map[string]interface{}{}
if t.Kind() == reflect.Ptr {
t = t.Elem()
@ -241,7 +230,7 @@ func diffMaps(original, modified map[string]interface{}, t reflect.Type, ignoreC
return nil, err
}
patchValue, err := diffMaps(originalValueTyped, modifiedValueTyped, fieldType, ignoreChangesAndAdditions, ignoreDeletions, smPatchVersion)
patchValue, err := diffMaps(originalValueTyped, modifiedValueTyped, fieldType, ignoreChangesAndAdditions, ignoreDeletions)
if err != nil {
return nil, err
}
@ -259,25 +248,13 @@ func diffMaps(original, modified map[string]interface{}, t reflect.Type, ignoreC
}
if fieldPatchStrategy == mergeDirective {
patchValue, err := diffLists(originalValueTyped, modifiedValueTyped, fieldType.Elem(), fieldPatchMergeKey, ignoreChangesAndAdditions, ignoreDeletions, smPatchVersion)
patchValue, err := diffLists(originalValueTyped, modifiedValueTyped, fieldType.Elem(), fieldPatchMergeKey, ignoreChangesAndAdditions, ignoreDeletions)
if err != nil {
return nil, err
}
if patchValue == nil {
continue
}
switch typedPatchValue := patchValue.(type) {
case []interface{}:
if len(typedPatchValue) > 0 {
patch[key] = typedPatchValue
}
case map[string]interface{}:
if len(typedPatchValue) > 0 {
patch[key] = typedPatchValue
}
default:
return nil, fmt.Errorf("invalid type of patch: %v", reflect.TypeOf(patchValue))
if len(patchValue) > 0 {
patch[key] = patchValue
}
continue
@ -307,7 +284,7 @@ func diffMaps(original, modified map[string]interface{}, t reflect.Type, ignoreC
// Returns a (recursive) strategic merge patch that yields modified when applied to original,
// for a pair of lists with merge semantics.
func diffLists(original, modified []interface{}, t reflect.Type, mergeKey string, ignoreChangesAndAdditions, ignoreDeletions bool, smPatchVersion StrategicMergePatchVersion) (interface{}, error) {
func diffLists(original, modified []interface{}, t reflect.Type, mergeKey string, ignoreChangesAndAdditions, ignoreDeletions bool) ([]interface{}, error) {
if len(original) == 0 {
if len(modified) == 0 || ignoreChangesAndAdditions {
return nil, nil
@ -321,14 +298,12 @@ func diffLists(original, modified []interface{}, t reflect.Type, mergeKey string
return nil, err
}
var patch interface{}
var patch []interface{}
if elementType.Kind() == reflect.Map {
patch, err = diffListsOfMaps(original, modified, t, mergeKey, ignoreChangesAndAdditions, ignoreDeletions, smPatchVersion)
} else if elementType.Kind() == reflect.Slice {
err = errNoListOfLists
} else {
patch, err = diffListsOfScalars(original, modified, ignoreChangesAndAdditions, ignoreDeletions, smPatchVersion)
patch, err = diffListsOfMaps(original, modified, t, mergeKey, ignoreChangesAndAdditions, ignoreDeletions)
} else if !ignoreChangesAndAdditions {
patch, err = diffListsOfScalars(original, modified)
}
if err != nil {
@ -340,23 +315,8 @@ func diffLists(original, modified []interface{}, t reflect.Type, mergeKey string
// Returns a (recursive) strategic merge patch that yields modified when applied to original,
// for a pair of lists of scalars with merge semantics.
func diffListsOfScalars(original, modified []interface{}, ignoreChangesAndAdditions, ignoreDeletions bool, smPatchVersion StrategicMergePatchVersion) (interface{}, error) {
originalScalars := uniqifyAndSortScalars(original)
modifiedScalars := uniqifyAndSortScalars(modified)
switch smPatchVersion {
case SMPatchVersion_1_5:
return diffListsOfScalarsIntoMap(originalScalars, modifiedScalars, ignoreChangesAndAdditions, ignoreDeletions)
case SMPatchVersion_1_0:
return diffListsOfScalarsIntoSlice(originalScalars, modifiedScalars, ignoreChangesAndAdditions, ignoreDeletions)
default:
return nil, fmt.Errorf("Unknown StrategicMergePatchVersion: %v", smPatchVersion)
}
}
func diffListsOfScalarsIntoSlice(originalScalars, modifiedScalars []interface{}, ignoreChangesAndAdditions, ignoreDeletions bool) ([]interface{}, error) {
originalIndex, modifiedIndex := 0, 0
if len(modifiedScalars) == 0 {
func diffListsOfScalars(original, modified []interface{}) ([]interface{}, error) {
if len(modified) == 0 {
// There is no need to check the length of original because there is no way to create
// a patch that deletes a scalar from a list of scalars with merge semantics.
return nil, nil
@ -364,14 +324,18 @@ func diffListsOfScalarsIntoSlice(originalScalars, modifiedScalars []interface{},
patch := []interface{}{}
originalScalars := uniqifyAndSortScalars(original)
modifiedScalars := uniqifyAndSortScalars(modified)
originalIndex, modifiedIndex := 0, 0
loopB:
for ; modifiedIndex < len(modifiedScalars); modifiedIndex++ {
for ; originalIndex < len(originalScalars); originalIndex++ {
originalString := fmt.Sprintf("%v", originalScalars[originalIndex])
modifiedString := fmt.Sprintf("%v", modifiedScalars[modifiedIndex])
originalString := fmt.Sprintf("%v", original[originalIndex])
modifiedString := fmt.Sprintf("%v", modified[modifiedIndex])
if originalString >= modifiedString {
if originalString != modifiedString {
patch = append(patch, modifiedScalars[modifiedIndex])
patch = append(patch, modified[modifiedIndex])
}
continue loopB
@ -385,57 +349,7 @@ loopB:
// Add any remaining items found only in modified
for ; modifiedIndex < len(modifiedScalars); modifiedIndex++ {
patch = append(patch, modifiedScalars[modifiedIndex])
}
return patch, nil
}
func diffListsOfScalarsIntoMap(originalScalars, modifiedScalars []interface{}, ignoreChangesAndAdditions, ignoreDeletions bool) (map[string]interface{}, error) {
originalIndex, modifiedIndex := 0, 0
patch := map[string]interface{}{}
patch[directiveMarker] = mergePrimitivesListDirective
for originalIndex < len(originalScalars) && modifiedIndex < len(modifiedScalars) {
originalString := fmt.Sprintf("%v", originalScalars[originalIndex])
modifiedString := fmt.Sprintf("%v", modifiedScalars[modifiedIndex])
// objects are identical
if originalString == modifiedString {
originalIndex++
modifiedIndex++
continue
}
if originalString > modifiedString {
if !ignoreChangesAndAdditions {
modifiedValue := modifiedScalars[modifiedIndex]
patch[modifiedString] = modifiedValue
}
modifiedIndex++
} else {
if !ignoreDeletions {
patch[originalString] = nil
}
originalIndex++
}
}
// Delete any remaining items found only in original
if !ignoreDeletions {
for ; originalIndex < len(originalScalars); originalIndex++ {
originalString := fmt.Sprintf("%v", originalScalars[originalIndex])
patch[originalString] = nil
}
}
// Add any remaining items found only in modified
if !ignoreChangesAndAdditions {
for ; modifiedIndex < len(modifiedScalars); modifiedIndex++ {
modifiedString := fmt.Sprintf("%v", modifiedScalars[modifiedIndex])
modifiedValue := modifiedScalars[modifiedIndex]
patch[modifiedString] = modifiedValue
}
patch = append(patch, modified[modifiedIndex])
}
return patch, nil
@ -446,7 +360,7 @@ var errBadArgTypeFmt = "expected a %s, but received a %s"
// Returns a (recursive) strategic merge patch that yields modified when applied to original,
// for a pair of lists of maps with merge semantics.
func diffListsOfMaps(original, modified []interface{}, t reflect.Type, mergeKey string, ignoreChangesAndAdditions, ignoreDeletions bool, smPatchVersion StrategicMergePatchVersion) ([]interface{}, error) {
func diffListsOfMaps(original, modified []interface{}, t reflect.Type, mergeKey string, ignoreChangesAndAdditions, ignoreDeletions bool) ([]interface{}, error) {
patch := make([]interface{}, 0)
originalSorted, err := sortMergeListsByNameArray(original, t, mergeKey, false)
@ -492,7 +406,7 @@ loopB:
if originalString >= modifiedString {
if originalString == modifiedString {
// Merge key values are equal, so recurse
patchValue, err := diffMaps(originalMap, modifiedMap, t, ignoreChangesAndAdditions, ignoreDeletions, smPatchVersion)
patchValue, err := diffMaps(originalMap, modifiedMap, t, ignoreChangesAndAdditions, ignoreDeletions)
if err != nil {
return nil, err
}
@ -628,15 +542,7 @@ func mergeMap(original, patch map[string]interface{}, t reflect.Type) (map[strin
return map[string]interface{}{}, nil
}
if v == mergePrimitivesListDirective {
// delete the directiveMarker's key-value pair to avoid delta map and delete map
// overlaping with each other when calculating a ThreeWayDiff for list of Primitives.
// Otherwise, the overlaping will cause it calling LookupPatchMetadata() which will
// return an error since the metadata shows it's a slice but it is actually a map.
delete(original, directiveMarker)
} else {
return nil, fmt.Errorf(errBadPatchTypeFmt, v, patch)
}
return nil, fmt.Errorf(errBadPatchTypeFmt, v, patch)
}
// nil is an accepted value for original to simplify logic in other places.
@ -672,9 +578,7 @@ func mergeMap(original, patch map[string]interface{}, t reflect.Type) (map[strin
// If they're both maps or lists, recurse into the value.
originalType := reflect.TypeOf(original[k])
patchType := reflect.TypeOf(patchV)
// check if we are trying to merge a slice with a map for list of primitives
isMergeSliceOfPrimitivesWithAPatchMap := originalType != nil && patchType != nil && originalType.Kind() == reflect.Slice && patchType.Kind() == reflect.Map
if originalType == patchType || isMergeSliceOfPrimitivesWithAPatchMap {
if originalType == patchType {
// First find the fieldPatchStrategy and fieldPatchMergeKey.
fieldType, fieldPatchStrategy, fieldPatchMergeKey, err := forkedjson.LookupPatchMetadata(t, k)
if err != nil {
@ -696,8 +600,9 @@ func mergeMap(original, patch map[string]interface{}, t reflect.Type) (map[strin
if originalType.Kind() == reflect.Slice && fieldPatchStrategy == mergeDirective {
elemType := fieldType.Elem()
typedOriginal := original[k].([]interface{})
typedPatch := patchV.([]interface{})
var err error
original[k], err = mergeSlice(typedOriginal, patchV, elemType, fieldPatchMergeKey)
original[k], err = mergeSlice(typedOriginal, typedPatch, elemType, fieldPatchMergeKey)
if err != nil {
return nil, err
}
@ -718,34 +623,13 @@ func mergeMap(original, patch map[string]interface{}, t reflect.Type) (map[strin
// Merge two slices together. Note: This may modify both the original slice and
// the patch because getting a deep copy of a slice in golang is highly
// non-trivial.
// The patch could be a map[string]interface{} representing a slice of primitives.
// If the patch map doesn't has the specific directiveMarker (mergePrimitivesListDirective),
// it returns an error. Please check patch_test.go and find the test case named
// "merge lists of scalars for list of primitives" to see what the patch looks like.
// Patch is still []interface{} for all the other types.
func mergeSlice(original []interface{}, patch interface{}, elemType reflect.Type, mergeKey string) ([]interface{}, error) {
t, err := sliceElementType(original)
if err != nil && err != errNoElementsInSlice {
return nil, err
}
if patchMap, ok := patch.(map[string]interface{}); ok {
// We try to merge the original slice with a patch map only when the map has
// a specific directiveMarker. Otherwise, this patch will be treated as invalid.
if directiveValue, ok := patchMap[directiveMarker]; ok && directiveValue == mergePrimitivesListDirective {
return mergeSliceOfScalarsWithPatchMap(original, patchMap)
} else {
return nil, fmt.Errorf("Unable to merge a slice with an invalid map")
}
}
typedPatch := patch.([]interface{})
if len(original) == 0 && len(typedPatch) == 0 {
func mergeSlice(original, patch []interface{}, elemType reflect.Type, mergeKey string) ([]interface{}, error) {
if len(original) == 0 && len(patch) == 0 {
return original, nil
}
// All the values must be of the same type, but not a list.
t, err = sliceElementType(original, typedPatch)
t, err := sliceElementType(original, patch)
if err != nil {
return nil, err
}
@ -754,7 +638,7 @@ func mergeSlice(original []interface{}, patch interface{}, elemType reflect.Type
if t.Kind() != reflect.Map {
// Maybe in the future add a "concat" mode that doesn't
// uniqify.
both := append(original, typedPatch...)
both := append(original, patch...)
return uniqifyScalars(both), nil
}
@ -765,7 +649,7 @@ func mergeSlice(original []interface{}, patch interface{}, elemType reflect.Type
// First look for any special $patch elements.
patchWithoutSpecialElements := []interface{}{}
replace := false
for _, v := range typedPatch {
for _, v := range patch {
typedV := v.(map[string]interface{})
patchType, ok := typedV[directiveMarker]
if ok {
@ -801,10 +685,10 @@ func mergeSlice(original []interface{}, patch interface{}, elemType reflect.Type
return patchWithoutSpecialElements, nil
}
typedPatch = patchWithoutSpecialElements
patch = patchWithoutSpecialElements
// Merge patch into original.
for _, v := range typedPatch {
for _, v := range patch {
// Because earlier we confirmed that all the elements are maps.
typedV := v.(map[string]interface{})
mergeValue, ok := typedV[mergeKey]
@ -837,36 +721,6 @@ func mergeSlice(original []interface{}, patch interface{}, elemType reflect.Type
return original, nil
}
// mergeSliceOfScalarsWithPatchMap merges the original slice with a patch map and
// returns an uniqified and sorted slice of primitives.
// The patch map must have the specific directiveMarker (mergePrimitivesListDirective).
func mergeSliceOfScalarsWithPatchMap(original []interface{}, patch map[string]interface{}) ([]interface{}, error) {
// make sure the patch has the specific directiveMarker ()
if directiveValue, ok := patch[directiveMarker]; ok && directiveValue != mergePrimitivesListDirective {
return nil, fmt.Errorf("Unable to merge a slice with an invalid map")
}
delete(patch, directiveMarker)
output := make([]interface{}, 0, len(original)+len(patch))
for _, value := range original {
valueString := fmt.Sprintf("%v", value)
if v, ok := patch[valueString]; ok {
if v != nil {
output = append(output, v)
}
delete(patch, valueString)
} else {
output = append(output, value)
}
}
for _, value := range patch {
if value != nil {
output = append(output, value)
}
// No action required to delete items that missing from the original slice.
}
return uniqifyAndSortScalars(output), nil
}
// This method no longer panics if any element of the slice is not a map.
func findMapInSliceBasedOnKeyValue(m []interface{}, key string, value interface{}) (map[string]interface{}, int, bool, error) {
for k, v := range m {
@ -1092,7 +946,7 @@ func sliceElementType(slices ...[]interface{}) (reflect.Type, error) {
}
if prevType == nil {
return nil, errNoElementsInSlice
return nil, fmt.Errorf("no elements in any of the given slices")
}
return prevType, nil
@ -1181,10 +1035,6 @@ func mergingMapFieldsHaveConflicts(
if leftMarker != rightMarker {
return true, nil
}
if leftMarker == mergePrimitivesListDirective && rightMarker == mergePrimitivesListDirective {
return false, nil
}
}
// Check the individual keys.
@ -1207,29 +1057,12 @@ func mergingMapFieldsHaveConflicts(
}
func mapsHaveConflicts(typedLeft, typedRight map[string]interface{}, structType reflect.Type) (bool, error) {
isForListOfPrimitives := false
if leftDirective, ok := typedLeft[directiveMarker]; ok {
if rightDirective, ok := typedRight[directiveMarker]; ok {
if leftDirective == mergePrimitivesListDirective && rightDirective == rightDirective {
isForListOfPrimitives = true
}
}
}
for key, leftValue := range typedLeft {
if key != directiveMarker {
if rightValue, ok := typedRight[key]; ok {
var fieldType reflect.Type
var fieldPatchStrategy, fieldPatchMergeKey string
var err error
if isForListOfPrimitives {
fieldType = reflect.TypeOf(leftValue)
fieldPatchStrategy = ""
fieldPatchMergeKey = ""
} else {
fieldType, fieldPatchStrategy, fieldPatchMergeKey, err = forkedjson.LookupPatchMetadata(structType, key)
if err != nil {
return true, err
}
fieldType, fieldPatchStrategy, fieldPatchMergeKey, err := forkedjson.LookupPatchMetadata(structType, key)
if err != nil {
return true, err
}
if hasConflicts, err := mergingMapFieldsHaveConflicts(leftValue, rightValue,
@ -1339,7 +1172,7 @@ func mapsOfMapsHaveConflicts(typedLeft, typedRight map[string]interface{}, struc
// than from original to current. In other words, a conflict occurs if modified changes any key
// in a way that is different from how it is changed in current (e.g., deleting it, changing its
// value).
func CreateThreeWayMergePatch(original, modified, current []byte, dataStruct interface{}, overwrite bool, smPatchVersion StrategicMergePatchVersion, fns ...PreconditionFunc) ([]byte, error) {
func CreateThreeWayMergePatch(original, modified, current []byte, dataStruct interface{}, overwrite bool, fns ...PreconditionFunc) ([]byte, error) {
originalMap := map[string]interface{}{}
if len(original) > 0 {
if err := json.Unmarshal(original, &originalMap); err != nil {
@ -1370,12 +1203,12 @@ func CreateThreeWayMergePatch(original, modified, current []byte, dataStruct int
// from original to modified. To find it, we compute deletions, which are the deletions from
// original to modified, and delta, which is the difference from current to modified without
// deletions, and then apply delta to deletions as a patch, which should be strictly additive.
deltaMap, err := diffMaps(currentMap, modifiedMap, t, false, true, smPatchVersion)
deltaMap, err := diffMaps(currentMap, modifiedMap, t, false, true)
if err != nil {
return nil, err
}
deletionsMap, err := diffMaps(originalMap, modifiedMap, t, true, false, smPatchVersion)
deletionsMap, err := diffMaps(originalMap, modifiedMap, t, true, false)
if err != nil {
return nil, err
}
@ -1395,7 +1228,7 @@ func CreateThreeWayMergePatch(original, modified, current []byte, dataStruct int
// If overwrite is false, and the patch contains any keys that were changed differently,
// then return a conflict error.
if !overwrite {
changedMap, err := diffMaps(originalMap, currentMap, t, false, false, smPatchVersion)
changedMap, err := diffMaps(originalMap, currentMap, t, false, false)
if err != nil {
return nil, err
}
@ -1430,20 +1263,3 @@ func toYAML(v interface{}) (string, error) {
return string(y), nil
}
// GetServerSupportedSMPatchVersion takes a discoveryClient,
// returns the max StrategicMergePatch version supported
func GetServerSupportedSMPatchVersion(discoveryClient discovery.DiscoveryInterface) (StrategicMergePatchVersion, error) {
serverVersion, err := discoveryClient.ServerVersion()
if err != nil {
return Unknown, err
}
serverGitVersion := serverVersion.GitVersion
if serverGitVersion >= string(SMPatchVersion_1_5) {
return SMPatchVersion_1_5, nil
}
if serverGitVersion >= string(SMPatchVersion_1_0) {
return SMPatchVersion_1_0, nil
}
return Unknown, fmt.Errorf("The version is too old: %v\n", serverVersion)
}
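A minimal sketch of the conflict rule described in the CreateThreeWayMergePatch comment above, assuming the in-tree import paths k8s.io/kubernetes/pkg/util/strategicpatch and k8s.io/kubernetes/pkg/api/v1; the label values are hypothetical:

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api/v1"
	"k8s.io/kubernetes/pkg/util/strategicpatch"
)

func main() {
	original := []byte(`{"metadata":{"labels":{"tier":"backend"}}}`)
	current := []byte(`{"metadata":{"labels":{"tier":"cache"}}}`)     // changed out-of-band
	modified := []byte(`{"metadata":{"labels":{"tier":"frontend"}}}`) // changed by the caller

	// The same key is changed differently from original to current and from
	// original to modified, so with overwrite=false a conflict error is returned.
	if _, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, v1.Pod{}, false); err != nil {
		fmt.Println("conflict:", err)
	}

	// With overwrite=true the conflict check is skipped and the patch simply
	// carries the modified value for the key.
	patch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, v1.Pod{}, true)
	fmt.Println(string(patch), err)
}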

View File

@ -25,8 +25,8 @@ go_library(
tags = ["automanaged"],
deps = [
"//pkg/util/clock:go_default_library",
"//pkg/util/ratelimit:go_default_library",
"//pkg/util/runtime:go_default_library",
"//vendor:github.com/juju/ratelimit",
],
)

View File

@ -21,7 +21,7 @@ import (
"sync"
"time"
"k8s.io/kubernetes/pkg/util/ratelimit"
"github.com/juju/ratelimit"
)
type RateLimiter interface {
@ -35,7 +35,7 @@ type RateLimiter interface {
}
// DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has
// both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential
// both overall and per-item rate limitting. The overall is a token bucket and the per-item is exponential
func DefaultControllerRateLimiter() RateLimiter {
return NewMaxOfRateLimiter(
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
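A hedged usage sketch of the combined limiter described in the comment above, assuming the in-tree import path k8s.io/kubernetes/pkg/util/workqueue; the item names are hypothetical:

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/util/workqueue"
)

func main() {
	rl := workqueue.DefaultControllerRateLimiter()

	// Per-item exponential backoff: the delay roughly doubles on each repeated
	// failure of the same item, starting at 5ms and capped at 1000s.
	fmt.Println(rl.When("pod-a")) // ~5ms
	fmt.Println(rl.When("pod-a")) // ~10ms
	fmt.Println(rl.When("pod-b")) // ~5ms again; backoff is tracked per item

	// Forget resets the per-item backoff once the item succeeds; the overall
	// token-bucket limit still applies across all items.
	rl.Forget("pod-a")
	fmt.Println(rl.NumRequeues("pod-a")) // 0
}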

View File

@ -39,8 +39,8 @@ var (
// them irrelevant. (Next we'll take it out, which may muck with
// scripts consuming the kubectl version output - but most of
// these should be looking at gitVersion already anyways.)
gitMajor string = "1" // major version, always numeric
gitMinor string = "5+" // minor version, numeric possibly followed by "+"
gitMajor string = "1" // major version, always numeric
gitMinor string = "5" // minor version, numeric possibly followed by "+"
// semantic version, derived by build scripts (see
// https://github.com/kubernetes/kubernetes/blob/master/docs/design/versioning.md
@ -51,7 +51,7 @@ var (
// semantic version is a git hash, but the version itself is no
// longer the direct output of "git describe", but a slight
// translation to be semver compliant.
gitVersion string = "v1.5.0-beta.1+$Format:%h$"
gitVersion string = "v1.5.0+$Format:%h$"
gitCommit string = "$Format:%H$" // sha1 from git, output of $(git rev-parse HEAD)
gitTreeState string = "not a git tree" // state of git tree, either "clean" or "dirty"

View File

@ -51,6 +51,7 @@ var _ volume.ProvisionableVolumePlugin = &awsElasticBlockStorePlugin{}
const (
awsElasticBlockStorePluginName = "kubernetes.io/aws-ebs"
awsURLNamePrefix = "aws://"
)
func getPath(uid types.UID, volName string, host volume.VolumeHost) string {
@ -189,10 +190,36 @@ func getVolumeSource(
func (plugin *awsElasticBlockStorePlugin) ConstructVolumeSpec(volName, mountPath string) (*volume.Spec, error) {
mounter := plugin.host.GetMounter()
pluginDir := plugin.host.GetPluginDir(plugin.GetPluginName())
sourceName, err := mounter.GetDeviceNameFromMount(mountPath, pluginDir)
volumeID, err := mounter.GetDeviceNameFromMount(mountPath, pluginDir)
if err != nil {
return nil, err
}
// This is a workaround for the issue of converting an aws volume id from the globalPDPath.
// There are three aws volume id formats; the volumeID returned by GetDeviceNameFromMount() for each is:
//   aws:///vol-1234             (aws/vol-1234)
//   aws://us-east-1/vol-1234    (aws/us-east-1/vol-1234)
//   vol-1234                    (vol-1234)
// The code below converts the first two cases back to the aws-style volume id.
sourceName := volumeID
if strings.HasPrefix(volumeID, "aws/") {
names := strings.Split(volumeID, "/")
length := len(names)
if length < 2 || length > 3 {
return nil, fmt.Errorf("Failed to get AWS volume id from mount path %q: invalid volume name format %q", mountPath, volumeID)
}
volName := names[length-1]
if !strings.HasPrefix(volName, "vol-") {
return nil, fmt.Errorf("Invalid volume name format for AWS volume (%q) retrieved from mount path %q", volName, mountPath)
}
if length == 2 {
sourceName = awsURLNamePrefix + "" + "/" + volName // empty zone label
}
if length == 3 {
sourceName = awsURLNamePrefix + names[1] + "/" + volName // names[1] is the zone label
}
glog.V(4).Infof("Convert aws volume name from %q to %q ", volumeID, sourceName)
}
awsVolume := &api.Volume{
Name: volName,
VolumeSource: api.VolumeSource{
@ -324,12 +351,12 @@ func makeGlobalPDPath(host volume.VolumeHost, volumeID aws.KubernetesVolumeID) s
// Clean up the URI to be more fs-friendly
name := string(volumeID)
name = strings.Replace(name, "://", "/", -1)
return path.Join(host.GetPluginDir(awsElasticBlockStorePluginName), "mounts", name)
return path.Join(host.GetPluginDir(awsElasticBlockStorePluginName), mount.MountsInGlobalPDPath, name)
}
// Reverses the mapping done in makeGlobalPDPath
func getVolumeIDFromGlobalMount(host volume.VolumeHost, globalPath string) (string, error) {
basePath := path.Join(host.GetPluginDir(awsElasticBlockStorePluginName), "mounts")
basePath := path.Join(host.GetPluginDir(awsElasticBlockStorePluginName), mount.MountsInGlobalPDPath)
rel, err := filepath.Rel(basePath, globalPath)
if err != nil {
glog.Errorf("Failed to get volume id from global mount %s - %v", globalPath, err)
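A self-contained sketch of the volume id conversion described in the workaround comment above, using only the standard library; the sample ids are hypothetical:

package main

import (
	"fmt"
	"strings"
)

const awsURLNamePrefix = "aws://"

// convert mirrors the workaround above: ids returned by GetDeviceNameFromMount
// in the form "aws/vol-1234" or "aws/us-east-1/vol-1234" are rewritten to the
// aws:// style, while a bare "vol-1234" is passed through unchanged.
func convert(volumeID string) (string, error) {
	if !strings.HasPrefix(volumeID, "aws/") {
		return volumeID, nil
	}
	names := strings.Split(volumeID, "/")
	if len(names) < 2 || len(names) > 3 {
		return "", fmt.Errorf("invalid volume name format %q", volumeID)
	}
	volName := names[len(names)-1]
	if !strings.HasPrefix(volName, "vol-") {
		return "", fmt.Errorf("invalid volume name %q", volName)
	}
	if len(names) == 2 {
		return awsURLNamePrefix + "/" + volName, nil // empty zone label
	}
	return awsURLNamePrefix + names[1] + "/" + volName, nil // names[1] is the zone label
}

func main() {
	for _, id := range []string{"aws/vol-1234", "aws/us-east-1/vol-1234", "vol-1234"} {
		out, _ := convert(id)
		fmt.Printf("%s -> %s\n", id, out)
	}
}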

View File

@ -293,7 +293,7 @@ func (b *azureDiskMounter) SetUpAt(dir string, fsGroup *int64) error {
}
func makeGlobalPDPath(host volume.VolumeHost, volume string) string {
return path.Join(host.GetPluginDir(azureDataDiskPluginName), "mounts", volume)
return path.Join(host.GetPluginDir(azureDataDiskPluginName), mount.MountsInGlobalPDPath, volume)
}
func (azure *azureDisk) GetPath() string {

View File

@ -365,7 +365,7 @@ func (b *cinderVolumeMounter) SetUpAt(dir string, fsGroup *int64) error {
}
func makeGlobalPDName(host volume.VolumeHost, devName string) string {
return path.Join(host.GetPluginDir(cinderVolumePluginName), "mounts", devName)
return path.Join(host.GetPluginDir(cinderVolumePluginName), mount.MountsInGlobalPDPath, devName)
}
func (cd *cinderVolume) GetPath() string {

View File

@ -314,7 +314,7 @@ func (b *gcePersistentDiskMounter) SetUpAt(dir string, fsGroup *int64) error {
}
func makeGlobalPDName(host volume.VolumeHost, devName string) string {
return path.Join(host.GetPluginDir(gcePersistentDiskPluginName), "mounts", devName)
return path.Join(host.GetPluginDir(gcePersistentDiskPluginName), mount.MountsInGlobalPDPath, devName)
}
func (b *gcePersistentDiskMounter) GetPath() string {

View File

@ -29,7 +29,6 @@ go_library(
"//pkg/util/strings:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/volumehelper:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:github.com/heketi/heketi/client/api/go-client",
"//vendor:github.com/heketi/heketi/pkg/glusterfs/api",

View File

@ -18,12 +18,9 @@ package glusterfs
import (
"fmt"
"math/rand"
"os"
"path"
"strconv"
dstrings "strings"
"time"
"github.com/golang/glog"
gcli "github.com/heketi/heketi/client/api/go-client"
@ -38,7 +35,6 @@ import (
"k8s.io/kubernetes/pkg/util/strings"
"k8s.io/kubernetes/pkg/volume"
volutil "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
"runtime"
)
@ -64,8 +60,6 @@ const (
volPrefix = "vol_"
dynamicEpSvcPrefix = "glusterfs-dynamic-"
replicaCount = 3
gidMax = 600000
gidMin = 2000
durabilityType = "replicate"
secretKeyName = "key" // key name used in secret
annGlusterURL = "glusterfs.kubernetes.io/url"
@ -487,8 +481,6 @@ func (d *glusterfsVolumeDeleter) Delete() error {
func (r *glusterfsVolumeProvisioner) Provision() (*api.PersistentVolume, error) {
var err error
var reqGid int64
gidRandomizer := rand.New(rand.NewSource(time.Now().UnixNano()))
if r.options.PVC.Spec.Selector != nil {
glog.V(4).Infof("glusterfs: not able to parse your claim Selector")
return nil, fmt.Errorf("glusterfs: not able to parse your claim Selector")
@ -500,9 +492,9 @@ func (r *glusterfsVolumeProvisioner) Provision() (*api.PersistentVolume, error)
return nil, err
}
r.provisioningConfig = *cfg
glog.V(4).Infof("glusterfs: creating volume with configuration %+v", r.provisioningConfig)
reqGid = gidMin + gidRandomizer.Int63n(gidMax)
glusterfs, sizeGB, err := r.CreateVolume(reqGid)
glusterfs, sizeGB, err := r.CreateVolume()
if err != nil {
glog.Errorf("glusterfs: create volume err: %v.", err)
return nil, fmt.Errorf("glusterfs: create volume err: %v.", err)
@ -514,15 +506,13 @@ func (r *glusterfsVolumeProvisioner) Provision() (*api.PersistentVolume, error)
if len(pv.Spec.AccessModes) == 0 {
pv.Spec.AccessModes = r.plugin.GetAccessModes()
}
sGid := strconv.FormatInt(reqGid, 10)
pv.Annotations = map[string]string{volumehelper.VolumeGidAnnotationKey: sGid}
pv.Spec.Capacity = api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse(fmt.Sprintf("%dGi", sizeGB)),
}
return pv, nil
}
func (p *glusterfsVolumeProvisioner) CreateVolume(reqGid int64) (r *api.GlusterfsVolumeSource, size int, err error) {
func (p *glusterfsVolumeProvisioner) CreateVolume() (r *api.GlusterfsVolumeSource, size int, err error) {
capacity := p.options.PVC.Spec.Resources.Requests[api.ResourceName(api.ResourceStorage)]
volSizeBytes := capacity.Value()
sz := int(volume.RoundUpSize(volSizeBytes, 1024*1024*1024))
@ -536,7 +526,7 @@ func (p *glusterfsVolumeProvisioner) CreateVolume(reqGid int64) (r *api.Glusterf
glog.Errorf("glusterfs: failed to create glusterfs rest client")
return nil, 0, fmt.Errorf("failed to create glusterfs REST client, REST server authentication failed")
}
volumeReq := &gapi.VolumeCreateRequest{Size: sz, Gid: reqGid, Durability: gapi.VolumeDurabilityInfo{Type: durabilityType, Replicate: gapi.ReplicaDurability{Replica: replicaCount}}}
volumeReq := &gapi.VolumeCreateRequest{Size: sz, Durability: gapi.VolumeDurabilityInfo{Type: durabilityType, Replicate: gapi.ReplicaDurability{Replica: replicaCount}}}
volume, err := cli.VolumeCreate(volumeReq)
if err != nil {
glog.Errorf("glusterfs: error creating volume %v ", err)

View File

@ -19,6 +19,7 @@ go_library(
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/conversion:go_default_library",
"//pkg/types:go_default_library",
"//pkg/util/uuid:go_default_library",
"//pkg/volume:go_default_library",

View File

@ -22,6 +22,7 @@ import (
"regexp"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/uuid"
"k8s.io/kubernetes/pkg/volume"
@ -244,7 +245,11 @@ func (r *hostPathRecycler) GetPath() string {
// Recycle blocks until the pod has completed or any error occurs.
// HostPath recycling only works in single node clusters and is meant for testing purposes only.
func (r *hostPathRecycler) Recycle() error {
pod := r.config.RecyclerPodTemplate
templateClone, err := conversion.NewCloner().DeepCopy(r.config.RecyclerPodTemplate)
if err != nil {
return err
}
pod := templateClone.(*api.Pod)
// overrides
pod.Spec.ActiveDeadlineSeconds = &r.timeout
pod.Spec.Volumes[0].VolumeSource = api.VolumeSource{

View File

@ -19,6 +19,7 @@ go_library(
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/conversion:go_default_library",
"//pkg/types:go_default_library",
"//pkg/util/exec:go_default_library",
"//pkg/util/mount:go_default_library",

View File

@ -23,6 +23,7 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/mount"
@ -332,7 +333,11 @@ func (r *nfsRecycler) GetPath() string {
// Recycle recycles/scrubs clean an NFS volume.
// Recycle blocks until the pod has completed or any error occurs.
func (r *nfsRecycler) Recycle() error {
pod := r.config.RecyclerPodTemplate
templateClone, err := conversion.NewCloner().DeepCopy(r.config.RecyclerPodTemplate)
if err != nil {
return err
}
pod := templateClone.(*api.Pod)
// overrides
pod.Spec.ActiveDeadlineSeconds = &r.timeout
pod.GenerateName = "pv-recycler-nfs-"

View File

@ -119,12 +119,19 @@ func (plugin *photonPersistentDiskPlugin) newUnmounterInternal(volName string, p
}}, nil
}
func (plugin *photonPersistentDiskPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
func (plugin *photonPersistentDiskPlugin) ConstructVolumeSpec(volumeSpecName, mountPath string) (*volume.Spec, error) {
mounter := plugin.host.GetMounter()
pluginDir := plugin.host.GetPluginDir(plugin.GetPluginName())
pdID, err := mounter.GetDeviceNameFromMount(mountPath, pluginDir)
if err != nil {
return nil, err
}
photonPersistentDisk := &api.Volume{
Name: volumeName,
Name: volumeSpecName,
VolumeSource: api.VolumeSource{
PhotonPersistentDisk: &api.PhotonPersistentDiskVolumeSource{
PdID: volumeName,
PdID: pdID,
},
},
}
@ -283,7 +290,7 @@ func (c *photonPersistentDiskUnmounter) TearDownAt(dir string) error {
}
func makeGlobalPDPath(host volume.VolumeHost, devName string) string {
return path.Join(host.GetPluginDir(photonPersistentDiskPluginName), "mounts", devName)
return path.Join(host.GetPluginDir(photonPersistentDiskPluginName), mount.MountsInGlobalPDPath, devName)
}
func (ppd *photonPersistentDisk) GetPath() string {

View File

@ -83,7 +83,12 @@ func internalRecycleVolumeByWatchingPodUntilCompletion(pvName string, pod *api.P
return fmt.Errorf("unexpected error creating recycler pod: %+v\n", err)
}
}
defer recyclerClient.DeletePod(pod.Name, pod.Namespace)
defer func(pod *api.Pod) {
glog.V(2).Infof("deleting recycler pod %s/%s", pod.Namespace, pod.Name)
if err := recyclerClient.DeletePod(pod.Name, pod.Namespace); err != nil {
glog.Errorf("failed to delete recycler pod %s/%s: %v", pod.Namespace, pod.Name, err)
}
}(pod)
// Now only the old pod or the new pod run. Watch it until it finishes
// and send all events on the pod to the PV

View File

@ -22,6 +22,7 @@ package operationexecutor
import (
"fmt"
"strings"
"time"
"github.com/golang/glog"
@ -1053,7 +1054,8 @@ func (oe *operationExecutor) generateUnmountDeviceFunc(
err)
}
refs, err := attachableVolumePlugin.GetDeviceMountRefs(deviceMountPath)
if err != nil || len(refs) > 0 {
if err != nil || hasMountRefs(deviceMountPath, refs) {
if err == nil {
err = fmt.Errorf("The device mount path %q is still mounted by other references %v", deviceMountPath, refs)
}
@ -1124,6 +1126,24 @@ func (oe *operationExecutor) generateUnmountDeviceFunc(
}, nil
}
// TODO: this is a workaround for the unmount device issue caused by the gci mounter.
// In a GCI cluster, if the gci mounter is used for mounting, the container started by the mounter
// script will cause additional mounts to be created in the container. Since these mounts are
// irrelevant to the original mounts, they should not be considered when checking the
// mount references. The current solution is to filter out those mount paths that contain
// the string of the original mount path.
// Plan to work on a better approach to solve this issue.
func hasMountRefs(mountPath string, mountRefs []string) bool {
count := 0
for _, ref := range mountRefs {
if !strings.Contains(ref, mountPath) {
count = count + 1
}
}
return count > 0
}
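A self-contained sketch of the filtering rule above; the check is repeated here (simplified to an early return) and the mount paths are hypothetical:

package main

import (
	"fmt"
	"strings"
)

// hasMountRefs reproduces the check above: only references that do NOT contain
// the original mount path are treated as real users of the device.
func hasMountRefs(mountPath string, mountRefs []string) bool {
	for _, ref := range mountRefs {
		if !strings.Contains(ref, mountPath) {
			return true
		}
	}
	return false
}

func main() {
	dev := "/var/lib/kubelet/plugins/kubernetes.io/gce-pd/mounts/pd-1"
	refs := []string{
		// GCI mounter artifact embedding the device path: filtered out.
		"/home/kubernetes/containerized_mounter/rootfs" + dev,
		// Unrelated pod mount: counted, so the device stays mounted.
		"/var/lib/kubelet/pods/uid-1/volumes/kubernetes.io~gce-pd/pd-1",
	}
	fmt.Println(hasMountRefs(dev, refs)) // true
}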
func (oe *operationExecutor) generateVerifyControllerAttachedVolumeFunc(
volumeToMount VolumeToMount,
nodeName types.NodeName,

View File

@ -94,7 +94,7 @@ func UnmountPath(mountPath string, mounter mount.Interface) error {
return err
}
if notMnt {
glog.V(4).Info("%q is unmounted, deleting the directory", mountPath)
glog.V(4).Infof("%q is unmounted, deleting the directory", mountPath)
return os.Remove(mountPath)
}
return nil

View File

@ -51,6 +51,17 @@ func SetVolumeOwnership(mounter Mounter, fsGroup *int64) error {
return err
}
// chown and chmod pass through to the underlying file for symlinks.
// Symlinks have a mode of 777 but this really doesn't mean anything.
// The permissions of the underlying file are what matter.
// However, if one reads the mode of a symlink then chmods the symlink
// with that mode, it changes the mode of the underlying file, overriding
// the defaultMode and permissions initialized by the volume plugin, which
// is not what we want; thus, we skip chown/chmod for symlinks.
if info.Mode()&os.ModeSymlink != 0 {
return nil
}
stat, ok := info.Sys().(*syscall.Stat_t)
if !ok {
return nil

View File

@ -276,7 +276,7 @@ func (v *vsphereVolumeUnmounter) TearDownAt(dir string) error {
}
func makeGlobalPDPath(host volume.VolumeHost, devName string) string {
return path.Join(host.GetPluginDir(vsphereVolumePluginName), "mounts", devName)
return path.Join(host.GetPluginDir(vsphereVolumePluginName), mount.MountsInGlobalPDPath, devName)
}
func (vv *vsphereVolume) GetPath() string {

View File

@ -41,6 +41,48 @@ func init() {
}
}
// gcpAuthProvider is an auth provider plugin that uses GCP credentials to provide
// tokens for kubectl to authenticate itself to the apiserver. A sample json config
// is provided below with all recognized options described.
//
// {
// 'auth-provider': {
// # Required
// "name": "gcp",
//
// 'config': {
// # Caching options
//
// # Raw string data representing cached access token.
// "access-token": "ya29.CjWdA4GiBPTt",
// # RFC3339Nano expiration timestamp for cached access token.
// "expiry": "2016-10-31 22:31:9.123",
//
// # Command execution options
// # These options direct the plugin to execute a specified command and parse
// # token and expiry time from the output of the command.
//
// # Command to execute for access token. String is split on whitespace
// # with first field treated as the executable, remaining fields as args.
// # Command output will be parsed as JSON.
// "cmd-path": "/usr/bin/gcloud config config-helper --output=json",
//
// # JSONPath to the string field that represents the access token in
// # command output. If omitted, defaults to "{.access_token}".
// "token-key": "{.credential.access_token}",
//
// # JSONPath to the string field that represents expiration timestamp
// # of the access token in the command output. If omitted, defaults to
// # "{.token_expiry}"
// "expiry-key": ""{.credential.token_expiry}",
//
// # golang reference time in the format that the expiration timestamp uses.
// # If omitted, defaults to time.RFC3339Nano
// "time-fmt": "2006-01-02 15:04:05.999999999"
// }
// }
// }
//
type gcpAuthProvider struct {
tokenSource oauth2.TokenSource
persister restclient.AuthProviderConfigPersister
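A hedged kubeconfig sketch, kept in the same comment style as the sample above, showing where these options typically live; the user name is hypothetical:

// users:
// - name: example-gcp-user
//   user:
//     auth-provider:
//       name: gcp
//       config:
//         cmd-path: /usr/bin/gcloud config config-helper --output=json
//         token-key: '{.credential.access_token}'
//         expiry-key: '{.credential.token_expiry}'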

Some files were not shown because too many files have changed in this diff