diff --git a/services/copier/service_test.go b/services/copier/service_test.go index aa7f398721..a68a413b87 100644 --- a/services/copier/service_test.go +++ b/services/copier/service_test.go @@ -163,7 +163,7 @@ func MustOpenShard(id uint64) *Shard { sh := &Shard{ Shard: tsdb.NewShard(id, - tsdb.NewDatabaseIndex(), + tsdb.NewDatabaseIndex("db"), filepath.Join(path, "data"), filepath.Join(path, "wal"), tsdb.NewEngineOptions(), diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index eadd9a725e..fcd867fdb7 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -33,7 +33,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) { } // Load metadata index. - index := tsdb.NewDatabaseIndex() + index := tsdb.NewDatabaseIndex("db") if err := e.LoadMetadataIndex(nil, index, make(map[string]*tsdb.MeasurementFields)); err != nil { t.Fatal(err) } @@ -56,7 +56,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) { } // Load metadata index. - index = tsdb.NewDatabaseIndex() + index = tsdb.NewDatabaseIndex("db") if err := e.LoadMetadataIndex(nil, index, make(map[string]*tsdb.MeasurementFields)); err != nil { t.Fatal(err) } @@ -81,7 +81,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) { } // Load metadata index. - index = tsdb.NewDatabaseIndex() + index = tsdb.NewDatabaseIndex("db") if err := e.LoadMetadataIndex(nil, index, make(map[string]*tsdb.MeasurementFields)); err != nil { t.Fatal(err) } @@ -521,7 +521,7 @@ func MustOpenEngine() *Engine { if err := e.Open(); err != nil { panic(err) } - if err := e.LoadMetadataIndex(nil, tsdb.NewDatabaseIndex(), make(map[string]*tsdb.MeasurementFields)); err != nil { + if err := e.LoadMetadataIndex(nil, tsdb.NewDatabaseIndex("db"), make(map[string]*tsdb.MeasurementFields)); err != nil { panic(err) } return e diff --git a/tsdb/meta.go b/tsdb/meta.go index 6b0bc5c5a3..2e50d6984c 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -1,6 +1,7 @@ package tsdb import ( + "expvar" "fmt" "regexp" "sort" @@ -8,6 +9,7 @@ import ( "sync" "time" + "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/pkg/escape" "github.com/influxdata/influxdb/tsdb/internal" @@ -19,6 +21,9 @@ import ( const ( maxStringLength = 64 * 1024 + + statDatabaseSeries = "numSeries" // number of series in this database + statDatabaseMeasurements = "numMeasurements" // number of measurements in this database ) // DatabaseIndex is the in memory index of a collection of measurements, time series, and their tags. @@ -29,13 +34,19 @@ type DatabaseIndex struct { measurements map[string]*Measurement // measurement name to object and index series map[string]*Series // map series key to the Series object lastID uint64 // last used series ID. They're in memory only for this shard + + name string // name of the database represented by this index + + statMap *expvar.Map } // NewDatabaseIndex returns a new initialized DatabaseIndex. -func NewDatabaseIndex() *DatabaseIndex { +func NewDatabaseIndex(name string) *DatabaseIndex { return &DatabaseIndex{ measurements: make(map[string]*Measurement), series: make(map[string]*Series), + name: name, + statMap: influxdb.NewStatistics("database:"+name, "database", map[string]string{"database": name}), } } @@ -103,6 +114,8 @@ func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, ser m.AddSeries(series) + d.statMap.Add(statDatabaseSeries, 1) + return series } @@ -113,6 +126,7 @@ func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurem if m == nil { m = NewMeasurement(name, d) d.measurements[name] = m + d.statMap.Add(statDatabaseMeasurements, 1) } return m } @@ -311,12 +325,19 @@ func (d *DatabaseIndex) DropMeasurement(name string) { for _, s := range m.seriesByID { delete(d.series, s.Key) } + + m.drop() + + d.statMap.Add(statDatabaseSeries, int64(-len(m.seriesByID))) + d.statMap.Add(statDatabaseMeasurements, -1) } // DropSeries removes the series keys and their tags from the index func (d *DatabaseIndex) DropSeries(keys []string) { d.mu.Lock() defer d.mu.Unlock() + + var nDeleted int64 for _, k := range keys { series := d.series[k] if series == nil { @@ -324,9 +345,16 @@ func (d *DatabaseIndex) DropSeries(keys []string) { } series.measurement.DropSeries(series.id) delete(d.series, k) + nDeleted++ } + + d.statMap.Add(statDatabaseSeries, -nDeleted) } +const ( + statMeasurementSeries = "numSeries" // number of series contained in this measurement +) + // Measurement represents a collection of time series in a database. It also contains in memory // structures for indexing tags. Exported functions are goroutine safe while un-exported functions // assume the caller will use the appropriate locks @@ -341,6 +369,8 @@ type Measurement struct { measurement *Measurement seriesByTagKeyValue map[string]map[string]SeriesIDs // map from tag key to value to sorted set of series ids seriesIDs SeriesIDs // sorted list of series IDs in this measurement + + statMap *expvar.Map } // NewMeasurement allocates and initializes a new Measurement. @@ -353,6 +383,12 @@ func NewMeasurement(name string, idx *DatabaseIndex) *Measurement { seriesByID: make(map[uint64]*Series), seriesByTagKeyValue: make(map[string]map[string]SeriesIDs), seriesIDs: make(SeriesIDs, 0), + + statMap: influxdb.NewStatistics( + fmt.Sprintf("measurement:%s.%s", name, idx.name), + "measurement", + map[string]string{"database": idx.name, "measurement": name}, + ), } } @@ -445,6 +481,7 @@ func (m *Measurement) AddSeries(s *Series) bool { valueMap[v] = ids } + m.statMap.Add(statMeasurementSeries, 1) return true } @@ -492,9 +529,17 @@ func (m *Measurement) DropSeries(seriesID uint64) { } } + m.statMap.Add(statMeasurementSeries, -1) + return } +// drop handles any cleanup for when a measurement is dropped. +// Currently only cleans up stats. +func (m *Measurement) drop() { + m.statMap.Add(statMeasurementSeries, int64(-len(m.seriesIDs))) +} + // filters walks the where clause of a select statement and returns a map with all series ids // matching the where clause and any filter expression that should be applied to each func (m *Measurement) filters(condition influxql.Expr) (map[uint64]influxql.Expr, error) { diff --git a/tsdb/meta_test.go b/tsdb/meta_test.go index 1a18b5c28f..df6bec89d6 100644 --- a/tsdb/meta_test.go +++ b/tsdb/meta_test.go @@ -157,7 +157,7 @@ func BenchmarkCreateSeriesIndex_1M(b *testing.B) { func benchmarkCreateSeriesIndex(b *testing.B, series []*TestSeries) { idxs := make([]*tsdb.DatabaseIndex, 0, b.N) for i := 0; i < b.N; i++ { - idxs = append(idxs, tsdb.NewDatabaseIndex()) + idxs = append(idxs, tsdb.NewDatabaseIndex(fmt.Sprintf("db%d", i))) } b.ResetTimer() diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index dd460f8378..8a798e0c95 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -29,7 +29,7 @@ func TestShardWriteAndIndex(t *testing.T) { tmpShard := path.Join(tmpDir, "shard") tmpWal := path.Join(tmpDir, "wal") - index := tsdb.NewDatabaseIndex() + index := tsdb.NewDatabaseIndex("db") opts := tsdb.NewEngineOptions() opts.Config.WALDir = filepath.Join(tmpDir, "wal") @@ -75,7 +75,7 @@ func TestShardWriteAndIndex(t *testing.T) { // ensure the index gets loaded after closing and opening the shard sh.Close() - index = tsdb.NewDatabaseIndex() + index = tsdb.NewDatabaseIndex("db") sh = tsdb.NewShard(1, index, tmpShard, tmpWal, opts) if err := sh.Open(); err != nil { t.Fatalf("error openeing shard: %s", err.Error()) @@ -99,7 +99,7 @@ func TestShardWriteAddNewField(t *testing.T) { tmpShard := path.Join(tmpDir, "shard") tmpWal := path.Join(tmpDir, "wal") - index := tsdb.NewDatabaseIndex() + index := tsdb.NewDatabaseIndex("db") opts := tsdb.NewEngineOptions() opts.Config.WALDir = filepath.Join(tmpDir, "wal") @@ -239,7 +239,7 @@ func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) { // Generate test series (measurements + unique tag sets). series := genTestSeries(mCnt, tkCnt, tvCnt) // Create index for the shard to use. - index := tsdb.NewDatabaseIndex() + index := tsdb.NewDatabaseIndex("db") // Generate point data to write to the shard. points := []models.Point{} for _, s := range series { @@ -280,7 +280,7 @@ func benchmarkWritePointsExistingSeries(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt // Generate test series (measurements + unique tag sets). series := genTestSeries(mCnt, tkCnt, tvCnt) // Create index for the shard to use. - index := tsdb.NewDatabaseIndex() + index := tsdb.NewDatabaseIndex("db") // Generate point data to write to the shard. points := []models.Point{} for _, s := range series { @@ -355,7 +355,7 @@ func NewShard() *Shard { return &Shard{ Shard: tsdb.NewShard(0, - tsdb.NewDatabaseIndex(), + tsdb.NewDatabaseIndex("db"), filepath.Join(path, "data"), filepath.Join(path, "wal"), opt, diff --git a/tsdb/store.go b/tsdb/store.go index f18ec0b08a..aad1579c52 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -110,7 +110,7 @@ func (s *Store) loadIndexes() error { s.Logger.Printf("Skipping database dir: %s. Not a directory", db.Name()) continue } - s.databaseIndexes[db.Name()] = NewDatabaseIndex() + s.databaseIndexes[db.Name()] = NewDatabaseIndex(db.Name()) } return nil } @@ -252,7 +252,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er // create the database index if it does not exist db, ok := s.databaseIndexes[database] if !ok { - db = NewDatabaseIndex() + db = NewDatabaseIndex(database) s.databaseIndexes[database] = db }