From 41a2c8a8f3ebc61172e206330465f6d980580e06 Mon Sep 17 00:00:00 2001 From: John Shahid Date: Fri, 18 Oct 2013 16:25:33 -0400 Subject: [PATCH] timestamp madness. Fix the aggregators and the datastore to use microseconds. --- src/common/time.go | 11 ++ src/datastore/datastore_test.go | 13 +- src/datastore/leveldb_datastore.go | 13 +- src/engine/aggregator.go | 15 +- src/engine/engine.go | 5 +- src/engine/engine_test.go | 234 ++++++++++++++-------------- src/parser/query_api.go | 49 +++--- src/protocol/protocol_extensions.go | 8 + 8 files changed, 197 insertions(+), 151 deletions(-) create mode 100644 src/common/time.go diff --git a/src/common/time.go b/src/common/time.go new file mode 100644 index 0000000000..e2a4eac57d --- /dev/null +++ b/src/common/time.go @@ -0,0 +1,11 @@ +package common + +import ( + "time" +) + +type EnhancedTime time.Time + +func TimeToMicroseconds(t time.Time) int64 { + return t.Unix()*int64(time.Second/time.Microsecond) + int64(t.Nanosecond())/int64(time.Microsecond) +} diff --git a/src/datastore/datastore_test.go b/src/datastore/datastore_test.go index 93492cbe6e..3306a4caf9 100644 --- a/src/datastore/datastore_test.go +++ b/src/datastore/datastore_test.go @@ -41,6 +41,7 @@ func stringToSeries(seriesString string, timestamp int64, c *C) *protocol.Series series := &protocol.Series{} err := json.Unmarshal([]byte(seriesString), &series) c.Assert(err, IsNil) + timestamp *= 1000000 for _, point := range series.Points { point.Timestamp = ×tamp } @@ -110,8 +111,8 @@ func (self *DatastoreSuite) TestCanWriteAndRetrievePoints(c *C) { c.Assert(len(resultSeries.Fields), Equals, 1) c.Assert(*resultSeries.Points[0].SequenceNumber, Equals, uint32(2)) c.Assert(*resultSeries.Points[1].SequenceNumber, Equals, uint32(1)) - c.Assert(*resultSeries.Points[0].Timestamp, Equals, pointTime) - c.Assert(*resultSeries.Points[1].Timestamp, Equals, pointTime) + c.Assert(*resultSeries.Points[0].GetTimestampInMicroseconds(), Equals, pointTime*1000000) + c.Assert(*resultSeries.Points[1].GetTimestampInMicroseconds(), Equals, pointTime*1000000) c.Assert(*resultSeries.Points[0].Values[0].Int64Value, Equals, int64(2)) c.Assert(*resultSeries.Points[1].Values[0].Int64Value, Equals, int64(3)) c.Assert(resultSeries, Not(DeepEquals), series) @@ -192,8 +193,8 @@ func (self *DatastoreSuite) TestCanWriteDataWithDifferentTimesAndSeries(c *C) { c.Assert(len(results.Fields), Equals, 1) c.Assert(*results.Points[0].SequenceNumber, Equals, uint32(1)) c.Assert(*results.Points[1].SequenceNumber, Equals, uint32(3)) - c.Assert(*results.Points[0].Timestamp, Equals, now) - c.Assert(*results.Points[1].Timestamp, Equals, secondAgo) + c.Assert(*results.Points[0].GetTimestampInMicroseconds(), Equals, now*1000000) + c.Assert(*results.Points[1].GetTimestampInMicroseconds(), Equals, secondAgo*1000000) c.Assert(*results.Points[0].Values[0].DoubleValue, Equals, float64(0.1)) c.Assert(*results.Points[1].Values[0].DoubleValue, Equals, float64(23.2)) results = executeQuery("db1", "select val from foo;", db, c) @@ -259,8 +260,8 @@ func (self *DatastoreSuite) TestCanQueryBasedOnTime(c *C) { c.Assert(len(results.Fields), Equals, 1) c.Assert(*results.Points[0].SequenceNumber, Equals, uint32(3)) c.Assert(*results.Points[1].SequenceNumber, Equals, uint32(3)) - c.Assert(*results.Points[0].Timestamp, Equals, now) - c.Assert(*results.Points[1].Timestamp, Equals, minutesAgo) + c.Assert(*results.Points[0].GetTimestampInMicroseconds(), Equals, now*1000000) + c.Assert(*results.Points[1].GetTimestampInMicroseconds(), Equals, minutesAgo*1000000) c.Assert(*results.Points[0].Values[0].Int64Value, Equals, int64(3)) c.Assert(*results.Points[1].Values[0].Int64Value, Equals, int64(4)) } diff --git a/src/datastore/leveldb_datastore.go b/src/datastore/leveldb_datastore.go index bb4d479ecc..30f302ff9b 100644 --- a/src/datastore/leveldb_datastore.go +++ b/src/datastore/leveldb_datastore.go @@ -3,6 +3,7 @@ package datastore import ( "bytes" "code.google.com/p/goprotobuf/proto" + "common" "encoding/binary" "errors" "fmt" @@ -108,7 +109,7 @@ func (self *LevelDbDatastore) WriteSeriesData(database string, series *protocol. for _, point := range series.Points { timestampBuffer := bytes.NewBuffer(make([]byte, 0, 8)) sequenceNumberBuffer := bytes.NewBuffer(make([]byte, 0, 8)) - binary.Write(timestampBuffer, binary.BigEndian, self.convertTimestampToUint(point.Timestamp)) + binary.Write(timestampBuffer, binary.BigEndian, self.convertTimestampToUint(point.GetTimestampInMicroseconds())) binary.Write(sequenceNumberBuffer, binary.BigEndian, uint64(*point.SequenceNumber)) pointKey := append(append(id, timestampBuffer.Bytes()...), sequenceNumberBuffer.Bytes()...) data, err2 := proto.Marshal(point.Values[fieldIndex]) @@ -152,7 +153,7 @@ func (self *LevelDbDatastore) DeleteRangeOfSeries(database, series string, start if err != nil { return err } - startTimeBytes, endTimeBytes := self.byteArraysForStartAndEndTimes(startTime, endTime) + startTimeBytes, endTimeBytes := self.byteArraysForStartAndEndTimes(common.TimeToMicroseconds(startTime), common.TimeToMicroseconds(endTime)) ro := levigo.NewReadOptions() defer ro.Close() rangesToCompact := make([]*levigo.Range, 0) @@ -206,6 +207,10 @@ func (self *LevelDbDatastore) DeleteRangeOfRegex(database string, regex *regexp. } func (self *LevelDbDatastore) byteArraysForStartAndEndTimes(startTime, endTime int64) ([]byte, []byte) { + if startTime < 1382361894000 { + panic("wtf") + } + startTimeBuffer := bytes.NewBuffer(make([]byte, 0, 8)) binary.Write(startTimeBuffer, binary.BigEndian, self.convertTimestampToUint(&startTime)) startTimeBytes := startTimeBuffer.Bytes() @@ -216,7 +221,7 @@ func (self *LevelDbDatastore) byteArraysForStartAndEndTimes(startTime, endTime i } func (self *LevelDbDatastore) executeQueryForSeries(database, series string, columns []string, query *parser.Query, yield func(*protocol.Series) error) error { - startTimeBytes, endTimeBytes := self.byteArraysForStartAndEndTimes(query.GetStartTime().Unix(), query.GetEndTime().Unix()) + startTimeBytes, endTimeBytes := self.byteArraysForStartAndEndTimes(common.TimeToMicroseconds(query.GetStartTime()), common.TimeToMicroseconds(query.GetEndTime())) fields, err := self.getFieldsForSeries(database, series, columns) if err != nil { @@ -306,7 +311,7 @@ func (self *LevelDbDatastore) executeQueryForSeries(database, series string, col var sequence uint64 binary.Read(bytes.NewBuffer(rawColumnValues[i].sequence), binary.BigEndian, &sequence) seq32 := uint32(sequence) - point.Timestamp = &time + point.SetTimestampInMicroseconds(time) point.SequenceNumber = &seq32 rawColumnValues[i] = nil } diff --git a/src/engine/aggregator.go b/src/engine/aggregator.go index 8db76d4a84..a42e4e2879 100644 --- a/src/engine/aggregator.go +++ b/src/engine/aggregator.go @@ -83,7 +83,7 @@ func NewCountAggregator(*parser.Query, *parser.Value) (Aggregator, error) { // type TimestampAggregator struct { - duration *time.Duration + duration *int64 timestamps map[string]map[interface{}]int64 } @@ -94,9 +94,9 @@ func (self *TimestampAggregator) AggregatePoint(series string, group interface{} self.timestamps[series] = timestamps } if self.duration != nil { - timestamps[group] = time.Unix(*p.Timestamp, 0).Round(*self.duration).Unix() + timestamps[group] = *p.GetTimestampInMicroseconds() / *self.duration * *self.duration } else { - timestamps[group] = *p.Timestamp + timestamps[group] = *p.GetTimestampInMicroseconds() } return nil } @@ -125,9 +125,16 @@ func NewTimestampAggregator(query *parser.Query, _ *parser.Value) (Aggregator, e return nil, err } + var durationPtr *int64 + + if duration != nil { + newDuration := int64(*duration / time.Microsecond) + durationPtr = &newDuration + } + return &TimestampAggregator{ timestamps: make(map[string]map[interface{}]int64), - duration: duration, + duration: durationPtr, }, nil } diff --git a/src/engine/engine.go b/src/engine/engine.go index b21f6478f6..12c3f60802 100644 --- a/src/engine/engine.go +++ b/src/engine/engine.go @@ -70,7 +70,8 @@ func getValueFromPoint(value *protocol.FieldValue, fType protocol.FieldDefinitio } func getTimestampFromPoint(window time.Duration, point *protocol.Point) int64 { - return time.Unix(*point.Timestamp, 0).Round(window).Unix() + multiplier := int64(window / time.Microsecond) + return *point.GetTimestampInMicroseconds() / int64(multiplier) * int64(multiplier) } type Mapper func(*protocol.Point) interface{} @@ -287,10 +288,10 @@ func (self *QueryEngine) executeCountQueryWithGroupBy(database string, query *pa timestamp := *timestampAggregator.GetValue(table, groupId)[0].Int64Value /* groupPoints := []*protocol.Point{} */ point := &protocol.Point{ - Timestamp: ×tamp, SequenceNumber: &sequenceNumber, Values: []*protocol.FieldValue{}, } + point.SetTimestampInMicroseconds(timestamp) for _, aggregator := range aggregators { // point.Values = append(point.Values, aggregator.GetValue(table, groupId)[0]) diff --git a/src/engine/engine_test.go b/src/engine/engine_test.go index f304e4fd72..ee9610f998 100644 --- a/src/engine/engine_test.go +++ b/src/engine/engine_test.go @@ -104,7 +104,7 @@ func (self *EngineSuite) TestBasicQuery(c *C) { "string_value": "some_value" } ], - "timestamp": 1381346631, + "timestamp": 1381346631000000, "sequence_number": 1 } ], @@ -137,7 +137,7 @@ func (self *EngineSuite) TestCountQuery(c *C) { "string_value": "some_value" } ], - "timestamp": 1381346631, + "timestamp": 1381346631000000, "sequence_number": 1 }, { @@ -146,7 +146,7 @@ func (self *EngineSuite) TestCountQuery(c *C) { "string_value": "some_value" } ], - "timestamp": 1381346631, + "timestamp": 1381346631000000, "sequence_number": 2 } ], @@ -170,7 +170,7 @@ func (self *EngineSuite) TestCountQuery(c *C) { "int64_value": 2 } ], - "timestamp": 1381346631, + "timestamp": 1381346631000000, "sequence_number": 1 } ], @@ -199,7 +199,7 @@ func (self *EngineSuite) TestCountQueryWithRegexTables(c *C) { "string_value": "some_value" } ], - "timestamp": 1381346631, + "timestamp": 1381346631000000, "sequence_number": 1 } ], @@ -219,7 +219,7 @@ func (self *EngineSuite) TestCountQueryWithRegexTables(c *C) { "string_value": "some_value" } ], - "timestamp": 1381346631, + "timestamp": 1381346631000000, "sequence_number": 1 } ], @@ -243,7 +243,7 @@ func (self *EngineSuite) TestCountQueryWithRegexTables(c *C) { "int64_value": 1 } ], - "timestamp": 1381346631, + "timestamp": 1381346631000000, "sequence_number": 1 } ], @@ -263,7 +263,7 @@ func (self *EngineSuite) TestCountQueryWithRegexTables(c *C) { "int64_value": 1 } ], - "timestamp": 1381346631, + "timestamp": 1381346631000000, "sequence_number": 1 } ], @@ -292,7 +292,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByClause(c *C) { "string_value": "some_value" } ], - "timestamp": 1381346631, + "timestamp": 1381346631000000, "sequence_number": 1 }, { @@ -301,7 +301,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByClause(c *C) { "string_value": "another_value" } ], - "timestamp": 1381346631, + "timestamp": 1381346631000000, "sequence_number": 1 } ], @@ -328,7 +328,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByClause(c *C) { "string_value": "some_value" } ], - "timestamp": 1381346631, + "timestamp": 1381346631000000, "sequence_number": 1 }, { @@ -340,7 +340,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByClause(c *C) { "string_value": "another_value" } ], - "timestamp": 1381346631, + "timestamp": 1381346631000000, "sequence_number": 1 } ], @@ -396,7 +396,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByClauseWithMultipleColumns(c *C "int64_value": 1 } ], - "timestamp": 1381346631, + "timestamp": 1381346631000000, "sequence_number": 1 }, { @@ -408,7 +408,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByClauseWithMultipleColumns(c *C "int64_value": 2 } ], - "timestamp": 1381346631, + "timestamp": 1381346631000000, "sequence_number": 1 }, { @@ -420,7 +420,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByClauseWithMultipleColumns(c *C "int64_value": 1 } ], - "timestamp": 1381346631, + "timestamp": 1381346631000000, "sequence_number": 1 } ], @@ -455,7 +455,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByClauseWithMultipleColumns(c *C "int64_value": 1 } ], - "timestamp": 1381346631, + "timestamp": 1381346631000000, "sequence_number": 1 }, { @@ -470,7 +470,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByClauseWithMultipleColumns(c *C "int64_value": 2 } ], - "timestamp": 1381346631, + "timestamp": 1381346631000000, "sequence_number": 1 }, { @@ -485,7 +485,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByClauseWithMultipleColumns(c *C "int64_value": 1 } ], - "timestamp": 1381346631, + "timestamp": 1381346631000000, "sequence_number": 1 } ], @@ -521,7 +521,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByTime(c *C) { "string_value": "some_value" } ], - "timestamp": 1381346641, + "timestamp": 1381346641000000, "sequence_number": 1 }, { @@ -530,7 +530,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByTime(c *C) { "string_value": "another_value" } ], - "timestamp": 1381346701, + "timestamp": 1381346701000000, "sequence_number": 1 }, { @@ -539,7 +539,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByTime(c *C) { "string_value": "some_value" } ], - "timestamp": 1381346721, + "timestamp": 1381346721000000, "sequence_number": 1 } ], @@ -563,7 +563,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByTime(c *C) { "int64_value": 1 } ], - "timestamp": 1381346640, + "timestamp": 1381346640000000, "sequence_number": 1 }, { @@ -572,7 +572,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByTime(c *C) { "int64_value": 2 } ], - "timestamp": 1381346700, + "timestamp": 1381346700000000, "sequence_number": 1 } ], @@ -592,9 +592,9 @@ func (self *EngineSuite) TestCountQueryWithGroupByTimeAndColumn(c *C) { engine := createEngine(c, `[ { "points": [ - { "values": [{ "string_value": "some_value" }], "timestamp": 1381346641, "sequence_number": 1 }, - { "values": [{ "string_value": "another_value" }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "string_value": "some_value" }], "timestamp": 1381346721, "sequence_number": 1 } + { "values": [{ "string_value": "some_value" }], "timestamp": 1381346641000000, "sequence_number": 1 }, + { "values": [{ "string_value": "another_value" }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "string_value": "some_value" }], "timestamp": 1381346721000000, "sequence_number": 1 } ], "name": "foo", "fields": [ @@ -606,9 +606,9 @@ func (self *EngineSuite) TestCountQueryWithGroupByTimeAndColumn(c *C) { runQuery(engine, "select count(*), column_one from foo group by time(1m), column_one;", c, `[ { "points": [ - { "values": [{ "int64_value": 1 }, { "string_value": "some_value" }], "timestamp": 1381346640, "sequence_number": 1 }, - { "values": [{ "int64_value": 1 }, { "string_value": "another_value" }], "timestamp": 1381346700, "sequence_number": 1 }, - { "values": [{ "int64_value": 1 }, { "string_value": "some_value" }], "timestamp": 1381346700, "sequence_number": 1 } + { "values": [{ "int64_value": 1 }, { "string_value": "some_value" }], "timestamp": 1381346640000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 1 }, { "string_value": "another_value" }], "timestamp": 1381346700000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 1 }, { "string_value": "some_value" }], "timestamp": 1381346700000000, "sequence_number": 1 } ], "name": "foo", "fields": [ @@ -623,9 +623,9 @@ func (self *EngineSuite) TestMinQueryWithGroupByTime(c *C) { engine := createEngine(c, `[ { "points": [ - { "values": [{ "int64_value": 3 }], "timestamp": 1381346641, "sequence_number": 1 }, - { "values": [{ "int64_value": 8 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 4 }], "timestamp": 1381346721, "sequence_number": 1 } + { "values": [{ "int64_value": 3 }], "timestamp": 1381346641000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 8 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 4 }], "timestamp": 1381346721000000, "sequence_number": 1 } ], "name": "foo", "fields": [ @@ -637,8 +637,8 @@ func (self *EngineSuite) TestMinQueryWithGroupByTime(c *C) { runQuery(engine, "select min(column_one) from foo group by time(1m);", c, `[ { "points": [ - { "values": [{ "int64_value": 3 }], "timestamp": 1381346640, "sequence_number": 1 }, - { "values": [{ "int64_value": 4 }], "timestamp": 1381346700, "sequence_number": 1 } + { "values": [{ "int64_value": 3 }], "timestamp": 1381346640000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 4 }], "timestamp": 1381346700000000, "sequence_number": 1 } ], "name": "foo", "fields": [ @@ -652,9 +652,9 @@ func (self *EngineSuite) TestMaxQueryWithGroupByTime(c *C) { engine := createEngine(c, `[ { "points": [ - { "values": [{ "int64_value": 3 }], "timestamp": 1381346641, "sequence_number": 1 }, - { "values": [{ "int64_value": 8 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 4 }], "timestamp": 1381346721, "sequence_number": 1 } + { "values": [{ "int64_value": 3 }], "timestamp": 1381346641000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 8 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 4 }], "timestamp": 1381346721000000, "sequence_number": 1 } ], "name": "foo", "fields": [ @@ -666,8 +666,8 @@ func (self *EngineSuite) TestMaxQueryWithGroupByTime(c *C) { runQuery(engine, "select max(column_one) from foo group by time(1m);", c, `[ { "points": [ - { "values": [{ "int64_value": 3 }], "timestamp": 1381346640, "sequence_number": 1 }, - { "values": [{ "int64_value": 8 }], "timestamp": 1381346700, "sequence_number": 1 } + { "values": [{ "int64_value": 3 }], "timestamp": 1381346640000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 8 }], "timestamp": 1381346700000000, "sequence_number": 1 } ], "name": "foo", "fields": [ @@ -682,9 +682,9 @@ func (self *EngineSuite) TestMaxMinQueryWithGroupByTime(c *C) { engine := createEngine(c, `[ { "points": [ - { "values": [{ "int64_value": 3 }], "timestamp": 1381346641, "sequence_number": 1 }, - { "values": [{ "int64_value": 8 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 4 }], "timestamp": 1381346721, "sequence_number": 1 } + { "values": [{ "int64_value": 3 }], "timestamp": 1381346641000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 8 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 4 }], "timestamp": 1381346721000000, "sequence_number": 1 } ], "name": "foo", "fields": [ @@ -696,8 +696,8 @@ func (self *EngineSuite) TestMaxMinQueryWithGroupByTime(c *C) { runQuery(engine, "select max(column_one), min(column_one) from foo group by time(1m);", c, `[ { "points": [ - { "values": [{ "int64_value": 3 }, { "int64_value": 3 }], "timestamp": 1381346640, "sequence_number": 1 }, - { "values": [{ "int64_value": 8 }, { "int64_value": 4 }], "timestamp": 1381346700, "sequence_number": 1 } + { "values": [{ "int64_value": 3 }, { "int64_value": 3 }], "timestamp": 1381346640000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 8 }, { "int64_value": 4 }], "timestamp": 1381346700000000, "sequence_number": 1 } ], "name": "foo", "fields": [ @@ -713,20 +713,20 @@ func (self *EngineSuite) TestPercentileQueryWithGroupByTime(c *C) { engine := createEngine(c, `[ { "points": [ - { "values": [{ "int64_value": 1 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 3 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 5 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 7 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 4 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 2 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 6 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 9 }], "timestamp": 1381346741, "sequence_number": 1 }, - { "values": [{ "int64_value": 8 }], "timestamp": 1381346741, "sequence_number": 1 }, - { "values": [{ "int64_value": 7 }], "timestamp": 1381346741, "sequence_number": 1 }, - { "values": [{ "int64_value": 6 }], "timestamp": 1381346741, "sequence_number": 1 }, - { "values": [{ "int64_value": 5 }], "timestamp": 1381346741, "sequence_number": 1 }, - { "values": [{ "int64_value": 4 }], "timestamp": 1381346741, "sequence_number": 1 }, - { "values": [{ "int64_value": 3 }], "timestamp": 1381346741, "sequence_number": 1 } + { "values": [{ "int64_value": 1 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 3 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 5 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 7 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 4 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 2 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 6 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 9 }], "timestamp": 1381346771000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 8 }], "timestamp": 1381346771000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 7 }], "timestamp": 1381346771000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 6 }], "timestamp": 1381346771000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 5 }], "timestamp": 1381346771000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 4 }], "timestamp": 1381346771000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 3 }], "timestamp": 1381346771000000, "sequence_number": 1 } ], "name": "foo", "fields": [ @@ -738,8 +738,8 @@ func (self *EngineSuite) TestPercentileQueryWithGroupByTime(c *C) { runQuery(engine, "select percentile(column_one, 80) from foo group by time(1m);", c, `[ { "points": [ - { "values": [{ "int64_value": 6 }], "timestamp": 1381346700, "sequence_number": 1 }, - { "values": [{ "int64_value": 8 }], "timestamp": 1381346760, "sequence_number": 1 } + { "values": [{ "int64_value": 6 }], "timestamp": 1381346700000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 8 }], "timestamp": 1381346760000000, "sequence_number": 1 } ], "name": "foo", "fields": [ @@ -753,20 +753,20 @@ func (self *EngineSuite) TestMedianQueryWithGroupByTime(c *C) { engine := createEngine(c, `[ { "points": [ - { "values": [{ "int64_value": 1 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 3 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 5 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 7 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 4 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 2 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 6 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 9 }], "timestamp": 1381346741, "sequence_number": 1 }, - { "values": [{ "int64_value": 8 }], "timestamp": 1381346741, "sequence_number": 1 }, - { "values": [{ "int64_value": 7 }], "timestamp": 1381346741, "sequence_number": 1 }, - { "values": [{ "int64_value": 6 }], "timestamp": 1381346741, "sequence_number": 1 }, - { "values": [{ "int64_value": 5 }], "timestamp": 1381346741, "sequence_number": 1 }, - { "values": [{ "int64_value": 4 }], "timestamp": 1381346741, "sequence_number": 1 }, - { "values": [{ "int64_value": 1 }], "timestamp": 1381346741, "sequence_number": 1 } + { "values": [{ "int64_value": 1 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 3 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 5 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 7 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 4 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 2 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 6 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 9 }], "timestamp": 1381346771000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 8 }], "timestamp": 1381346771000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 7 }], "timestamp": 1381346771000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 6 }], "timestamp": 1381346771000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 5 }], "timestamp": 1381346771000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 4 }], "timestamp": 1381346771000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 1 }], "timestamp": 1381346771000000, "sequence_number": 1 } ], "name": "foo", "fields": [ @@ -778,8 +778,8 @@ func (self *EngineSuite) TestMedianQueryWithGroupByTime(c *C) { runQuery(engine, "select median(column_one) from foo group by time(1m);", c, `[ { "points": [ - { "values": [{ "int64_value": 4 }], "timestamp": 1381346700, "sequence_number": 1 }, - { "values": [{ "int64_value": 6 }], "timestamp": 1381346760, "sequence_number": 1 } + { "values": [{ "int64_value": 4 }], "timestamp": 1381346700000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 6 }], "timestamp": 1381346760000000, "sequence_number": 1 } ], "name": "foo", "fields": [ @@ -793,20 +793,20 @@ func (self *EngineSuite) TestMeanQueryWithGroupByTime(c *C) { engine := createEngine(c, `[ { "points": [ - { "values": [{ "int64_value": 1 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 3 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 5 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 7 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 4 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 2 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 6 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 9 }], "timestamp": 1381346741, "sequence_number": 1 }, - { "values": [{ "int64_value": 8 }], "timestamp": 1381346741, "sequence_number": 1 }, - { "values": [{ "int64_value": 7 }], "timestamp": 1381346741, "sequence_number": 1 }, - { "values": [{ "int64_value": 6 }], "timestamp": 1381346741, "sequence_number": 1 }, - { "values": [{ "int64_value": 5 }], "timestamp": 1381346741, "sequence_number": 1 }, - { "values": [{ "int64_value": 4 }], "timestamp": 1381346741, "sequence_number": 1 }, - { "values": [{ "int64_value": 3 }], "timestamp": 1381346741, "sequence_number": 1 } + { "values": [{ "int64_value": 1 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 3 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 5 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 7 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 4 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 2 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 6 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 9 }], "timestamp": 1381346771000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 8 }], "timestamp": 1381346771000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 7 }], "timestamp": 1381346771000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 6 }], "timestamp": 1381346771000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 5 }], "timestamp": 1381346771000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 4 }], "timestamp": 1381346771000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 3 }], "timestamp": 1381346771000000, "sequence_number": 1 } ], "name": "foo", "fields": [ @@ -818,8 +818,8 @@ func (self *EngineSuite) TestMeanQueryWithGroupByTime(c *C) { runQuery(engine, "select mean(column_one) from foo group by time(1m);", c, `[ { "points": [ - { "values": [{ "double_value": 4 }], "timestamp": 1381346700, "sequence_number": 1 }, - { "values": [{ "double_value": 6 }], "timestamp": 1381346760, "sequence_number": 1 } + { "values": [{ "double_value": 4 }], "timestamp": 1381346700000000, "sequence_number": 1 }, + { "values": [{ "double_value": 6 }], "timestamp": 1381346760000000, "sequence_number": 1 } ], "name": "foo", "fields": [ @@ -833,12 +833,12 @@ func (self *EngineSuite) TestSumQueryWithGroupByTime(c *C) { engine := createEngine(c, `[ { "points": [ - { "values": [{ "int64_value": 1 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 4 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 6 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 8 }], "timestamp": 1381346741, "sequence_number": 1 }, - { "values": [{ "int64_value": 5 }], "timestamp": 1381346741, "sequence_number": 1 }, - { "values": [{ "int64_value": 3 }], "timestamp": 1381346741, "sequence_number": 1 } + { "values": [{ "int64_value": 1 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 4 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 6 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 8 }], "timestamp": 1381346771000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 5 }], "timestamp": 1381346771000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 3 }], "timestamp": 1381346771000000, "sequence_number": 1 } ], "name": "foo", "fields": [ @@ -850,8 +850,8 @@ func (self *EngineSuite) TestSumQueryWithGroupByTime(c *C) { runQuery(engine, "select sum(column_one) from foo group by time(1m);", c, `[ { "points": [ - { "values": [{ "int64_value": 11 }], "timestamp": 1381346700, "sequence_number": 1 }, - { "values": [{ "int64_value": 16 }], "timestamp": 1381346760, "sequence_number": 1 } + { "values": [{ "int64_value": 11 }], "timestamp": 1381346700000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 16 }], "timestamp": 1381346760000000, "sequence_number": 1 } ], "name": "foo", "fields": [ @@ -865,21 +865,21 @@ func (self *EngineSuite) TestModeQueryWithGroupByTime(c *C) { engine := createEngine(c, `[ { "points": [ - { "values": [{ "int64_value": 1 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 1 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 1 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 4 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 5 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 6 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 6 }], "timestamp": 1381346701, "sequence_number": 1 }, - { "values": [{ "int64_value": 3 }], "timestamp": 1381346741, "sequence_number": 1 }, - { "values": [{ "int64_value": 8 }], "timestamp": 1381346741, "sequence_number": 1 }, - { "values": [{ "int64_value": 7 }], "timestamp": 1381346741, "sequence_number": 1 }, - { "values": [{ "int64_value": 6 }], "timestamp": 1381346741, "sequence_number": 1 }, - { "values": [{ "int64_value": 5 }], "timestamp": 1381346741, "sequence_number": 1 }, - { "values": [{ "int64_value": 4 }], "timestamp": 1381346741, "sequence_number": 1 }, - { "values": [{ "int64_value": 3 }], "timestamp": 1381346741, "sequence_number": 1 }, - { "values": [{ "int64_value": 3 }], "timestamp": 1381346741, "sequence_number": 1 } + { "values": [{ "int64_value": 1 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 1 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 1 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 4 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 5 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 6 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 6 }], "timestamp": 1381346701000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 3 }], "timestamp": 1381346771000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 8 }], "timestamp": 1381346771000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 7 }], "timestamp": 1381346771000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 6 }], "timestamp": 1381346771000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 5 }], "timestamp": 1381346771000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 4 }], "timestamp": 1381346771000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 3 }], "timestamp": 1381346771000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 3 }], "timestamp": 1381346771000000, "sequence_number": 1 } ], "name": "foo", "fields": [ @@ -891,8 +891,8 @@ func (self *EngineSuite) TestModeQueryWithGroupByTime(c *C) { runQuery(engine, "select mode(column_one) from foo group by time(1m);", c, `[ { "points": [ - { "values": [{ "int64_value": 1 }], "timestamp": 1381346700, "sequence_number": 1 }, - { "values": [{ "int64_value": 3 }], "timestamp": 1381346760, "sequence_number": 1 } + { "values": [{ "int64_value": 1 }], "timestamp": 1381346700000000, "sequence_number": 1 }, + { "values": [{ "int64_value": 3 }], "timestamp": 1381346760000000, "sequence_number": 1 } ], "name": "foo", "fields": [ diff --git a/src/parser/query_api.go b/src/parser/query_api.go index 3f5ef8d062..276f501b9b 100644 --- a/src/parser/query_api.go +++ b/src/parser/query_api.go @@ -6,19 +6,13 @@ import ( "strconv" "strings" "time" + "unicode" ) // this file provides the high level api of the query object var ( - ZERO_TIME = time.Unix(0, 0) - charToPeriod = map[byte]int64{ - 's': int64(1), - 'm': int64(time.Minute / time.Second), - 'h': int64(time.Hour / time.Second), - 'd': int64(24 * time.Hour / time.Second), - 'w': int64(7 * 24 * time.Hour / time.Second), - } + ZERO_TIME = time.Unix(0, 0) ) func uniq(slice []string) []string { @@ -92,7 +86,7 @@ func (self *Query) GetEndTime() time.Time { func parseTime(expr *Expression) (int64, error) { if value, ok := expr.GetLeftValue(); ok { if value.IsFunctionCall() && value.Name == "now" { - return time.Now().Unix(), nil + return time.Now().UnixNano(), nil } if value.IsFunctionCall() { @@ -100,15 +94,34 @@ func parseTime(expr *Expression) (int64, error) { } name := value.Name - if period, ok := charToPeriod[name[len(name)-1]]; ok { - parsedInt, err := strconv.Atoi(name[:len(name)-1]) - if err != nil { - return 0, err - } - return int64(parsedInt) * period, nil + + parsedInt, err := strconv.ParseInt(name[:len(name)-1], 10, 64) + if err != nil { + return 0, err } - parsedInt, err := strconv.Atoi(name) + switch name[len(name)-1] { + case 'u': + return parsedInt * int64(time.Microsecond), nil + case 's': + return parsedInt * int64(time.Second), nil + case 'm': + return parsedInt * int64(time.Minute), nil + case 'h': + return parsedInt * int64(time.Hour), nil + case 'd': + return parsedInt * 24 * int64(time.Hour), nil + case 'w': + return parsedInt * 7 * 24 * int64(time.Hour), nil + } + + lastChar := name[len(name)-1] + if !unicode.IsDigit(rune(lastChar)) { + return 0, fmt.Errorf("Invalid character '%c'", lastChar) + } + + extraDigit := int64(lastChar - '0') + parsedInt = parsedInt*10 + extraDigit return int64(parsedInt), err } @@ -230,11 +243,11 @@ func getTime(condition *WhereCondition, isParsingStartTime bool) (*WhereConditio return nil, ZERO_TIME, fmt.Errorf("Cannot use time with '%s'", expr.Operation) } - seconds, err := parseTime(timeExpression) + nanoseconds, err := parseTime(timeExpression) if err != nil { return nil, ZERO_TIME, err } - return nil, time.Unix(seconds, 0), nil + return nil, time.Unix(nanoseconds/int64(time.Second), nanoseconds%int64(time.Second)), nil } leftCondition, _ := condition.GetLeftWhereCondition() diff --git a/src/protocol/protocol_extensions.go b/src/protocol/protocol_extensions.go index 45adfee774..2af31b8aa5 100644 --- a/src/protocol/protocol_extensions.go +++ b/src/protocol/protocol_extensions.go @@ -13,3 +13,11 @@ func UnmarshalPoint(data []byte) (point *Point, err error) { func MarshalPoint(point *Point) (data []byte, err error) { return proto.Marshal(point) } + +func (self *Point) GetTimestampInMicroseconds() *int64 { + return self.Timestamp +} + +func (self *Point) SetTimestampInMicroseconds(t int64) { + self.Timestamp = &t +}