Add healthz server support to healthcheck package

pull/6/head
Zihong Zheng 2017-05-05 14:42:27 -07:00
parent 8f6df26755
commit eed08362d8
2 changed files with 142 additions and 2 deletions

View File

@ -22,6 +22,8 @@ import (
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/golang/glog"
"github.com/renstrom/dedent"
@ -29,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/types"
clientv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/clock"
"k8s.io/kubernetes/pkg/api"
)
@ -233,3 +236,92 @@ func (hcs *server) SyncEndpoints(newEndpoints map[types.NamespacedName]int) erro
}
return nil
}
// HealthzUpdater allows callers to update healthz timestamp only.
type HealthzUpdater interface {
UpdateTimestamp()
}
// HealthzServer returns 200 "OK" by default. Once timestamp has been
// updated, it verifies we don't exceed max no respond duration since
// last update.
type HealthzServer struct {
listener Listener
httpFactory HTTPServerFactory
clock clock.Clock
addr string
port int32
healthTimeout time.Duration
lastUpdated atomic.Value
}
// NewDefaultHealthzServer returns a default healthz http server.
func NewDefaultHealthzServer(addr string, healthTimeout time.Duration) *HealthzServer {
return newHealthzServer(nil, nil, nil, addr, healthTimeout)
}
func newHealthzServer(listener Listener, httpServerFactory HTTPServerFactory, c clock.Clock, addr string, healthTimeout time.Duration) *HealthzServer {
if listener == nil {
listener = stdNetListener{}
}
if httpServerFactory == nil {
httpServerFactory = stdHTTPServerFactory{}
}
if c == nil {
c = clock.RealClock{}
}
return &HealthzServer{
listener: listener,
httpFactory: httpServerFactory,
clock: c,
addr: addr,
healthTimeout: healthTimeout,
}
}
// UpdateTimestamp updates the lastUpdated timestamp.
func (hs *HealthzServer) UpdateTimestamp() {
hs.lastUpdated.Store(hs.clock.Now())
}
// Run starts the healthz http server and returns.
func (hs *HealthzServer) Run() {
serveMux := http.NewServeMux()
serveMux.Handle("/healthz", healthzHandler{hs: hs})
server := hs.httpFactory.New(hs.addr, serveMux)
listener, err := hs.listener.Listen(hs.addr)
if err != nil {
glog.Errorf("Failed to start healthz on %s: %v", hs.addr, err)
return
}
go func() {
glog.V(3).Infof("Starting goroutine for healthz on %s", hs.addr)
if err := server.Serve(listener); err != nil {
glog.Errorf("Healhz closed: %v", err)
return
}
glog.Errorf("Unexpected healhz closed.")
}()
}
type healthzHandler struct {
hs *HealthzServer
}
func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
lastUpdated := time.Time{}
if val := h.hs.lastUpdated.Load(); val != nil {
lastUpdated = val.(time.Time)
}
currentTime := h.hs.clock.Now()
resp.Header().Set("Content-Type", "application/json")
if !lastUpdated.IsZero() && currentTime.After(lastUpdated.Add(h.hs.healthTimeout)) {
resp.WriteHeader(http.StatusServiceUnavailable)
} else {
resp.WriteHeader(http.StatusOK)
}
fmt.Fprintf(resp, fmt.Sprintf(`{"lastUpdated": %q,"currentTime": %q}`, lastUpdated, currentTime))
}

View File

@ -22,11 +22,13 @@ import (
"net/http"
"net/http/httptest"
"testing"
"github.com/davecgh/go-spew/spew"
"time"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/clock"
"github.com/davecgh/go-spew/spew"
)
type fakeListener struct {
@ -108,6 +110,11 @@ type hcPayload struct {
LocalEndpoints int
}
type healthzPayload struct {
LastUpdated string
CurrentTime string
}
func TestServer(t *testing.T) {
listener := newFakeListener()
httpFactory := newFakeHTTPServerFactory()
@ -355,3 +362,44 @@ func testHandler(hcs *server, nsn types.NamespacedName, status int, endpoints in
t.Errorf("expected %d endpoints, got %d", endpoints, payload.LocalEndpoints)
}
}
func TestHealthzServer(t *testing.T) {
listener := newFakeListener()
httpFactory := newFakeHTTPServerFactory()
fakeClock := clock.NewFakeClock(time.Now())
hs := newHealthzServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second)
server := hs.httpFactory.New(hs.addr, healthzHandler{hs: hs})
// Should return 200 "OK" by default.
testHealthzHandler(server, http.StatusOK, t)
// Should return 503 "ServiceUnavailable" if exceed max no respond duration.
hs.UpdateTimestamp()
fakeClock.Step(25 * time.Second)
testHealthzHandler(server, http.StatusServiceUnavailable, t)
// Should return 200 "OK" if timestamp is valid.
hs.UpdateTimestamp()
fakeClock.Step(5 * time.Second)
testHealthzHandler(server, http.StatusOK, t)
}
func testHealthzHandler(server HTTPServer, status int, t *testing.T) {
handler := server.(*fakeHTTPServer).handler
req, err := http.NewRequest("GET", "/healthz", nil)
if err != nil {
t.Fatal(err)
}
resp := httptest.NewRecorder()
handler.ServeHTTP(resp, req)
if resp.Code != status {
t.Errorf("expected status code %v, got %v", status, resp.Code)
}
var payload healthzPayload
if err := json.Unmarshal(resp.Body.Bytes(), &payload); err != nil {
t.Fatal(err)
}
}