diff --git a/pkg/controller/volume/persistentvolume/pv_controller.go b/pkg/controller/volume/persistentvolume/pv_controller.go index 87a7ef2f70..ac796ce481 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller.go +++ b/pkg/controller/volume/persistentvolume/pv_controller.go @@ -135,6 +135,14 @@ const annDynamicallyProvisioned = "pv.kubernetes.io/provisioned-by" // a volume for this PVC. const annStorageProvisioner = "volume.beta.kubernetes.io/storage-provisioner" +// This annotation is added to a PVC that has been triggered by scheduler to +// be dynamically provisioned. Its value is the name of the selected node. +const annSelectedNode = "volume.alpha.kubernetes.io/selected-node" + +// If the provisioner name in a storage class is set to "kubernetes.io/no-provisioner", +// then dynamic provisioning is not supported by the storage. +const notSupportedProvisioner = "kubernetes.io/no-provisioner" + // CloudVolumeCreatedForClaimNamespaceTag is a name of a tag attached to a real volume in cloud (e.g. AWS EBS or GCE PD) // with namespace of a persistent volume claim used to create this volume. const CloudVolumeCreatedForClaimNamespaceTag = "kubernetes.io/created-for/pvc/namespace" diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder.go b/pkg/controller/volume/persistentvolume/scheduler_binder.go index ad4cf139c8..0c5ccc9ec4 100644 --- a/pkg/controller/volume/persistentvolume/scheduler_binder.go +++ b/pkg/controller/volume/persistentvolume/scheduler_binder.go @@ -24,10 +24,12 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilfeature "k8s.io/apiserver/pkg/util/feature" coreinformers "k8s.io/client-go/informers/core/v1" storageinformers "k8s.io/client-go/informers/storage/v1" clientset "k8s.io/client-go/kubernetes" - corelisters "k8s.io/client-go/listers/core/v1" + v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" + "k8s.io/kubernetes/pkg/features" volumeutil "k8s.io/kubernetes/pkg/volume/util" ) @@ -58,24 +60,30 @@ type SchedulerVolumeBinder interface { // If a PVC is bound, it checks if the PV's NodeAffinity matches the Node. // Otherwise, it tries to find an available PV to bind to the PVC. // - // It returns true if there are matching PVs that can satisfy all of the Pod's PVCs, and returns true - // if bound volumes satisfy the PV NodeAffinity. + // It returns true if all of the Pod's PVCs have matching PVs or can be dynamically provisioned, + // and returns true if bound volumes satisfy the PV NodeAffinity. + // // This function is called by the volume binding scheduler predicate and can be called in parallel FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisified, boundVolumesSatisfied bool, err error) - // AssumePodVolumes will take the PV matches for unbound PVCs and update the PV cache assuming + // AssumePodVolumes will: + // 1. Take the PV matches for unbound PVCs and update the PV cache assuming // that the PV is prebound to the PVC. + // 2. Take the PVCs that need provisioning and update the PVC cache with related + // annotations set. + // - // It returns true if all volumes are fully bound, and returns true if any volume binding API operation needs - // to be done afterwards. + // It returns true if all volumes are fully bound, and returns true if any volume binding/provisioning + // API operation needs to be done afterwards. // // This function will modify assumedPod with the node name. // This function is called serially.
AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (allFullyBound bool, bindingRequired bool, err error) - // BindPodVolumes will initiate the volume binding by making the API call to prebind the PV + // BindPodVolumes will: + // 1. Initiate the volume binding by making the API call to prebind the PV // to its matching PVC. + // 2. Trigger the volume provisioning by making the API call to set related + // annotations on the PVC. + // // This function can be called in parallel. BindPodVolumes(assumedPod *v1.Pod) error @@ -87,8 +95,7 @@ type SchedulerVolumeBinder interface { type volumeBinder struct { ctrl *PersistentVolumeController - // TODO: Need AssumeCache for PVC for dynamic provisioning - pvcCache corelisters.PersistentVolumeClaimLister + pvcCache PVCAssumeCache pvCache PVAssumeCache // Stores binding decisions that were made in FindPodVolumes for use in AssumePodVolumes. @@ -111,7 +118,7 @@ func NewVolumeBinder( b := &volumeBinder{ ctrl: ctrl, - pvcCache: pvcInformer.Lister(), + pvcCache: NewPVCAssumeCache(pvcInformer.Informer()), pvCache: NewPVAssumeCache(pvInformer.Informer()), podBindingCache: NewPodBindingCache(), } @@ -123,7 +130,7 @@ func (b *volumeBinder) GetBindingsCache() PodBindingCache { return b.podBindingCache } -// FindPodVolumes caches the matching PVs per node in podBindingCache +// FindPodVolumes caches the matching PVs and PVCs to provision per node in podBindingCache func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisfied, boundVolumesSatisfied bool, err error) { podName := getPodName(pod) @@ -135,8 +142,8 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolume boundVolumesSatisfied = true // The pod's volumes need to be processed in one call to avoid the race condition where - // volumes can get bound in between calls. - boundClaims, unboundClaims, unboundClaimsImmediate, err := b.getPodVolumes(pod) + // volumes can get bound/provisioned in between calls. + boundClaims, claimsToBind, unboundClaimsImmediate, err := b.getPodVolumes(pod) if err != nil { return false, false, err } @@ -154,20 +161,32 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolume } } - // Find PVs for unbound volumes - if len(unboundClaims) > 0 { - unboundVolumesSatisfied, err = b.findMatchingVolumes(pod, unboundClaims, node) + if len(claimsToBind) > 0 { + var claimsToProvision []*v1.PersistentVolumeClaim + unboundVolumesSatisfied, claimsToProvision, err = b.findMatchingVolumes(pod, claimsToBind, node) if err != nil { return false, false, err } + + if utilfeature.DefaultFeatureGate.Enabled(features.DynamicProvisioningScheduling) { + // Try to provision for unbound volumes + if !unboundVolumesSatisfied { + unboundVolumesSatisfied, err = b.checkVolumeProvisions(pod, claimsToProvision, node) + if err != nil { + return false, false, err + } + } + } } return unboundVolumesSatisfied, boundVolumesSatisfied, nil } -// AssumePodVolumes will take the cached matching PVs in podBindingCache for the chosen node -// and update the pvCache with the new prebound PV. It will update podBindingCache again -// with the PVs that need an API update. +// AssumePodVolumes will take the cached matching PVs and PVCs to provision +// in podBindingCache for the chosen node, and: +// 1. Update the pvCache with the new prebound PV. +// 2. Update the pvcCache with the new PVCs with annotations set. +// It will update podBindingCache again with the PVs and PVCs that need an API update.
func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (allFullyBound, bindingRequired bool, err error) { podName := getPodName(assumedPod) @@ -179,6 +198,7 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (al } assumedPod.Spec.NodeName = nodeName + // Assume PV claimsToBind := b.podBindingCache.GetBindings(assumedPod, nodeName) newBindings := []*bindingInfo{} @@ -206,23 +226,48 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (al } } - if len(newBindings) == 0 { - // Don't update cached bindings if no API updates are needed. This can happen if we - // previously updated the PV object and are waiting for the PV controller to finish binding. - glog.V(4).Infof("AssumePodVolumes for pod %q, node %q: PVs already assumed", podName, nodeName) - return false, false, nil + // Don't update cached bindings if no API updates are needed. This can happen if we + // previously updated the PV object and are waiting for the PV controller to finish binding. + if len(newBindings) != 0 { + bindingRequired = true + b.podBindingCache.UpdateBindings(assumedPod, nodeName, newBindings) } - b.podBindingCache.UpdateBindings(assumedPod, nodeName, newBindings) - return false, true, nil + // Assume PVCs + claimsToProvision := b.podBindingCache.GetProvisionedPVCs(assumedPod, nodeName) + + newProvisionedPVCs := []*v1.PersistentVolumeClaim{} + for _, claim := range claimsToProvision { + // The claims from method args may point to the watcher cache. We must not + // modify these, so create a copy. + claimClone := claim.DeepCopy() + metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, annSelectedNode, nodeName) + err = b.pvcCache.Assume(claimClone) + if err != nil { + b.revertAssumedPVs(newBindings) + b.revertAssumedPVCs(newProvisionedPVCs) + return + } + + newProvisionedPVCs = append(newProvisionedPVCs, claimClone) + } + + if len(newProvisionedPVCs) != 0 { + bindingRequired = true + b.podBindingCache.UpdateProvisionedPVCs(assumedPod, nodeName, newProvisionedPVCs) + } + + return } -// BindPodVolumes gets the cached bindings in podBindingCache and makes the API update for those PVs. +// BindPodVolumes gets the cached bindings and PVCs to provision in podBindingCache +// and makes the API update for those PVs/PVCs. func (b *volumeBinder) BindPodVolumes(assumedPod *v1.Pod) error { podName := getPodName(assumedPod) glog.V(4).Infof("BindPodVolumes for pod %q", podName) bindings := b.podBindingCache.GetBindings(assumedPod, assumedPod.Spec.NodeName) + claimsToProvision := b.podBindingCache.GetProvisionedPVCs(assumedPod, assumedPod.Spec.NodeName) // Do the actual prebinding. Let the PV controller take care of the rest // There is no API rollback if the actual binding fails @@ -232,6 +277,20 @@ func (b *volumeBinder) BindPodVolumes(assumedPod *v1.Pod) error { if err != nil { // only revert assumed cached updates for volumes we haven't successfully bound b.revertAssumedPVs(bindings[i:]) + // Revert all of the assumed cached updates for claims, + // since no actual API update will be done + b.revertAssumedPVCs(claimsToProvision) + return err + } + } + + // Update claim objects to trigger volume provisioning.
Let the PV controller take care of the rest + // The PV controller is expected to signal back by removing related annotations if actual provisioning fails + for i, claim := range claimsToProvision { + if _, err := b.ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(claim); err != nil { + glog.V(4).Infof("updating PersistentVolumeClaim[%s] failed: %v", getPVCName(claim), err) + // only revert assumed cached updates for claims we haven't successfully updated + b.revertAssumedPVCs(claimsToProvision[i:]) return err } } @@ -253,7 +312,13 @@ func (b *volumeBinder) isVolumeBound(namespace string, vol *v1.Volume, checkFull { } pvcName := vol.PersistentVolumeClaim.ClaimName - pvc, err := b.pvcCache.PersistentVolumeClaims(namespace).Get(pvcName) + claim := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: pvcName, + Namespace: namespace, + }, + } + pvc, err := b.pvcCache.GetPVC(getPVCName(claim)) if err != nil || pvc == nil { return false, nil, fmt.Errorf("error getting PVC %q: %v", pvcName, err) } @@ -342,14 +407,18 @@ func (b *volumeBinder) checkBoundClaims(claims []*v1.PersistentVolumeClaim, node return true, nil } -func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingInfo, node *v1.Node) (foundMatches bool, err error) { +// findMatchingVolumes tries to find matching volumes for given claims, +// and returns unbound claims for further provisioning. +func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingInfo, node *v1.Node) (foundMatches bool, unboundClaims []*v1.PersistentVolumeClaim, err error) { podName := getPodName(pod) - // Sort all the claims by increasing size request to get the smallest fits sort.Sort(byPVCSize(claimsToBind)) chosenPVs := map[string]*v1.PersistentVolume{} + foundMatches = true + matchedClaims := []*bindingInfo{} + for _, bindingInfo := range claimsToBind { // Get storage class name from each PVC storageClassName := "" @@ -362,21 +431,68 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingI // Find a matching PV bindingInfo.pv, err = findMatchingVolume(bindingInfo.pvc, allPVs, node, chosenPVs, true) if err != nil { - return false, err + return false, nil, err } if bindingInfo.pv == nil { glog.V(4).Infof("No matching volumes for Pod %q, PVC %q on node %q", podName, getPVCName(bindingInfo.pvc), node.Name) - return false, nil + unboundClaims = append(unboundClaims, bindingInfo.pvc) + foundMatches = false + continue } // matching PV needs to be excluded so we don't select it again chosenPVs[bindingInfo.pv.Name] = bindingInfo.pv + matchedClaims = append(matchedClaims, bindingInfo) glog.V(5).Infof("Found matching PV %q for PVC %q on node %q for pod %q", bindingInfo.pv.Name, getPVCName(bindingInfo.pvc), node.Name, podName) } // Mark cache with all the matches for each PVC for this node - b.podBindingCache.UpdateBindings(pod, node.Name, claimsToBind) - glog.V(4).Infof("Found matching volumes for pod %q on node %q", podName, node.Name) + if len(matchedClaims) > 0 { + b.podBindingCache.UpdateBindings(pod, node.Name, matchedClaims) + } + + if foundMatches { + glog.V(4).Infof("Found matching volumes for pod %q on node %q", podName, node.Name) + } + + return +} + +// checkVolumeProvisions checks the given unbound claims (claims that have gone through func +// findMatchingVolumes, and do not have matching volumes for binding), and returns true +// if all of the claims are eligible for dynamic provisioning.
+func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied bool, err error) { + podName := getPodName(pod) + provisionedClaims := []*v1.PersistentVolumeClaim{} + + for _, claim := range claimsToProvision { + className := v1helper.GetPersistentVolumeClaimClass(claim) + if className == "" { + return false, fmt.Errorf("no class for claim %q", getPVCName(claim)) + } + + class, err := b.ctrl.classLister.Get(className) + if err != nil { + return false, fmt.Errorf("failed to find storage class %q", className) + } + provisioner := class.Provisioner + if provisioner == "" || provisioner == notSupportedProvisioner { + glog.V(4).Infof("storage class %q of claim %q does not support dynamic provisioning", className, getPVCName(claim)) + return false, nil + } + + // TODO: Check if the node can satisfy the topology requirement in the class + + // TODO: Check if capacity of the node domain in the storage class + // can satisfy resource requirement of given claim + + provisionedClaims = append(provisionedClaims, claim) + + } + glog.V(4).Infof("Provisioning for claims of pod %q that have no matching volumes on node %q ...", podName, node.Name) + + // Mark cache with all the PVCs that need provisioning for this node + b.podBindingCache.UpdateProvisionedPVCs(pod, node.Name, provisionedClaims) return true, nil } @@ -387,6 +503,12 @@ func (b *volumeBinder) revertAssumedPVs(bindings []*bindingInfo) { } } +func (b *volumeBinder) revertAssumedPVCs(claims []*v1.PersistentVolumeClaim) { + for _, claim := range claims { + b.pvcCache.Restore(getPVCName(claim)) + } +} + type bindingInfo struct { // Claim that needs to be bound pvc *v1.PersistentVolumeClaim diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder_test.go b/pkg/controller/volume/persistentvolume/scheduler_binder_test.go index ad8d2efa96..98bc9e8bcd 100644 --- a/pkg/controller/volume/persistentvolume/scheduler_binder_test.go +++ b/pkg/controller/volume/persistentvolume/scheduler_binder_test.go @@ -33,20 +33,23 @@ import ( "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" - "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/controller" ) var ( - unboundPVC = makeTestPVC("unbound-pvc", "1G", pvcUnbound, "", &waitClass) - unboundPVC2 = makeTestPVC("unbound-pvc2", "5G", pvcUnbound, "", &waitClass) - preboundPVC = makeTestPVC("prebound-pvc", "1G", pvcPrebound, "pv-node1a", &waitClass) - boundPVC = makeTestPVC("bound-pvc", "1G", pvcBound, "pv-bound", &waitClass) - boundPVC2 = makeTestPVC("bound-pvc2", "1G", pvcBound, "pv-bound2", &waitClass) - badPVC = makeBadPVC() - immediateUnboundPVC = makeTestPVC("immediate-unbound-pvc", "1G", pvcUnbound, "", &immediateClass) - immediateBoundPVC = makeTestPVC("immediate-bound-pvc", "1G", pvcBound, "pv-bound-immediate", &immediateClass) + unboundPVC = makeTestPVC("unbound-pvc", "1G", pvcUnbound, "", "1", &waitClass) + unboundPVC2 = makeTestPVC("unbound-pvc2", "5G", pvcUnbound, "", "1", &waitClass) + preboundPVC = makeTestPVC("prebound-pvc", "1G", pvcPrebound, "pv-node1a", "1", &waitClass) + boundPVC = makeTestPVC("bound-pvc", "1G", pvcBound, "pv-bound", "1", &waitClass) + boundPVC2 = makeTestPVC("bound-pvc2", "1G", pvcBound, "pv-bound2", "1", &waitClass) + badPVC = makeBadPVC() + immediateUnboundPVC = makeTestPVC("immediate-unbound-pvc", "1G", pvcUnbound, "", "1", &immediateClass) + immediateBoundPVC = makeTestPVC("immediate-bound-pvc", "1G",
pvcBound, "pv-bound-immediate", "1", &immediateClass) + provisionedPVC = makeTestPVC("provisioned-pvc", "1Gi", pvcUnbound, "", "1", &waitClass) + provisionedPVC2 = makeTestPVC("provisioned-pvc2", "1Gi", pvcUnbound, "", "1", &waitClass) + provisionedPVCHigherVersion = makeTestPVC("provisioned-pvc2", "1Gi", pvcUnbound, "", "2", &waitClass) + noProvisionerPVC = makeTestPVC("no-provisioner-pvc", "1Gi", pvcUnbound, "", "1", &provisionNotSupportClass) pvNoNode = makeTestPV("pv-no-node", "", "1G", "1", nil, waitClass) pvNode1a = makeTestPV("pv-node1a", "node1", "5G", "1", nil, waitClass) @@ -68,10 +71,12 @@ var ( binding1aBound = makeBinding(unboundPVC, pvNode1aBound) binding1bBound = makeBinding(unboundPVC2, pvNode1bBound) - waitClass = "waitClass" - immediateClass = "immediateClass" + waitClass = "waitClass" + immediateClass = "immediateClass" + provisionNotSupportClass = "provisionNotSupportedClass" - nodeLabelKey = "nodeKey" + nodeLabelKey = "nodeKey" + nodeLabelValue = "node1" ) type testEnv struct { @@ -80,7 +85,7 @@ type testEnv struct { binder SchedulerVolumeBinder internalBinder *volumeBinder internalPVCache *pvAssumeCache - internalPVCCache cache.Indexer + internalPVCCache *pvcAssumeCache } func newTestBinder(t *testing.T) *testEnv { @@ -106,6 +111,7 @@ func newTestBinder(t *testing.T) *testEnv { Name: waitClass, }, VolumeBindingMode: &waitMode, + Provisioner: "test-provisioner", }, { ObjectMeta: metav1.ObjectMeta{ @@ -113,6 +119,13 @@ func newTestBinder(t *testing.T) *testEnv { }, VolumeBindingMode: &immediateMode, }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: provisionNotSupportClass, + }, + VolumeBindingMode: &waitMode, + Provisioner: "kubernetes.io/no-provisioner", + }, } for _, class := range classes { if err := classInformer.Informer().GetIndexer().Add(class); err != nil { @@ -132,22 +145,31 @@ func newTestBinder(t *testing.T) *testEnv { t.Fatalf("Failed to convert to internal PV cache") } + pvcCache := internalBinder.pvcCache + internalPVCCache, ok := pvcCache.(*pvcAssumeCache) + if !ok { + t.Fatalf("Failed to convert to internal PVC cache") + } + return &testEnv{ client: client, reactor: reactor, binder: binder, internalBinder: internalBinder, internalPVCache: internalPVCache, - internalPVCCache: pvcInformer.Informer().GetIndexer(), + internalPVCCache: internalPVCCache, } } -func (env *testEnv) initClaims(t *testing.T, pvcs []*v1.PersistentVolumeClaim) { - for _, pvc := range pvcs { - err := env.internalPVCCache.Add(pvc) - if err != nil { - t.Fatalf("Failed to add PVC %q to internal cache: %v", pvc.Name, err) +func (env *testEnv) initClaims(cachedPVCs []*v1.PersistentVolumeClaim, apiPVCs []*v1.PersistentVolumeClaim) { + internalPVCCache := env.internalPVCCache + for _, pvc := range cachedPVCs { + internalPVCCache.add(pvc) + if apiPVCs == nil { + env.reactor.claims[pvc.Name] = pvc } + } + for _, pvc := range apiPVCs { env.reactor.claims[pvc.Name] = pvc } } @@ -166,7 +188,7 @@ func (env *testEnv) initVolumes(cachedPVs []*v1.PersistentVolume, apiPVs []*v1.P } -func (env *testEnv) assumeVolumes(t *testing.T, name, node string, pod *v1.Pod, bindings []*bindingInfo) { +func (env *testEnv) assumeVolumes(t *testing.T, name, node string, pod *v1.Pod, bindings []*bindingInfo, provisionings []*v1.PersistentVolumeClaim) { pvCache := env.internalBinder.pvCache for _, binding := range bindings { if err := pvCache.Assume(binding.pv); err != nil { @@ -175,20 +197,38 @@ func (env *testEnv) assumeVolumes(t *testing.T, name, node string, pod *v1.Pod, } 
env.internalBinder.podBindingCache.UpdateBindings(pod, node, bindings) + + pvcCache := env.internalBinder.pvcCache + for _, pvc := range provisionings { + if err := pvcCache.Assume(pvc); err != nil { + t.Fatalf("Failed to setup test %q: error: %v", name, err) + } + } + + env.internalBinder.podBindingCache.UpdateProvisionedPVCs(pod, node, provisionings) } -func (env *testEnv) initPodCache(pod *v1.Pod, node string, bindings []*bindingInfo) { +func (env *testEnv) initPodCache(pod *v1.Pod, node string, bindings []*bindingInfo, provisionings []*v1.PersistentVolumeClaim) { cache := env.internalBinder.podBindingCache cache.UpdateBindings(pod, node, bindings) + + cache.UpdateProvisionedPVCs(pod, node, provisionings) } -func (env *testEnv) validatePodCache(t *testing.T, name, node string, pod *v1.Pod, expectedBindings []*bindingInfo) { +func (env *testEnv) validatePodCache(t *testing.T, name, node string, pod *v1.Pod, expectedBindings []*bindingInfo, expectedProvisionings []*v1.PersistentVolumeClaim) { cache := env.internalBinder.podBindingCache bindings := cache.GetBindings(pod, node) if !reflect.DeepEqual(expectedBindings, bindings) { t.Errorf("Test %q failed: Expected bindings %+v, got %+v", name, expectedBindings, bindings) } + + provisionedClaims := cache.GetProvisionedPVCs(pod, node) + + if !reflect.DeepEqual(expectedProvisionings, provisionedClaims) { + t.Errorf("Test %q failed: Expected provisionings %+v, got %+v", name, expectedProvisionings, provisionedClaims) + } + } func (env *testEnv) getPodBindings(t *testing.T, name, node string, pod *v1.Pod) []*bindingInfo { @@ -196,7 +236,7 @@ func (env *testEnv) getPodBindings(t *testing.T, name, node string, pod *v1.Pod) return cache.GetBindings(pod, node) } -func (env *testEnv) validateAssume(t *testing.T, name string, pod *v1.Pod, bindings []*bindingInfo) { +func (env *testEnv) validateAssume(t *testing.T, name string, pod *v1.Pod, bindings []*bindingInfo, provisionings []*v1.PersistentVolumeClaim) { // TODO: Check binding cache // Check pv cache @@ -218,9 +258,23 @@ func (env *testEnv) validateAssume(t *testing.T, name string, pod *v1.Pod, bindi t.Errorf("Test %q failed: expected PV.ClaimRef.Namespace %q, got %q", name, b.pvc.Namespace, pv.Spec.ClaimRef.Namespace) } } + + // Check pvc cache + pvcCache := env.internalBinder.pvcCache + for _, p := range provisionings { + pvcKey := getPVCName(p) + pvc, err := pvcCache.GetPVC(pvcKey) + if err != nil { + t.Errorf("Test %q failed: GetPVC %q returned error: %v", name, pvcKey, err) + continue + } + if pvc.Annotations[annSelectedNode] != nodeLabelValue { + t.Errorf("Test %q failed: expected annSelectedNode of pvc %q to be %q, but got %q", name, pvcKey, nodeLabelValue, pvc.Annotations[annSelectedNode]) + } + } } -func (env *testEnv) validateFailedAssume(t *testing.T, name string, pod *v1.Pod, bindings []*bindingInfo) { +func (env *testEnv) validateFailedAssume(t *testing.T, name string, pod *v1.Pod, bindings []*bindingInfo, provisionings []*v1.PersistentVolumeClaim) { // All PVs have been unmodified in cache pvCache := env.internalBinder.pvCache for _, b := range bindings { @@ -230,6 +284,20 @@ func (env *testEnv) validateFailedAssume(t *testing.T, name string, pod *v1.Pod, t.Errorf("Test %q failed: PV %q was modified in cache", name, b.pv.Name) } } + + // Check pvc cache + pvcCache := env.internalBinder.pvcCache + for _, p := range provisionings { + pvcKey := getPVCName(p) + pvc, err := pvcCache.GetPVC(pvcKey) + if err != nil { + t.Errorf("Test %q failed: GetPVC %q returned error: %v", name, pvcKey, 
err) + continue + } + if pvc.Annotations[annSelectedNode] != "" { + t.Errorf("Test %q failed: expected annSelectedNode of pvc %q empty, but got %q", name, pvcKey, pvc.Annotations[annSelectedNode]) + } + } } func (env *testEnv) validateBind( @@ -257,20 +325,46 @@ func (env *testEnv) validateBind( } } +func (env *testEnv) validateProvision( + t *testing.T, + name string, + pod *v1.Pod, + expectedPVCs []*v1.PersistentVolumeClaim, + expectedAPIPVCs []*v1.PersistentVolumeClaim) { + + // Check pvc cache + pvcCache := env.internalBinder.pvcCache + for _, pvc := range expectedPVCs { + cachedPVC, err := pvcCache.GetPVC(getPVCName(pvc)) + if err != nil { + t.Errorf("Test %q failed: GetPVC %q returned error: %v", name, getPVCName(pvc), err) + } + if !reflect.DeepEqual(cachedPVC, pvc) { + t.Errorf("Test %q failed: cached PVC check failed [A-expected, B-got]:\n%s", name, diff.ObjectDiff(pvc, cachedPVC)) + } + } + + // Check reactor for API updates + if err := env.reactor.checkClaims(expectedAPIPVCs); err != nil { + t.Errorf("Test %q failed: API reactor validation failed: %v", name, err) + } +} + const ( pvcUnbound = iota pvcPrebound pvcBound ) -func makeTestPVC(name, size string, pvcBoundState int, pvName string, className *string) *v1.PersistentVolumeClaim { +func makeTestPVC(name, size string, pvcBoundState int, pvName, resourceVersion string, className *string) *v1.PersistentVolumeClaim { pvc := &v1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: "testns", UID: types.UID("pvc-uid"), - ResourceVersion: "1", + ResourceVersion: resourceVersion, SelfLink: testapi.Default.SelfLink("pvc", name), + Annotations: map[string]string{}, }, Spec: v1.PersistentVolumeClaimSpec{ Resources: v1.ResourceRequirements{ @@ -389,7 +483,15 @@ func makeBinding(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) *bindin return &bindingInfo{pvc: pvc, pv: pv} } -func TestFindPodVolumes(t *testing.T) { +func addProvisionAnn(pvc *v1.PersistentVolumeClaim) *v1.PersistentVolumeClaim { + res := pvc.DeepCopy() + // Add provision related annotations + res.Annotations[annSelectedNode] = nodeLabelValue + + return res +} + +func TestFindPodVolumesWithoutProvisioning(t *testing.T) { scenarios := map[string]struct { // Inputs pvs []*v1.PersistentVolume @@ -470,10 +572,11 @@ func TestFindPodVolumes(t *testing.T) { expectedBound: true, }, "two-unbound-pvcs,partial-match": { - podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, unboundPVC2}, - pvs: []*v1.PersistentVolume{pvNode1a}, - expectedUnbound: false, - expectedBound: true, + podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, unboundPVC2}, + pvs: []*v1.PersistentVolume{pvNode1a}, + expectedBindings: []*bindingInfo{binding1a}, + expectedUnbound: false, + expectedBound: true, }, "one-bound,one-unbound": { podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, boundPVC}, @@ -552,7 +655,7 @@ func TestFindPodVolumes(t *testing.T) { if scenario.cachePVCs == nil { scenario.cachePVCs = scenario.podPVCs } - testEnv.initClaims(t, scenario.cachePVCs) + testEnv.initClaims(scenario.cachePVCs, scenario.cachePVCs) // b. 
Generate pod with given claims if scenario.pod == nil { @@ -575,16 +678,126 @@ func TestFindPodVolumes(t *testing.T) { if unboundSatisfied != scenario.expectedUnbound { t.Errorf("Test %q failed: expected unboundSatsified %v, got %v", name, scenario.expectedUnbound, unboundSatisfied) } - testEnv.validatePodCache(t, name, testNode.Name, scenario.pod, scenario.expectedBindings) + testEnv.validatePodCache(t, name, testNode.Name, scenario.pod, scenario.expectedBindings, nil) + } +} + +func TestFindPodVolumesWithProvisioning(t *testing.T) { + scenarios := map[string]struct { + // Inputs + pvs []*v1.PersistentVolume + podPVCs []*v1.PersistentVolumeClaim + // If nil, use pod PVCs + cachePVCs []*v1.PersistentVolumeClaim + // If nil, makePod with podPVCs + pod *v1.Pod + + // Expected podBindingCache fields + expectedBindings []*bindingInfo + expectedProvisions []*v1.PersistentVolumeClaim + + // Expected return values + expectedUnbound bool + expectedBound bool + shouldFail bool + }{ + "one-provisioned": { + podPVCs: []*v1.PersistentVolumeClaim{provisionedPVC}, + expectedProvisions: []*v1.PersistentVolumeClaim{provisionedPVC}, + expectedUnbound: true, + expectedBound: true, + }, + "two-unbound-pvcs,one-matched,one-provisioned": { + podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVC}, + pvs: []*v1.PersistentVolume{pvNode1a}, + expectedBindings: []*bindingInfo{binding1a}, + expectedProvisions: []*v1.PersistentVolumeClaim{provisionedPVC}, + expectedUnbound: true, + expectedBound: true, + }, + "one-bound,one-provisioned": { + podPVCs: []*v1.PersistentVolumeClaim{boundPVC, provisionedPVC}, + pvs: []*v1.PersistentVolume{pvBound}, + expectedProvisions: []*v1.PersistentVolumeClaim{provisionedPVC}, + expectedUnbound: true, + expectedBound: true, + }, + "immediate-unbound-pvc": { + podPVCs: []*v1.PersistentVolumeClaim{immediateUnboundPVC}, + expectedUnbound: false, + expectedBound: false, + shouldFail: true, + }, + "one-immediate-bound,one-provisioned": { + podPVCs: []*v1.PersistentVolumeClaim{immediateBoundPVC, provisionedPVC}, + pvs: []*v1.PersistentVolume{pvBoundImmediate}, + expectedProvisions: []*v1.PersistentVolumeClaim{provisionedPVC}, + expectedUnbound: true, + expectedBound: true, + }, + "invalid-provisioner": { + podPVCs: []*v1.PersistentVolumeClaim{noProvisionerPVC}, + expectedUnbound: false, + expectedBound: true, + }, + } + + // Set VolumeScheduling and DynamicProvisioningScheduling feature gate + utilfeature.DefaultFeatureGate.Set("VolumeScheduling=true,DynamicProvisioningScheduling=true") + defer utilfeature.DefaultFeatureGate.Set("VolumeScheduling=false,DynamicProvisioningScheduling=false") + + testNode := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Labels: map[string]string{ + nodeLabelKey: "node1", + }, + }, + } + + for name, scenario := range scenarios { + // Setup + testEnv := newTestBinder(t) + testEnv.initVolumes(scenario.pvs, scenario.pvs) + + // a. Init pvc cache + if scenario.cachePVCs == nil { + scenario.cachePVCs = scenario.podPVCs + } + testEnv.initClaims(scenario.cachePVCs, scenario.cachePVCs) + + // b. 
Generate pod with given claims + if scenario.pod == nil { + scenario.pod = makePod(scenario.podPVCs) + } + + // Execute + unboundSatisfied, boundSatisfied, err := testEnv.binder.FindPodVolumes(scenario.pod, testNode) + + // Validate + if !scenario.shouldFail && err != nil { + t.Errorf("Test %q failed: returned error: %v", name, err) + } + if scenario.shouldFail && err == nil { + t.Errorf("Test %q failed: returned success but expected error", name) + } + if boundSatisfied != scenario.expectedBound { + t.Errorf("Test %q failed: expected boundSatisfied %v, got %v", name, scenario.expectedBound, boundSatisfied) + } + if unboundSatisfied != scenario.expectedUnbound { + t.Errorf("Test %q failed: expected unboundSatisfied %v, got %v", name, scenario.expectedUnbound, unboundSatisfied) + } + testEnv.validatePodCache(t, name, testNode.Name, scenario.pod, scenario.expectedBindings, scenario.expectedProvisions) } } func TestAssumePodVolumes(t *testing.T) { scenarios := map[string]struct { // Inputs - podPVCs []*v1.PersistentVolumeClaim - pvs []*v1.PersistentVolume - bindings []*bindingInfo + podPVCs []*v1.PersistentVolumeClaim + pvs []*v1.PersistentVolume + bindings []*bindingInfo + provisionedPVCs []*v1.PersistentVolumeClaim // Expected return values shouldFail bool @@ -636,6 +849,21 @@ func TestAssumePodVolumes(t *testing.T) { shouldFail: true, expectedBindingRequired: true, }, + "one-binding, one-pvc-provisioned": { + podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVC}, + bindings: []*bindingInfo{binding1a}, + pvs: []*v1.PersistentVolume{pvNode1a}, + provisionedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC}, + expectedBindingRequired: true, + }, + "one-binding, one-provision-tmpupdate-failed": { + podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVCHigherVersion}, + bindings: []*bindingInfo{binding1a}, + pvs: []*v1.PersistentVolume{pvNode1a}, + provisionedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC2}, + shouldFail: true, + expectedBindingRequired: true, + }, } for name, scenario := range scenarios { // Setup testEnv := newTestBinder(t) - testEnv.initClaims(t, scenario.podPVCs) + testEnv.initClaims(scenario.podPVCs, scenario.podPVCs) pod := makePod(scenario.podPVCs) - testEnv.initPodCache(pod, "node1", scenario.bindings) + testEnv.initPodCache(pod, "node1", scenario.bindings, scenario.provisionedPVCs) testEnv.initVolumes(scenario.pvs, scenario.pvs) // Execute @@ -668,9 +896,9 @@ func TestAssumePodVolumes(t *testing.T) { scenario.expectedBindings = scenario.bindings } if scenario.shouldFail { - testEnv.validateFailedAssume(t, name, pod, scenario.expectedBindings) + testEnv.validateFailedAssume(t, name, pod, scenario.expectedBindings, scenario.provisionedPVCs) } else { - testEnv.validateAssume(t, name, pod, scenario.expectedBindings) + testEnv.validateAssume(t, name, pod, scenario.expectedBindings, scenario.provisionedPVCs) } } } @@ -683,11 +911,20 @@ func TestBindPodVolumes(t *testing.T) { // if nil, use cachedPVs apiPVs []*v1.PersistentVolume + provisionedPVCs []*v1.PersistentVolumeClaim + cachedPVCs []*v1.PersistentVolumeClaim + // if nil, use cachedPVCs + apiPVCs []*v1.PersistentVolumeClaim + // Expected return values shouldFail bool expectedPVs []*v1.PersistentVolume // if nil, use expectedPVs expectedAPIPVs []*v1.PersistentVolume + + expectedPVCs []*v1.PersistentVolumeClaim + // if nil, use expectedPVCs + expectedAPIPVCs []*v1.PersistentVolumeClaim }{ "all-bound": {}, "not-fully-bound": { @@ -711,6
+948,30 @@ func TestBindPodVolumes(t *testing.T) { expectedAPIPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1bBoundHigherVersion}, shouldFail: true, }, + "one-provisioned-pvc": { + provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, + cachedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC}, + expectedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, + }, + "provision-api-update-failed": { + provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC), addProvisionAnn(provisionedPVC2)}, + cachedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC, provisionedPVC2}, + apiPVCs: []*v1.PersistentVolumeClaim{provisionedPVC, provisionedPVCHigherVersion}, + expectedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC), provisionedPVC2}, + expectedAPIPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC), provisionedPVCHigherVersion}, + shouldFail: true, + }, + "binding-succeeded, provision-api-update-failed": { + bindings: []*bindingInfo{binding1aBound}, + cachedPVs: []*v1.PersistentVolume{pvNode1a}, + expectedPVs: []*v1.PersistentVolume{pvNode1aBound}, + provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC), addProvisionAnn(provisionedPVC2)}, + cachedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC, provisionedPVC2}, + apiPVCs: []*v1.PersistentVolumeClaim{provisionedPVC, provisionedPVCHigherVersion}, + expectedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC), provisionedPVC2}, + expectedAPIPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC), provisionedPVCHigherVersion}, + shouldFail: true, + }, } for name, scenario := range scenarios { glog.V(5).Infof("Running test case %q", name) @@ -721,8 +982,12 @@ func TestBindPodVolumes(t *testing.T) { if scenario.apiPVs == nil { scenario.apiPVs = scenario.cachedPVs } + if scenario.apiPVCs == nil { + scenario.apiPVCs = scenario.cachedPVCs + } testEnv.initVolumes(scenario.cachedPVs, scenario.apiPVs) - testEnv.assumeVolumes(t, name, "node1", pod, scenario.bindings) + testEnv.initClaims(scenario.cachedPVCs, scenario.apiPVCs) + testEnv.assumeVolumes(t, name, "node1", pod, scenario.bindings, scenario.provisionedPVCs) // Execute err := testEnv.binder.BindPodVolumes(pod) @@ -737,7 +1002,11 @@ func TestBindPodVolumes(t *testing.T) { if scenario.expectedAPIPVs == nil { scenario.expectedAPIPVs = scenario.expectedPVs } + if scenario.expectedAPIPVCs == nil { + scenario.expectedAPIPVCs = scenario.expectedPVCs + } testEnv.validateBind(t, name, pod, scenario.expectedPVs, scenario.expectedAPIPVs) + testEnv.validateProvision(t, name, pod, scenario.expectedPVCs, scenario.expectedAPIPVCs) } } @@ -753,7 +1022,7 @@ func TestFindAssumeVolumes(t *testing.T) { // Setup testEnv := newTestBinder(t) testEnv.initVolumes(pvs, pvs) - testEnv.initClaims(t, podPVCs) + testEnv.initClaims(podPVCs, podPVCs) pod := makePod(podPVCs) testNode := &v1.Node{ @@ -787,7 +1056,7 @@ func TestFindAssumeVolumes(t *testing.T) { if !bindingRequired { t.Errorf("Test failed: binding not required") } - testEnv.validateAssume(t, "assume", pod, expectedBindings) + testEnv.validateAssume(t, "assume", pod, expectedBindings, nil) // After assume, claimref should be set on pv expectedBindings = testEnv.getPodBindings(t, "after-assume", testNode.Name, pod) @@ -803,6 +1072,6 @@ func TestFindAssumeVolumes(t *testing.T) { if !unboundSatisfied { t.Errorf("Test failed: couldn't find PVs for all PVCs") } - testEnv.validatePodCache(t, "after-assume", testNode.Name, pod, expectedBindings)
+ testEnv.validatePodCache(t, "after-assume", testNode.Name, pod, expectedBindings, nil) } } diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index d773374ec4..002bcc4297 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -279,6 +279,12 @@ const ( // A node which has closer cpu,memory utilization and volume count is favoured by scheduler // while making decisions. BalanceAttachedNodeVolumes utilfeature.Feature = "BalanceAttachedNodeVolumes" + + // owner: @lichuqiang + // alpha: v1.11 + // + // Extend the default scheduler to be aware of volume topology and handle PV provisioning + DynamicProvisioningScheduling utilfeature.Feature = "DynamicProvisioningScheduling" ) func init() { @@ -327,6 +333,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS RunAsGroup: {Default: false, PreRelease: utilfeature.Alpha}, VolumeSubpath: {Default: true, PreRelease: utilfeature.GA}, BalanceAttachedNodeVolumes: {Default: false, PreRelease: utilfeature.Alpha}, + DynamicProvisioningScheduling: {Default: false, PreRelease: utilfeature.Alpha}, // inherited features from generic apiserver, relisted here to get a conflict if it is changed // unintentionally on either side: diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go index 5aed8c6819..912ab05a1b 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go @@ -506,12 +506,16 @@ func ClusterRoles() []rbacv1.ClusterRole { } if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { + rules := []rbacv1.PolicyRule{ + rbacv1helpers.NewRule(ReadUpdate...).Groups(legacyGroup).Resources("persistentvolumes").RuleOrDie(), + rbacv1helpers.NewRule(Read...).Groups(storageGroup).Resources("storageclasses").RuleOrDie(), + } + if utilfeature.DefaultFeatureGate.Enabled(features.DynamicProvisioningScheduling) { + rules = append(rules, rbacv1helpers.NewRule(ReadUpdate...).Groups(legacyGroup).Resources("persistentvolumeclaims").RuleOrDie()) + } roles = append(roles, rbacv1.ClusterRole{ ObjectMeta: metav1.ObjectMeta{Name: "system:volume-scheduler"}, - Rules: []rbacv1.PolicyRule{ - rbacv1helpers.NewRule(ReadUpdate...).Groups(legacyGroup).Resources("persistentvolumes").RuleOrDie(), - rbacv1helpers.NewRule(Read...).Groups(storageGroup).Resources("storageclasses").RuleOrDie(), - }, + Rules: rules, }) }
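
Note for reviewers: the SchedulerVolumeBinder doc comments above describe a three-phase flow (find, assume, bind). As a minimal illustration of how a scheduler-side caller is expected to drive it, here is a hedged sketch; the driver name scheduleAndBindPodVolumes and its error handling are hypothetical and not part of this patch, and it assumes the types of the persistentvolume package plus the standard "fmt" import:

// scheduleAndBindPodVolumes is a hypothetical driver showing the intended
// call order of the binder API extended by this patch.
func scheduleAndBindPodVolumes(binder SchedulerVolumeBinder, pod *v1.Pod, node *v1.Node) error {
	// Predicate phase: can every PVC of the pod be satisfied on this node,
	// either by an existing matching PV or, when the
	// DynamicProvisioningScheduling feature gate is enabled, by dynamic
	// provisioning?
	unboundSatisfied, boundSatisfied, err := binder.FindPodVolumes(pod, node)
	if err != nil {
		return err
	}
	if !unboundSatisfied || !boundSatisfied {
		return fmt.Errorf("node %q cannot satisfy the volumes of pod %q", node.Name, pod.Name)
	}

	// Assume phase: optimistically update the PV and PVC assume caches.
	// bindingRequired is true when BindPodVolumes still has API work to do:
	// prebinding PVs and/or annotating PVCs with annSelectedNode.
	allFullyBound, bindingRequired, err := binder.AssumePodVolumes(pod, node.Name)
	if err != nil {
		return err
	}
	if allFullyBound || !bindingRequired {
		// Nothing left to do, or the PV controller is already finishing a
		// previously initiated binding.
		return nil
	}

	// Bind phase: issue the PV prebind and PVC annotation API updates. On
	// failure the binder reverts its assumed cache entries, and the pod is
	// expected to be retried.
	return binder.BindPodVolumes(pod)
}

One design point worth noting: FindPodVolumes only falls back to checkVolumeProvisions for the claims that findMatchingVolumes could not satisfy, so existing PVs are always preferred over provisioning new volumes on a node.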