diff --git a/tsdb/engine.go b/tsdb/engine.go
index 5ec979bce6..c37a51763f 100644
--- a/tsdb/engine.go
+++ b/tsdb/engine.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"io"
 	"os"
+	"regexp"
 	"sort"
 	"time"

@@ -36,12 +37,22 @@ type Engine interface {
 	CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error)
 	WritePoints(points []models.Point) error
+
+	CreateSeries(measurement string, series *Series) (*Series, error)
+	Series(key string) (*Series, error)
 	ContainsSeries(keys []string) (map[string]bool, error)
 	DeleteSeries(keys []string) error
 	DeleteSeriesRange(keys []string, min, max int64) error
-	DeleteMeasurement(name string, seriesKeys []string) error
 	SeriesCount() (n int, err error)
+
+	CreateMeasurement(name string) (*Measurement, error)
+	Measurement(name string) (*Measurement, error)
+	Measurements() (Measurements, error)
+	MeasurementsByExpr(expr influxql.Expr) (Measurements, bool, error)
+	MeasurementsByRegex(re *regexp.Regexp) (Measurements, error)
 	MeasurementFields(measurement string) *MeasurementFields
+	DeleteMeasurement(name string, seriesKeys []string) error
+
 	CreateSnapshot() (string, error)
 	SetEnabled(enabled bool)
diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go
index 37ba8ee2a3..efcac07ae4 100644
--- a/tsdb/engine/tsm1/engine.go
+++ b/tsdb/engine/tsm1/engine.go
@@ -10,6 +10,7 @@ import (
 	"math"
 	"os"
 	"path/filepath"
+	"regexp"
 	"runtime"
 	"sort"
 	"strings"
@@ -288,6 +289,22 @@ func (e *Engine) Index() *tsdb.DatabaseIndex {
 	return e.index
 }

+func (e *Engine) Measurement(name string) (*tsdb.Measurement, error) {
+	return e.index.Measurement(name), nil
+}
+
+func (e *Engine) Measurements() (tsdb.Measurements, error) {
+	return e.index.Measurements(), nil
+}
+
+func (e *Engine) MeasurementsByExpr(expr influxql.Expr) (tsdb.Measurements, bool, error) {
+	return e.index.MeasurementsByExpr(expr)
+}
+
+func (e *Engine) MeasurementsByRegex(re *regexp.Regexp) (tsdb.Measurements, error) {
+	return e.index.MeasurementsByRegex(re), nil
+}
+
 // MeasurementFields returns the measurement fields for a measurement.
 func (e *Engine) MeasurementFields(measurement string) *tsdb.MeasurementFields {
 	e.fieldsMu.RLock()
@@ -374,6 +391,7 @@ func (e *Engine) Statistics(tags map[string]string) []models.Statistic {
 			statTSMFullCompactionDuration: atomic.LoadInt64(&e.stats.TSMFullCompactionDuration),
 		},
 	})
+	statistics = append(statistics, e.index.Statistics(tags)...)
 	statistics = append(statistics, e.Cache.Statistics(tags)...)
 	statistics = append(statistics, e.FileStore.Statistics(tags)...)
 	statistics = append(statistics, e.WAL.Statistics(tags)...)
@@ -648,8 +666,6 @@ func (e *Engine) addToIndexFromKey(shardID uint64, key []byte, fieldType influxq
 	// Have we already indexed this series?
 	ss := index.SeriesBytes(seriesKey)
 	if ss != nil {
-		// Add this shard to the existing series
-		ss.AssignShard(shardID)
 		return nil
 	}

@@ -657,9 +673,8 @@ func (e *Engine) addToIndexFromKey(shardID uint64, key []byte, fieldType influxq
 	// fields (in line protocol format) in the series key
 	_, tags, _ := models.ParseKey(seriesKey)

-	s := tsdb.NewSeries(string(seriesKey), tags)
+	s := tsdb.NewSeries(seriesKey, tags)
 	index.CreateSeriesIndexIfNotExists(measurement, s)
-	s.AssignShard(shardID)

 	return nil
 }
@@ -744,6 +759,7 @@ func (e *Engine) ContainsSeries(keys []string) (map[string]bool, error) {
 	}); err != nil {
 		return nil, err
 	}
+
 	return keyMap, nil
 }

@@ -815,9 +831,31 @@ func (e *Engine) DeleteSeriesRange(seriesKeys []string, min, max int64) error {
 	e.Cache.DeleteRange(walKeys, min, max)

 	// delete from the WAL
-	_, err := e.WAL.DeleteRange(walKeys, min, max)
+	if _, err := e.WAL.DeleteRange(walKeys, min, max); err != nil {
+		return err
+	}

-	return err
+	// Have we deleted all points for the series? If so, we need to remove
+	// the series from the index.
+	existing, err := e.ContainsSeries(seriesKeys)
+	if err != nil {
+		return err
+	}
+
+	var toDelete []string
+	for k, exists := range existing {
+		if !exists {
+			toDelete = append(toDelete, k)
+		}
+	}
+	e.index.DropSeries(toDelete)
+
+	return nil
+}
+
+// CreateMeasurement creates a measurement on the index.
+func (e *Engine) CreateMeasurement(name string) (*tsdb.Measurement, error) {
+	return e.index.CreateMeasurementIndexIfNotExists(name), nil
 }

 // DeleteMeasurement deletes a measurement and all related series.
@@ -826,7 +864,13 @@ func (e *Engine) DeleteMeasurement(name string, seriesKeys []string) error {
 	delete(e.measurementFields, name)
 	e.fieldsMu.Unlock()

-	return e.DeleteSeries(seriesKeys)
+	if err := e.DeleteSeries(seriesKeys); err != nil {
+		return err
+	}
+
+	// Remove the measurement from the index.
+	e.index.DropMeasurement(name)
+	return nil
 }

 // SeriesCount returns the number of series buckets on the shard.
@@ -846,6 +890,15 @@ func (e *Engine) LastModified() time.Time {
 	return fsTime
 }

+func (e *Engine) CreateSeries(measurement string, series *tsdb.Series) (*tsdb.Series, error) {
+	return e.index.CreateSeriesIndexIfNotExists(measurement, series), nil
+}
+
+// Series returns a series from the index.
+func (e *Engine) Series(key string) (*tsdb.Series, error) {
+	return e.index.Series(key), nil
+}
+
 // WriteTo is not implemented.
 func (e *Engine) WriteTo(w io.Writer) (n int64, err error) { panic("not implemented") }
diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go
index 0d7f61a636..f571be9973 100644
--- a/tsdb/engine/tsm1/engine_test.go
+++ b/tsdb/engine/tsm1/engine_test.go
@@ -573,7 +573,8 @@ func TestEngine_DeleteSeries(t *testing.T) {
 	p3 := MustParsePointString("cpu,host=A sum=1.3 3000000000")

 	// Write those points to the engine.
-	e := tsm1.NewEngine(1, f.Name(), walPath, tsdb.NewEngineOptions()).(*tsm1.Engine)
+	e := tsm1.NewEngine(f.Name(), walPath, tsdb.NewEngineOptions()).(*tsm1.Engine)
+	e.LoadMetadataIndex(1, tsdb.NewDatabaseIndex("db0")) // Initialise an index

 	// mock the planner so compactions don't run during the test
 	e.CompactionPlan = &mockPlanner{}
diff --git a/tsdb/meta.go b/tsdb/meta.go
index d8acad05ee..6ecf4a2ddc 100644
--- a/tsdb/meta.go
+++ b/tsdb/meta.go
@@ -145,7 +145,8 @@ func (d *DatabaseIndex) SeriesShardN(shardID uint64) int {
 	return n
 }

-// CreateSeriesIndexIfNotExists adds the series for the given measurement to the index and sets its ID or returns the existing series object.
+// CreateSeriesIndexIfNotExists adds the series for the given measurement to the
+// index and sets its ID or returns the existing series object
 func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, series *Series) *Series {
 	d.mu.RLock()
 	// if there is a measurement for this id, it's already been added
@@ -182,7 +183,8 @@ func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, ser
 	return series
 }

-// CreateMeasurementIndexIfNotExists creates or retrieves an in-memory index object for the measurement.
+// CreateMeasurementIndexIfNotExists creates or retrieves an in memory index
+// object for the measurement
 func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurement {
 	name = escape.UnescapeString(name)

@@ -210,56 +212,6 @@ func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurem
 	return m
 }

-// AssignShard updates the index to indicate that series k exists in
-// the given shardID.
-func (d *DatabaseIndex) AssignShard(k string, shardID uint64) {
-	ss := d.Series(k)
-	if ss != nil {
-		ss.AssignShard(shardID)
-	}
-}
-
-// UnassignShard updates the index to indicate that series k does not exist in
-// the given shardID.
-func (d *DatabaseIndex) UnassignShard(k string, shardID uint64) {
-	ss := d.Series(k)
-	if ss != nil {
-		if ss.Assigned(shardID) {
-			// Remove the shard from any series
-			ss.UnassignShard(shardID)
-
-			// If this series no longer has shards assigned, remove the series
-			if ss.ShardN() == 0 {
-
-				// Remove the series the measurements
-				ss.measurement.DropSeries(ss)
-
-				// If the measurement no longer has any series, remove it as well
-				if !ss.measurement.HasSeries() {
-					d.mu.Lock()
-					d.dropMeasurement(ss.measurement.Name)
-					d.mu.Unlock()
-				}
-
-				// Remove the series key from the series index
-				d.mu.Lock()
-				delete(d.series, k)
-				atomic.AddInt64(&d.stats.NumSeries, -1)
-				d.mu.Unlock()
-			}
-		}
-	}
-}
-
-// RemoveShard removes all references to shardID from any series or measurements
-// in the index. If the shard was the only owner of data for the series, the series
-// is removed from the index.
-func (d *DatabaseIndex) RemoveShard(shardID uint64) {
-	for _, k := range d.SeriesKeys() {
-		d.UnassignShard(k, shardID)
-	}
-}
-
 // TagsForSeries returns the tag map for the passed in series
 func (d *DatabaseIndex) TagsForSeries(key string) models.Tags {
 	d.mu.RLock()
@@ -497,6 +449,10 @@ func (d *DatabaseIndex) dropMeasurement(name string) {

 // DropSeries removes the series keys and their tags from the index.
 func (d *DatabaseIndex) DropSeries(keys []string) {
+	if len(keys) == 0 {
+		return
+	}
+
 	d.mu.Lock()
 	defer d.mu.Unlock()

@@ -537,9 +493,10 @@ func (d *DatabaseIndex) Dereference(b []byte) {
 	}
 }

-// Measurement represents a collection of time series in a database. It also contains in-memory
-// structures for indexing tags. Exported functions are goroutine safe while un-exported functions
-// assume the caller will use the appropriate locks.
+// Measurement represents a collection of time series in a database. It also
+// contains in memory structures for indexing tags. Exported functions are
+// goroutine safe while un-exported functions assume the caller will use the
+// appropriate locks.
 type Measurement struct {
 	mu   sync.RWMutex
 	Name string `json:"name,omitempty"`
@@ -563,14 +520,6 @@ func NewMeasurement(name string) *Measurement {
 	}
 }

-// HasField returns true if the measurement has a field by the given name.
-func (m *Measurement) HasField(name string) bool {
-	m.mu.RLock()
-	hasField := m.hasField(name)
-	m.mu.RUnlock()
-	return hasField
-}
-
 func (m *Measurement) hasField(name string) bool {
 	_, hasField := m.fieldNames[name]
 	return hasField
 }
@@ -617,20 +566,7 @@ func (m *Measurement) SeriesKeys() []string {
 	return keys
 }

-// ValidateGroupBy ensures that the GROUP BY is not a field.
-func (m *Measurement) ValidateGroupBy(stmt *influxql.SelectStatement) error {
-	for _, d := range stmt.Dimensions {
-		switch e := d.Expr.(type) {
-		case *influxql.VarRef:
-			if m.HasField(e.Val) {
-				return fmt.Errorf("can not use field in GROUP BY clause: %s", e.Val)
-			}
-		}
-	}
-	return nil
-}
-
-// HasTagKey returns true if at least one series in this measurement has written a value for the passed in tag key.
+// HasTagKey returns true if at least one series in this measurement has written a value for the passed in tag key
 func (m *Measurement) HasTagKey(k string) bool {
 	m.mu.RLock()
 	defer m.mu.RUnlock()
@@ -766,15 +702,18 @@ func (m *Measurement) filters(condition influxql.Expr) ([]uint64, map[uint64]inf
 	return m.walkWhereForSeriesIds(condition)
 }

-// TagSets returns the unique tag sets that exist for the given tag keys. This is used to determine
-// what composite series will be created by a group by. i.e. "group by region" should return:
-// {"region":"uswest"}, {"region":"useast"}
-// or region, service returns
-// {"region": "uswest", "service": "redis"}, {"region": "uswest", "service": "mysql"}, ...
-// This will also populate the TagSet objects with the series IDs that match each tagset and any
-// influx filter expression that goes with the series
-// TODO: this shouldn't be exported. However, until tx.go and the engine get refactored into tsdb, we need it.
-func (m *Measurement) TagSets(shardID uint64, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) {
+// TagSets returns the unique tag sets that exist for the given tag keys. This
+// is used to determine what composite series will be created by a group by.
+//
+// i.e. "group by region" should return: {"region":"uswest"},
+// {"region":"useast"} or region, service returns {"region": "uswest",
+// "service": "redis"}, {"region": "uswest", "service": "mysql"}, etc...
+//
+// This will also populate the TagSet objects with the series IDs that match
+// each tagset and any influx filter expression that goes with the series TODO:
+// this shouldn't be exported. However, until tx.go and the engine get
+// refactored into tsdb, we need it.
+func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) {
 	m.mu.RLock()

 	// get the unique set of series ids and the filters that should be applied to each
@@ -1161,9 +1100,10 @@ func (fe FilterExprs) Len() int {
 	return len(fe)
 }

-// walkWhereForSeriesIds recursively walks the WHERE clause and returns an ordered set of series IDs and
-// a map from those series IDs to filter expressions that should be used to limit points returned in
-// the final query result.
+// walkWhereForSeriesIds recursively walks the WHERE clause and returns an
+// ordered set of series IDs and a map from those series IDs to filter
+// expressions that should be used to limit points returned in the final query
+// result.
 func (m *Measurement) walkWhereForSeriesIds(expr influxql.Expr) (SeriesIDs, FilterExprs, error) {
 	switch n := expr.(type) {
 	case *influxql.BinaryExpr:
@@ -1226,7 +1166,8 @@ func (m *Measurement) walkWhereForSeriesIds(expr influxql.Expr) (SeriesIDs, Filt
 	}
 }

-// expandExpr returns a list of expressions expanded by all possible tag combinations.
+// expandExpr returns a list of expressions expanded by all possible tag
+// combinations.
 func (m *Measurement) expandExpr(expr influxql.Expr) []tagSetExpr {
 	// Retrieve list of unique values for each tag.
 	valuesByTagKey := m.uniqueTagValues(expr)
@@ -1483,7 +1424,7 @@ func (m *Measurement) uniqueTagValues(expr influxql.Expr) map[string][]string {
 	return out
 }

-// Measurements represents a list of *Measurement.
+// Measurements represents a set of *Measurement.
 type Measurements []*Measurement

 // Len implements sort.Interface.
@@ -1559,60 +1500,16 @@ type Series struct {
 	Tags        models.Tags
 	ID          uint64
 	measurement *Measurement
-	shardIDs    []uint64 // shards that have this series defined
 }

-// NewSeries returns an initialized series struct.
-func NewSeries(key string, tags models.Tags) *Series {
+// NewSeries returns an initialized series struct
+func NewSeries(key []byte, tags models.Tags) *Series {
 	return &Series{
-		Key:  key,
+		Key:  string(key),
 		Tags: tags,
 	}
 }

-// AssignShard adds shardID to the list of shards this series is assigned to.
-func (s *Series) AssignShard(shardID uint64) {
-	s.mu.Lock()
-	if !s.assigned(shardID) {
-		s.shardIDs = append(s.shardIDs, shardID)
-		sort.Sort(uint64Slice(s.shardIDs))
-	}
-	s.mu.Unlock()
-}
-
-// UnassignShard removes the shardID from the list of shards this series is assigned to.
-func (s *Series) UnassignShard(shardID uint64) {
-	s.mu.Lock()
-	for i, v := range s.shardIDs {
-		if v == shardID {
-			s.shardIDs = append(s.shardIDs[:i], s.shardIDs[i+1:]...)
-			break
-		}
-	}
-	s.mu.Unlock()
-}
-
-// Assigned returns whether this series is assigned to the given shard.
-func (s *Series) Assigned(shardID uint64) bool {
-	s.mu.RLock()
-	b := s.assigned(shardID)
-	s.mu.RUnlock()
-	return b
-}
-
-func (s *Series) assigned(shardID uint64) bool {
-	i := sort.Search(len(s.shardIDs), func(i int) bool { return s.shardIDs[i] >= shardID })
-	return i < len(s.shardIDs) && s.shardIDs[i] == shardID
-}
-
-// ShardN returns the number of shards this series is assigned to.
-func (s *Series) ShardN() int {
-	s.mu.RLock()
-	n := len(s.shardIDs)
-	s.mu.RUnlock()
-	return n
-}
-
 // Dereference removes references to a byte slice.
 func (s *Series) Dereference(b []byte) {
 	s.mu.Lock()
@@ -2085,12 +1982,6 @@ func MeasurementFromSeriesKey(key string) string {
 	return escape.UnescapeString(k)
 }

-type uint64Slice []uint64
-
-func (a uint64Slice) Len() int           { return len(a) }
-func (a uint64Slice) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
-func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] }
-
 type byTagKey []*influxql.TagSet

 func (t byTagKey) Len() int { return len(t) }
diff --git a/tsdb/shard.go b/tsdb/shard.go
index a7b175a4ca..fb087ac880 100644
--- a/tsdb/shard.go
+++ b/tsdb/shard.go
@@ -101,7 +101,6 @@ func (e PartialWriteError) Error() string {
 // Data can be split across many shards. The query engine in TSDB is responsible
 // for combining the output of many shards into a single query result.
 type Shard struct {
-	index   *DatabaseIndex
 	path    string
 	walPath string
 	id      uint64
@@ -219,7 +218,6 @@ func (s *Shard) Statistics(tags map[string]string) []models.Statistic {
 	}}

 	// Add the index and engine statistics.
-	statistics = append(statistics, s.index.Statistics(tags)...)
 	statistics = append(statistics, s.engine.Statistics(tags)...)
 	return statistics
 }
@@ -257,15 +255,18 @@ func (s *Shard) Open() error {
 		// Load metadata index.
 		start := time.Now()
-		if err := e.LoadMetadataIndex(s.id, s.index); err != nil {
+		if err := e.LoadMetadataIndex(s.id, NewDatabaseIndex(s.database)); err != nil {
 			return err
 		}

-		count := s.index.SeriesShardN(s.id)
-		atomic.AddInt64(&s.stats.SeriesCreated, int64(count))
-
 		s.engine = e

+		count, err := s.engine.SeriesCount()
+		if err != nil {
+			return err
+		}
+		atomic.AddInt64(&s.stats.SeriesCreated, int64(count))
+
 		s.logger.Info(fmt.Sprintf("%s database index loaded in %s", s.path, time.Now().Sub(start)))

 		go s.monitor()
@@ -303,9 +304,6 @@ func (s *Shard) close() error {
 		close(s.closing)
 	}

-	// Wipe out our index.
-	s.index = NewDatabaseIndex(s.database)
-
 	err := s.engine.Close()
 	if err == nil {
 		s.engine = nil
@@ -462,19 +460,17 @@ func (s *Shard) DeleteMeasurement(name string) error {
 	}

 	// Attempt to find the series keys.
-	m := s.index.Measurement(name)
+	m, err := s.engine.Measurement(name)
+	if err != nil {
+		return err
+	}
+
 	if m == nil {
 		return influxql.ErrMeasurementNotFound(name)
 	}

 	// Remove the measurement from the engine.
-	if err := s.engine.DeleteMeasurement(name, m.SeriesKeys()); err != nil {
-		return err
-	}
-
-	// Remove the measurement from the index.
-	s.index.DropMeasurement(name)
-	return nil
+	return s.engine.DeleteMeasurement(name, m.SeriesKeys())
 }

 func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) error {
@@ -492,7 +488,10 @@ func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) error
 		}

 		// ensure the measurement is in the index and the field is there
-		measurement := s.index.CreateMeasurementIndexIfNotExists(f.Measurement)
+		measurement, err := s.engine.CreateMeasurement(f.Measurement)
+		if err != nil {
+			return err
+		}
 		measurement.SetFieldName(f.Field.Name)
 	}

@@ -512,7 +511,7 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
 	// and record why/increment counters
 	for i, p := range points {
 		tags := p.Tags()
-		m := s.index.Measurement(p.Name())
+		m := s.Measurement(p.Name())

 		// Measurement doesn't exist yet, can't check the limit
 		if m != nil {
@@ -574,22 +573,31 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
 		iter.Reset()

 		// see if the series should be added to the index
-		ss := s.index.SeriesBytes(p.Key())
+		key := string(p.Key())
+		ss, err := s.engine.Series(key)
+		if err != nil {
+			return nil, nil, err
+		}
+
 		if ss == nil {
-			if s.options.Config.MaxSeriesPerDatabase > 0 && s.index.SeriesN()+1 > s.options.Config.MaxSeriesPerDatabase {
+			cnt, err := s.engine.SeriesN()
+			if err != nil {
+				return nil, nil, err
+			}
+
+			if s.options.Config.MaxSeriesPerDatabase > 0 && cnt+1 > s.options.Config.MaxSeriesPerDatabase {
 				atomic.AddInt64(&s.stats.WritePointsDropped, 1)
 				dropped++
-				reason = fmt.Sprintf("max-series-per-database limit exceeded: db=%s (%d/%d)",
-					s.database, s.index.SeriesN(), s.options.Config.MaxSeriesPerDatabase)
+				reason = fmt.Sprintf("db %s max series limit reached: (%d/%d)", s.database, cnt, s.options.Config.MaxSeriesPerDatabase)
 				continue
 			}
-			ss = s.index.CreateSeriesIndexIfNotExists(p.Name(), NewSeries(string(p.Key()), tags))
+			ss = NewSeries(p.Key(), tags)
 			atomic.AddInt64(&s.stats.SeriesCreated, 1)
 		}

-		if !ss.Assigned(s.id) {
-			ss.AssignShard(s.id)
+		if ss, err = s.engine.CreateSeries(p.Name(), ss); err != nil {
+			return nil, nil, err
 		}

 		// see if the field definitions need to be saved to the shard
@@ -657,18 +665,20 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
 // Measurement returns the named measurement from the index.
 func (s *Shard) Measurement(name string) *Measurement {
-	return s.index.Measurement(name)
+	m, _ := s.engine.Measurement(name)
+	return m
 }

 // Measurements returns a slice of all measurements from the index.
 func (s *Shard) Measurements() []*Measurement {
-	return s.index.Measurements()
+	m, _ := s.engine.Measurements()
+	return m
 }

 // MeasurementsByExpr takes an expression containing only tags and returns a
 // slice of matching measurements.
 func (s *Shard) MeasurementsByExpr(cond influxql.Expr) (Measurements, bool, error) {
-	return s.index.MeasurementsByExpr(cond)
+	return s.engine.MeasurementsByExpr(cond)
 }

 // SeriesCount returns the number of series buckets on the shard.
@@ -681,7 +691,8 @@ func (s *Shard) SeriesCount() (int, error) {
 // Series returns a series by key.
 func (s *Shard) Series(key string) *Series {
-	return s.index.Series(key)
+	series, _ := s.engine.Series(key)
+	return series
 }

 // WriteTo writes the shard's data to w.
@@ -767,7 +778,10 @@ func (s *Shard) FieldDimensions(sources influxql.Sources) (fields map[string]inf
 		switch m := src.(type) {
 		case *influxql.Measurement:
 			// Retrieve measurement.
-			mm := s.index.Measurement(m.Name)
+			mm, err := s.engine.Measurement(m.Name)
+			if err != nil {
+				return nil, nil, err
+			}
 			if mm == nil {
 				continue
 			}
@@ -785,7 +799,7 @@ func (s *Shard) FieldDimensions(sources influxql.Sources) (fields map[string]inf
 		}
 	}

-	return
+	return fields, dimensions, nil
 }

 // ExpandSources expands regex sources and removes duplicates.
@@ -805,7 +819,12 @@ func (s *Shard) ExpandSources(sources influxql.Sources) (influxql.Sources, error
 			}

 			// Loop over matching measurements.
-			for _, m := range s.index.MeasurementsByRegex(src.Regex.Val) {
+			measurements, err := s.engine.MeasurementsByRegex(src.Regex.Val)
+			if err != nil {
+				return nil, err
+			}
+
+			for _, m := range measurements {
 				other := &influxql.Measurement{
 					Database:        src.Database,
 					RetentionPolicy: src.RetentionPolicy,
@@ -886,8 +905,8 @@ func (s *Shard) monitor() {
 				continue
 			}

-			for _, m := range s.index.Measurements() {
-				m.WalkTagKeys(func(k string) {
+			for _, m := range s.Measurements() {
+				for _, k := range m.TagKeys() {
 					n := m.Cardinality(k)
 					perc := int(float64(n) / float64(s.options.Config.MaxValuesPerTag) * 100)
 					if perc > 100 {
@@ -1072,15 +1091,16 @@ func (ic *shardIteratorCreator) ExpandSources(sources influxql.Sources) (influxq
 func NewFieldKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) {
 	itr := &fieldKeysIterator{sh: sh}

+	var err error
 	// Retrieve measurements from shard. Filter if condition specified.
 	if opt.Condition == nil {
-		itr.mms = sh.index.Measurements()
-	} else {
-		mms, _, err := sh.index.measurementsByExpr(opt.Condition)
-		if err != nil {
+		if itr.mms, err = sh.engine.Measurements(); err != nil {
+			return nil, err
+		}
+	} else {
+		if itr.mms, _, err = sh.engine.MeasurementsByExpr(opt.Condition); err != nil {
 			return nil, err
 		}
-		itr.mms = mms
 	}

 	// Sort measurements by name.
@@ -1182,7 +1202,10 @@ func NewSeriesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterat
 	}

 	// Read and sort all measurements.
-	mms := sh.index.Measurements()
+	mms, err := sh.engine.Measurements()
+	if err != nil {
+		return nil, err
+	}
 	sort.Sort(mms)

 	return &seriesIterator{
@@ -1295,11 +1318,13 @@ func NewTagValuesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Ite
 		return e
 	}), nil)

-	mms, ok, err := sh.index.measurementsByExpr(measurementExpr)
+	mms, ok, err := sh.engine.MeasurementsByExpr(measurementExpr)
 	if err != nil {
 		return nil, err
 	} else if !ok {
-		mms = sh.index.Measurements()
+		if mms, err = sh.engine.Measurements(); err != nil {
+			return nil, err
+		}
 		sort.Sort(mms)
 	}

@@ -1408,11 +1433,14 @@ type measurementKeyFunc func(m *Measurement) []string
 func newMeasurementKeysIterator(sh *Shard, fn measurementKeyFunc, opt influxql.IteratorOptions) (*measurementKeysIterator, error) {
 	itr := &measurementKeysIterator{fn: fn}

+	var err error
 	// Retrieve measurements from shard. Filter if condition specified.
 	if opt.Condition == nil {
-		itr.mms = sh.index.Measurements()
+		if itr.mms, err = sh.engine.Measurements(); err != nil {
+			return nil, err
+		}
 	} else {
-		mms, _, err := sh.index.measurementsByExpr(opt.Condition)
+		mms, _, err := sh.engine.MeasurementsByExpr(opt.Condition)
 		if err != nil {
 			return nil, err
 		}
diff --git a/tsdb/store.go b/tsdb/store.go
index fc4c230d0a..e1162b45f7 100644
--- a/tsdb/store.go
+++ b/tsdb/store.go
@@ -329,11 +329,6 @@ func (s *Store) DeleteShard(shardID uint64) error {
 		return nil
 	}

-	// Remove the shard from the database indexes before closing the shard.
-	// Closing the shard will do this as well, but it will unload it while
-	// the shard is locked which can block stats collection and other calls.
-	sh.UnloadIndex()
-
 	if err := sh.Close(); err != nil {
 		return err
 	}

@@ -683,28 +678,9 @@ func (s *Store) deleteSeries(database string, seriesKeys []string, min, max int6
 	s.mu.RUnlock()

 	return s.walkShards(shards, func(sh *Shard) error {
-		if sh.database != database {
-			return nil
-		}
-
 		if err := sh.DeleteSeriesRange(seriesKeys, min, max); err != nil {
 			return err
 		}
-
-		// The keys we passed in may be fully deleted from the shard, if so,
-		// we need to remove the shard from all the meta data indices.
-		existing, err := sh.ContainsSeries(seriesKeys)
-		if err != nil {
-			return err
-		}
-
-		var toDelete []string
-		for k, exists := range existing {
-			if !exists {
-				toDelete = append(toDelete, k)
-			}
-		}
-		sh.index.DropSeries(toDelete)

		return nil
	})
}
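
Note on the new call pattern (illustrative only, not part of the patch): with the in-memory index owned by the engine, callers such as Shard now go through the Engine interface methods added above (Series, CreateMeasurement, CreateSeries) instead of reaching into a Shard-owned DatabaseIndex. The Go sketch below shows that flow; ensureSeries is a hypothetical helper written for this note, and the import paths assume the influxdb source tree this diff applies to.

// Illustrative sketch — not code from this diff. It only uses the methods the
// patch adds to tsdb.Engine plus the new []byte-keyed tsdb.NewSeries.
package example

import (
	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/tsdb"
)

// ensureSeries looks a series up through the engine-backed index and, if it is
// missing, registers the measurement and series there — roughly the path
// Shard.validateSeriesAndFields follows after this change.
func ensureSeries(e tsdb.Engine, measurement string, key []byte, tags models.Tags) (*tsdb.Series, error) {
	// Series lookups now go through the engine rather than s.index.
	ss, err := e.Series(string(key))
	if err != nil {
		return nil, err
	}
	if ss != nil {
		return ss, nil
	}

	// Make sure the measurement exists in the engine's index, then create the
	// series under it.
	if _, err := e.CreateMeasurement(measurement); err != nil {
		return nil, err
	}
	return e.CreateSeries(measurement, tsdb.NewSeries(key, tags))
}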