Upgrade k8s to v1.4.6

pull/880/head
Matt Rickard 2016-12-02 10:13:51 -08:00
parent cdeccff598
commit b0c7704e93
58 changed files with 2288 additions and 12477 deletions

2259
Godeps/Godeps.json generated

File diff suppressed because it is too large Load Diff

View File

@ -27,7 +27,7 @@ minikube start
--insecure-registry stringSlice Insecure Docker registries to pass to the Docker daemon
--iso-url string Location of the minikube iso (default "https://storage.googleapis.com/minikube/minikube-0.7.iso")
--kubernetes-version string The kubernetes version that the minikube VM will use (ex: v1.2.3)
OR a URI which contains a localkube binary (ex: https://storage.googleapis.com/minikube/k8sReleases/v1.3.0/localkube-linux-amd64) (default "v1.4.5")
OR a URI which contains a localkube binary (ex: https://storage.googleapis.com/minikube/k8sReleases/v1.3.0/localkube-linux-amd64) (default "v1.4.6")
--kvm-network string The KVM network name. (only supported with KVM driver) (default "default")
--memory int Amount of RAM allocated to the minikube VM (default 2048)
--network-plugin string The name of the network plugin

View File

@ -603,6 +603,11 @@ func (as *authStore) isOpPermitted(userName string, key, rangeEnd []byte, permTy
return false
}
// root role should have permission on all ranges
if hasRootRole(user) {
return true
}
if as.isRangeOpPermitted(tx, userName, key, rangeEnd, permTyp) {
return true
}

View File

@ -154,13 +154,13 @@ type Server interface {
// EtcdServer is the production implementation of the Server interface
type EtcdServer struct {
// r and inflightSnapshots must be the first elements to keep 64-bit alignment for atomic
// access to fields
// count the number of inflight snapshots.
// MUST use atomic operation to access this field.
inflightSnapshots int64
Cfg *ServerConfig
// inflightSnapshots holds count the number of snapshots currently inflight.
inflightSnapshots int64 // must use atomic operations to access; keep 64-bit aligned.
appliedIndex uint64 // must use atomic operations to access; keep 64-bit aligned.
// consistIndex used to hold the offset of current executing entry
// It is initialized to 0 before executing any entry.
consistIndex consistentIndex // must use atomic operations to access; keep 64-bit aligned.
Cfg *ServerConfig
readych chan struct{}
r raftNode
@ -195,10 +195,6 @@ type EtcdServer struct {
// compactor is used to auto-compact the KV.
compactor *compactor.Periodic
// consistent index used to hold the offset of current executing entry
// It is initialized to 0 before executing any entry.
consistIndex consistentIndex
// peerRt used to send requests (version, lease) to peers.
peerRt http.RoundTripper
reqIDGen *idutil.Generator
@ -212,8 +208,6 @@ type EtcdServer struct {
// wg is used to wait for the go routines that depends on the server state
// to exit when stopping the server.
wg sync.WaitGroup
appliedIndex uint64
}
// NewServer creates a new EtcdServer from the supplied configuration. The

View File

@ -408,6 +408,13 @@ func (s *store) restore() error {
s.currentRev = rev
}
// keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
// the correct revision should be set to compaction revision in the case, not the largest revision
// we have seen.
if s.currentRev.main < s.compactMainRev {
s.currentRev.main = s.compactMainRev
}
for key, lid := range keyToLease {
if s.le == nil {
panic("no lessor to attach lease")

View File

@ -38,7 +38,7 @@ var (
// SoftState provides state that is useful for logging and debugging.
// The state is volatile and does not need to be persisted to the WAL.
type SoftState struct {
Lead uint64
Lead uint64 // must use atomic operations to access; keep 64-bit aligned.
RaftState StateType
}

View File

@ -183,9 +183,9 @@ func (x *ConfChangeType) UnmarshalJSON(data []byte) error {
func (ConfChangeType) EnumDescriptor() ([]byte, []int) { return fileDescriptorRaft, []int{2} }
type Entry struct {
Type EntryType `protobuf:"varint,1,opt,name=Type,json=type,enum=raftpb.EntryType" json:"Type"`
Term uint64 `protobuf:"varint,2,opt,name=Term,json=term" json:"Term"`
Index uint64 `protobuf:"varint,3,opt,name=Index,json=index" json:"Index"`
Type EntryType `protobuf:"varint,1,opt,name=Type,json=type,enum=raftpb.EntryType" json:"Type"`
Data []byte `protobuf:"bytes,4,opt,name=Data,json=data" json:"Data,omitempty"`
XXX_unrecognized []byte `json:"-"`
}

View File

@ -15,9 +15,9 @@ enum EntryType {
}
message Entry {
optional uint64 Term = 2 [(gogoproto.nullable) = false]; // must be 64-bit aligned for atomic operations
optional uint64 Index = 3 [(gogoproto.nullable) = false]; // must be 64-bit aligned for atomic operations
optional EntryType Type = 1 [(gogoproto.nullable) = false];
optional uint64 Term = 2 [(gogoproto.nullable) = false];
optional uint64 Index = 3 [(gogoproto.nullable) = false];
optional bytes Data = 4;
}

View File

@ -29,7 +29,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "2.3.0"
Version = "3.0.12"
Version = "3.0.13"
// Git SHA Value will be set during build
GitSHA = "Not provided (use ./build instead of go build)"

View File

@ -27,24 +27,33 @@ import (
"k8s.io/kubernetes/pkg/util/sysctl"
)
// Conntracker is an interface to the global sysctl. Descriptions of the various
// sysctl fields can be found here:
//
// https://www.kernel.org/doc/Documentation/networking/nf_conntrack-sysctl.txt
type Conntracker interface {
// SetMax adjusts nf_conntrack_max.
SetMax(max int) error
// SetTCPEstablishedTimeout adjusts nf_conntrack_tcp_timeout_established.
SetTCPEstablishedTimeout(seconds int) error
// SetTCPCloseWaitTimeout nf_conntrack_tcp_timeout_close_wait.
SetTCPCloseWaitTimeout(seconds int) error
}
type realConntracker struct{}
var readOnlySysFSError = errors.New("ReadOnlySysFS")
func (realConntracker) SetMax(max int) error {
glog.Infof("Setting nf_conntrack_max to %d", max)
if err := sysctl.New().SetSysctl("net/netfilter/nf_conntrack_max", max); err != nil {
func (rct realConntracker) SetMax(max int) error {
if err := rct.setIntSysCtl("nf_conntrack_max", max); err != nil {
return err
}
// sysfs is expected to be mounted as 'rw'. However, it may be unexpectedly mounted as
// 'ro' by docker because of a known docker issue (https://github.com/docker/docker/issues/24000).
// Setting conntrack will fail when sysfs is readonly. When that happens, we don't set conntrack
// hashsize and return a special error readOnlySysFSError here. The caller should deal with
// sysfs is expected to be mounted as 'rw'. However, it may be
// unexpectedly mounted as 'ro' by docker because of a known docker
// issue (https://github.com/docker/docker/issues/24000). Setting
// conntrack will fail when sysfs is readonly. When that happens, we
// don't set conntrack hashsize and return a special error
// readOnlySysFSError here. The caller should deal with
// readOnlySysFSError differently.
writable, err := isSysFSWritable()
if err != nil {
@ -58,9 +67,22 @@ func (realConntracker) SetMax(max int) error {
return ioutil.WriteFile("/sys/module/nf_conntrack/parameters/hashsize", []byte(strconv.Itoa(max/4)), 0640)
}
func (realConntracker) SetTCPEstablishedTimeout(seconds int) error {
glog.Infof("Setting nf_conntrack_tcp_timeout_established to %d", seconds)
return sysctl.New().SetSysctl("net/netfilter/nf_conntrack_tcp_timeout_established", seconds)
func (rct realConntracker) SetTCPEstablishedTimeout(seconds int) error {
return rct.setIntSysCtl("nf_conntrack_tcp_timeout_established", seconds)
}
func (rct realConntracker) SetTCPCloseWaitTimeout(seconds int) error {
return rct.setIntSysCtl("nf_conntrack_tcp_timeout_close_wait", seconds)
}
func (realConntracker) setIntSysCtl(name string, value int) error {
entry := "net/netfilter/" + name
glog.Infof("Set sysctl '%v' to %v", entry, value)
if err := sysctl.New().SetSysctl(entry, value); err != nil {
return err
}
return nil
}
// isSysFSWritable checks /proc/mounts to see whether sysfs is 'rw' or not.
@ -73,16 +95,21 @@ func isSysFSWritable() (bool, error) {
glog.Errorf("failed to list mount points: %v", err)
return false, err
}
for _, mountPoint := range mountPoints {
const sysfsDevice = "sysfs"
if mountPoint.Device != sysfsDevice {
continue
}
// Check whether sysfs is 'rw'
const permWritable = "rw"
if len(mountPoint.Opts) > 0 && mountPoint.Opts[0] == permWritable {
return true, nil
}
glog.Errorf("sysfs is not writable: %+v", mountPoint)
break
glog.Errorf("sysfs is not writable: %+v (mount options are %v)",
mountPoint, mountPoint.Opts)
return false, readOnlySysFSError
}
return false, nil
return false, errors.New("No sysfs mounted")
}

View File

@ -92,5 +92,10 @@ func (s *ProxyServerConfig) AddFlags(fs *pflag.FlagSet) {
fs.Int32Var(&s.ConntrackMin, "conntrack-min", s.ConntrackMin,
"Minimum number of conntrack entries to allocate, regardless of conntrack-max-per-core (set conntrack-max-per-core=0 to leave the limit as-is).")
fs.DurationVar(&s.ConntrackTCPEstablishedTimeout.Duration, "conntrack-tcp-timeout-established", s.ConntrackTCPEstablishedTimeout.Duration, "Idle timeout for established TCP connections (0 to leave as-is)")
fs.DurationVar(
&s.ConntrackTCPCloseWaitTimeout.Duration, "conntrack-tcp-timeout-close-wait",
s.ConntrackTCPCloseWaitTimeout.Duration,
"NAT timeout for TCP connections in the CLOSE_WAIT state")
config.DefaultFeatureGate.AddFlag(fs)
}

View File

@ -315,12 +315,22 @@ func (s *ProxyServer) Run() error {
// administrator should decide whether and how to handle this issue,
// whether to drain the node and restart docker.
// TODO(random-liu): Remove this when the docker bug is fixed.
const message = "DOCKER RESTART NEEDED (docker issue #24000): /sys is read-only: can't raise conntrack limits, problems may arise later."
const message = "DOCKER RESTART NEEDED (docker issue #24000): /sys is read-only: " +
"cannot modify conntrack limits, problems may arise later."
s.Recorder.Eventf(s.Config.NodeRef, api.EventTypeWarning, err.Error(), message)
}
}
if s.Config.ConntrackTCPEstablishedTimeout.Duration > 0 {
if err := s.Conntracker.SetTCPEstablishedTimeout(int(s.Config.ConntrackTCPEstablishedTimeout.Duration / time.Second)); err != nil {
timeout := int(s.Config.ConntrackTCPEstablishedTimeout.Duration / time.Second)
if err := s.Conntracker.SetTCPEstablishedTimeout(timeout); err != nil {
return err
}
}
if s.Config.ConntrackTCPCloseWaitTimeout.Duration > 0 {
timeout := int(s.Config.ConntrackTCPCloseWaitTimeout.Duration / time.Second)
if err := s.Conntracker.SetTCPCloseWaitTimeout(timeout); err != nil {
return err
}
}

File diff suppressed because it is too large Load Diff

View File

@ -75,8 +75,12 @@ type KubeProxyConfiguration struct {
// regardless of conntrackMaxPerCore (set conntrackMaxPerCore=0 to leave the limit as-is).
ConntrackMin int32 `json:"conntrackMin"`
// conntrackTCPEstablishedTimeout is how long an idle TCP connection will be kept open
// (e.g. '250ms', '2s'). Must be greater than 0.
// (e.g. '2s'). Must be greater than 0.
ConntrackTCPEstablishedTimeout unversioned.Duration `json:"conntrackTCPEstablishedTimeout"`
// conntrackTCPCloseWaitTimeout is how long an idle conntrack entry
// in CLOSE_WAIT state will remain in the conntrack
// table. (e.g. '60s'). Must be greater than 0 to set.
ConntrackTCPCloseWaitTimeout unversioned.Duration `json:"conntrackTCPCloseWaitTimeout"`
}
// Currently two modes of proxying are available: 'userspace' (older, stable) or 'iptables'

View File

@ -100,6 +100,29 @@ func SetDefaults_KubeProxyConfiguration(obj *KubeProxyConfiguration) {
if obj.ConntrackTCPEstablishedTimeout == zero {
obj.ConntrackTCPEstablishedTimeout = unversioned.Duration{Duration: 24 * time.Hour} // 1 day (1/5 default)
}
if obj.ConntrackTCPCloseWaitTimeout == zero {
// See https://github.com/kubernetes/kubernetes/issues/32551.
//
// CLOSE_WAIT conntrack state occurs when the the Linux kernel
// sees a FIN from the remote server. Note: this is a half-close
// condition that persists as long as the local side keeps the
// socket open. The condition is rare as it is typical in most
// protocols for both sides to issue a close; this typically
// occurs when the local socket is lazily garbage collected.
//
// If the CLOSE_WAIT conntrack entry expires, then FINs from the
// local socket will not be properly SNAT'd and will not reach the
// remote server (if the connection was subject to SNAT). If the
// remote timeouts for FIN_WAIT* states exceed the CLOSE_WAIT
// timeout, then there will be an inconsistency in the state of
// the connection and a new connection reusing the SNAT (src,
// port) pair may be rejected by the remote side with RST. This
// can cause new calls to connect(2) to return with ECONNREFUSED.
//
// We set CLOSE_WAIT to one hour by default to better match
// typical server timeouts.
obj.ConntrackTCPCloseWaitTimeout = unversioned.Duration{Duration: 1 * time.Hour}
}
}
func SetDefaults_KubeSchedulerConfiguration(obj *KubeSchedulerConfiguration) {

View File

@ -71,9 +71,13 @@ type KubeProxyConfiguration struct {
// conntrackMin is the minimum value of connect-tracking records to allocate,
// regardless of conntrackMaxPerCore (set conntrackMaxPerCore=0 to leave the limit as-is).
ConntrackMin int32 `json:"conntrackMin"`
// conntrackTCPEstablishedTimeout is how long an idle TCP connection will be kept open
// (e.g. '250ms', '2s'). Must be greater than 0.
// conntrackTCPEstablishedTimeout is how long an idle TCP connection
// will be kept open (e.g. '2s'). Must be greater than 0.
ConntrackTCPEstablishedTimeout unversioned.Duration `json:"conntrackTCPEstablishedTimeout"`
// conntrackTCPCloseWaitTimeout is how long an idle conntrack entry
// in CLOSE_WAIT state will remain in the conntrack
// table. (e.g. '60s'). Must be greater than 0 to set.
ConntrackTCPCloseWaitTimeout unversioned.Duration `json:"conntrackTCPCloseWaitTimeout"`
}
// Currently two modes of proxying are available: 'userspace' (older, stable) or 'iptables'

View File

@ -71,6 +71,7 @@ func autoConvert_v1alpha1_KubeProxyConfiguration_To_componentconfig_KubeProxyCon
out.ConntrackMaxPerCore = in.ConntrackMaxPerCore
out.ConntrackMin = in.ConntrackMin
out.ConntrackTCPEstablishedTimeout = in.ConntrackTCPEstablishedTimeout
out.ConntrackTCPCloseWaitTimeout = in.ConntrackTCPCloseWaitTimeout
return nil
}
@ -101,6 +102,7 @@ func autoConvert_componentconfig_KubeProxyConfiguration_To_v1alpha1_KubeProxyCon
out.ConntrackMaxPerCore = in.ConntrackMaxPerCore
out.ConntrackMin = in.ConntrackMin
out.ConntrackTCPEstablishedTimeout = in.ConntrackTCPEstablishedTimeout
out.ConntrackTCPCloseWaitTimeout = in.ConntrackTCPCloseWaitTimeout
return nil
}

View File

@ -77,6 +77,7 @@ func DeepCopy_v1alpha1_KubeProxyConfiguration(in interface{}, out interface{}, c
out.ConntrackMaxPerCore = in.ConntrackMaxPerCore
out.ConntrackMin = in.ConntrackMin
out.ConntrackTCPEstablishedTimeout = in.ConntrackTCPEstablishedTimeout
out.ConntrackTCPCloseWaitTimeout = in.ConntrackTCPCloseWaitTimeout
return nil
}
}

View File

@ -164,6 +164,7 @@ func DeepCopy_componentconfig_KubeProxyConfiguration(in interface{}, out interfa
out.ConntrackMaxPerCore = in.ConntrackMaxPerCore
out.ConntrackMin = in.ConntrackMin
out.ConntrackTCPEstablishedTimeout = in.ConntrackTCPEstablishedTimeout
out.ConntrackTCPCloseWaitTimeout = in.ConntrackTCPCloseWaitTimeout
return nil
}
}

View File

@ -249,14 +249,15 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
return nil, err
}
var versionedDeleteOptions runtime.Object
var versionedDeleterObject interface{}
switch {
case isGracefulDeleter:
objectPtr, err := a.group.Creater.New(optionsExternalVersion.WithKind("DeleteOptions"))
versionedDeleteOptions, err = a.group.Creater.New(optionsExternalVersion.WithKind("DeleteOptions"))
if err != nil {
return nil, err
}
versionedDeleterObject = indirectArbitraryPointer(objectPtr)
versionedDeleterObject = indirectArbitraryPointer(versionedDeleteOptions)
isDeleter = true
case isDeleter:
gracefulDeleter = rest.GracefulDeleteAdapter{Deleter: deleter}
@ -625,6 +626,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
Returns(http.StatusOK, "OK", versionedStatus)
if isGracefulDeleter {
route.Reads(versionedDeleterObject)
if err := addObjectParams(ws, route, versionedDeleteOptions); err != nil {
return nil, err
}
}
addParams(route, action.Params)
ws.Route(route)
@ -927,6 +931,12 @@ func addObjectParams(ws *restful.WebService, route *restful.RouteBuilder, obj in
}
switch sf.Type.Kind() {
case reflect.Interface, reflect.Struct:
case reflect.Ptr:
// TODO: This is a hack to let unversioned.Time through. This needs to be fixed in a more generic way eventually. bug #36191
if (sf.Type.Elem().Kind() == reflect.Interface || sf.Type.Elem().Kind() == reflect.Struct) && strings.TrimPrefix(sf.Type.String(), "*") != "unversioned.Time" {
continue
}
fallthrough
default:
jsonTag := sf.Tag.Get("json")
if len(jsonTag) == 0 {

View File

@ -739,7 +739,7 @@ func UpdateResource(r rest.Updater, scope RequestScope, typer runtime.ObjectType
}
// DeleteResource returns a function that will handle a resource deletion
func DeleteResource(r rest.GracefulDeleter, checkBody bool, scope RequestScope, admit admission.Interface) restful.RouteFunction {
func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope RequestScope, admit admission.Interface) restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {
// For performance tracking purposes.
trace := util.NewTrace("Delete " + req.Request.URL.Path)
@ -759,7 +759,7 @@ func DeleteResource(r rest.GracefulDeleter, checkBody bool, scope RequestScope,
ctx = api.WithNamespace(ctx, namespace)
options := &api.DeleteOptions{}
if checkBody {
if allowsOptions {
body, err := readBody(req.Request)
if err != nil {
scope.err(err, res.ResponseWriter, req.Request)
@ -781,6 +781,13 @@ func DeleteResource(r rest.GracefulDeleter, checkBody bool, scope RequestScope,
scope.err(fmt.Errorf("decoded object cannot be converted to DeleteOptions"), res.ResponseWriter, req.Request)
return
}
} else {
if values := req.Request.URL.Query(); len(values) > 0 {
if err := scope.ParameterCodec.DecodeParameters(values, scope.Kind.GroupVersion(), options); err != nil {
scope.err(err, res.ResponseWriter, req.Request)
return
}
}
}
}

View File

@ -21,7 +21,6 @@ import (
"fmt"
"io"
"net"
"net/url"
"regexp"
"strconv"
"strings"
@ -299,28 +298,31 @@ type Volumes interface {
// Attach the disk to the specified instance
// instanceName can be empty to mean "the instance on which we are running"
// Returns the device (e.g. /dev/xvdf) where we attached the volume
AttachDisk(diskName string, instanceName string, readOnly bool) (string, error)
// Detach the disk from the specified instance
// instanceName can be empty to mean "the instance on which we are running"
AttachDisk(diskName KubernetesVolumeID, nodeName string, readOnly bool) (string, error)
// Detach the disk from the node with the specified NodeName
// nodeName can be empty to mean "the instance on which we are running"
// Returns the device where the volume was attached
DetachDisk(diskName string, instanceName string) (string, error)
DetachDisk(diskName KubernetesVolumeID, nodeName string) (string, error)
// Create a volume with the specified options
CreateDisk(volumeOptions *VolumeOptions) (volumeName string, err error)
CreateDisk(volumeOptions *VolumeOptions) (volumeName KubernetesVolumeID, err error)
// Delete the specified volume
// Returns true iff the volume was deleted
// If the was not found, returns (false, nil)
DeleteDisk(volumeName string) (bool, error)
DeleteDisk(volumeName KubernetesVolumeID) (bool, error)
// Get labels to apply to volume on creation
GetVolumeLabels(volumeName string) (map[string]string, error)
GetVolumeLabels(volumeName KubernetesVolumeID) (map[string]string, error)
// Get volume's disk path from volume name
// return the device path where the volume is attached
GetDiskPath(volumeName string) (string, error)
GetDiskPath(volumeName KubernetesVolumeID) (string, error)
// Check if the volume is already attached to the instance
DiskIsAttached(diskName, instanceID string) (bool, error)
// Check if the volume is already attached to the node with the specified NodeName
DiskIsAttached(diskName KubernetesVolumeID, nodeName string) (bool, error)
// Check if a list of volumes are attached to the node with the specified NodeName
DisksAreAttached(diskNames []KubernetesVolumeID, nodeName string) (map[KubernetesVolumeID]bool, error)
}
// InstanceGroups is an interface for managing cloud-managed instance groups / autoscaling instance groups
@ -362,7 +364,7 @@ type Cloud struct {
// attached, to avoid a race condition where we assign a device mapping
// and then get a second request before we attach the volume
attachingMutex sync.Mutex
attaching map[ /*nodeName*/ string]map[mountDevice]string
attaching map[string]map[mountDevice]awsVolumeID
}
var _ Volumes = &Cloud{}
@ -795,7 +797,7 @@ func newAWSCloud(config io.Reader, awsServices Services) (*Cloud, error) {
cfg: cfg,
region: regionName,
attaching: make(map[string]map[mountDevice]string),
attaching: make(map[string]map[mountDevice]awsVolumeID),
}
selfAWSInstance, err := awsCloud.buildSelfAWSInstance()
@ -1166,7 +1168,7 @@ func (i *awsInstance) describeInstance() (*ec2.Instance, error) {
// Gets the mountDevice already assigned to the volume, or assigns an unused mountDevice.
// If the volume is already assigned, this will return the existing mountDevice with alreadyAttached=true.
// Otherwise the mountDevice is assigned by finding the first available mountDevice, and it is returned with alreadyAttached=false.
func (c *Cloud) getMountDevice(i *awsInstance, volumeID string, assign bool) (assigned mountDevice, alreadyAttached bool, err error) {
func (c *Cloud) getMountDevice(i *awsInstance, volumeID awsVolumeID, assign bool) (assigned mountDevice, alreadyAttached bool, err error) {
instanceType := i.getInstanceType()
if instanceType == nil {
return "", false, fmt.Errorf("could not get instance type for instance: %s", i.awsID)
@ -1176,7 +1178,7 @@ func (c *Cloud) getMountDevice(i *awsInstance, volumeID string, assign bool) (as
if err != nil {
return "", false, err
}
deviceMappings := map[mountDevice]string{}
deviceMappings := map[mountDevice]awsVolumeID{}
for _, blockDevice := range info.BlockDeviceMappings {
name := aws.StringValue(blockDevice.DeviceName)
if strings.HasPrefix(name, "/dev/sd") {
@ -1188,7 +1190,7 @@ func (c *Cloud) getMountDevice(i *awsInstance, volumeID string, assign bool) (as
if len(name) < 1 || len(name) > 2 {
glog.Warningf("Unexpected EBS DeviceName: %q", aws.StringValue(blockDevice.DeviceName))
}
deviceMappings[mountDevice(name)] = aws.StringValue(blockDevice.Ebs.VolumeId)
deviceMappings[mountDevice(name)] = awsVolumeID(aws.StringValue(blockDevice.Ebs.VolumeId))
}
// We lock to prevent concurrent mounts from conflicting
@ -1234,7 +1236,7 @@ func (c *Cloud) getMountDevice(i *awsInstance, volumeID string, assign bool) (as
attaching := c.attaching[i.nodeName]
if attaching == nil {
attaching = make(map[mountDevice]string)
attaching = make(map[mountDevice]awsVolumeID)
c.attaching[i.nodeName] = attaching
}
attaching[chosen] = volumeID
@ -1245,7 +1247,7 @@ func (c *Cloud) getMountDevice(i *awsInstance, volumeID string, assign bool) (as
// endAttaching removes the entry from the "attachments in progress" map
// It returns true if it was found (and removed), false otherwise
func (c *Cloud) endAttaching(i *awsInstance, volumeID string, mountDevice mountDevice) bool {
func (c *Cloud) endAttaching(i *awsInstance, volumeID awsVolumeID, mountDevice mountDevice) bool {
c.attachingMutex.Lock()
defer c.attachingMutex.Unlock()
@ -1266,44 +1268,16 @@ type awsDisk struct {
ec2 EC2
// Name in k8s
name string
name KubernetesVolumeID
// id in AWS
awsID string
awsID awsVolumeID
}
func newAWSDisk(aws *Cloud, name string) (*awsDisk, error) {
// name looks like aws://availability-zone/id
// The original idea of the URL-style name was to put the AZ into the
// host, so we could find the AZ immediately from the name without
// querying the API. But it turns out we don't actually need it for
// multi-AZ clusters, as we put the AZ into the labels on the PV instead.
// However, if in future we want to support multi-AZ cluster
// volume-awareness without using PersistentVolumes, we likely will
// want the AZ in the host.
if !strings.HasPrefix(name, "aws://") {
name = "aws://" + "" + "/" + name
}
url, err := url.Parse(name)
func newAWSDisk(aws *Cloud, name KubernetesVolumeID) (*awsDisk, error) {
awsID, err := name.mapToAWSVolumeID()
if err != nil {
// TODO: Maybe we should pass a URL into the Volume functions
return nil, fmt.Errorf("Invalid disk name (%s): %v", name, err)
return nil, err
}
if url.Scheme != "aws" {
return nil, fmt.Errorf("Invalid scheme for AWS volume (%s)", name)
}
awsID := url.Path
if len(awsID) > 1 && awsID[0] == '/' {
awsID = awsID[1:]
}
// TODO: Regex match?
if strings.Contains(awsID, "/") || !strings.HasPrefix(awsID, "vol-") {
return nil, fmt.Errorf("Invalid format for AWS volume (%s)", name)
}
disk := &awsDisk{ec2: aws.ec2, name: name, awsID: awsID}
return disk, nil
}
@ -1313,7 +1287,7 @@ func (d *awsDisk) describeVolume() (*ec2.Volume, error) {
volumeID := d.awsID
request := &ec2.DescribeVolumesInput{
VolumeIds: []*string{&volumeID},
VolumeIds: []*string{volumeID.awsString()},
}
volumes, err := d.ec2.DescribeVolumes(request)
@ -1394,7 +1368,7 @@ func (d *awsDisk) waitForAttachmentStatus(status string) (*ec2.VolumeAttachment,
// Deletes the EBS disk
func (d *awsDisk) deleteVolume() (bool, error) {
request := &ec2.DeleteVolumeInput{VolumeId: aws.String(d.awsID)}
request := &ec2.DeleteVolumeInput{VolumeId: d.awsID.awsString()}
_, err := d.ec2.DeleteVolume(request)
if err != nil {
if awsError, ok := err.(awserr.Error); ok {
@ -1451,7 +1425,7 @@ func (c *Cloud) getAwsInstance(nodeName string) (*awsInstance, error) {
}
// AttachDisk implements Volumes.AttachDisk
func (c *Cloud) AttachDisk(diskName string, instanceName string, readOnly bool) (string, error) {
func (c *Cloud) AttachDisk(diskName KubernetesVolumeID, instanceName string, readOnly bool) (string, error) {
disk, err := newAWSDisk(c, diskName)
if err != nil {
return "", err
@ -1499,7 +1473,7 @@ func (c *Cloud) AttachDisk(diskName string, instanceName string, readOnly bool)
request := &ec2.AttachVolumeInput{
Device: aws.String(ec2Device),
InstanceId: aws.String(awsInstance.awsID),
VolumeId: aws.String(disk.awsID),
VolumeId: disk.awsID.awsString(),
}
attachResponse, err := c.ec2.AttachVolume(request)
@ -1538,7 +1512,7 @@ func (c *Cloud) AttachDisk(diskName string, instanceName string, readOnly bool)
}
// DetachDisk implements Volumes.DetachDisk
func (c *Cloud) DetachDisk(diskName string, instanceName string) (string, error) {
func (c *Cloud) DetachDisk(diskName KubernetesVolumeID, instanceName string) (string, error) {
disk, err := newAWSDisk(c, diskName)
if err != nil {
return "", err
@ -1570,7 +1544,7 @@ func (c *Cloud) DetachDisk(diskName string, instanceName string) (string, error)
request := ec2.DetachVolumeInput{
InstanceId: &awsInstance.awsID,
VolumeId: &disk.awsID,
VolumeId: disk.awsID.awsString(),
}
response, err := c.ec2.DetachVolume(&request)
@ -1601,7 +1575,7 @@ func (c *Cloud) DetachDisk(diskName string, instanceName string) (string, error)
}
// CreateDisk implements Volumes.CreateDisk
func (c *Cloud) CreateDisk(volumeOptions *VolumeOptions) (string, error) {
func (c *Cloud) CreateDisk(volumeOptions *VolumeOptions) (KubernetesVolumeID, error) {
allZones, err := c.getAllZones()
if err != nil {
return "", fmt.Errorf("error querying for all zones: %v", err)
@ -1659,10 +1633,11 @@ func (c *Cloud) CreateDisk(volumeOptions *VolumeOptions) (string, error) {
return "", err
}
az := orEmpty(response.AvailabilityZone)
awsID := orEmpty(response.VolumeId)
volumeName := "aws://" + az + "/" + awsID
awsID := awsVolumeID(aws.StringValue(response.VolumeId))
if awsID == "" {
return "", fmt.Errorf("VolumeID was not returned by CreateVolume")
}
volumeName := KubernetesVolumeID("aws://" + aws.StringValue(response.AvailabilityZone) + "/" + string(awsID))
// apply tags
tags := make(map[string]string)
@ -1675,7 +1650,7 @@ func (c *Cloud) CreateDisk(volumeOptions *VolumeOptions) (string, error) {
}
if len(tags) != 0 {
if err := c.createTags(awsID, tags); err != nil {
if err := c.createTags(string(awsID), tags); err != nil {
// delete the volume and hope it succeeds
_, delerr := c.DeleteDisk(volumeName)
if delerr != nil {
@ -1689,7 +1664,7 @@ func (c *Cloud) CreateDisk(volumeOptions *VolumeOptions) (string, error) {
}
// DeleteDisk implements Volumes.DeleteDisk
func (c *Cloud) DeleteDisk(volumeName string) (bool, error) {
func (c *Cloud) DeleteDisk(volumeName KubernetesVolumeID) (bool, error) {
awsDisk, err := newAWSDisk(c, volumeName)
if err != nil {
return false, err
@ -1698,7 +1673,7 @@ func (c *Cloud) DeleteDisk(volumeName string) (bool, error) {
}
// GetVolumeLabels implements Volumes.GetVolumeLabels
func (c *Cloud) GetVolumeLabels(volumeName string) (map[string]string, error) {
func (c *Cloud) GetVolumeLabels(volumeName KubernetesVolumeID) (map[string]string, error) {
awsDisk, err := newAWSDisk(c, volumeName)
if err != nil {
return nil, err
@ -1724,7 +1699,7 @@ func (c *Cloud) GetVolumeLabels(volumeName string) (map[string]string, error) {
}
// GetDiskPath implements Volumes.GetDiskPath
func (c *Cloud) GetDiskPath(volumeName string) (string, error) {
func (c *Cloud) GetDiskPath(volumeName KubernetesVolumeID) (string, error) {
awsDisk, err := newAWSDisk(c, volumeName)
if err != nil {
return "", err
@ -1740,14 +1715,14 @@ func (c *Cloud) GetDiskPath(volumeName string) (string, error) {
}
// DiskIsAttached implements Volumes.DiskIsAttached
func (c *Cloud) DiskIsAttached(diskName, instanceID string) (bool, error) {
awsInstance, err := c.getAwsInstance(instanceID)
func (c *Cloud) DiskIsAttached(diskName KubernetesVolumeID, nodeName string) (bool, error) {
awsInstance, err := c.getAwsInstance(nodeName)
if err != nil {
if err == cloudprovider.InstanceNotFound {
// If instance no longer exists, safe to assume volume is not attached.
glog.Warningf(
"Instance %q does not exist. DiskIsAttached will assume disk %q is not attached to it.",
instanceID,
nodeName,
diskName)
return false, nil
}
@ -1755,19 +1730,64 @@ func (c *Cloud) DiskIsAttached(diskName, instanceID string) (bool, error) {
return false, err
}
diskID, err := diskName.mapToAWSVolumeID()
if err != nil {
return false, fmt.Errorf("error mapping volume spec %q to aws id: %v", diskName, err)
}
info, err := awsInstance.describeInstance()
if err != nil {
return false, err
}
for _, blockDevice := range info.BlockDeviceMappings {
name := aws.StringValue(blockDevice.Ebs.VolumeId)
if name == diskName {
id := awsVolumeID(aws.StringValue(blockDevice.Ebs.VolumeId))
if id == diskID {
return true, nil
}
}
return false, nil
}
func (c *Cloud) DisksAreAttached(diskNames []KubernetesVolumeID, nodeName string) (map[KubernetesVolumeID]bool, error) {
idToDiskName := make(map[awsVolumeID]KubernetesVolumeID)
attached := make(map[KubernetesVolumeID]bool)
for _, diskName := range diskNames {
volumeID, err := diskName.mapToAWSVolumeID()
if err != nil {
return nil, fmt.Errorf("error mapping volume spec %q to aws id: %v", diskName, err)
}
idToDiskName[volumeID] = diskName
attached[diskName] = false
}
awsInstance, err := c.getAwsInstance(nodeName)
if err != nil {
if err == cloudprovider.InstanceNotFound {
// If instance no longer exists, safe to assume volume is not attached.
glog.Warningf(
"Node %q does not exist. DisksAreAttached will assume disks %v are not attached to it.",
nodeName,
diskNames)
return attached, nil
}
return attached, err
}
info, err := awsInstance.describeInstance()
if err != nil {
return attached, err
}
for _, blockDevice := range info.BlockDeviceMappings {
volumeID := awsVolumeID(aws.StringValue(blockDevice.Ebs.VolumeId))
diskName, found := idToDiskName[volumeID]
if found {
// Disk is still attached to node
attached[diskName] = true
}
}
return attached, nil
}
// Gets the current load balancer state
func (c *Cloud) describeLoadBalancer(name string) (*elb.LoadBalancerDescription, error) {
request := &elb.DescribeLoadBalancersInput{}

View File

@ -0,0 +1,83 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aws
import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"net/url"
"strings"
)
// awsVolumeID represents the ID of the volume in the AWS API, e.g. vol-12345678a
// The "traditional" format is "vol-12345678"
// A new longer format is also being introduced: "vol-12345678abcdef01"
// We should not assume anything about the length or format, though it seems
// reasonable to assume that volumes will continue to start with "vol-".
type awsVolumeID string
func (i awsVolumeID) awsString() *string {
return aws.String(string(i))
}
// KubernetesVolumeID represents the id for a volume in the kubernetes API;
// a few forms are recognized:
// * aws://<zone>/<awsVolumeId>
// * aws:///<awsVolumeId>
// * <awsVolumeId>
type KubernetesVolumeID string
// mapToAWSVolumeID extracts the awsVolumeID from the KubernetesVolumeID
func (name KubernetesVolumeID) mapToAWSVolumeID() (awsVolumeID, error) {
// name looks like aws://availability-zone/awsVolumeId
// The original idea of the URL-style name was to put the AZ into the
// host, so we could find the AZ immediately from the name without
// querying the API. But it turns out we don't actually need it for
// multi-AZ clusters, as we put the AZ into the labels on the PV instead.
// However, if in future we want to support multi-AZ cluster
// volume-awareness without using PersistentVolumes, we likely will
// want the AZ in the host.
s := string(name)
if !strings.HasPrefix(s, "aws://") {
// Assume a bare aws volume id (vol-1234...)
// Build a URL with an empty host (AZ)
s = "aws://" + "" + "/" + s
}
url, err := url.Parse(s)
if err != nil {
// TODO: Maybe we should pass a URL into the Volume functions
return "", fmt.Errorf("Invalid disk name (%s): %v", name, err)
}
if url.Scheme != "aws" {
return "", fmt.Errorf("Invalid scheme for AWS volume (%s)", name)
}
awsID := url.Path
awsID = strings.Trim(awsID, "/")
// We sanity check the resulting volume; the two known formats are
// vol-12345678 and vol-12345678abcdef01
// TODO: Regex match?
if strings.Contains(awsID, "/") || !strings.HasPrefix(awsID, "vol-") {
return "", fmt.Errorf("Invalid format for AWS volume (%s)", name)
}
return awsVolumeID(awsID), nil
}

View File

@ -73,6 +73,34 @@ func (az *Cloud) AttachDisk(diskName, diskURI, vmName string, lun int32, caching
return err
}
// DisksAreAttached checks if a list of volumes are attached to the node with the specified NodeName
func (az *Cloud) DisksAreAttached(diskNames []string, nodeName string) (map[string]bool, error) {
attached := make(map[string]bool)
for _, diskName := range diskNames {
attached[diskName] = false
}
vm, exists, err := az.getVirtualMachine(nodeName)
if !exists {
// if host doesn't exist, no need to detach
glog.Warningf("Cannot find node %q, DisksAreAttached will assume disks %v are not attached to it.",
nodeName, diskNames)
return attached, nil
} else if err != nil {
return attached, err
}
disks := *vm.Properties.StorageProfile.DataDisks
for _, disk := range disks {
for _, diskName := range diskNames {
if disk.Name != nil && diskName != "" && *disk.Name == diskName {
attached[diskName] = true
}
}
}
return attached, nil
}
// DetachDiskByName detaches a vhd from host
// the vhd can be identified by diskName or diskURI
func (az *Cloud) DetachDiskByName(diskName, diskURI, vmName string) error {

View File

@ -132,6 +132,10 @@ type Disks interface {
// DiskIsAttached checks if a disk is attached to the given node.
DiskIsAttached(diskName, instanceID string) (bool, error)
// DisksAreAttached is a batch function to check if a list of disks are attached
// to the node with the specified NodeName.
DisksAreAttached(diskNames []string, nodeName string) (map[string]bool, error)
// CreateDisk creates a new PD with given properties. Tags are serialized
// as JSON into Description field.
CreateDisk(name string, diskType string, zone string, sizeGb int64, tags map[string]string) error
@ -2578,6 +2582,37 @@ func (gce *GCECloud) DiskIsAttached(diskName, instanceID string) (bool, error) {
return false, nil
}
func (gce *GCECloud) DisksAreAttached(diskNames []string, nodeName string) (map[string]bool, error) {
attached := make(map[string]bool)
for _, diskName := range diskNames {
attached[diskName] = false
}
instance, err := gce.getInstanceByName(nodeName)
if err != nil {
if err == cloudprovider.InstanceNotFound {
// If instance no longer exists, safe to assume volume is not attached.
glog.Warningf(
"Instance %q does not exist. DisksAreAttached will assume PD %v are not attached to it.",
nodeName,
diskNames)
return attached, nil
}
return attached, err
}
for _, instanceDisk := range instance.Disks {
for _, diskName := range diskNames {
if instanceDisk.DeviceName == diskName {
// Disk is still attached to node
attached[diskName] = true
}
}
}
return attached, nil
}
// Returns a gceDisk for the disk, if it is found in the specified zone.
// If not found, returns (nil, nil)
func (gce *GCECloud) findDiskByName(diskName string, zone string) (*gceDisk, error) {

View File

@ -228,3 +228,33 @@ func (os *OpenStack) DiskIsAttached(diskName, instanceID string) (bool, error) {
}
return false, nil
}
// query if a list of volumes are attached to a compute instance
func (os *OpenStack) DisksAreAttached(diskNames []string, instanceID string) (map[string]bool, error) {
attached := make(map[string]bool)
for _, diskName := range diskNames {
attached[diskName] = false
}
for _, diskName := range diskNames {
disk, err := os.getVolume(diskName)
if err != nil {
continue
}
if len(disk.Attachments) > 0 && disk.Attachments[0]["server_id"] != nil && instanceID == disk.Attachments[0]["server_id"] {
attached[diskName] = true
}
}
return attached, nil
}
// diskIsUsed returns true a disk is attached to any node.
func (os *OpenStack) diskIsUsed(diskName string) (bool, error) {
disk, err := os.getVolume(diskName)
if err != nil {
return false, err
}
if len(disk.Attachments) > 0 {
return true, nil
}
return false, nil
}

View File

@ -645,3 +645,24 @@ func (rs *Rackspace) DiskIsAttached(diskName, instanceID string) (bool, error) {
}
return false, nil
}
// query if a list volumes are attached to a compute instance
func (rs *Rackspace) DisksAreAttached(diskNames []string, instanceID string) (map[string]bool, error) {
attached := make(map[string]bool)
for _, diskName := range diskNames {
attached[diskName] = false
}
var returnedErr error
for _, diskName := range diskNames {
result, err := rs.DiskIsAttached(diskName, instanceID)
if err != nil {
returnedErr = fmt.Errorf("Error in checking disk %q attached: %v \n %v", diskName, err, returnedErr)
continue
}
if result {
attached[diskName] = true
}
}
return attached, returnedErr
}

View File

@ -125,6 +125,10 @@ type Volumes interface {
// Assumption: If node doesn't exist, disk is not attached to the node.
DiskIsAttached(volPath, nodeName string) (bool, error)
// DisksAreAttached checks if a list disks are attached to the given node.
// Assumption: If node doesn't exist, disks are not attached to the node.
DisksAreAttached(volPath []string, nodeName string) (map[string]bool, error)
// CreateVolume creates a new vmdk with specified parameters.
CreateVolume(name string, size int, tags *map[string]string) (volumePath string, err error)
@ -874,6 +878,64 @@ func (vs *VSphere) DiskIsAttached(volPath string, nodeName string) (bool, error)
return attached, err
}
// DisksAreAttached returns if disks are attached to the VM using controllers supported by the plugin.
func (vs *VSphere) DisksAreAttached(volPaths []string, nodeName string) (map[string]bool, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
attached := make(map[string]bool)
for _, volPath := range volPaths {
attached[volPath] = false
}
// Create vSphere client
c, err := vsphereLogin(vs.cfg, ctx)
if err != nil {
glog.Errorf("Failed to create vSphere client. err: %s", err)
return attached, err
}
defer c.Logout(ctx)
// Find VM to detach disk from
var vSphereInstance string
if nodeName == "" {
vSphereInstance = vs.localInstanceID
} else {
vSphereInstance = nodeName
}
nodeExist, err := vs.NodeExists(c, vSphereInstance)
if err != nil {
glog.Errorf("Failed to check whether node exist. err: %s.", err)
return attached, err
}
if !nodeExist {
glog.Warningf(
"Node %q does not exist. DisksAreAttached will assume vmdk %v are not attached to it.",
vSphereInstance,
volPaths)
return attached, nil
}
// Get VM device list
_, vmDevices, _, dc, err := getVirtualMachineDevices(vs.cfg, ctx, c, vSphereInstance)
if err != nil {
glog.Errorf("Failed to get VM devices for VM %#q. err: %s", vSphereInstance, err)
return attached, err
}
for _, volPath := range volPaths {
result, _ := checkDiskAttached(volPath, vmDevices, dc, c)
if result {
attached[volPath] = true
}
}
return attached, err
}
func checkDiskAttached(volPath string, vmdevices object.VirtualDeviceList, dc *object.Datacenter, client *govmomi.Client) (bool, error) {
virtualDiskControllerKey, err := getVirtualDiskControllerKey(volPath, vmdevices, dc, client)
if err != nil {
@ -884,7 +946,7 @@ func checkDiskAttached(volPath string, vmdevices object.VirtualDeviceList, dc *o
return false, err
}
for _, controllerType := range supportedSCSIControllerType {
controllerkey, _ := getControllerKey(controllerType, vmdevices, dc, client)
controllerkey, _ := getControllerKey(controllerType, vmdevices)
if controllerkey == virtualDiskControllerKey {
return true, nil
}
@ -916,7 +978,7 @@ func getVirtualDiskControllerKey(volPath string, vmDevices object.VirtualDeviceL
// Returns key of the controller.
// Key is unique id that distinguishes one device from other devices in the same virtual machine.
func getControllerKey(scsiType string, vmDevices object.VirtualDeviceList, dc *object.Datacenter, client *govmomi.Client) (int32, error) {
func getControllerKey(scsiType string, vmDevices object.VirtualDeviceList) (int32, error) {
for _, device := range vmDevices {
devType := vmDevices.Type(device)
if devType == scsiType {

View File

@ -168,7 +168,7 @@ func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControl
return
}
if len(times) > 1 {
glog.Errorf("Multiple unmet start times for %s so only starting last one", nameForLog)
glog.V(4).Infof("Multiple unmet start times for %s so only starting last one", nameForLog)
}
scheduledTime := times[len(times)-1]
tooLate := false
@ -176,7 +176,7 @@ func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControl
tooLate = scheduledTime.Add(time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds)).Before(now)
}
if tooLate {
glog.Errorf("Missed starting window for %s", nameForLog)
glog.V(4).Infof("Missed starting window for %s", nameForLog)
// TODO: generate an event for a miss. Use a warning level event because it indicates a
// problem with the controller (restart or long queue), and is not expected by user either.
// Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing
@ -198,14 +198,14 @@ func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControl
// TODO: for Forbid, we could use the same name for every execution, as a lock.
// With replace, we could use a name that is deterministic per execution time.
// But that would mean that you could not inspect prior successes or failures of Forbid jobs.
glog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid.", nameForLog)
glog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid", nameForLog)
return
}
if sj.Spec.ConcurrencyPolicy == batch.ReplaceConcurrent {
for _, j := range sj.Status.Active {
// TODO: this should be replaced with server side job deletion
// currently this mimics JobReaper from pkg/kubectl/stop.go
glog.V(4).Infof("Deleting job %s of %s s that was still running at next scheduled start time", j.Name, nameForLog)
glog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog)
job, err := jc.GetJob(j.Namespace, j.Name)
if err != nil {
recorder.Eventf(&sj, api.EventTypeWarning, "FailedGet", "Get job: %v", err)
@ -241,11 +241,14 @@ func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControl
recorder.Eventf(&sj, api.EventTypeWarning, "FailedDelete", "Deleted job-pods: %v", utilerrors.NewAggregate(errList))
return
}
// ... and the job itself
// ... the job itself...
if err := jc.DeleteJob(job.Namespace, job.Name); err != nil {
recorder.Eventf(&sj, api.EventTypeWarning, "FailedDelete", "Deleted job: %v", err)
glog.Errorf("Error deleting job %s from %s: %v", job.Name, nameForLog, err)
return
}
// ... and its reference from active list
deleteFromActiveList(&sj, job.ObjectMeta.UID)
recorder.Eventf(&sj, api.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", j.Name)
}
}
@ -260,6 +263,7 @@ func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControl
recorder.Eventf(&sj, api.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
return
}
glog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog)
recorder.Eventf(&sj, api.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
// ------------------------------------------------------------------ //

View File

@ -17,6 +17,7 @@ limitations under the License.
package scheduledjob
import (
"fmt"
"sync"
"k8s.io/kubernetes/pkg/api"
@ -127,6 +128,7 @@ func (f *fakeJobControl) CreateJob(namespace string, job *batch.Job) (*batch.Job
if f.Err != nil {
return nil, f.Err
}
job.SelfLink = fmt.Sprintf("/api/batch/v1/namespaces/%s/jobs/%s", namespace, job.Name)
f.Jobs = append(f.Jobs, *job)
job.UID = "test-uid"
return job, nil

View File

@ -56,6 +56,10 @@ const (
// desiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the
// DesiredStateOfWorldPopulator loop waits between successive executions
desiredStateOfWorldPopulatorLoopSleepPeriod time.Duration = 1 * time.Minute
// reconcilerSyncDuration is the amount of time the reconciler sync states loop
// wait between successive executions
reconcilerSyncDuration time.Duration = 5 * time.Second
)
// AttachDetachController defines the operations supported by this controller.
@ -122,6 +126,7 @@ func NewAttachDetachController(
adc.reconciler = reconciler.NewReconciler(
reconcilerLoopPeriod,
reconcilerMaxWaitForUnmountDuration,
reconcilerSyncDuration,
adc.desiredStateOfWorld,
adc.actualStateOfWorld,
adc.attacherDetacher,

View File

@ -105,6 +105,8 @@ type ActualStateOfWorld interface {
// based on the current actual state of the world.
GetAttachedVolumesForNode(nodeName string) []AttachedVolume
GetAttachedVolumesPerNode() map[string][]operationexecutor.AttachedVolume
// GetVolumesToReportAttached returns a map containing the set of nodes for
// which the VolumesAttached Status field in the Node API object should be
// updated. The key in this map is the name of the node to update and the
@ -569,6 +571,25 @@ func (asw *actualStateOfWorld) GetVolumesToReportAttached() map[string][]api.Att
return volumesToReportAttached
}
func (asw *actualStateOfWorld) GetAttachedVolumesPerNode() map[string][]operationexecutor.AttachedVolume {
asw.RLock()
defer asw.RUnlock()
attachedVolumesPerNode := make(map[string][]operationexecutor.AttachedVolume)
for _, volumeObj := range asw.attachedVolumes {
for nodeName, nodeObj := range volumeObj.nodesAttachedTo {
volumes, exists := attachedVolumesPerNode[nodeName]
if !exists {
volumes = []operationexecutor.AttachedVolume{}
}
volumes = append(volumes, getAttachedVolume(&volumeObj, &nodeObj).AttachedVolume)
attachedVolumesPerNode[nodeName] = volumes
}
}
return attachedVolumesPerNode
}
func getAttachedVolume(
attachedVolume *attachedVolume,
nodeAttachedTo *nodeAttachedTo) AttachedVolume {

View File

@ -56,6 +56,7 @@ type Reconciler interface {
func NewReconciler(
loopPeriod time.Duration,
maxWaitForUnmountDuration time.Duration,
syncDuration time.Duration,
desiredStateOfWorld cache.DesiredStateOfWorld,
actualStateOfWorld cache.ActualStateOfWorld,
attacherDetacher operationexecutor.OperationExecutor,
@ -63,20 +64,24 @@ func NewReconciler(
return &reconciler{
loopPeriod: loopPeriod,
maxWaitForUnmountDuration: maxWaitForUnmountDuration,
syncDuration: syncDuration,
desiredStateOfWorld: desiredStateOfWorld,
actualStateOfWorld: actualStateOfWorld,
attacherDetacher: attacherDetacher,
nodeStatusUpdater: nodeStatusUpdater,
timeOfLastSync: time.Now(),
}
}
type reconciler struct {
loopPeriod time.Duration
maxWaitForUnmountDuration time.Duration
syncDuration time.Duration
desiredStateOfWorld cache.DesiredStateOfWorld
actualStateOfWorld cache.ActualStateOfWorld
attacherDetacher operationexecutor.OperationExecutor
nodeStatusUpdater statusupdater.NodeStatusUpdater
timeOfLastSync time.Time
}
func (rc *reconciler) Run(stopCh <-chan struct{}) {
@ -85,107 +90,135 @@ func (rc *reconciler) Run(stopCh <-chan struct{}) {
func (rc *reconciler) reconciliationLoopFunc() func() {
return func() {
// Detaches are triggered before attaches so that volumes referenced by
// pods that are rescheduled to a different node are detached first.
// Ensure volumes that should be detached are detached.
for _, attachedVolume := range rc.actualStateOfWorld.GetAttachedVolumes() {
if !rc.desiredStateOfWorld.VolumeExists(
attachedVolume.VolumeName, attachedVolume.NodeName) {
// Set the detach request time
elapsedTime, err := rc.actualStateOfWorld.SetDetachRequestTime(attachedVolume.VolumeName, attachedVolume.NodeName)
if err != nil {
glog.Errorf("Cannot trigger detach because it fails to set detach request time with error %v", err)
continue
}
// Check whether timeout has reached the maximum waiting time
timeout := elapsedTime > rc.maxWaitForUnmountDuration
// Check whether volume is still mounted. Skip detach if it is still mounted unless timeout
if attachedVolume.MountedByNode && !timeout {
glog.V(12).Infof("Cannot trigger detach for volume %q on node %q because volume is still mounted",
attachedVolume.VolumeName,
attachedVolume.NodeName)
continue
}
// Before triggering volume detach, mark volume as detached and update the node status
// If it fails to update node status, skip detach volume
rc.actualStateOfWorld.RemoveVolumeFromReportAsAttached(attachedVolume.VolumeName, attachedVolume.NodeName)
// Update Node Status to indicate volume is no longer safe to mount.
err = rc.nodeStatusUpdater.UpdateNodeStatuses()
if err != nil {
// Skip detaching this volume if unable to update node status
glog.Errorf("UpdateNodeStatuses failed while attempting to report volume %q as attached to node %q with: %v ",
attachedVolume.VolumeName,
attachedVolume.NodeName,
err)
continue
}
// Trigger detach volume which requires verifing safe to detach step
// If timeout is true, skip verifySafeToDetach check
glog.V(5).Infof("Attempting to start DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName)
verifySafeToDetach := !timeout
err = rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, verifySafeToDetach, rc.actualStateOfWorld)
if err == nil {
if !timeout {
glog.Infof("Started DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName)
} else {
glog.Infof("Started DetachVolume for volume %q from node %q. This volume is not safe to detach, but maxWaitForUnmountDuration %v expired, force detaching",
attachedVolume.VolumeName,
attachedVolume.NodeName,
rc.maxWaitForUnmountDuration)
}
}
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.DetachVolume failed to start for volume %q (spec.Name: %q) from node %q with err: %v",
attachedVolume.VolumeName,
attachedVolume.VolumeSpec.Name(),
attachedVolume.NodeName,
err)
}
}
}
// Ensure volumes that should be attached are attached.
for _, volumeToAttach := range rc.desiredStateOfWorld.GetVolumesToAttach() {
if rc.actualStateOfWorld.VolumeNodeExists(
volumeToAttach.VolumeName, volumeToAttach.NodeName) {
// Volume/Node exists, touch it to reset detachRequestedTime
glog.V(5).Infof("Volume %q/Node %q is attached--touching.", volumeToAttach.VolumeName, volumeToAttach.NodeName)
rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName)
} else {
// Volume/Node doesn't exist, spawn a goroutine to attach it
glog.V(5).Infof("Attempting to start AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
err := rc.attacherDetacher.AttachVolume(volumeToAttach.VolumeToAttach, rc.actualStateOfWorld)
if err == nil {
glog.Infof("Started AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
}
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.AttachVolume failed to start for volume %q (spec.Name: %q) to node %q with err: %v",
volumeToAttach.VolumeName,
volumeToAttach.VolumeSpec.Name(),
volumeToAttach.NodeName,
err)
}
}
}
// Update Node Status
err := rc.nodeStatusUpdater.UpdateNodeStatuses()
if err != nil {
glog.Infof("UpdateNodeStatuses failed with: %v", err)
rc.reconcile()
// reconciler periodically checks whether the attached volumes from actual state
// are still attached to the node and udpate the status if they are not.
if time.Since(rc.timeOfLastSync) > rc.syncDuration {
rc.sync()
}
}
}
func (rc *reconciler) sync() {
defer rc.updateSyncTime()
rc.syncStates()
}
func (rc *reconciler) updateSyncTime() {
rc.timeOfLastSync = time.Now()
}
func (rc *reconciler) syncStates() {
volumesPerNode := rc.actualStateOfWorld.GetAttachedVolumesPerNode()
for nodeName, volumes := range volumesPerNode {
err := rc.attacherDetacher.VerifyVolumesAreAttached(volumes, nodeName, rc.actualStateOfWorld)
if err != nil {
glog.Errorf("Error in syncing states for volumes: %v", err)
}
}
}
func (rc *reconciler) reconcile() {
// Detaches are triggered before attaches so that volumes referenced by
// pods that are rescheduled to a different node are detached first.
// Ensure volumes that should be detached are detached.
for _, attachedVolume := range rc.actualStateOfWorld.GetAttachedVolumes() {
if !rc.desiredStateOfWorld.VolumeExists(
attachedVolume.VolumeName, attachedVolume.NodeName) {
// Set the detach request time
elapsedTime, err := rc.actualStateOfWorld.SetDetachRequestTime(attachedVolume.VolumeName, attachedVolume.NodeName)
if err != nil {
glog.Errorf("Cannot trigger detach because it fails to set detach request time with error %v", err)
continue
}
// Check whether timeout has reached the maximum waiting time
timeout := elapsedTime > rc.maxWaitForUnmountDuration
// Check whether volume is still mounted. Skip detach if it is still mounted unless timeout
if attachedVolume.MountedByNode && !timeout {
glog.V(12).Infof("Cannot trigger detach for volume %q on node %q because volume is still mounted",
attachedVolume.VolumeName,
attachedVolume.NodeName)
continue
}
// Before triggering volume detach, mark volume as detached and update the node status
// If it fails to update node status, skip detach volume
rc.actualStateOfWorld.RemoveVolumeFromReportAsAttached(attachedVolume.VolumeName, attachedVolume.NodeName)
// Update Node Status to indicate volume is no longer safe to mount.
err = rc.nodeStatusUpdater.UpdateNodeStatuses()
if err != nil {
// Skip detaching this volume if unable to update node status
glog.Errorf("UpdateNodeStatuses failed while attempting to report volume %q as attached to node %q with: %v ",
attachedVolume.VolumeName,
attachedVolume.NodeName,
err)
continue
}
// Trigger detach volume which requires verifing safe to detach step
// If timeout is true, skip verifySafeToDetach check
glog.V(5).Infof("Attempting to start DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName)
verifySafeToDetach := !timeout
err = rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, verifySafeToDetach, rc.actualStateOfWorld)
if err == nil {
if !timeout {
glog.Infof("Started DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName)
} else {
glog.Infof("Started DetachVolume for volume %q from node %q. This volume is not safe to detach, but maxWaitForUnmountDuration %v expired, force detaching",
attachedVolume.VolumeName,
attachedVolume.NodeName,
rc.maxWaitForUnmountDuration)
}
}
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.DetachVolume failed to start for volume %q (spec.Name: %q) from node %q with err: %v",
attachedVolume.VolumeName,
attachedVolume.VolumeSpec.Name(),
attachedVolume.NodeName,
err)
}
}
}
// Ensure volumes that should be attached are attached.
for _, volumeToAttach := range rc.desiredStateOfWorld.GetVolumesToAttach() {
if rc.actualStateOfWorld.VolumeNodeExists(
volumeToAttach.VolumeName, volumeToAttach.NodeName) {
// Volume/Node exists, touch it to reset detachRequestedTime
glog.V(5).Infof("Volume %q/Node %q is attached--touching.", volumeToAttach.VolumeName, volumeToAttach.NodeName)
rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName)
} else {
// Volume/Node doesn't exist, spawn a goroutine to attach it
glog.V(4).Infof("Attempting to start AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
err := rc.attacherDetacher.AttachVolume(volumeToAttach.VolumeToAttach, rc.actualStateOfWorld)
if err == nil {
glog.Infof("Started AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
}
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.AttachVolume failed to start for volume %q (spec.Name: %q) to node %q with err: %v",
volumeToAttach.VolumeName,
volumeToAttach.VolumeSpec.Name(),
volumeToAttach.NodeName,
err)
}
}
}
// Update Node Status
err := rc.nodeStatusUpdater.UpdateNodeStatuses()
if err != nil {
glog.Infof("UpdateNodeStatuses failed with: %v", err)
}
}

View File

@ -35,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util"
"k8s.io/kubernetes/pkg/kubelet/qos"
"k8s.io/kubernetes/pkg/util"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
@ -381,14 +382,8 @@ func (cm *containerManagerImpl) setupNode() error {
return fmt.Errorf("system container cannot be root (\"/\")")
}
cont := newSystemCgroups(cm.SystemCgroupsName)
rootContainer := &fs.Manager{
Cgroups: &configs.Cgroup{
Parent: "/",
Name: "/",
},
}
cont.ensureStateFunc = func(manager *fs.Manager) error {
return ensureSystemCgroups(rootContainer, manager)
return ensureSystemCgroups("/", manager)
}
systemContainers = append(systemContainers, cont)
}
@ -681,7 +676,7 @@ func getContainer(pid int) (string, error) {
// The reason of leaving kernel threads at root cgroup is that we don't want to tie the
// execution of these threads with to-be defined /system quota and create priority inversions.
//
func ensureSystemCgroups(rootContainer *fs.Manager, manager *fs.Manager) error {
func ensureSystemCgroups(rootCgroupPath string, manager *fs.Manager) error {
// Move non-kernel PIDs to the system container.
attemptsRemaining := 10
var errs []error
@ -690,7 +685,7 @@ func ensureSystemCgroups(rootContainer *fs.Manager, manager *fs.Manager) error {
errs = []error{}
attemptsRemaining--
allPids, err := rootContainer.GetPids()
allPids, err := cmutil.GetPids(rootCgroupPath)
if err != nil {
errs = append(errs, fmt.Errorf("failed to list PIDs for root: %v", err))
continue

View File

@ -0,0 +1,76 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"path/filepath"
libcontainercgroups "github.com/opencontainers/runc/libcontainer/cgroups"
libcontainerutils "github.com/opencontainers/runc/libcontainer/utils"
)
// Forked from opencontainers/runc/libcontainer/cgroup/fs.Manager.GetPids()
func GetPids(cgroupPath string) ([]int, error) {
dir, err := getCgroupPath(cgroupPath)
if err != nil {
return nil, err
}
return libcontainercgroups.GetPids(dir)
}
// getCgroupPath gets the file path to the "devices" subsystem of the desired cgroup.
// cgroupPath is the path in the cgroup hierarchy.
func getCgroupPath(cgroupPath string) (string, error) {
cgroupPath = libcontainerutils.CleanPath(cgroupPath)
mnt, root, err := libcontainercgroups.FindCgroupMountpointAndRoot("devices")
// If we didn't mount the subsystem, there is no point we make the path.
if err != nil {
return "", err
}
// If the cgroup name/path is absolute do not look relative to the cgroup of the init process.
if filepath.IsAbs(cgroupPath) {
// Sometimes subsystems can be mounted togethger as 'cpu,cpuacct'.
return filepath.Join(root, mnt, cgroupPath), nil
}
parentPath, err := getCgroupParentPath(mnt, root)
if err != nil {
return "", err
}
return filepath.Join(parentPath, cgroupPath), nil
}
// getCgroupParentPath gets the parent filepath to this cgroup, for resolving relative cgroup paths.
func getCgroupParentPath(mountpoint, root string) (string, error) {
// Use GetThisCgroupDir instead of GetInitCgroupDir, because the creating
// process could in container and shared pid namespace with host, and
// /proc/1/cgroup could point to whole other world of cgroups.
initPath, err := libcontainercgroups.GetThisCgroupDir("devices")
if err != nil {
return "", err
}
// This is needed for nested containers, because in /proc/self/cgroup we
// see paths from host, which don't exist in container.
relDir, err := filepath.Rel(root, initPath)
if err != nil {
return "", err
}
return filepath.Join(mountpoint, relDir), nil
}

View File

@ -0,0 +1,23 @@
// +build !linux
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
func GetPids(cgroupPath string) ([]int, error) {
return nil, nil
}

View File

@ -28,6 +28,7 @@ import (
"strings"
appcschema "github.com/appc/spec/schema"
appctypes "github.com/appc/spec/schema/types"
rktapi "github.com/coreos/rkt/api/v1alpha"
dockertypes "github.com/docker/engine-api/types"
"github.com/golang/glog"
@ -124,7 +125,25 @@ func (r *Runtime) RemoveImage(image kubecontainer.ImageSpec) error {
}
// buildImageName constructs the image name for kubecontainer.Image.
// If the annotations contain the docker2aci metadata for this image, those are
// used instead as they may be more accurate in some cases, namely if a
// non-appc valid character is present
func buildImageName(img *rktapi.Image) string {
registry := ""
repository := ""
for _, anno := range img.Annotations {
if anno.Key == appcDockerRegistryURL {
registry = anno.Value
}
if anno.Key == appcDockerRepository {
repository = anno.Value
}
}
if registry != "" && repository != "" {
// TODO(euank): This could do the special casing for dockerhub and library images
return fmt.Sprintf("%s/%s:%s", registry, repository, img.Version)
}
return fmt.Sprintf("%s:%s", img.Name, img.Version)
}
@ -160,19 +179,34 @@ func (r *Runtime) listImages(image string, detail bool) ([]*rktapi.Image, error)
return nil, err
}
imageFilters := []*rktapi.ImageFilter{
{
// TODO(yifan): Add a field in the ImageFilter to match the whole name,
// not just keywords.
// https://github.com/coreos/rkt/issues/1872#issuecomment-166456938
Keywords: []string{repoToPull},
Labels: []*rktapi.KeyValue{{Key: "version", Value: tag}},
},
}
// If the repo name is not a valid ACIdentifier (namely if it has a port),
// then it will have a different name in the store. Search for both the
// original name and this modified name in case we choose to also change the
// api-service to do this un-conversion on its end.
if appcRepoToPull, err := appctypes.SanitizeACIdentifier(repoToPull); err != nil {
glog.Warningf("could not convert %v to an aci identifier: %v", err)
} else {
imageFilters = append(imageFilters, &rktapi.ImageFilter{
Keywords: []string{appcRepoToPull},
Labels: []*rktapi.KeyValue{{Key: "version", Value: tag}},
})
}
ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout)
defer cancel()
listResp, err := r.apisvc.ListImages(ctx, &rktapi.ListImagesRequest{
Detail: detail,
Filters: []*rktapi.ImageFilter{
{
// TODO(yifan): Add a field in the ImageFilter to match the whole name,
// not just keywords.
// https://github.com/coreos/rkt/issues/1872#issuecomment-166456938
Keywords: []string{repoToPull},
Labels: []*rktapi.KeyValue{{Key: "version", Value: tag}},
},
},
Detail: detail,
Filters: imageFilters,
})
if err != nil {
return nil, fmt.Errorf("couldn't list images: %v", err)

View File

@ -116,9 +116,11 @@ const (
defaultDNSOption = "ndots:5"
// Annotations for the ENTRYPOINT and CMD for an ACI that's converted from Docker image.
// TODO(yifan): Import them from docker2aci. See https://github.com/appc/docker2aci/issues/133.
appcDockerEntrypoint = "appc.io/docker/entrypoint"
appcDockerCmd = "appc.io/docker/cmd"
// Taken from https://github.com/appc/docker2aci/blob/v0.12.3/lib/common/common.go#L33
appcDockerEntrypoint = "appc.io/docker/entrypoint"
appcDockerCmd = "appc.io/docker/cmd"
appcDockerRegistryURL = "appc.io/docker/registryurl"
appcDockerRepository = "appc.io/docker/repository"
// TODO(yifan): Reuse this const with Docker runtime.
minimumGracePeriodInSeconds = 2

View File

@ -370,7 +370,6 @@ func (asw *actualStateOfWorld) addVolume(
} else {
// If volume object already exists, update the fields such as device path
volumeObj.devicePath = devicePath
volumeObj.spec = volumeSpec
glog.V(2).Infof("Volume %q is already added to attachedVolume list, update device path %q",
volumeName,
devicePath)

View File

@ -473,6 +473,7 @@ func (rc *reconciler) syncStates(podsDir string) {
volumesNeedUpdate[reconstructedVolume.volumeName] = reconstructedVolume
}
if len(volumesNeedUpdate) > 0 {
if err = rc.updateStates(volumesNeedUpdate); err != nil {
glog.Errorf("Error occurred during reconstruct volume from disk: %v", err)
@ -491,10 +492,6 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume,
if err != nil {
return nil, err
}
volumeName, err := plugin.GetVolumeName(volumeSpec)
if err != nil {
return nil, err
}
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
UID: types.UID(volume.podName),
@ -504,6 +501,11 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume,
if err != nil {
return nil, err
}
volumeName, err := plugin.GetVolumeName(volumeSpec)
if err != nil {
return nil, err
}
var uniqueVolumeName api.UniqueVolumeName
if attachablePlugin != nil {
uniqueVolumeName = volumehelper.GetUniqueVolumeName(volume.pluginName, volumeName)
@ -526,10 +528,14 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume,
}
reconstructedVolume := &reconstructedVolume{
volumeName: uniqueVolumeName,
podName: volume.podName,
volumeSpec: volumeSpec,
outerVolumeSpecName: volumeName, /* volumeName is InnerVolumeSpecName. But this information will not be used for cleanup */
volumeName: uniqueVolumeName,
podName: volume.podName,
volumeSpec: volumeSpec,
// volume.volumeSpecName is actually InnerVolumeSpecName. But this information will likely to be updated in updateStates()
// by checking the desired state volumeToMount list and getting the real OuterVolumeSpecName.
// In case the pod is deleted during this period and desired state does not have this information, it will not be used
// for volume cleanup.
outerVolumeSpecName: volume.volumeSpecName,
pod: pod,
pluginIsAttachable: attachablePlugin != nil,
volumeGidValue: "",
@ -549,11 +555,22 @@ func (rc *reconciler) updateStates(volumesNeedUpdate map[api.UniqueVolumeName]*r
if volume, exists := volumesNeedUpdate[attachedVolume.Name]; exists {
volume.devicePath = attachedVolume.DevicePath
volumesNeedUpdate[attachedVolume.Name] = volume
glog.V(4).Infof("Get devicePath from node status for volume (%q): %q", attachedVolume.Name, volume.devicePath)
glog.V(4).Infof("Update devicePath from node status for volume (%q): %q", attachedVolume.Name, volume.devicePath)
}
}
}
// Get the list of volumes from desired state and update OuterVolumeSpecName if the information is avaiable
volumesToMount := rc.desiredStateOfWorld.GetVolumesToMount()
for _, volumeToMount := range volumesToMount {
if volume, exists := volumesNeedUpdate[volumeToMount.VolumeName]; exists {
volume.outerVolumeSpecName = volumeToMount.OuterVolumeSpecName
volumesNeedUpdate[volumeToMount.VolumeName] = volume
glog.V(4).Infof("Update OuterVolumeSpecName from desired state for volume (%q): %q",
volumeToMount.VolumeName, volume.outerVolumeSpecName)
}
}
for _, volume := range volumesNeedUpdate {
err := rc.actualStateOfWorld.MarkVolumeAsAttached(
volume.volumeName, volume.volumeSpec, "" /* nodeName */, volume.devicePath)

View File

@ -27,7 +27,7 @@ import (
)
const (
compactInterval = 10 * time.Minute
compactInterval = 5 * time.Minute
compactRevKey = "compact_rev_key"
)

View File

@ -23,11 +23,12 @@ import (
"io/ioutil"
"os"
"path"
"path/filepath"
"strconv"
cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util"
"github.com/golang/glog"
"github.com/opencontainers/runc/libcontainer/cgroups/fs"
"github.com/opencontainers/runc/libcontainer/configs"
)
func NewOOMAdjuster() *OOMAdjuster {
@ -40,13 +41,7 @@ func NewOOMAdjuster() *OOMAdjuster {
}
func getPids(cgroupName string) ([]int, error) {
fsManager := fs.Manager{
Cgroups: &configs.Cgroup{
Parent: "/",
Name: cgroupName,
},
}
return fsManager.GetPids()
return cmutil.GetPids(filepath.Join("/", cgroupName))
}
// Writes 'value' to /proc/<pid>/oom_score_adj. PID = 0 means self

View File

@ -51,7 +51,7 @@ var (
// semantic version is a git hash, but the version itself is no
// longer the direct output of "git describe", but a slight
// translation to be semver compliant.
gitVersion string = "v1.4.5+$Format:%h$"
gitVersion string = "v1.4.6+$Format:%h$"
gitCommit string = "$Format:%H$" // sha1 from git, output of $(git rev-parse HEAD)
gitTreeState string = "not a git tree" // state of git tree, either "clean" or "dirty"

View File

@ -63,7 +63,7 @@ func (attacher *awsElasticBlockStoreAttacher) Attach(spec *volume.Spec, hostName
return "", err
}
volumeID := volumeSource.VolumeID
volumeID := aws.KubernetesVolumeID(volumeSource.VolumeID)
// awsCloud.AttachDisk checks if disk is already attached to node and
// succeeds in that case, so no need to do that separately.
@ -76,6 +76,41 @@ func (attacher *awsElasticBlockStoreAttacher) Attach(spec *volume.Spec, hostName
return devicePath, nil
}
func (attacher *awsElasticBlockStoreAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName string) (map[*volume.Spec]bool, error) {
volumesAttachedCheck := make(map[*volume.Spec]bool)
volumeSpecMap := make(map[aws.KubernetesVolumeID]*volume.Spec)
volumeIDList := []aws.KubernetesVolumeID{}
for _, spec := range specs {
volumeSource, _, err := getVolumeSource(spec)
if err != nil {
glog.Errorf("Error getting volume (%q) source : %v", spec.Name(), err)
continue
}
name := aws.KubernetesVolumeID(volumeSource.VolumeID)
volumeIDList = append(volumeIDList, name)
volumesAttachedCheck[spec] = true
volumeSpecMap[name] = spec
}
attachedResult, err := attacher.awsVolumes.DisksAreAttached(volumeIDList, nodeName)
if err != nil {
// Log error and continue with attach
glog.Errorf(
"Error checking if volumes (%v) is already attached to current node (%q). err=%v",
volumeIDList, nodeName, err)
return volumesAttachedCheck, err
}
for volumeID, attached := range attachedResult {
if !attached {
spec := volumeSpecMap[volumeID]
volumesAttachedCheck[spec] = false
glog.V(2).Infof("VolumesAreAttached: check volume %q (specName: %q) is no longer attached", volumeID, spec.Name())
}
}
return volumesAttachedCheck, nil
}
func (attacher *awsElasticBlockStoreAttacher) WaitForAttach(spec *volume.Spec, devicePath string, timeout time.Duration) (string, error) {
volumeSource, _, err := getVolumeSource(spec)
if err != nil {
@ -128,7 +163,7 @@ func (attacher *awsElasticBlockStoreAttacher) GetDeviceMountPath(
return "", err
}
return makeGlobalPDPath(attacher.host, volumeSource.VolumeID), nil
return makeGlobalPDPath(attacher.host, aws.KubernetesVolumeID(volumeSource.VolumeID)), nil
}
// FIXME: this method can be further pruned.
@ -186,7 +221,7 @@ func (plugin *awsElasticBlockStorePlugin) NewDetacher() (volume.Detacher, error)
}
func (detacher *awsElasticBlockStoreDetacher) Detach(deviceMountPath string, hostName string) error {
volumeID := path.Base(deviceMountPath)
volumeID := aws.KubernetesVolumeID(path.Base(deviceMountPath))
attached, err := detacher.awsVolumes.DiskIsAttached(volumeID, hostName)
if err != nil {

View File

@ -27,6 +27,7 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/mount"
@ -102,7 +103,7 @@ func (plugin *awsElasticBlockStorePlugin) newMounterInternal(spec *volume.Spec,
return nil, err
}
volumeID := ebs.VolumeID
volumeID := aws.KubernetesVolumeID(ebs.VolumeID)
fsType := ebs.FSType
partition := ""
if ebs.Partition != 0 {
@ -153,7 +154,7 @@ func (plugin *awsElasticBlockStorePlugin) newDeleterInternal(spec *volume.Spec,
return &awsElasticBlockStoreDeleter{
awsElasticBlockStore: &awsElasticBlockStore{
volName: spec.Name(),
volumeID: spec.PersistentVolume.Spec.AWSElasticBlockStore.VolumeID,
volumeID: aws.KubernetesVolumeID(spec.PersistentVolume.Spec.AWSElasticBlockStore.VolumeID),
manager: manager,
plugin: plugin,
}}, nil
@ -208,7 +209,7 @@ func (plugin *awsElasticBlockStorePlugin) ConstructVolumeSpec(volName, mountPath
// Abstract interface to PD operations.
type ebsManager interface {
CreateVolume(provisioner *awsElasticBlockStoreProvisioner) (volumeID string, volumeSizeGB int, labels map[string]string, err error)
CreateVolume(provisioner *awsElasticBlockStoreProvisioner) (volumeID aws.KubernetesVolumeID, volumeSizeGB int, labels map[string]string, err error)
// Deletes a volume
DeleteVolume(deleter *awsElasticBlockStoreDeleter) error
}
@ -219,7 +220,7 @@ type awsElasticBlockStore struct {
volName string
podUID types.UID
// Unique id of the PD, used to find the disk resource in the provider.
volumeID string
volumeID aws.KubernetesVolumeID
// Specifies the partition to mount
partition string
// Utility interface that provides API calls to the provider to attach/detach disks.
@ -315,9 +316,9 @@ func (b *awsElasticBlockStoreMounter) SetUpAt(dir string, fsGroup *int64) error
return nil
}
func makeGlobalPDPath(host volume.VolumeHost, volumeID string) string {
func makeGlobalPDPath(host volume.VolumeHost, volumeID aws.KubernetesVolumeID) string {
// Clean up the URI to be more fs-friendly
name := volumeID
name := string(volumeID)
name = strings.Replace(name, "://", "/", -1)
return path.Join(host.GetPluginDir(awsElasticBlockStorePluginName), "mounts", name)
}
@ -435,7 +436,7 @@ func (c *awsElasticBlockStoreProvisioner) Provision() (*api.PersistentVolume, er
},
PersistentVolumeSource: api.PersistentVolumeSource{
AWSElasticBlockStore: &api.AWSElasticBlockStoreVolumeSource{
VolumeID: volumeID,
VolumeID: string(volumeID),
FSType: "ext4",
Partition: 0,
ReadOnly: false,

View File

@ -62,7 +62,7 @@ func (util *AWSDiskUtil) DeleteVolume(d *awsElasticBlockStoreDeleter) error {
// CreateVolume creates an AWS EBS volume.
// Returns: volumeID, volumeSizeGB, labels, error
func (util *AWSDiskUtil) CreateVolume(c *awsElasticBlockStoreProvisioner) (string, int, map[string]string, error) {
func (util *AWSDiskUtil) CreateVolume(c *awsElasticBlockStoreProvisioner) (aws.KubernetesVolumeID, int, map[string]string, error) {
cloud, err := getCloudProvider(c.awsElasticBlockStore.plugin.host.GetCloudProvider())
if err != nil {
return "", 0, nil, err

View File

@ -114,6 +114,40 @@ func (attacher *azureDiskAttacher) Attach(spec *volume.Spec, hostName string) (s
return strconv.Itoa(int(lun)), err
}
func (attacher *azureDiskAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName string) (map[*volume.Spec]bool, error) {
volumesAttachedCheck := make(map[*volume.Spec]bool)
volumeSpecMap := make(map[string]*volume.Spec)
volumeIDList := []string{}
for _, spec := range specs {
volumeSource, err := getVolumeSource(spec)
if err != nil {
glog.Errorf("Error getting volume (%q) source : %v", spec.Name(), err)
continue
}
volumeIDList = append(volumeIDList, volumeSource.DiskName)
volumesAttachedCheck[spec] = true
volumeSpecMap[volumeSource.DiskName] = spec
}
attachedResult, err := attacher.azureProvider.DisksAreAttached(volumeIDList, nodeName)
if err != nil {
// Log error and continue with attach
glog.Errorf(
"Error checking if volumes (%v) are attached to current node (%q). err=%v",
volumeIDList, nodeName, err)
return volumesAttachedCheck, err
}
for volumeID, attached := range attachedResult {
if !attached {
spec := volumeSpecMap[volumeID]
volumesAttachedCheck[spec] = false
glog.V(2).Infof("VolumesAreAttached: check volume %q (specName: %q) is no longer attached", volumeID, spec.Name())
}
}
return volumesAttachedCheck, nil
}
// WaitForAttach runs on the node to detect if the volume (referenced by LUN) is attached. If attached, the device path is returned
func (attacher *azureDiskAttacher) WaitForAttach(spec *volume.Spec, lunStr string, timeout time.Duration) (string, error) {
volumeSource, err := getVolumeSource(spec)

View File

@ -52,7 +52,9 @@ type azureCloudProvider interface {
// Attaches the disk to the host machine.
AttachDisk(diskName, diskUri, vmName string, lun int32, cachingMode compute.CachingTypes) error
// Detaches the disk, identified by disk name or uri, from the host machine.
DetachDiskByName(diskName, diskUri, vmName string) error
DetachDiskByName(diskName, diskUri string, nodeName string) error
// Check if a list of volumes are attached to the node with the specified NodeName
DisksAreAttached(diskNames []string, nodeName string) (map[string]bool, error)
// Get the LUN number of the disk that is attached to the host
GetDiskLun(diskName, diskUri, vmName string) (int32, error)
// Get the next available LUN number to attach a new VHD

View File

@ -108,6 +108,40 @@ func (attacher *cinderDiskAttacher) Attach(spec *volume.Spec, hostName string) (
return devicePath, err
}
func (attacher *cinderDiskAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName string) (map[*volume.Spec]bool, error) {
volumesAttachedCheck := make(map[*volume.Spec]bool)
volumeSpecMap := make(map[string]*volume.Spec)
volumeIDList := []string{}
for _, spec := range specs {
volumeSource, _, err := getVolumeSource(spec)
if err != nil {
glog.Errorf("Error getting volume (%q) source : %v", spec.Name(), err)
continue
}
volumeIDList = append(volumeIDList, volumeSource.VolumeID)
volumesAttachedCheck[spec] = true
volumeSpecMap[volumeSource.VolumeID] = spec
}
attachedResult, err := attacher.cinderProvider.DisksAreAttached(volumeIDList, string(nodeName))
if err != nil {
// Log error and continue with attach
glog.Errorf(
"Error checking if Volumes (%v) are already attached to current node (%q). Will continue and try attach anyway. err=%v",
volumeIDList, nodeName, err)
return volumesAttachedCheck, err
}
for volumeID, attached := range attachedResult {
if !attached {
spec := volumeSpecMap[volumeID]
volumesAttachedCheck[spec] = false
glog.V(2).Infof("VolumesAreAttached: check volume %q (specName: %q) is no longer attached", volumeID, spec.Name())
}
}
return volumesAttachedCheck, nil
}
func (attacher *cinderDiskAttacher) WaitForAttach(spec *volume.Spec, devicePath string, timeout time.Duration) (string, error) {
volumeSource, _, err := getVolumeSource(spec)
if err != nil {

View File

@ -50,6 +50,7 @@ type CinderProvider interface {
InstanceID() (string, error)
GetAttachmentDiskPath(instanceID string, diskName string) (string, error)
DiskIsAttached(diskName, instanceID string) (bool, error)
DisksAreAttached(diskNames []string, instanceID string) (map[string]bool, error)
Instances() (cloudprovider.Instances, bool)
}

View File

@ -95,6 +95,40 @@ func (attacher *gcePersistentDiskAttacher) Attach(spec *volume.Spec, hostName st
return path.Join(diskByIdPath, diskGooglePrefix+pdName), nil
}
func (attacher *gcePersistentDiskAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName string) (map[*volume.Spec]bool, error) {
volumesAttachedCheck := make(map[*volume.Spec]bool)
volumePdNameMap := make(map[string]*volume.Spec)
pdNameList := []string{}
for _, spec := range specs {
volumeSource, _, err := getVolumeSource(spec)
// If error is occured, skip this volume and move to the next one
if err != nil {
glog.Errorf("Error getting volume (%q) source : %v", spec.Name(), err)
continue
}
pdNameList = append(pdNameList, volumeSource.PDName)
volumesAttachedCheck[spec] = true
volumePdNameMap[volumeSource.PDName] = spec
}
attachedResult, err := attacher.gceDisks.DisksAreAttached(pdNameList, nodeName)
if err != nil {
// Log error and continue with attach
glog.Errorf(
"Error checking if PDs (%v) are already attached to current node (%q). err=%v",
pdNameList, nodeName, err)
return volumesAttachedCheck, err
}
for pdName, attached := range attachedResult {
if !attached {
spec := volumePdNameMap[pdName]
volumesAttachedCheck[spec] = false
glog.V(2).Infof("VolumesAreAttached: check volume %q (specName: %q) is no longer attached", pdName, spec.Name())
}
}
return volumesAttachedCheck, nil
}
func (attacher *gcePersistentDiskAttacher) WaitForAttach(spec *volume.Spec, devicePath string, timeout time.Duration) (string, error) {
ticker := time.NewTicker(checkSleepDuration)
defer ticker.Stop()
@ -207,7 +241,7 @@ func (plugin *gcePersistentDiskPlugin) NewDetacher() (volume.Detacher, error) {
// attached to the specified node. If the volume is not attached, it succeeds
// (returns nil). If it is attached, Detach issues a call to the GCE cloud
// provider to attach it.
// Callers are responsible for retryinging on failure.
// Callers are responsible for retrying on failure.
// Callers are responsible for thread safety between concurrent attach and detach
// operations.
func (detacher *gcePersistentDiskDetacher) Detach(deviceMountPath string, hostName string) error {

View File

@ -18,6 +18,11 @@ package glusterfs
import (
"fmt"
"os"
"path"
"strconv"
dstrings "strings"
"github.com/golang/glog"
gcli "github.com/heketi/heketi/client/api/go-client"
gapi "github.com/heketi/heketi/pkg/glusterfs/api"
@ -28,10 +33,6 @@ import (
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/strings"
"k8s.io/kubernetes/pkg/volume"
"os"
"path"
"strconv"
dstrings "strings"
)
// This is the primary entrypoint for volume plugins.
@ -473,7 +474,7 @@ func (p *glusterfsVolumeProvisioner) CreateVolume() (r *api.GlusterfsVolumeSourc
glog.Errorf("glusterfs: error creating volume %s ", err)
return nil, 0, fmt.Errorf("error creating volume %v", err)
}
glog.V(1).Infof("glusterfs: volume with size :%d and name:%s created", volume.Size, volume.Name)
glog.V(1).Infof("glusterfs: volume with size: %d and name: %s created", volume.Size, volume.Name)
return &api.GlusterfsVolumeSource{
EndpointsName: p.glusterfsClusterConf.glusterep,
Path: volume.Name,

View File

@ -38,6 +38,9 @@ import (
const (
// emptyUniquePodName is a UniquePodName for empty string.
emptyUniquePodName types.UniquePodName = types.UniquePodName("")
// emptyUniqueVolumeName is a UniqueVolumeName for empty string
emptyUniqueVolumeName api.UniqueVolumeName = api.UniqueVolumeName("")
)
// NestedPendingOperations defines the supported set of operations.
@ -151,10 +154,16 @@ func (grm *nestedPendingOperations) IsOperationPending(
return false
}
// This is an internal function and caller should acquire and release the lock
func (grm *nestedPendingOperations) isOperationExists(
volumeName api.UniqueVolumeName,
podName types.UniquePodName) (bool, int) {
// If volumeName is empty, operation can be executed concurrently
if volumeName == emptyUniqueVolumeName {
return false, -1
}
for previousOpIndex, previousOp := range grm.operations {
if previousOp.volumeName != volumeName {
// No match, keep searching

View File

@ -64,6 +64,13 @@ type OperationExecutor interface {
// It then updates the actual state of the world to reflect that.
AttachVolume(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
// VerifyVolumesAreAttached verifies the given list of volumes to see whether they are still attached to the node.
// If any volume is not attached right now, it will update the actual state of the world to reflect that.
// Note that this operation could be operated concurrently with other attach/detach operations.
// In theory (but very unlikely in practise), race condition among these operations might mark volume as detached
// even if it is attached. But reconciler can correct this in a short period of time.
VerifyVolumesAreAttached(AttachedVolumes []AttachedVolume, nodeName string, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
// DetachVolume detaches the volume from the node specified in
// volumeToDetach, and updates the actual state of the world to reflect
// that. If verifySafeToDetach is set, a call is made to the fetch the node
@ -397,6 +404,19 @@ func (oe *operationExecutor) DetachVolume(
volumeToDetach.VolumeName, "" /* podName */, detachFunc)
}
func (oe *operationExecutor) VerifyVolumesAreAttached(
attachedVolumes []AttachedVolume,
nodeName string,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
volumesAreAttachedFunc, err :=
oe.generateVolumesAreAttachedFunc(attachedVolumes, nodeName, actualStateOfWorld)
if err != nil {
return err
}
// Give an empty UniqueVolumeName so that this operation could be executed concurrently.
return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, volumesAreAttachedFunc)
}
func (oe *operationExecutor) MountVolume(
waitForAttachTimeout time.Duration,
volumeToMount VolumeToMount,
@ -465,6 +485,83 @@ func (oe *operationExecutor) VerifyControllerAttachedVolume(
volumeToMount.VolumeName, "" /* podName */, verifyControllerAttachedVolumeFunc)
}
func (oe *operationExecutor) generateVolumesAreAttachedFunc(
attachedVolumes []AttachedVolume,
nodeName string,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
// volumesPerPlugin maps from a volume plugin to a list of volume specs which belong
// to this type of plugin
volumesPerPlugin := make(map[string][]*volume.Spec)
// volumeSpecMap maps from a volume spec to its unique volumeName which will be used
// when calling MarkVolumeAsDetached
volumeSpecMap := make(map[*volume.Spec]api.UniqueVolumeName)
// Iterate each volume spec and put them into a map index by the pluginName
for _, volumeAttached := range attachedVolumes {
volumePlugin, err :=
oe.volumePluginMgr.FindPluginBySpec(volumeAttached.VolumeSpec)
if err != nil || volumePlugin == nil {
glog.Errorf(
"VolumesAreAttached.FindPluginBySpec failed for volume %q (spec.Name: %q) on node %q with error: %v",
volumeAttached.VolumeName,
volumeAttached.VolumeSpec.Name(),
volumeAttached.NodeName,
err)
}
volumeSpecList, pluginExists := volumesPerPlugin[volumePlugin.GetPluginName()]
if !pluginExists {
volumeSpecList = []*volume.Spec{}
}
volumeSpecList = append(volumeSpecList, volumeAttached.VolumeSpec)
volumesPerPlugin[volumePlugin.GetPluginName()] = volumeSpecList
volumeSpecMap[volumeAttached.VolumeSpec] = volumeAttached.VolumeName
}
return func() error {
// For each volume plugin, pass the list of volume specs to VolumesAreAttached to check
// whether the volumes are still attached.
for pluginName, volumesSpecs := range volumesPerPlugin {
attachableVolumePlugin, err :=
oe.volumePluginMgr.FindAttachablePluginByName(pluginName)
if err != nil || attachableVolumePlugin == nil {
glog.Errorf(
"VolumeAreAttached.FindAttachablePluginBySpec failed for plugin %q with: %v",
pluginName,
err)
continue
}
volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher()
if newAttacherErr != nil {
glog.Errorf(
"VolumesAreAttached failed for getting plugin %q with: %v",
pluginName,
newAttacherErr)
continue
}
attached, areAttachedErr := volumeAttacher.VolumesAreAttached(volumesSpecs, nodeName)
if areAttachedErr != nil {
glog.Errorf(
"VolumesAreAttached failed for checking on node %q with: %v",
nodeName,
areAttachedErr)
continue
}
for spec, check := range attached {
if !check {
actualStateOfWorld.MarkVolumeAsDetached(volumeSpecMap[spec], nodeName)
glog.V(1).Infof("VerifyVolumesAreAttached determined volume %q (spec.Name: %q) is no longer attached to node %q, therefore it was marked as detached.",
volumeSpecMap[spec], spec.Name(), nodeName)
}
}
}
return nil
}, nil
}
func (oe *operationExecutor) generateAttachVolumeFunc(
volumeToAttach VolumeToAttach,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {

View File

@ -139,6 +139,11 @@ type Attacher interface {
// node.
Attach(spec *Spec, hostName string) (string, error)
// VolumesAreAttached checks whether the list of volumes still attached to the specified
// the node. It returns a map which maps from the volume spec to the checking result.
// If an error is occured during checking, the error will be returned
VolumesAreAttached(specs []*Spec, nodeName string) (map[*Spec]bool, error)
// WaitForAttach blocks until the device is attached to this
// node. If it successfully attaches, the path to the device
// is returned. Otherwise, if the device does not attach after

View File

@ -83,6 +83,39 @@ func (attacher *vsphereVMDKAttacher) Attach(spec *volume.Spec, hostName string)
return path.Join(diskByIDPath, diskSCSIPrefix+diskUUID), nil
}
func (attacher *vsphereVMDKAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName string) (map[*volume.Spec]bool, error) {
volumesAttachedCheck := make(map[*volume.Spec]bool)
volumeSpecMap := make(map[string]*volume.Spec)
volumePathList := []string{}
for _, spec := range specs {
volumeSource, _, err := getVolumeSource(spec)
if err != nil {
glog.Errorf("Error getting volume (%q) source : %v", spec.Name(), err)
continue
}
volumePathList = append(volumePathList, volumeSource.VolumePath)
volumesAttachedCheck[spec] = true
volumeSpecMap[volumeSource.VolumePath] = spec
}
attachedResult, err := attacher.vsphereVolumes.DisksAreAttached(volumePathList, nodeName)
if err != nil {
glog.Errorf(
"Error checking if volumes (%v) are attached to current node (%q). err=%v",
volumePathList, nodeName, err)
return volumesAttachedCheck, err
}
for volumePath, attached := range attachedResult {
if !attached {
spec := volumeSpecMap[volumePath]
volumesAttachedCheck[spec] = false
glog.V(2).Infof("VolumesAreAttached: check volume %q (specName: %q) is no longer attached", volumePath, spec.Name())
}
}
return volumesAttachedCheck, nil
}
func (attacher *vsphereVMDKAttacher) WaitForAttach(spec *volume.Spec, devicePath string, timeout time.Duration) (string, error) {
volumeSource, _, err := getVolumeSource(spec)
if err != nil {

View File

@ -51,7 +51,8 @@ func NewInterPodAntiAffinity(client clientset.Interface) admission.Interface {
// Admit will deny any pod that defines AntiAffinity topology key other than unversioned.LabelHostname i.e. "kubernetes.io/hostname"
// in requiredDuringSchedulingRequiredDuringExecution and requiredDuringSchedulingIgnoredDuringExecution.
func (p *plugin) Admit(attributes admission.Attributes) (err error) {
if attributes.GetResource().GroupResource() != api.Resource("pods") {
// Ignore all calls to subresources or resources other than pods.
if len(attributes.GetSubresource()) != 0 || attributes.GetResource().GroupResource() != api.Resource("pods") {
return nil
}
pod, ok := attributes.GetObject().(*api.Pod)

View File

@ -118,7 +118,8 @@ func (l *persistentVolumeLabel) findAWSEBSLabels(volume *api.PersistentVolume) (
// TODO: GetVolumeLabels is actually a method on the Volumes interface
// If that gets standardized we can refactor to reduce code duplication
labels, err := ebsVolumes.GetVolumeLabels(volume.Spec.AWSElasticBlockStore.VolumeID)
spec := aws.KubernetesVolumeID(volume.Spec.AWSElasticBlockStore.VolumeID)
labels, err := ebsVolumes.GetVolumeLabels(spec)
if err != nil {
return nil, err
}