Datastore will delete and compact all the column data for a given series and time range

pull/17/head
Paul Dix 2013-10-17 18:02:40 -04:00
parent 24502f9cbe
commit 77819383f4
3 changed files with 105 additions and 3 deletions


@@ -391,3 +391,41 @@ func (self *DatastoreSuite) TestReturnsResultsInAscendingOrder(c *C) {
    c.Assert(*results.Points[1].Values[0].StringValue, Equals, "todd")
    c.Assert(*results.Points[2].Values[0].StringValue, Equals, "john")
}

func (self *DatastoreSuite) TestCanDeleteARangeOfData(c *C) {
    cleanup(nil)
    db := newDatastore(c)
    defer cleanup(db)
    minutesAgo := time.Now().Add(-5 * time.Minute).Unix()
    mock := `{
        "points":[
            {"values":[{"int_value":3},{"string_value":"paul"}],"sequence_number":2},
            {"values":[{"int_value":1},{"string_value":"todd"}],"sequence_number":1}],
        "name":"user_things",
        "fields":[{"type":"INT32","name":"count"},{"type":"STRING","name":"name"}]
    }`
    series := stringToSeries(mock, minutesAgo, c)
    err := db.WriteSeriesData("foobar", series)
    c.Assert(err, IsNil)
    results := executeQuery("foobar", "select count, name from user_things;", db, c)
    c.Assert(results, DeepEquals, series)
    mock = `{
        "points":[
            {"values":[{"int_value":3},{"string_value":"john"}],"sequence_number":1}],
        "name":"user_things",
        "fields":[{"type":"INT32","name":"count"},{"type":"STRING","name":"name"}]
    }`
    series = stringToSeries(mock, time.Now().Unix(), c)
    err = db.WriteSeriesData("foobar", series)
    c.Assert(err, IsNil)
    results = executeQuery("foobar", "select count, name from user_things;", db, c)
    c.Assert(len(results.Points), Equals, 3)
    err = db.DeleteRangeOfSeries("foobar", "user_things", time.Now().Add(-time.Hour), time.Now().Add(-time.Minute))
    c.Assert(err, IsNil)
    results = executeQuery("foobar", "select count, name from user_things;", db, c)
    c.Assert(len(results.Points), Equals, 1)
    c.Assert(results, DeepEquals, series)
}


@@ -3,10 +3,14 @@ package datastore

import (
    "parser"
    "protocol"
    "regexp"
    "time"
)

type Datastore interface {
    ExecuteQuery(database string, query *parser.Query, yield func(*protocol.Series) error) error
    WriteSeriesData(database string, series *protocol.Series) error
    DeleteRangeOfSeries(database, series string, startTime, endTime time.Time) error
    DeleteRangeOfRegex(database string, regex regexp.Regexp, startTime, endTime time.Time) error
    Close()
}
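As a quick illustration of the new surface, here is a minimal, hypothetical caller written against just the added method. The SeriesDeleter interface below merely mirrors DeleteRangeOfSeries so the sketch stands alone, and the one-hour window is an arbitrary example value, not anything this commit prescribes:

package example

import (
    "fmt"
    "time"
)

// SeriesDeleter mirrors the DeleteRangeOfSeries method added to the Datastore
// interface above, so this sketch compiles without the rest of the package.
type SeriesDeleter interface {
    DeleteRangeOfSeries(database, series string, startTime, endTime time.Time) error
}

// deleteLastHour drops every point the named series received in the past hour.
func deleteLastHour(store SeriesDeleter, database, series string) error {
    end := time.Now()
    start := end.Add(-time.Hour)
    if err := store.DeleteRangeOfSeries(database, series, start, end); err != nil {
        return fmt.Errorf("deleting %s.%s: %s", database, series, err)
    }
    return nil
}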


@@ -10,8 +10,10 @@ import (
    "math"
    "parser"
    "protocol"
    "regexp"
    "strings"
    "sync"
    "time"
)

type LevelDbDatastore struct {
@@ -132,15 +134,73 @@ func (self *LevelDbDatastore) Close() {
    self.db.Close()
}

func (self *LevelDbDatastore) DeleteRangeOfSeries(database, series string, startTime, endTime time.Time) error {
    columns := self.getColumnNamesForSeries(database, series)
    fields, err := self.getFieldsForSeries(database, series, columns)
    if err != nil {
        return err
    }
    startTimeBytes, endTimeBytes := self.byteArraysForStartAndEndTimes(startTime.Unix(), endTime.Unix())
    ro := levigo.NewReadOptions()
    defer ro.Close()
    rangesToCompact := make([]*levigo.Range, 0)
    for _, field := range fields {
        it := self.db.NewIterator(ro)
        defer it.Close()
        wo := levigo.NewWriteOptions()
        defer wo.Close()
        wb := levigo.NewWriteBatch()
        startKey := append(field.Id, startTimeBytes...)
        endKey := startKey
        it.Seek(startKey)
        if it.Valid() {
            if !bytes.Equal(it.Key()[:8], field.Id) {
                it.Next()
                if it.Valid() {
                    startKey = it.Key()
                }
            }
        }
        for it = it; it.Valid(); it.Next() {
            k := it.Key()
            if len(k) < 16 || !bytes.Equal(k[:8], field.Id) || bytes.Compare(k[8:16], endTimeBytes) == 1 {
                break
            }
            wb.Delete(k)
            endKey = k
        }
        err = self.db.Write(wo, wb)
        if err != nil {
            return err
        }
        rangesToCompact = append(rangesToCompact, &levigo.Range{startKey, endKey})
    }
    for _, r := range rangesToCompact {
        go func(r *levigo.Range) {
            self.db.CompactRange(*r)
        }(r)
    }
    return nil
}
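The loop above works because of the key layout this datastore uses for column data: the first 8 bytes of each key are the field id and the next 8 are the big-endian timestamp (built below by byteArraysForStartAndEndTimes via convertTimestampToUint), so keys for one field sort in time order and a delete range is a contiguous span that can be batch-deleted and then compacted. A small standalone sketch of that ordering, assuming a plain uint64 timestamp in place of the real conversion and ignoring whatever follows byte 16 in the real keys:

package example

import (
    "bytes"
    "encoding/binary"
    "fmt"
)

// makeKey builds the 16-byte prefix this datastore appears to use: an 8-byte
// field id followed by an 8-byte big-endian timestamp. Big-endian matters:
// it makes lexicographic key order agree with time order within a field.
func makeKey(fieldId []byte, timestamp uint64) []byte {
    buf := bytes.NewBuffer(make([]byte, 0, 16))
    buf.Write(fieldId)
    binary.Write(buf, binary.BigEndian, timestamp)
    return buf.Bytes()
}

func demo() {
    fieldId := []byte{0, 0, 0, 0, 0, 0, 0, 42}
    earlier := makeKey(fieldId, 1000)
    later := makeKey(fieldId, 2000)

    // The same checks DeleteRangeOfSeries applies to each iterator key.
    fmt.Println(bytes.Equal(earlier[:8], fieldId))         // true: the key belongs to this field
    fmt.Println(bytes.Compare(later[8:16], earlier[8:16])) // 1: later timestamps compare greater, which the end-of-range check relies on
}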

func (self *LevelDbDatastore) DeleteRangeOfRegex(database string, regex regexp.Regexp, startTime, endTime time.Time) error {
    return errors.New("Not implemented yet!")
}

func (self *LevelDbDatastore) byteArraysForStartAndEndTimes(startTime, endTime int64) ([]byte, []byte) {
    startTimeBuffer := bytes.NewBuffer(make([]byte, 0, 8))
    binary.Write(startTimeBuffer, binary.BigEndian, self.convertTimestampToUint(&startTime))
    startTimeBytes := startTimeBuffer.Bytes()
    endTimeBuffer := bytes.NewBuffer(make([]byte, 0, 8))
    binary.Write(endTimeBuffer, binary.BigEndian, self.convertTimestampToUint(&endTime))
    endTimeBytes := endTimeBuffer.Bytes()
    return startTimeBytes, endTimeBytes
}

func (self *LevelDbDatastore) executeQueryForSeries(database, series string, columns []string, query *parser.Query, yield func(*protocol.Series) error) error {
    startTimeBytes, endTimeBytes := self.byteArraysForStartAndEndTimes(query.GetStartTime().Unix(), query.GetEndTime().Unix())
    fields, err := self.getFieldsForSeries(database, series, columns)
    if err != nil {
        return err