diff --git a/src/hapi/api.go b/src/hapi/api.go index 5927c2e401..8df7cd7bf2 100644 --- a/src/hapi/api.go +++ b/src/hapi/api.go @@ -5,6 +5,7 @@ import ( "coordinator" "encoding/json" "engine" + "fmt" "github.com/bmizerany/pat" "io/ioutil" "net" @@ -70,6 +71,7 @@ type Writer interface { type AllPointsWriter struct { memSeries map[string]*protocol.Series w http.ResponseWriter + precision TimePrecision } func (self *AllPointsWriter) yield(series *protocol.Series) error { @@ -84,7 +86,7 @@ func (self *AllPointsWriter) yield(series *protocol.Series) error { } func (self *AllPointsWriter) done() { - data, err := serializeMultipleSeries(self.memSeries) + data, err := serializeMultipleSeries(self.memSeries, self.precision) if err != nil { self.w.Write([]byte(err.Error())) self.w.WriteHeader(http.StatusInternalServerError) @@ -95,11 +97,12 @@ func (self *AllPointsWriter) done() { } type ChunkWriter struct { - w http.ResponseWriter + w http.ResponseWriter + precision TimePrecision } func (self *ChunkWriter) yield(series *protocol.Series) error { - data, err := serializeSingleSeries(series) + data, err := serializeSingleSeries(series, self.precision) if err != nil { return err } @@ -112,16 +115,46 @@ func (self *ChunkWriter) yield(series *protocol.Series) error { func (self *ChunkWriter) done() { } +type TimePrecision int + +const ( + MicrosecondPrecision TimePrecision = iota + MillisecondPrecision + SecondPrecision +) + +func TimePrecisionFromString(s string) (TimePrecision, error) { + switch s { + case "u": + return MicrosecondPrecision, nil + case "m": + return MillisecondPrecision, nil + case "s": + return SecondPrecision, nil + case "": + return MillisecondPrecision, nil + } + + return 0, fmt.Errorf("Unknown time precision %s", s) +} + func (self *HttpServer) query(w http.ResponseWriter, r *http.Request) { query := r.URL.Query().Get("q") db := r.URL.Query().Get(":db") + + precision, err := TimePrecisionFromString(r.URL.Query().Get("time_precision")) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(err.Error())) + } + var writer Writer if r.URL.Query().Get("chunked") == "true" { - writer = &ChunkWriter{w} + writer = &ChunkWriter{w, precision} } else { - writer = &AllPointsWriter{map[string]*protocol.Series{}, w} + writer = &AllPointsWriter{map[string]*protocol.Series{}, w, precision} } - err := self.engine.RunQuery(db, query, writer.yield) + err = self.engine.RunQuery(db, query, writer.yield) if err != nil { w.Write([]byte(err.Error())) w.WriteHeader(http.StatusInternalServerError) @@ -158,6 +191,11 @@ func removeTimestampFieldDefinition(fields []*protocol.FieldDefinition) []*proto func (self *HttpServer) writePoints(w http.ResponseWriter, r *http.Request) { db := r.URL.Query().Get(":db") + precision, err := TimePrecisionFromString(r.URL.Query().Get("time_precision")) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(err.Error())) + } series, err := ioutil.ReadAll(r.Body) if err != nil { @@ -213,8 +251,15 @@ func (self *HttpServer) writePoints(w http.ResponseWriter, r *http.Request) { for idx, field := range fields { if *field.Name == "time" { - // by default the timestamp is in milliseconds - _timestamp := int64(point[idx].(float64)) * 1000 + _timestamp := int64(point[idx].(float64)) + switch precision { + case SecondPrecision: + _timestamp *= 1000 + fallthrough + case MillisecondPrecision: + _timestamp *= 1000 + } + timestamp = &_timestamp continue } @@ -278,16 +323,16 @@ type SerializedSeries struct { Points [][]interface{} `json:"points"` } -func serializeSingleSeries(series *protocol.Series) ([]byte, error) { +func serializeSingleSeries(series *protocol.Series, precision TimePrecision) ([]byte, error) { arg := map[string]*protocol.Series{"": series} - return json.Marshal(serializeSeries(arg)[0]) + return json.Marshal(serializeSeries(arg, precision)[0]) } -func serializeMultipleSeries(series map[string]*protocol.Series) ([]byte, error) { - return json.Marshal(serializeSeries(series)) +func serializeMultipleSeries(series map[string]*protocol.Series, precision TimePrecision) ([]byte, error) { + return json.Marshal(serializeSeries(series, precision)) } -func serializeSeries(memSeries map[string]*protocol.Series) []*SerializedSeries { +func serializeSeries(memSeries map[string]*protocol.Series, precision TimePrecision) []*SerializedSeries { serializedSeries := []*SerializedSeries{} for _, series := range memSeries { @@ -298,7 +343,16 @@ func serializeSeries(memSeries map[string]*protocol.Series) []*SerializedSeries points := [][]interface{}{} for _, row := range series.Points { - rowValues := []interface{}{*row.Timestamp, *row.SequenceNumber} + timestamp := *row.GetTimestampInMicroseconds() + switch precision { + case SecondPrecision: + timestamp /= 1000 + fallthrough + case MillisecondPrecision: + timestamp /= 1000 + } + + rowValues := []interface{}{timestamp, *row.SequenceNumber} for idx, value := range row.Values { switch *series.Fields[idx].Type { case protocol.FieldDefinition_STRING: diff --git a/src/hapi/api_test.go b/src/hapi/api_test.go index 6a8f938d9b..f102ccace8 100644 --- a/src/hapi/api_test.go +++ b/src/hapi/api_test.go @@ -40,14 +40,14 @@ func (self *MockEngine) RunQuery(_ string, query string, yield func(*protocol.Se "values": [ { "string_value": "some_value"},{"int64_value": 1} ], - "timestamp": 1381346631, + "timestamp": 1381346631000000, "sequence_number": 1 }, { "values": [ {"string_value": "some_value"},{"int64_value": 2} ], - "timestamp": 1381346632, + "timestamp": 1381346632000000, "sequence_number": 2 } ], @@ -60,14 +60,14 @@ func (self *MockEngine) RunQuery(_ string, query string, yield func(*protocol.Se "values": [ { "string_value": "some_value"},{"int64_value": 3} ], - "timestamp": 1381346633, + "timestamp": 1381346633000000, "sequence_number": 1 }, { "values": [ {"string_value": "some_value"},{"int64_value": 4} ], - "timestamp": 1381346634, + "timestamp": 1381346634000000, "sequence_number": 2 } ], @@ -117,6 +117,28 @@ func (self *ApiSuite) SetUpTest(c *C) { self.coordinator.series = nil } +func (self *ApiSuite) TestQueryWithSecondsPrecision(c *C) { + port := self.listener.Addr().(*net.TCPAddr).Port + query := "select * from foo where column_one == 'some_value';" + query = url.QueryEscape(query) + addr := fmt.Sprintf("http://localhost:%d/api/db/foo/series?q=%s&time_precision=s", port, query) + resp, err := http.Get(addr) + c.Assert(err, IsNil) + defer resp.Body.Close() + c.Assert(resp.StatusCode, Equals, http.StatusOK) + data, err := ioutil.ReadAll(resp.Body) + c.Assert(err, IsNil) + series := []SerializedSeries{} + err = json.Unmarshal(data, &series) + c.Assert(err, IsNil) + c.Assert(series, HasLen, 1) + c.Assert(series[0].Name, Equals, "foo") + // time, seq, column_one, column_two + c.Assert(series[0].Columns, HasLen, 4) + c.Assert(series[0].Points, HasLen, 4) + c.Assert(int(series[0].Points[0][0].(float64)), Equals, 1381346631) +} + func (self *ApiSuite) TestNotChunkedQuery(c *C) { port := self.listener.Addr().(*net.TCPAddr).Port query := "select * from foo where column_one == 'some_value';" @@ -136,6 +158,8 @@ func (self *ApiSuite) TestNotChunkedQuery(c *C) { // time, seq, column_one, column_two c.Assert(series[0].Columns, HasLen, 4) c.Assert(series[0].Points, HasLen, 4) + // timestamp precision is milliseconds by default + c.Assert(int(series[0].Points[0][0].(float64)), Equals, 1381346631000) } func (self *ApiSuite) TestChunkedQuery(c *C) { @@ -163,6 +187,41 @@ func (self *ApiSuite) TestChunkedQuery(c *C) { } } +func (self *ApiSuite) TestWriteDataWithTimeInSeconds(c *C) { + data := ` +[ + { + "points": [ + [1382131686, "1"] + ], + "name": "foo", + "columns": ["time", "column_one"] + } +] +` + + port := self.listener.Addr().(*net.TCPAddr).Port + addr := fmt.Sprintf("http://localhost:%d/api/db/foo/series?time_precision=s", port) + resp, err := http.Post(addr, "application/json", bytes.NewBufferString(data)) + c.Assert(err, IsNil) + body, err := ioutil.ReadAll(resp.Body) + c.Assert(err, IsNil) + fmt.Printf("body: %s\n", string(body)) + c.Assert(resp.StatusCode, Equals, http.StatusOK) + c.Assert(self.coordinator.series, HasLen, 1) + series := self.coordinator.series[0] + + // check the types + c.Assert(series.Fields, HasLen, 1) + c.Assert(*series.Fields[0].Name, Equals, "column_one") + c.Assert(*series.Fields[0].Type, Equals, protocol.FieldDefinition_STRING) + + // check the values + c.Assert(series.Points, HasLen, 1) + c.Assert(*series.Points[0].Values[0].StringValue, Equals, "1") + c.Assert(*series.Points[0].GetTimestampInMicroseconds(), Equals, int64(1382131686000000)) +} + func (self *ApiSuite) TestWriteDataWithTime(c *C) { data := ` [