Mark series deleted in series file

This commit adds the ability to correctly mark a series as deleted in
the global series file. Whenever a shard engine determines that a series
should be deleted, it checks with each shard's bitset for series that
are to be deleted and are no longer contained in any shard-local
bitsets.

These series are then removed from the series file.
pull/9315/head
Jason Wilder 2018-01-10 16:37:18 -07:00 committed by Edd Robinson
parent cd0c420dce
commit ba9a5af7eb
9 changed files with 134 additions and 34 deletions

View File

@ -81,6 +81,11 @@ type Engine interface {
io.WriterTo
}
// SeriesIDSets provides access to the total set of series IDs
type SeriesIDSets interface {
ForEach(f func(ids *SeriesIDSet)) error
}
// EngineFormat represents the format for an engine.
type EngineFormat int
@ -150,7 +155,8 @@ type EngineOptions struct {
CompactionLimiter limiter.Fixed
CompactionThroughputLimiter limiter.Rate
Config Config
Config Config
SeriesIDSets SeriesIDSets
}
// NewEngineOptions returns the default options.

View File

@ -178,6 +178,9 @@ type Engine struct {
compactionLimiter limiter.Fixed
scheduler *scheduler
// provides access to the total set of series IDs
seriesIDSets tsdb.SeriesIDSets
}
// NewEngine returns a new instance of Engine.
@ -219,6 +222,7 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string,
stats: stats,
compactionLimiter: opt.CompactionLimiter,
scheduler: newScheduler(stats, opt.CompactionLimiter.Capacity()),
seriesIDSets: opt.SeriesIDSets,
}
if e.traceLogging {
@ -1359,7 +1363,15 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
// the series from the index.
if len(seriesKeys) > 0 {
buf := make([]byte, 1024) // For use when accessing series file.
ids := tsdb.NewSeriesIDSet()
for _, k := range seriesKeys {
name, tags := models.ParseKey(k)
sid := e.sfile.SeriesID([]byte(name), tags, buf)
if sid == 0 {
return fmt.Errorf("unable to find id for series key %s during deletion", k)
}
id := (sid << 32) | e.id
// This key was crossed out earlier, skip it
if k == nil {
continue
@ -1379,30 +1391,38 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
i++
}
// Some cache values still exists, leave the series in the index.
if hasCacheValues {
continue
}
// Remove the series from the series file and index.
// TODO(edd): we need to first check with all other shards if it's
// OK to tombstone the series in the series file.
//
// Further, in the case of the inmem index, we should only remove
// the series from the index if we also tombstone it in the series
// file.
name, tags := models.ParseKey(k)
sid := e.sfile.SeriesID([]byte(name), tags, buf)
if sid == 0 {
return fmt.Errorf("unable to find id for series key %s during deletion", k)
}
// Remove the series from the index for this shard
id := (sid << 32) | e.id
if err := e.index.UnassignShard(string(k), id, ts); err != nil {
return err
}
// Add the id to the set of delete ids.
ids.Add(sid)
}
// Remove any series IDs for our set that still exist in other shards.
// We cannot remove these from the series file yet.
if err := e.seriesIDSets.ForEach(func(s *tsdb.SeriesIDSet) {
ids = ids.AndNot(s)
}); err != nil {
return err
}
// Remove the remaining ids from the series file as they no longer exist
// in any shard.
var err error
ids.ForEach(func(id uint64) {
if err1 := e.sfile.DeleteSeriesID(id); err1 != nil {
err = err1
}
})
if err != nil {
return err
}
}

View File

@ -1817,15 +1817,16 @@ func NewEngine(index string) (*Engine, error) {
if err = sfile.Open(); err != nil {
return nil, err
}
seriesIDs := tsdb.NewSeriesIDSet()
opt := tsdb.NewEngineOptions()
opt.IndexVersion = index
if index == "inmem" {
opt.InmemIndex = inmem.NewIndex(db, sfile)
}
opt.SeriesIDSets = seriesIDSets([]*tsdb.SeriesIDSet{seriesIDs})
idxPath := filepath.Join(dbPath, "index")
seriesIDs := tsdb.NewSeriesIDSet()
idx := tsdb.MustOpenIndex(1, db, idxPath, seriesIDs, sfile, opt)
tsm1Engine := tsm1.NewEngine(1, idx, db, filepath.Join(root, "data"), filepath.Join(root, "wal"), sfile, opt).(*tsm1.Engine)
@ -2053,3 +2054,12 @@ func (itr *seriesIterator) Next() (tsdb.SeriesElem, error) {
itr.keys = itr.keys[1:]
return s, nil
}
type seriesIDSets []*tsdb.SeriesIDSet
func (a seriesIDSets) ForEach(f func(ids *tsdb.SeriesIDSet)) error {
for _, v := range a {
f(v)
}
return nil
}

View File

@ -1050,6 +1050,7 @@ func (idx *ShardIndex) UnassignShard(key string, id uint64, ts int64) error {
// TODO(edd): temporarily munging series id and shard id into same value,
// to test prototype without having to change Index API.
sid, shardID := id>>32, id&0xFFFFFFFF
idx.seriesIDSet.Remove(sid)
return idx.Index.UnassignShard(key, shardID, ts)
}

View File

@ -528,17 +528,17 @@ func (i *Index) InitializeSeries(key, name []byte, tags models.Tags) error {
// DropSeries drops the provided series from the index.
func (i *Index) DropSeries(key []byte, ts int64) error {
// Extract measurement name.
name, tags := models.ParseKeyBytes(key)
partitionKey := tsdb.AppendSeriesKey(nil, name, tags)
// Remove from partition.
if err := i.partition(key).DropSeries(key, ts); err != nil {
if err := i.partition(partitionKey).DropSeries(key, ts); err != nil {
return err
}
// Extract measurement name.
name, _ := models.ParseKey(key)
mname := []byte(name)
// Check if that was the last series for the measurement in the entire index.
itr, err := i.MeasurementSeriesIDIterator(mname)
itr, err := i.MeasurementSeriesIDIterator(name)
if err != nil {
return err
} else if itr == nil {
@ -554,7 +554,7 @@ func (i *Index) DropSeries(key []byte, ts int64) error {
}
// If no more series exist in the measurement then delete the measurement.
if err := i.DropMeasurement(mname); err != nil {
if err := i.DropMeasurement(name); err != nil {
return err
}
return nil

View File

@ -295,6 +295,10 @@ func (i *Partition) buildSeriesSet() error {
return nil
}
if i.sfile.IsDeleted(elem.SeriesID) {
continue
}
// Add id to series set.
i.seriesSet.Add(elem.SeriesID)
}
@ -605,19 +609,12 @@ func (i *Partition) DropSeries(key []byte, ts int64) error {
i.mu.RLock()
defer i.mu.RUnlock()
name, tags := models.ParseKey(key)
mname := []byte(name)
seriesID := i.sfile.SeriesID(mname, tags, nil)
name, tags := models.ParseKeyBytes(key)
seriesID := i.sfile.SeriesID(name, tags, nil)
// Remove from series id set.
i.seriesSet.Remove(seriesID)
// TODO(edd): this should only happen when there are no shards containing
// this series.
if err := i.sfile.DeleteSeriesID(seriesID); err != nil {
return err
}
return nil
}(); err != nil {
return err

View File

@ -71,3 +71,29 @@ func (s *SeriesIDSet) Merge(others ...*SeriesIDSet) {
defer s.Unlock()
s.bitmap = roaring.FastOr(bms...)
}
// AndNot returns the set of elements that only exist in s.
func (s *SeriesIDSet) AndNot(other *SeriesIDSet) *SeriesIDSet {
s.RLock()
defer s.RUnlock()
other.RLock()
defer other.RUnlock()
return &SeriesIDSet{bitmap: roaring.AndNot(s.bitmap, other.bitmap)}
}
// ForEach calls f for each id in the set.
func (s *SeriesIDSet) ForEach(f func(id uint64)) {
s.RLock()
defer s.RUnlock()
itr := s.bitmap.Iterator()
for itr.HasNext() {
f(uint64(itr.Next()))
}
}
func (s *SeriesIDSet) String() string {
s.RLock()
defer s.RUnlock()
return s.bitmap.String()
}

View File

@ -365,6 +365,7 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) {
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.InmemIndex = inmem.NewIndex(path.Base(tmpDir), sfile.SeriesFile)
opts.SeriesIDSets = seriesIDSets([]*tsdb.SeriesIDSet{})
sh := tsdb.NewShard(1, tmpShard, tmpWal, sfile.SeriesFile, opts)
if err := sh.Open(); err != nil {
@ -452,6 +453,7 @@ func TestShard_WritePoints_FieldConflictConcurrentQuery(t *testing.T) {
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.InmemIndex = inmem.NewIndex(path.Base(tmpDir), sfile.SeriesFile)
opts.SeriesIDSets = seriesIDSets([]*tsdb.SeriesIDSet{})
sh := tsdb.NewShard(1, tmpShard, tmpWal, sfile.SeriesFile, opts)
if err := sh.Open(); err != nil {
@ -1924,3 +1926,12 @@ func (itr *seriesIterator) Next() (tsdb.SeriesElem, error) {
itr.keys = itr.keys[1:]
return s, nil
}
type seriesIDSets []*tsdb.SeriesIDSet
func (a seriesIDSets) ForEach(f func(ids *tsdb.SeriesIDSet)) error {
for _, v := range a {
f(v)
}
return nil
}

View File

@ -267,6 +267,9 @@ func (s *Store) loadShards() error {
opt := s.EngineOptions
opt.InmemIndex = idx
// Provide an implementation of the ShardIDSets
opt.SeriesIDSets = shardSet{store: s, db: db}
// Existing shards should continue to use inmem index.
if _, err := os.Stat(filepath.Join(path, "index")); os.IsNotExist(err) {
opt.IndexVersion = "inmem"
@ -486,6 +489,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, en
// Copy index options and pass in shared index.
opt := s.EngineOptions
opt.InmemIndex = idx
opt.SeriesIDSets = shardSet{store: s, db: database}
path := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10))
shard := NewShard(shardID, path, walPath, sfile, opt)
@ -1731,3 +1735,28 @@ func relativePath(storePath, shardPath string) (string, error) {
return name, nil
}
type shardSet struct {
store *Store
db string
}
func (s shardSet) ForEach(f func(ids *SeriesIDSet)) error {
s.store.mu.RLock()
shards := s.store.filterShards(byDatabase(s.db))
s.store.mu.RUnlock()
for _, sh := range shards {
idx, err := sh.Index()
if err != nil {
return err
}
if t, ok := idx.(interface {
SeriesIDSet() *SeriesIDSet
}); ok {
f(t.SeriesIDSet())
}
}
return nil
}