Fix another null value bug in the datastore. See DataStoreSuite.TestNullValues for more details

pull/57/head
John Shahid 2013-11-14 15:40:47 -05:00
parent 663f9fc22a
commit 0d8667e7fe
2 changed files with 38 additions and 3 deletions

View File

@ -502,6 +502,30 @@ func (self *DatastoreSuite) TestReturnsResultsInAscendingOrderWithNulls(c *C) {
c.Assert(results[0], DeepEquals, series)
}
func (self *DatastoreSuite) TestNullValues(c *C) {
cleanup(nil)
db := newDatastore(c)
defer cleanup(db)
minuteAgo := time.Now().Add(-time.Minute).Unix()
mock := `{
"points":[
{"values":[null, {"string_value": "dix"}],"sequence_number":1},
{"values":[{"string_value": "dix"}, null],"sequence_number":2},
{"values":[null, {"string_value": "dix"}],"sequence_number":3},
{"values":[{"string_value":"todd"}, null],"sequence_number":4}],
"name":"user_things",
"fields":["first_name", "last_name"]
}`
series := stringToSeries(mock, minuteAgo, c)
err := db.WriteSeriesData("foobar", series)
c.Assert(err, IsNil)
user := &MockUser{}
results := executeQuery(user, "foobar", "select * from user_things", db, c)
c.Assert(results, HasLen, 1)
c.Assert(results[0].Points, HasLen, 4)
}
func (self *DatastoreSuite) TestCanDeleteARangeOfData(c *C) {
cleanup(nil)
db := newDatastore(c)

View File

@ -370,8 +370,6 @@ func (self *LevelDbDatastore) executeQueryForSeries(database, series string, col
// optimize for the case where we're pulling back only a single column or aggregate
for {
isValid := false
var pointTimeRaw []byte
var pointSequenceRaw []byte
point := &protocol.Point{Values: make([]*protocol.FieldValue, fieldCount, fieldCount)}
for i, it := range iterators {
@ -393,10 +391,23 @@ func (self *LevelDbDatastore) executeQueryForSeries(database, series string, col
sequenceNumber := key[16:]
rawValue := &rawColumnValue{time: time, sequence: sequenceNumber, value: value}
pointTimeRaw, pointSequenceRaw = rawValue.updatePointTimeAndSequence(pointTimeRaw, pointSequenceRaw, query.Ascending)
rawColumnValues[i] = rawValue
}
var pointTimeRaw []byte
var pointSequenceRaw []byte
// choose the highest (or lowest in case of ascending queries) timestamp
// and sequence number. that will become the timestamp and sequence of
// the next point.
for _, value := range rawColumnValues {
if value == nil {
continue
}
pointTimeRaw, pointSequenceRaw = value.updatePointTimeAndSequence(pointTimeRaw,
pointSequenceRaw, query.Ascending)
}
for i, iterator := range iterators {
// if the value is nil, or doesn't match the point's timestamp and sequence number
// then skip it