pvt #59166364. http api take a time precision parameter and convert timestamps from/to the given precision.
parent
00a5dd0c5e
commit
9431233305
|
@ -5,6 +5,7 @@ import (
|
|||
"coordinator"
|
||||
"encoding/json"
|
||||
"engine"
|
||||
"fmt"
|
||||
"github.com/bmizerany/pat"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
|
@ -70,6 +71,7 @@ type Writer interface {
|
|||
type AllPointsWriter struct {
|
||||
memSeries map[string]*protocol.Series
|
||||
w http.ResponseWriter
|
||||
precision TimePrecision
|
||||
}
|
||||
|
||||
func (self *AllPointsWriter) yield(series *protocol.Series) error {
|
||||
|
@ -84,7 +86,7 @@ func (self *AllPointsWriter) yield(series *protocol.Series) error {
|
|||
}
|
||||
|
||||
func (self *AllPointsWriter) done() {
|
||||
data, err := serializeMultipleSeries(self.memSeries)
|
||||
data, err := serializeMultipleSeries(self.memSeries, self.precision)
|
||||
if err != nil {
|
||||
self.w.Write([]byte(err.Error()))
|
||||
self.w.WriteHeader(http.StatusInternalServerError)
|
||||
|
@ -96,10 +98,11 @@ func (self *AllPointsWriter) done() {
|
|||
|
||||
type ChunkWriter struct {
|
||||
w http.ResponseWriter
|
||||
precision TimePrecision
|
||||
}
|
||||
|
||||
func (self *ChunkWriter) yield(series *protocol.Series) error {
|
||||
data, err := serializeSingleSeries(series)
|
||||
data, err := serializeSingleSeries(series, self.precision)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -112,16 +115,46 @@ func (self *ChunkWriter) yield(series *protocol.Series) error {
|
|||
func (self *ChunkWriter) done() {
|
||||
}
|
||||
|
||||
type TimePrecision int
|
||||
|
||||
const (
|
||||
MicrosecondPrecision TimePrecision = iota
|
||||
MillisecondPrecision
|
||||
SecondPrecision
|
||||
)
|
||||
|
||||
func TimePrecisionFromString(s string) (TimePrecision, error) {
|
||||
switch s {
|
||||
case "u":
|
||||
return MicrosecondPrecision, nil
|
||||
case "m":
|
||||
return MillisecondPrecision, nil
|
||||
case "s":
|
||||
return SecondPrecision, nil
|
||||
case "":
|
||||
return MillisecondPrecision, nil
|
||||
}
|
||||
|
||||
return 0, fmt.Errorf("Unknown time precision %s", s)
|
||||
}
|
||||
|
||||
func (self *HttpServer) query(w http.ResponseWriter, r *http.Request) {
|
||||
query := r.URL.Query().Get("q")
|
||||
db := r.URL.Query().Get(":db")
|
||||
|
||||
precision, err := TimePrecisionFromString(r.URL.Query().Get("time_precision"))
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
w.Write([]byte(err.Error()))
|
||||
}
|
||||
|
||||
var writer Writer
|
||||
if r.URL.Query().Get("chunked") == "true" {
|
||||
writer = &ChunkWriter{w}
|
||||
writer = &ChunkWriter{w, precision}
|
||||
} else {
|
||||
writer = &AllPointsWriter{map[string]*protocol.Series{}, w}
|
||||
writer = &AllPointsWriter{map[string]*protocol.Series{}, w, precision}
|
||||
}
|
||||
err := self.engine.RunQuery(db, query, writer.yield)
|
||||
err = self.engine.RunQuery(db, query, writer.yield)
|
||||
if err != nil {
|
||||
w.Write([]byte(err.Error()))
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
|
@ -158,6 +191,11 @@ func removeTimestampFieldDefinition(fields []*protocol.FieldDefinition) []*proto
|
|||
|
||||
func (self *HttpServer) writePoints(w http.ResponseWriter, r *http.Request) {
|
||||
db := r.URL.Query().Get(":db")
|
||||
precision, err := TimePrecisionFromString(r.URL.Query().Get("time_precision"))
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
w.Write([]byte(err.Error()))
|
||||
}
|
||||
|
||||
series, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
|
@ -213,8 +251,15 @@ func (self *HttpServer) writePoints(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
for idx, field := range fields {
|
||||
if *field.Name == "time" {
|
||||
// by default the timestamp is in milliseconds
|
||||
_timestamp := int64(point[idx].(float64)) * 1000
|
||||
_timestamp := int64(point[idx].(float64))
|
||||
switch precision {
|
||||
case SecondPrecision:
|
||||
_timestamp *= 1000
|
||||
fallthrough
|
||||
case MillisecondPrecision:
|
||||
_timestamp *= 1000
|
||||
}
|
||||
|
||||
timestamp = &_timestamp
|
||||
continue
|
||||
}
|
||||
|
@ -278,16 +323,16 @@ type SerializedSeries struct {
|
|||
Points [][]interface{} `json:"points"`
|
||||
}
|
||||
|
||||
func serializeSingleSeries(series *protocol.Series) ([]byte, error) {
|
||||
func serializeSingleSeries(series *protocol.Series, precision TimePrecision) ([]byte, error) {
|
||||
arg := map[string]*protocol.Series{"": series}
|
||||
return json.Marshal(serializeSeries(arg)[0])
|
||||
return json.Marshal(serializeSeries(arg, precision)[0])
|
||||
}
|
||||
|
||||
func serializeMultipleSeries(series map[string]*protocol.Series) ([]byte, error) {
|
||||
return json.Marshal(serializeSeries(series))
|
||||
func serializeMultipleSeries(series map[string]*protocol.Series, precision TimePrecision) ([]byte, error) {
|
||||
return json.Marshal(serializeSeries(series, precision))
|
||||
}
|
||||
|
||||
func serializeSeries(memSeries map[string]*protocol.Series) []*SerializedSeries {
|
||||
func serializeSeries(memSeries map[string]*protocol.Series, precision TimePrecision) []*SerializedSeries {
|
||||
serializedSeries := []*SerializedSeries{}
|
||||
|
||||
for _, series := range memSeries {
|
||||
|
@ -298,7 +343,16 @@ func serializeSeries(memSeries map[string]*protocol.Series) []*SerializedSeries
|
|||
|
||||
points := [][]interface{}{}
|
||||
for _, row := range series.Points {
|
||||
rowValues := []interface{}{*row.Timestamp, *row.SequenceNumber}
|
||||
timestamp := *row.GetTimestampInMicroseconds()
|
||||
switch precision {
|
||||
case SecondPrecision:
|
||||
timestamp /= 1000
|
||||
fallthrough
|
||||
case MillisecondPrecision:
|
||||
timestamp /= 1000
|
||||
}
|
||||
|
||||
rowValues := []interface{}{timestamp, *row.SequenceNumber}
|
||||
for idx, value := range row.Values {
|
||||
switch *series.Fields[idx].Type {
|
||||
case protocol.FieldDefinition_STRING:
|
||||
|
|
|
@ -40,14 +40,14 @@ func (self *MockEngine) RunQuery(_ string, query string, yield func(*protocol.Se
|
|||
"values": [
|
||||
{ "string_value": "some_value"},{"int64_value": 1}
|
||||
],
|
||||
"timestamp": 1381346631,
|
||||
"timestamp": 1381346631000000,
|
||||
"sequence_number": 1
|
||||
},
|
||||
{
|
||||
"values": [
|
||||
{"string_value": "some_value"},{"int64_value": 2}
|
||||
],
|
||||
"timestamp": 1381346632,
|
||||
"timestamp": 1381346632000000,
|
||||
"sequence_number": 2
|
||||
}
|
||||
],
|
||||
|
@ -60,14 +60,14 @@ func (self *MockEngine) RunQuery(_ string, query string, yield func(*protocol.Se
|
|||
"values": [
|
||||
{ "string_value": "some_value"},{"int64_value": 3}
|
||||
],
|
||||
"timestamp": 1381346633,
|
||||
"timestamp": 1381346633000000,
|
||||
"sequence_number": 1
|
||||
},
|
||||
{
|
||||
"values": [
|
||||
{"string_value": "some_value"},{"int64_value": 4}
|
||||
],
|
||||
"timestamp": 1381346634,
|
||||
"timestamp": 1381346634000000,
|
||||
"sequence_number": 2
|
||||
}
|
||||
],
|
||||
|
@ -117,6 +117,28 @@ func (self *ApiSuite) SetUpTest(c *C) {
|
|||
self.coordinator.series = nil
|
||||
}
|
||||
|
||||
func (self *ApiSuite) TestQueryWithSecondsPrecision(c *C) {
|
||||
port := self.listener.Addr().(*net.TCPAddr).Port
|
||||
query := "select * from foo where column_one == 'some_value';"
|
||||
query = url.QueryEscape(query)
|
||||
addr := fmt.Sprintf("http://localhost:%d/api/db/foo/series?q=%s&time_precision=s", port, query)
|
||||
resp, err := http.Get(addr)
|
||||
c.Assert(err, IsNil)
|
||||
defer resp.Body.Close()
|
||||
c.Assert(resp.StatusCode, Equals, http.StatusOK)
|
||||
data, err := ioutil.ReadAll(resp.Body)
|
||||
c.Assert(err, IsNil)
|
||||
series := []SerializedSeries{}
|
||||
err = json.Unmarshal(data, &series)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(series, HasLen, 1)
|
||||
c.Assert(series[0].Name, Equals, "foo")
|
||||
// time, seq, column_one, column_two
|
||||
c.Assert(series[0].Columns, HasLen, 4)
|
||||
c.Assert(series[0].Points, HasLen, 4)
|
||||
c.Assert(int(series[0].Points[0][0].(float64)), Equals, 1381346631)
|
||||
}
|
||||
|
||||
func (self *ApiSuite) TestNotChunkedQuery(c *C) {
|
||||
port := self.listener.Addr().(*net.TCPAddr).Port
|
||||
query := "select * from foo where column_one == 'some_value';"
|
||||
|
@ -136,6 +158,8 @@ func (self *ApiSuite) TestNotChunkedQuery(c *C) {
|
|||
// time, seq, column_one, column_two
|
||||
c.Assert(series[0].Columns, HasLen, 4)
|
||||
c.Assert(series[0].Points, HasLen, 4)
|
||||
// timestamp precision is milliseconds by default
|
||||
c.Assert(int(series[0].Points[0][0].(float64)), Equals, 1381346631000)
|
||||
}
|
||||
|
||||
func (self *ApiSuite) TestChunkedQuery(c *C) {
|
||||
|
@ -163,6 +187,41 @@ func (self *ApiSuite) TestChunkedQuery(c *C) {
|
|||
}
|
||||
}
|
||||
|
||||
func (self *ApiSuite) TestWriteDataWithTimeInSeconds(c *C) {
|
||||
data := `
|
||||
[
|
||||
{
|
||||
"points": [
|
||||
[1382131686, "1"]
|
||||
],
|
||||
"name": "foo",
|
||||
"columns": ["time", "column_one"]
|
||||
}
|
||||
]
|
||||
`
|
||||
|
||||
port := self.listener.Addr().(*net.TCPAddr).Port
|
||||
addr := fmt.Sprintf("http://localhost:%d/api/db/foo/series?time_precision=s", port)
|
||||
resp, err := http.Post(addr, "application/json", bytes.NewBufferString(data))
|
||||
c.Assert(err, IsNil)
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
c.Assert(err, IsNil)
|
||||
fmt.Printf("body: %s\n", string(body))
|
||||
c.Assert(resp.StatusCode, Equals, http.StatusOK)
|
||||
c.Assert(self.coordinator.series, HasLen, 1)
|
||||
series := self.coordinator.series[0]
|
||||
|
||||
// check the types
|
||||
c.Assert(series.Fields, HasLen, 1)
|
||||
c.Assert(*series.Fields[0].Name, Equals, "column_one")
|
||||
c.Assert(*series.Fields[0].Type, Equals, protocol.FieldDefinition_STRING)
|
||||
|
||||
// check the values
|
||||
c.Assert(series.Points, HasLen, 1)
|
||||
c.Assert(*series.Points[0].Values[0].StringValue, Equals, "1")
|
||||
c.Assert(*series.Points[0].GetTimestampInMicroseconds(), Equals, int64(1382131686000000))
|
||||
}
|
||||
|
||||
func (self *ApiSuite) TestWriteDataWithTime(c *C) {
|
||||
data := `
|
||||
[
|
||||
|
|
Loading…
Reference in New Issue