convert to AfterIndex

pull/5428/head
David Norton 2015-12-15 23:57:20 -05:00
parent 05da43d9f6
commit 9f93f0b84a
2 changed files with 35 additions and 47 deletions

View File

@ -60,7 +60,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
case "HEAD":
h.WrapHandler("ping", h.servePing).ServeHTTP(w, r)
case "GET":
h.WrapHandler("cache", h.serveCache).ServeHTTP(w, r)
h.WrapHandler("snapshot", h.serveSnapshot).ServeHTTP(w, r)
case "POST":
h.WrapHandler("execute", h.serveExec).ServeHTTP(w, r)
default:
@ -72,37 +72,25 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func (h *Handler) serveExec(w http.ResponseWriter, r *http.Request) {
}
// serveCache is a long polling http connection to server cache updates
func (h *Handler) serveCache(w http.ResponseWriter, r *http.Request) {
// serveSnapshot is a long polling http connection to server cache updates
func (h *Handler) serveSnapshot(w http.ResponseWriter, r *http.Request) {
// get the current index that client has
clientIndex, _ := strconv.Atoi(r.URL.Query().Get("index"))
index, c := h.store.Snapshot()
index, _ := strconv.Atoi(r.URL.Query().Get("index"))
// If the client has an older index, send the updated cache
if index > clientIndex {
w.Header().Add("influxdb-meta-index", strconv.Itoa(index))
w.Write(c)
select {
case <-h.store.AfterIndex(index):
// Send updated snapshot to client.
ss, err := h.store.Snapshot()
if err != nil {
h.Logger.Println(err)
http.Error(w, "", http.StatusInternalServerError)
return
}
w.Write(ss)
case <-w.(http.CloseNotifier).CloseNotify():
// Client closed the connection so we're done.
return
}
// Make sure if the client disconnects we signal the query to abort
closing := make(chan struct{})
if notifier, ok := w.(http.CloseNotifier); ok {
notify := notifier.CloseNotify()
go func() {
<-notify
close(closing)
}()
}
// block until we get a data change, or an error which signals
// scenarios like leader change, store closing, etc.
h.store.WaitForDataChanged(closing)
index, c = h.store.Snapshot()
w.Header().Add("influxdb-meta-index", strconv.Itoa(index))
w.Write(c)
return
}
// servePing returns a simple response to let the client know the server is running.
@ -138,6 +126,10 @@ func (w gzipResponseWriter) Flush() {
w.Writer.(*gzip.Writer).Flush()
}
func (w gzipResponseWriter) CloseNotify() <-chan bool {
return w.ResponseWriter.(http.CloseNotifier).CloseNotify()
}
// determines if the client can accept compressed responses, and encodes accordingly
func gzipFilter(inner http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

View File

@ -1,13 +1,10 @@
package meta
import (
"errors"
"sync"
)
import "sync"
type store struct {
index int
mu sync.RWMutex
index int
closing chan struct{}
cache []byte
cacheChanged chan struct{}
@ -39,25 +36,24 @@ func (s *store) SetCache(b []byte) {
s.cacheChanged = make(chan struct{})
}
func (s *store) Snapshot() (int, []byte) {
func (s *store) Snapshot() ([]byte, error) {
s.mu.RLock()
defer s.mu.RUnlock()
return s.index, s.cache
return s.cache, nil
}
func (s *store) WaitForDataChanged(closing chan struct{}) error {
// AfterIndex returns a channel that will be closed to signal
// the caller when an updated snapshot is available.
func (s *store) AfterIndex(index int) chan struct{} {
s.mu.RLock()
ch := s.cacheChanged
s.mu.RUnlock()
defer s.mu.RUnlock()
for {
select {
case <-ch:
return nil
case <-closing:
return errors.New("client closed")
case <-s.closing:
return errors.New("store closed")
}
if index < s.index {
// Client needs update so return a closed channel.
ch := make(chan struct{})
close(ch)
return ch
}
return s.cacheChanged
}