store and handler to interface

pull/5428/head
David Norton 2015-12-17 07:53:10 -05:00
parent 9f93f0b84a
commit 169c6a5dfa
4 changed files with 52 additions and 37 deletions

View File

@ -15,31 +15,39 @@ import (
"github.com/influxdb/influxdb/uuid"
)
// Handler represents an HTTP handler for the InfluxDB server.
type Handler struct {
//type store interface {
// AfterIndex(index int) <-chan struct{}
// Database(name string) (*DatabaseInfo, error)
//}
// handler represents an HTTP handler for the meta service.
type handler struct {
config *Config
Version string
Logger *log.Logger
logger *log.Logger
loggingEnabled bool // Log every HTTP access.
pprofEnabled bool
store *store
store interface {
AfterIndex(index int) <-chan struct{}
Snapshot() ([]byte, error)
SetCache(b []byte)
}
}
// NewHandler returns a new instance of handler with routes.
func NewHandler(c *Config, s *store) *Handler {
h := &Handler{
// newHandler returns a new instance of handler with routes.
func newHandler(c *Config) *handler {
h := &handler{
config: c,
Logger: log.New(os.Stderr, "[meta-http] ", log.LstdFlags),
logger: log.New(os.Stderr, "[meta-http] ", log.LstdFlags),
loggingEnabled: c.LoggingEnabled,
store: s,
}
return h
}
// SetRoutes sets the provided routes on the handler.
func (h *Handler) WrapHandler(name string, hf http.HandlerFunc) http.Handler {
func (h *handler) WrapHandler(name string, hf http.HandlerFunc) http.Handler {
var handler http.Handler
handler = http.HandlerFunc(hf)
handler = gzipFilter(handler)
@ -47,15 +55,15 @@ func (h *Handler) WrapHandler(name string, hf http.HandlerFunc) http.Handler {
handler = cors(handler)
handler = requestID(handler)
if h.loggingEnabled {
handler = logging(handler, name, h.Logger)
handler = logging(handler, name, h.logger)
}
handler = recovery(handler, name, h.Logger) // make sure recovery is always last
handler = recovery(handler, name, h.logger) // make sure recovery is always last
return handler
}
// ServeHTTP responds to HTTP request to the handler.
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "HEAD":
h.WrapHandler("ping", h.servePing).ServeHTTP(w, r)
@ -69,11 +77,11 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
// serveExec executes the requested command.
func (h *Handler) serveExec(w http.ResponseWriter, r *http.Request) {
func (h *handler) serveExec(w http.ResponseWriter, r *http.Request) {
}
// serveSnapshot is a long polling http connection to server cache updates
func (h *Handler) serveSnapshot(w http.ResponseWriter, r *http.Request) {
func (h *handler) serveSnapshot(w http.ResponseWriter, r *http.Request) {
// get the current index that client has
index, _ := strconv.Atoi(r.URL.Query().Get("index"))
@ -82,7 +90,7 @@ func (h *Handler) serveSnapshot(w http.ResponseWriter, r *http.Request) {
// Send updated snapshot to client.
ss, err := h.store.Snapshot()
if err != nil {
h.Logger.Println(err)
h.logger.Println(err)
http.Error(w, "", http.StatusInternalServerError)
return
}
@ -94,7 +102,7 @@ func (h *Handler) serveSnapshot(w http.ResponseWriter, r *http.Request) {
}
// servePing returns a simple response to let the client know the server is running.
func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) {
func (h *handler) servePing(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("ACK"))
}
@ -147,7 +155,7 @@ func gzipFilter(inner http.Handler) http.Handler {
// versionHeader takes a HTTP handler and returns a HTTP handler
// and adds the X-INFLUXBD-VERSION header to outgoing responses.
func versionHeader(inner http.Handler, h *Handler) http.Handler {
func versionHeader(inner http.Handler, h *handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Add("X-InfluxDB-Version", h.Version)
inner.ServeHTTP(w, r)

View File

@ -12,7 +12,7 @@ import (
type Service struct {
config *Config
Handler *Handler
handler *handler
ln net.Listener
raftAddr string
httpAddr string
@ -20,9 +20,11 @@ type Service struct {
cert string
err chan error
store *store
Logger *log.Logger
store interface {
Close() error
}
}
// NewService returns a new instance of Service.
@ -44,11 +46,13 @@ func (s *Service) Open() error {
s.Logger.Println("Starting meta service")
// Open the store
s.store = newStore(s.config)
store := newStore(s.config)
s.store = store
handler := NewHandler(s.config, s.store)
handler.Logger = s.Logger
s.Handler = handler
handler := newHandler(s.config)
handler.logger = s.Logger
handler.store = store
s.handler = handler
// Open listener.
if s.https {
@ -86,7 +90,7 @@ func (s *Service) Open() error {
func (s *Service) serve() {
// The listener was closed so exit
// See https://github.com/golang/go/issues/4373
err := http.Serve(s.ln, s.Handler)
err := http.Serve(s.ln, s.handler)
if err != nil && !strings.Contains(err.Error(), "closed") {
s.err <- fmt.Errorf("listener failed: addr=%s, err=%s", s.Addr(), err)
}

View File

@ -1,4 +1,4 @@
package meta_test
package meta
import (
"fmt"
@ -6,13 +6,12 @@ import (
"net/http"
"net/url"
"testing"
"github.com/influxdb/influxdb/services/meta"
"time"
)
func TestService_Open(t *testing.T) {
cfg := newConfig()
s := meta.NewService(cfg)
s := NewService(cfg)
if err := s.Open(); err != nil {
t.Fatal(err)
}
@ -23,7 +22,7 @@ func TestService_Open(t *testing.T) {
func TestService_PingEndpoint(t *testing.T) {
cfg := newConfig()
s := meta.NewService(cfg)
s := NewService(cfg)
if err := s.Open(); err != nil {
t.Fatal(err)
}
@ -51,7 +50,7 @@ func TestService_PingEndpoint(t *testing.T) {
func TestService_LongPollCache(t *testing.T) {
cfg := newConfig()
s := meta.NewService(cfg)
s := NewService(cfg)
if err := s.Open(); err != nil {
t.Fatal(err)
}
@ -84,11 +83,15 @@ func TestService_LongPollCache(t *testing.T) {
}
go reqFunc(0)
go reqFunc(0)
go reqFunc(1)
go func() {
time.Sleep(1 * time.Second)
s.handler.store.SetCache([]byte("world"))
}()
for n := 0; n < 2; n++ {
b := <-ch
t.Log(b)
t.Log(string(b))
println("client read cache update")
}
close(ch)
@ -98,8 +101,8 @@ func TestService_LongPollCache(t *testing.T) {
}
}
func newConfig() *meta.Config {
cfg := meta.NewConfig()
func newConfig() *Config {
cfg := NewConfig()
cfg.HTTPdBindAddress = "127.0.0.1:0"
return cfg
}

View File

@ -44,7 +44,7 @@ func (s *store) Snapshot() ([]byte, error) {
// AfterIndex returns a channel that will be closed to signal
// the caller when an updated snapshot is available.
func (s *store) AfterIndex(index int) chan struct{} {
func (s *store) AfterIndex(index int) <-chan struct{} {
s.mu.RLock()
defer s.mu.RUnlock()