From 9254aa8b546d0ea699866e3f7cece2288951b4c1 Mon Sep 17 00:00:00 2001 From: John Shahid Date: Thu, 17 Oct 2013 18:10:47 -0400 Subject: [PATCH] implement the write api. --- src/hapi/api.go | 109 +++++++++++++++++++++++++++++++++++++++++-- src/hapi/api_test.go | 68 +++++++++++++++++++++++++-- 2 files changed, 169 insertions(+), 8 deletions(-) diff --git a/src/hapi/api.go b/src/hapi/api.go index 98f123a192..82ef8e6859 100644 --- a/src/hapi/api.go +++ b/src/hapi/api.go @@ -2,9 +2,11 @@ package hapi import ( log "code.google.com/p/log4go" + "coordinator" "encoding/json" "engine" "github.com/bmizerany/pat" + "io/ioutil" "net" "net/http" "protocol" @@ -12,16 +14,18 @@ import ( ) type HttpServer struct { - conn net.Listener - config *Configuration - engine engine.EngineI - shutdown chan bool + conn net.Listener + config *Configuration + engine engine.EngineI + coordinator coordinator.Coordinator + shutdown chan bool } -func NewHttpServer(config *Configuration, theEngine engine.EngineI) *HttpServer { +func NewHttpServer(config *Configuration, theEngine engine.EngineI, theCoordinator coordinator.Coordinator) *HttpServer { self := &HttpServer{} self.config = config self.engine = theEngine + self.coordinator = theCoordinator self.shutdown = make(chan bool) return self } @@ -121,6 +125,7 @@ func (self *HttpServer) query(w http.ResponseWriter, r *http.Request) { if err != nil { w.Write([]byte(err.Error())) w.WriteHeader(http.StatusInternalServerError) + return } writer.done() @@ -135,6 +140,100 @@ func (self *HttpServer) query(w http.ResponseWriter, r *http.Request) { //   {"name": "seriesname", "columns": ["time", "email"], "points": [[], []]} // ] func (self *HttpServer) writePoints(w http.ResponseWriter, r *http.Request) { + db := r.URL.Query().Get(":db") + + series, err := ioutil.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + serializedSeries := []*SerializedSeries{} + err = json.Unmarshal(series, &serializedSeries) + if err != nil { + w.Write([]byte(err.Error())) + w.WriteHeader(http.StatusBadRequest) + return + } + + // convert the wire format to the internal representation of the time series + for _, s := range serializedSeries { + if len(s.Points) == 0 { + continue + } + + fields := []*protocol.FieldDefinition{} + for idx, column := range s.Columns { + var fieldType protocol.FieldDefinition_Type + switch s.Points[0][idx].(type) { + case int: + fieldType = protocol.FieldDefinition_INT64 + case float64: + fieldType = protocol.FieldDefinition_DOUBLE + case string: + fieldType = protocol.FieldDefinition_STRING + case bool: + fieldType = protocol.FieldDefinition_BOOL + } + + _column := column + fields = append(fields, &protocol.FieldDefinition{ + Name: &_column, + Type: &fieldType, + }) + } + + points := []*protocol.Point{} + for _, point := range s.Points { + values := []*protocol.FieldValue{} + var timestamp *int64 + for idx, field := range fields { + if s.Columns[idx] == "time" { + _timestamp := point[idx].(int64) + timestamp = &_timestamp + } + + switch *field.Type { + case protocol.FieldDefinition_STRING: + if str, ok := point[idx].(string); ok { + values = append(values, &protocol.FieldValue{StringValue: &str}) + continue + } + case protocol.FieldDefinition_INT64: + if integer, ok := point[idx].(int); ok { + temp := int64(integer) + values = append(values, &protocol.FieldValue{Int64Value: &temp}) + continue + } + case protocol.FieldDefinition_DOUBLE: + if double, ok := point[idx].(float64); ok { + values = append(values, &protocol.FieldValue{DoubleValue: &double}) + continue + } + case protocol.FieldDefinition_BOOL: + if boolean, ok := point[idx].(bool); ok { + values = append(values, &protocol.FieldValue{BoolValue: &boolean}) + continue + } + } + + // if we reached this line then the dynamic type didn't match + w.WriteHeader(http.StatusBadRequest) + return + } + points = append(points, &protocol.Point{ + Values: values, + Timestamp: timestamp, + }) + } + + series := &protocol.Series{ + Name: &s.Name, + Fields: fields, + Points: points, + } + + self.coordinator.WriteSeriesData(db, series) + } } type Point struct { diff --git a/src/hapi/api_test.go b/src/hapi/api_test.go index c5c9fa12cf..817238d31f 100644 --- a/src/hapi/api_test.go +++ b/src/hapi/api_test.go @@ -1,6 +1,7 @@ package hapi import ( + "bytes" "common" "encoding/json" "fmt" @@ -9,6 +10,7 @@ import ( "net" "net/http" "net/url" + "parser" "protocol" "testing" "time" @@ -20,8 +22,9 @@ func Test(t *testing.T) { } type ApiSuite struct { - listener net.Listener - server *HttpServer + listener net.Listener + server *HttpServer + coordinator *MockCoordinator } var _ = Suite(&ApiSuite{}) @@ -82,8 +85,21 @@ func (self *MockEngine) RunQuery(_ string, query string, yield func(*protocol.Se return yield(series[1]) } +type MockCoordinator struct { + series []*protocol.Series +} + +func (self *MockCoordinator) DistributeQuery(db string, query *parser.Query, yield func(*protocol.Series) error) error { + return nil +} +func (self *MockCoordinator) WriteSeriesData(db string, series *protocol.Series) error { + self.series = append(self.series, series) + return nil +} + func (self *ApiSuite) SetUpSuite(c *C) { - self.server = NewHttpServer(nil, &MockEngine{}) + self.coordinator = &MockCoordinator{} + self.server = NewHttpServer(nil, &MockEngine{}, self.coordinator) var err error self.listener, err = net.Listen("tcp4", ":") c.Assert(err, IsNil) @@ -141,3 +157,49 @@ func (self *ApiSuite) TestChunkedQuery(c *C) { c.Assert(series.Points, HasLen, 2) } } + +func (self *ApiSuite) TestWriteData(c *C) { + data := ` +[ + { + "points": [ + ["1", 1, 1.0, true], + ["2", 2, 2.0, false], + ["3", 3, 3.0, true] + ], + "name": "foo", + "columns": ["column_one", "column_two", "column_three", "column_four"] + } +] +` + + port := self.listener.Addr().(*net.TCPAddr).Port + addr := fmt.Sprintf("http://localhost:%d/api/db/foo/series", port) + resp, err := http.Post(addr, "application/json", bytes.NewBufferString(data)) + c.Assert(err, IsNil) + body, err := ioutil.ReadAll(resp.Body) + c.Assert(err, IsNil) + fmt.Printf("body: %s\n", string(body)) + c.Assert(resp.StatusCode, Equals, http.StatusOK) + c.Assert(self.coordinator.series, HasLen, 1) + series := self.coordinator.series[0] + c.Assert(series.Fields, HasLen, 4) + + // check the types + c.Assert(*series.Fields[0].Name, Equals, "column_one") + c.Assert(*series.Fields[0].Type, Equals, protocol.FieldDefinition_STRING) + // TODO: cannot get an int64 from a json object + c.Assert(*series.Fields[1].Name, Equals, "column_two") + c.Assert(*series.Fields[1].Type, Equals, protocol.FieldDefinition_DOUBLE) + c.Assert(*series.Fields[2].Name, Equals, "column_three") + c.Assert(*series.Fields[2].Type, Equals, protocol.FieldDefinition_DOUBLE) + c.Assert(*series.Fields[3].Name, Equals, "column_four") + c.Assert(*series.Fields[3].Type, Equals, protocol.FieldDefinition_BOOL) + + // check the values + c.Assert(series.Points, HasLen, 3) + c.Assert(*series.Points[0].Values[0].StringValue, Equals, "1") + c.Assert(*series.Points[0].Values[1].DoubleValue, Equals, 1.0) + c.Assert(*series.Points[0].Values[2].DoubleValue, Equals, 1.0) + c.Assert(*series.Points[0].Values[3].BoolValue, Equals, true) +}