diff --git a/src/datastore/datastore_test.go b/src/datastore/datastore_test.go index 17a5df1386..8659e63539 100644 --- a/src/datastore/datastore_test.go +++ b/src/datastore/datastore_test.go @@ -502,6 +502,30 @@ func (self *DatastoreSuite) TestReturnsResultsInAscendingOrderWithNulls(c *C) { c.Assert(results[0], DeepEquals, series) } +func (self *DatastoreSuite) TestNullValues(c *C) { + cleanup(nil) + db := newDatastore(c) + defer cleanup(db) + + minuteAgo := time.Now().Add(-time.Minute).Unix() + mock := `{ + "points":[ + {"values":[null, {"string_value": "dix"}],"sequence_number":1}, + {"values":[{"string_value": "dix"}, null],"sequence_number":2}, + {"values":[null, {"string_value": "dix"}],"sequence_number":3}, + {"values":[{"string_value":"todd"}, null],"sequence_number":4}], + "name":"user_things", + "fields":["first_name", "last_name"] + }` + series := stringToSeries(mock, minuteAgo, c) + err := db.WriteSeriesData("foobar", series) + c.Assert(err, IsNil) + user := &MockUser{} + results := executeQuery(user, "foobar", "select * from user_things", db, c) + c.Assert(results, HasLen, 1) + c.Assert(results[0].Points, HasLen, 4) +} + func (self *DatastoreSuite) TestCanDeleteARangeOfData(c *C) { cleanup(nil) db := newDatastore(c) diff --git a/src/datastore/leveldb_datastore.go b/src/datastore/leveldb_datastore.go index 52e6ab39c7..bcc574e839 100644 --- a/src/datastore/leveldb_datastore.go +++ b/src/datastore/leveldb_datastore.go @@ -370,8 +370,6 @@ func (self *LevelDbDatastore) executeQueryForSeries(database, series string, col // optimize for the case where we're pulling back only a single column or aggregate for { isValid := false - var pointTimeRaw []byte - var pointSequenceRaw []byte point := &protocol.Point{Values: make([]*protocol.FieldValue, fieldCount, fieldCount)} for i, it := range iterators { @@ -393,10 +391,23 @@ func (self *LevelDbDatastore) executeQueryForSeries(database, series string, col sequenceNumber := key[16:] rawValue := &rawColumnValue{time: time, sequence: sequenceNumber, value: value} - pointTimeRaw, pointSequenceRaw = rawValue.updatePointTimeAndSequence(pointTimeRaw, pointSequenceRaw, query.Ascending) rawColumnValues[i] = rawValue } + var pointTimeRaw []byte + var pointSequenceRaw []byte + // choose the highest (or lowest in case of ascending queries) timestamp + // and sequence number. that will become the timestamp and sequence of + // the next point. + for _, value := range rawColumnValues { + if value == nil { + continue + } + + pointTimeRaw, pointSequenceRaw = value.updatePointTimeAndSequence(pointTimeRaw, + pointSequenceRaw, query.Ascending) + } + for i, iterator := range iterators { // if the value is nil, or doesn't match the point's timestamp and sequence number // then skip it