diff --git a/services/copier/service_test.go b/services/copier/service_test.go index a68a413b87..e4f5e2bb96 100644 --- a/services/copier/service_test.go +++ b/services/copier/service_test.go @@ -164,8 +164,10 @@ func MustOpenShard(id uint64) *Shard { sh := &Shard{ Shard: tsdb.NewShard(id, tsdb.NewDatabaseIndex("db"), - filepath.Join(path, "data"), - filepath.Join(path, "wal"), + tsdb.ShardConfig{ + Path: filepath.Join(path, "data"), + WALPath: filepath.Join(path, "wal"), + }, tsdb.NewEngineOptions(), ), path: path, diff --git a/tsdb/shard.go b/tsdb/shard.go index 3fdfb97679..8d8b9ead0a 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -48,10 +48,10 @@ var ( // Data can be split across many shards. The query engine in TSDB is responsible // for combining the output of many shards into a single query result. type Shard struct { - index *DatabaseIndex - path string - walPath string - id uint64 + index *DatabaseIndex + id uint64 + + config ShardConfig engine Engine options EngineOptions @@ -66,18 +66,39 @@ type Shard struct { LogOutput io.Writer } +// ShardConfig is passed to NewShard to specify the shard's +// database, retention policy, and location of files on disk. +type ShardConfig struct { + // Name of the database this shard belongs to + Database string + + // Name of the retention policy this shard belongs to + RetentionPolicy string + + // Path to this shard's location on disk + Path string + + // Path to this shard's WAL location + WALPath string +} + // NewShard returns a new initialized Shard. walPath doesn't apply to the b1 type index -func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, options EngineOptions) *Shard { +func NewShard(id uint64, index *DatabaseIndex, config ShardConfig, options EngineOptions) *Shard { // Configure statistics collection. - key := fmt.Sprintf("shard:%s:%d", path, id) - tags := map[string]string{"path": path, "id": fmt.Sprintf("%d", id), "engine": options.EngineVersion} + key := fmt.Sprintf("shard:%s:%d", config.Path, id) + tags := map[string]string{ + "path": config.Path, + "id": fmt.Sprintf("%d", id), + "engine": options.EngineVersion, + "database": config.Database, + "retentionPolicy": config.RetentionPolicy, + } statMap := influxdb.NewStatistics(key, "shard", tags) return &Shard{ index: index, - path: path, - walPath: walPath, id: id, + config: config, options: options, measurementFields: make(map[string]*MeasurementFields), @@ -87,7 +108,7 @@ func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, opti } // Path returns the path set on the shard when it was created. -func (s *Shard) Path() string { return s.path } +func (s *Shard) Path() string { return s.config.Path } // PerformMaintenance gets called periodically to have the engine perform // any maintenance tasks like WAL flushing and compaction @@ -110,7 +131,7 @@ func (s *Shard) Open() error { } // Initialize underlying engine. - e, err := NewEngine(s.path, s.walPath, s.options) + e, err := NewEngine(s.config.Path, s.config.WALPath, s.options) if err != nil { return fmt.Errorf("new engine: %s", err) } @@ -156,7 +177,7 @@ func (s *Shard) close() error { func (s *Shard) DiskSize() (int64, error) { s.mu.RLock() defer s.mu.RUnlock() - stats, err := os.Stat(s.path) + stats, err := os.Stat(s.config.Path) if err != nil { return 0, err } diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index 8a798e0c95..db78179426 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -33,9 +33,9 @@ func TestShardWriteAndIndex(t *testing.T) { opts := tsdb.NewEngineOptions() opts.Config.WALDir = filepath.Join(tmpDir, "wal") - sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts) + sh := tsdb.NewShard(1, index, tsdb.ShardConfig{Path: tmpShard, WALPath: tmpWal}, opts) if err := sh.Open(); err != nil { - t.Fatalf("error openeing shard: %s", err.Error()) + t.Fatalf("error opening shard: %s", err.Error()) } pt := models.MustNewPoint( @@ -76,9 +76,9 @@ func TestShardWriteAndIndex(t *testing.T) { sh.Close() index = tsdb.NewDatabaseIndex("db") - sh = tsdb.NewShard(1, index, tmpShard, tmpWal, opts) + sh = tsdb.NewShard(1, index, tsdb.ShardConfig{Path: tmpShard, WALPath: tmpWal}, opts) if err := sh.Open(); err != nil { - t.Fatalf("error openeing shard: %s", err.Error()) + t.Fatalf("error opening shard: %s", err.Error()) } validateIndex() @@ -103,9 +103,9 @@ func TestShardWriteAddNewField(t *testing.T) { opts := tsdb.NewEngineOptions() opts.Config.WALDir = filepath.Join(tmpDir, "wal") - sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts) + sh := tsdb.NewShard(1, index, tsdb.ShardConfig{Path: tmpShard, WALPath: tmpWal}, opts) if err := sh.Open(); err != nil { - t.Fatalf("error openeing shard: %s", err.Error()) + t.Fatalf("error opening shard: %s", err.Error()) } defer sh.Close() @@ -258,7 +258,7 @@ func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) { tmpDir, _ := ioutil.TempDir("", "shard_test") tmpShard := path.Join(tmpDir, "shard") tmpWal := path.Join(tmpDir, "wal") - shard := tsdb.NewShard(1, index, tmpShard, tmpWal, tsdb.NewEngineOptions()) + shard := tsdb.NewShard(1, index, tsdb.ShardConfig{Path: tmpShard, WALPath: tmpWal}, tsdb.NewEngineOptions()) shard.Open() b.StartTimer() @@ -294,7 +294,7 @@ func benchmarkWritePointsExistingSeries(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt defer os.RemoveAll(tmpDir) tmpShard := path.Join(tmpDir, "shard") tmpWal := path.Join(tmpDir, "wal") - shard := tsdb.NewShard(1, index, tmpShard, tmpWal, tsdb.NewEngineOptions()) + shard := tsdb.NewShard(1, index, tsdb.ShardConfig{Path: tmpShard, WALPath: tmpWal}, tsdb.NewEngineOptions()) shard.Open() defer shard.Close() chunkedWrite(shard, points) @@ -356,8 +356,10 @@ func NewShard() *Shard { return &Shard{ Shard: tsdb.NewShard(0, tsdb.NewDatabaseIndex("db"), - filepath.Join(path, "data"), - filepath.Join(path, "wal"), + tsdb.ShardConfig{ + Path: filepath.Join(path, "data"), + WALPath: filepath.Join(path, "wal"), + }, opt, ), path: path, diff --git a/tsdb/store.go b/tsdb/store.go index 6340df6d10..7b0a75c209 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -36,12 +36,8 @@ type Store struct { databaseIndexes map[string]*DatabaseIndex - // shardLocations is a map of shard IDs to both the associated - // Shard, and meta information about where the shard is located on - // disk. - // - // shardLocations stores mappings for all shards on all databases. - shardLocations map[uint64]*shardLocation + // shards is a map of shard IDs to the associated Shard. + shards map[uint64]*Shard EngineOptions EngineOptions Logger *log.Logger @@ -75,7 +71,7 @@ func (s *Store) Open() error { s.closing = make(chan struct{}) - s.shardLocations = map[uint64]*shardLocation{} + s.shards = map[uint64]*Shard{} s.databaseIndexes = map[string]*DatabaseIndex{} s.Logger.Printf("Using data dir: %v", s.Path()) @@ -145,13 +141,19 @@ func (s *Store) loadShards() error { continue } - shard := NewShard(shardID, s.databaseIndexes[db], path, walPath, s.EngineOptions) + sc := ShardConfig{ + Path: path, + WALPath: walPath, + Database: db, + RetentionPolicy: rp.Name(), + } + shard := NewShard(shardID, s.databaseIndexes[db], sc, s.EngineOptions) err = shard.Open() if err != nil { return fmt.Errorf("failed to open shard %d: %s", shardID, err) } - s.shardLocations[shardID] = &shardLocation{Database: db, RetentionPolicy: rp.Name(), Shard: shard} + s.shards[shardID] = shard } } } @@ -170,13 +172,13 @@ func (s *Store) Close() error { } s.wg.Wait() - for _, sl := range s.shardLocations { - if err := sl.Shard.Close(); err != nil { + for _, sh := range s.shards { + if err := sh.Close(); err != nil { return err } } s.opened = false - s.shardLocations = nil + s.shards = nil s.databaseIndexes = nil return nil @@ -193,11 +195,11 @@ func (s *Store) DatabaseIndexN() int { func (s *Store) Shard(id uint64) *Shard { s.mu.RLock() defer s.mu.RUnlock() - sl, ok := s.shardLocations[id] + sh, ok := s.shards[id] if !ok { return nil } - return sl.Shard + return sh } // Shards returns a list of shards by id. @@ -206,11 +208,11 @@ func (s *Store) Shards(ids []uint64) []*Shard { defer s.mu.RUnlock() a := make([]*Shard, 0, len(ids)) for _, id := range ids { - sl, ok := s.shardLocations[id] + sh, ok := s.shards[id] if !ok { continue } - a = append(a, sl.Shard) + a = append(a, sh) } return a } @@ -219,7 +221,7 @@ func (s *Store) Shards(ids []uint64) []*Shard { func (s *Store) ShardN() int { s.mu.RLock() defer s.mu.RUnlock() - return len(s.shardLocations) + return len(s.shards) } // CreateShard creates a shard with the given id and retention policy on a database. @@ -234,7 +236,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er } // shard already exists - if _, ok := s.shardLocations[shardID]; ok { + if _, ok := s.shards[shardID]; ok { return nil } @@ -256,13 +258,18 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er s.databaseIndexes[database] = db } - shardPath := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10)) - shard := NewShard(shardID, db, shardPath, walPath, s.EngineOptions) + sc := ShardConfig{ + Path: filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10)), + WALPath: walPath, + Database: database, + RetentionPolicy: retentionPolicy, + } + shard := NewShard(shardID, db, sc, s.EngineOptions) if err := shard.Open(); err != nil { return err } - s.shardLocations[shardID] = &shardLocation{Database: database, RetentionPolicy: retentionPolicy, Shard: shard} + s.shards[shardID] = shard return nil } @@ -278,24 +285,24 @@ func (s *Store) DeleteShard(shardID uint64) error { // to handle locks appropriately. func (s *Store) deleteShard(shardID uint64) error { // ensure shard exists - sl, ok := s.shardLocations[shardID] + sh, ok := s.shards[shardID] if !ok { return nil } - if err := sl.Shard.Close(); err != nil { + if err := sh.Close(); err != nil { return err } - if err := os.RemoveAll(sl.Shard.path); err != nil { + if err := os.RemoveAll(sh.config.Path); err != nil { return err } - if err := os.RemoveAll(sl.Shard.walPath); err != nil { + if err := os.RemoveAll(sh.config.WALPath); err != nil { return err } - delete(s.shardLocations, shardID) + delete(s.shards, shardID) return nil } @@ -314,8 +321,8 @@ func (s *Store) DeleteDatabase(name string) error { defer s.mu.Unlock() // Close and delete all shards on the database. - for shardID, location := range s.shardLocations { - if location.IsDatabase(name) { + for shardID, sh := range s.shards { + if sh.config.Database == name { // Delete the shard from disk. if err := s.deleteShard(shardID); err != nil { return err @@ -343,8 +350,8 @@ func (s *Store) DeleteRetentionPolicy(database, name string) error { // Close and delete all shards under the retention policy on the // database. - for shardID, location := range s.shardLocations { - if location.IsDatabase(database) && location.IsRetentionPolicy(name) { + for shardID, sh := range s.shards { + if sh.config.Database == database && sh.config.RetentionPolicy == name { // Delete the shard from disk. if err := s.deleteShard(shardID); err != nil { return err @@ -382,12 +389,12 @@ func (s *Store) DeleteMeasurement(database, name string) error { db.DropMeasurement(m.Name) // Remove underlying data. - for _, sl := range s.shardLocations { - if !sl.IsDatabase(database) { + for _, sh := range s.shards { + if sh.config.Database != database { continue } - if err := sl.Shard.DeleteMeasurement(m.Name, m.SeriesKeys()); err != nil { + if err := sh.DeleteMeasurement(m.Name, m.SeriesKeys()); err != nil { return err } } @@ -403,8 +410,8 @@ func (s *Store) ShardIDs() []uint64 { } func (s *Store) shardIDs() []uint64 { - a := make([]uint64, 0, len(s.shardLocations)) - for shardID := range s.shardLocations { + a := make([]uint64, 0, len(s.shards)) + for shardID := range s.shards { a = append(a, shardID) } return a @@ -412,9 +419,9 @@ func (s *Store) shardIDs() []uint64 { // shardsSlice returns an ordered list of shards. func (s *Store) shardsSlice() []*Shard { - a := make([]*Shard, 0, len(s.shardLocations)) - for _, sl := range s.shardLocations { - a = append(a, sl.Shard) + a := make([]*Shard, 0, len(s.shards)) + for _, sh := range s.shards { + a = append(a, sh) } sort.Sort(Shards(a)) return a @@ -472,7 +479,7 @@ func (s *Store) BackupShard(id uint64, since time.Time, w io.Writer) error { return fmt.Errorf("shard %d doesn't exist on this server", id) } - path, err := relativePath(s.path, shard.path) + path, err := relativePath(s.path, shard.config.Path) if err != nil { return err } @@ -486,7 +493,7 @@ func (s *Store) ShardRelativePath(id uint64) (string, error) { if shard == nil { return "", fmt.Errorf("shard %d doesn't exist on this server", id) } - return relativePath(s.path, shard.path) + return relativePath(s.path, shard.config.Path) } // DeleteSeries loops through the local shards and deletes the series data and metadata for the passed in series keys @@ -560,11 +567,11 @@ func (s *Store) deleteSeries(database string, seriesKeys []string) error { return influxql.ErrDatabaseNotFound(database) } - for _, sl := range s.shardLocations { - if !sl.IsDatabase(database) { + for _, sh := range s.shards { + if sh.config.Database != database { continue } - if err := sl.Shard.DeleteSeries(seriesKeys); err != nil { + if err := sh.DeleteSeries(seriesKeys); err != nil { return err } } @@ -592,8 +599,8 @@ func (s *Store) periodicMaintenance() { func (s *Store) performMaintenance() { s.mu.Lock() defer s.mu.Unlock() - for _, sl := range s.shardLocations { - s.performMaintenanceOnShard(sl.Shard) + for _, sh := range s.shards { + s.performMaintenanceOnShard(sh) } } @@ -676,12 +683,12 @@ func (s *Store) WriteToShard(shardID uint64, points []models.Point) error { default: } - sl, ok := s.shardLocations[shardID] + sh, ok := s.shards[shardID] if !ok { return ErrShardNotFound } - return sl.Shard.WritePoints(points) + return sh.WritePoints(points) } func (s *Store) ExecuteShowFieldKeysStatement(stmt *influxql.ShowFieldKeysStatement, database string) (models.Rows, error) { @@ -938,30 +945,6 @@ func (s *Store) ExecuteShowTagValuesStatement(stmt *influxql.ShowTagValuesStatem return rows, nil } -// shardLocation is a wrapper around a shard that provides extra -// information about which database and retention policy the shard -// belongs to. -// -// shardLocation is safe for use from multiple goroutines. -type shardLocation struct { - mu sync.RWMutex - Database string - RetentionPolicy string - Shard *Shard -} - -func (s *shardLocation) IsDatabase(db string) bool { - s.mu.RLock() - defer s.mu.RUnlock() - return s.Database == db -} - -func (s *shardLocation) IsRetentionPolicy(rp string) bool { - s.mu.RLock() - defer s.mu.RUnlock() - return s.RetentionPolicy == rp -} - // IsRetryable returns true if this error is temporary and could be retried func IsRetryable(err error) bool { if err == nil {