diff --git a/services/meta/handler.go b/services/meta/handler.go index 63663088d1..e1515f964f 100644 --- a/services/meta/handler.go +++ b/services/meta/handler.go @@ -60,7 +60,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { case "HEAD": h.WrapHandler("ping", h.servePing).ServeHTTP(w, r) case "GET": - h.WrapHandler("cache", h.serveCache).ServeHTTP(w, r) + h.WrapHandler("snapshot", h.serveSnapshot).ServeHTTP(w, r) case "POST": h.WrapHandler("execute", h.serveExec).ServeHTTP(w, r) default: @@ -72,37 +72,25 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *Handler) serveExec(w http.ResponseWriter, r *http.Request) { } -// serveCache is a long polling http connection to server cache updates -func (h *Handler) serveCache(w http.ResponseWriter, r *http.Request) { +// serveSnapshot is a long polling http connection to server cache updates +func (h *Handler) serveSnapshot(w http.ResponseWriter, r *http.Request) { // get the current index that client has - clientIndex, _ := strconv.Atoi(r.URL.Query().Get("index")) - index, c := h.store.Snapshot() + index, _ := strconv.Atoi(r.URL.Query().Get("index")) - // If the client has an older index, send the updated cache - if index > clientIndex { - w.Header().Add("influxdb-meta-index", strconv.Itoa(index)) - w.Write(c) + select { + case <-h.store.AfterIndex(index): + // Send updated snapshot to client. + ss, err := h.store.Snapshot() + if err != nil { + h.Logger.Println(err) + http.Error(w, "", http.StatusInternalServerError) + return + } + w.Write(ss) + case <-w.(http.CloseNotifier).CloseNotify(): + // Client closed the connection so we're done. return } - - // Make sure if the client disconnects we signal the query to abort - closing := make(chan struct{}) - if notifier, ok := w.(http.CloseNotifier); ok { - notify := notifier.CloseNotify() - go func() { - <-notify - close(closing) - }() - } - - // block until we get a data change, or an error which signals - // scenarios like leader change, store closing, etc. - h.store.WaitForDataChanged(closing) - - index, c = h.store.Snapshot() - w.Header().Add("influxdb-meta-index", strconv.Itoa(index)) - w.Write(c) - return } // servePing returns a simple response to let the client know the server is running. @@ -138,6 +126,10 @@ func (w gzipResponseWriter) Flush() { w.Writer.(*gzip.Writer).Flush() } +func (w gzipResponseWriter) CloseNotify() <-chan bool { + return w.ResponseWriter.(http.CloseNotifier).CloseNotify() +} + // determines if the client can accept compressed responses, and encodes accordingly func gzipFilter(inner http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/services/meta/store.go b/services/meta/store.go index 9300141f8d..3f1e82b809 100644 --- a/services/meta/store.go +++ b/services/meta/store.go @@ -1,13 +1,10 @@ package meta -import ( - "errors" - "sync" -) +import "sync" type store struct { - index int mu sync.RWMutex + index int closing chan struct{} cache []byte cacheChanged chan struct{} @@ -39,25 +36,24 @@ func (s *store) SetCache(b []byte) { s.cacheChanged = make(chan struct{}) } -func (s *store) Snapshot() (int, []byte) { +func (s *store) Snapshot() ([]byte, error) { s.mu.RLock() defer s.mu.RUnlock() - return s.index, s.cache + return s.cache, nil } -func (s *store) WaitForDataChanged(closing chan struct{}) error { +// AfterIndex returns a channel that will be closed to signal +// the caller when an updated snapshot is available. +func (s *store) AfterIndex(index int) chan struct{} { s.mu.RLock() - ch := s.cacheChanged - s.mu.RUnlock() + defer s.mu.RUnlock() - for { - select { - case <-ch: - return nil - case <-closing: - return errors.New("client closed") - case <-s.closing: - return errors.New("store closed") - } + if index < s.index { + // Client needs update so return a closed channel. + ch := make(chan struct{}) + close(ch) + return ch } + + return s.cacheChanged }