diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index d61dd49ab5..dd7149ac55 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -99,9 +99,7 @@ type Engine struct { traceLogger zap.Logger // Logger to be used when trace-logging is on. traceLogging bool - // TODO(benbjohnson): Index needs to be moved entirely into engine. - index *tsdb.DatabaseIndex - + index tsdb.Index fieldsMu sync.RWMutex measurementFields map[string]*tsdb.MeasurementFields @@ -283,11 +281,11 @@ func (e *Engine) disableSnapshotCompactions() { func (e *Engine) Path() string { return e.path } func (e *Engine) Measurement(name string) (*tsdb.Measurement, error) { - return e.index.Measurement(name), nil + return e.index.Measurement(name) } func (e *Engine) Measurements() (tsdb.Measurements, error) { - return e.index.Measurements(), nil + return e.index.Measurements() } func (e *Engine) MeasurementCardinality() (int64, error) { @@ -299,7 +297,7 @@ func (e *Engine) MeasurementsByExpr(expr influxql.Expr) (tsdb.Measurements, bool } func (e *Engine) MeasurementsByRegex(re *regexp.Regexp) (tsdb.Measurements, error) { - return e.index.MeasurementsByRegex(re), nil + return e.index.MeasurementsByRegex(re) } // MeasurementFields returns the measurement fields for a measurement. @@ -434,6 +432,12 @@ func (e *Engine) Close() error { defer e.mu.Unlock() e.done = nil // Ensures that the channel will not be closed again. + if e.index != nil { + if err := e.index.Close(); err != nil { + return err + } + } + if err := e.FileStore.Close(); err != nil { return err } @@ -644,7 +648,7 @@ func (e *Engine) addToIndexFromKey(shardID uint64, key []byte, fieldType influxq seriesKey, field := SeriesAndFieldFromCompositeKey(key) measurement := tsdb.MeasurementFromSeriesKey(string(seriesKey)) - m := index.CreateMeasurementIndexIfNotExists(measurement) + m, _ := index.CreateMeasurementIndexIfNotExists(measurement) m.SetFieldName(field) e.fieldsMu.Lock() @@ -660,8 +664,10 @@ func (e *Engine) addToIndexFromKey(shardID uint64, key []byte, fieldType influxq } // Have we already indexed this series? - ss := index.SeriesBytes(seriesKey) - if ss != nil { + ss, err := index.Series(string(seriesKey)) + if err != nil { + return err + } else if ss != nil { return nil } @@ -844,14 +850,12 @@ func (e *Engine) DeleteSeriesRange(seriesKeys []string, min, max int64) error { toDelete = append(toDelete, k) } } - e.index.DropSeries(toDelete) - - return nil + return e.index.DropSeries(toDelete) } // CreateMeasurement creates a measurement on the index. func (e *Engine) CreateMeasurement(name string) (*tsdb.Measurement, error) { - return e.index.CreateMeasurementIndexIfNotExists(name), nil + return e.index.CreateMeasurementIndexIfNotExists(name) } // DeleteMeasurement deletes a measurement and all related series. @@ -865,17 +869,16 @@ func (e *Engine) DeleteMeasurement(name string, seriesKeys []string) error { } // Remove the measurement from the index. - e.index.DropMeasurement(name) - return nil + return e.index.DropMeasurement(name) } func (e *Engine) CreateSeries(measurment string, series *tsdb.Series) (*tsdb.Series, error) { - return e.index.CreateSeriesIndexIfNotExists(measurment, series), nil + return e.index.CreateSeriesIndexIfNotExists(measurment, series) } // Series returns a series from the index. func (e *Engine) Series(key string) (*tsdb.Series, error) { - return e.index.Series(key), nil + return e.index.Series(key) } // WriteTo is not implemented. @@ -1338,7 +1341,11 @@ func (e *Engine) createVarRefIterator(opt influxql.IteratorOptions, aggregate bo var itrs []influxql.Iterator if err := func() error { - mms := tsdb.Measurements(e.index.MeasurementsByName(influxql.Sources(opt.Sources).Names())) + mByName, err := e.index.MeasurementsByName(influxql.Sources(opt.Sources).Names()) + if err != nil { + return err + } + mms := tsdb.Measurements(mByName) for _, mm := range mms { // Determine tagsets for this measurement based on dimensions and filters. @@ -1468,7 +1475,12 @@ func (e *Engine) createTagSetGroupIterators(ref *influxql.VarRef, mm *tsdb.Measu // createVarRefSeriesIterator creates an iterator for a variable reference for a series. func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, mm *tsdb.Measurement, seriesKey string, t *influxql.TagSet, filter influxql.Expr, conditionFields []influxql.VarRef, opt influxql.IteratorOptions) (influxql.Iterator, error) { - tags := influxql.NewTags(e.index.TagsForSeries(seriesKey).Map()) + tfs, err := e.index.TagsForSeries(seriesKey) + if err != nil { + return nil, err + } + + tags := influxql.NewTags(tfs.Map()) // Create options specific for this series. itrOpt := opt diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index d2cc08b3f4..736f609760 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -44,7 +44,10 @@ func TestEngine_LoadMetadataIndex(t *testing.T) { } // Verify index is correct. - if m := index.Measurement("cpu"); m == nil { + m, err := index.Measurement("cpu") + if err != nil { + t.Fatal(err) + } else if m == nil { t.Fatal("measurement not found") } else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, models.NewTags(map[string]string{"host": "A"})) { t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags) @@ -67,7 +70,9 @@ func TestEngine_LoadMetadataIndex(t *testing.T) { } // Verify index is correct. - if m := index.Measurement("cpu"); m == nil { + if m, err = index.Measurement("cpu"); err != nil { + t.Fatal(err) + } else if m == nil { t.Fatal("measurement not found") } else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, models.NewTags(map[string]string{"host": "A"})) { t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags) @@ -92,7 +97,9 @@ func TestEngine_LoadMetadataIndex(t *testing.T) { } // Verify index is correct. - if m := index.Measurement("cpu"); m == nil { + if m, err = index.Measurement("cpu"); err != nil { + t.Fatal(err) + } else if m == nil { t.Fatal("measurement not found") } else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, models.NewTags(map[string]string{"host": "A"})) { t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags) diff --git a/tsdb/index.go b/tsdb/index.go new file mode 100644 index 0000000000..d27e086a80 --- /dev/null +++ b/tsdb/index.go @@ -0,0 +1,30 @@ +package tsdb + +import ( + "regexp" + + "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxdb/models" +) + +type Index interface { + Open() error + Close() error + + CreateMeasurementIndexIfNotExists(name string) (*Measurement, error) + Measurement(name string) (*Measurement, error) + Measurements() (Measurements, error) + MeasurementsByExpr(expr influxql.Expr) (Measurements, bool, error) + MeasurementsByName(names []string) ([]*Measurement, error) + MeasurementsByRegex(re *regexp.Regexp) (Measurements, error) + DropMeasurement(name string) error + + CreateSeriesIndexIfNotExists(measurment string, series *Series) (*Series, error) + Series(key string) (*Series, error) + DropSeries(keys []string) error + + SeriesN() (int64, error) + + Statistics(tags map[string]string) []models.Statistic + TagsForSeries(key string) (models.Tags, error) +} diff --git a/tsdb/meta.go b/tsdb/meta.go index 6ecf4a2ddc..f25218ade6 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -50,6 +50,9 @@ func NewDatabaseIndex(name string) *DatabaseIndex { } } +func (d *DatabaseIndex) Open() error { return nil } +func (d *DatabaseIndex) Close() error { return nil } + // IndexStatistics maintains statistics for the index. type IndexStatistics struct { NumSeries int64 @@ -69,48 +72,29 @@ func (d *DatabaseIndex) Statistics(tags map[string]string) []models.Statistic { } // Series returns a series by key. -func (d *DatabaseIndex) Series(key string) *Series { +func (d *DatabaseIndex) Series(key string) (*Series, error) { d.mu.RLock() s := d.series[key] d.mu.RUnlock() - return s -} - -// SeriesBytes returns a series by key. -func (d *DatabaseIndex) SeriesBytes(key []byte) *Series { - d.mu.RLock() - s := d.series[string(key)] - d.mu.RUnlock() - return s -} - -// SeriesKeys returns a sorted slice of strings indicating all the series keys in the index. -func (d *DatabaseIndex) SeriesKeys() []string { - d.mu.RLock() - s := make([]string, 0, len(d.series)) - for k := range d.series { - s = append(s, k) - } - d.mu.RUnlock() - return s + return s, nil } // SeriesN returns the number of series. -func (d *DatabaseIndex) SeriesN() int { +func (d *DatabaseIndex) SeriesN() (int64, error) { d.mu.RLock() defer d.mu.RUnlock() - return len(d.series) + return int64(len(d.series)), nil } -// Measurement returns the measurement object from the index by the name. -func (d *DatabaseIndex) Measurement(name string) *Measurement { +// Measurement returns the measurement object from the index by the name +func (d *DatabaseIndex) Measurement(name string) (*Measurement, error) { d.mu.RLock() defer d.mu.RUnlock() - return d.measurements[name] + return d.measurements[name], nil } -// MeasurementsByName returns a list of all the measurements in the index that match any entry in names. -func (d *DatabaseIndex) MeasurementsByName(names []string) []*Measurement { +// MeasurementsByName returns a list of measurements. +func (d *DatabaseIndex) MeasurementsByName(names []string) ([]*Measurement, error) { d.mu.RLock() defer d.mu.RUnlock() @@ -120,7 +104,7 @@ func (d *DatabaseIndex) MeasurementsByName(names []string) []*Measurement { a = append(a, m) } } - return a + return a, nil } // MeasurementSeriesCounts returns the number of measurements and series currently indexed by the database. @@ -147,25 +131,25 @@ func (d *DatabaseIndex) SeriesShardN(shardID uint64) int { // CreateSeriesIndexIfNotExists adds the series for the given measurement to the // index and sets its ID or returns the existing series object -func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, series *Series) *Series { +func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, series *Series) (*Series, error) { d.mu.RLock() // if there is a measurement for this id, it's already been added ss := d.series[series.Key] if ss != nil { d.mu.RUnlock() - return ss + return ss, nil } d.mu.RUnlock() // get or create the measurement index - m := d.CreateMeasurementIndexIfNotExists(measurementName) + m, _ := d.CreateMeasurementIndexIfNotExists(measurementName) d.mu.Lock() // Check for the series again under a write lock ss = d.series[series.Key] if ss != nil { d.mu.Unlock() - return ss + return ss, nil } // set the in memory ID for query processing on this shard @@ -180,12 +164,12 @@ func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, ser atomic.AddInt64(&d.stats.NumSeries, 1) d.mu.Unlock() - return series + return series, nil } // CreateMeasurementIndexIfNotExists creates or retrieves an in memory index // object for the measurement -func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurement { +func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) (*Measurement, error) { name = escape.UnescapeString(name) // See if the measurement exists using a read-lock @@ -193,7 +177,7 @@ func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurem m := d.measurements[name] if m != nil { d.mu.RUnlock() - return m + return m, nil } d.mu.RUnlock() @@ -209,19 +193,19 @@ func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurem d.measurements[name] = m atomic.AddInt64(&d.stats.NumMeasurements, 1) } - return m + return m, nil } // TagsForSeries returns the tag map for the passed in series -func (d *DatabaseIndex) TagsForSeries(key string) models.Tags { +func (d *DatabaseIndex) TagsForSeries(key string) (models.Tags, error) { d.mu.RLock() defer d.mu.RUnlock() ss := d.series[key] if ss == nil { - return nil + return nil, nil } - return ss.Tags + return ss.Tags, nil } // MeasurementsByExpr takes an expression containing only tags and returns a @@ -399,7 +383,7 @@ func (d *DatabaseIndex) measurementsByTagFilters(filters []*TagFilter) Measureme } // MeasurementsByRegex returns the measurements that match the regex. -func (d *DatabaseIndex) MeasurementsByRegex(re *regexp.Regexp) Measurements { +func (d *DatabaseIndex) MeasurementsByRegex(re *regexp.Regexp) (Measurements, error) { d.mu.RLock() defer d.mu.RUnlock() @@ -409,11 +393,11 @@ func (d *DatabaseIndex) MeasurementsByRegex(re *regexp.Regexp) Measurements { matches = append(matches, m) } } - return matches + return matches, nil } // Measurements returns a list of all measurements. -func (d *DatabaseIndex) Measurements() Measurements { +func (d *DatabaseIndex) Measurements() (Measurements, error) { d.mu.RLock() measurements := make(Measurements, 0, len(d.measurements)) for _, m := range d.measurements { @@ -421,15 +405,16 @@ func (d *DatabaseIndex) Measurements() Measurements { } d.mu.RUnlock() - return measurements + return measurements, nil } // DropMeasurement removes the measurement and all of its underlying -// series from the database index. -func (d *DatabaseIndex) DropMeasurement(name string) { +// series from the database index +func (d *DatabaseIndex) DropMeasurement(name string) error { d.mu.Lock() defer d.mu.Unlock() d.dropMeasurement(name) + return nil } func (d *DatabaseIndex) dropMeasurement(name string) { @@ -447,10 +432,10 @@ func (d *DatabaseIndex) dropMeasurement(name string) { atomic.AddInt64(&d.stats.NumMeasurements, -1) } -// DropSeries removes the series keys and their tags from the index. -func (d *DatabaseIndex) DropSeries(keys []string) { +// DropSeries removes the series keys and their tags from the index +func (d *DatabaseIndex) DropSeries(keys []string) error { if len(keys) == 0 { - return + return nil } d.mu.Lock() @@ -481,6 +466,7 @@ func (d *DatabaseIndex) DropSeries(keys []string) { d.dropMeasurement(mname) } atomic.AddInt64(&d.stats.NumSeries, -nDeleted) + return nil } // Dereference removes all references to data within b and moves them to the heap.