fix . Dropping the database should delete all timeseries data from that database.

pull/47/head
John Shahid 2013-11-12 17:42:36 -05:00
parent 269cbd94ab
commit 10d7fba5ec
7 changed files with 144 additions and 9 deletions

View File

@ -96,6 +96,7 @@
- [Issue #36](https://github.com/influxdb/influxdb/issues/36). The regex operator should be =~ not ~=
- [Issue #39](https://github.com/influxdb/influxdb/issues/39). Return proper content types from the http api
- [Issue #42](https://github.com/influxdb/influxdb/issues/42). Make the api consistent with the docs
- [Issue #41](https://github.com/influxdb/influxdb/issues/41). Table/Points not deleted when database is dropped
### Deprecated

View File

@ -80,7 +80,11 @@ func (self *CoordinatorImpl) DropDatabase(user common.User, db string) error {
return common.NewAuthorizationError("Insufficient permission to drop database")
}
return self.raftServer.DropDatabase(db)
if err := self.raftServer.DropDatabase(db); err != nil {
return err
}
return self.datastore.DropDatabase(db)
}
func (self *CoordinatorImpl) AuthenticateDbUser(db, username, password string) (common.User, error) {

View File

@ -42,7 +42,8 @@ const (
type DatastoreMock struct {
datastore.Datastore
Series *protocol.Series
Series *protocol.Series
DroppedDatabase string
}
func (self *DatastoreMock) WriteSeriesData(database string, series *protocol.Series) error {
@ -50,6 +51,11 @@ func (self *DatastoreMock) WriteSeriesData(database string, series *protocol.Ser
return nil
}
func (self *DatastoreMock) DropDatabase(database string) error {
self.DroppedDatabase = database
return nil
}
func stringToSeries(seriesString string, c *C) *protocol.Series {
series := &protocol.Series{}
err := json.Unmarshal([]byte(seriesString), &series)
@ -244,7 +250,7 @@ func (self *CoordinatorSuite) TestAutomaticDbCreations(c *C) {
servers := startAndVerifyCluster(1, c)
defer clean(servers...)
coordinator := NewCoordinatorImpl(nil, servers[0], servers[0].clusterConfig)
coordinator := NewCoordinatorImpl(&DatastoreMock{}, servers[0], servers[0].clusterConfig)
time.Sleep(REPLICATION_LAG)
@ -267,6 +273,7 @@ func (self *CoordinatorSuite) TestAutomaticDbCreations(c *C) {
// if the db is dropped it should remove the users as well
c.Assert(coordinator.DropDatabase(root, "db1"), IsNil)
c.Assert(coordinator.datastore.(*DatastoreMock).DroppedDatabase, Equals, "db1")
_, err = coordinator.AuthenticateDbUser("db1", "db_user", "pass")
c.Assert(err, ErrorMatches, ".*Invalid.*")
}

View File

@ -100,6 +100,43 @@ func (self *DatastoreSuite) TestPropagateErrorsProperly(c *C) {
c.Assert(err, ErrorMatches, "Whatever")
}
func (self *DatastoreSuite) TestDeletingData(c *C) {
cleanup(nil)
db := newDatastore(c)
defer cleanup(db)
mock := `
{
"points": [
{
"values": [
{
"int64_value": 3
}
],
"sequence_number": 1
}
],
"name": "foo",
"fields": ["value"]
}`
pointTime := time.Now().Unix()
series := stringToSeries(mock, pointTime, c)
err := db.WriteSeriesData("test", series)
c.Assert(err, IsNil)
q, err := parser.ParseQuery("select value from foo;")
c.Assert(err, IsNil)
yield := func(series *protocol.Series) error {
if len(series.Points) > 0 {
panic("Series contains points")
}
return nil
}
c.Assert(db.DropDatabase("test"), IsNil)
user := &MockUser{}
err = db.ExecuteQuery(user, "test", q, yield)
c.Assert(err, ErrorMatches, ".*Field value doesn't exist.*")
}
func (self *DatastoreSuite) TestCanWriteAndRetrievePoints(c *C) {
cleanup(nil)
db := newDatastore(c)

View File

@ -11,6 +11,7 @@ import (
type Datastore interface {
ExecuteQuery(user common.User, database string, query *parser.Query, yield func(*protocol.Series) error) error
WriteSeriesData(database string, series *protocol.Series) error
DropDatabase(database string) error
DeleteRangeOfSeries(database, series string, startTime, endTime time.Time) error
DeleteRangeOfRegex(user common.User, database string, regex *regexp.Regexp, startTime, endTime time.Time) error
Close()

View File

@ -112,6 +112,47 @@ func (self *LevelDbDatastore) WriteSeriesData(database string, series *protocol.
return self.db.Write(self.writeOptions, wb)
}
func (self *LevelDbDatastore) dropSeries(database, series string) error {
startTimeBytes := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
endTimeBytes := []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}
wb := levigo.NewWriteBatch()
defer wb.Close()
for _, name := range self.getColumnNamesForSeries(database, series) {
if err := self.deleteRangeOfSeries(database, series, startTimeBytes, endTimeBytes); err != nil {
return err
}
indexKey := append(SERIES_COLUMN_INDEX_PREFIX, []byte(database+"~"+series+"~"+name)...)
wb.Delete(indexKey)
}
// remove the column indeces for this time series
return self.db.Write(self.writeOptions, wb)
}
func (self *LevelDbDatastore) DropDatabase(database string) error {
wb := levigo.NewWriteBatch()
defer wb.Close()
err := self.getSeriesForDb(database, func(name string) error {
if err := self.dropSeries(database, name); err != nil {
return err
}
seriesKey := append(DATABASE_SERIES_INDEX_PREFIX, []byte(database+"~")...)
wb.Delete(seriesKey)
return nil
})
if err != nil {
return err
}
return self.db.Write(self.writeOptions, wb)
}
func (self *LevelDbDatastore) ExecuteQuery(user common.User, database string, query *parser.Query, yield func(*protocol.Series) error) error {
seriesAndColumns := query.GetReferencedColumns()
hasAccess := true
@ -154,13 +195,12 @@ func (self *LevelDbDatastore) Close() {
self.writeOptions = nil
}
func (self *LevelDbDatastore) DeleteRangeOfSeries(database, series string, startTime, endTime time.Time) error {
func (self *LevelDbDatastore) deleteRangeOfSeries(database, series string, startTimeBytes, endTimeBytes []byte) error {
columns := self.getColumnNamesForSeries(database, series)
fields, err := self.getFieldsForSeries(database, series, columns)
if err != nil {
return err
}
startTimeBytes, endTimeBytes := self.byteArraysForStartAndEndTimes(common.TimeToMicroseconds(startTime), common.TimeToMicroseconds(endTime))
ro := levigo.NewReadOptions()
defer ro.Close()
ro.SetFillCache(false)
@ -202,6 +242,11 @@ func (self *LevelDbDatastore) DeleteRangeOfSeries(database, series string, start
return nil
}
func (self *LevelDbDatastore) DeleteRangeOfSeries(database, series string, startTime, endTime time.Time) error {
startTimeBytes, endTimeBytes := self.byteArraysForStartAndEndTimes(common.TimeToMicroseconds(startTime), common.TimeToMicroseconds(endTime))
return self.deleteRangeOfSeries(database, series, startTimeBytes, endTimeBytes)
}
func (self *LevelDbDatastore) DeleteRangeOfRegex(user common.User, database string, regex *regexp.Regexp, startTime, endTime time.Time) error {
series := self.getSeriesForDbAndRegex(database, regex)
hasAccess := true
@ -355,13 +400,12 @@ func (self *LevelDbDatastore) executeQueryForSeries(database, series string, col
return yield(emptyResult)
}
func (self *LevelDbDatastore) getSeriesForDbAndRegex(database string, regex *regexp.Regexp) []string {
func (self *LevelDbDatastore) getSeriesForDb(database string, yield func(string) error) error {
it := self.db.NewIterator(self.readOptions)
defer it.Close()
seekKey := append(DATABASE_SERIES_INDEX_PREFIX, []byte(database+"~")...)
it.Seek(seekKey)
names := make([]string, 0)
dbNameStart := len(DATABASE_SERIES_INDEX_PREFIX)
for it = it; it.Valid(); it.Next() {
key := it.Key()
@ -375,11 +419,22 @@ func (self *LevelDbDatastore) getSeriesForDbAndRegex(database string, regex *reg
break
}
name := parts[1]
if regex.MatchString(name) {
names = append(names, parts[1])
if err := yield(name); err != nil {
return err
}
}
}
return nil
}
func (self *LevelDbDatastore) getSeriesForDbAndRegex(database string, regex *regexp.Regexp) []string {
names := []string{}
self.getSeriesForDb(database, func(name string) error {
if regex.MatchString(name) {
names = append(names, name)
}
return nil
})
return names
}

View File

@ -297,6 +297,36 @@ func (self *IntegrationSuite) TestHttpPostWithTime(c *C) {
c.Assert(values["val2"], Equals, 2.0)
}
// test for issue #41
func (self *IntegrationSuite) TestDbDelete(c *C) {
err := self.server.WriteData(`
[
{
"name": "test_deletetions",
"columns": ["val1", "val2"],
"points":[["v1", 2]]
}
]`, "time_precision=s")
c.Assert(err, IsNil)
bs, err := self.server.RunQuery("select val1 from test_deletetions")
c.Assert(err, IsNil)
data := []*h.SerializedSeries{}
err = json.Unmarshal(bs, &data)
c.Assert(data, HasLen, 1)
req, err := http.NewRequest("DELETE", "http://localhost:8086/db/db1?u=root&p=root", nil)
c.Assert(err, IsNil)
resp, err := http.DefaultClient.Do(req)
c.Assert(err, IsNil)
c.Assert(resp.StatusCode, Equals, http.StatusNoContent)
// recreate the database and the user
c.Assert(self.createUser(), IsNil)
// this shouldn't return any data
bs, err = self.server.RunQuery("select val1 from test_deletetions")
c.Assert(err, ErrorMatches, ".*Field val1 doesn't exist.*")
}
func (self *IntegrationSuite) TestReading(c *C) {
if !*benchmark {
c.Skip("Benchmarking is disabled")