Tag TSM stats with database, retention policy

... by extracting the db/rp from the given path.

Now that the code has "standardized" on extracting db/rp this way, the
ShardLocation struct is no longer necessary and thus has been removed.
We're back on the previous style of passing the path and walPath to
NewShard.
pull/5844/head
Mark Rushakoff 2016-02-26 11:41:54 -08:00
parent 42eb2db665
commit cdcb079769
9 changed files with 79 additions and 70 deletions

View File

@ -18,6 +18,7 @@
- [#5706](https://github.com/influxdata/influxdb/pull/5706): Cluster setup cleanup
- [#5691](https://github.com/influxdata/influxdb/pull/5691): Remove associated shard data when retention policies are dropped.
- [#5758](https://github.com/influxdata/influxdb/pull/5758): TSM engine stats for cache, WAL, and filestore. Thanks @jonseymour
- [#5844](https://github.com/influxdata/influxdb/pull/5844): Tag TSM engine stats with database and retention policy
### Bugfixes

View File

@ -164,10 +164,8 @@ func MustOpenShard(id uint64) *Shard {
sh := &Shard{
Shard: tsdb.NewShard(id,
tsdb.NewDatabaseIndex("db"),
tsdb.ShardConfig{
Path: filepath.Join(path, "data"),
WALPath: filepath.Join(path, "wal"),
},
filepath.Join(path, "data"),
filepath.Join(path, "wal"),
tsdb.NewEngineOptions(),
),
path: path,

View File

@ -10,6 +10,7 @@ import (
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/tsdb"
)
var ErrCacheMemoryExceeded = fmt.Errorf("cache maximum memory size exceeded")
@ -113,10 +114,15 @@ type Cache struct {
// NewCache returns an instance of a cache which will use a maximum of maxSize bytes of memory.
// Only used for engine caches, never for snapshots
func NewCache(maxSize uint64, path string) *Cache {
db, rp := tsdb.DecodeStorePath(path)
c := &Cache{
maxSize: maxSize,
store: make(map[string]*entry),
statMap: influxdb.NewStatistics("tsm1_cache:"+path, "tsm1_cache", map[string]string{"path": path}),
maxSize: maxSize,
store: make(map[string]*entry),
statMap: influxdb.NewStatistics(
"tsm1_cache:"+path,
"tsm1_cache",
map[string]string{"path": path, "database": db, "retentionPolicy": rp},
),
lastSnapshot: time.Now(),
}
c.UpdateAge()

View File

@ -13,6 +13,7 @@ import (
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/tsdb"
)
type TSMFile interface {
@ -124,11 +125,16 @@ func (f FileStat) ContainsKey(key string) bool {
}
func NewFileStore(dir string) *FileStore {
db, rp := tsdb.DecodeStorePath(dir)
return &FileStore{
dir: dir,
lastModified: time.Now(),
Logger: log.New(os.Stderr, "[filestore] ", log.LstdFlags),
statMap: influxdb.NewStatistics("tsm1_filestore:"+dir, "tsm1_filestore", map[string]string{"path": dir}),
statMap: influxdb.NewStatistics(
"tsm1_filestore:"+dir,
"tsm1_filestore",
map[string]string{"path": dir, "database": db, "retentionPolicy": rp},
),
}
}

View File

@ -18,6 +18,7 @@ import (
"github.com/golang/snappy"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/tsdb"
)
const (
@ -89,6 +90,7 @@ type WAL struct {
}
func NewWAL(path string) *WAL {
db, rp := tsdb.DecodeStorePath(path)
return &WAL{
path: path,
@ -98,7 +100,11 @@ func NewWAL(path string) *WAL {
logger: log.New(os.Stderr, "[tsm1wal] ", log.LstdFlags),
closing: make(chan struct{}),
statMap: influxdb.NewStatistics("tsm1_wal:"+path, "tsm1_wal", map[string]string{"path": path}),
statMap: influxdb.NewStatistics(
"tsm1_wal:"+path,
"tsm1_wal",
map[string]string{"path": path, "database": db, "retentionPolicy": rp},
),
}
}

View File

@ -48,10 +48,13 @@ var (
// Data can be split across many shards. The query engine in TSDB is responsible
// for combining the output of many shards into a single query result.
type Shard struct {
index *DatabaseIndex
id uint64
index *DatabaseIndex
path string
walPath string
id uint64
config ShardConfig
database string
retentionPolicy string
engine Engine
options EngineOptions
@ -66,49 +69,38 @@ type Shard struct {
LogOutput io.Writer
}
// ShardConfig is passed to NewShard to specify the shard's
// database, retention policy, and location of files on disk.
type ShardConfig struct {
// Name of the database this shard belongs to
Database string
// Name of the retention policy this shard belongs to
RetentionPolicy string
// Path to this shard's location on disk
Path string
// Path to this shard's WAL location
WALPath string
}
// NewShard returns a new initialized Shard. walPath doesn't apply to the b1 type index
func NewShard(id uint64, index *DatabaseIndex, config ShardConfig, options EngineOptions) *Shard {
func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, options EngineOptions) *Shard {
// Configure statistics collection.
key := fmt.Sprintf("shard:%s:%d", config.Path, id)
key := fmt.Sprintf("shard:%s:%d", path, id)
db, rp := DecodeStorePath(path)
tags := map[string]string{
"path": config.Path,
"path": path,
"id": fmt.Sprintf("%d", id),
"engine": options.EngineVersion,
"database": config.Database,
"retentionPolicy": config.RetentionPolicy,
"database": db,
"retentionPolicy": rp,
}
statMap := influxdb.NewStatistics(key, "shard", tags)
return &Shard{
index: index,
id: id,
config: config,
path: path,
walPath: walPath,
options: options,
measurementFields: make(map[string]*MeasurementFields),
database: db,
retentionPolicy: rp,
statMap: statMap,
LogOutput: os.Stderr,
}
}
// Path returns the path set on the shard when it was created.
func (s *Shard) Path() string { return s.config.Path }
func (s *Shard) Path() string { return s.path }
// PerformMaintenance gets called periodically to have the engine perform
// any maintenance tasks like WAL flushing and compaction
@ -131,7 +123,7 @@ func (s *Shard) Open() error {
}
// Initialize underlying engine.
e, err := NewEngine(s.config.Path, s.config.WALPath, s.options)
e, err := NewEngine(s.path, s.walPath, s.options)
if err != nil {
return fmt.Errorf("new engine: %s", err)
}
@ -175,9 +167,7 @@ func (s *Shard) close() error {
// DiskSize returns the size on disk of this shard
func (s *Shard) DiskSize() (int64, error) {
s.mu.RLock()
defer s.mu.RUnlock()
stats, err := os.Stat(s.config.Path)
stats, err := os.Stat(s.path)
if err != nil {
return 0, err
}

View File

@ -33,7 +33,7 @@ func TestShardWriteAndIndex(t *testing.T) {
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
sh := tsdb.NewShard(1, index, tsdb.ShardConfig{Path: tmpShard, WALPath: tmpWal}, opts)
sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
}
@ -76,7 +76,7 @@ func TestShardWriteAndIndex(t *testing.T) {
sh.Close()
index = tsdb.NewDatabaseIndex("db")
sh = tsdb.NewShard(1, index, tsdb.ShardConfig{Path: tmpShard, WALPath: tmpWal}, opts)
sh = tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
}
@ -103,7 +103,7 @@ func TestShardWriteAddNewField(t *testing.T) {
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
sh := tsdb.NewShard(1, index, tsdb.ShardConfig{Path: tmpShard, WALPath: tmpWal}, opts)
sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
}
@ -258,7 +258,7 @@ func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) {
tmpDir, _ := ioutil.TempDir("", "shard_test")
tmpShard := path.Join(tmpDir, "shard")
tmpWal := path.Join(tmpDir, "wal")
shard := tsdb.NewShard(1, index, tsdb.ShardConfig{Path: tmpShard, WALPath: tmpWal}, tsdb.NewEngineOptions())
shard := tsdb.NewShard(1, index, tmpShard, tmpWal, tsdb.NewEngineOptions())
shard.Open()
b.StartTimer()
@ -294,7 +294,7 @@ func benchmarkWritePointsExistingSeries(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt
defer os.RemoveAll(tmpDir)
tmpShard := path.Join(tmpDir, "shard")
tmpWal := path.Join(tmpDir, "wal")
shard := tsdb.NewShard(1, index, tsdb.ShardConfig{Path: tmpShard, WALPath: tmpWal}, tsdb.NewEngineOptions())
shard := tsdb.NewShard(1, index, tmpShard, tmpWal, tsdb.NewEngineOptions())
shard.Open()
defer shard.Close()
chunkedWrite(shard, points)
@ -356,10 +356,8 @@ func NewShard() *Shard {
return &Shard{
Shard: tsdb.NewShard(0,
tsdb.NewDatabaseIndex("db"),
tsdb.ShardConfig{
Path: filepath.Join(path, "data"),
WALPath: filepath.Join(path, "wal"),
},
filepath.Join(path, "data"),
filepath.Join(path, "wal"),
opt,
),
path: path,

View File

@ -141,13 +141,7 @@ func (s *Store) loadShards() error {
continue
}
sc := ShardConfig{
Path: path,
WALPath: walPath,
Database: db,
RetentionPolicy: rp.Name(),
}
shard := NewShard(shardID, s.databaseIndexes[db], sc, s.EngineOptions)
shard := NewShard(shardID, s.databaseIndexes[db], path, walPath, s.EngineOptions)
err = shard.Open()
if err != nil {
return fmt.Errorf("failed to open shard %d: %s", shardID, err)
@ -258,13 +252,8 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er
s.databaseIndexes[database] = db
}
sc := ShardConfig{
Path: filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10)),
WALPath: walPath,
Database: database,
RetentionPolicy: retentionPolicy,
}
shard := NewShard(shardID, db, sc, s.EngineOptions)
path := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10))
shard := NewShard(shardID, db, path, walPath, s.EngineOptions)
if err := shard.Open(); err != nil {
return err
}
@ -294,11 +283,11 @@ func (s *Store) deleteShard(shardID uint64) error {
return err
}
if err := os.RemoveAll(sh.config.Path); err != nil {
if err := os.RemoveAll(sh.path); err != nil {
return err
}
if err := os.RemoveAll(sh.config.WALPath); err != nil {
if err := os.RemoveAll(sh.walPath); err != nil {
return err
}
@ -322,7 +311,7 @@ func (s *Store) DeleteDatabase(name string) error {
// Close and delete all shards on the database.
for shardID, sh := range s.shards {
if sh.config.Database == name {
if sh.database == name {
// Delete the shard from disk.
if err := s.deleteShard(shardID); err != nil {
return err
@ -351,7 +340,7 @@ func (s *Store) DeleteRetentionPolicy(database, name string) error {
// Close and delete all shards under the retention policy on the
// database.
for shardID, sh := range s.shards {
if sh.config.Database == database && sh.config.RetentionPolicy == name {
if sh.database == database && sh.retentionPolicy == name {
// Delete the shard from disk.
if err := s.deleteShard(shardID); err != nil {
return err
@ -390,7 +379,7 @@ func (s *Store) DeleteMeasurement(database, name string) error {
// Remove underlying data.
for _, sh := range s.shards {
if sh.config.Database != database {
if sh.database != database {
continue
}
@ -479,7 +468,7 @@ func (s *Store) BackupShard(id uint64, since time.Time, w io.Writer) error {
return fmt.Errorf("shard %d doesn't exist on this server", id)
}
path, err := relativePath(s.path, shard.config.Path)
path, err := relativePath(s.path, shard.path)
if err != nil {
return err
}
@ -493,7 +482,7 @@ func (s *Store) ShardRelativePath(id uint64) (string, error) {
if shard == nil {
return "", fmt.Errorf("shard %d doesn't exist on this server", id)
}
return relativePath(s.path, shard.config.Path)
return relativePath(s.path, shard.path)
}
// DeleteSeries loops through the local shards and deletes the series data and metadata for the passed in series keys
@ -568,7 +557,7 @@ func (s *Store) deleteSeries(database string, seriesKeys []string) error {
}
for _, sh := range s.shards {
if sh.config.Database != database {
if sh.database != database {
continue
}
if err := sh.DeleteSeries(seriesKeys); err != nil {
@ -957,6 +946,20 @@ func IsRetryable(err error) bool {
return true
}
// DecodeStorePath extracts the database and retention policy names
// from a given shard or WAL path.
func DecodeStorePath(shardOrWALPath string) (database, retentionPolicy string) {
// shardOrWALPath format: /maybe/absolute/base/then/:database/:retentionPolicy/:nameOfShardOrWAL
// Discard the last part of the path (the shard name or the wal name).
path, _ := filepath.Split(filepath.Clean(shardOrWALPath))
// Extract the database and retention policy.
path, rp := filepath.Split(filepath.Clean(path))
_, db := filepath.Split(filepath.Clean(path))
return db, rp
}
// relativePath will expand out the full paths passed in and return
// the relative shard path from the store
func relativePath(storePath, shardPath string) (string, error) {

View File

@ -344,6 +344,7 @@ func (s *Store) Reopen() error {
return err
}
s.Store = tsdb.NewStore(s.Path())
s.EngineOptions.Config.WALDir = filepath.Join(s.Path(), "wal")
return s.Open()
}