diff --git a/httpd/handler.go b/httpd/handler.go index e48c26b79f..cd06b7f0a0 100644 --- a/httpd/handler.go +++ b/httpd/handler.go @@ -99,6 +99,14 @@ func NewHandler(s *influxdb.Server, requireAuthentication bool, version string) "process_continuous_queries", "POST", "/process_continuous_queries", false, false, h.serveProcessContinuousQueries, }, + route{ + "index-json", // Query serving route. + "GET", "/index.json", true, true, h.serveIndex, + }, + route{ + "index", // Query serving route. + "GET", "/", true, true, h.serveIndex, + }, ) for _, r := range h.routes { @@ -253,6 +261,74 @@ func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } +// serveIndex returns the current index of the node as the body of the response (optionally in json) +// Takes optional parameters: +// index - If specified, will poll for index before returning +// timeout - time in milliseconds to wait until index is met before erring out +// default timeout if not specified is 100 milliseconds +func (h *Handler) serveIndex(w http.ResponseWriter, r *http.Request) { + index, _ := strconv.ParseUint(r.URL.Query().Get("index"), 10, 64) + timeout, _ := strconv.Atoi(r.URL.Query().Get("timeout")) + + if index > 0 { + var d time.Duration + if timeout == 0 { + d = 100 * time.Millisecond + } else { + d = time.Duration(timeout) * time.Millisecond + } + err := h.pollForIndex(index, d) + if err != nil { + w.WriteHeader(http.StatusRequestTimeout) + return + } + } + if !strings.HasSuffix(strings.ToLower(r.URL.Path), ".json") { + w.Write([]byte(fmt.Sprintf("%d", h.server.Index()))) + return + } + w.Header().Add("content-type", "application/json") + + pretty := r.URL.Query().Get("pretty") == "true" + + data := struct { + Index uint64 `json:"index"` + }{ + Index: h.server.Index(), + } + var b []byte + if pretty { + b, _ = json.MarshalIndent(data, "", " ") + } else { + b, _ = json.Marshal(data) + } + w.Write(b) +} + +// pollForIndex will poll until either the index is met or it times out +// timeout is in milliseconds +func (h *Handler) pollForIndex(index uint64, timeout time.Duration) error { + done := make(chan struct{}) + + go func() { + for { + if h.server.Index() >= index { + done <- struct{}{} + } + time.Sleep(10 * time.Millisecond) + } + }() + + for { + select { + case <-done: + return nil + case <-time.Tick(timeout): + return fmt.Errorf("timed out") + } + } +} + // serveDataNodes returns a list of all data nodes in the cluster. func (h *Handler) serveDataNodes(w http.ResponseWriter, r *http.Request) { // Generate a list of objects for encoding to the API. diff --git a/httpd/handler_test.go b/httpd/handler_test.go index 148537e82e..b20bce3819 100644 --- a/httpd/handler_test.go +++ b/httpd/handler_test.go @@ -536,6 +536,76 @@ func TestHandler_GzipDisabled(t *testing.T) { } } +func TestHandler_Index(t *testing.T) { + srvr := OpenAuthlessServer(NewMessagingClient()) + s := NewHTTPServer(srvr) + defer s.Close() + + status, body := MustHTTP("GET", s.URL, nil, nil, "") + + if status != http.StatusOK { + t.Fatalf("unexpected status: %d", status) + } + + if body != "1" { + t.Fatalf("unexpected body. expected %q, actual %q", "1", body) + } +} + +func TestHandler_IndexJson(t *testing.T) { + srvr := OpenAuthlessServer(NewMessagingClient()) + s := NewHTTPServer(srvr) + defer s.Close() + + status, body := MustHTTP("GET", s.URL+`/index.json`, nil, nil, "") + + if status != http.StatusOK { + t.Fatalf("unexpected status: %d", status) + } + + var data = struct { + Id uint64 `json:"index"` + }{} + + if err := json.Unmarshal([]byte(body), &data); err != nil { + t.Error(err) + } + if data.Id != 1 { + t.Log("body: ", body) + t.Fatalf("unexpected index, expected 1, actual: %d", data.Id) + } +} + +func TestHandler_IndexSpecified(t *testing.T) { + srvr := OpenAuthlessServer(NewMessagingClient()) + srvr.CreateDatabase("foo") + srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar")) + + s := NewHTTPServer(srvr) + defer s.Close() + + status, _ := MustHTTP("GET", s.URL, map[string]string{"index": "2", "timeout": "200"}, nil, "") + + // Write some data + _, _ = MustHTTP("POST", s.URL+`/write`, nil, nil, `{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}}]}`) + + if status != http.StatusOK { + t.Fatalf("unexpected status, expected: %d, actual: %d", http.StatusOK, status) + } +} + +func TestHandler_IndexSpecifiedExpectTimeout(t *testing.T) { + srvr := OpenAuthlessServer(NewMessagingClient()) + s := NewHTTPServer(srvr) + defer s.Close() + + status, _ := MustHTTP("GET", s.URL, map[string]string{"index": "2", "timeout": "1"}, nil, "") + + if status != http.StatusRequestTimeout { + t.Fatalf("unexpected status, expected: %d, actual: %d", http.StatusRequestTimeout, status) + } +} + func TestHandler_Ping(t *testing.T) { srvr := OpenAuthlessServer(NewMessagingClient()) s := NewHTTPServer(srvr)