Pass series id in via Index API
parent
47851f4b7d
commit
a2ece0a49a
|
@ -1399,7 +1399,7 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
|
|||
}
|
||||
|
||||
// Remove the series from the local index.
|
||||
if err := e.index.DropSeries(k, ts); err != nil {
|
||||
if err := e.index.DropSeries(sid, k, ts); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -32,7 +32,7 @@ type Index interface {
|
|||
InitializeSeries(key, name []byte, tags models.Tags) error
|
||||
CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
|
||||
CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
|
||||
DropSeries(key []byte, ts int64) error
|
||||
DropSeries(seriesID uint64, key []byte, ts int64) error
|
||||
|
||||
MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
|
||||
SeriesN() int64
|
||||
|
|
|
@ -1065,14 +1065,9 @@ type ShardIndex struct {
|
|||
opt tsdb.EngineOptions
|
||||
}
|
||||
|
||||
// DropSeries removes the provided series id from the local bitset tracking
|
||||
// series. It then checks the database-wide series file, and if the series id has
|
||||
// been deleted, it removes the series from the global in-memory index.
|
||||
func (idx *ShardIndex) DropSeries(key []byte, ts int64) error {
|
||||
// TODO(edd): Make this more efficient by passing in the series id.
|
||||
name, tags := models.ParseKey(key)
|
||||
seriesID := idx.sfile.SeriesID([]byte(name), tags, nil)
|
||||
|
||||
// DropSeries removes the provided series id from the local bitset that tracks
|
||||
// series in this shard only.
|
||||
func (idx *ShardIndex) DropSeries(seriesID uint64, _ []byte, ts int64) error {
|
||||
// Remove from shard-local bitset if it exists.
|
||||
idx.seriesIDSet.Lock()
|
||||
if idx.seriesIDSet.ContainsNoLock(seriesID) {
|
||||
|
|
|
@ -536,15 +536,15 @@ func (i *Index) InitializeSeries(key, name []byte, tags models.Tags) error {
|
|||
}
|
||||
|
||||
// DropSeries drops the provided series from the index.
|
||||
func (i *Index) DropSeries(key []byte, ts int64) error {
|
||||
// Extract measurement name.
|
||||
name, _ := models.ParseKeyBytes(key)
|
||||
|
||||
func (i *Index) DropSeries(seriesID uint64, key []byte, ts int64) error {
|
||||
// Remove from partition.
|
||||
if err := i.partition(key).DropSeries(key, ts); err != nil {
|
||||
if err := i.partition(key).DropSeries(seriesID, ts); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Extract measurement name.
|
||||
name, _ := models.ParseKeyBytes(key)
|
||||
|
||||
// Check if that was the last series for the measurement in the entire index.
|
||||
if ok, err := i.MeasurementHasSeries(name); err != nil {
|
||||
return err
|
||||
|
@ -820,9 +820,9 @@ func (i *Index) AssignShard(k string, shardID uint64) {}
|
|||
// UnassignShard removes the provided series key from the index. The naming of
|
||||
// this method stems from a legacy index logic that used to track which shards
|
||||
// owned which series.
|
||||
func (i *Index) UnassignShard(k string, id uint64, ts int64) error {
|
||||
func (i *Index) UnassignShard(k string, _ uint64, ts int64) error {
|
||||
// This can be called directly once inmem is gone.
|
||||
return i.DropSeries([]byte(k), ts)
|
||||
return i.DropSeries(0, []byte(k), ts)
|
||||
}
|
||||
|
||||
func (i *Index) Rebuild() {}
|
||||
|
|
|
@ -91,8 +91,14 @@ func TestIndex_MeasurementExists(t *testing.T) {
|
|||
}
|
||||
})
|
||||
|
||||
name, tags := []byte("cpu"), models.NewTags(map[string]string{"region": "east"})
|
||||
sid := idx.Index.SeriesFile().SeriesID(name, tags, nil)
|
||||
if sid == 0 {
|
||||
t.Fatalf("got 0 series id for %s/%v", name, tags)
|
||||
}
|
||||
|
||||
// Delete one series.
|
||||
if err := idx.DropSeries(models.MakeKey([]byte("cpu"), models.NewTags(map[string]string{"region": "east"})), 0); err != nil {
|
||||
if err := idx.DropSeries(sid, models.MakeKey(name, tags), 0); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
|
@ -106,7 +112,8 @@ func TestIndex_MeasurementExists(t *testing.T) {
|
|||
})
|
||||
|
||||
// Delete second series.
|
||||
if err := idx.DropSeries(models.MakeKey([]byte("cpu"), models.NewTags(map[string]string{"region": "west"})), 0); err != nil {
|
||||
tags.Set([]byte("region"), []byte("west"))
|
||||
if err := idx.DropSeries(sid, models.MakeKey(name, tags), 0); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
|
|
|
@ -606,32 +606,16 @@ func (i *Partition) createSeriesListIfNotExists(names [][]byte, tagsSlice []mode
|
|||
return i.CheckLogFile()
|
||||
}
|
||||
|
||||
func (i *Partition) DropSeries(key []byte, ts int64) error {
|
||||
func (i *Partition) DropSeries(seriesID uint64, ts int64) error {
|
||||
// TODO: Use ts.
|
||||
|
||||
if err := func() error {
|
||||
i.mu.RLock()
|
||||
defer i.mu.RUnlock()
|
||||
|
||||
name, tags := models.ParseKeyBytes(key)
|
||||
seriesID := i.sfile.SeriesID(name, tags, nil)
|
||||
if seriesID == 0 {
|
||||
return fmt.Errorf("[partition %s] no series id for key %q when attempting index drop", i.id, string(key))
|
||||
}
|
||||
|
||||
// Delete series from index.
|
||||
if err := i.activeLogFile.DeleteSeriesID(seriesID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Remove from series id set.
|
||||
i.seriesIDSet.Remove(seriesID)
|
||||
|
||||
return nil
|
||||
}(); err != nil {
|
||||
// Delete series from index.
|
||||
if err := i.activeLogFile.DeleteSeriesID(seriesID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
i.seriesIDSet.Remove(seriesID)
|
||||
|
||||
// Swap log file, if necessary.
|
||||
if err := i.CheckLogFile(); err != nil {
|
||||
return err
|
||||
|
|
Loading…
Reference in New Issue