timestamp madness. Fix the aggregators and the datastore to use microseconds.

pull/17/head
John Shahid 2013-10-18 16:25:33 -04:00
parent f530dba9ef
commit 41a2c8a8f3
8 changed files with 197 additions and 151 deletions

11
src/common/time.go Normal file
View File

@ -0,0 +1,11 @@
package common
import (
"time"
)
type EnhancedTime time.Time
func TimeToMicroseconds(t time.Time) int64 {
return t.Unix()*int64(time.Second/time.Microsecond) + int64(t.Nanosecond())/int64(time.Microsecond)
}

View File

@ -41,6 +41,7 @@ func stringToSeries(seriesString string, timestamp int64, c *C) *protocol.Series
series := &protocol.Series{}
err := json.Unmarshal([]byte(seriesString), &series)
c.Assert(err, IsNil)
timestamp *= 1000000
for _, point := range series.Points {
point.Timestamp = &timestamp
}
@ -110,8 +111,8 @@ func (self *DatastoreSuite) TestCanWriteAndRetrievePoints(c *C) {
c.Assert(len(resultSeries.Fields), Equals, 1)
c.Assert(*resultSeries.Points[0].SequenceNumber, Equals, uint32(2))
c.Assert(*resultSeries.Points[1].SequenceNumber, Equals, uint32(1))
c.Assert(*resultSeries.Points[0].Timestamp, Equals, pointTime)
c.Assert(*resultSeries.Points[1].Timestamp, Equals, pointTime)
c.Assert(*resultSeries.Points[0].GetTimestampInMicroseconds(), Equals, pointTime*1000000)
c.Assert(*resultSeries.Points[1].GetTimestampInMicroseconds(), Equals, pointTime*1000000)
c.Assert(*resultSeries.Points[0].Values[0].Int64Value, Equals, int64(2))
c.Assert(*resultSeries.Points[1].Values[0].Int64Value, Equals, int64(3))
c.Assert(resultSeries, Not(DeepEquals), series)
@ -192,8 +193,8 @@ func (self *DatastoreSuite) TestCanWriteDataWithDifferentTimesAndSeries(c *C) {
c.Assert(len(results.Fields), Equals, 1)
c.Assert(*results.Points[0].SequenceNumber, Equals, uint32(1))
c.Assert(*results.Points[1].SequenceNumber, Equals, uint32(3))
c.Assert(*results.Points[0].Timestamp, Equals, now)
c.Assert(*results.Points[1].Timestamp, Equals, secondAgo)
c.Assert(*results.Points[0].GetTimestampInMicroseconds(), Equals, now*1000000)
c.Assert(*results.Points[1].GetTimestampInMicroseconds(), Equals, secondAgo*1000000)
c.Assert(*results.Points[0].Values[0].DoubleValue, Equals, float64(0.1))
c.Assert(*results.Points[1].Values[0].DoubleValue, Equals, float64(23.2))
results = executeQuery("db1", "select val from foo;", db, c)
@ -259,8 +260,8 @@ func (self *DatastoreSuite) TestCanQueryBasedOnTime(c *C) {
c.Assert(len(results.Fields), Equals, 1)
c.Assert(*results.Points[0].SequenceNumber, Equals, uint32(3))
c.Assert(*results.Points[1].SequenceNumber, Equals, uint32(3))
c.Assert(*results.Points[0].Timestamp, Equals, now)
c.Assert(*results.Points[1].Timestamp, Equals, minutesAgo)
c.Assert(*results.Points[0].GetTimestampInMicroseconds(), Equals, now*1000000)
c.Assert(*results.Points[1].GetTimestampInMicroseconds(), Equals, minutesAgo*1000000)
c.Assert(*results.Points[0].Values[0].Int64Value, Equals, int64(3))
c.Assert(*results.Points[1].Values[0].Int64Value, Equals, int64(4))
}

View File

@ -3,6 +3,7 @@ package datastore
import (
"bytes"
"code.google.com/p/goprotobuf/proto"
"common"
"encoding/binary"
"errors"
"fmt"
@ -108,7 +109,7 @@ func (self *LevelDbDatastore) WriteSeriesData(database string, series *protocol.
for _, point := range series.Points {
timestampBuffer := bytes.NewBuffer(make([]byte, 0, 8))
sequenceNumberBuffer := bytes.NewBuffer(make([]byte, 0, 8))
binary.Write(timestampBuffer, binary.BigEndian, self.convertTimestampToUint(point.Timestamp))
binary.Write(timestampBuffer, binary.BigEndian, self.convertTimestampToUint(point.GetTimestampInMicroseconds()))
binary.Write(sequenceNumberBuffer, binary.BigEndian, uint64(*point.SequenceNumber))
pointKey := append(append(id, timestampBuffer.Bytes()...), sequenceNumberBuffer.Bytes()...)
data, err2 := proto.Marshal(point.Values[fieldIndex])
@ -152,7 +153,7 @@ func (self *LevelDbDatastore) DeleteRangeOfSeries(database, series string, start
if err != nil {
return err
}
startTimeBytes, endTimeBytes := self.byteArraysForStartAndEndTimes(startTime, endTime)
startTimeBytes, endTimeBytes := self.byteArraysForStartAndEndTimes(common.TimeToMicroseconds(startTime), common.TimeToMicroseconds(endTime))
ro := levigo.NewReadOptions()
defer ro.Close()
rangesToCompact := make([]*levigo.Range, 0)
@ -206,6 +207,10 @@ func (self *LevelDbDatastore) DeleteRangeOfRegex(database string, regex *regexp.
}
func (self *LevelDbDatastore) byteArraysForStartAndEndTimes(startTime, endTime int64) ([]byte, []byte) {
if startTime < 1382361894000 {
panic("wtf")
}
startTimeBuffer := bytes.NewBuffer(make([]byte, 0, 8))
binary.Write(startTimeBuffer, binary.BigEndian, self.convertTimestampToUint(&startTime))
startTimeBytes := startTimeBuffer.Bytes()
@ -216,7 +221,7 @@ func (self *LevelDbDatastore) byteArraysForStartAndEndTimes(startTime, endTime i
}
func (self *LevelDbDatastore) executeQueryForSeries(database, series string, columns []string, query *parser.Query, yield func(*protocol.Series) error) error {
startTimeBytes, endTimeBytes := self.byteArraysForStartAndEndTimes(query.GetStartTime().Unix(), query.GetEndTime().Unix())
startTimeBytes, endTimeBytes := self.byteArraysForStartAndEndTimes(common.TimeToMicroseconds(query.GetStartTime()), common.TimeToMicroseconds(query.GetEndTime()))
fields, err := self.getFieldsForSeries(database, series, columns)
if err != nil {
@ -306,7 +311,7 @@ func (self *LevelDbDatastore) executeQueryForSeries(database, series string, col
var sequence uint64
binary.Read(bytes.NewBuffer(rawColumnValues[i].sequence), binary.BigEndian, &sequence)
seq32 := uint32(sequence)
point.Timestamp = &time
point.SetTimestampInMicroseconds(time)
point.SequenceNumber = &seq32
rawColumnValues[i] = nil
}

View File

@ -83,7 +83,7 @@ func NewCountAggregator(*parser.Query, *parser.Value) (Aggregator, error) {
//
type TimestampAggregator struct {
duration *time.Duration
duration *int64
timestamps map[string]map[interface{}]int64
}
@ -94,9 +94,9 @@ func (self *TimestampAggregator) AggregatePoint(series string, group interface{}
self.timestamps[series] = timestamps
}
if self.duration != nil {
timestamps[group] = time.Unix(*p.Timestamp, 0).Round(*self.duration).Unix()
timestamps[group] = *p.GetTimestampInMicroseconds() / *self.duration * *self.duration
} else {
timestamps[group] = *p.Timestamp
timestamps[group] = *p.GetTimestampInMicroseconds()
}
return nil
}
@ -125,9 +125,16 @@ func NewTimestampAggregator(query *parser.Query, _ *parser.Value) (Aggregator, e
return nil, err
}
var durationPtr *int64
if duration != nil {
newDuration := int64(*duration / time.Microsecond)
durationPtr = &newDuration
}
return &TimestampAggregator{
timestamps: make(map[string]map[interface{}]int64),
duration: duration,
duration: durationPtr,
}, nil
}

View File

@ -70,7 +70,8 @@ func getValueFromPoint(value *protocol.FieldValue, fType protocol.FieldDefinitio
}
func getTimestampFromPoint(window time.Duration, point *protocol.Point) int64 {
return time.Unix(*point.Timestamp, 0).Round(window).Unix()
multiplier := int64(window / time.Microsecond)
return *point.GetTimestampInMicroseconds() / int64(multiplier) * int64(multiplier)
}
type Mapper func(*protocol.Point) interface{}
@ -287,10 +288,10 @@ func (self *QueryEngine) executeCountQueryWithGroupBy(database string, query *pa
timestamp := *timestampAggregator.GetValue(table, groupId)[0].Int64Value
/* groupPoints := []*protocol.Point{} */
point := &protocol.Point{
Timestamp: &timestamp,
SequenceNumber: &sequenceNumber,
Values: []*protocol.FieldValue{},
}
point.SetTimestampInMicroseconds(timestamp)
for _, aggregator := range aggregators {
// point.Values = append(point.Values, aggregator.GetValue(table, groupId)[0])

View File

@ -104,7 +104,7 @@ func (self *EngineSuite) TestBasicQuery(c *C) {
"string_value": "some_value"
}
],
"timestamp": 1381346631,
"timestamp": 1381346631000000,
"sequence_number": 1
}
],
@ -137,7 +137,7 @@ func (self *EngineSuite) TestCountQuery(c *C) {
"string_value": "some_value"
}
],
"timestamp": 1381346631,
"timestamp": 1381346631000000,
"sequence_number": 1
},
{
@ -146,7 +146,7 @@ func (self *EngineSuite) TestCountQuery(c *C) {
"string_value": "some_value"
}
],
"timestamp": 1381346631,
"timestamp": 1381346631000000,
"sequence_number": 2
}
],
@ -170,7 +170,7 @@ func (self *EngineSuite) TestCountQuery(c *C) {
"int64_value": 2
}
],
"timestamp": 1381346631,
"timestamp": 1381346631000000,
"sequence_number": 1
}
],
@ -199,7 +199,7 @@ func (self *EngineSuite) TestCountQueryWithRegexTables(c *C) {
"string_value": "some_value"
}
],
"timestamp": 1381346631,
"timestamp": 1381346631000000,
"sequence_number": 1
}
],
@ -219,7 +219,7 @@ func (self *EngineSuite) TestCountQueryWithRegexTables(c *C) {
"string_value": "some_value"
}
],
"timestamp": 1381346631,
"timestamp": 1381346631000000,
"sequence_number": 1
}
],
@ -243,7 +243,7 @@ func (self *EngineSuite) TestCountQueryWithRegexTables(c *C) {
"int64_value": 1
}
],
"timestamp": 1381346631,
"timestamp": 1381346631000000,
"sequence_number": 1
}
],
@ -263,7 +263,7 @@ func (self *EngineSuite) TestCountQueryWithRegexTables(c *C) {
"int64_value": 1
}
],
"timestamp": 1381346631,
"timestamp": 1381346631000000,
"sequence_number": 1
}
],
@ -292,7 +292,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByClause(c *C) {
"string_value": "some_value"
}
],
"timestamp": 1381346631,
"timestamp": 1381346631000000,
"sequence_number": 1
},
{
@ -301,7 +301,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByClause(c *C) {
"string_value": "another_value"
}
],
"timestamp": 1381346631,
"timestamp": 1381346631000000,
"sequence_number": 1
}
],
@ -328,7 +328,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByClause(c *C) {
"string_value": "some_value"
}
],
"timestamp": 1381346631,
"timestamp": 1381346631000000,
"sequence_number": 1
},
{
@ -340,7 +340,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByClause(c *C) {
"string_value": "another_value"
}
],
"timestamp": 1381346631,
"timestamp": 1381346631000000,
"sequence_number": 1
}
],
@ -396,7 +396,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByClauseWithMultipleColumns(c *C
"int64_value": 1
}
],
"timestamp": 1381346631,
"timestamp": 1381346631000000,
"sequence_number": 1
},
{
@ -408,7 +408,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByClauseWithMultipleColumns(c *C
"int64_value": 2
}
],
"timestamp": 1381346631,
"timestamp": 1381346631000000,
"sequence_number": 1
},
{
@ -420,7 +420,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByClauseWithMultipleColumns(c *C
"int64_value": 1
}
],
"timestamp": 1381346631,
"timestamp": 1381346631000000,
"sequence_number": 1
}
],
@ -455,7 +455,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByClauseWithMultipleColumns(c *C
"int64_value": 1
}
],
"timestamp": 1381346631,
"timestamp": 1381346631000000,
"sequence_number": 1
},
{
@ -470,7 +470,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByClauseWithMultipleColumns(c *C
"int64_value": 2
}
],
"timestamp": 1381346631,
"timestamp": 1381346631000000,
"sequence_number": 1
},
{
@ -485,7 +485,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByClauseWithMultipleColumns(c *C
"int64_value": 1
}
],
"timestamp": 1381346631,
"timestamp": 1381346631000000,
"sequence_number": 1
}
],
@ -521,7 +521,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByTime(c *C) {
"string_value": "some_value"
}
],
"timestamp": 1381346641,
"timestamp": 1381346641000000,
"sequence_number": 1
},
{
@ -530,7 +530,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByTime(c *C) {
"string_value": "another_value"
}
],
"timestamp": 1381346701,
"timestamp": 1381346701000000,
"sequence_number": 1
},
{
@ -539,7 +539,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByTime(c *C) {
"string_value": "some_value"
}
],
"timestamp": 1381346721,
"timestamp": 1381346721000000,
"sequence_number": 1
}
],
@ -563,7 +563,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByTime(c *C) {
"int64_value": 1
}
],
"timestamp": 1381346640,
"timestamp": 1381346640000000,
"sequence_number": 1
},
{
@ -572,7 +572,7 @@ func (self *EngineSuite) TestCountQueryWithGroupByTime(c *C) {
"int64_value": 2
}
],
"timestamp": 1381346700,
"timestamp": 1381346700000000,
"sequence_number": 1
}
],
@ -592,9 +592,9 @@ func (self *EngineSuite) TestCountQueryWithGroupByTimeAndColumn(c *C) {
engine := createEngine(c, `[
{
"points": [
{ "values": [{ "string_value": "some_value" }], "timestamp": 1381346641, "sequence_number": 1 },
{ "values": [{ "string_value": "another_value" }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "string_value": "some_value" }], "timestamp": 1381346721, "sequence_number": 1 }
{ "values": [{ "string_value": "some_value" }], "timestamp": 1381346641000000, "sequence_number": 1 },
{ "values": [{ "string_value": "another_value" }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "string_value": "some_value" }], "timestamp": 1381346721000000, "sequence_number": 1 }
],
"name": "foo",
"fields": [
@ -606,9 +606,9 @@ func (self *EngineSuite) TestCountQueryWithGroupByTimeAndColumn(c *C) {
runQuery(engine, "select count(*), column_one from foo group by time(1m), column_one;", c, `[
{
"points": [
{ "values": [{ "int64_value": 1 }, { "string_value": "some_value" }], "timestamp": 1381346640, "sequence_number": 1 },
{ "values": [{ "int64_value": 1 }, { "string_value": "another_value" }], "timestamp": 1381346700, "sequence_number": 1 },
{ "values": [{ "int64_value": 1 }, { "string_value": "some_value" }], "timestamp": 1381346700, "sequence_number": 1 }
{ "values": [{ "int64_value": 1 }, { "string_value": "some_value" }], "timestamp": 1381346640000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 1 }, { "string_value": "another_value" }], "timestamp": 1381346700000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 1 }, { "string_value": "some_value" }], "timestamp": 1381346700000000, "sequence_number": 1 }
],
"name": "foo",
"fields": [
@ -623,9 +623,9 @@ func (self *EngineSuite) TestMinQueryWithGroupByTime(c *C) {
engine := createEngine(c, `[
{
"points": [
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346641, "sequence_number": 1 },
{ "values": [{ "int64_value": 8 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 4 }], "timestamp": 1381346721, "sequence_number": 1 }
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346641000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 8 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 4 }], "timestamp": 1381346721000000, "sequence_number": 1 }
],
"name": "foo",
"fields": [
@ -637,8 +637,8 @@ func (self *EngineSuite) TestMinQueryWithGroupByTime(c *C) {
runQuery(engine, "select min(column_one) from foo group by time(1m);", c, `[
{
"points": [
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346640, "sequence_number": 1 },
{ "values": [{ "int64_value": 4 }], "timestamp": 1381346700, "sequence_number": 1 }
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346640000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 4 }], "timestamp": 1381346700000000, "sequence_number": 1 }
],
"name": "foo",
"fields": [
@ -652,9 +652,9 @@ func (self *EngineSuite) TestMaxQueryWithGroupByTime(c *C) {
engine := createEngine(c, `[
{
"points": [
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346641, "sequence_number": 1 },
{ "values": [{ "int64_value": 8 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 4 }], "timestamp": 1381346721, "sequence_number": 1 }
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346641000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 8 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 4 }], "timestamp": 1381346721000000, "sequence_number": 1 }
],
"name": "foo",
"fields": [
@ -666,8 +666,8 @@ func (self *EngineSuite) TestMaxQueryWithGroupByTime(c *C) {
runQuery(engine, "select max(column_one) from foo group by time(1m);", c, `[
{
"points": [
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346640, "sequence_number": 1 },
{ "values": [{ "int64_value": 8 }], "timestamp": 1381346700, "sequence_number": 1 }
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346640000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 8 }], "timestamp": 1381346700000000, "sequence_number": 1 }
],
"name": "foo",
"fields": [
@ -682,9 +682,9 @@ func (self *EngineSuite) TestMaxMinQueryWithGroupByTime(c *C) {
engine := createEngine(c, `[
{
"points": [
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346641, "sequence_number": 1 },
{ "values": [{ "int64_value": 8 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 4 }], "timestamp": 1381346721, "sequence_number": 1 }
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346641000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 8 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 4 }], "timestamp": 1381346721000000, "sequence_number": 1 }
],
"name": "foo",
"fields": [
@ -696,8 +696,8 @@ func (self *EngineSuite) TestMaxMinQueryWithGroupByTime(c *C) {
runQuery(engine, "select max(column_one), min(column_one) from foo group by time(1m);", c, `[
{
"points": [
{ "values": [{ "int64_value": 3 }, { "int64_value": 3 }], "timestamp": 1381346640, "sequence_number": 1 },
{ "values": [{ "int64_value": 8 }, { "int64_value": 4 }], "timestamp": 1381346700, "sequence_number": 1 }
{ "values": [{ "int64_value": 3 }, { "int64_value": 3 }], "timestamp": 1381346640000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 8 }, { "int64_value": 4 }], "timestamp": 1381346700000000, "sequence_number": 1 }
],
"name": "foo",
"fields": [
@ -713,20 +713,20 @@ func (self *EngineSuite) TestPercentileQueryWithGroupByTime(c *C) {
engine := createEngine(c, `[
{
"points": [
{ "values": [{ "int64_value": 1 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 5 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 7 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 4 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 2 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 6 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 9 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int64_value": 8 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int64_value": 7 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int64_value": 6 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int64_value": 5 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int64_value": 4 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346741, "sequence_number": 1 }
{ "values": [{ "int64_value": 1 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 5 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 7 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 4 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 2 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 6 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 9 }], "timestamp": 1381346771000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 8 }], "timestamp": 1381346771000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 7 }], "timestamp": 1381346771000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 6 }], "timestamp": 1381346771000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 5 }], "timestamp": 1381346771000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 4 }], "timestamp": 1381346771000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346771000000, "sequence_number": 1 }
],
"name": "foo",
"fields": [
@ -738,8 +738,8 @@ func (self *EngineSuite) TestPercentileQueryWithGroupByTime(c *C) {
runQuery(engine, "select percentile(column_one, 80) from foo group by time(1m);", c, `[
{
"points": [
{ "values": [{ "int64_value": 6 }], "timestamp": 1381346700, "sequence_number": 1 },
{ "values": [{ "int64_value": 8 }], "timestamp": 1381346760, "sequence_number": 1 }
{ "values": [{ "int64_value": 6 }], "timestamp": 1381346700000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 8 }], "timestamp": 1381346760000000, "sequence_number": 1 }
],
"name": "foo",
"fields": [
@ -753,20 +753,20 @@ func (self *EngineSuite) TestMedianQueryWithGroupByTime(c *C) {
engine := createEngine(c, `[
{
"points": [
{ "values": [{ "int64_value": 1 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 5 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 7 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 4 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 2 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 6 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 9 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int64_value": 8 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int64_value": 7 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int64_value": 6 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int64_value": 5 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int64_value": 4 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int64_value": 1 }], "timestamp": 1381346741, "sequence_number": 1 }
{ "values": [{ "int64_value": 1 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 5 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 7 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 4 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 2 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 6 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 9 }], "timestamp": 1381346771000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 8 }], "timestamp": 1381346771000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 7 }], "timestamp": 1381346771000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 6 }], "timestamp": 1381346771000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 5 }], "timestamp": 1381346771000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 4 }], "timestamp": 1381346771000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 1 }], "timestamp": 1381346771000000, "sequence_number": 1 }
],
"name": "foo",
"fields": [
@ -778,8 +778,8 @@ func (self *EngineSuite) TestMedianQueryWithGroupByTime(c *C) {
runQuery(engine, "select median(column_one) from foo group by time(1m);", c, `[
{
"points": [
{ "values": [{ "int64_value": 4 }], "timestamp": 1381346700, "sequence_number": 1 },
{ "values": [{ "int64_value": 6 }], "timestamp": 1381346760, "sequence_number": 1 }
{ "values": [{ "int64_value": 4 }], "timestamp": 1381346700000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 6 }], "timestamp": 1381346760000000, "sequence_number": 1 }
],
"name": "foo",
"fields": [
@ -793,20 +793,20 @@ func (self *EngineSuite) TestMeanQueryWithGroupByTime(c *C) {
engine := createEngine(c, `[
{
"points": [
{ "values": [{ "int64_value": 1 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 5 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 7 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 4 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 2 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 6 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 9 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int64_value": 8 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int64_value": 7 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int64_value": 6 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int64_value": 5 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int64_value": 4 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346741, "sequence_number": 1 }
{ "values": [{ "int64_value": 1 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 5 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 7 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 4 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 2 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 6 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 9 }], "timestamp": 1381346771000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 8 }], "timestamp": 1381346771000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 7 }], "timestamp": 1381346771000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 6 }], "timestamp": 1381346771000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 5 }], "timestamp": 1381346771000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 4 }], "timestamp": 1381346771000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346771000000, "sequence_number": 1 }
],
"name": "foo",
"fields": [
@ -818,8 +818,8 @@ func (self *EngineSuite) TestMeanQueryWithGroupByTime(c *C) {
runQuery(engine, "select mean(column_one) from foo group by time(1m);", c, `[
{
"points": [
{ "values": [{ "double_value": 4 }], "timestamp": 1381346700, "sequence_number": 1 },
{ "values": [{ "double_value": 6 }], "timestamp": 1381346760, "sequence_number": 1 }
{ "values": [{ "double_value": 4 }], "timestamp": 1381346700000000, "sequence_number": 1 },
{ "values": [{ "double_value": 6 }], "timestamp": 1381346760000000, "sequence_number": 1 }
],
"name": "foo",
"fields": [
@ -833,12 +833,12 @@ func (self *EngineSuite) TestSumQueryWithGroupByTime(c *C) {
engine := createEngine(c, `[
{
"points": [
{ "values": [{ "int64_value": 1 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 4 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 6 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 8 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int64_value": 5 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346741, "sequence_number": 1 }
{ "values": [{ "int64_value": 1 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 4 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 6 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 8 }], "timestamp": 1381346771000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 5 }], "timestamp": 1381346771000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346771000000, "sequence_number": 1 }
],
"name": "foo",
"fields": [
@ -850,8 +850,8 @@ func (self *EngineSuite) TestSumQueryWithGroupByTime(c *C) {
runQuery(engine, "select sum(column_one) from foo group by time(1m);", c, `[
{
"points": [
{ "values": [{ "int64_value": 11 }], "timestamp": 1381346700, "sequence_number": 1 },
{ "values": [{ "int64_value": 16 }], "timestamp": 1381346760, "sequence_number": 1 }
{ "values": [{ "int64_value": 11 }], "timestamp": 1381346700000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 16 }], "timestamp": 1381346760000000, "sequence_number": 1 }
],
"name": "foo",
"fields": [
@ -865,21 +865,21 @@ func (self *EngineSuite) TestModeQueryWithGroupByTime(c *C) {
engine := createEngine(c, `[
{
"points": [
{ "values": [{ "int64_value": 1 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 1 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 1 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 4 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 5 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 6 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 6 }], "timestamp": 1381346701, "sequence_number": 1 },
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int64_value": 8 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int64_value": 7 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int64_value": 6 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int64_value": 5 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int64_value": 4 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346741, "sequence_number": 1 },
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346741, "sequence_number": 1 }
{ "values": [{ "int64_value": 1 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 1 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 1 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 4 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 5 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 6 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 6 }], "timestamp": 1381346701000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346771000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 8 }], "timestamp": 1381346771000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 7 }], "timestamp": 1381346771000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 6 }], "timestamp": 1381346771000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 5 }], "timestamp": 1381346771000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 4 }], "timestamp": 1381346771000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346771000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346771000000, "sequence_number": 1 }
],
"name": "foo",
"fields": [
@ -891,8 +891,8 @@ func (self *EngineSuite) TestModeQueryWithGroupByTime(c *C) {
runQuery(engine, "select mode(column_one) from foo group by time(1m);", c, `[
{
"points": [
{ "values": [{ "int64_value": 1 }], "timestamp": 1381346700, "sequence_number": 1 },
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346760, "sequence_number": 1 }
{ "values": [{ "int64_value": 1 }], "timestamp": 1381346700000000, "sequence_number": 1 },
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346760000000, "sequence_number": 1 }
],
"name": "foo",
"fields": [

View File

@ -6,19 +6,13 @@ import (
"strconv"
"strings"
"time"
"unicode"
)
// this file provides the high level api of the query object
var (
ZERO_TIME = time.Unix(0, 0)
charToPeriod = map[byte]int64{
's': int64(1),
'm': int64(time.Minute / time.Second),
'h': int64(time.Hour / time.Second),
'd': int64(24 * time.Hour / time.Second),
'w': int64(7 * 24 * time.Hour / time.Second),
}
ZERO_TIME = time.Unix(0, 0)
)
func uniq(slice []string) []string {
@ -92,7 +86,7 @@ func (self *Query) GetEndTime() time.Time {
func parseTime(expr *Expression) (int64, error) {
if value, ok := expr.GetLeftValue(); ok {
if value.IsFunctionCall() && value.Name == "now" {
return time.Now().Unix(), nil
return time.Now().UnixNano(), nil
}
if value.IsFunctionCall() {
@ -100,15 +94,34 @@ func parseTime(expr *Expression) (int64, error) {
}
name := value.Name
if period, ok := charToPeriod[name[len(name)-1]]; ok {
parsedInt, err := strconv.Atoi(name[:len(name)-1])
if err != nil {
return 0, err
}
return int64(parsedInt) * period, nil
parsedInt, err := strconv.ParseInt(name[:len(name)-1], 10, 64)
if err != nil {
return 0, err
}
parsedInt, err := strconv.Atoi(name)
switch name[len(name)-1] {
case 'u':
return parsedInt * int64(time.Microsecond), nil
case 's':
return parsedInt * int64(time.Second), nil
case 'm':
return parsedInt * int64(time.Minute), nil
case 'h':
return parsedInt * int64(time.Hour), nil
case 'd':
return parsedInt * 24 * int64(time.Hour), nil
case 'w':
return parsedInt * 7 * 24 * int64(time.Hour), nil
}
lastChar := name[len(name)-1]
if !unicode.IsDigit(rune(lastChar)) {
return 0, fmt.Errorf("Invalid character '%c'", lastChar)
}
extraDigit := int64(lastChar - '0')
parsedInt = parsedInt*10 + extraDigit
return int64(parsedInt), err
}
@ -230,11 +243,11 @@ func getTime(condition *WhereCondition, isParsingStartTime bool) (*WhereConditio
return nil, ZERO_TIME, fmt.Errorf("Cannot use time with '%s'", expr.Operation)
}
seconds, err := parseTime(timeExpression)
nanoseconds, err := parseTime(timeExpression)
if err != nil {
return nil, ZERO_TIME, err
}
return nil, time.Unix(seconds, 0), nil
return nil, time.Unix(nanoseconds/int64(time.Second), nanoseconds%int64(time.Second)), nil
}
leftCondition, _ := condition.GetLeftWhereCondition()

View File

@ -13,3 +13,11 @@ func UnmarshalPoint(data []byte) (point *Point, err error) {
func MarshalPoint(point *Point) (data []byte, err error) {
return proto.Marshal(point)
}
func (self *Point) GetTimestampInMicroseconds() *int64 {
return self.Timestamp
}
func (self *Point) SetTimestampInMicroseconds(t int64) {
self.Timestamp = &t
}