Merge pull request #5810 from influxdata/mr-shard-diagnostics-tags

Add database, RP as tags on shard stats
Mark Rushakoff 2016-02-25 13:53:26 -08:00
commit 56aebebc87
4 changed files with 103 additions and 95 deletions
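This change replaces NewShard's separate path and walPath arguments with a single ShardConfig struct that also carries the shard's database and retention policy, so per-shard statistics can be tagged with both. A minimal sketch of the new call site follows; the database name, paths, and shard ID are illustrative values, not taken from this diff:

    // Hypothetical caller using the signature introduced by this commit.
    // "telegraf", "default", and the paths are made-up example values.
    sc := tsdb.ShardConfig{
        Database:        "telegraf",
        RetentionPolicy: "default",
        Path:            "/var/lib/influxdb/data/telegraf/default/1",
        WALPath:         "/var/lib/influxdb/wal/telegraf/default/1",
    }
    sh := tsdb.NewShard(1, tsdb.NewDatabaseIndex("telegraf"), sc, tsdb.NewEngineOptions())
    if err := sh.Open(); err != nil {
        // handle the error; Open initializes the underlying engine
    }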


@@ -164,8 +164,10 @@ func MustOpenShard(id uint64) *Shard {
 	sh := &Shard{
 		Shard: tsdb.NewShard(id,
 			tsdb.NewDatabaseIndex("db"),
-			filepath.Join(path, "data"),
-			filepath.Join(path, "wal"),
+			tsdb.ShardConfig{
+				Path:    filepath.Join(path, "data"),
+				WALPath: filepath.Join(path, "wal"),
+			},
 			tsdb.NewEngineOptions(),
 		),
 		path: path,


@@ -48,10 +48,10 @@ var (
 // Data can be split across many shards. The query engine in TSDB is responsible
 // for combining the output of many shards into a single query result.
 type Shard struct {
-	index   *DatabaseIndex
-	path    string
-	walPath string
-	id      uint64
+	index  *DatabaseIndex
+	id     uint64
+	config ShardConfig
 
 	engine  Engine
 	options EngineOptions
@@ -66,18 +66,39 @@ type Shard struct {
 	LogOutput io.Writer
 }
 
+// ShardConfig is passed to NewShard to specify the shard's
+// database, retention policy, and location of files on disk.
+type ShardConfig struct {
+	// Name of the database this shard belongs to
+	Database string
+
+	// Name of the retention policy this shard belongs to
+	RetentionPolicy string
+
+	// Path to this shard's location on disk
+	Path string
+
+	// Path to this shard's WAL location
+	WALPath string
+}
+
 // NewShard returns a new initialized Shard. walPath doesn't apply to the b1 type index
-func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, options EngineOptions) *Shard {
+func NewShard(id uint64, index *DatabaseIndex, config ShardConfig, options EngineOptions) *Shard {
 	// Configure statistics collection.
-	key := fmt.Sprintf("shard:%s:%d", path, id)
-	tags := map[string]string{"path": path, "id": fmt.Sprintf("%d", id), "engine": options.EngineVersion}
+	key := fmt.Sprintf("shard:%s:%d", config.Path, id)
+	tags := map[string]string{
+		"path":            config.Path,
+		"id":              fmt.Sprintf("%d", id),
+		"engine":          options.EngineVersion,
+		"database":        config.Database,
+		"retentionPolicy": config.RetentionPolicy,
+	}
 	statMap := influxdb.NewStatistics(key, "shard", tags)
 
 	return &Shard{
 		index:   index,
-		path:    path,
-		walPath: walPath,
 		id:      id,
+		config:  config,
 		options: options,
 
 		measurementFields: make(map[string]*MeasurementFields),
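Given the fields above, every shard's stats map is now registered with five tags instead of three. For example, for shard 42 of database "telegraf" under retention policy "default" (values illustrative; "tsm1" assumes the default engine version), NewShard would compute:

    // Illustrative result of the key/tags construction above.
    key := "shard:/var/lib/influxdb/data/telegraf/default/42:42"
    tags := map[string]string{
        "path":            "/var/lib/influxdb/data/telegraf/default/42",
        "id":              "42",
        "engine":          "tsm1",
        "database":        "telegraf",
        "retentionPolicy": "default",
    }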
@@ -87,7 +108,7 @@ func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, opti
 }
 
 // Path returns the path set on the shard when it was created.
-func (s *Shard) Path() string { return s.path }
+func (s *Shard) Path() string { return s.config.Path }
 
 // PerformMaintenance gets called periodically to have the engine perform
 // any maintenance tasks like WAL flushing and compaction
@@ -110,7 +131,7 @@ func (s *Shard) Open() error {
 		}
 
 		// Initialize underlying engine.
-		e, err := NewEngine(s.path, s.walPath, s.options)
+		e, err := NewEngine(s.config.Path, s.config.WALPath, s.options)
 		if err != nil {
 			return fmt.Errorf("new engine: %s", err)
 		}
@@ -156,7 +177,7 @@ func (s *Shard) close() error {
 func (s *Shard) DiskSize() (int64, error) {
 	s.mu.RLock()
 	defer s.mu.RUnlock()
-	stats, err := os.Stat(s.path)
+	stats, err := os.Stat(s.config.Path)
 	if err != nil {
 		return 0, err
 	}
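Note that the tags are attached once, when influxdb.NewStatistics registers the map; the shard's hot paths keep incrementing plain counters and pick up the database and retentionPolicy tags for free when stats are reported. A sketch (not from this diff; the stat name is illustrative):

    // Inside a Shard method: only the counter is touched per operation.
    // The tags were fixed at registration time in NewShard.
    s.statMap.Add("writeReq", 1)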


@@ -33,9 +33,9 @@ func TestShardWriteAndIndex(t *testing.T) {
 	opts := tsdb.NewEngineOptions()
 	opts.Config.WALDir = filepath.Join(tmpDir, "wal")
 
-	sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
+	sh := tsdb.NewShard(1, index, tsdb.ShardConfig{Path: tmpShard, WALPath: tmpWal}, opts)
 	if err := sh.Open(); err != nil {
-		t.Fatalf("error openeing shard: %s", err.Error())
+		t.Fatalf("error opening shard: %s", err.Error())
 	}
 
 	pt := models.MustNewPoint(
@@ -76,9 +76,9 @@ func TestShardWriteAndIndex(t *testing.T) {
 	sh.Close()
 
 	index = tsdb.NewDatabaseIndex("db")
-	sh = tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
+	sh = tsdb.NewShard(1, index, tsdb.ShardConfig{Path: tmpShard, WALPath: tmpWal}, opts)
 	if err := sh.Open(); err != nil {
-		t.Fatalf("error openeing shard: %s", err.Error())
+		t.Fatalf("error opening shard: %s", err.Error())
 	}
 
 	validateIndex()
@@ -103,9 +103,9 @@ func TestShardWriteAddNewField(t *testing.T) {
 	opts := tsdb.NewEngineOptions()
 	opts.Config.WALDir = filepath.Join(tmpDir, "wal")
 
-	sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
+	sh := tsdb.NewShard(1, index, tsdb.ShardConfig{Path: tmpShard, WALPath: tmpWal}, opts)
 	if err := sh.Open(); err != nil {
-		t.Fatalf("error openeing shard: %s", err.Error())
+		t.Fatalf("error opening shard: %s", err.Error())
 	}
 	defer sh.Close()
@@ -258,7 +258,7 @@ func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) {
 		tmpDir, _ := ioutil.TempDir("", "shard_test")
 		tmpShard := path.Join(tmpDir, "shard")
 		tmpWal := path.Join(tmpDir, "wal")
-		shard := tsdb.NewShard(1, index, tmpShard, tmpWal, tsdb.NewEngineOptions())
+		shard := tsdb.NewShard(1, index, tsdb.ShardConfig{Path: tmpShard, WALPath: tmpWal}, tsdb.NewEngineOptions())
 		shard.Open()
 		b.StartTimer()
@@ -294,7 +294,7 @@ func benchmarkWritePointsExistingSeries(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt
 	defer os.RemoveAll(tmpDir)
 	tmpShard := path.Join(tmpDir, "shard")
 	tmpWal := path.Join(tmpDir, "wal")
-	shard := tsdb.NewShard(1, index, tmpShard, tmpWal, tsdb.NewEngineOptions())
+	shard := tsdb.NewShard(1, index, tsdb.ShardConfig{Path: tmpShard, WALPath: tmpWal}, tsdb.NewEngineOptions())
 	shard.Open()
 	defer shard.Close()
 	chunkedWrite(shard, points)
@@ -356,8 +356,10 @@ func NewShard() *Shard {
 	return &Shard{
 		Shard: tsdb.NewShard(0,
 			tsdb.NewDatabaseIndex("db"),
-			filepath.Join(path, "data"),
-			filepath.Join(path, "wal"),
+			tsdb.ShardConfig{
+				Path:    filepath.Join(path, "data"),
+				WALPath: filepath.Join(path, "wal"),
+			},
 			opt,
 		),
 		path: path,


@@ -36,12 +36,8 @@ type Store struct {
 	databaseIndexes map[string]*DatabaseIndex
 
-	// shardLocations is a map of shard IDs to both the associated
-	// Shard, and meta information about where the shard is located on
-	// disk.
-	//
-	// shardLocations stores mappings for all shards on all databases.
-	shardLocations map[uint64]*shardLocation
+	// shards is a map of shard IDs to the associated Shard.
+	shards map[uint64]*Shard
 
 	EngineOptions EngineOptions
 	Logger        *log.Logger
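Dropping the shardLocation wrapper works because the database and retention policy now travel on the shard itself via ShardConfig, so lookups reduce to a map access plus a field read. A sketch of the before/after access pattern (assuming the store's read lock is held, as in the methods below):

    // Before: sl := s.shardLocations[id]; sl.IsDatabase("telegraf")
    // After: indexing a missing id yields nil, and the metadata is on config.
    sh := s.shards[id]
    if sh != nil && sh.config.Database == "telegraf" {
        // shard belongs to the "telegraf" database
    }

The wrapper's RWMutex also disappears: in this diff the config fields are written only at construction, so reads need no lock beyond the store's own.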
@@ -75,7 +71,7 @@ func (s *Store) Open() error {
 	s.closing = make(chan struct{})
 
-	s.shardLocations = map[uint64]*shardLocation{}
+	s.shards = map[uint64]*Shard{}
 	s.databaseIndexes = map[string]*DatabaseIndex{}
 
 	s.Logger.Printf("Using data dir: %v", s.Path())
@@ -145,13 +141,19 @@ func (s *Store) loadShards() error {
 					continue
 				}
 
-				shard := NewShard(shardID, s.databaseIndexes[db], path, walPath, s.EngineOptions)
+				sc := ShardConfig{
+					Path:            path,
+					WALPath:         walPath,
+					Database:        db,
+					RetentionPolicy: rp.Name(),
+				}
+				shard := NewShard(shardID, s.databaseIndexes[db], sc, s.EngineOptions)
 				err = shard.Open()
 				if err != nil {
 					return fmt.Errorf("failed to open shard %d: %s", shardID, err)
 				}
 
-				s.shardLocations[shardID] = &shardLocation{Database: db, RetentionPolicy: rp.Name(), Shard: shard}
+				s.shards[shardID] = shard
 			}
 		}
 	}
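Since loadShards derives Database and RetentionPolicy from the shard's position in the directory tree (data dir/database/retention policy/shard ID, the same layout CreateShard writes below), shards created before this change pick up the new tags on restart with no migration step.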
@@ -170,13 +172,13 @@ func (s *Store) Close() error {
 	}
 	s.wg.Wait()
 
-	for _, sl := range s.shardLocations {
-		if err := sl.Shard.Close(); err != nil {
+	for _, sh := range s.shards {
+		if err := sh.Close(); err != nil {
 			return err
 		}
 	}
 	s.opened = false
-	s.shardLocations = nil
+	s.shards = nil
 	s.databaseIndexes = nil
 
 	return nil
@@ -193,11 +195,11 @@ func (s *Store) DatabaseIndexN() int {
 func (s *Store) Shard(id uint64) *Shard {
 	s.mu.RLock()
 	defer s.mu.RUnlock()
-	sl, ok := s.shardLocations[id]
+	sh, ok := s.shards[id]
 	if !ok {
 		return nil
 	}
-	return sl.Shard
+	return sh
 }
 
 // Shards returns a list of shards by id.
@@ -206,11 +208,11 @@ func (s *Store) Shards(ids []uint64) []*Shard {
 	defer s.mu.RUnlock()
 	a := make([]*Shard, 0, len(ids))
 	for _, id := range ids {
-		sl, ok := s.shardLocations[id]
+		sh, ok := s.shards[id]
 		if !ok {
 			continue
 		}
-		a = append(a, sl.Shard)
+		a = append(a, sh)
 	}
 	return a
 }
@@ -219,7 +221,7 @@ func (s *Store) Shards(ids []uint64) []*Shard {
 func (s *Store) ShardN() int {
 	s.mu.RLock()
 	defer s.mu.RUnlock()
-	return len(s.shardLocations)
+	return len(s.shards)
 }
 
 // CreateShard creates a shard with the given id and retention policy on a database.
@@ -234,7 +236,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er
 	}
 
 	// shard already exists
-	if _, ok := s.shardLocations[shardID]; ok {
+	if _, ok := s.shards[shardID]; ok {
 		return nil
 	}
@@ -256,13 +258,18 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er
 		s.databaseIndexes[database] = db
 	}
 
-	shardPath := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10))
-	shard := NewShard(shardID, db, shardPath, walPath, s.EngineOptions)
+	sc := ShardConfig{
+		Path:            filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10)),
+		WALPath:         walPath,
+		Database:        database,
+		RetentionPolicy: retentionPolicy,
+	}
+	shard := NewShard(shardID, db, sc, s.EngineOptions)
 	if err := shard.Open(); err != nil {
 		return err
 	}
 
-	s.shardLocations[shardID] = &shardLocation{Database: database, RetentionPolicy: retentionPolicy, Shard: shard}
+	s.shards[shardID] = shard
 
 	return nil
 }
@@ -278,24 +285,24 @@ func (s *Store) DeleteShard(shardID uint64) error {
 // to handle locks appropriately.
 func (s *Store) deleteShard(shardID uint64) error {
 	// ensure shard exists
-	sl, ok := s.shardLocations[shardID]
+	sh, ok := s.shards[shardID]
 	if !ok {
 		return nil
 	}
 
-	if err := sl.Shard.Close(); err != nil {
+	if err := sh.Close(); err != nil {
 		return err
 	}
 
-	if err := os.RemoveAll(sl.Shard.path); err != nil {
+	if err := os.RemoveAll(sh.config.Path); err != nil {
 		return err
 	}
 
-	if err := os.RemoveAll(sl.Shard.walPath); err != nil {
+	if err := os.RemoveAll(sh.config.WALPath); err != nil {
 		return err
 	}
 
-	delete(s.shardLocations, shardID)
+	delete(s.shards, shardID)
 	return nil
 }
@@ -314,8 +321,8 @@ func (s *Store) DeleteDatabase(name string) error {
 	defer s.mu.Unlock()
 
 	// Close and delete all shards on the database.
-	for shardID, location := range s.shardLocations {
-		if location.IsDatabase(name) {
+	for shardID, sh := range s.shards {
+		if sh.config.Database == name {
 			// Delete the shard from disk.
 			if err := s.deleteShard(shardID); err != nil {
 				return err
@@ -343,8 +350,8 @@ func (s *Store) DeleteRetentionPolicy(database, name string) error {
 
 	// Close and delete all shards under the retention policy on the
 	// database.
-	for shardID, location := range s.shardLocations {
-		if location.IsDatabase(database) && location.IsRetentionPolicy(name) {
+	for shardID, sh := range s.shards {
+		if sh.config.Database == database && sh.config.RetentionPolicy == name {
 			// Delete the shard from disk.
 			if err := s.deleteShard(shardID); err != nil {
 				return err
@@ -382,12 +389,12 @@ func (s *Store) DeleteMeasurement(database, name string) error {
 	db.DropMeasurement(m.Name)
 
 	// Remove underlying data.
-	for _, sl := range s.shardLocations {
-		if !sl.IsDatabase(database) {
+	for _, sh := range s.shards {
+		if sh.config.Database != database {
 			continue
 		}
 
-		if err := sl.Shard.DeleteMeasurement(m.Name, m.SeriesKeys()); err != nil {
+		if err := sh.DeleteMeasurement(m.Name, m.SeriesKeys()); err != nil {
 			return err
 		}
 	}
@@ -403,8 +410,8 @@ func (s *Store) ShardIDs() []uint64 {
 }
 
 func (s *Store) shardIDs() []uint64 {
-	a := make([]uint64, 0, len(s.shardLocations))
-	for shardID := range s.shardLocations {
+	a := make([]uint64, 0, len(s.shards))
+	for shardID := range s.shards {
 		a = append(a, shardID)
 	}
 	return a
@@ -412,9 +419,9 @@ func (s *Store) shardIDs() []uint64 {
 
 // shardsSlice returns an ordered list of shards.
 func (s *Store) shardsSlice() []*Shard {
-	a := make([]*Shard, 0, len(s.shardLocations))
-	for _, sl := range s.shardLocations {
-		a = append(a, sl.Shard)
+	a := make([]*Shard, 0, len(s.shards))
+	for _, sh := range s.shards {
+		a = append(a, sh)
 	}
 	sort.Sort(Shards(a))
 	return a
@@ -472,7 +479,7 @@ func (s *Store) BackupShard(id uint64, since time.Time, w io.Writer) error {
 		return fmt.Errorf("shard %d doesn't exist on this server", id)
 	}
 
-	path, err := relativePath(s.path, shard.path)
+	path, err := relativePath(s.path, shard.config.Path)
 	if err != nil {
 		return err
 	}
@@ -486,7 +493,7 @@ func (s *Store) ShardRelativePath(id uint64) (string, error) {
 	if shard == nil {
 		return "", fmt.Errorf("shard %d doesn't exist on this server", id)
 	}
-	return relativePath(s.path, shard.path)
+	return relativePath(s.path, shard.config.Path)
 }
 
 // DeleteSeries loops through the local shards and deletes the series data and metadata for the passed in series keys
@@ -560,11 +567,11 @@ func (s *Store) deleteSeries(database string, seriesKeys []string) error {
 		return influxql.ErrDatabaseNotFound(database)
 	}
 
-	for _, sl := range s.shardLocations {
-		if !sl.IsDatabase(database) {
+	for _, sh := range s.shards {
+		if sh.config.Database != database {
 			continue
 		}
-		if err := sl.Shard.DeleteSeries(seriesKeys); err != nil {
+		if err := sh.DeleteSeries(seriesKeys); err != nil {
 			return err
 		}
 	}
@@ -592,8 +599,8 @@ func (s *Store) periodicMaintenance() {
 func (s *Store) performMaintenance() {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	for _, sl := range s.shardLocations {
-		s.performMaintenanceOnShard(sl.Shard)
+	for _, sh := range s.shards {
+		s.performMaintenanceOnShard(sh)
 	}
 }
@@ -676,12 +683,12 @@ func (s *Store) WriteToShard(shardID uint64, points []models.Point) error {
 	default:
 	}
 
-	sl, ok := s.shardLocations[shardID]
+	sh, ok := s.shards[shardID]
 	if !ok {
 		return ErrShardNotFound
 	}
 
-	return sl.Shard.WritePoints(points)
+	return sh.WritePoints(points)
 }
 
 func (s *Store) ExecuteShowFieldKeysStatement(stmt *influxql.ShowFieldKeysStatement, database string) (models.Rows, error) {
@@ -938,30 +945,6 @@ func (s *Store) ExecuteShowTagValuesStatement(stmt *influxql.ShowTagValuesStatem
 	return rows, nil
 }
 
-// shardLocation is a wrapper around a shard that provides extra
-// information about which database and retention policy the shard
-// belongs to.
-//
-// shardLocation is safe for use from multiple goroutines.
-type shardLocation struct {
-	mu sync.RWMutex
-
-	Database        string
-	RetentionPolicy string
-	Shard           *Shard
-}
-
-func (s *shardLocation) IsDatabase(db string) bool {
-	s.mu.RLock()
-	defer s.mu.RUnlock()
-	return s.Database == db
-}
-
-func (s *shardLocation) IsRetentionPolicy(rp string) bool {
-	s.mu.RLock()
-	defer s.mu.RUnlock()
-	return s.RetentionPolicy == rp
-}
-
 // IsRetryable returns true if this error is temporary and could be retried
 func IsRetryable(err error) bool {
 	if err == nil {