diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 8af0037900..3b5cdce423 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -19,6 +19,7 @@ limitations under the License. package main import ( + "errors" "fmt" "io/ioutil" "net" @@ -34,13 +35,14 @@ import ( kubeletapp "github.com/GoogleCloudPlatform/kubernetes/cmd/kubelet/app" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" + apierrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" nodeControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller" replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/empty_dir" @@ -118,7 +120,7 @@ func (h *delegateHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusNotFound) } -func startComponents(manifestURL string) (apiServerURL string) { +func startComponents(manifestURL string) (string, string) { // Setup servers := []string{} glog.Infof("Creating etcd client pointing to %v", servers) @@ -215,21 +217,24 @@ func startComponents(manifestURL string) (apiServerURL string) { cadvisorInterface := new(cadvisor.Fake) // Kubelet (localhost) - testRootDir := makeTempDirOrDie("kubelet_integ_1.") + testRootDir := makeTempDirOrDie("kubelet_integ_1.", "") + configFilePath := makeTempDirOrDie("config", testRootDir) glog.Infof("Using %s as root dir for kubelet #1", testRootDir) - kubeletapp.SimpleRunKubelet(cl, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface) + kubeletapp.SimpleRunKubelet(cl, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, configFilePath) // Kubelet (machine) // Create a second kubelet so that the guestbook example's two redis slaves both // have a place they can schedule. - testRootDir = makeTempDirOrDie("kubelet_integ_2.") + testRootDir = makeTempDirOrDie("kubelet_integ_2.", "") glog.Infof("Using %s as root dir for kubelet #2", testRootDir) - kubeletapp.SimpleRunKubelet(cl, &fakeDocker2, machineList[1], testRootDir, "", "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface) - - return apiServer.URL + kubeletapp.SimpleRunKubelet(cl, &fakeDocker2, machineList[1], testRootDir, "", "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, "") + return apiServer.URL, configFilePath } -func makeTempDirOrDie(prefix string) string { - tempDir, err := ioutil.TempDir("/tmp", prefix) +func makeTempDirOrDie(prefix string, baseDir string) string { + if baseDir == "" { + baseDir = "/tmp" + } + tempDir, err := ioutil.TempDir(baseDir, prefix) if err != nil { glog.Fatalf("Can't make a temp rootdir: %v", err) } @@ -278,6 +283,60 @@ func podExists(c *client.Client, podNamespace string, podID string) wait.Conditi } } +func podNotFound(c *client.Client, podNamespace string, podID string) wait.ConditionFunc { + return func() (bool, error) { + _, err := c.Pods(podNamespace).Get(podID) + return apierrors.IsNotFound(err), nil + } +} + +func podRunning(c *client.Client, podNamespace string, podID string) wait.ConditionFunc { + return func() (bool, error) { + pod, err := c.Pods(podNamespace).Get(podID) + if err != nil { + return false, err + } + if pod.Status.Phase != api.PodRunning { + return false, errors.New(fmt.Sprintf("Pod status is %q", pod.Status.Phase)) + } + return true, nil + } +} + +func runStaticPodTest(c *client.Client, configFilePath string) { + manifest := `version: v1beta2 +id: static-pod +containers: + - name: static-container + image: kubernetes/pause` + + manifestFile, err := ioutil.TempFile(configFilePath, "") + defer os.Remove(manifestFile.Name()) + ioutil.WriteFile(manifestFile.Name(), []byte(manifest), 0600) + + // Wait for the mirror pod to be created. + hostname, _ := os.Hostname() + podName := fmt.Sprintf("static-pod-%s", hostname) + namespace := kubelet.NamespaceDefault + if err := wait.Poll(time.Second, time.Second*30, + podRunning(c, namespace, podName)); err != nil { + glog.Fatalf("FAILED: mirror pod has not been created or is not running: %v", err) + } + // Delete the mirror pod, and wait for it to be recreated. + c.Pods(namespace).Delete(podName) + if err = wait.Poll(time.Second, time.Second*30, + podRunning(c, namespace, podName)); err != nil { + glog.Fatalf("FAILED: mirror pod has not been re-created or is not running: %v", err) + } + // Remove the manifest file, and wait for the mirror pod to be deleted. + os.Remove(manifestFile.Name()) + if err = wait.Poll(time.Second, time.Second*30, + podNotFound(c, namespace, podName)); err != nil { + glog.Fatalf("FAILED: mirror pod has not been deleted: %v", err) + } + +} + func runReplicationControllerTest(c *client.Client) { data, err := ioutil.ReadFile("cmd/integration/controller.json") if err != nil { @@ -447,7 +506,7 @@ func runAtomicPutTest(c *client.Client) { glog.Infof("Posting update (%s, %s)", l, v) err = c.Put().Resource("services").Name(svc.Name).Body(&tmpSvc).Do().Error() if err != nil { - if errors.IsConflict(err) { + if apierrors.IsConflict(err) { glog.Infof("Conflict: (%s, %s)", l, v) // This is what we expect. continue @@ -733,7 +792,7 @@ func main() { manifestURL := ServeCachedManifestFile() - apiServerURL := startComponents(manifestURL) + apiServerURL, configFilePath := startComponents(manifestURL) // Ok. we're good to go. glog.Infof("API Server started on %s", apiServerURL) @@ -754,6 +813,9 @@ func main() { runSelfLinkTestOnNamespace(c, "") runSelfLinkTestOnNamespace(c, "other") }, + func(c *client.Client) { + runStaticPodTest(c, configFilePath) + }, } var wg sync.WaitGroup wg.Add(len(testFuncs)) @@ -782,11 +844,15 @@ func main() { createdConts.Insert(p[:n-8]) } } - // We expect 9: 2 infra containers + 2 containers from the replication controller + - // 1 infra container + 2 containers from the URL + - // 1 infra container + 1 container from the service test. - if len(createdConts) != 9 { - glog.Fatalf("Expected 9 containers; got %v\n\nlist of created containers:\n\n%#v\n\nDocker 1 Created:\n\n%#v\n\nDocker 2 Created:\n\n%#v\n\n", len(createdConts), createdConts.List(), fakeDocker1.Created, fakeDocker2.Created) + // We expect 9: 2 pod infra containers + 2 pods from the replication controller + + // 1 pod infra container + 2 pods from the URL + + // 1 pod infra container + 1 pod from the service test. + // In addition, runStaticPodTest creates 1 pod infra containers + + // 1 pod container from the mainfest file + // The total number of container created is 11 + + if len(createdConts) != 11 { + glog.Fatalf("Expected 11 containers; got %v\n\nlist of created containers:\n\n%#v\n\nDocker 1 Created:\n\n%#v\n\nDocker 2 Created:\n\n%#v\n\n", len(createdConts), createdConts.List(), fakeDocker1.Created, fakeDocker2.Created) } glog.Infof("OK - found created containers: %#v", createdConts.List()) } diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 7af1fff555..14fa2860a9 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -244,7 +244,8 @@ func SimpleRunKubelet(client *client.Client, masterServiceNamespace string, volumePlugins []volume.Plugin, tlsOptions *kubelet.TLSOptions, - cadvisorInterface cadvisor.Interface) { + cadvisorInterface cadvisor.Interface, + configFilePath string) { kcfg := KubeletConfig{ KubeClient: client, DockerClient: dockerClient, @@ -264,6 +265,7 @@ func SimpleRunKubelet(client *client.Client, VolumePlugins: volumePlugins, TLSOptions: tlsOptions, CadvisorInterface: cadvisorInterface, + ConfigFile: configFilePath, } RunKubelet(&kcfg) } diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go index cf17588494..456ec4e080 100644 --- a/cmd/kubernetes/kubernetes.go +++ b/cmd/kubernetes/kubernetes.go @@ -151,7 +151,7 @@ func startComponents(etcdClient tools.EtcdClient, cl *client.Client, addr net.IP if err != nil { glog.Fatalf("Failed to create cAdvisor: %v", err) } - kubeletapp.SimpleRunKubelet(cl, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace, kubeletapp.ProbeVolumePlugins(), nil, cadvisorInterface) + kubeletapp.SimpleRunKubelet(cl, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace, kubeletapp.ProbeVolumePlugins(), nil, cadvisorInterface, "") } func newApiClient(addr net.IP, port int) *client.Client { diff --git a/pkg/kubelet/config/file.go b/pkg/kubelet/config/file.go index 74c54b60f5..f67cd01f8c 100644 --- a/pkg/kubelet/config/file.go +++ b/pkg/kubelet/config/file.go @@ -206,7 +206,7 @@ func extractFromFile(filename string) (api.Pod, error) { if glog.V(4) { glog.Infof("Got pod from file %q: %#v", filename, pod) } else { - glog.V(1).Infof("Got pod from file %q: %s.%s (%s)", filename, pod.Namespace, pod.Name, pod.UID) + glog.V(5).Infof("Got pod from file %q: %s.%s (%s)", filename, pod.Namespace, pod.Name, pod.UID) } return pod, nil } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index dc60bb880f..833826a739 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -78,19 +78,23 @@ const ( ) var ( - // ErrNoKubeletContainers returned when there are not containers managed by the kubelet (ie: either no containers on the node, or none that the kubelet cares about). + // ErrNoKubeletContainers returned when there are not containers managed by + // the kubelet (ie: either no containers on the node, or none that the kubelet cares about). ErrNoKubeletContainers = errors.New("no containers managed by kubelet") - // ErrContainerNotFound returned when a container in the given pod with the given container name was not found, amongst those managed by the kubelet. + // ErrContainerNotFound returned when a container in the given pod with the + // given container name was not found, amongst those managed by the kubelet. ErrContainerNotFound = errors.New("no matching container") ) // SyncHandler is an interface implemented by Kubelet, for testability type SyncHandler interface { + // Syncs current state to match the specified pods. SyncPodType specified what // type of sync is occuring per pod. StartTime specifies the time at which // syncing began (for use in monitoring). - SyncPods(pods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, startTime time.Time) error + SyncPods(pods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods util.StringSet, + startTime time.Time) error } type SourcesReadyFn func() bool @@ -206,6 +210,8 @@ func NewMainKubelet( klet.podStatuses = make(map[string]api.PodStatus) + klet.mirrorManager = newBasicMirrorManager(klet.kubeClient) + return klet, nil } @@ -235,6 +241,11 @@ type Kubelet struct { // pods are immutable podLock sync.RWMutex pods []api.Pod + // Record the set of mirror pods (see mirror_manager.go for more details); + // similar to pods, this is not immutable and is protected by the same podLock. + // Note that Kubelet.pods do not contain mirror pods as they are filtered + // out beforehand. + mirrorPods util.StringSet // Needed to report events for containers belonging to deleted/modified pods. // Tracks references for reporting events @@ -288,6 +299,9 @@ type Kubelet struct { // A pod status cache currently used to store rejected pods and their statuses. podStatusesLock sync.RWMutex podStatuses map[string]api.PodStatus + + // A mirror pod manager which provides helper functions. + mirrorManager mirrorManager } // getRootDir returns the full path to the directory under which kubelet can @@ -1240,7 +1254,7 @@ type podContainerChangesSpec struct { containersToKeep map[dockertools.DockerID]int } -func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, containersInPod dockertools.DockerContainers) (podContainerChangesSpec, error) { +func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, hasMirrorPod bool, containersInPod dockertools.DockerContainers) (podContainerChangesSpec, error) { podFullName := GetPodFullName(pod) uid := pod.UID glog.V(4).Infof("Syncing Pod %+v, podFullName: %q, uid: %q", pod, podFullName, uid) @@ -1343,10 +1357,10 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, containersInPod dock }, nil } -func (kl *Kubelet) syncPod(pod *api.Pod, containersInPod dockertools.DockerContainers) error { +func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, containersInPod dockertools.DockerContainers) error { podFullName := GetPodFullName(pod) uid := pod.UID - containerChanges, err := kl.computePodContainerChanges(pod, containersInPod) + containerChanges, err := kl.computePodContainerChanges(pod, hasMirrorPod, containersInPod) glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges) if err != nil { return err @@ -1416,6 +1430,13 @@ func (kl *Kubelet) syncPod(pod *api.Pod, containersInPod dockertools.DockerConta kl.pullImageAndRunContainer(pod, &pod.Spec.Containers[container], &podVolumes, podInfraContainerID) } + if !hasMirrorPod && isStaticPod(pod) { + glog.V(4).Infof("Creating a mirror pod %q", podFullName) + if err := kl.mirrorManager.CreateMirrorPod(*pod, kl.hostname); err != nil { + glog.Errorf("Failed creating a mirror pod %q: %#v", podFullName, err) + } + } + return nil } @@ -1496,7 +1517,7 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.Pod, running []*docker.Cont } // SyncPods synchronizes the configured list of pods (desired state) with the host current state. -func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, start time.Time) error { +func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods util.StringSet, start time.Time) error { defer func() { metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start)) }() @@ -1543,7 +1564,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric } // Run the sync in an async manifest worker. - kl.podWorkers.UpdatePod(pod, func() { + kl.podWorkers.UpdatePod(pod, kl.mirrorPods.Has(podFullName), func() { metrics.SyncPodLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start)) }) @@ -1604,6 +1625,9 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric return err } + // Remove any orphaned mirror pods. + deleteOrphanedMirrorPods(pods, mirrorPods, kl.mirrorManager) + return err } @@ -1704,18 +1728,18 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) { } } - pods, err := kl.GetPods() + pods, mirrorPods, err := kl.GetPods() if err != nil { glog.Errorf("Failed to get bound pods.") return } - if err := handler.SyncPods(pods, podSyncTypes, start); err != nil { + if err := handler.SyncPods(pods, podSyncTypes, mirrorPods, start); err != nil { glog.Errorf("Couldn't sync containers: %v", err) } } } -// Updated the Kubelet's internal pods with those provided by the update. +// Update the Kubelet's internal pods with those provided by the update. // Records new and updated pods in newPods and updatedPods. func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) { kl.podLock.Lock() @@ -1723,6 +1747,7 @@ func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.Sy switch u.Op { case SET: glog.V(3).Infof("SET: Containers changed") + newPods, newMirrorPods := filterAndCategorizePods(u.Pods) // Store the new pods. Don't worry about filtering host ports since those // pods will never be looked up. @@ -1730,13 +1755,15 @@ func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.Sy for i := range kl.pods { existingPods[kl.pods[i].UID] = struct{}{} } - for i := range u.Pods { - if _, ok := existingPods[u.Pods[i].UID]; !ok { - podSyncTypes[u.Pods[i].UID] = metrics.SyncPodCreate + for _, pod := range newPods { + if _, ok := existingPods[pod.UID]; !ok { + podSyncTypes[pod.UID] = metrics.SyncPodCreate } } + // Actually update the pods. + kl.pods = newPods + kl.mirrorPods = newMirrorPods - kl.pods = u.Pods kl.handleHostPortConflicts(kl.pods) case UPDATE: glog.V(3).Infof("Update: Containers changed") @@ -1746,7 +1773,8 @@ func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.Sy for i := range u.Pods { podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate } - kl.pods = updatePods(u.Pods, kl.pods) + allPods := updatePods(u.Pods, kl.pods) + kl.pods, kl.mirrorPods = filterAndCategorizePods(allPods) kl.handleHostPortConflicts(kl.pods) default: panic("syncLoop does not support incremental changes") @@ -1818,11 +1846,12 @@ func (kl *Kubelet) GetHostname() string { return kl.hostname } -// GetPods returns all pods bound to the kubelet and their spec. -func (kl *Kubelet) GetPods() ([]api.Pod, error) { +// GetPods returns all pods bound to the kubelet and their spec, and the mirror +// pod map. +func (kl *Kubelet) GetPods() ([]api.Pod, util.StringSet, error) { kl.podLock.RLock() defer kl.podLock.RUnlock() - return append([]api.Pod{}, kl.pods...), nil + return append([]api.Pod{}, kl.pods...), kl.mirrorPods, nil } // GetPodByName provides the first pod that matches namespace and name, as well as whether the node was found. diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index b9414f4be9..89fecc7595 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -53,11 +53,12 @@ func init() { } type TestKubelet struct { - kubelet *Kubelet - fakeDocker *dockertools.FakeDockerClient - fakeCadvisor *cadvisor.Mock - fakeKubeClient *client.Fake - waitGroup *sync.WaitGroup + kubelet *Kubelet + fakeDocker *dockertools.FakeDockerClient + fakeCadvisor *cadvisor.Mock + fakeKubeClient *client.Fake + waitGroup *sync.WaitGroup + fakeMirrorManager *fakeMirrorManager } func newTestKubelet(t *testing.T) *TestKubelet { @@ -83,8 +84,8 @@ func newTestKubelet(t *testing.T) *TestKubelet { waitGroup := new(sync.WaitGroup) kubelet.podWorkers = newPodWorkers( fakeDockerCache, - func(pod *api.Pod, containers dockertools.DockerContainers) error { - err := kubelet.syncPod(pod, containers) + func(pod *api.Pod, hasMirrorPod bool, containers dockertools.DockerContainers) error { + err := kubelet.syncPod(pod, hasMirrorPod, containers) waitGroup.Done() return err }, @@ -100,8 +101,9 @@ func newTestKubelet(t *testing.T) *TestKubelet { } mockCadvisor := &cadvisor.Mock{} kubelet.cadvisor = mockCadvisor - - return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup} + mirrorManager := newFakeMirrorMananger() + kubelet.mirrorManager = mirrorManager + return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, mirrorManager} } func verifyCalls(t *testing.T, fakeDocker *dockertools.FakeDockerClient, calls []string) { @@ -442,7 +444,7 @@ func TestSyncPodsDoesNothing(t *testing.T) { }, } waitGroup.Add(1) - err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now()) + err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -475,7 +477,7 @@ func TestSyncPodsWithTerminationLog(t *testing.T) { }, } waitGroup.Add(1) - err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now()) + err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -524,7 +526,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) { }, } waitGroup.Add(1) - err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now()) + err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -577,7 +579,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { }, } waitGroup.Add(1) - err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now()) + err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -627,7 +629,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) { }, } waitGroup.Add(1) - err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now()) + err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -684,7 +686,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) { }, } waitGroup.Add(1) - err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now()) + err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -753,7 +755,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) { }, } waitGroup.Add(2) - err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now()) + err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -793,7 +795,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { ID: "9876", }, } - if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, time.Now()); err != nil { + if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now()); err != nil { t.Errorf("unexpected error: %v", err) } // Validate nothing happened. @@ -801,7 +803,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { fakeDocker.ClearCalls() ready = true - if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, time.Now()); err != nil { + if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now()); err != nil { t.Errorf("unexpected error: %v", err) } verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"}) @@ -839,7 +841,7 @@ func TestSyncPodsDeletes(t *testing.T) { ID: "4567", }, } - err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, time.Now()) + err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -893,7 +895,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) { }, } kubelet.pods = append(kubelet.pods, bound) - err := kubelet.syncPod(&bound, dockerContainers) + err := kubelet.syncPod(&bound, false, dockerContainers) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -934,7 +936,7 @@ func TestSyncPodBadHash(t *testing.T) { }, } kubelet.pods = append(kubelet.pods, bound) - err := kubelet.syncPod(&bound, dockerContainers) + err := kubelet.syncPod(&bound, false, dockerContainers) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -988,7 +990,7 @@ func TestSyncPodUnhealthy(t *testing.T) { }, } kubelet.pods = append(kubelet.pods, bound) - err := kubelet.syncPod(&bound, dockerContainers) + err := kubelet.syncPod(&bound, false, dockerContainers) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -1678,7 +1680,7 @@ func TestSyncPodEventHandlerFails(t *testing.T) { }, } kubelet.pods = append(kubelet.pods, bound) - err := kubelet.syncPod(&bound, dockerContainers) + err := kubelet.syncPod(&bound, false, dockerContainers) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -2075,7 +2077,7 @@ func TestSyncPodsWithPullPolicy(t *testing.T) { }, }, }, - }, emptyPodUIDs, time.Now()) + }, emptyPodUIDs, util.NewStringSet(), time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -3176,7 +3178,7 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) { t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses) } // Sync with empty pods so that the entry in status map will be removed. - kl.SyncPods([]api.Pod{}, emptyPodUIDs, time.Now()) + kl.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now()) if len(kl.podStatuses) != 0 { t.Fatalf("expected length of status map to be 0. Got map %#v.", kl.podStatuses) } @@ -3388,3 +3390,57 @@ func TestUpdateNodeStatusError(t *testing.T) { t.Errorf("unexpected actions: %v", kubeClient.Actions) } } + +func TestCreateMirrorPod(t *testing.T) { + testKubelet := newTestKubelet(t) + kl := testKubelet.kubelet + manager := testKubelet.fakeMirrorManager + pod := api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: "12345678", + Name: "bar", + Namespace: "foo", + Annotations: map[string]string{ + ConfigSourceAnnotationKey: "file", + }, + }, + } + kl.pods = append(kl.pods, pod) + hasMirrorPod := false + err := kl.syncPod(&pod, hasMirrorPod, dockertools.DockerContainers{}) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + podFullName := GetPodFullName(&pod) + if !manager.HasPod(podFullName) { + t.Errorf("expected mirror pod %q to be created", podFullName) + } + if manager.NumOfPods() != 1 || !manager.HasPod(podFullName) { + t.Errorf("expected one mirror pod %q, got %v", podFullName, manager.GetPods()) + } +} + +func TestDeleteOrphanedMirrorPods(t *testing.T) { + testKubelet := newTestKubelet(t) + kl := testKubelet.kubelet + manager := testKubelet.fakeMirrorManager + orphanedPodNames := []string{"pod1_ns", "pod2_ns"} + mirrorPods := util.NewStringSet() + for _, name := range orphanedPodNames { + mirrorPods.Insert(name) + } + // Sync with an empty pod list to delete all mirror pods. + err := kl.SyncPods([]api.Pod{}, emptyPodUIDs, mirrorPods, time.Now()) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if manager.NumOfPods() != 0 { + t.Errorf("expected zero mirror pods, got %v", manager.GetPods()) + } + for _, name := range orphanedPodNames { + creates, deletes := manager.GetCounts(name) + if creates != 0 || deletes != 1 { + t.Errorf("expected 0 creation and one deletion of %q, got %d, %d", name, creates, deletes) + } + } +} diff --git a/pkg/kubelet/mirror_manager.go b/pkg/kubelet/mirror_manager.go new file mode 100644 index 0000000000..97ddb4d6f1 --- /dev/null +++ b/pkg/kubelet/mirror_manager.go @@ -0,0 +1,138 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "fmt" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/golang/glog" +) + +// Kubelet discover pod updates from 3 sources: file, http, and apiserver. +// Pods from non-apiserver sources are called static pods, and API server is +// not aware of the existence of static pods. In order to monitor the status of +// such pods, kubelet create a mirror pod for each static pod via the API +// server. +// +// A mirror pod has the same pod full name (name and namespace) as its static +// counterpart (albeit different metadata such as UID, etc). By leveraging the +// fact that kubelet reports the pod status using the pod full name, the status +// of the mirror pod always reflects the acutal status of the static pod. +// When a static pod gets deleted, the associated orphaned mirror pods will +// also be removed. +// +// This file includes functions to manage the mirror pods. + +type mirrorManager interface { + CreateMirrorPod(api.Pod, string) error + DeleteMirrorPod(string) error +} + +type basicMirrorManager struct { + // mirror pods are stored in the kubelet directly because they need to be + // in sync with the internal pods. + apiserverClient client.Interface +} + +func newBasicMirrorManager(apiserverClient client.Interface) *basicMirrorManager { + return &basicMirrorManager{apiserverClient: apiserverClient} +} + +// Creates a mirror pod. +func (self *basicMirrorManager) CreateMirrorPod(pod api.Pod, hostname string) error { + if self.apiserverClient == nil { + return nil + } + // Indicate that the pod should be scheduled to the current node. + pod.Spec.Host = hostname + pod.Annotations[ConfigMirrorAnnotationKey] = MirrorType + + _, err := self.apiserverClient.Pods(NamespaceDefault).Create(&pod) + return err +} + +// Deletes a mirror pod. +func (self *basicMirrorManager) DeleteMirrorPod(podFullName string) error { + if self.apiserverClient == nil { + return nil + } + name, namespace, err := ParsePodFullName(podFullName) + if err != nil { + glog.Errorf("Failed to parse a pod full name %q", podFullName) + return err + } + glog.V(4).Infof("Deleting a mirror pod %q", podFullName) + if err := self.apiserverClient.Pods(namespace).Delete(name); err != nil { + glog.Errorf("Failed deleting a mirror pod %q: %v", podFullName, err) + } + return nil +} + +// Delete all orphaned mirror pods. +func deleteOrphanedMirrorPods(pods []api.Pod, mirrorPods util.StringSet, manager mirrorManager) { + existingPods := util.NewStringSet() + for _, pod := range pods { + existingPods.Insert(GetPodFullName(&pod)) + } + for podFullName := range mirrorPods { + if !existingPods.Has(podFullName) { + manager.DeleteMirrorPod(podFullName) + } + } +} + +// Helper functions. +func getPodSource(pod *api.Pod) (string, error) { + if pod.Annotations != nil { + if source, ok := pod.Annotations[ConfigSourceAnnotationKey]; ok { + return source, nil + } + } + return "", fmt.Errorf("cannot get source of pod %q", pod.UID) +} + +func isStaticPod(pod *api.Pod) bool { + source, err := getPodSource(pod) + return err == nil && source != ApiserverSource +} + +func isMirrorPod(pod *api.Pod) bool { + if value, ok := pod.Annotations[ConfigMirrorAnnotationKey]; !ok { + return false + } else { + return value == MirrorType + } +} + +// This function separate the mirror pods from regular pods to +// facilitate pods syncing and mirror pod creation/deletion. +func filterAndCategorizePods(pods []api.Pod) ([]api.Pod, util.StringSet) { + filteredPods := []api.Pod{} + mirrorPods := util.NewStringSet() + for _, pod := range pods { + name := GetPodFullName(&pod) + if isMirrorPod(&pod) { + mirrorPods.Insert(name) + } else { + filteredPods = append(filteredPods, pod) + } + } + return filteredPods, mirrorPods +} diff --git a/pkg/kubelet/mirror_manager_test.go b/pkg/kubelet/mirror_manager_test.go new file mode 100644 index 0000000000..7bf42d9cb3 --- /dev/null +++ b/pkg/kubelet/mirror_manager_test.go @@ -0,0 +1,158 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "reflect" + "sync" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +type fakeMirrorManager struct { + mirrorPodLock sync.RWMutex + // Note that a real mirror manager does not store the mirror pods in + // itself. This fake manager does this to track calls. + mirrorPods util.StringSet + createCounts map[string]int + deleteCounts map[string]int +} + +func (self *fakeMirrorManager) CreateMirrorPod(pod api.Pod, _ string) error { + self.mirrorPodLock.Lock() + defer self.mirrorPodLock.Unlock() + podFullName := GetPodFullName(&pod) + self.mirrorPods.Insert(podFullName) + self.createCounts[podFullName]++ + return nil +} + +func (self *fakeMirrorManager) DeleteMirrorPod(podFullName string) error { + self.mirrorPodLock.Lock() + defer self.mirrorPodLock.Unlock() + self.mirrorPods.Delete(podFullName) + self.deleteCounts[podFullName]++ + return nil +} + +func newFakeMirrorMananger() *fakeMirrorManager { + m := fakeMirrorManager{} + m.mirrorPods = util.NewStringSet() + m.createCounts = make(map[string]int) + m.deleteCounts = make(map[string]int) + return &m +} + +func (self *fakeMirrorManager) HasPod(podFullName string) bool { + self.mirrorPodLock.RLock() + defer self.mirrorPodLock.RUnlock() + return self.mirrorPods.Has(podFullName) +} + +func (self *fakeMirrorManager) NumOfPods() int { + self.mirrorPodLock.RLock() + defer self.mirrorPodLock.RUnlock() + return self.mirrorPods.Len() +} + +func (self *fakeMirrorManager) GetPods() []string { + self.mirrorPodLock.RLock() + defer self.mirrorPodLock.RUnlock() + return self.mirrorPods.List() +} + +func (self *fakeMirrorManager) GetCounts(podFullName string) (int, int) { + self.mirrorPodLock.RLock() + defer self.mirrorPodLock.RUnlock() + return self.createCounts[podFullName], self.deleteCounts[podFullName] +} + +// Tests that mirror pods are filtered out properly from the pod update. +func TestFilterOutMirrorPods(t *testing.T) { + mirrorPod := api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: "987654321", + Name: "bar", + Namespace: "default", + Annotations: map[string]string{ + ConfigSourceAnnotationKey: "api", + ConfigMirrorAnnotationKey: "mirror", + }, + }, + } + staticPod := api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: "123456789", + Name: "bar", + Namespace: "default", + Annotations: map[string]string{ConfigSourceAnnotationKey: "file"}, + }, + } + + expectedPods := []api.Pod{ + { + ObjectMeta: api.ObjectMeta{ + UID: "999999999", + Name: "taco", + Namespace: "default", + Annotations: map[string]string{ConfigSourceAnnotationKey: "api"}, + }, + }, + staticPod, + } + updates := append(expectedPods, mirrorPod) + actualPods, actualMirrorPods := filterAndCategorizePods(updates) + if !reflect.DeepEqual(expectedPods, actualPods) { + t.Errorf("expected %#v, got %#v", expectedPods, actualPods) + } + if !actualMirrorPods.Has(GetPodFullName(&mirrorPod)) { + t.Errorf("mirror pod is not recorded") + } +} + +func TestParsePodFullName(t *testing.T) { + type nameTuple struct { + Name string + Namespace string + } + successfulCases := map[string]nameTuple{ + "bar_foo": {Name: "bar", Namespace: "foo"}, + "bar.org_foo.com": {Name: "bar.org", Namespace: "foo.com"}, + "bar-bar_foo": {Name: "bar-bar", Namespace: "foo"}, + } + failedCases := []string{"barfoo", "bar_foo_foo", ""} + + for podFullName, expected := range successfulCases { + name, namespace, err := ParsePodFullName(podFullName) + if err != nil { + t.Errorf("unexpected error when parsing the full name: %v", err) + continue + } + if name != expected.Name || namespace != expected.Namespace { + t.Errorf("expected name %q, namespace %q; got name %q, namespace %q", + expected.Name, expected.Namespace, name, namespace) + } + } + for _, podFullName := range failedCases { + _, _, err := ParsePodFullName(podFullName) + if err == nil { + t.Errorf("expected error when parsing the full name, got none") + } + } +} diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index e0c169099a..27e534c65d 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -28,7 +28,7 @@ import ( "github.com/golang/glog" ) -type syncPodFnType func(*api.Pod, dockertools.DockerContainers) error +type syncPodFnType func(*api.Pod, bool, dockertools.DockerContainers) error type podWorkers struct { // Protects podUpdates field. @@ -60,11 +60,15 @@ type workUpdate struct { // The pod state to reflect. pod *api.Pod + // Whether there exists a mirror pod for pod. + hasMirrorPod bool + // Function to call when the update is complete. updateCompleteFn func() } -func newPodWorkers(dockerCache dockertools.DockerCache, syncPodFn syncPodFnType, recorder record.EventRecorder) *podWorkers { +func newPodWorkers(dockerCache dockertools.DockerCache, syncPodFn syncPodFnType, + recorder record.EventRecorder) *podWorkers { return &podWorkers{ podUpdates: map[types.UID]chan workUpdate{}, isWorking: map[types.UID]bool{}, @@ -92,7 +96,8 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) { return } - err = p.syncPodFn(newWork.pod, containers.FindContainersByPod(newWork.pod.UID, GetPodFullName(newWork.pod))) + err = p.syncPodFn(newWork.pod, newWork.hasMirrorPod, + containers.FindContainersByPod(newWork.pod.UID, GetPodFullName(newWork.pod))) if err != nil { glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err) p.recorder.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err) @@ -106,7 +111,7 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) { } // Apply the new setting to the specified pod. updateComplete is called when the update is completed. -func (p *podWorkers) UpdatePod(pod *api.Pod, updateComplete func()) { +func (p *podWorkers) UpdatePod(pod *api.Pod, hasMirrorPod bool, updateComplete func()) { uid := pod.UID var podUpdates chan workUpdate var exists bool @@ -129,11 +134,13 @@ func (p *podWorkers) UpdatePod(pod *api.Pod, updateComplete func()) { p.isWorking[pod.UID] = true podUpdates <- workUpdate{ pod: pod, + hasMirrorPod: hasMirrorPod, updateCompleteFn: updateComplete, } } else { p.lastUndeliveredWorkUpdate[pod.UID] = workUpdate{ pod: pod, + hasMirrorPod: hasMirrorPod, updateCompleteFn: updateComplete, } } diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go index 6ae3cecad2..28a3c6f531 100644 --- a/pkg/kubelet/pod_workers_test.go +++ b/pkg/kubelet/pod_workers_test.go @@ -46,7 +46,7 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) { podWorkers := newPodWorkers( fakeDockerCache, - func(pod *api.Pod, containers dockertools.DockerContainers) error { + func(pod *api.Pod, hasMirrorPod bool, containers dockertools.DockerContainers) error { func() { lock.Lock() defer lock.Unlock() @@ -54,7 +54,8 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) { }() return nil }, - recorder) + recorder, + ) return podWorkers, processed } @@ -82,7 +83,7 @@ func TestUpdatePod(t *testing.T) { numPods := 20 for i := 0; i < numPods; i++ { for j := i; j < numPods; j++ { - podWorkers.UpdatePod(newPod(string(j), string(i)), func() {}) + podWorkers.UpdatePod(newPod(string(j), string(i)), false, func() {}) } } drainWorkers(podWorkers, numPods) @@ -115,7 +116,7 @@ func TestForgetNonExistingPodWorkers(t *testing.T) { numPods := 20 for i := 0; i < numPods; i++ { - podWorkers.UpdatePod(newPod(string(i), "name"), func() {}) + podWorkers.UpdatePod(newPod(string(i), "name"), false, func() {}) } drainWorkers(podWorkers, numPods) diff --git a/pkg/kubelet/runonce.go b/pkg/kubelet/runonce.go index 4a4783dc1f..67cc41712a 100644 --- a/pkg/kubelet/runonce.go +++ b/pkg/kubelet/runonce.go @@ -104,7 +104,9 @@ func (kl *Kubelet) runPod(pod api.Pod, retryDelay time.Duration) error { return nil } glog.Infof("pod %q containers not running: syncing", pod.Name) - if err = kl.syncPod(&pod, dockerContainers); err != nil { + // We don't create mirror pods in this mode; pass a dummy boolean value + // to sycnPod. + if err = kl.syncPod(&pod, false, dockerContainers); err != nil { return fmt.Errorf("error syncing pod: %v", err) } if retry >= RunOnceMaxRetries { diff --git a/pkg/kubelet/server.go b/pkg/kubelet/server.go index 996d295ef0..b494e870f1 100644 --- a/pkg/kubelet/server.go +++ b/pkg/kubelet/server.go @@ -38,6 +38,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream/spdy" "github.com/golang/glog" @@ -84,7 +85,7 @@ type HostInterface interface { GetRootInfo(req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) GetDockerVersion() ([]uint, error) GetMachineInfo() (*cadvisorApi.MachineInfo, error) - GetPods() ([]api.Pod, error) + GetPods() ([]api.Pod, util.StringSet, error) GetPodByName(namespace, name string) (*api.Pod, bool) GetPodStatus(name string, uid types.UID) (api.PodStatus, error) RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error) @@ -258,9 +259,9 @@ func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) { } } -// handlePods returns a list of pod bounds to the Kubelet and their spec +// handlePods returns a list of pod bound to the Kubelet and their spec func (s *Server) handlePods(w http.ResponseWriter, req *http.Request) { - pods, err := s.host.GetPods() + pods, _, err := s.host.GetPods() if err != nil { s.error(w, err) return diff --git a/pkg/kubelet/server_test.go b/pkg/kubelet/server_test.go index 211aa57895..99d58b1a69 100644 --- a/pkg/kubelet/server_test.go +++ b/pkg/kubelet/server_test.go @@ -33,6 +33,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream/spdy" cadvisorApi "github.com/google/cadvisor/info/v1" @@ -44,7 +45,7 @@ type fakeKubelet struct { containerInfoFunc func(podFullName string, uid types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) rootInfoFunc func(query *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) machineInfoFunc func() (*cadvisorApi.MachineInfo, error) - podsFunc func() ([]api.Pod, error) + podsFunc func() ([]api.Pod, util.StringSet, error) logFunc func(w http.ResponseWriter, req *http.Request) runFunc func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error) dockerVersionFunc func() ([]uint, error) @@ -79,7 +80,7 @@ func (fk *fakeKubelet) GetMachineInfo() (*cadvisorApi.MachineInfo, error) { return fk.machineInfoFunc() } -func (fk *fakeKubelet) GetPods() ([]api.Pod, error) { +func (fk *fakeKubelet) GetPods() ([]api.Pod, util.StringSet, error) { return fk.podsFunc() } diff --git a/pkg/kubelet/types.go b/pkg/kubelet/types.go index 8065c08c41..9ac0a127bd 100644 --- a/pkg/kubelet/types.go +++ b/pkg/kubelet/types.go @@ -18,11 +18,13 @@ package kubelet import ( "fmt" + "strings" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" ) const ConfigSourceAnnotationKey = "kubernetes.io/config.source" +const ConfigMirrorAnnotationKey = "kubernetes.io/config.mirror" // PodOperation defines what changes will be made on a pod configuration. type PodOperation int @@ -49,6 +51,9 @@ const ( // Updates from all sources AllSource = "*" + // Used for ConfigMirrorAnnotationKey. + MirrorType = "mirror" + NamespaceDefault = api.NamespaceDefault ) @@ -67,7 +72,7 @@ type PodUpdate struct { Source string } -// GetPodFullName returns a name that uniquely identifies a pod across all config sources. +// GetPodFullName returns a name that uniquely identifies a pod. func GetPodFullName(pod *api.Pod) string { // Use underscore as the delimiter because it is not allowed in pod name // (DNS subdomain format), while allowed in the container name format. @@ -78,3 +83,12 @@ func GetPodFullName(pod *api.Pod) string { func BuildPodFullName(name, namespace string) string { return name + "_" + namespace } + +// Parse the pod full name. +func ParsePodFullName(podFullName string) (string, string, error) { + parts := strings.Split(podFullName, "_") + if len(parts) != 2 { + return "", "", fmt.Errorf("failed to parse the pod full name %q", podFullName) + } + return parts[0], parts[1], nil +}