Uprade to Kubernetes v1.3.4

pull/446/head
Jimmi Dyson 2016-08-03 14:49:44 +01:00
parent 9daed05308
commit ecf7a35b19
No known key found for this signature in database
GPG Key ID: 978CD4AF4C1E87F5
66 changed files with 2204 additions and 1551 deletions

1874
Godeps/Godeps.json generated

File diff suppressed because it is too large Load Diff

View File

@ -20,7 +20,7 @@ minikube start
--docker-env=[]: Environment variables to pass to the Docker daemon. (format: key=value)
--insecure-registry=[]: Insecure Docker registries to pass to the Docker daemon
--iso-url="https://storage.googleapis.com/minikube/minikube-0.5.iso": Location of the minikube iso
--kubernetes-version="v1.3.3": The kubernetes version that the minikube VM will (ex: v1.2.3)
--kubernetes-version="v1.3.4": The kubernetes version that the minikube VM will (ex: v1.2.3)
OR a URI which contains a localkube binary (ex: https://storage.googleapis.com/minikube/k8sReleases/v1.3.0/localkube-linux-amd64)
--memory=1024: Amount of RAM allocated to the minikube VM
--vm-driver="virtualbox": VM driver is one of: [virtualbox vmwarefusion kvm xhyve]

View File

@ -35,6 +35,10 @@ func (cli *Client) ContainerLogs(ctx context.Context, container string, options
query.Set("timestamps", "1")
}
if options.Details {
query.Set("details", "1")
}
if options.Follow {
query.Set("follow", "1")
}

View File

@ -10,8 +10,8 @@ import (
)
// ImageLoad loads an image in the docker host from the client host.
// It's up to the caller to close the io.ReadCloser returned by
// this function.
// It's up to the caller to close the io.ReadCloser in the
// ImageLoadResponse returned by this function.
func (cli *Client) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (types.ImageLoadResponse, error) {
v := url.Values{}
v.Set("quiet", "0")

View File

@ -27,12 +27,12 @@ func (cli *Client) ImagePull(ctx context.Context, ref string, options types.Imag
query := url.Values{}
query.Set("fromImage", repository)
if tag != "" {
if tag != "" && !options.All {
query.Set("tag", tag)
}
resp, err := cli.tryImageCreate(ctx, query, options.RegistryAuth)
if resp.statusCode == http.StatusUnauthorized {
if resp.statusCode == http.StatusUnauthorized && options.PrivilegeFunc != nil {
newAuthHeader, privilegeErr := options.PrivilegeFunc()
if privilegeErr != nil {
return nil, privilegeErr

View File

@ -10,7 +10,6 @@ import (
distreference "github.com/docker/distribution/reference"
"github.com/docker/engine-api/types"
"github.com/docker/engine-api/types/reference"
)
// ImagePush requests the docker host to push an image to a remote registry.
@ -27,7 +26,10 @@ func (cli *Client) ImagePush(ctx context.Context, ref string, options types.Imag
return nil, errors.New("cannot push a digest reference")
}
tag := reference.GetTagFromNamedRef(distributionRef)
var tag = ""
if nameTaggedRef, isNamedTagged := distributionRef.(distreference.NamedTagged); isNamedTagged {
tag = nameTaggedRef.Tag()
}
query := url.Values{}
query.Set("tag", tag)

View File

@ -6,6 +6,7 @@ import (
"net/url"
"github.com/docker/engine-api/types"
"github.com/docker/engine-api/types/filters"
"github.com/docker/engine-api/types/registry"
"golang.org/x/net/context"
)
@ -17,6 +18,14 @@ func (cli *Client) ImageSearch(ctx context.Context, term string, options types.I
query := url.Values{}
query.Set("term", term)
if options.Filters.Len() > 0 {
filterJSON, err := filters.ToParam(options.Filters)
if err != nil {
return results, err
}
query.Set("filters", filterJSON)
}
resp, err := cli.tryImageSearch(ctx, query, options.RegistryAuth)
if resp.statusCode == http.StatusUnauthorized {
newAuthHeader, privilegeErr := options.PrivilegeFunc()

View File

@ -57,6 +57,7 @@ type ContainerLogsOptions struct {
Timestamps bool
Follow bool
Tail string
Details bool
}
// ContainerRemoveOptions holds parameters to remove containers.
@ -172,12 +173,14 @@ type ImageListOptions struct {
// ImageLoadResponse returns information to the client about a load process.
type ImageLoadResponse struct {
// Body must be closed to avoid a resource leak
Body io.ReadCloser
JSON bool
}
// ImagePullOptions holds information to pull images.
type ImagePullOptions struct {
All bool
RegistryAuth string // RegistryAuth is the base64 encoded credentials for the registry
PrivilegeFunc RequestPrivilegeFunc
}
@ -203,6 +206,7 @@ type ImageRemoveOptions struct {
type ImageSearchOptions struct {
RegistryAuth string
PrivilegeFunc RequestPrivilegeFunc
Filters filters.Args
}
// ImageTagOptions holds parameters to tag an image

View File

@ -136,30 +136,49 @@ func (n UTSMode) Valid() bool {
return true
}
// PidMode represents the pid stack of the container.
// PidMode represents the pid namespace of the container.
type PidMode string
// IsPrivate indicates whether the container uses its private pid stack.
// IsPrivate indicates whether the container uses its own new pid namespace.
func (n PidMode) IsPrivate() bool {
return !(n.IsHost())
return !(n.IsHost() || n.IsContainer())
}
// IsHost indicates whether the container uses the host's pid stack.
// IsHost indicates whether the container uses the host's pid namespace.
func (n PidMode) IsHost() bool {
return n == "host"
}
// Valid indicates whether the pid stack is valid.
// IsContainer indicates whether the container uses a container's pid namespace.
func (n PidMode) IsContainer() bool {
parts := strings.SplitN(string(n), ":", 2)
return len(parts) > 1 && parts[0] == "container"
}
// Valid indicates whether the pid namespace is valid.
func (n PidMode) Valid() bool {
parts := strings.Split(string(n), ":")
switch mode := parts[0]; mode {
case "", "host":
case "container":
if len(parts) != 2 || parts[1] == "" {
return false
}
default:
return false
}
return true
}
// Container returns the name of the container whose pid namespace is going to be used.
func (n PidMode) Container() string {
parts := strings.SplitN(string(n), ":", 2)
if len(parts) > 1 {
return parts[1]
}
return ""
}
// DeviceMapping represents the device mapping between the host and the container.
type DeviceMapping struct {
PathOnHost string

View File

@ -27,6 +27,8 @@ func GetTagFromNamedRef(ref distreference.Named) string {
tag = x.Digest().String()
case distreference.NamedTagged:
tag = x.Tag()
default:
tag = "latest"
}
return tag
}

View File

@ -78,12 +78,10 @@ type IndexInfo struct {
type SearchResult struct {
// StarCount indicates the number of stars this repository has
StarCount int `json:"star_count"`
// IsOfficial indicates whether the result is an official repository or not
// IsOfficial is true if the result is from an official repository.
IsOfficial bool `json:"is_official"`
// Name is the name of the repository
Name string `json:"name"`
// IsTrusted indicates whether the result is trusted
IsTrusted bool `json:"is_trusted"`
// IsAutomated indicates whether the result is automated
IsAutomated bool `json:"is_automated"`
// Description is a textual description of the repository

View File

@ -395,6 +395,7 @@ type Volume struct {
Mountpoint string // Mountpoint is the location on disk of the volume
Status map[string]interface{} `json:",omitempty"` // Status provides low-level status information about the volume
Labels map[string]string // Labels is metadata specific to the volume
Scope string // Scope describes the level at which the volume exists (e.g. `global` for cluster-wide or `local` for machine level)
}
// VolumesListResponse contains the response for the remote API:

View File

@ -56,7 +56,6 @@ import (
"k8s.io/kubernetes/pkg/controller/job"
namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
nodecontroller "k8s.io/kubernetes/pkg/controller/node"
persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/persistentvolume"
petset "k8s.io/kubernetes/pkg/controller/petset"
"k8s.io/kubernetes/pkg/controller/podautoscaler"
"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
@ -66,7 +65,8 @@ import (
routecontroller "k8s.io/kubernetes/pkg/controller/route"
servicecontroller "k8s.io/kubernetes/pkg/controller/service"
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
"k8s.io/kubernetes/pkg/controller/volume"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach"
persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
"k8s.io/kubernetes/pkg/healthz"
quotainstall "k8s.io/kubernetes/pkg/quota/install"
"k8s.io/kubernetes/pkg/serviceaccount"
@ -408,7 +408,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
attachDetachController, attachDetachControllerErr :=
volume.NewAttachDetachController(
attachdetach.NewAttachDetachController(
clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "attachdetach-controller")),
podInformer,
nodeInformer,

View File

@ -25,6 +25,7 @@ import (
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/apis/rbac"
"k8s.io/kubernetes/pkg/auth/user"
"k8s.io/kubernetes/pkg/serviceaccount"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
)
@ -201,8 +202,7 @@ func appliesToUser(user user.Info, subject rbac.Subject) (bool, error) {
if subject.Namespace == "" {
return false, fmt.Errorf("subject of kind service account without specified namespace")
}
// TODO(ericchiang): Is there a better way of matching a service account name?
return "system:serviceaccount:"+subject.Name+":"+subject.Namespace == user.GetName(), nil
return serviceaccount.MakeUsername(subject.Namespace, subject.Name) == user.GetName(), nil
default:
return false, fmt.Errorf("unknown subject kind: %s", subject.Kind)
}

File diff suppressed because it is too large Load Diff

View File

@ -25,7 +25,7 @@ import (
)
// AWSCloud implements InstanceGroups
var _ InstanceGroups = &AWSCloud{}
var _ InstanceGroups = &Cloud{}
// ResizeInstanceGroup sets the size of the specificed instancegroup Exported
// so it can be used by the e2e tests, which don't want to instantiate a full
@ -44,8 +44,8 @@ func ResizeInstanceGroup(asg ASG, instanceGroupName string, size int) error {
// Implement InstanceGroups.ResizeInstanceGroup
// Set the size to the fixed size
func (a *AWSCloud) ResizeInstanceGroup(instanceGroupName string, size int) error {
return ResizeInstanceGroup(a.asg, instanceGroupName, size)
func (c *Cloud) ResizeInstanceGroup(instanceGroupName string, size int) error {
return ResizeInstanceGroup(c.asg, instanceGroupName, size)
}
// DescribeInstanceGroup gets info about the specified instancegroup
@ -72,8 +72,8 @@ func DescribeInstanceGroup(asg ASG, instanceGroupName string) (InstanceGroupInfo
// Implement InstanceGroups.DescribeInstanceGroup
// Queries the cloud provider for information about the specified instance group
func (a *AWSCloud) DescribeInstanceGroup(instanceGroupName string) (InstanceGroupInfo, error) {
return DescribeInstanceGroup(a.asg, instanceGroupName)
func (c *Cloud) DescribeInstanceGroup(instanceGroupName string) (InstanceGroupInfo, error) {
return DescribeInstanceGroup(c.asg, instanceGroupName)
}
// awsInstanceGroup implements InstanceGroupInfo

View File

@ -30,8 +30,8 @@ import (
const ProxyProtocolPolicyName = "k8s-proxyprotocol-enabled"
func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, loadBalancerName string, listeners []*elb.Listener, subnetIDs []string, securityGroupIDs []string, internalELB, proxyProtocol bool) (*elb.LoadBalancerDescription, error) {
loadBalancer, err := s.describeLoadBalancer(loadBalancerName)
func (c *Cloud) ensureLoadBalancer(namespacedName types.NamespacedName, loadBalancerName string, listeners []*elb.Listener, subnetIDs []string, securityGroupIDs []string, internalELB, proxyProtocol bool) (*elb.LoadBalancerDescription, error) {
loadBalancer, err := c.describeLoadBalancer(loadBalancerName)
if err != nil {
return nil, err
}
@ -55,25 +55,25 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, loadB
createRequest.SecurityGroups = stringPointerArray(securityGroupIDs)
createRequest.Tags = []*elb.Tag{
{Key: aws.String(TagNameKubernetesCluster), Value: aws.String(s.getClusterName())},
{Key: aws.String(TagNameKubernetesCluster), Value: aws.String(c.getClusterName())},
{Key: aws.String(TagNameKubernetesService), Value: aws.String(namespacedName.String())},
}
glog.Infof("Creating load balancer for %v with name: ", namespacedName, loadBalancerName)
_, err := s.elb.CreateLoadBalancer(createRequest)
_, err := c.elb.CreateLoadBalancer(createRequest)
if err != nil {
return nil, err
}
if proxyProtocol {
err = s.createProxyProtocolPolicy(loadBalancerName)
err = c.createProxyProtocolPolicy(loadBalancerName)
if err != nil {
return nil, err
}
for _, listener := range listeners {
glog.V(2).Infof("Adjusting AWS loadbalancer proxy protocol on node port %d. Setting to true", *listener.InstancePort)
err := s.setBackendPolicies(loadBalancerName, *listener.InstancePort, []*string{aws.String(ProxyProtocolPolicyName)})
err := c.setBackendPolicies(loadBalancerName, *listener.InstancePort, []*string{aws.String(ProxyProtocolPolicyName)})
if err != nil {
return nil, err
}
@ -97,7 +97,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, loadB
request.LoadBalancerName = aws.String(loadBalancerName)
request.Subnets = stringSetToPointers(removals)
glog.V(2).Info("Detaching load balancer from removed subnets")
_, err := s.elb.DetachLoadBalancerFromSubnets(request)
_, err := c.elb.DetachLoadBalancerFromSubnets(request)
if err != nil {
return nil, fmt.Errorf("error detaching AWS loadbalancer from subnets: %v", err)
}
@ -109,7 +109,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, loadB
request.LoadBalancerName = aws.String(loadBalancerName)
request.Subnets = stringSetToPointers(additions)
glog.V(2).Info("Attaching load balancer to added subnets")
_, err := s.elb.AttachLoadBalancerToSubnets(request)
_, err := c.elb.AttachLoadBalancerToSubnets(request)
if err != nil {
return nil, fmt.Errorf("error attaching AWS loadbalancer to subnets: %v", err)
}
@ -128,7 +128,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, loadB
request.LoadBalancerName = aws.String(loadBalancerName)
request.SecurityGroups = stringPointerArray(securityGroupIDs)
glog.V(2).Info("Applying updated security groups to load balancer")
_, err := s.elb.ApplySecurityGroupsToLoadBalancer(request)
_, err := c.elb.ApplySecurityGroupsToLoadBalancer(request)
if err != nil {
return nil, fmt.Errorf("error applying AWS loadbalancer security groups: %v", err)
}
@ -188,7 +188,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, loadB
request.LoadBalancerName = aws.String(loadBalancerName)
request.LoadBalancerPorts = removals
glog.V(2).Info("Deleting removed load balancer listeners")
_, err := s.elb.DeleteLoadBalancerListeners(request)
_, err := c.elb.DeleteLoadBalancerListeners(request)
if err != nil {
return nil, fmt.Errorf("error deleting AWS loadbalancer listeners: %v", err)
}
@ -200,7 +200,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, loadB
request.LoadBalancerName = aws.String(loadBalancerName)
request.Listeners = additions
glog.V(2).Info("Creating added load balancer listeners")
_, err := s.elb.CreateLoadBalancerListeners(request)
_, err := c.elb.CreateLoadBalancerListeners(request)
if err != nil {
return nil, fmt.Errorf("error creating AWS loadbalancer listeners: %v", err)
}
@ -219,7 +219,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, loadB
// back if a policy of the same name already exists. However, the aws-sdk does not
// seem to return an error to us in these cases. Therefore this will issue an API
// request every time.
err := s.createProxyProtocolPolicy(loadBalancerName)
err := c.createProxyProtocolPolicy(loadBalancerName)
if err != nil {
return nil, err
}
@ -252,7 +252,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, loadB
if setPolicy {
glog.V(2).Infof("Adjusting AWS loadbalancer proxy protocol on node port %d. Setting to %t", instancePort, proxyProtocol)
err := s.setBackendPolicies(loadBalancerName, instancePort, proxyPolicies)
err := c.setBackendPolicies(loadBalancerName, instancePort, proxyPolicies)
if err != nil {
return nil, err
}
@ -266,7 +266,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, loadB
for instancePort, found := range foundBackends {
if !found {
glog.V(2).Infof("Adjusting AWS loadbalancer proxy protocol on node port %d. Setting to false", instancePort)
err := s.setBackendPolicies(loadBalancerName, instancePort, []*string{})
err := c.setBackendPolicies(loadBalancerName, instancePort, []*string{})
if err != nil {
return nil, err
}
@ -277,7 +277,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, loadB
}
if dirty {
loadBalancer, err = s.describeLoadBalancer(loadBalancerName)
loadBalancer, err = c.describeLoadBalancer(loadBalancerName)
if err != nil {
glog.Warning("Unable to retrieve load balancer after creation/update")
return nil, err
@ -288,7 +288,7 @@ func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, loadB
}
// Makes sure that the health check for an ELB matches the configured listeners
func (s *AWSCloud) ensureLoadBalancerHealthCheck(loadBalancer *elb.LoadBalancerDescription, listeners []*elb.Listener) error {
func (c *Cloud) ensureLoadBalancerHealthCheck(loadBalancer *elb.LoadBalancerDescription, listeners []*elb.Listener) error {
actual := loadBalancer.HealthCheck
// Default AWS settings
@ -332,7 +332,7 @@ func (s *AWSCloud) ensureLoadBalancerHealthCheck(loadBalancer *elb.LoadBalancerD
request.HealthCheck = healthCheck
request.LoadBalancerName = loadBalancer.LoadBalancerName
_, err := s.elb.ConfigureHealthCheck(request)
_, err := c.elb.ConfigureHealthCheck(request)
if err != nil {
return fmt.Errorf("error configuring load-balancer health-check: %v", err)
}
@ -341,7 +341,7 @@ func (s *AWSCloud) ensureLoadBalancerHealthCheck(loadBalancer *elb.LoadBalancerD
}
// Makes sure that exactly the specified hosts are registered as instances with the load balancer
func (s *AWSCloud) ensureLoadBalancerInstances(loadBalancerName string, lbInstances []*elb.Instance, instances []*ec2.Instance) error {
func (c *Cloud) ensureLoadBalancerInstances(loadBalancerName string, lbInstances []*elb.Instance, instances []*ec2.Instance) error {
expected := sets.NewString()
for _, instance := range instances {
expected.Insert(orEmpty(instance.InstanceId))
@ -373,7 +373,7 @@ func (s *AWSCloud) ensureLoadBalancerInstances(loadBalancerName string, lbInstan
registerRequest := &elb.RegisterInstancesWithLoadBalancerInput{}
registerRequest.Instances = addInstances
registerRequest.LoadBalancerName = aws.String(loadBalancerName)
_, err := s.elb.RegisterInstancesWithLoadBalancer(registerRequest)
_, err := c.elb.RegisterInstancesWithLoadBalancer(registerRequest)
if err != nil {
return err
}
@ -384,7 +384,7 @@ func (s *AWSCloud) ensureLoadBalancerInstances(loadBalancerName string, lbInstan
deregisterRequest := &elb.DeregisterInstancesFromLoadBalancerInput{}
deregisterRequest.Instances = removeInstances
deregisterRequest.LoadBalancerName = aws.String(loadBalancerName)
_, err := s.elb.DeregisterInstancesFromLoadBalancer(deregisterRequest)
_, err := c.elb.DeregisterInstancesFromLoadBalancer(deregisterRequest)
if err != nil {
return err
}
@ -394,7 +394,7 @@ func (s *AWSCloud) ensureLoadBalancerInstances(loadBalancerName string, lbInstan
return nil
}
func (s *AWSCloud) createProxyProtocolPolicy(loadBalancerName string) error {
func (c *Cloud) createProxyProtocolPolicy(loadBalancerName string) error {
request := &elb.CreateLoadBalancerPolicyInput{
LoadBalancerName: aws.String(loadBalancerName),
PolicyName: aws.String(ProxyProtocolPolicyName),
@ -407,7 +407,7 @@ func (s *AWSCloud) createProxyProtocolPolicy(loadBalancerName string) error {
},
}
glog.V(2).Info("Creating proxy protocol policy on load balancer")
_, err := s.elb.CreateLoadBalancerPolicy(request)
_, err := c.elb.CreateLoadBalancerPolicy(request)
if err != nil {
return fmt.Errorf("error creating proxy protocol policy on load balancer: %v", err)
}
@ -415,7 +415,7 @@ func (s *AWSCloud) createProxyProtocolPolicy(loadBalancerName string) error {
return nil
}
func (s *AWSCloud) setBackendPolicies(loadBalancerName string, instancePort int64, policies []*string) error {
func (c *Cloud) setBackendPolicies(loadBalancerName string, instancePort int64, policies []*string) error {
request := &elb.SetLoadBalancerPoliciesForBackendServerInput{
InstancePort: aws.Int64(instancePort),
LoadBalancerName: aws.String(loadBalancerName),
@ -426,7 +426,7 @@ func (s *AWSCloud) setBackendPolicies(loadBalancerName string, instancePort int6
} else {
glog.V(2).Infof("Removing AWS loadbalancer backend policies on node port %d", instancePort)
}
_, err := s.elb.SetLoadBalancerPoliciesForBackendServer(request)
_, err := c.elb.SetLoadBalancerPoliciesForBackendServer(request)
if err != nil {
return fmt.Errorf("error adjusting AWS loadbalancer backend policies: %v", err)
}

View File

@ -25,14 +25,14 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider"
)
func (s *AWSCloud) findRouteTable(clusterName string) (*ec2.RouteTable, error) {
func (c *Cloud) findRouteTable(clusterName string) (*ec2.RouteTable, error) {
// This should be unnecessary (we already filter on TagNameKubernetesCluster,
// and something is broken if cluster name doesn't match, but anyway...
// TODO: All clouds should be cluster-aware by default
filters := []*ec2.Filter{newEc2Filter("tag:"+TagNameKubernetesCluster, clusterName)}
request := &ec2.DescribeRouteTablesInput{Filters: s.addFilters(filters)}
request := &ec2.DescribeRouteTablesInput{Filters: c.addFilters(filters)}
tables, err := s.ec2.DescribeRouteTables(request)
tables, err := c.ec2.DescribeRouteTables(request)
if err != nil {
return nil, err
}
@ -49,8 +49,8 @@ func (s *AWSCloud) findRouteTable(clusterName string) (*ec2.RouteTable, error) {
// ListRoutes implements Routes.ListRoutes
// List all routes that match the filter
func (s *AWSCloud) ListRoutes(clusterName string) ([]*cloudprovider.Route, error) {
table, err := s.findRouteTable(clusterName)
func (c *Cloud) ListRoutes(clusterName string) ([]*cloudprovider.Route, error) {
table, err := c.findRouteTable(clusterName)
if err != nil {
return nil, err
}
@ -68,7 +68,7 @@ func (s *AWSCloud) ListRoutes(clusterName string) ([]*cloudprovider.Route, error
instanceIDs = append(instanceIDs, &instanceID)
}
instances, err := s.getInstancesByIDs(instanceIDs)
instances, err := c.getInstancesByIDs(instanceIDs)
if err != nil {
return nil, err
}
@ -95,12 +95,12 @@ func (s *AWSCloud) ListRoutes(clusterName string) ([]*cloudprovider.Route, error
}
// Sets the instance attribute "source-dest-check" to the specified value
func (s *AWSCloud) configureInstanceSourceDestCheck(instanceID string, sourceDestCheck bool) error {
func (c *Cloud) configureInstanceSourceDestCheck(instanceID string, sourceDestCheck bool) error {
request := &ec2.ModifyInstanceAttributeInput{}
request.InstanceId = aws.String(instanceID)
request.SourceDestCheck = &ec2.AttributeBooleanValue{Value: aws.Bool(sourceDestCheck)}
_, err := s.ec2.ModifyInstanceAttribute(request)
_, err := c.ec2.ModifyInstanceAttribute(request)
if err != nil {
return fmt.Errorf("error configuring source-dest-check on instance %s: %v", instanceID, err)
}
@ -109,20 +109,20 @@ func (s *AWSCloud) configureInstanceSourceDestCheck(instanceID string, sourceDes
// CreateRoute implements Routes.CreateRoute
// Create the described route
func (s *AWSCloud) CreateRoute(clusterName string, nameHint string, route *cloudprovider.Route) error {
instance, err := s.getInstanceByNodeName(route.TargetInstance)
func (c *Cloud) CreateRoute(clusterName string, nameHint string, route *cloudprovider.Route) error {
instance, err := c.getInstanceByNodeName(route.TargetInstance)
if err != nil {
return err
}
// In addition to configuring the route itself, we also need to configure the instance to accept that traffic
// On AWS, this requires turning source-dest checks off
err = s.configureInstanceSourceDestCheck(orEmpty(instance.InstanceId), false)
err = c.configureInstanceSourceDestCheck(orEmpty(instance.InstanceId), false)
if err != nil {
return err
}
table, err := s.findRouteTable(clusterName)
table, err := c.findRouteTable(clusterName)
if err != nil {
return err
}
@ -147,7 +147,7 @@ func (s *AWSCloud) CreateRoute(clusterName string, nameHint string, route *cloud
request.DestinationCidrBlock = deleteRoute.DestinationCidrBlock
request.RouteTableId = table.RouteTableId
_, err = s.ec2.DeleteRoute(request)
_, err = c.ec2.DeleteRoute(request)
if err != nil {
return fmt.Errorf("error deleting blackholed AWS route (%s): %v", aws.StringValue(deleteRoute.DestinationCidrBlock), err)
}
@ -159,7 +159,7 @@ func (s *AWSCloud) CreateRoute(clusterName string, nameHint string, route *cloud
request.InstanceId = instance.InstanceId
request.RouteTableId = table.RouteTableId
_, err = s.ec2.CreateRoute(request)
_, err = c.ec2.CreateRoute(request)
if err != nil {
return fmt.Errorf("error creating AWS route (%s): %v", route.DestinationCIDR, err)
}
@ -169,8 +169,8 @@ func (s *AWSCloud) CreateRoute(clusterName string, nameHint string, route *cloud
// DeleteRoute implements Routes.DeleteRoute
// Delete the specified route
func (s *AWSCloud) DeleteRoute(clusterName string, route *cloudprovider.Route) error {
table, err := s.findRouteTable(clusterName)
func (c *Cloud) DeleteRoute(clusterName string, route *cloudprovider.Route) error {
table, err := c.findRouteTable(clusterName)
if err != nil {
return err
}
@ -179,7 +179,7 @@ func (s *AWSCloud) DeleteRoute(clusterName string, route *cloudprovider.Route) e
request.DestinationCidrBlock = aws.String(route.DestinationCIDR)
request.RouteTableId = table.RouteTableId
_, err = s.ec2.DeleteRoute(request)
_, err = c.ec2.DeleteRoute(request)
if err != nil {
return fmt.Errorf("error deleting AWS route (%s): %v", route.DestinationCIDR, err)
}

View File

@ -2363,6 +2363,15 @@ func (gce *GCECloud) AttachDisk(diskName, instanceID string, readOnly bool) erro
func (gce *GCECloud) DetachDisk(devicePath, instanceID string) error {
inst, err := gce.getInstanceByName(instanceID)
if err != nil {
if err == cloudprovider.InstanceNotFound {
// If instance no longer exists, safe to assume volume is not attached.
glog.Warningf(
"Instance %q does not exist. DetachDisk will assume PD %q is not attached to it.",
instanceID,
devicePath)
return nil
}
return fmt.Errorf("error getting instance %q", instanceID)
}
@ -2377,6 +2386,15 @@ func (gce *GCECloud) DetachDisk(devicePath, instanceID string) error {
func (gce *GCECloud) DiskIsAttached(diskName, instanceID string) (bool, error) {
instance, err := gce.getInstanceByName(instanceID)
if err != nil {
if err == cloudprovider.InstanceNotFound {
// If instance no longer exists, safe to assume volume is not attached.
glog.Warningf(
"Instance %q does not exist. DiskIsAttached will assume PD %q is not attached to it.",
instanceID,
diskName)
return false, nil
}
return false, err
}

View File

@ -20,10 +20,13 @@ import (
"errors"
"fmt"
"net"
"sync"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
"github.com/golang/glog"
@ -60,6 +63,9 @@ type rangeAllocator struct {
// This increases a throughput of CIDR assignment by not blocking on long operations.
nodeCIDRUpdateChannel chan nodeAndCIDR
recorder record.EventRecorder
// Keep a set of nodes that are currectly being processed to avoid races in CIDR allocation
sync.Mutex
nodesInProcessing sets.String
}
// NewCIDRRangeAllocator returns a CIDRAllocator to allocate CIDR for node
@ -77,6 +83,7 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s
clusterCIDR: clusterCIDR,
nodeCIDRUpdateChannel: make(chan nodeAndCIDR, cidrUpdateQueueSize),
recorder: recorder,
nodesInProcessing: sets.NewString(),
}
if serviceCIDR != nil {
@ -122,7 +129,24 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s
return ra, nil
}
func (r *rangeAllocator) insertNodeToProcessing(nodeName string) bool {
r.Lock()
defer r.Unlock()
if r.nodesInProcessing.Has(nodeName) {
return false
}
r.nodesInProcessing.Insert(nodeName)
return true
}
func (r *rangeAllocator) removeNodeFromProcessing(nodeName string) {
r.Lock()
defer r.Unlock()
r.nodesInProcessing.Delete(nodeName)
}
func (r *rangeAllocator) occupyCIDR(node *api.Node) error {
defer r.removeNodeFromProcessing(node.Name)
if node.Spec.PodCIDR == "" {
return nil
}
@ -138,12 +162,22 @@ func (r *rangeAllocator) occupyCIDR(node *api.Node) error {
// AllocateOrOccupyCIDR looks at the given node, assigns it a valid CIDR
// if it doesn't currently have one or mark the CIDR as used if the node already have one.
// WARNING: If you're adding any return calls or defer any more work from this function
// you have to handle correctly nodesInProcessing.
func (r *rangeAllocator) AllocateOrOccupyCIDR(node *api.Node) error {
if node == nil {
return nil
}
if !r.insertNodeToProcessing(node.Name) {
glog.V(2).Infof("Node %v is already in a process of CIDR assignment.", node.Name)
return nil
}
if node.Spec.PodCIDR != "" {
return r.occupyCIDR(node)
}
podCIDR, err := r.cidrs.allocateNext()
if err != nil {
r.removeNodeFromProcessing(node.Name)
recordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
return fmt.Errorf("failed to allocate cidr: %v", err)
}
@ -173,8 +207,8 @@ func (r *rangeAllocator) ReleaseCIDR(node *api.Node) error {
return err
}
// Marks all CIDRs with subNetMaskSize that belongs to serviceCIDR as used, so that they won't be
// assignable.
// Marks all CIDRs with subNetMaskSize that belongs to serviceCIDR as used,
// so that they won't be assignable.
func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) {
// Checks if service CIDR has a nonempty intersection with cluster CIDR. It is the case if either
// clusterCIDR contains serviceCIDR with clusterCIDR's Mask applied (this means that clusterCIDR contains serviceCIDR)
@ -192,6 +226,7 @@ func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) {
func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
var err error
var node *api.Node
defer r.removeNodeFromProcessing(data.nodeName)
for rep := 0; rep < podCIDRUpdateRetry; rep++ {
// TODO: change it to using PATCH instead of full Node updates.
node, err = r.client.Core().Nodes().Get(data.nodeName)
@ -209,9 +244,14 @@ func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
}
if err != nil {
recordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed")
glog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR", data.nodeName, err)
if releaseErr := r.cidrs.release(data.cidr); releaseErr != nil {
glog.Errorf("Error releasing allocated CIDR for node %v: %v", data.nodeName, releaseErr)
// We accept the fact that we may leek CIDRs here. This is safer than releasing
// them in case when we don't know if request went through.
// NodeController restart will return all falsely allocated CIDRs to the pool.
if !apierrors.IsServerTimeout(err) {
glog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR", data.nodeName, err)
if releaseErr := r.cidrs.release(data.cidr); releaseErr != nil {
glog.Errorf("Error releasing allocated CIDR for node %v: %v", data.nodeName, releaseErr)
}
}
}
return err

View File

@ -78,26 +78,13 @@ func (s *cidrSet) allocateNext() (*net.IPNet, error) {
}, nil
}
func (s *cidrSet) release(cidr *net.IPNet) error {
used, err := s.getIndexForCIDR(cidr)
if err != nil {
return err
}
s.Lock()
defer s.Unlock()
s.used.SetBit(&s.used, used, 0)
return nil
}
func (s *cidrSet) occupy(cidr *net.IPNet) (err error) {
begin, end := 0, s.maxCIDRs
func (s *cidrSet) getBeginingAndEndIndices(cidr *net.IPNet) (begin, end int, err error) {
begin, end = 0, s.maxCIDRs
cidrMask := cidr.Mask
maskSize, _ := cidrMask.Size()
if !s.clusterCIDR.Contains(cidr.IP.Mask(s.clusterCIDR.Mask)) && !cidr.Contains(s.clusterCIDR.IP.Mask(cidr.Mask)) {
return fmt.Errorf("cidr %v is out the range of cluster cidr %v", cidr, s.clusterCIDR)
return -1, -1, fmt.Errorf("cidr %v is out the range of cluster cidr %v", cidr, s.clusterCIDR)
}
if s.clusterMaskSize < maskSize {
@ -107,7 +94,7 @@ func (s *cidrSet) occupy(cidr *net.IPNet) (err error) {
Mask: subNetMask,
})
if err != nil {
return err
return -1, -1, err
}
ip := make([]byte, 4)
@ -118,9 +105,30 @@ func (s *cidrSet) occupy(cidr *net.IPNet) (err error) {
Mask: subNetMask,
})
if err != nil {
return err
return -1, -1, err
}
}
return begin, end, nil
}
func (s *cidrSet) release(cidr *net.IPNet) error {
begin, end, err := s.getBeginingAndEndIndices(cidr)
if err != nil {
return err
}
s.Lock()
defer s.Unlock()
for i := begin; i <= end; i++ {
s.used.SetBit(&s.used, i, 0)
}
return nil
}
func (s *cidrSet) occupy(cidr *net.IPNet) (err error) {
begin, end, err := s.getBeginingAndEndIndices(cidr)
if err != nil {
return err
}
s.Lock()
defer s.Unlock()

View File

@ -232,6 +232,34 @@ func NewNodeController(
glog.Errorf("Error allocating CIDR: %v", err)
}
},
UpdateFunc: func(_, obj interface{}) {
node := obj.(*api.Node)
// If the PodCIDR is not empty we either:
// - already processed a Node that already had a CIDR after NC restarted
// (cidr is marked as used),
// - already processed a Node successfully and allocated a CIDR for it
// (cidr is marked as used),
// - already processed a Node but we did saw a "timeout" response and
// request eventually got through in this case we haven't released
// the allocated CIDR (cidr is still marked as used).
// There's a possible error here:
// - NC sees a new Node and assigns a CIDR X to it,
// - Update Node call fails with a timeout,
// - Node is updated by some other component, NC sees an update and
// assigns CIDR Y to the Node,
// - Both CIDR X and CIDR Y are marked as used in the local cache,
// even though Node sees only CIDR Y
// The problem here is that in in-memory cache we see CIDR X as marked,
// which prevents it from being assigned to any new node. The cluster
// state is correct.
// Restart of NC fixes the issue.
if node.Spec.PodCIDR == "" {
err := nc.cidrAllocator.AllocateOrOccupyCIDR(node)
if err != nil {
glog.Errorf("Error allocating CIDR: %v", err)
}
}
},
DeleteFunc: func(obj interface{}) {
node := obj.(*api.Node)
err := nc.cidrAllocator.ReleaseCIDR(node)

View File

@ -0,0 +1,2 @@
assignees:
- saad-ali

View File

@ -16,7 +16,7 @@ limitations under the License.
// Package volume implements a controller to manage volume attach and detach
// operations.
package volume
package attachdetach
import (
"fmt"
@ -28,10 +28,10 @@ import (
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/volume/cache"
"k8s.io/kubernetes/pkg/controller/volume/populator"
"k8s.io/kubernetes/pkg/controller/volume/reconciler"
"k8s.io/kubernetes/pkg/controller/volume/statusupdater"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/populator"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/reconciler"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/io"
"k8s.io/kubernetes/pkg/util/mount"
@ -54,7 +54,7 @@ const (
// desiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the
// DesiredStateOfWorldPopulator loop waits between successive executions
desiredStateOfWorldPopulatorLoopSleepPeriod time.Duration = 5 * time.Minute
desiredStateOfWorldPopulatorLoopSleepPeriod time.Duration = 1 * time.Minute
)
// AttachDetachController defines the operations supported by this controller.

View File

@ -224,7 +224,7 @@ type nodeToUpdateStatusFor struct {
}
func (asw *actualStateOfWorld) MarkVolumeAsAttached(
volumeSpec *volume.Spec, nodeName string, devicePath string) error {
_ api.UniqueVolumeName, volumeSpec *volume.Spec, nodeName string, devicePath string) error {
_, err := asw.AddVolumeNode(volumeSpec, nodeName, devicePath)
return err
}

View File

@ -26,7 +26,7 @@ import (
"k8s.io/kubernetes/pkg/api"
kcache "k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/volume/cache"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
)
@ -82,12 +82,26 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
glog.Errorf("MetaNamespaceKeyFunc failed for pod %q (UID %q) with: %v", dswPodKey, dswPodUID, err)
continue
}
// retrieve the pod object from pod informer with the namespace key
informerPodObj, exists, err := dswp.podInformer.GetStore().GetByKey(dswPodKey)
if err != nil || informerPodObj == nil {
glog.Errorf("podInformer GetByKey failed for pod %q (UID %q) with %v", dswPodKey, dswPodUID, err)
// Retrieve the pod object from pod informer with the namespace key
informerPodObj, exists, err :=
dswp.podInformer.GetStore().GetByKey(dswPodKey)
if err != nil {
glog.Errorf(
"podInformer GetByKey failed for pod %q (UID %q) with %v",
dswPodKey,
dswPodUID,
err)
continue
}
if exists && informerPodObj == nil {
glog.Info(
"podInformer GetByKey found pod, but informerPodObj is nil for pod %q (UID %q)",
dswPodKey,
dswPodUID)
continue
}
if exists {
informerPod, ok := informerPodObj.(*api.Pod)
if !ok {
@ -95,7 +109,7 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
continue
}
informerPodUID := volumehelper.GetUniquePodName(informerPod)
// Check whether the unique idenfier of the pod from dsw matches the one retrived from pod informer
// Check whether the unique identifier of the pod from dsw matches the one retrieved from pod informer
if informerPodUID == dswPodUID {
glog.V(10).Infof(
"Verified pod %q (UID %q) from dsw exists in pod informer.", dswPodKey, dswPodUID)
@ -103,7 +117,7 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
}
}
// the pod from dsw does not exist in pod informer, or it does not match the unique idenfier retrieved
// the pod from dsw does not exist in pod informer, or it does not match the unique identifer retrieved
// from the informer, delete it from dsw
glog.V(1).Infof(
"Removing pod %q (UID %q) from dsw because it does not exist in pod informer.", dswPodKey, dswPodUID)

View File

@ -23,10 +23,11 @@ import (
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/controller/volume/cache"
"k8s.io/kubernetes/pkg/controller/volume/statusupdater"
"k8s.io/kubernetes/pkg/util/goroutinemap"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
)
@ -114,9 +115,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
glog.Infof("Started DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName)
}
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!goroutinemap.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists && goroutinemap.IsExponentialBackoff errors, they are expected.
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.DetachVolume failed to start for volume %q (spec.Name: %q) from node %q with err: %v",
@ -134,9 +135,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
glog.Infof("Started DetachVolume for volume %q from node %q due to maxWaitForUnmountDuration expiry.", attachedVolume.VolumeName, attachedVolume.NodeName)
}
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!goroutinemap.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists && goroutinemap.IsExponentialBackoff errors, they are expected.
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.DetachVolume failed to start (maxWaitForUnmountDuration expiry) for volume %q (spec.Name: %q) from node %q with err: %v",
@ -169,9 +170,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
glog.Infof("Started AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
}
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!goroutinemap.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists && goroutinemap.IsExponentialBackoff errors, they are expected.
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.AttachVolume failed to start for volume %q (spec.Name: %q) to node %q with err: %v",

View File

@ -27,7 +27,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/volume/cache"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
"k8s.io/kubernetes/pkg/util/strategicpatch"
)
@ -62,10 +62,12 @@ func (nsu *nodeStatusUpdater) UpdateNodeStatuses() error {
for nodeName, attachedVolumes := range nodesToUpdate {
nodeObj, exists, err := nsu.nodeInformer.GetStore().GetByKey(nodeName)
if nodeObj == nil || !exists || err != nil {
return fmt.Errorf(
"failed to find node %q in NodeInformer cache. %v",
// If node does not exist, its status cannot be updated, log error and move on.
glog.Warningf(
"Could not update node status. Failed to find node %q in NodeInformer cache. %v",
nodeName,
err)
return nil
}
node, ok := nodeObj.(*api.Node)

View File

@ -1,3 +1,4 @@
assignees:
- jsafrane
- saad-ali
- thockin

View File

@ -71,7 +71,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/kubelet/util/ioutils"
"k8s.io/kubernetes/pkg/kubelet/util/queue"
kubeletvolume "k8s.io/kubernetes/pkg/kubelet/volume"
"k8s.io/kubernetes/pkg/kubelet/volumemanager"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/securitycontext"
"k8s.io/kubernetes/pkg/types"
@ -501,9 +501,9 @@ func NewMainKubelet(
return nil, err
}
klet.volumeManager, err = kubeletvolume.NewVolumeManager(
klet.volumeManager, err = volumemanager.NewVolumeManager(
enableControllerAttachDetach,
hostname,
nodeName,
klet.podManager,
klet.kubeClient,
klet.volumePluginMgr,
@ -687,7 +687,7 @@ type Kubelet struct {
// VolumeManager runs a set of asynchronous loops that figure out which
// volumes need to be attached/mounted/unmounted/detached based on the pods
// scheduled on this node and makes it so.
volumeManager kubeletvolume.VolumeManager
volumeManager volumemanager.VolumeManager
// Cloud provider interface.
cloud cloudprovider.Interface
@ -977,7 +977,9 @@ func (kl *Kubelet) initializeModules() error {
// initializeRuntimeDependentModules will initialize internal modules that require the container runtime to be up.
func (kl *Kubelet) initializeRuntimeDependentModules() {
if err := kl.cadvisor.Start(); err != nil {
kl.runtimeState.setInternalError(fmt.Errorf("Failed to start cAdvisor %v", err))
// Fail kubelet and rely on the babysitter to retry starting kubelet.
// TODO(random-liu): Add backoff logic in the babysitter
glog.Fatalf("Failed to start cAdvisor %v", err)
}
}

View File

@ -1631,13 +1631,6 @@ func (r *Runtime) KillPod(pod *api.Pod, runningPod kubecontainer.Pod, gracePerio
r.containerRefManager.ClearRef(c.ID)
}
// Touch the systemd service file to update the mod time so it will
// not be garbage collected too soon.
if err := r.os.Chtimes(serviceFile, time.Now(), time.Now()); err != nil {
glog.Errorf("rkt: Failed to change the modification time of the service file %q: %v", serviceName, err)
return err
}
// Since all service file have 'KillMode=mixed', the processes in
// the unit's cgroup will receive a SIGKILL if the normal stop timeouts.
reschan := make(chan string)

View File

@ -0,0 +1,2 @@
assignees:
- saad-ali

View File

@ -51,14 +51,6 @@ type ActualStateOfWorld interface {
// operationexecutor to interact with it.
operationexecutor.ActualStateOfWorldAttacherUpdater
// AddVolume adds the given volume to the cache indicating the specified
// volume is attached to this node. A unique volume name is generated from
// the volumeSpec and returned on success.
// If a volume with the same generated name already exists, this is a noop.
// If no volume plugin can support the given volumeSpec or more than one
// plugin can support it, an error is returned.
AddVolume(volumeSpec *volume.Spec, devicePath string) (api.UniqueVolumeName, error)
// AddPodToVolume adds the given pod to the given volume in the cache
// indicating the specified volume has been successfully mounted to the
// specified pod.
@ -274,9 +266,8 @@ type mountedPod struct {
}
func (asw *actualStateOfWorld) MarkVolumeAsAttached(
volumeSpec *volume.Spec, nodeName string, devicePath string) error {
_, err := asw.AddVolume(volumeSpec, devicePath)
return err
volumeName api.UniqueVolumeName, volumeSpec *volume.Spec, _, devicePath string) error {
return asw.addVolume(volumeName, volumeSpec, devicePath)
}
func (asw *actualStateOfWorld) MarkVolumeAsDetached(
@ -315,27 +306,34 @@ func (asw *actualStateOfWorld) MarkDeviceAsUnmounted(
return asw.SetVolumeGloballyMounted(volumeName, false /* globallyMounted */)
}
func (asw *actualStateOfWorld) AddVolume(
volumeSpec *volume.Spec, devicePath string) (api.UniqueVolumeName, error) {
// addVolume adds the given volume to the cache indicating the specified
// volume is attached to this node. If no volume name is supplied, a unique
// volume name is generated from the volumeSpec and returned on success. If a
// volume with the same generated name already exists, this is a noop. If no
// volume plugin can support the given volumeSpec or more than one plugin can
// support it, an error is returned.
func (asw *actualStateOfWorld) addVolume(
volumeName api.UniqueVolumeName, volumeSpec *volume.Spec, devicePath string) error {
asw.Lock()
defer asw.Unlock()
volumePlugin, err := asw.volumePluginMgr.FindPluginBySpec(volumeSpec)
if err != nil || volumePlugin == nil {
return "", fmt.Errorf(
return fmt.Errorf(
"failed to get Plugin from volumeSpec for volume %q err=%v",
volumeSpec.Name(),
err)
}
volumeName, err :=
volumehelper.GetUniqueVolumeNameFromSpec(volumePlugin, volumeSpec)
if err != nil {
return "", fmt.Errorf(
"failed to GetUniqueVolumeNameFromSpec for volumeSpec %q using volume plugin %q err=%v",
volumeSpec.Name(),
volumePlugin.GetPluginName(),
err)
if len(volumeName) == 0 {
volumeName, err = volumehelper.GetUniqueVolumeNameFromSpec(volumePlugin, volumeSpec)
if err != nil {
return fmt.Errorf(
"failed to GetUniqueVolumeNameFromSpec for volumeSpec %q using volume plugin %q err=%v",
volumeSpec.Name(),
volumePlugin.GetPluginName(),
err)
}
}
pluginIsAttachable := false
@ -357,7 +355,7 @@ func (asw *actualStateOfWorld) AddVolume(
asw.attachedVolumes[volumeName] = volumeObj
}
return volumeObj.volumeName, nil
return nil
}
func (asw *actualStateOfWorld) AddPodToVolume(

View File

@ -183,14 +183,27 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
err)
}
volumeName, err :=
volumehelper.GetUniqueVolumeNameFromSpec(volumePlugin, volumeSpec)
if err != nil {
return "", fmt.Errorf(
"failed to GetUniqueVolumeNameFromSpec for volumeSpec %q using volume plugin %q err=%v",
volumeSpec.Name(),
volumePlugin.GetPluginName(),
err)
var volumeName api.UniqueVolumeName
// The unique volume name used depends on whether the volume is attachable
// or not.
attachable := dsw.isAttachableVolume(volumeSpec)
if attachable {
// For attachable volumes, use the unique volume name as reported by
// the plugin.
volumeName, err =
volumehelper.GetUniqueVolumeNameFromSpec(volumePlugin, volumeSpec)
if err != nil {
return "", fmt.Errorf(
"failed to GetUniqueVolumeNameFromSpec for volumeSpec %q using volume plugin %q err=%v",
volumeSpec.Name(),
volumePlugin.GetPluginName(),
err)
}
} else {
// For non-attachable volumes, generate a unique name based on the pod
// namespace and name and the name of the volume within the pod.
volumeName = volumehelper.GetUniqueVolumeNameForNonAttachableVolume(podName, volumePlugin, outerVolumeSpecName)
}
volumeObj, volumeExists := dsw.volumesToMount[volumeName]
@ -198,7 +211,7 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
volumeObj = volumeToMount{
volumeName: volumeName,
podsToMount: make(map[types.UniquePodName]podToMount),
pluginIsAttachable: dsw.isAttachableVolume(volumeSpec),
pluginIsAttachable: attachable,
volumeGidValue: volumeGidValue,
reportedInUse: false,
}

View File

@ -32,7 +32,7 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/kubelet/volume/cache"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/volume"

View File

@ -24,9 +24,10 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/kubelet/volume/cache"
"k8s.io/kubernetes/pkg/util/goroutinemap"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
)
@ -117,9 +118,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
err := rc.operationExecutor.UnmountVolume(
mountedVolume.MountedVolume, rc.actualStateOfWorld)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!goroutinemap.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists and goroutinemap.IsExponentialBackoff errors, they are expected.
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.UnmountVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
@ -158,9 +159,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
rc.hostName,
rc.actualStateOfWorld)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!goroutinemap.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists and goroutinemap.IsExponentialBackoff errors, they are expected.
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.VerifyControllerAttachedVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
@ -193,9 +194,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
volumeToMount.Pod.UID)
err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!goroutinemap.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists and goroutinemap.IsExponentialBackoff errors, they are expected.
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.AttachVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
@ -231,9 +232,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
volumeToMount.VolumeToMount,
rc.actualStateOfWorld)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!goroutinemap.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists and goroutinemap.IsExponentialBackoff errors, they are expected.
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.MountVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
@ -266,9 +267,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
err := rc.operationExecutor.UnmountDevice(
attachedVolume.AttachedVolume, rc.actualStateOfWorld)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!goroutinemap.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists and goroutinemap.IsExponentialBackoff errors, they are expected.
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.UnmountDevice failed for volume %q (spec.Name: %q) controllerAttachDetachEnabled: %v with err: %v",
@ -297,9 +298,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
err := rc.operationExecutor.DetachVolume(
attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!goroutinemap.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists && goroutinemap.IsExponentialBackoff errors, they are expected.
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.DetachVolume failed for volume %q (spec.Name: %q) controllerAttachDetachEnabled: %v with err: %v",

View File

@ -28,9 +28,9 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/kubelet/volume/cache"
"k8s.io/kubernetes/pkg/kubelet/volume/populator"
"k8s.io/kubernetes/pkg/kubelet/volume/reconciler"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/populator"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/reconciler"
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"

View File

@ -1061,6 +1061,7 @@ func DefaultAPIResourceConfigSource() *genericapiserver.ResourceConfig {
extensionsapiv1beta1.SchemeGroupVersion.WithResource("horizontalpodautoscalers"),
extensionsapiv1beta1.SchemeGroupVersion.WithResource("ingresses"),
extensionsapiv1beta1.SchemeGroupVersion.WithResource("jobs"),
extensionsapiv1beta1.SchemeGroupVersion.WithResource("networkpolicies"),
extensionsapiv1beta1.SchemeGroupVersion.WithResource("replicasets"),
extensionsapiv1beta1.SchemeGroupVersion.WithResource("thirdpartyresources"),
)

View File

@ -0,0 +1,2 @@
assignees:
- saad-ali

View File

@ -0,0 +1,120 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package exponentialbackoff contains logic for implementing exponential
// backoff for GoRoutineMap and NestedPendingOperations.
package exponentialbackoff
import (
"fmt"
"time"
)
const (
// initialDurationBeforeRetry is the amount of time after an error occurs
// that GoroutineMap will refuse to allow another operation to start with
// the same target (if exponentialBackOffOnError is enabled). Each
// successive error results in a wait 2x times the previous.
initialDurationBeforeRetry time.Duration = 500 * time.Millisecond
// maxDurationBeforeRetry is the maximum amount of time that
// durationBeforeRetry will grow to due to exponential backoff.
maxDurationBeforeRetry time.Duration = 2 * time.Minute
)
// ExponentialBackoff contains the last occurrence of an error and the duration
// that retries are not permitted.
type ExponentialBackoff struct {
lastError error
lastErrorTime time.Time
durationBeforeRetry time.Duration
}
// SafeToRetry returns an error if the durationBeforeRetry period for the given
// lastErrorTime has not yet expired. Otherwise it returns nil.
func (expBackoff *ExponentialBackoff) SafeToRetry(operationName string) error {
if time.Since(expBackoff.lastErrorTime) <= expBackoff.durationBeforeRetry {
return NewExponentialBackoffError(operationName, *expBackoff)
}
return nil
}
func (expBackoff *ExponentialBackoff) Update(err *error) {
if expBackoff.durationBeforeRetry == 0 {
expBackoff.durationBeforeRetry = initialDurationBeforeRetry
} else {
expBackoff.durationBeforeRetry = 2 * expBackoff.durationBeforeRetry
if expBackoff.durationBeforeRetry > maxDurationBeforeRetry {
expBackoff.durationBeforeRetry = maxDurationBeforeRetry
}
}
expBackoff.lastError = *err
expBackoff.lastErrorTime = time.Now()
}
func (expBackoff *ExponentialBackoff) GenerateNoRetriesPermittedMsg(
operationName string) string {
return fmt.Sprintf("Operation for %q failed. No retries permitted until %v (durationBeforeRetry %v). Error: %v",
operationName,
expBackoff.lastErrorTime.Add(expBackoff.durationBeforeRetry),
expBackoff.durationBeforeRetry,
expBackoff.lastError)
}
// NewExponentialBackoffError returns a new instance of ExponentialBackoff error.
func NewExponentialBackoffError(
operationName string, expBackoff ExponentialBackoff) error {
return exponentialBackoffError{
operationName: operationName,
expBackoff: expBackoff,
}
}
// IsExponentialBackoff returns true if an error returned from GoroutineMap
// indicates that a new operation can not be started because
// exponentialBackOffOnError is enabled and a previous operation with the same
// operation failed within the durationBeforeRetry period.
func IsExponentialBackoff(err error) bool {
switch err.(type) {
case exponentialBackoffError:
return true
default:
return false
}
}
// exponentialBackoffError is the error returned returned from GoroutineMap when
// a new operation can not be started because exponentialBackOffOnError is
// enabled and a previous operation with the same operation failed within the
// durationBeforeRetry period.
type exponentialBackoffError struct {
operationName string
expBackoff ExponentialBackoff
}
var _ error = exponentialBackoffError{}
func (err exponentialBackoffError) Error() string {
return fmt.Sprintf(
"Failed to create operation with name %q. An operation with that name failed at %v. No retries permitted until %v (%v). Last error: %q.",
err.operationName,
err.expBackoff.lastErrorTime,
err.expBackoff.lastErrorTime.Add(err.expBackoff.durationBeforeRetry),
err.expBackoff.durationBeforeRetry,
err.expBackoff.lastError)
}

View File

@ -23,18 +23,18 @@ package goroutinemap
import (
"fmt"
"runtime"
"sync"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
k8sRuntime "k8s.io/kubernetes/pkg/util/runtime"
)
const (
// initialDurationBeforeRetry is the amount of time after an error occurs
// that GoRoutineMap will refuse to allow another operation to start with
// the same operationName (if exponentialBackOffOnError is enabled). Each
// the same operation name (if exponentialBackOffOnError is enabled). Each
// successive error results in a wait 2x times the previous.
initialDurationBeforeRetry time.Duration = 500 * time.Millisecond
@ -45,12 +45,13 @@ const (
// GoRoutineMap defines the supported set of operations.
type GoRoutineMap interface {
// Run adds operationName to the list of running operations and spawns a new
// go routine to execute the operation. If an operation with the same name
// already exists, an error is returned. Once the operation is complete, the
// go routine is terminated and the operationName is removed from the list
// of executing operations allowing a new operation to be started with the
// same name without error.
// Run adds operation name to the list of running operations and spawns a
// new go routine to execute the operation.
// If an operation with the same operation name already exists, an
// AlreadyExists or ExponentialBackoff error is returned.
// Once the operation is complete, the go routine is terminated and the
// operation name is removed from the list of executing operations allowing
// a new operation to be started with the same operation name without error.
Run(operationName string, operationFunc func() error) error
// Wait blocks until all operations are completed. This is typically
@ -61,65 +62,72 @@ type GoRoutineMap interface {
// NewGoRoutineMap returns a new instance of GoRoutineMap.
func NewGoRoutineMap(exponentialBackOffOnError bool) GoRoutineMap {
return &goRoutineMap{
g := &goRoutineMap{
operations: make(map[string]operation),
exponentialBackOffOnError: exponentialBackOffOnError,
lock: &sync.Mutex{},
}
g.cond = sync.NewCond(g.lock)
return g
}
type goRoutineMap struct {
operations map[string]operation
exponentialBackOffOnError bool
wg sync.WaitGroup
sync.Mutex
cond *sync.Cond
lock *sync.Mutex
}
type operation struct {
operationPending bool
lastError error
lastErrorTime time.Time
durationBeforeRetry time.Duration
operationPending bool
expBackoff exponentialbackoff.ExponentialBackoff
}
func (grm *goRoutineMap) Run(operationName string, operationFunc func() error) error {
grm.Lock()
defer grm.Unlock()
func (grm *goRoutineMap) Run(
operationName string,
operationFunc func() error) error {
grm.lock.Lock()
defer grm.lock.Unlock()
existingOp, exists := grm.operations[operationName]
if exists {
// Operation with name exists
if existingOp.operationPending {
return newAlreadyExistsError(operationName)
return NewAlreadyExistsError(operationName)
}
if time.Since(existingOp.lastErrorTime) <= existingOp.durationBeforeRetry {
return newExponentialBackoffError(operationName, existingOp)
if err := existingOp.expBackoff.SafeToRetry(operationName); err != nil {
return err
}
}
grm.operations[operationName] = operation{
operationPending: true,
lastError: existingOp.lastError,
lastErrorTime: existingOp.lastErrorTime,
durationBeforeRetry: existingOp.durationBeforeRetry,
operationPending: true,
expBackoff: existingOp.expBackoff,
}
grm.wg.Add(1)
go func() (err error) {
// Handle unhandled panics (very unlikely)
defer k8sRuntime.HandleCrash()
// Handle completion of and error, if any, from operationFunc()
defer grm.operationComplete(operationName, &err)
// Handle panic, if any, from operationFunc()
defer recoverFromPanic(operationName, &err)
defer k8sRuntime.RecoverFromPanic(&err)
return operationFunc()
}()
return nil
}
func (grm *goRoutineMap) operationComplete(operationName string, err *error) {
defer grm.wg.Done()
grm.Lock()
defer grm.Unlock()
func (grm *goRoutineMap) operationComplete(
operationName string, err *error) {
// Defer operations are executed in Last-In is First-Out order. In this case
// the lock is acquired first when operationCompletes begins, and is
// released when the method finishes, after the lock is released cond is
// signaled to wake waiting goroutine.
defer grm.cond.Signal()
grm.lock.Lock()
defer grm.lock.Unlock()
if *err == nil || !grm.exponentialBackOffOnError {
// Operation completed without error, or exponentialBackOffOnError disabled
@ -133,70 +141,33 @@ func (grm *goRoutineMap) operationComplete(operationName string, err *error) {
} else {
// Operation completed with error and exponentialBackOffOnError Enabled
existingOp := grm.operations[operationName]
if existingOp.durationBeforeRetry == 0 {
existingOp.durationBeforeRetry = initialDurationBeforeRetry
} else {
existingOp.durationBeforeRetry = 2 * existingOp.durationBeforeRetry
if existingOp.durationBeforeRetry > maxDurationBeforeRetry {
existingOp.durationBeforeRetry = maxDurationBeforeRetry
}
}
existingOp.lastError = *err
existingOp.lastErrorTime = time.Now()
existingOp.expBackoff.Update(err)
existingOp.operationPending = false
grm.operations[operationName] = existingOp
// Log error
glog.Errorf("Operation for %q failed. No retries permitted until %v (durationBeforeRetry %v). error: %v",
operationName,
existingOp.lastErrorTime.Add(existingOp.durationBeforeRetry),
existingOp.durationBeforeRetry,
*err)
glog.Errorf("%v",
existingOp.expBackoff.GenerateNoRetriesPermittedMsg(operationName))
}
}
func (grm *goRoutineMap) Wait() {
grm.wg.Wait()
}
grm.lock.Lock()
defer grm.lock.Unlock()
func recoverFromPanic(operationName string, err *error) {
if r := recover(); r != nil {
callers := ""
for i := 0; true; i++ {
_, file, line, ok := runtime.Caller(i)
if !ok {
break
}
callers = callers + fmt.Sprintf("%v:%v\n", file, line)
}
*err = fmt.Errorf(
"operation for %q recovered from panic %q. (err=%v) Call stack:\n%v",
operationName,
r,
*err,
callers)
for len(grm.operations) > 0 {
grm.cond.Wait()
}
}
// alreadyExistsError is the error returned when NewGoRoutine() detects that
// an operation with the given name is already running.
type alreadyExistsError struct {
operationName string
}
var _ error = alreadyExistsError{}
func (err alreadyExistsError) Error() string {
return fmt.Sprintf("Failed to create operation with name %q. An operation with that name is already executing.", err.operationName)
}
func newAlreadyExistsError(operationName string) error {
// NewAlreadyExistsError returns a new instance of AlreadyExists error.
func NewAlreadyExistsError(operationName string) error {
return alreadyExistsError{operationName}
}
// IsAlreadyExists returns true if an error returned from NewGoRoutine indicates
// that operation with the same name already exists.
// IsAlreadyExists returns true if an error returned from GoRoutineMap indicates
// a new operation can not be started because an operation with the same
// operation name is already executing.
func IsAlreadyExists(err error) bool {
switch err.(type) {
case alreadyExistsError:
@ -206,42 +177,17 @@ func IsAlreadyExists(err error) bool {
}
}
// exponentialBackoffError is the error returned when NewGoRoutine() detects
// that the previous operation for given name failed less then
// durationBeforeRetry.
type exponentialBackoffError struct {
// alreadyExistsError is the error returned by GoRoutineMap when a new operation
// can not be started because an operation with the same operation name is
// already executing.
type alreadyExistsError struct {
operationName string
failedOp operation
}
var _ error = exponentialBackoffError{}
var _ error = alreadyExistsError{}
func (err exponentialBackoffError) Error() string {
func (err alreadyExistsError) Error() string {
return fmt.Sprintf(
"Failed to create operation with name %q. An operation with that name failed at %v. No retries permitted until %v (%v). Last error: %q.",
err.operationName,
err.failedOp.lastErrorTime,
err.failedOp.lastErrorTime.Add(err.failedOp.durationBeforeRetry),
err.failedOp.durationBeforeRetry,
err.failedOp.lastError)
}
func newExponentialBackoffError(
operationName string, failedOp operation) error {
return exponentialBackoffError{
operationName: operationName,
failedOp: failedOp,
}
}
// IsExponentialBackoff returns true if an error returned from NewGoRoutine()
// indicates that the previous operation for given name failed less then
// durationBeforeRetry.
func IsExponentialBackoff(err error) bool {
switch err.(type) {
case exponentialBackoffError:
return true
default:
return false
}
"Failed to create operation with name %q. An operation with that name is already executing.",
err.operationName)
}

View File

@ -107,7 +107,8 @@ func doMount(source string, target string, fstype string, options []string) erro
command := exec.Command("mount", mountArgs...)
output, err := command.CombinedOutput()
if err != nil {
return fmt.Errorf("Mount failed: %v\nMounting arguments: %s %s %s %v\nOutput: %s\n",
glog.Errorf("Mount failed: %v\nMounting arguments: %s %s %s %v\nOutput: %s\n", err, source, target, fstype, options, string(output))
return fmt.Errorf("mount failed: %v\nMounting arguments: %s %s %s %v\nOutput: %s\n",
err, source, target, fstype, options, string(output))
}
return err
@ -249,6 +250,7 @@ func (mounter *SafeFormatAndMount) formatAndMount(source string, target string,
options = append(options, "defaults")
// Run fsck on the disk to fix repairable issues
glog.V(4).Infof("Checking for issues with fsck on disk: %s", source)
args := []string{"-a", source}
cmd := mounter.Runner.Command("fsck", args...)
out, err := cmd.CombinedOutput()
@ -267,6 +269,7 @@ func (mounter *SafeFormatAndMount) formatAndMount(source string, target string,
}
// Try to mount the disk
glog.V(4).Infof("Attempting to mount disk: %s %s %s", fstype, source, target)
err = mounter.Interface.Mount(source, target, fstype, options)
if err != nil {
// It is possible that this disk is not formatted. Double check using diskLooksUnformatted
@ -281,12 +284,15 @@ func (mounter *SafeFormatAndMount) formatAndMount(source string, target string,
if fstype == "ext4" || fstype == "ext3" {
args = []string{"-E", "lazy_itable_init=0,lazy_journal_init=0", "-F", source}
}
glog.Infof("Disk %q appears to be unformatted, attempting to format as type: %q with options: %v", source, fstype, args)
cmd := mounter.Runner.Command("mkfs."+fstype, args...)
_, err := cmd.CombinedOutput()
if err == nil {
// the disk has been formatted successfully try to mount it again.
glog.Infof("Disk successfully formatted (mkfs): %s - %s %s", fstype, source, target)
return mounter.Interface.Mount(source, target, fstype, options)
}
glog.Errorf("format of disk %q failed: type:(%q) target:(%q) options:(%q)error:(%v)", source, fstype, target, options, err)
return err
}
}
@ -297,6 +303,7 @@ func (mounter *SafeFormatAndMount) formatAndMount(source string, target string,
func (mounter *SafeFormatAndMount) diskLooksUnformatted(disk string) (bool, error) {
args := []string{"-nd", "-o", "FSTYPE", disk}
cmd := mounter.Runner.Command("lsblk", args...)
glog.V(4).Infof("Attempting to determine if disk %q is formatted using lsblk with args: (%v)", disk, args)
dataOut, err := cmd.CombinedOutput()
output := strings.TrimSpace(string(dataOut))
@ -304,6 +311,7 @@ func (mounter *SafeFormatAndMount) diskLooksUnformatted(disk string) (bool, erro
// an error if so.
if err != nil {
glog.Errorf("Could not determine if disk %q is formatted (%v)", disk, err)
return false, err
}

View File

@ -18,8 +18,9 @@ package runtime
import (
"fmt"
"github.com/golang/glog"
"runtime"
"github.com/golang/glog"
)
// For testing, bypass HandleCrash.
@ -47,6 +48,11 @@ func HandleCrash(additionalHandlers ...func(interface{})) {
// logPanic logs the caller tree when a panic occurs.
func logPanic(r interface{}) {
callers := getCallers(r)
glog.Errorf("Recovered from panic: %#v (%v)\n%v", r, r, callers)
}
func getCallers(r interface{}) string {
callers := ""
for i := 0; true; i++ {
_, file, line, ok := runtime.Caller(i)
@ -55,7 +61,8 @@ func logPanic(r interface{}) {
}
callers = callers + fmt.Sprintf("%v:%v\n", file, line)
}
glog.Errorf("Recovered from panic: %#v (%v)\n%v", r, r, callers)
return callers
}
// ErrorHandlers is a list of functions which will be invoked when an unreturnable
@ -92,3 +99,18 @@ func GetCaller() string {
}
return f.Name()
}
// RecoverFromPanic replaces the specified error with an error containing the
// original error, and the call tree when a panic occurs. This enables error
// handlers to handle errors and panics the same way.
func RecoverFromPanic(err *error) {
if r := recover(); r != nil {
callers := getCallers(r)
*err = fmt.Errorf(
"recovered from panic %q. (err=%v) Call stack:\n%v",
r,
*err,
callers)
}
}

View File

@ -51,7 +51,7 @@ var (
// semantic version is a git hash, but the version itself is no
// longer the direct output of "git describe", but a slight
// translation to be semver compliant.
gitVersion string = "v1.3.3+$Format:%h$"
gitVersion string = "v1.3.4+$Format:%h$"
gitCommit string = "$Format:%H$" // sha1 from git, output of $(git rev-parse HEAD)
gitTreeState string = "not a git tree" // state of git tree, either "clean" or "dirty"

View File

@ -147,6 +147,7 @@ func (plugin *awsElasticBlockStorePlugin) NewDeleter(spec *volume.Spec) (volume.
func (plugin *awsElasticBlockStorePlugin) newDeleterInternal(spec *volume.Spec, manager ebsManager) (volume.Deleter, error) {
if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.AWSElasticBlockStore == nil {
glog.Errorf("spec.PersistentVolumeSource.AWSElasticBlockStore is nil")
return nil, fmt.Errorf("spec.PersistentVolumeSource.AWSElasticBlockStore is nil")
}
return &awsElasticBlockStoreDeleter{
@ -242,6 +243,7 @@ func (b *awsElasticBlockStoreMounter) SetUpAt(dir string, fsGroup *int64) error
notMnt, err := b.mounter.IsLikelyNotMountPoint(dir)
glog.V(4).Infof("PersistentDisk set up: %s %v %v", dir, !notMnt, err)
if err != nil && !os.IsNotExist(err) {
glog.Errorf("cannot validate mount point: %s %v", dir, err)
return err
}
if !notMnt {
@ -263,17 +265,17 @@ func (b *awsElasticBlockStoreMounter) SetUpAt(dir string, fsGroup *int64) error
if err != nil {
notMnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir)
if mntErr != nil {
glog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr)
glog.Errorf("IsLikelyNotMountPoint check failed for %s: %v", dir, mntErr)
return err
}
if !notMnt {
if mntErr = b.mounter.Unmount(dir); mntErr != nil {
glog.Errorf("Failed to unmount: %v", mntErr)
glog.Errorf("failed to unmount %s: %v", dir, mntErr)
return err
}
notMnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir)
if mntErr != nil {
glog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr)
glog.Errorf("IsLikelyNotMountPoint check failed for %s: %v", dir, mntErr)
return err
}
if !notMnt {
@ -283,6 +285,7 @@ func (b *awsElasticBlockStoreMounter) SetUpAt(dir string, fsGroup *int64) error
}
}
os.Remove(dir)
glog.Errorf("Mount of disk %s failed: %v", dir, err)
return err
}
@ -290,6 +293,7 @@ func (b *awsElasticBlockStoreMounter) SetUpAt(dir string, fsGroup *int64) error
volume.SetVolumeOwnership(b, fsGroup)
}
glog.V(4).Infof("Successfully mounted %s", dir)
return nil
}
@ -305,10 +309,12 @@ func getVolumeIDFromGlobalMount(host volume.VolumeHost, globalPath string) (stri
basePath := path.Join(host.GetPluginDir(awsElasticBlockStorePluginName), "mounts")
rel, err := filepath.Rel(basePath, globalPath)
if err != nil {
glog.Errorf("Failed to get volume id from global mount %s - %v", globalPath, err)
return "", err
}
if strings.Contains(rel, "../") {
return "", fmt.Errorf("Unexpected mount path: " + globalPath)
glog.Errorf("Unexpected mount path: %s", globalPath)
return "", fmt.Errorf("unexpected mount path: " + globalPath)
}
// Reverse the :// replacement done in makeGlobalPDPath
volumeID := rel
@ -391,6 +397,7 @@ var _ volume.Provisioner = &awsElasticBlockStoreProvisioner{}
func (c *awsElasticBlockStoreProvisioner) Provision() (*api.PersistentVolume, error) {
volumeID, sizeGB, labels, err := c.manager.CreateVolume(c)
if err != nil {
glog.Errorf("Provision failed: %v", err)
return nil, err
}

View File

@ -166,8 +166,8 @@ func pathExists(path string) (bool, error) {
}
// Return cloud provider
func getCloudProvider(cloudProvider cloudprovider.Interface) (*aws.AWSCloud, error) {
awsCloudProvider, ok := cloudProvider.(*aws.AWSCloud)
func getCloudProvider(cloudProvider cloudprovider.Interface) (*aws.Cloud, error) {
awsCloudProvider, ok := cloudProvider.(*aws.Cloud)
if !ok || awsCloudProvider == nil {
return nil, fmt.Errorf("Failed to get AWS Cloud Provider. GetCloudProvider returned %v instead", cloudProvider)
}

View File

@ -277,7 +277,7 @@ func (b *cinderVolumeMounter) SetUpAt(dir string, fsGroup *int64) error {
// TODO: handle failed mounts here.
notmnt, err := b.mounter.IsLikelyNotMountPoint(dir)
if err != nil && !os.IsNotExist(err) {
glog.V(4).Infof("IsLikelyNotMountPoint failed: %v", err)
glog.Errorf("Cannot validate mount point: %s %v", dir, err)
return err
}
if !notmnt {
@ -299,6 +299,7 @@ func (b *cinderVolumeMounter) SetUpAt(dir string, fsGroup *int64) error {
}
// Perform a bind mount to the full path to allow duplicate mounts of the same PD.
glog.V(4).Infof("Attempting to mount cinder volume %s to %s with options %v", b.pdName, dir, options)
err = b.mounter.Mount(globalPDPath, dir, "", options)
if err != nil {
glog.V(4).Infof("Mount failed: %v", err)
@ -326,6 +327,7 @@ func (b *cinderVolumeMounter) SetUpAt(dir string, fsGroup *int64) error {
os.Remove(dir)
// TODO: we should really eject the attach/detach out into its own control loop.
detachDiskLogError(b.cinderVolume)
glog.Errorf("Failed to mount %s: %v", dir, err)
return err
}

View File

@ -60,7 +60,10 @@ func (plugin *configMapPlugin) GetVolumeName(spec *volume.Spec) (string, error)
return "", fmt.Errorf("Spec does not reference a ConfigMap volume type")
}
return volumeSource.Name, nil
return fmt.Sprintf(
"%v/%v",
spec.Name(),
volumeSource.Name), nil
}
func (plugin *configMapPlugin) CanSupport(spec *volume.Spec) bool {
@ -118,12 +121,14 @@ func (sv *configMapVolume) GetAttributes() volume.Attributes {
}
}
// This is the spec for the volume that this plugin wraps.
var wrappedVolumeSpec = volume.Spec{
// This should be on a tmpfs instead of the local disk; the problem is
// charging the memory for the tmpfs to the right cgroup. We should make
// this a tmpfs when we can do the accounting correctly.
Volume: &api.Volume{VolumeSource: api.VolumeSource{EmptyDir: &api.EmptyDirVolumeSource{}}},
func wrappedVolumeSpec() volume.Spec {
// This is the spec for the volume that this plugin wraps.
return volume.Spec{
// This should be on a tmpfs instead of the local disk; the problem is
// charging the memory for the tmpfs to the right cgroup. We should make
// this a tmpfs when we can do the accounting correctly.
Volume: &api.Volume{VolumeSource: api.VolumeSource{EmptyDir: &api.EmptyDirVolumeSource{}}},
}
}
func (b *configMapVolumeMounter) SetUp(fsGroup *int64) error {
@ -134,7 +139,7 @@ func (b *configMapVolumeMounter) SetUpAt(dir string, fsGroup *int64) error {
glog.V(3).Infof("Setting up volume %v for pod %v at %v", b.volName, b.pod.UID, dir)
// Wrap EmptyDir, let it do the setup.
wrapped, err := b.plugin.host.NewWrapperMounter(b.volName, wrappedVolumeSpec, &b.pod, *b.opts)
wrapped, err := b.plugin.host.NewWrapperMounter(b.volName, wrappedVolumeSpec(), &b.pod, *b.opts)
if err != nil {
return err
}
@ -233,7 +238,7 @@ func (c *configMapVolumeUnmounter) TearDownAt(dir string) error {
glog.V(3).Infof("Tearing down volume %v for pod %v at %v", c.volName, c.podUID, dir)
// Wrap EmptyDir, let it do the teardown.
wrapped, err := c.plugin.host.NewWrapperUnmounter(c.volName, wrappedVolumeSpec, c.podUID)
wrapped, err := c.plugin.host.NewWrapperUnmounter(c.volName, wrappedVolumeSpec(), c.podUID)
if err != nil {
return err
}

View File

@ -49,8 +49,10 @@ type downwardAPIPlugin struct {
var _ volume.VolumePlugin = &downwardAPIPlugin{}
var wrappedVolumeSpec = volume.Spec{
Volume: &api.Volume{VolumeSource: api.VolumeSource{EmptyDir: &api.EmptyDirVolumeSource{Medium: api.StorageMediumMemory}}},
func wrappedVolumeSpec() volume.Spec {
return volume.Spec{
Volume: &api.Volume{VolumeSource: api.VolumeSource{EmptyDir: &api.EmptyDirVolumeSource{Medium: api.StorageMediumMemory}}},
}
}
func (plugin *downwardAPIPlugin) Init(host volume.VolumeHost) error {
@ -144,7 +146,7 @@ func (b *downwardAPIVolumeMounter) SetUp(fsGroup *int64) error {
func (b *downwardAPIVolumeMounter) SetUpAt(dir string, fsGroup *int64) error {
glog.V(3).Infof("Setting up a downwardAPI volume %v for pod %v/%v at %v", b.volName, b.pod.Namespace, b.pod.Name, dir)
// Wrap EmptyDir. Here we rely on the idempotency of the wrapped plugin to avoid repeatedly mounting
wrapped, err := b.plugin.host.NewWrapperMounter(b.volName, wrappedVolumeSpec, b.pod, *b.opts)
wrapped, err := b.plugin.host.NewWrapperMounter(b.volName, wrappedVolumeSpec(), b.pod, *b.opts)
if err != nil {
glog.Errorf("Couldn't setup downwardAPI volume %v for pod %v/%v: %s", b.volName, b.pod.Namespace, b.pod.Name, err.Error())
return err
@ -233,7 +235,7 @@ func (c *downwardAPIVolumeUnmounter) TearDownAt(dir string) error {
glog.V(3).Infof("Tearing down volume %v for pod %v at %v", c.volName, c.podUID, dir)
// Wrap EmptyDir, let it do the teardown.
wrapped, err := c.plugin.host.NewWrapperUnmounter(c.volName, wrappedVolumeSpec, c.podUID)
wrapped, err := c.plugin.host.NewWrapperUnmounter(c.volName, wrappedVolumeSpec(), c.podUID)
if err != nil {
return err
}

View File

@ -265,7 +265,7 @@ func (f *flexVolumeMounter) SetUpAt(dir string, fsGroup *int64) error {
notmnt, err := f.blockDeviceMounter.IsLikelyNotMountPoint(dir)
if err != nil && !os.IsNotExist(err) {
glog.Errorf("Cannot validate mountpoint: %s", dir)
glog.Errorf("Cannot validate mount point: %s %v", dir, err)
return err
}
if !notmnt {
@ -290,18 +290,20 @@ func (f *flexVolumeMounter) SetUpAt(dir string, fsGroup *int64) error {
f.options[optionKeySecret+"/"+name] = secret
}
glog.V(4).Infof("attempting to attach volume: %s with options %v", f.volName, f.options)
device, err := f.manager.attach(f)
if err != nil {
if !isCmdNotSupportedErr(err) {
glog.Errorf("Failed to attach volume: %s", f.volName)
glog.Errorf("failed to attach volume: %s", f.volName)
return err
}
// Attach not supported or required. Continue to mount.
}
glog.V(4).Infof("attempting to mount volume: %s", f.volName)
if err := f.manager.mount(f, device, dir); err != nil {
if !isCmdNotSupportedErr(err) {
glog.Errorf("Failed to mount volume: %s", f.volName)
glog.Errorf("failed to mount volume: %s", f.volName)
return err
}
options := make([]string, 0)
@ -318,13 +320,15 @@ func (f *flexVolumeMounter) SetUpAt(dir string, fsGroup *int64) error {
os.MkdirAll(dir, 0750)
// Mount not supported by driver. Use core mounting logic.
glog.V(4).Infof("attempting to mount the volume: %s to device: %s", f.volName, device)
err = f.blockDeviceMounter.Mount(string(device), dir, f.fsType, options)
if err != nil {
glog.Errorf("Failed to mount the volume: %s, device: %s, error: %s", f.volName, device, err.Error())
glog.Errorf("failed to mount the volume: %s to device: %s, error: %v", f.volName, device, err)
return err
}
}
glog.V(4).Infof("Successfully mounted volume: %s on device: %s", f.volName, device)
return nil
}
@ -370,7 +374,7 @@ func (f *flexVolumeUnmounter) TearDownAt(dir string) error {
}
// Unmount not supported by the driver. Use core unmount logic.
if err := f.mounter.Unmount(dir); err != nil {
glog.Errorf("Failed to unmount volume: %s, error: %s", dir, err.Error())
glog.Errorf("Failed to unmount volume: %s, error: %v", dir, err)
return err
}
}
@ -378,7 +382,7 @@ func (f *flexVolumeUnmounter) TearDownAt(dir string) error {
if refCount == 1 {
if err := f.manager.detach(f, device); err != nil {
if !isCmdNotSupportedErr(err) {
glog.Errorf("Failed to teardown volume: %s, error: %s", dir, err.Error())
glog.Errorf("Failed to teardown volume: %s, error: %v", dir, err)
return err
}
// Teardown not supported by driver. Unmount is good enough.

View File

@ -234,6 +234,7 @@ func (b *gcePersistentDiskMounter) SetUpAt(dir string, fsGroup *int64) error {
notMnt, err := b.mounter.IsLikelyNotMountPoint(dir)
glog.V(4).Infof("PersistentDisk set up: %s %v %v, pd name %v readOnly %v", dir, !notMnt, err, b.pdName, b.readOnly)
if err != nil && !os.IsNotExist(err) {
glog.Errorf("cannot validate mount point: %s %v", dir, err)
return err
}
if !notMnt {
@ -241,6 +242,7 @@ func (b *gcePersistentDiskMounter) SetUpAt(dir string, fsGroup *int64) error {
}
if err := os.MkdirAll(dir, 0750); err != nil {
glog.Errorf("mkdir failed on disk %s (%v)", dir, err)
return err
}
@ -251,6 +253,8 @@ func (b *gcePersistentDiskMounter) SetUpAt(dir string, fsGroup *int64) error {
}
globalPDPath := makeGlobalPDName(b.plugin.host, b.pdName)
glog.V(4).Infof("attempting to mount %s", dir)
err = b.mounter.Mount(globalPDPath, dir, "", options)
if err != nil {
notMnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir)
@ -275,6 +279,7 @@ func (b *gcePersistentDiskMounter) SetUpAt(dir string, fsGroup *int64) error {
}
}
os.Remove(dir)
glog.Errorf("Mount of disk %s failed: %v", dir, err)
return err
}
@ -282,6 +287,7 @@ func (b *gcePersistentDiskMounter) SetUpAt(dir string, fsGroup *int64) error {
volume.SetVolumeOwnership(b, fsGroup)
}
glog.V(4).Infof("Successfully mounted %s", dir)
return nil
}

View File

@ -41,8 +41,10 @@ type gitRepoPlugin struct {
var _ volume.VolumePlugin = &gitRepoPlugin{}
var wrappedVolumeSpec = volume.Spec{
Volume: &api.Volume{VolumeSource: api.VolumeSource{EmptyDir: &api.EmptyDirVolumeSource{}}},
func wrappedVolumeSpec() volume.Spec {
return volume.Spec{
Volume: &api.Volume{VolumeSource: api.VolumeSource{EmptyDir: &api.EmptyDirVolumeSource{}}},
}
}
const (
@ -61,7 +63,7 @@ func (plugin *gitRepoPlugin) GetPluginName() string {
func (plugin *gitRepoPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
volumeSource, _ := getVolumeSource(spec)
if volumeSource == nil {
return "", fmt.Errorf("Spec does not reference a GCE volume type")
return "", fmt.Errorf("Spec does not reference a Git repo volume type")
}
return fmt.Sprintf(
@ -155,7 +157,7 @@ func (b *gitRepoVolumeMounter) SetUpAt(dir string, fsGroup *int64) error {
}
// Wrap EmptyDir, let it do the setup.
wrapped, err := b.plugin.host.NewWrapperMounter(b.volName, wrappedVolumeSpec, &b.pod, b.opts)
wrapped, err := b.plugin.host.NewWrapperMounter(b.volName, wrappedVolumeSpec(), &b.pod, b.opts)
if err != nil {
return err
}
@ -237,7 +239,7 @@ func (c *gitRepoVolumeUnmounter) TearDown() error {
func (c *gitRepoVolumeUnmounter) TearDownAt(dir string) error {
// Wrap EmptyDir, let it do the teardown.
wrapped, err := c.plugin.host.NewWrapperUnmounter(c.volName, wrappedVolumeSpec, c.podUID)
wrapped, err := c.plugin.host.NewWrapperUnmounter(c.volName, wrappedVolumeSpec(), c.podUID)
if err != nil {
return err
}

View File

@ -210,9 +210,10 @@ func (b *rbdMounter) SetUp(fsGroup *int64) error {
func (b *rbdMounter) SetUpAt(dir string, fsGroup *int64) error {
// diskSetUp checks mountpoints and prevent repeated calls
glog.V(4).Infof("rbd: attempting to SetUp and mount %s", dir)
err := diskSetUp(b.manager, *b, dir, b.mounter, fsGroup)
if err != nil {
glog.Errorf("rbd: failed to setup")
glog.Errorf("rbd: failed to setup mount %s %v", dir, err)
}
return err
}

View File

@ -45,8 +45,10 @@ type secretPlugin struct {
var _ volume.VolumePlugin = &secretPlugin{}
var wrappedVolumeSpec = volume.Spec{
Volume: &api.Volume{VolumeSource: api.VolumeSource{EmptyDir: &api.EmptyDirVolumeSource{Medium: api.StorageMediumMemory}}},
func wrappedVolumeSpec() volume.Spec {
return volume.Spec{
Volume: &api.Volume{VolumeSource: api.VolumeSource{EmptyDir: &api.EmptyDirVolumeSource{Medium: api.StorageMediumMemory}}},
}
}
func getPath(uid types.UID, volName string, host volume.VolumeHost) string {
@ -150,7 +152,7 @@ func (b *secretVolumeMounter) SetUpAt(dir string, fsGroup *int64) error {
glog.V(3).Infof("Setting up volume %v for pod %v at %v", b.volName, b.pod.UID, dir)
// Wrap EmptyDir, let it do the setup.
wrapped, err := b.plugin.host.NewWrapperMounter(b.volName, wrappedVolumeSpec, &b.pod, *b.opts)
wrapped, err := b.plugin.host.NewWrapperMounter(b.volName, wrappedVolumeSpec(), &b.pod, *b.opts)
if err != nil {
return err
}
@ -249,7 +251,7 @@ func (c *secretVolumeUnmounter) TearDownAt(dir string) error {
glog.V(3).Infof("Tearing down volume %v for pod %v at %v", c.volName, c.podUID, dir)
// Wrap EmptyDir, let it do the teardown.
wrapped, err := c.plugin.host.NewWrapperUnmounter(c.volName, wrappedVolumeSpec, c.podUID)
wrapped, err := c.plugin.host.NewWrapperUnmounter(c.volName, wrappedVolumeSpec(), c.podUID)
if err != nil {
return err
}

View File

@ -0,0 +1,2 @@
assignees:
- saad-ali

View File

@ -0,0 +1,287 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Package nestedpendingoperations is a modified implementation of
pkg/util/goroutinemap. It implements a data structure for managing go routines
by volume/pod name. It prevents the creation of new go routines if an existing
go routine for the volume already exists. It also allows multiple operations to
execute in parallel for the same volume as long as they are operating on
different pods.
*/
package nestedpendingoperations
import (
"fmt"
"sync"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
k8sRuntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/volume/util/types"
)
const (
// emptyUniquePodName is a UniquePodName for empty string.
emptyUniquePodName types.UniquePodName = types.UniquePodName("")
)
// NestedPendingOperations defines the supported set of operations.
type NestedPendingOperations interface {
// Run adds the concatenation of volumeName and podName to the list of
// running operations and spawns a new go routine to execute operationFunc.
// If an operation with the same volumeName and same or empty podName
// exists, an AlreadyExists or ExponentialBackoff error is returned.
// This enables multiple operations to execute in parallel for the same
// volumeName as long as they have different podName.
// Once the operation is complete, the go routine is terminated and the
// concatenation of volumeName and podName is removed from the list of
// executing operations allowing a new operation to be started with the
// volumeName without error.
Run(volumeName api.UniqueVolumeName, podName types.UniquePodName, operationFunc func() error) error
// Wait blocks until all operations are completed. This is typically
// necessary during tests - the test should wait until all operations finish
// and evaluate results after that.
Wait()
}
// NewNestedPendingOperations returns a new instance of NestedPendingOperations.
func NewNestedPendingOperations(exponentialBackOffOnError bool) NestedPendingOperations {
g := &nestedPendingOperations{
operations: []operation{},
exponentialBackOffOnError: exponentialBackOffOnError,
lock: &sync.Mutex{},
}
g.cond = sync.NewCond(g.lock)
return g
}
type nestedPendingOperations struct {
operations []operation
exponentialBackOffOnError bool
cond *sync.Cond
lock *sync.Mutex
}
type operation struct {
volumeName api.UniqueVolumeName
podName types.UniquePodName
operationPending bool
expBackoff exponentialbackoff.ExponentialBackoff
}
func (grm *nestedPendingOperations) Run(
volumeName api.UniqueVolumeName,
podName types.UniquePodName,
operationFunc func() error) error {
grm.lock.Lock()
defer grm.lock.Unlock()
var previousOp operation
opExists := false
previousOpIndex := -1
for previousOpIndex, previousOp = range grm.operations {
if previousOp.volumeName != volumeName {
// No match, keep searching
continue
}
if previousOp.podName != emptyUniquePodName &&
podName != emptyUniquePodName &&
previousOp.podName != podName {
// No match, keep searching
continue
}
// Match
opExists = true
break
}
if opExists {
// Operation already exists
if previousOp.operationPending {
// Operation is pending
operationName := getOperationName(volumeName, podName)
return NewAlreadyExistsError(operationName)
}
operationName := getOperationName(volumeName, podName)
if err := previousOp.expBackoff.SafeToRetry(operationName); err != nil {
return err
}
// Update existing operation to mark as pending.
grm.operations[previousOpIndex].operationPending = true
grm.operations[previousOpIndex].volumeName = volumeName
grm.operations[previousOpIndex].podName = podName
} else {
// Create a new operation
grm.operations = append(grm.operations,
operation{
operationPending: true,
volumeName: volumeName,
podName: podName,
expBackoff: exponentialbackoff.ExponentialBackoff{},
})
}
go func() (err error) {
// Handle unhandled panics (very unlikely)
defer k8sRuntime.HandleCrash()
// Handle completion of and error, if any, from operationFunc()
defer grm.operationComplete(volumeName, podName, &err)
// Handle panic, if any, from operationFunc()
defer k8sRuntime.RecoverFromPanic(&err)
return operationFunc()
}()
return nil
}
func (grm *nestedPendingOperations) getOperation(
volumeName api.UniqueVolumeName,
podName types.UniquePodName) (uint, error) {
// Assumes lock has been acquired by caller.
for i, op := range grm.operations {
if op.volumeName == volumeName &&
op.podName == podName {
return uint(i), nil
}
}
logOperationName := getOperationName(volumeName, podName)
return 0, fmt.Errorf("Operation %q not found.", logOperationName)
}
func (grm *nestedPendingOperations) deleteOperation(
// Assumes lock has been acquired by caller.
volumeName api.UniqueVolumeName,
podName types.UniquePodName) {
opIndex := -1
for i, op := range grm.operations {
if op.volumeName == volumeName &&
op.podName == podName {
opIndex = i
break
}
}
// Delete index without preserving order
grm.operations[opIndex] = grm.operations[len(grm.operations)-1]
grm.operations = grm.operations[:len(grm.operations)-1]
}
func (grm *nestedPendingOperations) operationComplete(
volumeName api.UniqueVolumeName, podName types.UniquePodName, err *error) {
// Defer operations are executed in Last-In is First-Out order. In this case
// the lock is acquired first when operationCompletes begins, and is
// released when the method finishes, after the lock is released cond is
// signaled to wake waiting goroutine.
defer grm.cond.Signal()
grm.lock.Lock()
defer grm.lock.Unlock()
if *err == nil || !grm.exponentialBackOffOnError {
// Operation completed without error, or exponentialBackOffOnError disabled
grm.deleteOperation(volumeName, podName)
if *err != nil {
// Log error
logOperationName := getOperationName(volumeName, podName)
glog.Errorf("operation %s failed with: %v",
logOperationName,
*err)
}
return
}
// Operation completed with error and exponentialBackOffOnError Enabled
existingOpIndex, getOpErr := grm.getOperation(volumeName, podName)
if getOpErr != nil {
// Failed to find existing operation
logOperationName := getOperationName(volumeName, podName)
glog.Errorf("Operation %s completed. error: %v. exponentialBackOffOnError is enabled, but failed to get operation to update.",
logOperationName,
*err)
return
}
grm.operations[existingOpIndex].expBackoff.Update(err)
grm.operations[existingOpIndex].operationPending = false
// Log error
operationName :=
getOperationName(volumeName, podName)
glog.Errorf("%v", grm.operations[existingOpIndex].expBackoff.
GenerateNoRetriesPermittedMsg(operationName))
}
func (grm *nestedPendingOperations) Wait() {
grm.lock.Lock()
defer grm.lock.Unlock()
for len(grm.operations) > 0 {
grm.cond.Wait()
}
}
func getOperationName(
volumeName api.UniqueVolumeName, podName types.UniquePodName) string {
podNameStr := ""
if podName != emptyUniquePodName {
podNameStr = fmt.Sprintf(" (%q)", podName)
}
return fmt.Sprintf("%q%s",
volumeName,
podNameStr)
}
// NewAlreadyExistsError returns a new instance of AlreadyExists error.
func NewAlreadyExistsError(operationName string) error {
return alreadyExistsError{operationName}
}
// IsAlreadyExists returns true if an error returned from
// NestedPendingOperations indicates a new operation can not be started because
// an operation with the same operation name is already executing.
func IsAlreadyExists(err error) bool {
switch err.(type) {
case alreadyExistsError:
return true
default:
return false
}
}
// alreadyExistsError is the error returned by NestedPendingOperations when a
// new operation can not be started because an operation with the same operation
// name is already executing.
type alreadyExistsError struct {
operationName string
}
var _ error = alreadyExistsError{}
func (err alreadyExistsError) Error() string {
return fmt.Sprintf(
"Failed to create operation with name %q. An operation with that name is already executing.",
err.operationName)
}

View File

@ -0,0 +1,2 @@
assignees:
- saad-ali

View File

@ -15,8 +15,9 @@ limitations under the License.
*/
// Package operationexecutor implements interfaces that enable execution of
// attach, detach, mount, and unmount operations with a goroutinemap so that
// more than one operation is never triggered on the same volume.
// attach, detach, mount, and unmount operations with a
// nestedpendingoperations so that more than one operation is never triggered
// on the same volume for the same pod.
package operationexecutor
import (
@ -27,13 +28,14 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/goroutinemap"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
)
// OperationExecutor defines a set of operations for attaching, detaching,
// mounting, or unmounting a volume that are executed with a goroutinemap which
// mounting, or unmounting a volume that are executed with a NewNestedPendingOperations which
// prevents more than one operation from being triggered on the same volume.
//
// These operations should be idempotent (for example, AttachVolume should
@ -105,7 +107,7 @@ func NewOperationExecutor(
return &operationExecutor{
kubeClient: kubeClient,
volumePluginMgr: volumePluginMgr,
pendingOperations: goroutinemap.NewGoRoutineMap(
pendingOperations: nestedpendingoperations.NewNestedPendingOperations(
true /* exponentialBackOffOnError */),
}
}
@ -129,8 +131,14 @@ type ActualStateOfWorldMounterUpdater interface {
// ActualStateOfWorldAttacherUpdater defines a set of operations updating the
// actual state of the world cache after successful attach/detach/mount/unmount.
type ActualStateOfWorldAttacherUpdater interface {
// Marks the specified volume as attached to the specified node
MarkVolumeAsAttached(volumeSpec *volume.Spec, nodeName string, devicePath string) error
// Marks the specified volume as attached to the specified node. If the
// volume name is supplied, that volume name will be used. If not, the
// volume name is computed using the result from querying the plugin.
//
// TODO: in the future, we should be able to remove the volumeName
// argument to this method -- since it is used only for attachable
// volumes. See issue 29695.
MarkVolumeAsAttached(volumeName api.UniqueVolumeName, volumeSpec *volume.Spec, nodeName, devicePath string) error
// Marks the specified volume as detached from the specified node
MarkVolumeAsDetached(volumeName api.UniqueVolumeName, nodeName string)
@ -323,7 +331,7 @@ type operationExecutor struct {
// pendingOperations keeps track of pending attach and detach operations so
// multiple operations are not started on the same volume
pendingOperations goroutinemap.GoRoutineMap
pendingOperations nestedpendingoperations.NestedPendingOperations
}
func (oe *operationExecutor) AttachVolume(
@ -336,7 +344,7 @@ func (oe *operationExecutor) AttachVolume(
}
return oe.pendingOperations.Run(
string(volumeToAttach.VolumeName), attachFunc)
volumeToAttach.VolumeName, "" /* podName */, attachFunc)
}
func (oe *operationExecutor) DetachVolume(
@ -350,7 +358,7 @@ func (oe *operationExecutor) DetachVolume(
}
return oe.pendingOperations.Run(
string(volumeToDetach.VolumeName), detachFunc)
volumeToDetach.VolumeName, "" /* podName */, detachFunc)
}
func (oe *operationExecutor) MountVolume(
@ -363,8 +371,16 @@ func (oe *operationExecutor) MountVolume(
return err
}
podName := volumetypes.UniquePodName("")
// TODO: remove this -- not necessary
if !volumeToMount.PluginIsAttachable {
// Non-attachable volume plugins can execute mount for multiple pods
// referencing the same volume in parallel
podName = volumehelper.GetUniquePodName(volumeToMount.Pod)
}
return oe.pendingOperations.Run(
string(volumeToMount.VolumeName), mountFunc)
volumeToMount.VolumeName, podName, mountFunc)
}
func (oe *operationExecutor) UnmountVolume(
@ -376,8 +392,12 @@ func (oe *operationExecutor) UnmountVolume(
return err
}
// All volume plugins can execute mount for multiple pods referencing the
// same volume in parallel
podName := volumetypes.UniquePodName(volumeToUnmount.PodUID)
return oe.pendingOperations.Run(
string(volumeToUnmount.VolumeName), unmountFunc)
volumeToUnmount.VolumeName, podName, unmountFunc)
}
func (oe *operationExecutor) UnmountDevice(
@ -390,7 +410,7 @@ func (oe *operationExecutor) UnmountDevice(
}
return oe.pendingOperations.Run(
string(deviceToDetach.VolumeName), unmountDeviceFunc)
deviceToDetach.VolumeName, "" /* podName */, unmountDeviceFunc)
}
func (oe *operationExecutor) VerifyControllerAttachedVolume(
@ -404,7 +424,7 @@ func (oe *operationExecutor) VerifyControllerAttachedVolume(
}
return oe.pendingOperations.Run(
string(volumeToMount.VolumeName), verifyControllerAttachedVolumeFunc)
volumeToMount.VolumeName, "" /* podName */, verifyControllerAttachedVolumeFunc)
}
func (oe *operationExecutor) generateAttachVolumeFunc(
@ -455,7 +475,7 @@ func (oe *operationExecutor) generateAttachVolumeFunc(
// Update actual state of world
addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
volumeToAttach.VolumeSpec, volumeToAttach.NodeName, devicePath)
api.UniqueVolumeName(""), volumeToAttach.VolumeSpec, volumeToAttach.NodeName, devicePath)
if addVolumeNodeErr != nil {
// On failure, return error. Caller will log and retry.
return fmt.Errorf(
@ -894,12 +914,13 @@ func (oe *operationExecutor) generateVerifyControllerAttachedVolumeFunc(
// If the volume does not implement the attacher interface, it is
// assumed to be attached and the the actual state of the world is
// updated accordingly.
addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
volumeToMount.VolumeSpec, nodeName, volumeToMount.DevicePath)
volumeToMount.VolumeName, volumeToMount.VolumeSpec, nodeName, "" /* devicePath */)
if addVolumeNodeErr != nil {
// On failure, return error. Caller will log and retry.
return fmt.Errorf(
"VerifyControllerAttachedVolume.MarkVolumeAsAttached failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v.",
"VerifyControllerAttachedVolume.MarkVolumeAsAttachedByUniqueVolumeName failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v.",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
@ -950,7 +971,7 @@ func (oe *operationExecutor) generateVerifyControllerAttachedVolumeFunc(
for _, attachedVolume := range node.Status.VolumesAttached {
if attachedVolume.Name == volumeToMount.VolumeName {
addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
volumeToMount.VolumeSpec, nodeName, attachedVolume.DevicePath)
api.UniqueVolumeName(""), volumeToMount.VolumeSpec, nodeName, attachedVolume.DevicePath)
glog.Infof("Controller successfully attached volume %q (spec.Name: %q) pod %q (UID: %q) devicePath: %q",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),

View File

@ -52,6 +52,12 @@ func GetUniqueVolumeName(pluginName, volumeName string) api.UniqueVolumeName {
return api.UniqueVolumeName(fmt.Sprintf("%s/%s", pluginName, volumeName))
}
// GetUniqueVolumeNameForNonAttachableVolume returns the unique volume name
// for a non-attachable volume.
func GetUniqueVolumeNameForNonAttachableVolume(podName types.UniquePodName, volumePlugin volume.VolumePlugin, podSpecName string) api.UniqueVolumeName {
return api.UniqueVolumeName(fmt.Sprintf("%s/%v-%s", volumePlugin.GetPluginName(), podName, podSpecName))
}
// GetUniqueVolumeNameFromSpec uses the given VolumePlugin to generate a unique
// name representing the volume defined in the specified volume spec.
// This returned name can be used to uniquely reference the actual backing

View File

@ -136,7 +136,7 @@ func (l *persistentVolumeLabel) getEBSVolumes() (aws.Volumes, error) {
if err != nil || cloudProvider == nil {
return nil, err
}
awsCloudProvider, ok := cloudProvider.(*aws.AWSCloud)
awsCloudProvider, ok := cloudProvider.(*aws.Cloud)
if !ok {
// GetCloudProvider has gone very wrong
return nil, fmt.Errorf("error retrieving AWS cloud provider")