Merge pull request #1656 from influxdb/http-index

Basic index endpoints for http
pull/1658/head
Cory LaNou 2015-02-19 19:08:28 -07:00
commit 862d06f201
2 changed files with 146 additions and 0 deletions

View File

@ -99,6 +99,14 @@ func NewHandler(s *influxdb.Server, requireAuthentication bool, version string)
"process_continuous_queries",
"POST", "/process_continuous_queries", false, false, h.serveProcessContinuousQueries,
},
route{
"index-json", // Query serving route.
"GET", "/index.json", true, true, h.serveIndex,
},
route{
"index", // Query serving route.
"GET", "/", true, true, h.serveIndex,
},
)
for _, r := range h.routes {
@ -253,6 +261,74 @@ func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}
// serveIndex returns the current index of the node as the body of the response (optionally in json)
// Takes optional parameters:
// index - If specified, will poll for index before returning
// timeout - time in milliseconds to wait until index is met before erring out
// default timeout if not specified is 100 milliseconds
func (h *Handler) serveIndex(w http.ResponseWriter, r *http.Request) {
index, _ := strconv.ParseUint(r.URL.Query().Get("index"), 10, 64)
timeout, _ := strconv.Atoi(r.URL.Query().Get("timeout"))
if index > 0 {
var d time.Duration
if timeout == 0 {
d = 100 * time.Millisecond
} else {
d = time.Duration(timeout) * time.Millisecond
}
err := h.pollForIndex(index, d)
if err != nil {
w.WriteHeader(http.StatusRequestTimeout)
return
}
}
if !strings.HasSuffix(strings.ToLower(r.URL.Path), ".json") {
w.Write([]byte(fmt.Sprintf("%d", h.server.Index())))
return
}
w.Header().Add("content-type", "application/json")
pretty := r.URL.Query().Get("pretty") == "true"
data := struct {
Index uint64 `json:"index"`
}{
Index: h.server.Index(),
}
var b []byte
if pretty {
b, _ = json.MarshalIndent(data, "", " ")
} else {
b, _ = json.Marshal(data)
}
w.Write(b)
}
// pollForIndex will poll until either the index is met or it times out
// timeout is in milliseconds
func (h *Handler) pollForIndex(index uint64, timeout time.Duration) error {
done := make(chan struct{})
go func() {
for {
if h.server.Index() >= index {
done <- struct{}{}
}
time.Sleep(10 * time.Millisecond)
}
}()
for {
select {
case <-done:
return nil
case <-time.Tick(timeout):
return fmt.Errorf("timed out")
}
}
}
// serveDataNodes returns a list of all data nodes in the cluster.
func (h *Handler) serveDataNodes(w http.ResponseWriter, r *http.Request) {
// Generate a list of objects for encoding to the API.

View File

@ -536,6 +536,76 @@ func TestHandler_GzipDisabled(t *testing.T) {
}
}
func TestHandler_Index(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
s := NewHTTPServer(srvr)
defer s.Close()
status, body := MustHTTP("GET", s.URL, nil, nil, "")
if status != http.StatusOK {
t.Fatalf("unexpected status: %d", status)
}
if body != "1" {
t.Fatalf("unexpected body. expected %q, actual %q", "1", body)
}
}
func TestHandler_IndexJson(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
s := NewHTTPServer(srvr)
defer s.Close()
status, body := MustHTTP("GET", s.URL+`/index.json`, nil, nil, "")
if status != http.StatusOK {
t.Fatalf("unexpected status: %d", status)
}
var data = struct {
Id uint64 `json:"index"`
}{}
if err := json.Unmarshal([]byte(body), &data); err != nil {
t.Error(err)
}
if data.Id != 1 {
t.Log("body: ", body)
t.Fatalf("unexpected index, expected 1, actual: %d", data.Id)
}
}
func TestHandler_IndexSpecified(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
srvr.CreateDatabase("foo")
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
s := NewHTTPServer(srvr)
defer s.Close()
status, _ := MustHTTP("GET", s.URL, map[string]string{"index": "2", "timeout": "200"}, nil, "")
// Write some data
_, _ = MustHTTP("POST", s.URL+`/write`, nil, nil, `{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}}]}`)
if status != http.StatusOK {
t.Fatalf("unexpected status, expected: %d, actual: %d", http.StatusOK, status)
}
}
func TestHandler_IndexSpecifiedExpectTimeout(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
s := NewHTTPServer(srvr)
defer s.Close()
status, _ := MustHTTP("GET", s.URL, map[string]string{"index": "2", "timeout": "1"}, nil, "")
if status != http.StatusRequestTimeout {
t.Fatalf("unexpected status, expected: %d, actual: %d", http.StatusRequestTimeout, status)
}
}
func TestHandler_Ping(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
s := NewHTTPServer(srvr)