diff --git a/handler.go b/handler.go index 2f0bb17761..5f9f295713 100644 --- a/handler.go +++ b/handler.go @@ -8,6 +8,7 @@ import ( "net/url" "strconv" "strings" + "time" "github.com/bmizerany/pat" "github.com/influxdb/influxdb/influxql" @@ -69,7 +70,7 @@ func NewHandler(s *Server) *Handler { h.mux.Get("/query", h.makeAuthenticationHandler(h.serveQuery)) // Data-ingest route. - h.mux.Post("/db/:db/series", h.makeAuthenticationHandler(h.serveWriteSeries)) + h.mux.Post("/write", h.makeAuthenticationHandler(h.serveWrite)) // Data node routes. h.mux.Get("/data_nodes", h.makeAuthenticationHandler(h.serveDataNodes)) @@ -156,59 +157,71 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, u *User) { _ = json.NewEncoder(w).Encode(results) } -// serveWriteSeries receives incoming series data and writes it to the database. -func (h *Handler) serveWriteSeries(w http.ResponseWriter, r *http.Request, u *User) { - // TODO: Authentication. +type batchWrite struct { + Points []Point `json:"points"` + Database string `json:"database"` + RetentionPolicy string `json:"retentionPolicy"` + Tags map[string]string `json:"tags"` + Timestamp time.Time `json:"timestamp"` +} - /* TEMPORARILY REMOVED FOR PROTOBUFS. - // Retrieve database from server. - db := h.server.Database(r.URL.Query().Get(":db")) - if db == nil { - h.error(w, ErrDatabaseNotFound.Error(), http.StatusNotFound) - return - } +// serveWrite receives incoming series data and writes it to the database. +func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, u *User) { + var br batchWrite - // Parse time precision from query parameters. - precision, err := parseTimePrecision(r.URL.Query().Get("time_precision")) - if err != nil { - h.error(w, err.Error(), http.StatusBadRequest) - return - } - - // Setup HTTP request reader. Wrap in a gzip reader if encoding set in header. - reader := r.Body - if r.Header.Get("Content-Encoding") == "gzip" { - if reader, err = gzip.NewReader(r.Body); err != nil { - h.error(w, err.Error(), http.StatusBadRequest) - return - } - } - - // Decode series from reader. - ss := []*serializedSeries{} - dec := json.NewDecoder(reader) + dec := json.NewDecoder(r.Body) dec.UseNumber() - if err := dec.Decode(&ss); err != nil { - h.error(w, err.Error(), http.StatusBadRequest) + + var writeError = func(result Result, statusCode int) { + w.WriteHeader(statusCode) + w.Header().Add("content-type", "application/json") + _ = json.NewEncoder(w).Encode(&result) return } - // Convert the wire format to the internal representation of the time series. - series, err := serializedSeriesSlice(ss).series(precision) - if err != nil { - h.error(w, err.Error(), http.StatusBadRequest) - return - } - - // Write series data to the database. - // TODO: Allow multiple series written to DB at once. - for _, s := range series { - if err := db.WriteSeries(s); err != nil { - h.error(w, err.Error(), http.StatusInternalServerError) + for { + if err := dec.Decode(&br); err != nil { + if err.Error() == "EOF" { + w.WriteHeader(http.StatusOK) + return + } + writeError(Result{Err: err}, http.StatusInternalServerError) return } + + if br.Database == "" { + writeError(Result{Err: fmt.Errorf("database is required")}, http.StatusInternalServerError) + return + } + + if h.server.databases[br.Database] == nil { + writeError(Result{Err: fmt.Errorf("database not found: %q", br.Database)}, http.StatusNotFound) + return + } + + // TODO corylanou: Check if user can write to specified database + //if !user_can_write(br.Database) { + //writeError(&Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", u.Name)}, http.StatusUnauthorized) + //return + //} + + for _, p := range br.Points { + if p.Timestamp.IsZero() { + p.Timestamp = br.Timestamp + } + if len(br.Tags) > 0 { + for k, _ := range br.Tags { + if p.Tags[k] == "" { + p.Tags[k] = br.Tags[k] + } + } + } + if _, err := h.server.WriteSeries(br.Database, br.RetentionPolicy, []Point{p}); err != nil { + writeError(Result{Err: err}, http.StatusInternalServerError) + return + } + } } - */ } // serveMetastore returns a copy of the metastore. diff --git a/handler_test.go b/handler_test.go index 6665e5e53f..34d9114bfc 100644 --- a/handler_test.go +++ b/handler_test.go @@ -672,6 +672,72 @@ func TestHandler_AuthenticatedDatabases_UnauthorizedBasicAuth(t *testing.T) { } } +func TestHandler_serveWriteSeries(t *testing.T) { + srvr := OpenServer(NewMessagingClient()) + srvr.CreateDatabase("foo") + srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar")) + s := NewHTTPServer(srvr) + defer s.Close() + + status, _ := MustHTTP("POST", s.URL+`/write`, nil, nil, `{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}}]}`) + + if status != http.StatusOK { + t.Fatalf("unexpected status: %d", status) + } +} + +func TestHandler_serveWriteSeries_noDatabaseExists(t *testing.T) { + srvr := OpenServer(NewMessagingClient()) + s := NewHTTPServer(srvr) + defer s.Close() + + status, body := MustHTTP("POST", s.URL+`/write`, nil, nil, `{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}}]}`) + + expectedStatus := http.StatusNotFound + if status != expectedStatus { + t.Fatalf("unexpected status: expected: %d, actual: %d", expectedStatus, status) + } + + response := `{"error":"database not found: \"foo\""}` + if body != response { + t.Fatalf("unexpected body: expected %s, actual %s", response, body) + } +} + +func TestHandler_serveWriteSeries_invalidJSON(t *testing.T) { + srvr := OpenServer(NewMessagingClient()) + s := NewHTTPServer(srvr) + defer s.Close() + + status, body := MustHTTP("POST", s.URL+`/write`, nil, nil, `{"database" : foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}}]}`) + + if status != http.StatusInternalServerError { + t.Fatalf("unexpected status: expected: %d, actual: %d", http.StatusInternalServerError, status) + } + + response := `{"error":"invalid character 'o' in literal false (expecting 'a')"}` + if body != response { + t.Fatalf("unexpected body: expected %s, actual %s", response, body) + } +} + +func TestHandler_serveWriteSeries_noDatabaseSpecified(t *testing.T) { + srvr := OpenServer(NewMessagingClient()) + s := NewHTTPServer(srvr) + defer s.Close() + + status, body := MustHTTP("POST", s.URL+`/write`, nil, nil, `{}`) + + if status != http.StatusInternalServerError { + t.Fatalf("unexpected status: expected: %d, actual: %d", http.StatusInternalServerError, status) + } + + response := `{"error":"database is required"}` + if body != response { + t.Fatalf("unexpected body: expected %s, actual %s", response, body) + } +} + // Utility functions for this test suite. func MustHTTP(verb, path string, params, headers map[string]string, body string) (int, string) { diff --git a/server.go b/server.go index 04f360eabf..e5a599c953 100644 --- a/server.go +++ b/server.go @@ -1402,6 +1402,9 @@ func (s *Server) createSeriesIfNotExists(database, name string, tags map[string] // Try to find series locally first. s.mu.RLock() idx := s.databases[database] + if idx == nil { + return 0, fmt.Errorf("database not found %q", database) + } if _, series := idx.MeasurementAndSeries(name, tags); series != nil { s.mu.RUnlock() return series.ID, nil