Merge pull request #233 from influxdb/limit-delete-batches
limit the number of keys in the delete batch.pull/240/head
commit
506a262cef
|
@ -621,12 +621,23 @@ func (self *LevelDbDatastore) deleteRangeOfSeriesCommon(database, series string,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
count := 0
|
||||||
for it = it; it.Valid(); it.Next() {
|
for it = it; it.Valid(); it.Next() {
|
||||||
k := it.Key()
|
k := it.Key()
|
||||||
if len(k) < 16 || !bytes.Equal(k[:8], field.Id) || bytes.Compare(k[8:16], endTimeBytes) == 1 {
|
if len(k) < 16 || !bytes.Equal(k[:8], field.Id) || bytes.Compare(k[8:16], endTimeBytes) == 1 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
wb.Delete(k)
|
wb.Delete(k)
|
||||||
|
count++
|
||||||
|
// delete every one million keys which is approximately 24 megabytes
|
||||||
|
if count == ONE_MEGABYTE {
|
||||||
|
err = self.db.Write(self.writeOptions, wb)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
wb.Clear()
|
||||||
|
count = 0
|
||||||
|
}
|
||||||
endKey = k
|
endKey = k
|
||||||
}
|
}
|
||||||
err = self.db.Write(self.writeOptions, wb)
|
err = self.db.Write(self.writeOptions, wb)
|
||||||
|
|
|
@ -935,6 +935,43 @@ func (self *IntegrationSuite) TestDeleteQuery(c *C) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (self *IntegrationSuite) TestLargeDeletes(c *C) {
|
||||||
|
numberOfPoints := 2 * 1024 * 1024
|
||||||
|
points := []interface{}{}
|
||||||
|
for i := 0; i < numberOfPoints; i++ {
|
||||||
|
points = append(points, []interface{}{i})
|
||||||
|
}
|
||||||
|
pointsString, _ := json.Marshal(points)
|
||||||
|
err := self.server.WriteData(fmt.Sprintf(`
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"name": "test_large_deletes",
|
||||||
|
"columns": ["val1"],
|
||||||
|
"points":%s
|
||||||
|
}
|
||||||
|
]`, string(pointsString)))
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
bs, err := self.server.RunQuery("select count(val1) from test_large_deletes", "m")
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
data := []*h.SerializedSeries{}
|
||||||
|
err = json.Unmarshal(bs, &data)
|
||||||
|
c.Assert(data, HasLen, 1)
|
||||||
|
c.Assert(data[0].Points, HasLen, 1)
|
||||||
|
c.Assert(data[0].Points[0][1], Equals, float64(numberOfPoints))
|
||||||
|
|
||||||
|
query := "delete from test_large_deletes"
|
||||||
|
_, err = self.server.RunQuery(query, "m")
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
// this shouldn't return any data
|
||||||
|
bs, err = self.server.RunQuery("select count(val1) from test_large_deletes", "m")
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
data = []*h.SerializedSeries{}
|
||||||
|
err = json.Unmarshal(bs, &data)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
c.Assert(data, HasLen, 0)
|
||||||
|
}
|
||||||
|
|
||||||
func (self *IntegrationSuite) TestReading(c *C) {
|
func (self *IntegrationSuite) TestReading(c *C) {
|
||||||
if !*benchmark {
|
if !*benchmark {
|
||||||
c.Skip("Benchmarking is disabled")
|
c.Skip("Benchmarking is disabled")
|
||||||
|
|
Loading…
Reference in New Issue