Track stats for number of series, measurements

Per database: track number of series and measurements
Per measurement: track number of series
pull/5816/head
Mark Rushakoff 2016-02-23 16:24:56 -08:00
parent 3cdb4c1c12
commit fb83374389
6 changed files with 60 additions and 15 deletions

View File

@ -163,7 +163,7 @@ func MustOpenShard(id uint64) *Shard {
sh := &Shard{
Shard: tsdb.NewShard(id,
tsdb.NewDatabaseIndex(),
tsdb.NewDatabaseIndex("db"),
filepath.Join(path, "data"),
filepath.Join(path, "wal"),
tsdb.NewEngineOptions(),

View File

@ -33,7 +33,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) {
}
// Load metadata index.
index := tsdb.NewDatabaseIndex()
index := tsdb.NewDatabaseIndex("db")
if err := e.LoadMetadataIndex(nil, index, make(map[string]*tsdb.MeasurementFields)); err != nil {
t.Fatal(err)
}
@ -56,7 +56,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) {
}
// Load metadata index.
index = tsdb.NewDatabaseIndex()
index = tsdb.NewDatabaseIndex("db")
if err := e.LoadMetadataIndex(nil, index, make(map[string]*tsdb.MeasurementFields)); err != nil {
t.Fatal(err)
}
@ -81,7 +81,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) {
}
// Load metadata index.
index = tsdb.NewDatabaseIndex()
index = tsdb.NewDatabaseIndex("db")
if err := e.LoadMetadataIndex(nil, index, make(map[string]*tsdb.MeasurementFields)); err != nil {
t.Fatal(err)
}
@ -521,7 +521,7 @@ func MustOpenEngine() *Engine {
if err := e.Open(); err != nil {
panic(err)
}
if err := e.LoadMetadataIndex(nil, tsdb.NewDatabaseIndex(), make(map[string]*tsdb.MeasurementFields)); err != nil {
if err := e.LoadMetadataIndex(nil, tsdb.NewDatabaseIndex("db"), make(map[string]*tsdb.MeasurementFields)); err != nil {
panic(err)
}
return e

View File

@ -1,6 +1,7 @@
package tsdb
import (
"expvar"
"fmt"
"regexp"
"sort"
@ -8,6 +9,7 @@ import (
"sync"
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/pkg/escape"
"github.com/influxdata/influxdb/tsdb/internal"
@ -19,6 +21,9 @@ import (
const (
maxStringLength = 64 * 1024
statDatabaseSeries = "numSeries" // number of series in this database
statDatabaseMeasurements = "numMeasurements" // number of measurements in this database
)
// DatabaseIndex is the in memory index of a collection of measurements, time series, and their tags.
@ -29,13 +34,19 @@ type DatabaseIndex struct {
measurements map[string]*Measurement // measurement name to object and index
series map[string]*Series // map series key to the Series object
lastID uint64 // last used series ID. They're in memory only for this shard
name string // name of the database represented by this index
statMap *expvar.Map
}
// NewDatabaseIndex returns a new initialized DatabaseIndex.
func NewDatabaseIndex() *DatabaseIndex {
func NewDatabaseIndex(name string) *DatabaseIndex {
return &DatabaseIndex{
measurements: make(map[string]*Measurement),
series: make(map[string]*Series),
name: name,
statMap: influxdb.NewStatistics("database:"+name, "database", map[string]string{"database": name}),
}
}
@ -103,6 +114,8 @@ func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, ser
m.AddSeries(series)
d.statMap.Add(statDatabaseSeries, 1)
return series
}
@ -113,6 +126,7 @@ func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurem
if m == nil {
m = NewMeasurement(name, d)
d.measurements[name] = m
d.statMap.Add(statDatabaseMeasurements, 1)
}
return m
}
@ -311,12 +325,19 @@ func (d *DatabaseIndex) DropMeasurement(name string) {
for _, s := range m.seriesByID {
delete(d.series, s.Key)
}
m.drop()
d.statMap.Add(statDatabaseSeries, int64(-len(m.seriesByID)))
d.statMap.Add(statDatabaseMeasurements, -1)
}
// DropSeries removes the series keys and their tags from the index
func (d *DatabaseIndex) DropSeries(keys []string) {
d.mu.Lock()
defer d.mu.Unlock()
var nDeleted int64
for _, k := range keys {
series := d.series[k]
if series == nil {
@ -324,9 +345,16 @@ func (d *DatabaseIndex) DropSeries(keys []string) {
}
series.measurement.DropSeries(series.id)
delete(d.series, k)
nDeleted++
}
d.statMap.Add(statDatabaseSeries, -nDeleted)
}
const (
statMeasurementSeries = "numSeries" // number of series contained in this measurement
)
// Measurement represents a collection of time series in a database. It also contains in memory
// structures for indexing tags. Exported functions are goroutine safe while un-exported functions
// assume the caller will use the appropriate locks
@ -341,6 +369,8 @@ type Measurement struct {
measurement *Measurement
seriesByTagKeyValue map[string]map[string]SeriesIDs // map from tag key to value to sorted set of series ids
seriesIDs SeriesIDs // sorted list of series IDs in this measurement
statMap *expvar.Map
}
// NewMeasurement allocates and initializes a new Measurement.
@ -353,6 +383,12 @@ func NewMeasurement(name string, idx *DatabaseIndex) *Measurement {
seriesByID: make(map[uint64]*Series),
seriesByTagKeyValue: make(map[string]map[string]SeriesIDs),
seriesIDs: make(SeriesIDs, 0),
statMap: influxdb.NewStatistics(
fmt.Sprintf("measurement:%s.%s", name, idx.name),
"measurement",
map[string]string{"database": idx.name, "measurement": name},
),
}
}
@ -445,6 +481,7 @@ func (m *Measurement) AddSeries(s *Series) bool {
valueMap[v] = ids
}
m.statMap.Add(statMeasurementSeries, 1)
return true
}
@ -492,9 +529,17 @@ func (m *Measurement) DropSeries(seriesID uint64) {
}
}
m.statMap.Add(statMeasurementSeries, -1)
return
}
// drop handles any cleanup for when a measurement is dropped.
// Currently only cleans up stats.
func (m *Measurement) drop() {
m.statMap.Add(statMeasurementSeries, int64(-len(m.seriesIDs)))
}
// filters walks the where clause of a select statement and returns a map with all series ids
// matching the where clause and any filter expression that should be applied to each
func (m *Measurement) filters(condition influxql.Expr) (map[uint64]influxql.Expr, error) {

View File

@ -157,7 +157,7 @@ func BenchmarkCreateSeriesIndex_1M(b *testing.B) {
func benchmarkCreateSeriesIndex(b *testing.B, series []*TestSeries) {
idxs := make([]*tsdb.DatabaseIndex, 0, b.N)
for i := 0; i < b.N; i++ {
idxs = append(idxs, tsdb.NewDatabaseIndex())
idxs = append(idxs, tsdb.NewDatabaseIndex(fmt.Sprintf("db%d", i)))
}
b.ResetTimer()

View File

@ -29,7 +29,7 @@ func TestShardWriteAndIndex(t *testing.T) {
tmpShard := path.Join(tmpDir, "shard")
tmpWal := path.Join(tmpDir, "wal")
index := tsdb.NewDatabaseIndex()
index := tsdb.NewDatabaseIndex("db")
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
@ -75,7 +75,7 @@ func TestShardWriteAndIndex(t *testing.T) {
// ensure the index gets loaded after closing and opening the shard
sh.Close()
index = tsdb.NewDatabaseIndex()
index = tsdb.NewDatabaseIndex("db")
sh = tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error openeing shard: %s", err.Error())
@ -99,7 +99,7 @@ func TestShardWriteAddNewField(t *testing.T) {
tmpShard := path.Join(tmpDir, "shard")
tmpWal := path.Join(tmpDir, "wal")
index := tsdb.NewDatabaseIndex()
index := tsdb.NewDatabaseIndex("db")
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
@ -239,7 +239,7 @@ func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) {
// Generate test series (measurements + unique tag sets).
series := genTestSeries(mCnt, tkCnt, tvCnt)
// Create index for the shard to use.
index := tsdb.NewDatabaseIndex()
index := tsdb.NewDatabaseIndex("db")
// Generate point data to write to the shard.
points := []models.Point{}
for _, s := range series {
@ -280,7 +280,7 @@ func benchmarkWritePointsExistingSeries(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt
// Generate test series (measurements + unique tag sets).
series := genTestSeries(mCnt, tkCnt, tvCnt)
// Create index for the shard to use.
index := tsdb.NewDatabaseIndex()
index := tsdb.NewDatabaseIndex("db")
// Generate point data to write to the shard.
points := []models.Point{}
for _, s := range series {
@ -355,7 +355,7 @@ func NewShard() *Shard {
return &Shard{
Shard: tsdb.NewShard(0,
tsdb.NewDatabaseIndex(),
tsdb.NewDatabaseIndex("db"),
filepath.Join(path, "data"),
filepath.Join(path, "wal"),
opt,

View File

@ -110,7 +110,7 @@ func (s *Store) loadIndexes() error {
s.Logger.Printf("Skipping database dir: %s. Not a directory", db.Name())
continue
}
s.databaseIndexes[db.Name()] = NewDatabaseIndex()
s.databaseIndexes[db.Name()] = NewDatabaseIndex(db.Name())
}
return nil
}
@ -252,7 +252,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er
// create the database index if it does not exist
db, ok := s.databaseIndexes[database]
if !ok {
db = NewDatabaseIndex()
db = NewDatabaseIndex(database)
s.databaseIndexes[database] = db
}