Change 'this' variable to more sensible abbreviation

pull/6/head
Daniel Smith 2014-06-08 20:35:07 -07:00
parent e9b61b77cb
commit 62aba06180
1 changed file with 63 additions and 63 deletions


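For context, the rename follows the usual Go convention that a method receiver is named with a short abbreviation of its type, so kl matches Kubelet where the leftover sl did not. A minimal, hypothetical sketch of that convention (the struct fields, method, and values below are illustrative stand-ins, not code from this file):

package main

import "fmt"

// Kubelet is a stand-in type; the receiver abbreviation echoes the type name.
type Kubelet struct {
	Hostname string
}

// kl abbreviates Kubelet; a stale receiver name like sl would suggest a different type.
func (kl *Kubelet) Describe() string {
	return "kubelet on " + kl.Hostname
}

func main() {
	kl := &Kubelet{Hostname: "node-1"}
	fmt.Println(kl.Describe())
}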
@@ -70,26 +70,26 @@ type Kubelet struct {
// Starts background goroutines. If file, manifest_url, or address are empty,
// they are not watched. Never returns.
-func (sl *Kubelet) RunKubelet(file, manifest_url, etcd_servers, address string, port uint) {
+func (kl *Kubelet) RunKubelet(file, manifest_url, etcd_servers, address string, port uint) {
fileChannel := make(chan api.ContainerManifest)
etcdChannel := make(chan []api.ContainerManifest)
httpChannel := make(chan api.ContainerManifest)
serverChannel := make(chan api.ContainerManifest)
-go util.Forever(func() { sl.WatchFile(file, fileChannel) }, 20*time.Second)
+go util.Forever(func() { kl.WatchFile(file, fileChannel) }, 20*time.Second)
if manifest_url != "" {
-go util.Forever(func() { sl.WatchHTTP(manifest_url, httpChannel) }, 20*time.Second)
+go util.Forever(func() { kl.WatchHTTP(manifest_url, httpChannel) }, 20*time.Second)
}
if etcd_servers != "" {
servers := []string{etcd_servers}
log.Printf("Creating etcd client pointing to %v", servers)
-sl.Client = etcd.NewClient(servers)
-go util.Forever(func() { sl.SyncAndSetupEtcdWatch(etcdChannel) }, 20*time.Second)
+kl.Client = etcd.NewClient(servers)
+go util.Forever(func() { kl.SyncAndSetupEtcdWatch(etcdChannel) }, 20*time.Second)
}
if address != "" {
log.Printf("Starting to listen on %s:%d", address, port)
handler := KubeletServer{
-Kubelet: sl,
+Kubelet: kl,
UpdateChannel: serverChannel,
}
s := &http.Server{
@@ -102,7 +102,7 @@ func (sl *Kubelet) RunKubelet(file, manifest_url, etcd_servers, address string,
}
go util.Forever(func() { s.ListenAndServe() }, 0)
}
-sl.RunSyncLoop(etcdChannel, fileChannel, serverChannel, httpChannel, sl)
+kl.RunSyncLoop(etcdChannel, fileChannel, serverChannel, httpChannel, kl)
}
// Interface implemented by Kubelet, for testability
@@ -111,8 +111,8 @@ type SyncHandler interface {
}
// Log an event to the etcd backend.
-func (sl *Kubelet) LogEvent(event *api.Event) error {
-if sl.Client == nil {
+func (kl *Kubelet) LogEvent(event *api.Event) error {
+if kl.Client == nil {
return fmt.Errorf("no etcd client connection.")
}
event.Timestamp = time.Now().Unix()
@@ -122,7 +122,7 @@ func (sl *Kubelet) LogEvent(event *api.Event) error {
}
var response *etcd.Response
-response, err = sl.Client.AddChild(fmt.Sprintf("/events/%s", event.Container.Name), string(data), 60*60*48 /* 2 days */)
+response, err = kl.Client.AddChild(fmt.Sprintf("/events/%s", event.Container.Name), string(data), 60*60*48 /* 2 days */)
// TODO(bburns) : examine response here.
if err != nil {
log.Printf("Error writing event: %s\n", err)
@@ -135,8 +135,8 @@ func (sl *Kubelet) LogEvent(event *api.Event) error {
// Does this container exist on this host? Returns true if so, and the name under which the container is running.
// Returns an error if one occurs.
-func (sl *Kubelet) ContainerExists(manifest *api.ContainerManifest, container *api.Container) (exists bool, foundName string, err error) {
-containers, err := sl.ListContainers()
+func (kl *Kubelet) ContainerExists(manifest *api.ContainerManifest, container *api.Container) (exists bool, foundName string, err error) {
+containers, err := kl.ListContainers()
if err != nil {
return false, "", err
}
@@ -145,15 +145,15 @@ func (sl *Kubelet) ContainerExists(manifest *api.ContainerManifest, container *a
if manifestId == manifest.Id && containerName == container.Name {
// TODO(bburns) : This leads to an extra list. Convert this to use the returned ID and a straight call
// to inspect
-data, err := sl.GetContainerByName(name)
+data, err := kl.GetContainerByName(name)
return data != nil, name, err
}
}
return false, "", nil
}
-func (sl *Kubelet) GetContainerID(name string) (string, error) {
-containerList, err := sl.DockerClient.ListContainers(docker.ListContainersOptions{})
+func (kl *Kubelet) GetContainerID(name string) (string, error) {
+containerList, err := kl.DockerClient.ListContainers(docker.ListContainersOptions{})
if err != nil {
return "", err
}
@@ -167,17 +167,17 @@ func (sl *Kubelet) GetContainerID(name string) (string, error) {
// Get a container by name.
// returns the container data from Docker, or an error if one exists.
-func (sl *Kubelet) GetContainerByName(name string) (*docker.Container, error) {
-id, err := sl.GetContainerID(name)
+func (kl *Kubelet) GetContainerByName(name string) (*docker.Container, error) {
+id, err := kl.GetContainerID(name)
if err != nil {
return nil, err
}
-return sl.DockerClient.InspectContainer(id)
+return kl.DockerClient.InspectContainer(id)
}
-func (sl *Kubelet) ListContainers() ([]string, error) {
+func (kl *Kubelet) ListContainers() ([]string, error) {
result := []string{}
-containerList, err := sl.DockerClient.ListContainers(docker.ListContainersOptions{})
+containerList, err := kl.DockerClient.ListContainers(docker.ListContainersOptions{})
if err != nil {
return result, err
}
@@ -187,9 +187,9 @@ func (sl *Kubelet) ListContainers() ([]string, error) {
return result, err
}
-func (sl *Kubelet) pullImage(image string) error {
-sl.pullLock.Lock()
-defer sl.pullLock.Unlock()
+func (kl *Kubelet) pullImage(image string) error {
+kl.pullLock.Lock()
+defer kl.pullLock.Unlock()
cmd := exec.Command("docker", "pull", image)
err := cmd.Start()
if err != nil {
@@ -236,8 +236,8 @@ func dockerNameToManifestAndContainer(name string) (manifestId, containerName st
return
}
-func (sl *Kubelet) RunContainer(manifest *api.ContainerManifest, container *api.Container) (name string, err error) {
-err = sl.pullImage(container.Image)
+func (kl *Kubelet) RunContainer(manifest *api.ContainerManifest, container *api.Container) (name string, err error) {
+err = kl.pullImage(container.Image)
if err != nil {
return "", err
}
@@ -289,24 +289,24 @@ func (sl *Kubelet) RunContainer(manifest *api.ContainerManifest, container *api.
Cmd: cmdList,
},
}
-dockerContainer, err := sl.DockerClient.CreateContainer(opts)
+dockerContainer, err := kl.DockerClient.CreateContainer(opts)
if err != nil {
return "", err
}
-return name, sl.DockerClient.StartContainer(dockerContainer.ID, &docker.HostConfig{
+return name, kl.DockerClient.StartContainer(dockerContainer.ID, &docker.HostConfig{
PortBindings: portBindings,
Binds: binds,
})
}
-func (sl *Kubelet) KillContainer(name string) error {
-id, err := sl.GetContainerID(name)
+func (kl *Kubelet) KillContainer(name string) error {
+id, err := kl.GetContainerID(name)
if err != nil {
return err
}
-err = sl.DockerClient.StopContainer(id, 10)
+err = kl.DockerClient.StopContainer(id, 10)
manifestId, containerName := dockerNameToManifestAndContainer(name)
-sl.LogEvent(&api.Event{
+kl.LogEvent(&api.Event{
Event: "STOP",
Manifest: &api.ContainerManifest{
Id: manifestId,
@@ -321,17 +321,17 @@ func (sl *Kubelet) KillContainer(name string) error {
// Watch a file for changes to the set of tasks that should run on this Kubelet
// This function loops forever and is intended to be run as a goroutine
-func (sl *Kubelet) WatchFile(file string, changeChannel chan<- api.ContainerManifest) {
+func (kl *Kubelet) WatchFile(file string, changeChannel chan<- api.ContainerManifest) {
var lastData []byte
for {
-time.Sleep(sl.FileCheckFrequency)
+time.Sleep(kl.FileCheckFrequency)
var manifest api.ContainerManifest
data, err := ioutil.ReadFile(file)
if err != nil {
log.Printf("Couldn't read file: %s : %v", file, err)
continue
}
-if err = sl.ExtractYAMLData(data, &manifest); err != nil {
+if err = kl.ExtractYAMLData(data, &manifest); err != nil {
continue
}
if !bytes.Equal(lastData, data) {
@@ -346,13 +346,13 @@ func (sl *Kubelet) WatchFile(file string, changeChannel chan<- api.ContainerMani
// Watch an HTTP endpoint for changes to the set of tasks that should run on this Kubelet
// This function runs forever and is intended to be run as a goroutine
-func (sl *Kubelet) WatchHTTP(url string, changeChannel chan<- api.ContainerManifest) {
+func (kl *Kubelet) WatchHTTP(url string, changeChannel chan<- api.ContainerManifest) {
var lastData []byte
client := &http.Client{}
for {
-time.Sleep(sl.HTTPCheckFrequency)
+time.Sleep(kl.HTTPCheckFrequency)
var config api.ContainerManifest
-data, err := sl.SyncHTTP(client, url, &config)
+data, err := kl.SyncHTTP(client, url, &config)
log.Printf("Containers: %#v", config)
if err != nil {
log.Printf("Error syncing HTTP: %#v", err)
@@ -369,7 +369,7 @@ func (sl *Kubelet) WatchHTTP(url string, changeChannel chan<- api.ContainerManif
// SyncHTTP reads from url a yaml manifest and populates config. Returns the
// raw bytes, if something was read. Returns an error if something goes wrong.
// 'client' is used to execute the request, to allow caching of clients.
-func (sl *Kubelet) SyncHTTP(client *http.Client, url string, config *api.ContainerManifest) ([]byte, error) {
+func (kl *Kubelet) SyncHTTP(client *http.Client, url string, config *api.ContainerManifest) ([]byte, error) {
request, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
@@ -383,7 +383,7 @@ func (sl *Kubelet) SyncHTTP(client *http.Client, url string, config *api.Contain
if err != nil {
return nil, err
}
-if err = sl.ExtractYAMLData(body, &config); err != nil {
+if err = kl.ExtractYAMLData(body, &config); err != nil {
return body, err
}
return body, nil
@@ -391,17 +391,17 @@ func (sl *Kubelet) SyncHTTP(client *http.Client, url string, config *api.Contain
// Take an etcd Response object, and turn it into a structured list of containers
// Return a list of containers, or an error if one occurs.
-func (sl *Kubelet) ResponseToManifests(response *etcd.Response) ([]api.ContainerManifest, error) {
+func (kl *Kubelet) ResponseToManifests(response *etcd.Response) ([]api.ContainerManifest, error) {
if response.Node == nil || len(response.Node.Value) == 0 {
return nil, fmt.Errorf("no nodes field: %#v", response)
}
var manifests []api.ContainerManifest
-err := sl.ExtractYAMLData([]byte(response.Node.Value), &manifests)
+err := kl.ExtractYAMLData([]byte(response.Node.Value), &manifests)
return manifests, err
}
-func (sl *Kubelet) getKubeletStateFromEtcd(key string, changeChannel chan<- []api.ContainerManifest) error {
-response, err := sl.Client.Get(key+"/kubelet", true, false)
+func (kl *Kubelet) getKubeletStateFromEtcd(key string, changeChannel chan<- []api.ContainerManifest) error {
+response, err := kl.Client.Get(key+"/kubelet", true, false)
if err != nil {
log.Printf("Error on get on %s: %#v", key, err)
switch err.(type) {
@@ -413,7 +413,7 @@ func (sl *Kubelet) getKubeletStateFromEtcd(key string, changeChannel chan<- []ap
}
return err
}
-manifests, err := sl.ResponseToManifests(response)
+manifests, err := kl.ResponseToManifests(response)
if err != nil {
log.Printf("Error parsing response (%#v): %s", response, err)
return err
@@ -426,7 +426,7 @@ func (sl *Kubelet) getKubeletStateFromEtcd(key string, changeChannel chan<- []ap
// Sync with etcd, and set up an etcd watch for new configurations
// The channel to send new configurations across
// This function loops forever and is intended to be run in a go routine.
-func (sl *Kubelet) SyncAndSetupEtcdWatch(changeChannel chan<- []api.ContainerManifest) {
+func (kl *Kubelet) SyncAndSetupEtcdWatch(changeChannel chan<- []api.ContainerManifest) {
hostname, err := exec.Command("hostname", "-f").Output()
if err != nil {
log.Printf("Couldn't determine hostname : %v", err)
@@ -435,7 +435,7 @@ func (sl *Kubelet) SyncAndSetupEtcdWatch(changeChannel chan<- []api.ContainerMan
key := "/registry/hosts/" + strings.TrimSpace(string(hostname))
// First fetch the initial configuration (watch only gives changes...)
for {
-err = sl.getKubeletStateFromEtcd(key, changeChannel)
+err = kl.getKubeletStateFromEtcd(key, changeChannel)
if err == nil {
// We got a successful response, etcd is up, set up the watch.
break
@@ -444,23 +444,23 @@ func (sl *Kubelet) SyncAndSetupEtcdWatch(changeChannel chan<- []api.ContainerMan
}
done := make(chan bool)
-go util.Forever(func() { sl.TimeoutWatch(done) }, 0)
+go util.Forever(func() { kl.TimeoutWatch(done) }, 0)
for {
// The etcd client will close the watch channel when it exits. So we need
// to create and service a new one every time.
watchChannel := make(chan *etcd.Response)
// We don't push this through Forever because if it dies, we just do it again in 30 secs.
// anyway.
-go sl.WatchEtcd(watchChannel, changeChannel)
+go kl.WatchEtcd(watchChannel, changeChannel)
-sl.getKubeletStateFromEtcd(key, changeChannel)
+kl.getKubeletStateFromEtcd(key, changeChannel)
log.Printf("Setting up a watch for configuration changes in etcd for %s", key)
-sl.Client.Watch(key, 0, true, watchChannel, done)
+kl.Client.Watch(key, 0, true, watchChannel, done)
}
}
// Timeout the watch after 30 seconds
-func (sl *Kubelet) TimeoutWatch(done chan bool) {
+func (kl *Kubelet) TimeoutWatch(done chan bool) {
t := time.Tick(30 * time.Second)
for _ = range t {
done <- true
@@ -468,7 +468,7 @@ func (sl *Kubelet) TimeoutWatch(done chan bool) {
}
// Extract data from YAML file into a list of containers.
-func (sl *Kubelet) ExtractYAMLData(buf []byte, output interface{}) error {
+func (kl *Kubelet) ExtractYAMLData(buf []byte, output interface{}) error {
err := yaml.Unmarshal(buf, output)
if err != nil {
log.Printf("Couldn't unmarshal configuration: %v", err)
@@ -479,7 +479,7 @@ func (sl *Kubelet) ExtractYAMLData(buf []byte, output interface{}) error {
// Watch etcd for changes, receives config objects from the etcd client watch.
// This function loops forever and is intended to be run as a goroutine.
-func (sl *Kubelet) WatchEtcd(watchChannel <-chan *etcd.Response, changeChannel chan<- []api.ContainerManifest) {
+func (kl *Kubelet) WatchEtcd(watchChannel <-chan *etcd.Response, changeChannel chan<- []api.ContainerManifest) {
defer util.HandleCrash()
for {
watchResponse := <-watchChannel
@@ -498,7 +498,7 @@ func (sl *Kubelet) WatchEtcd(watchChannel <-chan *etcd.Response, changeChannel c
}
log.Printf("Got data: %v", watchResponse.Node.Value)
var manifests []api.ContainerManifest
-if err := sl.ExtractYAMLData([]byte(watchResponse.Node.Value), &manifests); err != nil {
+if err := kl.ExtractYAMLData([]byte(watchResponse.Node.Value), &manifests); err != nil {
continue
}
// Ok, we have a valid configuration, send to channel for
@@ -508,21 +508,21 @@ func (sl *Kubelet) WatchEtcd(watchChannel <-chan *etcd.Response, changeChannel c
}
// Sync the configured list of containers (desired state) with the host current state
-func (sl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
+func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
log.Printf("Desired:%#v", config)
var err error
desired := map[string]bool{}
for _, manifest := range config {
for _, element := range manifest.Containers {
var exists bool
-exists, actualName, err := sl.ContainerExists(&manifest, &element)
+exists, actualName, err := kl.ContainerExists(&manifest, &element)
if err != nil {
log.Printf("Error detecting container: %#v skipping.", err)
continue
}
if !exists {
log.Printf("%#v doesn't exist, creating", element)
-actualName, err = sl.RunContainer(&manifest, &element)
+actualName, err = kl.RunContainer(&manifest, &element)
// For some reason, list gives back names that start with '/'
actualName = "/" + actualName
@@ -538,12 +538,12 @@ func (sl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
desired[actualName] = true
}
}
-existingContainers, _ := sl.ListContainers()
+existingContainers, _ := kl.ListContainers()
log.Printf("Existing:\n%#v Desired: %#v", existingContainers, desired)
for _, container := range existingContainers {
if !desired[container] {
log.Printf("Killing: %s", container)
-err = sl.KillContainer(container)
+err = kl.KillContainer(container)
if err != nil {
log.Printf("Error killing container: %#v", err)
}
@@ -558,7 +558,7 @@ func (sl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync_frequency seconds.
// Never returns.
-func (sl *Kubelet) RunSyncLoop(etcdChannel <-chan []api.ContainerManifest, fileChannel, serverChannel, httpChannel <-chan api.ContainerManifest, handler SyncHandler) {
+func (kl *Kubelet) RunSyncLoop(etcdChannel <-chan []api.ContainerManifest, fileChannel, serverChannel, httpChannel <-chan api.ContainerManifest, handler SyncHandler) {
var lastFile, lastEtcd, lastHttp, lastServer []api.ContainerManifest
for {
select {
@@ -574,7 +574,7 @@ func (sl *Kubelet) RunSyncLoop(etcdChannel <-chan []api.ContainerManifest, fileC
case manifest := <-serverChannel:
log.Printf("Got new manifest from our server... %v", manifest)
lastServer = []api.ContainerManifest{manifest}
-case <-time.After(sl.SyncFrequency):
+case <-time.After(kl.SyncFrequency):
}
manifests := append([]api.ContainerManifest{}, lastFile...)
@@ -588,8 +588,8 @@ func (sl *Kubelet) RunSyncLoop(etcdChannel <-chan []api.ContainerManifest, fileC
}
}
-func (sl *Kubelet) GetContainerInfo(name string) (string, error) {
-info, err := sl.DockerClient.InspectContainer(name)
+func (kl *Kubelet) GetContainerInfo(name string) (string, error) {
+info, err := kl.DockerClient.InspectContainer(name)
if err != nil {
return "{}", err
}