Track the sources that the kubelet has seen, and only delete pods

when every source has been seen at least once.
pull/6/head
Brendan Burns 2014-12-16 21:11:27 -08:00
parent bb28949291
commit 7da0378f3c
14 changed files with 191 additions and 74 deletions

View File

@ -53,6 +53,10 @@ type PodConfig struct {
// the channel of denormalized changes passed to listeners
updates chan kubelet.PodUpdate
// contains the list of all configured sources
sourcesLock sync.Mutex
sources util.StringSet
}
// NewPodConfig creates an object that can merge many configuration sources into a stream
@ -64,6 +68,7 @@ func NewPodConfig(mode PodConfigNotificationMode) *PodConfig {
pods: storage,
mux: config.NewMux(storage),
updates: updates,
sources: util.StringSet{},
}
return podConfig
}
@ -71,9 +76,22 @@ func NewPodConfig(mode PodConfigNotificationMode) *PodConfig {
// Channel creates or returns a config source channel. The channel
// only accepts PodUpdates
func (c *PodConfig) Channel(source string) chan<- interface{} {
c.sourcesLock.Lock()
defer c.sourcesLock.Unlock()
c.sources.Insert(source)
return c.mux.Channel(source)
}
// SeenAllSources returns true if this config has received a SET
// message from all configured sources, false otherwise.
func (c *PodConfig) SeenAllSources() bool {
if c.pods == nil {
return false
}
glog.V(6).Infof("Looking for %v, have seen %v", c.sources.List(), c.pods.sourcesSeen)
return c.pods.seenSources(c.sources.List()...)
}
// Updates returns a channel of updates to the configuration, properly denormalized.
func (c *PodConfig) Updates() <-chan kubelet.PodUpdate {
return c.updates
@ -98,6 +116,10 @@ type podStorage struct {
// on the updates channel
updateLock sync.Mutex
updates chan<- kubelet.PodUpdate
// contains the set of all sources that have sent at least one SET
sourcesSeenLock sync.Mutex
sourcesSeen util.StringSet
}
// TODO: PodConfigNotificationMode could be handled by a listener to the updates channel
@ -105,9 +127,10 @@ type podStorage struct {
// TODO: allow initialization of the current state of the store with snapshotted version.
func newPodStorage(updates chan<- kubelet.PodUpdate, mode PodConfigNotificationMode) *podStorage {
return &podStorage{
pods: make(map[string]map[string]*api.BoundPod),
mode: mode,
updates: updates,
pods: make(map[string]map[string]*api.BoundPod),
mode: mode,
updates: updates,
sourcesSeen: util.StringSet{},
}
}
@ -138,12 +161,12 @@ func (s *podStorage) Merge(source string, change interface{}) error {
s.updates <- *updates
}
if len(deletes.Pods) > 0 || len(adds.Pods) > 0 {
s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET}
s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET, source}
}
case PodConfigNotificationSnapshot:
if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 {
s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET}
s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET, source}
}
default:
@ -212,6 +235,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
case kubelet.SET:
glog.V(4).Infof("Setting pods for source %s : %v", source, update)
s.markSourceSet(source)
// Clear the old map entries by just creating a new map
oldPods := pods
pods = make(map[string]*api.BoundPod)
@ -254,6 +278,18 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
return adds, updates, deletes
}
func (s *podStorage) markSourceSet(source string) {
s.sourcesSeenLock.Lock()
defer s.sourcesSeenLock.Unlock()
s.sourcesSeen.Insert(source)
}
func (s *podStorage) seenSources(sources ...string) bool {
s.sourcesSeenLock.Lock()
defer s.sourcesSeenLock.Unlock()
return s.sourcesSeen.HasAll(sources...)
}
func filterInvalidPods(pods []api.BoundPod, source string) (filtered []*api.BoundPod) {
names := util.StringSet{}
for i := range pods {
@ -280,7 +316,7 @@ func filterInvalidPods(pods []api.BoundPod, source string) (filtered []*api.Boun
func (s *podStorage) Sync() {
s.updateLock.Lock()
defer s.updateLock.Unlock()
s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET}
s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET, kubelet.AllSource}
}
// Object implements config.Accessor

View File

@ -25,6 +25,11 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
)
const (
NoneSource = ""
TestSource = "test"
)
func expectEmptyChannel(t *testing.T, ch <-chan interface{}) {
select {
case update := <-ch:
@ -58,23 +63,23 @@ func CreateValidPod(name, namespace, source string) api.BoundPod {
}
}
func CreatePodUpdate(op kubelet.PodOperation, pods ...api.BoundPod) kubelet.PodUpdate {
func CreatePodUpdate(op kubelet.PodOperation, source string, pods ...api.BoundPod) kubelet.PodUpdate {
// We deliberately return an empty slice instead of a nil pointer here
// because reflect.DeepEqual differentiates between the two and we need to
// pick one for consistency.
newPods := make([]api.BoundPod, len(pods))
if len(pods) == 0 {
return kubelet.PodUpdate{newPods, op}
return kubelet.PodUpdate{newPods, op, source}
}
for i := range pods {
newPods[i] = pods[i]
}
return kubelet.PodUpdate{newPods, op}
return kubelet.PodUpdate{newPods, op, source}
}
func createPodConfigTester(mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubelet.PodUpdate, *PodConfig) {
config := NewPodConfig(mode)
channel := config.Channel("test")
channel := config.Channel(TestSource)
ch := config.Updates()
return channel, ch, config
}
@ -102,63 +107,63 @@ func TestNewPodAdded(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
// see an update
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""))
podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "test")))
config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource, CreateValidPod("foo", "new", "test")))
}
func TestNewPodAddedInvalidNamespace(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
// see an update
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "", ""))
podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "", ""))
channel <- podUpdate
config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource))
}
func TestNewPodAddedDefaultNamespace(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
// see an update
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", ""))
podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "default", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "default", "test")))
config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "default", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource, CreateValidPod("foo", "default", "test")))
}
func TestNewPodAddedDifferentNamespaces(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
// see an update
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", ""))
podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "default", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "default", "test")))
// see an update in another namespace
podUpdate = CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""))
podUpdate = CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "test")))
config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "default", "test"), CreateValidPod("foo", "new", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource, CreateValidPod("foo", "default", "test"), CreateValidPod("foo", "new", "test")))
}
func TestInvalidPodFiltered(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
// see an update
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""))
podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "test")))
// add an invalid update
podUpdate = CreatePodUpdate(kubelet.UPDATE, api.BoundPod{ObjectMeta: api.ObjectMeta{Name: "foo"}})
podUpdate = CreatePodUpdate(kubelet.UPDATE, NoneSource, api.BoundPod{ObjectMeta: api.ObjectMeta{Name: "foo"}})
channel <- podUpdate
expectNoPodUpdate(t, ch)
}
@ -167,45 +172,45 @@ func TestNewPodAddedSnapshotAndUpdates(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshotAndUpdates)
// see an set
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""))
podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, TestSource, CreateValidPod("foo", "new", "test")))
config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource, CreateValidPod("foo", "new", "test")))
// container updates are separated as UPDATE
pod := podUpdate.Pods[0]
pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}}
channel <- CreatePodUpdate(kubelet.ADD, pod)
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, pod))
channel <- CreatePodUpdate(kubelet.ADD, NoneSource, pod)
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, NoneSource, pod))
}
func TestNewPodAddedSnapshot(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshot)
// see an set
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""))
podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, TestSource, CreateValidPod("foo", "new", "test")))
config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource, CreateValidPod("foo", "new", "test")))
// container updates are separated as UPDATE
pod := podUpdate.Pods[0]
pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}}
channel <- CreatePodUpdate(kubelet.ADD, pod)
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, pod))
channel <- CreatePodUpdate(kubelet.ADD, NoneSource, pod)
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, TestSource, pod))
}
func TestNewPodAddedUpdatedRemoved(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
// should register an add
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""))
podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "test")))
// should ignore ADDs that are identical
expectNoPodUpdate(t, ch)
@ -213,22 +218,22 @@ func TestNewPodAddedUpdatedRemoved(t *testing.T) {
// an kubelet.ADD should be converted to kubelet.UPDATE
pod := CreateValidPod("foo", "new", "test")
pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}}
podUpdate = CreatePodUpdate(kubelet.ADD, pod)
podUpdate = CreatePodUpdate(kubelet.ADD, NoneSource, pod)
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, pod))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, NoneSource, pod))
podUpdate = CreatePodUpdate(kubelet.REMOVE, api.BoundPod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "new"}})
podUpdate = CreatePodUpdate(kubelet.REMOVE, NoneSource, api.BoundPod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "new"}})
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.REMOVE, pod))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.REMOVE, NoneSource, pod))
}
func TestNewPodAddedUpdatedSet(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
// should register an add
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""), CreateValidPod("foo2", "new", ""), CreateValidPod("foo3", "new", ""))
podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", ""), CreateValidPod("foo2", "new", ""), CreateValidPod("foo3", "new", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test"), CreateValidPod("foo2", "new", "test"), CreateValidPod("foo3", "new", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "test"), CreateValidPod("foo2", "new", "test"), CreateValidPod("foo3", "new", "test")))
// should ignore ADDs that are identical
expectNoPodUpdate(t, ch)
@ -236,10 +241,10 @@ func TestNewPodAddedUpdatedSet(t *testing.T) {
// should be converted to an kubelet.ADD, kubelet.REMOVE, and kubelet.UPDATE
pod := CreateValidPod("foo2", "new", "test")
pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}}
podUpdate = CreatePodUpdate(kubelet.SET, pod, CreateValidPod("foo3", "new", ""), CreateValidPod("foo4", "new", "test"))
podUpdate = CreatePodUpdate(kubelet.SET, NoneSource, pod, CreateValidPod("foo3", "new", ""), CreateValidPod("foo4", "new", "test"))
channel <- podUpdate
expectPodUpdate(t, ch,
CreatePodUpdate(kubelet.REMOVE, CreateValidPod("foo", "new", "test")),
CreatePodUpdate(kubelet.ADD, CreateValidPod("foo4", "new", "test")),
CreatePodUpdate(kubelet.UPDATE, pod))
CreatePodUpdate(kubelet.REMOVE, NoneSource, CreateValidPod("foo", "new", "test")),
CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo4", "new", "test")),
CreatePodUpdate(kubelet.UPDATE, NoneSource, pod))
}

View File

@ -79,7 +79,7 @@ func (s *sourceEtcd) run() {
}
glog.V(4).Infof("Received state from etcd watch: %+v", pods)
s.updates <- kubelet.PodUpdate{pods, kubelet.SET}
s.updates <- kubelet.PodUpdate{pods, kubelet.SET, kubelet.EtcdSource}
}
}
}

View File

@ -73,14 +73,14 @@ func (s *sourceFile) extractFromPath() error {
if err != nil {
return err
}
s.updates <- kubelet.PodUpdate{pods, kubelet.SET}
s.updates <- kubelet.PodUpdate{pods, kubelet.SET, kubelet.FileSource}
case statInfo.Mode().IsRegular():
pod, err := extractFromFile(path)
if err != nil {
return err
}
s.updates <- kubelet.PodUpdate{[]api.BoundPod{pod}, kubelet.SET}
s.updates <- kubelet.PodUpdate{[]api.BoundPod{pod}, kubelet.SET, kubelet.FileSource}
default:
return fmt.Errorf("path is not a directory or file")

View File

@ -119,7 +119,7 @@ func TestReadFromFile(t *testing.T) {
select {
case got := <-ch:
update := got.(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET, api.BoundPod{
expected := CreatePodUpdate(kubelet.SET, kubelet.FileSource, api.BoundPod{
ObjectMeta: api.ObjectMeta{
Name: simpleSubdomainSafeHash(file.Name()),
UID: simpleSubdomainSafeHash(file.Name()),
@ -170,7 +170,7 @@ func TestExtractFromValidDataFile(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
update := (<-ch).(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET, expectedPod)
expected := CreatePodUpdate(kubelet.SET, kubelet.FileSource, expectedPod)
if !reflect.DeepEqual(expected, update) {
t.Errorf("Expected %#v, Got %#v", expected, update)
}
@ -191,7 +191,7 @@ func TestExtractFromEmptyDir(t *testing.T) {
}
update := (<-ch).(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET)
expected := CreatePodUpdate(kubelet.SET, kubelet.FileSource)
if !reflect.DeepEqual(expected, update) {
t.Errorf("Expected %#v, Got %#v", expected, update)
}
@ -239,7 +239,7 @@ func TestExtractFromDir(t *testing.T) {
}
update := (<-ch).(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET, pods...)
expected := CreatePodUpdate(kubelet.SET, kubelet.FileSource, pods...)
sort.Sort(sortedPods(update.Pods))
sort.Sort(sortedPods(expected.Pods))
if !reflect.DeepEqual(expected, update) {

View File

@ -97,7 +97,7 @@ func (s *sourceURL) extractFromURL() error {
if len(pod.Namespace) == 0 {
pod.Namespace = api.NamespaceDefault
}
s.updates <- kubelet.PodUpdate{[]api.BoundPod{pod}, kubelet.SET}
s.updates <- kubelet.PodUpdate{[]api.BoundPod{pod}, kubelet.SET, kubelet.HTTPSource}
return nil
}
@ -138,7 +138,7 @@ func (s *sourceURL) extractFromURL() error {
pod.Namespace = api.NamespaceDefault
}
}
s.updates <- kubelet.PodUpdate{boundPods.Items, kubelet.SET}
s.updates <- kubelet.PodUpdate{boundPods.Items, kubelet.SET, kubelet.HTTPSource}
return nil
}

View File

@ -124,6 +124,7 @@ func TestExtractFromHTTP(t *testing.T) {
desc: "Single manifest",
manifests: api.ContainerManifest{Version: "v1beta1", ID: "foo"},
expected: CreatePodUpdate(kubelet.SET,
kubelet.HTTPSource,
api.BoundPod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
@ -141,6 +142,7 @@ func TestExtractFromHTTP(t *testing.T) {
{Version: "v1beta1", ID: "bar", Containers: []api.Container{{Name: "1", Image: "foo"}}},
},
expected: CreatePodUpdate(kubelet.SET,
kubelet.HTTPSource,
api.BoundPod{
ObjectMeta: api.ObjectMeta{
Name: "1",
@ -169,7 +171,7 @@ func TestExtractFromHTTP(t *testing.T) {
{
desc: "Empty Array",
manifests: []api.ContainerManifest{},
expected: CreatePodUpdate(kubelet.SET),
expected: CreatePodUpdate(kubelet.SET, kubelet.HTTPSource),
},
}
for _, testCase := range testCases {

View File

@ -76,7 +76,7 @@ func TestGetContainerID(t *testing.T) {
t.Errorf("Failed to find container %#v", dockerContainer)
}
fakeDocker.clearCalls()
fakeDocker.ClearCalls()
dockerContainer, found, _ = dockerContainers.FindPodContainer("foobar", "", "foo")
verifyCalls(t, fakeDocker, []string{})
if dockerContainer != nil || found {

View File

@ -40,10 +40,14 @@ type FakeDockerClient struct {
VersionInfo docker.Env
}
func (f *FakeDockerClient) clearCalls() {
func (f *FakeDockerClient) ClearCalls() {
f.Lock()
defer f.Unlock()
f.called = []string{}
f.Stopped = []string{}
f.pulled = []string{}
f.Created = []string{}
f.Removed = []string{}
}
func (f *FakeDockerClient) AssertCalls(calls []string) (err error) {

View File

@ -53,6 +53,8 @@ type SyncHandler interface {
SyncPods([]api.BoundPod) error
}
type SourcesReadyFn func() bool
type volumeMap map[string]volume.Interface
// New creates a new Kubelet for use in main
@ -66,7 +68,8 @@ func NewMainKubelet(
pullQPS float32,
pullBurst int,
minimumGCAge time.Duration,
maxContainerCount int) *Kubelet {
maxContainerCount int,
sourcesReady SourcesReadyFn) *Kubelet {
return &Kubelet{
hostname: hn,
dockerClient: dc,
@ -82,6 +85,7 @@ func NewMainKubelet(
pullBurst: pullBurst,
minimumGCAge: minimumGCAge,
maxContainerCount: maxContainerCount,
sourcesReady: sourcesReady,
}
}
@ -112,6 +116,7 @@ type Kubelet struct {
podWorkers *podWorkers
resyncInterval time.Duration
pods []api.BoundPod
sourcesReady SourcesReadyFn
// Needed to report events for containers belonging to deleted/modified pods.
// Tracks references for reporting events
@ -907,7 +912,12 @@ func (kl *Kubelet) SyncPods(pods []api.BoundPod) error {
}
})
}
if !kl.sourcesReady() {
// If the sources aren't ready, skip deletion, as we may accidentally delete pods
// for sources that haven't reported yet.
glog.V(4).Infof("Skipping deletes, sources aren't ready yet.")
return nil
}
// Kill any containers we don't need.
for _, container := range dockerContainers {
// Don't kill containers that are in the desired pods.

View File

@ -55,6 +55,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *tools.FakeEtcdClient, *dockertools
kubelet.etcdClient = fakeEtcdClient
kubelet.rootDirectory = "/tmp/kubelet"
kubelet.podWorkers = newPodWorkers()
kubelet.sourcesReady = func() bool { return true }
return kubelet, fakeEtcdClient, fakeDocker
}
@ -513,6 +514,49 @@ func TestSyncPodsDeletesWithNoNetContainer(t *testing.T) {
fakeDocker.Unlock()
}
func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
ready := false
kubelet, _, fakeDocker := newTestKubelet(t)
kubelet.sourcesReady = func() bool { return ready }
fakeDocker.ContainerList = []docker.APIContainers{
{
// the k8s prefix is required for the kubelet to manage the container
Names: []string{"/k8s_foo_bar.new.test"},
ID: "1234",
},
{
// network container
Names: []string{"/k8s_net_foo.new.test_"},
ID: "9876",
},
}
if err := kubelet.SyncPods([]api.BoundPod{}); err != nil {
t.Errorf("unexpected error: %v", err)
}
// Validate nothing happened.
verifyCalls(t, fakeDocker, []string{"list"})
fakeDocker.ClearCalls()
ready = true
if err := kubelet.SyncPods([]api.BoundPod{}); err != nil {
t.Errorf("unexpected error: %v", err)
}
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop"})
// A map iteration is used to delete containers, so must not depend on
// order here.
expectedToStop := map[string]bool{
"1234": true,
"9876": true,
}
if len(fakeDocker.Stopped) != 2 ||
!expectedToStop[fakeDocker.Stopped[0]] ||
!expectedToStop[fakeDocker.Stopped[1]] {
t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped)
}
}
func TestSyncPodsDeletes(t *testing.T) {
kubelet, _, fakeDocker := newTestKubelet(t)
fakeDocker.ContainerList = []docker.APIContainers{

View File

@ -59,6 +59,7 @@ func ListenAndServeKubeletServer(host HostInterface, updates chan<- interface{},
WriteTimeout: 5 * time.Minute,
MaxHeaderBytes: 1 << 20,
}
updates <- PodUpdate{[]api.BoundPod{}, SET, ServerSource}
glog.Fatal(s.ListenAndServe())
}
@ -143,7 +144,7 @@ func (s *Server) handleContainer(w http.ResponseWriter, req *http.Request) {
if pod.UID == "" {
pod.UID = "1"
}
s.updates <- PodUpdate{[]api.BoundPod{pod}, SET}
s.updates <- PodUpdate{[]api.BoundPod{pod}, SET, ServerSource}
}
@ -166,8 +167,7 @@ func (s *Server) handleContainers(w http.ResponseWriter, req *http.Request) {
pods[i].Name = fmt.Sprintf("%d", i+1)
pods[i].Spec = specs[i]
}
s.updates <- PodUpdate{pods, SET}
s.updates <- PodUpdate{pods, SET, ServerSource}
}
// handleContainerLogs handles containerLogs request against the Kubelet

View File

@ -36,6 +36,18 @@ const (
REMOVE
// Pods with the given ids have been updated in this source
UPDATE
// These constants identify the sources of pods
// Updates from a file
FileSource = "file"
// Updates from etcd
EtcdSource = "etcd"
// Updates from querying a web page
HTTPSource = "http"
// Updates received to the kubelet server
ServerSource = "server"
// Updates from all sources
AllSource = "*"
)
// PodUpdate defines an operation sent on the channel. You can add or remove single services by
@ -48,8 +60,9 @@ const (
// functionally similar, this helps our unit tests properly check that the correct PodUpdates
// are generated.
type PodUpdate struct {
Pods []api.BoundPod
Op PodOperation
Pods []api.BoundPod
Op PodOperation
Source string
}
// GetPodFullName returns a name that uniquely identifies a pod across all config sources.

View File

@ -176,7 +176,7 @@ func RunKubelet(kcfg *KubeletConfig) {
}
cfg := makePodSourceConfig(kcfg)
k := createAndInitKubelet(kcfg)
k := createAndInitKubelet(kcfg, cfg)
// process pods and exit.
if kcfg.Runonce {
if _, err := k.RunOnce(cfg.Updates()); err != nil {
@ -194,7 +194,7 @@ func startKubelet(k *kubelet.Kubelet, cfg *config.PodConfig, kc *KubeletConfig)
// start the kubelet server
if kc.EnableServer {
go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(k, cfg.Channel("http"), net.IP(kc.Address), kc.Port, kc.EnableDebuggingHandlers)
kubelet.ListenAndServeKubeletServer(k, cfg.Channel(kubelet.ServerSource), net.IP(kc.Address), kc.Port, kc.EnableDebuggingHandlers)
}, 0)
}
}
@ -205,17 +205,19 @@ func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig {
// define file config source
if kc.ConfigFile != "" {
config.NewSourceFile(kc.ConfigFile, kc.FileCheckFrequency, cfg.Channel("file"))
glog.Infof("Adding manifest file: %v", kc.ConfigFile)
config.NewSourceFile(kc.ConfigFile, kc.FileCheckFrequency, cfg.Channel(kubelet.FileSource))
}
// define url config source
if kc.ManifestURL != "" {
config.NewSourceURL(kc.ManifestURL, kc.HttpCheckFrequency, cfg.Channel("http"))
glog.Infof("Adding manifest url: %v", kc.ManifestURL)
config.NewSourceURL(kc.ManifestURL, kc.HttpCheckFrequency, cfg.Channel(kubelet.HTTPSource))
}
if kc.EtcdClient != nil {
glog.Infof("Watching for etcd configs at %v", kc.EtcdClient.GetCluster())
config.NewSourceEtcd(config.EtcdKeyForHost(kc.Hostname), kc.EtcdClient, cfg.Channel("etcd"))
config.NewSourceEtcd(config.EtcdKeyForHost(kc.Hostname), kc.EtcdClient, cfg.Channel(kubelet.EtcdSource))
}
return cfg
}
@ -247,7 +249,7 @@ type KubeletConfig struct {
Runonce bool
}
func createAndInitKubelet(kc *KubeletConfig) *kubelet.Kubelet {
func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) *kubelet.Kubelet {
// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
// up into "per source" synchronizations
@ -261,7 +263,8 @@ func createAndInitKubelet(kc *KubeletConfig) *kubelet.Kubelet {
float32(kc.RegistryPullQPS),
kc.RegistryBurst,
kc.MinimumGCAge,
kc.MaxContainerCount)
kc.MaxContainerCount,
pc.SeenAllSources)
k.BirthCry()