From 169c6a5dfaad0524799af74fe2bb54531c0c8785 Mon Sep 17 00:00:00 2001 From: David Norton <dgnorton@gmail.com> Date: Thu, 17 Dec 2015 07:53:10 -0500 Subject: [PATCH] store and handler to interface --- services/meta/handler.go | 44 +++++++++++++++++++++-------------- services/meta/service.go | 20 +++++++++------- services/meta/service_test.go | 23 ++++++++++-------- services/meta/store.go | 2 +- 4 files changed, 52 insertions(+), 37 deletions(-) diff --git a/services/meta/handler.go b/services/meta/handler.go index e1515f964f..950877d95e 100644 --- a/services/meta/handler.go +++ b/services/meta/handler.go @@ -15,31 +15,39 @@ import ( "github.com/influxdb/influxdb/uuid" ) -// Handler represents an HTTP handler for the InfluxDB server. -type Handler struct { +//type store interface { +// AfterIndex(index int) <-chan struct{} +// Database(name string) (*DatabaseInfo, error) +//} + +// handler represents an HTTP handler for the meta service. +type handler struct { config *Config Version string - Logger *log.Logger + logger *log.Logger loggingEnabled bool // Log every HTTP access. pprofEnabled bool - store *store + store interface { + AfterIndex(index int) <-chan struct{} + Snapshot() ([]byte, error) + SetCache(b []byte) + } } -// NewHandler returns a new instance of handler with routes. -func NewHandler(c *Config, s *store) *Handler { - h := &Handler{ +// newHandler returns a new instance of handler with routes. +func newHandler(c *Config) *handler { + h := &handler{ config: c, - Logger: log.New(os.Stderr, "[meta-http] ", log.LstdFlags), + logger: log.New(os.Stderr, "[meta-http] ", log.LstdFlags), loggingEnabled: c.LoggingEnabled, - store: s, } return h } // SetRoutes sets the provided routes on the handler. -func (h *Handler) WrapHandler(name string, hf http.HandlerFunc) http.Handler { +func (h *handler) WrapHandler(name string, hf http.HandlerFunc) http.Handler { var handler http.Handler handler = http.HandlerFunc(hf) handler = gzipFilter(handler) @@ -47,15 +55,15 @@ func (h *Handler) WrapHandler(name string, hf http.HandlerFunc) http.Handler { handler = cors(handler) handler = requestID(handler) if h.loggingEnabled { - handler = logging(handler, name, h.Logger) + handler = logging(handler, name, h.logger) } - handler = recovery(handler, name, h.Logger) // make sure recovery is always last + handler = recovery(handler, name, h.logger) // make sure recovery is always last return handler } // ServeHTTP responds to HTTP request to the handler. -func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { switch r.Method { case "HEAD": h.WrapHandler("ping", h.servePing).ServeHTTP(w, r) @@ -69,11 +77,11 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // serveExec executes the requested command. -func (h *Handler) serveExec(w http.ResponseWriter, r *http.Request) { +func (h *handler) serveExec(w http.ResponseWriter, r *http.Request) { } // serveSnapshot is a long polling http connection to server cache updates -func (h *Handler) serveSnapshot(w http.ResponseWriter, r *http.Request) { +func (h *handler) serveSnapshot(w http.ResponseWriter, r *http.Request) { // get the current index that client has index, _ := strconv.Atoi(r.URL.Query().Get("index")) @@ -82,7 +90,7 @@ func (h *Handler) serveSnapshot(w http.ResponseWriter, r *http.Request) { // Send updated snapshot to client. ss, err := h.store.Snapshot() if err != nil { - h.Logger.Println(err) + h.logger.Println(err) http.Error(w, "", http.StatusInternalServerError) return } @@ -94,7 +102,7 @@ func (h *Handler) serveSnapshot(w http.ResponseWriter, r *http.Request) { } // servePing returns a simple response to let the client know the server is running. -func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) { +func (h *handler) servePing(w http.ResponseWriter, r *http.Request) { w.Write([]byte("ACK")) } @@ -147,7 +155,7 @@ func gzipFilter(inner http.Handler) http.Handler { // versionHeader takes a HTTP handler and returns a HTTP handler // and adds the X-INFLUXBD-VERSION header to outgoing responses. -func versionHeader(inner http.Handler, h *Handler) http.Handler { +func versionHeader(inner http.Handler, h *handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Add("X-InfluxDB-Version", h.Version) inner.ServeHTTP(w, r) diff --git a/services/meta/service.go b/services/meta/service.go index 8f62baf1b3..7771c6da56 100644 --- a/services/meta/service.go +++ b/services/meta/service.go @@ -12,7 +12,7 @@ import ( type Service struct { config *Config - Handler *Handler + handler *handler ln net.Listener raftAddr string httpAddr string @@ -20,9 +20,11 @@ type Service struct { cert string err chan error - store *store - Logger *log.Logger + + store interface { + Close() error + } } // NewService returns a new instance of Service. @@ -44,11 +46,13 @@ func (s *Service) Open() error { s.Logger.Println("Starting meta service") // Open the store - s.store = newStore(s.config) + store := newStore(s.config) + s.store = store - handler := NewHandler(s.config, s.store) - handler.Logger = s.Logger - s.Handler = handler + handler := newHandler(s.config) + handler.logger = s.Logger + handler.store = store + s.handler = handler // Open listener. if s.https { @@ -86,7 +90,7 @@ func (s *Service) Open() error { func (s *Service) serve() { // The listener was closed so exit // See https://github.com/golang/go/issues/4373 - err := http.Serve(s.ln, s.Handler) + err := http.Serve(s.ln, s.handler) if err != nil && !strings.Contains(err.Error(), "closed") { s.err <- fmt.Errorf("listener failed: addr=%s, err=%s", s.Addr(), err) } diff --git a/services/meta/service_test.go b/services/meta/service_test.go index 07e40954c7..b72427c974 100644 --- a/services/meta/service_test.go +++ b/services/meta/service_test.go @@ -1,4 +1,4 @@ -package meta_test +package meta import ( "fmt" @@ -6,13 +6,12 @@ import ( "net/http" "net/url" "testing" - - "github.com/influxdb/influxdb/services/meta" + "time" ) func TestService_Open(t *testing.T) { cfg := newConfig() - s := meta.NewService(cfg) + s := NewService(cfg) if err := s.Open(); err != nil { t.Fatal(err) } @@ -23,7 +22,7 @@ func TestService_Open(t *testing.T) { func TestService_PingEndpoint(t *testing.T) { cfg := newConfig() - s := meta.NewService(cfg) + s := NewService(cfg) if err := s.Open(); err != nil { t.Fatal(err) } @@ -51,7 +50,7 @@ func TestService_PingEndpoint(t *testing.T) { func TestService_LongPollCache(t *testing.T) { cfg := newConfig() - s := meta.NewService(cfg) + s := NewService(cfg) if err := s.Open(); err != nil { t.Fatal(err) } @@ -84,11 +83,15 @@ func TestService_LongPollCache(t *testing.T) { } go reqFunc(0) - go reqFunc(0) + go reqFunc(1) + go func() { + time.Sleep(1 * time.Second) + s.handler.store.SetCache([]byte("world")) + }() for n := 0; n < 2; n++ { b := <-ch - t.Log(b) + t.Log(string(b)) println("client read cache update") } close(ch) @@ -98,8 +101,8 @@ func TestService_LongPollCache(t *testing.T) { } } -func newConfig() *meta.Config { - cfg := meta.NewConfig() +func newConfig() *Config { + cfg := NewConfig() cfg.HTTPdBindAddress = "127.0.0.1:0" return cfg } diff --git a/services/meta/store.go b/services/meta/store.go index 3f1e82b809..504443ae04 100644 --- a/services/meta/store.go +++ b/services/meta/store.go @@ -44,7 +44,7 @@ func (s *store) Snapshot() ([]byte, error) { // AfterIndex returns a channel that will be closed to signal // the caller when an updated snapshot is available. -func (s *store) AfterIndex(index int) chan struct{} { +func (s *store) AfterIndex(index int) <-chan struct{} { s.mu.RLock() defer s.mu.RUnlock()