Merge pull request #10517 from influxdata/jmw-always-cleanup-fields-index
tsdb: clean up fields index for every kind of deletepull/10530/head
commit
cca97bf9b9
|
@ -1667,8 +1667,19 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
|
|||
ids.Add(sid)
|
||||
}
|
||||
|
||||
fielsetChanged := false
|
||||
for k := range measurements {
|
||||
if err := e.index.DropMeasurementIfSeriesNotExist([]byte(k)); err != nil {
|
||||
if dropped, err := e.index.DropMeasurementIfSeriesNotExist([]byte(k)); err != nil {
|
||||
return err
|
||||
} else if dropped {
|
||||
if err := e.cleanupMeasurement([]byte(k)); err != nil {
|
||||
return err
|
||||
}
|
||||
fielsetChanged = true
|
||||
}
|
||||
}
|
||||
if fielsetChanged {
|
||||
if err := e.fieldset.Save(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -1688,9 +1699,6 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
|
|||
name, tags := e.sfile.Series(id)
|
||||
if err1 := e.sfile.DeleteSeriesID(id); err1 != nil {
|
||||
err = err1
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -1711,13 +1719,7 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// DeleteMeasurement deletes a measurement and all related series.
|
||||
func (e *Engine) DeleteMeasurement(name []byte) error {
|
||||
// Delete the bulk of data outside of the fields lock.
|
||||
if err := e.deleteMeasurement(name); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
func (e *Engine) cleanupMeasurement(name []byte) error {
|
||||
// A sentinel error message to cause DeleteWithLock to not delete the measurement
|
||||
abortErr := fmt.Errorf("measurements still exist")
|
||||
|
||||
|
@ -1726,10 +1728,11 @@ func (e *Engine) DeleteMeasurement(name []byte) error {
|
|||
// were writes to the measurement while we are deleting it.
|
||||
if err := e.fieldset.DeleteWithLock(string(name), func() error {
|
||||
encodedName := models.EscapeMeasurement(name)
|
||||
sep := len(encodedName)
|
||||
|
||||
// First scan the cache to see if any series exists for this measurement.
|
||||
if err := e.Cache.ApplyEntryFn(func(k []byte, _ *entry) error {
|
||||
if bytes.HasPrefix(k, encodedName) {
|
||||
if bytes.HasPrefix(k, encodedName) && (k[sep] == ',' || k[sep] == keyFieldSeparator[0]) {
|
||||
return abortErr
|
||||
}
|
||||
return nil
|
||||
|
@ -1738,8 +1741,8 @@ func (e *Engine) DeleteMeasurement(name []byte) error {
|
|||
}
|
||||
|
||||
// Check the filestore.
|
||||
return e.FileStore.WalkKeys(name, func(k []byte, typ byte) error {
|
||||
if bytes.HasPrefix(k, encodedName) {
|
||||
return e.FileStore.WalkKeys(name, func(k []byte, _ byte) error {
|
||||
if bytes.HasPrefix(k, encodedName) && (k[sep] == ',' || k[sep] == keyFieldSeparator[0]) {
|
||||
return abortErr
|
||||
}
|
||||
return nil
|
||||
|
@ -1750,11 +1753,11 @@ func (e *Engine) DeleteMeasurement(name []byte) error {
|
|||
return err
|
||||
}
|
||||
|
||||
return e.fieldset.Save()
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteMeasurement deletes a measurement and all related series.
|
||||
func (e *Engine) deleteMeasurement(name []byte) error {
|
||||
func (e *Engine) DeleteMeasurement(name []byte) error {
|
||||
// Attempt to find the series keys.
|
||||
indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
|
||||
itr, err := indexSet.MeasurementSeriesByExprIterator(name, nil)
|
||||
|
|
|
@ -42,7 +42,7 @@ type Index interface {
|
|||
CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
|
||||
CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
|
||||
DropSeries(seriesID uint64, key []byte, cascade bool) error
|
||||
DropMeasurementIfSeriesNotExist(name []byte) error
|
||||
DropMeasurementIfSeriesNotExist(name []byte) (bool, error)
|
||||
|
||||
// Used to clean up series in inmem index that were dropped with a shard.
|
||||
DropSeriesGlobal(key []byte) error
|
||||
|
|
|
@ -186,7 +186,9 @@ func (i *Index) MeasurementIterator() (tsdb.MeasurementIterator, error) {
|
|||
|
||||
// CreateSeriesListIfNotExists adds the series for the given measurement to the
|
||||
// index and sets its ID or returns the existing series object
|
||||
func (i *Index) CreateSeriesListIfNotExists(seriesIDSet *tsdb.SeriesIDSet, keys, names [][]byte, tagsSlice []models.Tags, opt *tsdb.EngineOptions, ignoreLimits bool) error {
|
||||
func (i *Index) CreateSeriesListIfNotExists(seriesIDSet *tsdb.SeriesIDSet, measurements map[string]int,
|
||||
keys, names [][]byte, tagsSlice []models.Tags, opt *tsdb.EngineOptions, ignoreLimits bool) error {
|
||||
|
||||
seriesIDs, err := i.sfile.CreateSeriesListIfNotExists(names, tagsSlice)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -212,6 +214,7 @@ func (i *Index) CreateSeriesListIfNotExists(seriesIDSet *tsdb.SeriesIDSet, keys,
|
|||
seriesIDSet.Lock()
|
||||
if !seriesIDSet.ContainsNoLock(ss.ID) {
|
||||
seriesIDSet.AddNoLock(ss.ID)
|
||||
measurements[ss.Measurement.Name]++
|
||||
}
|
||||
seriesIDSet.Unlock()
|
||||
}
|
||||
|
@ -247,6 +250,7 @@ func (i *Index) CreateSeriesListIfNotExists(seriesIDSet *tsdb.SeriesIDSet, keys,
|
|||
seriesIDSet.Lock()
|
||||
if !seriesIDSet.ContainsNoLock(ss.ID) {
|
||||
seriesIDSet.AddNoLock(ss.ID)
|
||||
measurements[ss.Measurement.Name]++
|
||||
}
|
||||
seriesIDSet.Unlock()
|
||||
}
|
||||
|
@ -278,7 +282,10 @@ func (i *Index) CreateSeriesListIfNotExists(seriesIDSet *tsdb.SeriesIDSet, keys,
|
|||
i.seriesSketch.Add(key)
|
||||
|
||||
// This series needs to be added to the bitset tracking undeleted series IDs.
|
||||
seriesIDSet.Add(seriesIDs[j])
|
||||
seriesIDSet.Lock()
|
||||
seriesIDSet.AddNoLock(seriesIDs[j])
|
||||
measurements[mms[j].Name]++
|
||||
seriesIDSet.Unlock()
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -756,20 +763,20 @@ func (i *Index) dropMeasurement(name string) error {
|
|||
|
||||
// DropMeasurementIfSeriesNotExist drops a measurement only if there are no more
|
||||
// series for the measurment.
|
||||
func (i *Index) DropMeasurementIfSeriesNotExist(name []byte) error {
|
||||
func (i *Index) DropMeasurementIfSeriesNotExist(name []byte) (bool, error) {
|
||||
i.mu.Lock()
|
||||
defer i.mu.Unlock()
|
||||
|
||||
m := i.measurements[string(name)]
|
||||
if m == nil {
|
||||
return nil
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if m.HasSeries() {
|
||||
return nil
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return i.dropMeasurement(string(name))
|
||||
return true, i.dropMeasurement(string(name))
|
||||
}
|
||||
|
||||
// DropSeriesGlobal removes the series key and its tags from the index.
|
||||
|
@ -1054,7 +1061,9 @@ func (i *Index) Rebuild() {
|
|||
|
||||
// assignExistingSeries assigns the existing series to shardID and returns the series, names and tags that
|
||||
// do not exists yet.
|
||||
func (i *Index) assignExistingSeries(shardID uint64, seriesIDSet *tsdb.SeriesIDSet, keys, names [][]byte, tagsSlice []models.Tags) ([][]byte, [][]byte, []models.Tags) {
|
||||
func (i *Index) assignExistingSeries(shardID uint64, seriesIDSet *tsdb.SeriesIDSet, measurements map[string]int,
|
||||
keys, names [][]byte, tagsSlice []models.Tags) ([][]byte, [][]byte, []models.Tags) {
|
||||
|
||||
i.mu.RLock()
|
||||
var n int
|
||||
for j, key := range keys {
|
||||
|
@ -1070,6 +1079,7 @@ func (i *Index) assignExistingSeries(shardID uint64, seriesIDSet *tsdb.SeriesIDS
|
|||
seriesIDSet.Lock()
|
||||
if !seriesIDSet.ContainsNoLock(ss.ID) {
|
||||
seriesIDSet.AddNoLock(ss.ID)
|
||||
measurements[string(names[j])]++
|
||||
}
|
||||
seriesIDSet.Unlock()
|
||||
}
|
||||
|
@ -1093,16 +1103,27 @@ type ShardIndex struct {
|
|||
// Bitset storing all undeleted series IDs associated with this shard.
|
||||
seriesIDSet *tsdb.SeriesIDSet
|
||||
|
||||
// mapping of measurements to the count of series ids in the set. protected
|
||||
// by the seriesIDSet lock.
|
||||
measurements map[string]int
|
||||
|
||||
opt tsdb.EngineOptions
|
||||
}
|
||||
|
||||
// DropSeries removes the provided series id from the local bitset that tracks
|
||||
// series in this shard only.
|
||||
func (idx *ShardIndex) DropSeries(seriesID uint64, _ []byte, _ bool) error {
|
||||
func (idx *ShardIndex) DropSeries(seriesID uint64, key []byte, _ bool) error {
|
||||
// Remove from shard-local bitset if it exists.
|
||||
idx.seriesIDSet.Lock()
|
||||
if idx.seriesIDSet.ContainsNoLock(seriesID) {
|
||||
idx.seriesIDSet.RemoveNoLock(seriesID)
|
||||
|
||||
name := models.ParseName(key)
|
||||
if curr := idx.measurements[string(name)]; curr <= 1 {
|
||||
delete(idx.measurements, string(name))
|
||||
} else {
|
||||
idx.measurements[string(name)] = curr - 1
|
||||
}
|
||||
}
|
||||
idx.seriesIDSet.Unlock()
|
||||
return nil
|
||||
|
@ -1110,13 +1131,23 @@ func (idx *ShardIndex) DropSeries(seriesID uint64, _ []byte, _ bool) error {
|
|||
|
||||
// DropMeasurementIfSeriesNotExist drops a measurement only if there are no more
|
||||
// series for the measurment.
|
||||
func (idx *ShardIndex) DropMeasurementIfSeriesNotExist(name []byte) error {
|
||||
return idx.Index.DropMeasurementIfSeriesNotExist(name)
|
||||
func (idx *ShardIndex) DropMeasurementIfSeriesNotExist(name []byte) (bool, error) {
|
||||
idx.seriesIDSet.Lock()
|
||||
curr := idx.measurements[string(name)]
|
||||
idx.seriesIDSet.Unlock()
|
||||
if curr > 0 {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// we always report the measurement was dropped if it does not exist in our
|
||||
// measurements mapping.
|
||||
_, err := idx.Index.DropMeasurementIfSeriesNotExist(name)
|
||||
return err == nil, err
|
||||
}
|
||||
|
||||
// CreateSeriesListIfNotExists creates a list of series if they doesn't exist in bulk.
|
||||
func (idx *ShardIndex) CreateSeriesListIfNotExists(keys, names [][]byte, tagsSlice []models.Tags) error {
|
||||
keys, names, tagsSlice = idx.assignExistingSeries(idx.id, idx.seriesIDSet, keys, names, tagsSlice)
|
||||
keys, names, tagsSlice = idx.assignExistingSeries(idx.id, idx.seriesIDSet, idx.measurements, keys, names, tagsSlice)
|
||||
if len(keys) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
@ -1165,7 +1196,7 @@ func (idx *ShardIndex) CreateSeriesListIfNotExists(keys, names [][]byte, tagsSli
|
|||
keys, names, tagsSlice = keys[:n], names[:n], tagsSlice[:n]
|
||||
}
|
||||
|
||||
if err := idx.Index.CreateSeriesListIfNotExists(idx.seriesIDSet, keys, names, tagsSlice, &idx.opt, idx.opt.Config.MaxSeriesPerDatabase == 0); err != nil {
|
||||
if err := idx.Index.CreateSeriesListIfNotExists(idx.seriesIDSet, idx.measurements, keys, names, tagsSlice, &idx.opt, idx.opt.Config.MaxSeriesPerDatabase == 0); err != nil {
|
||||
reason = err.Error()
|
||||
droppedKeys = append(droppedKeys, keys...)
|
||||
}
|
||||
|
@ -1174,7 +1205,7 @@ func (idx *ShardIndex) CreateSeriesListIfNotExists(keys, names [][]byte, tagsSli
|
|||
if len(droppedKeys) > 0 {
|
||||
dropped := len(droppedKeys) // number dropped before deduping
|
||||
bytesutil.SortDedup(droppedKeys)
|
||||
return &tsdb.PartialWriteError{
|
||||
return tsdb.PartialWriteError{
|
||||
Reason: reason,
|
||||
Dropped: dropped,
|
||||
DroppedKeys: droppedKeys,
|
||||
|
@ -1194,13 +1225,13 @@ func (idx *ShardIndex) SeriesN() int64 {
|
|||
// InitializeSeries is called during start-up.
|
||||
// This works the same as CreateSeriesListIfNotExists except it ignore limit errors.
|
||||
func (idx *ShardIndex) InitializeSeries(keys, names [][]byte, tags []models.Tags) error {
|
||||
return idx.Index.CreateSeriesListIfNotExists(idx.seriesIDSet, keys, names, tags, &idx.opt, true)
|
||||
return idx.Index.CreateSeriesListIfNotExists(idx.seriesIDSet, idx.measurements, keys, names, tags, &idx.opt, true)
|
||||
}
|
||||
|
||||
// CreateSeriesIfNotExists creates the provided series on the index if it is not
|
||||
// already present.
|
||||
func (idx *ShardIndex) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error {
|
||||
return idx.Index.CreateSeriesListIfNotExists(idx.seriesIDSet, [][]byte{key}, [][]byte{name}, []models.Tags{tags}, &idx.opt, false)
|
||||
return idx.Index.CreateSeriesListIfNotExists(idx.seriesIDSet, idx.measurements, [][]byte{key}, [][]byte{name}, []models.Tags{tags}, &idx.opt, false)
|
||||
}
|
||||
|
||||
// TagSets returns a list of tag sets based on series filtering.
|
||||
|
@ -1216,10 +1247,11 @@ func (idx *ShardIndex) SeriesIDSet() *tsdb.SeriesIDSet {
|
|||
// NewShardIndex returns a new index for a shard.
|
||||
func NewShardIndex(id uint64, seriesIDSet *tsdb.SeriesIDSet, opt tsdb.EngineOptions) tsdb.Index {
|
||||
return &ShardIndex{
|
||||
Index: opt.InmemIndex.(*Index),
|
||||
id: id,
|
||||
seriesIDSet: seriesIDSet,
|
||||
opt: opt,
|
||||
Index: opt.InmemIndex.(*Index),
|
||||
id: id,
|
||||
seriesIDSet: seriesIDSet,
|
||||
measurements: make(map[string]int),
|
||||
opt: opt,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -115,6 +115,47 @@ func TestIndex_Bytes(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestIndex_MeasurementTracking(t *testing.T) {
|
||||
sfile := mustOpenSeriesFile()
|
||||
defer sfile.Close()
|
||||
opt := tsdb.EngineOptions{InmemIndex: inmem.NewIndex("foo", sfile.SeriesFile)}
|
||||
s1 := inmem.NewShardIndex(1, tsdb.NewSeriesIDSet(), opt).(*inmem.ShardIndex)
|
||||
s2 := inmem.NewShardIndex(2, tsdb.NewSeriesIDSet(), opt).(*inmem.ShardIndex)
|
||||
b := func(s string) []byte { return []byte(s) }
|
||||
mt := func(k, v string) models.Tag { return models.Tag{Key: b(k), Value: b(v)} }
|
||||
|
||||
s1.CreateSeriesIfNotExists(b("m,t=t1"), b("m"), models.Tags{mt("t", "t1")})
|
||||
s1.CreateSeriesIfNotExists(b("m,t=t2"), b("m"), models.Tags{mt("t", "t2")})
|
||||
s2.CreateSeriesIfNotExists(b("m,t=t1"), b("m"), models.Tags{mt("t", "t1")})
|
||||
s2.CreateSeriesIfNotExists(b("m,t=t2"), b("m"), models.Tags{mt("t", "t2")})
|
||||
series1, _ := s1.Series(b("m,t=t1"))
|
||||
series2, _ := s1.Series(b("m,t=t2"))
|
||||
|
||||
if ok, err := s1.DropMeasurementIfSeriesNotExist(b("m")); err != nil || ok {
|
||||
t.Fatal("invalid drop")
|
||||
}
|
||||
if ok, err := s2.DropMeasurementIfSeriesNotExist(b("m")); err != nil || ok {
|
||||
t.Fatal("invalid drop")
|
||||
}
|
||||
|
||||
s1.DropSeries(series1.ID, b(series1.Key), false)
|
||||
s1.DropSeries(series2.ID, b(series2.Key), false)
|
||||
|
||||
if ok, err := s1.DropMeasurementIfSeriesNotExist(b("m")); err != nil || !ok {
|
||||
t.Fatal("invalid drop")
|
||||
}
|
||||
if ok, err := s2.DropMeasurementIfSeriesNotExist(b("m")); err != nil || ok {
|
||||
t.Fatal("invalid drop")
|
||||
}
|
||||
|
||||
s2.DropSeries(series1.ID, b(series1.Key), false)
|
||||
s2.DropSeries(series2.ID, b(series2.Key), false)
|
||||
|
||||
if ok, err := s2.DropMeasurementIfSeriesNotExist(b("m")); err != nil || !ok {
|
||||
t.Fatal("invalid drop")
|
||||
}
|
||||
}
|
||||
|
||||
// seriesFileWrapper is a test wrapper for tsdb.seriesFileWrapper.
|
||||
type seriesFileWrapper struct {
|
||||
*tsdb.SeriesFile
|
||||
|
|
|
@ -832,16 +832,16 @@ func (i *Index) DropSeriesGlobal(key []byte) error { return nil }
|
|||
|
||||
// DropMeasurementIfSeriesNotExist drops a measurement only if there are no more
|
||||
// series for the measurment.
|
||||
func (i *Index) DropMeasurementIfSeriesNotExist(name []byte) error {
|
||||
func (i *Index) DropMeasurementIfSeriesNotExist(name []byte) (bool, error) {
|
||||
// Check if that was the last series for the measurement in the entire index.
|
||||
if ok, err := i.MeasurementHasSeries(name); err != nil {
|
||||
return err
|
||||
return false, err
|
||||
} else if ok {
|
||||
return nil
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// If no more series exist in the measurement then delete the measurement.
|
||||
return i.DropMeasurement(name)
|
||||
return true, i.DropMeasurement(name)
|
||||
}
|
||||
|
||||
// MeasurementsSketches returns the two measurement sketches for the index.
|
||||
|
|
Loading…
Reference in New Issue