Refactor kubelet server into its own package

Refactor Kubelet's server functionality into a server package. Most
notably, move pkg/kubelet/server.go into
pkg/kubelet/server/server.go. This will lead to better separation of
concerns and a more readable code hierarchy.
pull/6/head
Tim St. Clair 2015-12-10 12:14:26 -08:00
parent 114f6f76dc
commit 89bc7992f9
12 changed files with 55 additions and 36 deletions

View File

@ -47,6 +47,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/qos"
"k8s.io/kubernetes/pkg/kubelet/server"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/master/ports"
"k8s.io/kubernetes/pkg/util"
@ -162,7 +163,7 @@ type KubeletServer struct {
type KubeletBootstrap interface {
BirthCry()
StartGarbageCollection()
ListenAndServe(address net.IP, port uint, tlsOptions *kubelet.TLSOptions, auth kubelet.AuthInterface, enableDebuggingHandlers bool)
ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers bool)
ListenAndServeReadOnly(address net.IP, port uint)
Run(<-chan kubetypes.PodUpdate)
RunOnce(<-chan kubetypes.PodUpdate) ([]kubelet.RunPodResult, error)
@ -572,8 +573,8 @@ func (s *KubeletServer) Run(kcfg *KubeletConfig) error {
}
// InitializeTLS checks for a configured TLSCertFile and TLSPrivateKeyFile: if unspecified a new self-signed
// certificate and key file are generated. Returns a configured kubelet.TLSOptions object.
func (s *KubeletServer) InitializeTLS() (*kubelet.TLSOptions, error) {
// certificate and key file are generated. Returns a configured server.TLSOptions object.
func (s *KubeletServer) InitializeTLS() (*server.TLSOptions, error) {
if s.TLSCertFile == "" && s.TLSPrivateKeyFile == "" {
s.TLSCertFile = path.Join(s.CertDirectory, "kubelet.crt")
s.TLSPrivateKeyFile = path.Join(s.CertDirectory, "kubelet.key")
@ -582,7 +583,7 @@ func (s *KubeletServer) InitializeTLS() (*kubelet.TLSOptions, error) {
}
glog.V(4).Infof("Using self-signed cert (%s, %s)", s.TLSCertFile, s.TLSPrivateKeyFile)
}
tlsOptions := &kubelet.TLSOptions{
tlsOptions := &server.TLSOptions{
Config: &tls.Config{
// Change default from SSLv3 to TLSv1.0 (because of POODLE vulnerability).
MinVersion: tls.VersionTLS10,
@ -694,7 +695,7 @@ func SimpleKubelet(client *client.Client,
readOnlyPort uint,
masterServiceNamespace string,
volumePlugins []volume.VolumePlugin,
tlsOptions *kubelet.TLSOptions,
tlsOptions *server.TLSOptions,
cadvisorInterface cadvisor.Interface,
configFilePath string,
cloud cloudprovider.Interface,
@ -884,7 +885,7 @@ func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig {
type KubeletConfig struct {
Address net.IP
AllowPrivileged bool
Auth kubelet.AuthInterface
Auth server.AuthInterface
Builder KubeletBuilder
CAdvisorInterface cadvisor.Interface
CgroupRoot string
@ -955,7 +956,7 @@ type KubeletConfig struct {
StreamingConnectionIdleTimeout time.Duration
SyncFrequency time.Duration
SystemContainer string
TLSOptions *kubelet.TLSOptions
TLSOptions *server.TLSOptions
Writer io.Writer
VolumePlugins []volume.VolumePlugin

View File

@ -29,7 +29,7 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/kubelet/portforward"
"k8s.io/kubernetes/pkg/kubelet/server/portforward"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/httpstream"
)

View File

@ -32,7 +32,7 @@ import (
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/remotecommand"
"k8s.io/kubernetes/pkg/kubelet"
kubeletserver "k8s.io/kubernetes/pkg/kubelet/server"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/httpstream"
)
@ -204,7 +204,7 @@ func TestGetListener(t *testing.T) {
}
// fakePortForwarder simulates port forwarding for testing. It implements
// kubelet.PortForwarder.
// kubeletserver.PortForwarder.
type fakePortForwarder struct {
lock sync.Mutex
// stores data expected from the stream per port
@ -215,7 +215,7 @@ type fakePortForwarder struct {
send map[uint16]string
}
var _ kubelet.PortForwarder = &fakePortForwarder{}
var _ kubeletserver.PortForwarder = &fakePortForwarder{}
func (pf *fakePortForwarder) PortForward(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error {
defer stream.Close()
@ -250,7 +250,7 @@ func fakePortForwardServer(t *testing.T, testName string, serverSends, expectedF
received: make(map[uint16]string),
send: serverSends,
}
kubelet.ServePortForward(w, req, pf, "pod", "uid", 0, 10*time.Second)
kubeletserver.ServePortForward(w, req, pf, "pod", "uid", 0, 10*time.Second)
for port, expected := range expectedFromClient {
actual, ok := pf.received[port]

View File

@ -47,6 +47,10 @@ var (
// Required Image is absent on host and PullPolicy is NeverPullImage
ErrImageNeverPull = errors.New("ErrImageNeverPull")
// ErrContainerNotFound returned when a container in the given pod with the
// given container name was not found, amongst those managed by the kubelet.
ErrContainerNotFound = errors.New("no matching container")
// Get http error when pulling image from registry
RegistryUnavailable = errors.New("RegistryUnavailable")
)

View File

@ -58,6 +58,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/prober"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/rkt"
"k8s.io/kubernetes/pkg/kubelet/server"
"k8s.io/kubernetes/pkg/kubelet/status"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/format"
@ -127,12 +128,6 @@ const (
backOffPeriod = time.Second * 10
)
var (
// ErrContainerNotFound returned when a container in the given pod with the
// given container name was not found, amongst those managed by the kubelet.
ErrContainerNotFound = errors.New("no matching container")
)
// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
HandlePodAdditions(pods []*api.Pod)
@ -3215,7 +3210,7 @@ func (kl *Kubelet) GetContainerInfo(podFullName string, podUID types.UID, contai
pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
container := pod.FindContainerByName(containerName)
if container == nil {
return nil, ErrContainerNotFound
return nil, kubecontainer.ErrContainerNotFound
}
ci, err := kl.cadvisor.DockerContainer(container.ID.ID, req)
@ -3252,12 +3247,12 @@ func (kl *Kubelet) GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error) {
return kl.machineInfo, nil
}
func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *TLSOptions, auth AuthInterface, enableDebuggingHandlers bool) {
ListenAndServeKubeletServer(kl, address, port, tlsOptions, auth, enableDebuggingHandlers)
func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers bool) {
server.ListenAndServeKubeletServer(kl, address, port, tlsOptions, auth, enableDebuggingHandlers)
}
func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) {
ListenAndServeKubeletReadOnlyServer(kl, address, port)
server.ListenAndServeKubeletReadOnlyServer(kl, address, port)
}
// GetRuntime returns the current Runtime implementation in use by the kubelet. This func

View File

@ -778,8 +778,8 @@ func TestGetContainerInfoWithNoContainers(t *testing.T) {
if err == nil {
t.Errorf("expected error from cadvisor client, got none")
}
if err != ErrContainerNotFound {
t.Errorf("expected error %v, got %v", ErrContainerNotFound.Error(), err.Error())
if err != kubecontainer.ErrContainerNotFound {
t.Errorf("expected error %v, got %v", kubecontainer.ErrContainerNotFound.Error(), err.Error())
}
if stats != nil {
t.Errorf("non-nil stats when dockertools returned no containers")
@ -808,8 +808,8 @@ func TestGetContainerInfoWithNoMatchingContainers(t *testing.T) {
if err == nil {
t.Errorf("Expected error from cadvisor client, got none")
}
if err != ErrContainerNotFound {
t.Errorf("Expected error %v, got %v", ErrContainerNotFound.Error(), err.Error())
if err != kubecontainer.ErrContainerNotFound {
t.Errorf("Expected error %v, got %v", kubecontainer.ErrContainerNotFound.Error(), err.Error())
}
if stats != nil {
t.Errorf("non-nil stats when dockertools returned no containers")

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
package server
import (
"k8s.io/kubernetes/pkg/auth/authenticator"

18
pkg/kubelet/server/doc.go Normal file
View File

@ -0,0 +1,18 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package server contains functions related to serving Kubelet's external interface.
package server

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
package server
import (
"crypto/tls"
@ -48,7 +48,7 @@ import (
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/httplog"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/portforward"
"k8s.io/kubernetes/pkg/kubelet/server/portforward"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flushwriter"
@ -1146,7 +1146,7 @@ func (s *Server) serveStats(w http.ResponseWriter, req *http.Request) {
switch err {
case nil:
break
case ErrContainerNotFound:
case kubecontainer.ErrContainerNotFound:
http.Error(w, err.Error(), http.StatusNotFound)
return
default:

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
package server
import (
"bytes"
@ -38,6 +38,7 @@ import (
apierrs "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/auth/authorizer"
"k8s.io/kubernetes/pkg/auth/user"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/httpstream"
@ -271,7 +272,7 @@ func TestContainerNotFound(t *testing.T) {
expectedContainerName := "slowstartcontainer"
expectedUid := "9b01b80f-8fb4-11e4-95ab-4200af06647"
fw.fakeKubelet.containerInfoFunc = func(podID string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error) {
return nil, ErrContainerNotFound
return nil, kubecontainer.ErrContainerNotFound
}
resp, err := http.Get(fw.testHTTPServer.URL + fmt.Sprintf("/stats/%v/%v/%v/%v", expectedNamespace, podID, expectedUid, expectedContainerName))
if err != nil {

View File

@ -34,8 +34,8 @@ import (
"github.com/prometheus/common/model"
"k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/kubelet"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/server"
"k8s.io/kubernetes/pkg/master/ports"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/sets"
@ -150,7 +150,7 @@ func HighLatencyKubeletOperations(c *client.Client, threshold time.Duration, nod
// getContainerInfo contacts kubelet for the container information. The "Stats"
// in the returned ContainerInfo is subject to the requirements in statsRequest.
func getContainerInfo(c *client.Client, nodeName string, req *kubelet.StatsRequest) (map[string]cadvisorapi.ContainerInfo, error) {
func getContainerInfo(c *client.Client, nodeName string, req *server.StatsRequest) (map[string]cadvisorapi.ContainerInfo, error) {
reqBody, err := json.Marshal(req)
if err != nil {
return nil, err
@ -243,7 +243,7 @@ func getOneTimeResourceUsageOnNode(
return nil, fmt.Errorf("numStats needs to be > 1 and < %d", maxNumStatsToRequest)
}
// Get information of all containers on the node.
containerInfos, err := getContainerInfo(c, nodeName, &kubelet.StatsRequest{
containerInfos, err := getContainerInfo(c, nodeName, &server.StatsRequest{
ContainerName: "/",
NumStats: numStats,
Subcontainers: true,
@ -590,7 +590,7 @@ func (r *resourceCollector) Stop() {
// collectStats gets the latest stats from kubelet's /stats/container, computes
// the resource usage, and pushes it to the buffer.
func (r *resourceCollector) collectStats(oldStats map[string]*cadvisorapi.ContainerStats) {
infos, err := getContainerInfo(r.client, r.node, &kubelet.StatsRequest{
infos, err := getContainerInfo(r.client, r.node, &server.StatsRequest{
ContainerName: "/",
NumStats: 1,
Subcontainers: true,