From 2171d9471bdae3d7ad5747942793bd854abede19 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Thu, 1 Sep 2016 13:40:16 +0100 Subject: [PATCH] Initialise index in shards --- cmd/influxd/run/server.go | 13 +- cmd/influxd/run/server_test.go | 2 +- tsdb/shard.go | 50 +++-- tsdb/shard_test.go | 81 ++++---- tsdb/store.go | 339 +++++++++++++++------------------ tsdb/store_test.go | 50 ++++- 6 files changed, 279 insertions(+), 256 deletions(-) diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index f0ed86beed..74bbe61892 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -497,21 +497,16 @@ func (s *Server) startServerReporting() { // reportServer reports usage statistics about the system. func (s *Server) reportServer() { - dis := s.MetaClient.Databases() - numDatabases := len(dis) + dbs := s.MetaClient.Databases() + numDatabases := len(dbs) numMeasurements := 0 numSeries := 0 // Only needed in the case of a data node if s.TSDBStore != nil { - for _, di := range dis { - d := s.TSDBStore.DatabaseIndex(di.Name) - if d == nil { - // No data in this store for this database. - continue - } - m, s := d.MeasurementSeriesCounts() + for _, db := range dbs { + m, s := s.TSDBStore.MeasurementSeriesCounts(db.Name) numMeasurements += m numSeries += s } diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index a3fa82c044..ee8bbde261 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -5684,7 +5684,7 @@ func TestServer_Query_DropAndRecreateMeasurement(t *testing.T) { &Query{ name: "Drop non-existant measurement", command: `DROP MEASUREMENT doesntexist`, - exp: `{"results":[{"statement_id":0,"error":"measurement not found: doesntexist"}]}`, + exp: `{"results":[{"statement_id":0,"error":"shard 1: measurement not found: doesntexist"}]}`, params: url.Values{"db": []string{"db0"}}, }, }...) diff --git a/tsdb/shard.go b/tsdb/shard.go index 4ccf5d043d..a7b175a4ca 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -127,11 +127,10 @@ type Shard struct { } // NewShard returns a new initialized Shard. -func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, options EngineOptions) *Shard { +func NewShard(id uint64, path string, walPath string, options EngineOptions) *Shard { db, rp := DecodeStorePath(path) logger := zap.New(zap.NullEncoder()) s := &Shard{ - index: index, id: id, path: path, walPath: walPath, @@ -218,6 +217,9 @@ func (s *Shard) Statistics(tags map[string]string) []models.Statistic { statDiskBytes: atomic.LoadInt64(&s.stats.DiskBytes), }, }} + + // Add the index and engine statistics. + statistics = append(statistics, s.index.Statistics(tags)...) statistics = append(statistics, s.engine.Statistics(tags)...) return statistics } @@ -282,12 +284,6 @@ func (s *Shard) Open() error { return nil } -// UnloadIndex removes all references to this shard from the DatabaseIndex -func (s *Shard) UnloadIndex() { - // Don't leak our shard ID and series keys in the index - s.index.RemoveShard(s.id) -} - // Close shuts down the shard's store. func (s *Shard) Close() error { s.mu.Lock() @@ -307,8 +303,8 @@ func (s *Shard) close() error { close(s.closing) } - // Don't leak our shard ID and series keys in the index - s.UnloadIndex() + // Wipe out our index. + s.index = NewDatabaseIndex(s.database) err := s.engine.Close() if err == nil { @@ -460,15 +456,24 @@ func (s *Shard) DeleteSeriesRange(seriesKeys []string, min, max int64) error { } // DeleteMeasurement deletes a measurement and all underlying series. -func (s *Shard) DeleteMeasurement(name string, seriesKeys []string) error { +func (s *Shard) DeleteMeasurement(name string) error { if err := s.ready(); err != nil { return err } - if err := s.engine.DeleteMeasurement(name, seriesKeys); err != nil { + // Attempt to find the series keys. + m := s.index.Measurement(name) + if m == nil { + return influxql.ErrMeasurementNotFound(name) + } + + // Remove the measurement from the engine. + if err := s.engine.DeleteMeasurement(name, m.SeriesKeys()); err != nil { return err } + // Remove the measurement from the index. + s.index.DropMeasurement(name) return nil } @@ -650,6 +655,22 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point, return points, fieldsToCreate, err } +// Measurement returns the named measurement from the index. +func (s *Shard) Measurement(name string) *Measurement { + return s.index.Measurement(name) +} + +// Measurements returns a slice of all measurements from the index. +func (s *Shard) Measurements() []*Measurement { + return s.index.Measurements() +} + +// MeasurementsByExpr takes an expression containing only tags and returns a +// slice of matching measurements. +func (s *Shard) MeasurementsByExpr(cond influxql.Expr) (Measurements, bool, error) { + return s.index.MeasurementsByExpr(cond) +} + // SeriesCount returns the number of series buckets on the shard. func (s *Shard) SeriesCount() (int, error) { if err := s.ready(); err != nil { @@ -658,6 +679,11 @@ func (s *Shard) SeriesCount() (int, error) { return s.engine.SeriesCount() } +// Series returns a series by key. +func (s *Shard) Series(key string) *Series { + return s.index.Series(key) +} + // WriteTo writes the shard's data to w. func (s *Shard) WriteTo(w io.Writer) (int64, error) { if err := s.ready(); err != nil { diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index 61d4732fa2..7ec41742a5 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -31,11 +31,10 @@ func TestShardWriteAndIndex(t *testing.T) { tmpShard := path.Join(tmpDir, "shard") tmpWal := path.Join(tmpDir, "wal") - index := tsdb.NewDatabaseIndex("db") opts := tsdb.NewEngineOptions() opts.Config.WALDir = filepath.Join(tmpDir, "wal") - sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts) + sh := tsdb.NewShard(1, tmpShard, tmpWal, opts) // Calling WritePoints when the engine is not open will return // ErrEngineClosed. @@ -66,15 +65,20 @@ func TestShardWriteAndIndex(t *testing.T) { } validateIndex := func() { - if index.SeriesN() != 1 { - t.Fatalf("series wasn't in index") + cnt, err := sh.SeriesCount() + if err != nil { + t.Fatal(err) } - seriesTags := index.Series(string(pt.Key())).Tags + if got, exp := cnt, 1; got != exp { + t.Fatalf("got %v series, exp %v series in index", got, exp) + } + + seriesTags := sh.Series(string(pt.Key())).Tags if len(seriesTags) != len(pt.Tags()) || pt.Tags().GetString("host") != seriesTags.GetString("host") { t.Fatalf("tags weren't properly saved to series index: %v, %v", pt.Tags(), seriesTags) } - if !reflect.DeepEqual(index.Measurement("cpu").TagKeys(), []string{"host"}) { + if !reflect.DeepEqual(sh.Measurement("cpu").TagKeys(), []string{"host"}) { t.Fatalf("tag key wasn't saved to measurement index") } } @@ -84,8 +88,7 @@ func TestShardWriteAndIndex(t *testing.T) { // ensure the index gets loaded after closing and opening the shard sh.Close() - index = tsdb.NewDatabaseIndex("db") - sh = tsdb.NewShard(1, index, tmpShard, tmpWal, opts) + sh = tsdb.NewShard(1, tmpShard, tmpWal, opts) if err := sh.Open(); err != nil { t.Fatalf("error opening shard: %s", err.Error()) } @@ -106,12 +109,11 @@ func TestMaxSeriesLimit(t *testing.T) { tmpShard := path.Join(tmpDir, "db", "rp", "1") tmpWal := path.Join(tmpDir, "wal") - index := tsdb.NewDatabaseIndex("db") opts := tsdb.NewEngineOptions() opts.Config.WALDir = filepath.Join(tmpDir, "wal") opts.Config.MaxSeriesPerDatabase = 1000 - sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts) + sh := tsdb.NewShard(1, tmpShard, tmpWal, opts) if err := sh.Open(); err != nil { t.Fatalf("error opening shard: %s", err.Error()) @@ -212,11 +214,10 @@ func TestWriteTimeTag(t *testing.T) { tmpShard := path.Join(tmpDir, "shard") tmpWal := path.Join(tmpDir, "wal") - index := tsdb.NewDatabaseIndex("db") opts := tsdb.NewEngineOptions() opts.Config.WALDir = filepath.Join(tmpDir, "wal") - sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts) + sh := tsdb.NewShard(1, tmpShard, tmpWal, opts) if err := sh.Open(); err != nil { t.Fatalf("error opening shard: %s", err.Error()) } @@ -237,7 +238,7 @@ func TestWriteTimeTag(t *testing.T) { t.Fatalf("unexpected log message: %s", strings.TrimSpace(got)) } - m := index.Measurement("cpu") + m := sh.Measurement("cpu") if m != nil { t.Fatal("unexpected cpu measurement") } @@ -257,7 +258,7 @@ func TestWriteTimeTag(t *testing.T) { t.Fatalf("unexpected log message: %s", strings.TrimSpace(got)) } - m = index.Measurement("cpu") + m = sh.Measurement("cpu") if m == nil { t.Fatal("expected cpu measurement") } @@ -273,11 +274,10 @@ func TestWriteTimeField(t *testing.T) { tmpShard := path.Join(tmpDir, "shard") tmpWal := path.Join(tmpDir, "wal") - index := tsdb.NewDatabaseIndex("db") opts := tsdb.NewEngineOptions() opts.Config.WALDir = filepath.Join(tmpDir, "wal") - sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts) + sh := tsdb.NewShard(1, tmpShard, tmpWal, opts) if err := sh.Open(); err != nil { t.Fatalf("error opening shard: %s", err.Error()) } @@ -299,7 +299,7 @@ func TestWriteTimeField(t *testing.T) { } key := models.MakeKey([]byte("cpu"), nil) - series := index.Series(string(key)) + series := sh.Series(string(key)) if series == nil { t.Fatal("expected series") } else if len(series.Tags) != 0 { @@ -313,11 +313,10 @@ func TestShardWriteAddNewField(t *testing.T) { tmpShard := path.Join(tmpDir, "shard") tmpWal := path.Join(tmpDir, "wal") - index := tsdb.NewDatabaseIndex("db") opts := tsdb.NewEngineOptions() opts.Config.WALDir = filepath.Join(tmpDir, "wal") - sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts) + sh := tsdb.NewShard(1, tmpShard, tmpWal, opts) if err := sh.Open(); err != nil { t.Fatalf("error opening shard: %s", err.Error()) } @@ -347,18 +346,23 @@ func TestShardWriteAddNewField(t *testing.T) { t.Fatalf(err.Error()) } - if index.SeriesN() != 1 { - t.Fatalf("series wasn't in index") + cnt, err := sh.SeriesCount() + if err != nil { + t.Fatal(err) } - seriesTags := index.Series(string(pt.Key())).Tags + if got, exp := cnt, 1; got != exp { + t.Fatalf("got %d series, exp %d series in index", got, exp) + } + + seriesTags := sh.Series(string(pt.Key())).Tags if len(seriesTags) != len(pt.Tags()) || pt.Tags().GetString("host") != seriesTags.GetString("host") { t.Fatalf("tags weren't properly saved to series index: %v, %v", pt.Tags(), seriesTags) } - if !reflect.DeepEqual(index.Measurement("cpu").TagKeys(), []string{"host"}) { + if !reflect.DeepEqual(sh.Measurement("cpu").TagKeys(), []string{"host"}) { t.Fatalf("tag key wasn't saved to measurement index") } - if len(index.Measurement("cpu").FieldNames()) != 2 { + if len(sh.Measurement("cpu").FieldNames()) != 2 { t.Fatalf("field names wasn't saved to measurement index") } } @@ -445,11 +449,10 @@ func TestShard_Close_RemoveIndex(t *testing.T) { tmpShard := path.Join(tmpDir, "shard") tmpWal := path.Join(tmpDir, "wal") - index := tsdb.NewDatabaseIndex("db") opts := tsdb.NewEngineOptions() opts.Config.WALDir = filepath.Join(tmpDir, "wal") - sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts) + sh := tsdb.NewShard(1, tmpShard, tmpWal, opts) if err := sh.Open(); err != nil { t.Fatalf("error opening shard: %s", err.Error()) @@ -467,16 +470,25 @@ func TestShard_Close_RemoveIndex(t *testing.T) { t.Fatalf(err.Error()) } - if got, exp := index.SeriesN(), 1; got != exp { - t.Fatalf("series count mismatch: got %v, exp %v", got, exp) + cnt, err := sh.SeriesCount() + if err != nil { + t.Fatal(err) + } + if got, exp := cnt, 1; got != exp { + t.Fatalf("got %d series, exp %d series in index", got, exp) } // ensure the index gets loaded after closing and opening the shard sh.Close() + sh.Open() - if got, exp := index.SeriesN(), 0; got != exp { - t.Fatalf("series count mismatch: got %v, exp %v", got, exp) + if cnt, err = sh.SeriesCount(); err != nil { + t.Fatal(err) } + if got, exp := cnt, 1; got != exp { + t.Fatalf("got %d series, exp %d series in index", got, exp) + } + } // Ensure a shard can create iterators for its underlying data. @@ -822,8 +834,6 @@ func BenchmarkWritePoints_ExistingSeries_1M(b *testing.B) { func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) { // Generate test series (measurements + unique tag sets). series := genTestSeries(mCnt, tkCnt, tvCnt) - // Create index for the shard to use. - index := tsdb.NewDatabaseIndex("db") // Generate point data to write to the shard. points := []models.Point{} for _, s := range series { @@ -842,7 +852,7 @@ func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) { tmpDir, _ := ioutil.TempDir("", "shard_test") tmpShard := path.Join(tmpDir, "shard") tmpWal := path.Join(tmpDir, "wal") - shard := tsdb.NewShard(1, index, tmpShard, tmpWal, tsdb.NewEngineOptions()) + shard := tsdb.NewShard(1, tmpShard, tmpWal, tsdb.NewEngineOptions()) shard.Open() b.StartTimer() @@ -863,8 +873,6 @@ func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) { func benchmarkWritePointsExistingSeries(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) { // Generate test series (measurements + unique tag sets). series := genTestSeries(mCnt, tkCnt, tvCnt) - // Create index for the shard to use. - index := tsdb.NewDatabaseIndex("db") // Generate point data to write to the shard. points := []models.Point{} for _, s := range series { @@ -878,7 +886,7 @@ func benchmarkWritePointsExistingSeries(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt defer os.RemoveAll(tmpDir) tmpShard := path.Join(tmpDir, "shard") tmpWal := path.Join(tmpDir, "wal") - shard := tsdb.NewShard(1, index, tmpShard, tmpWal, tsdb.NewEngineOptions()) + shard := tsdb.NewShard(1, tmpShard, tmpWal, tsdb.NewEngineOptions()) shard.Open() defer shard.Close() chunkedWrite(shard, points) @@ -939,7 +947,6 @@ func NewShard() *Shard { return &Shard{ Shard: tsdb.NewShard(0, - tsdb.NewDatabaseIndex("db"), filepath.Join(path, "data", "db0", "rp0", "1"), filepath.Join(path, "wal", "db0", "rp0", "1"), opt, diff --git a/tsdb/store.go b/tsdb/store.go index 5b745c4653..fc4c230d0a 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -29,10 +29,12 @@ var ( // Store manages shards and indexes for databases. type Store struct { - mu sync.RWMutex - path string + mu sync.RWMutex + // databases keeps track of the number of databases being managed by the + // store. + databases map[string]struct{} - databaseIndexes map[string]*DatabaseIndex + path string // shards is a map of shard IDs to the associated Shard. shards map[uint64]*Shard @@ -53,6 +55,7 @@ func NewStore(path string) *Store { logger := zap.New(zap.NullEncoder()) return &Store{ + databases: make(map[string]struct{}), path: path, EngineOptions: opts, Logger: logger, @@ -71,21 +74,15 @@ func (s *Store) WithLogger(log zap.Logger) { // Statistics returns statistics for period monitoring. func (s *Store) Statistics(tags map[string]string) []models.Statistic { - var statistics []models.Statistic - s.mu.RLock() - indexes := make([]models.Statistic, 0, len(s.databaseIndexes)) - for _, dbi := range s.databaseIndexes { - indexes = append(indexes, dbi.Statistics(tags)...) - } shards := s.shardsSlice() s.mu.RUnlock() + // Gather allĀ statistics for all shards. + var statistics []models.Statistic for _, shard := range shards { statistics = append(statistics, shard.Statistics(tags)...) } - - statistics = append(statistics, indexes...) return statistics } @@ -93,15 +90,13 @@ func (s *Store) Statistics(tags map[string]string) []models.Statistic { func (s *Store) Path() string { return s.path } // Open initializes the store, creating all necessary directories, loading all -// shards and indexes and initializing periodic maintenance of all shards. +// shards as well as initializing periodic maintenance of them. func (s *Store) Open() error { s.mu.Lock() defer s.mu.Unlock() s.closing = make(chan struct{}) - s.shards = map[uint64]*Shard{} - s.databaseIndexes = map[string]*DatabaseIndex{} s.Logger.Info(fmt.Sprintf("Using data dir: %v", s.Path())) @@ -110,11 +105,6 @@ func (s *Store) Open() error { return err } - // TODO: Start AE for Node - if err := s.loadIndexes(); err != nil { - return err - } - if err := s.loadShards(); err != nil { return err } @@ -124,23 +114,8 @@ func (s *Store) Open() error { return nil } -func (s *Store) loadIndexes() error { - dbs, err := ioutil.ReadDir(s.path) - if err != nil { - return err - } - for _, db := range dbs { - if !db.IsDir() { - s.Logger.Info(fmt.Sprintf("Skipping database dir: %s. Not a directory", db.Name())) - continue - } - s.databaseIndexes[db.Name()] = NewDatabaseIndex(db.Name()) - } - return nil -} - func (s *Store) loadShards() error { - // struct to hold the result of opening each reader in a goroutine + // res holds the result from opening each shard in a goroutine type res struct { s *Shard err error @@ -151,27 +126,37 @@ func (s *Store) loadShards() error { resC := make(chan *res) var n int - // loop through the current database indexes - for db := range s.databaseIndexes { - rps, err := ioutil.ReadDir(filepath.Join(s.path, db)) + // Determine how many shards we need to open by checking the store path. + dbDirs, err := ioutil.ReadDir(s.path) + if err != nil { + return err + } + + for _, db := range dbDirs { + if !db.IsDir() { + s.Logger.Printf("Not loading %s. Not a database directory.", db.Name()) + continue + } + + // Load each retention policy within the database directory. + rpDirs, err := ioutil.ReadDir(filepath.Join(s.path, db.Name())) if err != nil { return err } - for _, rp := range rps { - // retention policies should be directories. Skip anything that is not a dir. + for _, rp := range rpDirs { if !rp.IsDir() { s.Logger.Info(fmt.Sprintf("Skipping retention policy dir: %s. Not a directory", rp.Name())) continue } - shards, err := ioutil.ReadDir(filepath.Join(s.path, db, rp.Name())) + shardDirs, err := ioutil.ReadDir(filepath.Join(s.path, db.Name(), rp.Name())) if err != nil { return err } - for _, sh := range shards { + for _, sh := range shardDirs { n++ - go func(index *DatabaseIndex, db, rp, sh string) { + go func(db, rp, sh string) { t.Take() defer t.Release() @@ -186,7 +171,7 @@ func (s *Store) loadShards() error { return } - shard := NewShard(shardID, s.databaseIndexes[db], path, walPath, s.EngineOptions) + shard := NewShard(shardID, path, walPath, s.EngineOptions) shard.WithLogger(s.baseLogger) err = shard.Open() @@ -197,11 +182,13 @@ func (s *Store) loadShards() error { resC <- &res{s: shard} s.Logger.Info(fmt.Sprintf("%s opened in %s", path, time.Now().Sub(start))) - }(s.databaseIndexes[db], db, rp.Name(), sh.Name()) + }(db, rp.Name(), sh.Name()) } } } + // Gather results of opening shards concurrently, keeping track of how + // many databases we are managing. for i := 0; i < n; i++ { res := <-resC if res.err != nil { @@ -209,6 +196,7 @@ func (s *Store) loadShards() error { continue } s.shards[res.s.id] = res.s + s.databases[res.s.database] = struct{}{} } close(resC) return nil @@ -234,18 +222,10 @@ func (s *Store) Close() error { s.opened = false s.shards = nil - s.databaseIndexes = nil return nil } -// DatabaseIndexN returns the number of databases indices in the store. -func (s *Store) DatabaseIndexN() int { - s.mu.RLock() - defer s.mu.RUnlock() - return len(s.databaseIndexes) -} - // Shard returns a shard by id. func (s *Store) Shard(id uint64) *Shard { s.mu.RLock() @@ -290,31 +270,24 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, en default: } - // shard already exists + // Shard already exists. if _, ok := s.shards[shardID]; ok { return nil } - // created the db and retention policy dirs if they don't exist + // Create the db and retention policy directories if they don't exist. if err := os.MkdirAll(filepath.Join(s.path, database, retentionPolicy), 0700); err != nil { return err } - // create the WAL directory + // Create the WAL directory. walPath := filepath.Join(s.EngineOptions.Config.WALDir, database, retentionPolicy, fmt.Sprintf("%d", shardID)) if err := os.MkdirAll(walPath, 0700); err != nil { return err } - // create the database index if it does not exist - db, ok := s.databaseIndexes[database] - if !ok { - db = NewDatabaseIndex(database) - s.databaseIndexes[database] = db - } - path := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10)) - shard := NewShard(shardID, db, path, walPath, s.EngineOptions) + shard := NewShard(shardID, path, walPath, s.EngineOptions) shard.WithLogger(s.baseLogger) shard.EnableOnOpen = enabled @@ -323,6 +296,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, en } s.shards[shardID] = shard + s.databases[database] = struct{}{} // Ensure we are tracking any new db. return nil } @@ -394,9 +368,7 @@ func (s *Store) ShardIteratorCreator(id uint64, opt *influxql.SelectOptions) inf // DeleteDatabase will close all shards associated with a database and remove the directory and files from disk. func (s *Store) DeleteDatabase(name string) error { s.mu.RLock() - shards := s.filterShards(func(sh *Shard) bool { - return sh.database == name - }) + shards := s.filterShards(byDatabase(name)) s.mu.RUnlock() if err := s.walkShards(shards, func(sh *Shard) error { @@ -420,7 +392,9 @@ func (s *Store) DeleteDatabase(name string) error { for _, sh := range shards { delete(s.shards, sh.id) } - delete(s.databaseIndexes, name) + + // Remove database from store list of databases + delete(s.databases, name) s.mu.Unlock() return nil @@ -448,7 +422,7 @@ func (s *Store) DeleteRetentionPolicy(database, name string) error { return err } - // Remove the rentention policy folder. + // Remove the retention policy folder. if err := os.RemoveAll(filepath.Join(s.path, database, name)); err != nil { return err } @@ -468,41 +442,16 @@ func (s *Store) DeleteRetentionPolicy(database, name string) error { // DeleteMeasurement removes a measurement and all associated series from a database. func (s *Store) DeleteMeasurement(database, name string) error { - // Find the database. s.mu.RLock() - db := s.databaseIndexes[database] - s.mu.RUnlock() - if db == nil { - return nil - } - - // Find the measurement. - m := db.Measurement(name) - if m == nil { - return influxql.ErrMeasurementNotFound(name) - } - - seriesKeys := m.SeriesKeys() - - s.mu.RLock() - shards := s.filterShards(func(sh *Shard) bool { - return sh.database == database - }) + shards := s.filterShards(byDatabase(database)) s.mu.RUnlock() - if err := s.walkShards(shards, func(sh *Shard) error { - if err := sh.DeleteMeasurement(m.Name, seriesKeys); err != nil { + return s.walkShards(shards, func(sh *Shard) error { + if err := sh.DeleteMeasurement(name); err != nil { return err } return nil - }); err != nil { - return err - } - - // Remove measurement from index. - db.DropMeasurement(m.Name) - - return nil + }) } // filterShards returns a slice of shards where fn returns true @@ -517,6 +466,14 @@ func (s *Store) filterShards(fn func(sh *Shard) bool) []*Shard { return shards } +// byDatabase provides a predicate for filterShards that matches on the name of +// the database passed in. +var byDatabase = func(name string) func(sh *Shard) bool { + return func(sh *Shard) bool { + return sh.database == name + } +} + // walkShards apply a function to each shard in parallel. If any of the // functions return an error, the first error is returned. func (s *Store) walkShards(shards []*Shard, fn func(sh *Shard) error) error { @@ -577,36 +534,20 @@ func (s *Store) shardsSlice() []*Shard { return a } -// DatabaseIndex returns the index for a database by its name. -func (s *Store) DatabaseIndex(name string) *DatabaseIndex { - s.mu.RLock() - defer s.mu.RUnlock() - return s.databaseIndexes[name] -} - -// Databases returns all the databases in the indexes. +// Databases returns the names of all databases managed by the store. func (s *Store) Databases() []string { s.mu.RLock() defer s.mu.RUnlock() - databases := make([]string, 0, len(s.databaseIndexes)) - for db := range s.databaseIndexes { - databases = append(databases, db) + + databases := make([]string, 0, len(s.databases)) + for k, _ := range s.databases { + databases = append(databases, k) } return databases } -// Measurement returns a measurement by name from the given database. -func (s *Store) Measurement(database, name string) *Measurement { - s.mu.RLock() - db := s.databaseIndexes[database] - s.mu.RUnlock() - if db == nil { - return nil - } - return db.Measurement(name) -} - -// DiskSize returns the size of all the shard files in bytes. This size does not include the WAL size. +// DiskSize returns the size of all the shard files in bytes. +// This size does not include the WAL size. func (s *Store) DiskSize() (int64, error) { s.mu.RLock() defer s.mu.RUnlock() @@ -622,7 +563,8 @@ func (s *Store) DiskSize() (int64, error) { return size, nil } -// BackupShard will get the shard and have the engine backup since the passed in time to the writer. +// BackupShard will get the shard and have the engine backup since the passed in +// time to the writer. func (s *Store) BackupShard(id uint64, since time.Time, w io.Writer) error { shard := s.Shard(id) if shard == nil { @@ -653,7 +595,8 @@ func (s *Store) RestoreShard(id uint64, r io.Reader) error { return shard.Restore(r, path) } -// ShardRelativePath will return the relative path to the shard. i.e. //. +// ShardRelativePath will return the relative path to the shard, i.e., +// //. func (s *Store) ShardRelativePath(id uint64) (string, error) { shard := s.Shard(id) if shard == nil { @@ -662,7 +605,8 @@ func (s *Store) ShardRelativePath(id uint64) (string, error) { return relativePath(s.path, shard.path) } -// DeleteSeries loops through the local shards and deletes the series data and metadata for the passed in series keys. +// DeleteSeries loops through the local shards and deletes the series data for +// the passed in series keys. func (s *Store) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error { // Expand regex expressions in the FROM clause. a, err := s.ExpandSources(sources) @@ -680,15 +624,21 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi } s.mu.RLock() - defer s.mu.RUnlock() + shards := s.filterShards(byDatabase(database)) + s.mu.RUnlock() - // Find the database. - db := s.DatabaseIndex(database) - if db == nil { - return nil + mMap := make(map[string]*Measurement) + for _, shard := range shards { + shardMeasures := shard.Measurements() + for _, m := range shardMeasures { + mMap[m.Name] = m + } } - measurements, err := measurementsFromSourcesOrDB(db, sources...) + s.mu.RLock() + defer s.mu.RUnlock() + + measurements, err := measurementsFromSourcesOrDB(mMap, sources...) if err != nil { return err } @@ -723,46 +673,38 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi } } - // delete the raw series data - if err := s.deleteSeries(database, seriesKeys, min, max); err != nil { - return err - } - - return nil + // delete the raw series data. + return s.deleteSeries(database, seriesKeys, min, max) } func (s *Store) deleteSeries(database string, seriesKeys []string, min, max int64) error { - db := s.databaseIndexes[database] - if db == nil { - return influxql.ErrDatabaseNotFound(database) - } - s.mu.RLock() - shards := s.filterShards(func(sh *Shard) bool { - return sh.database == database - }) + shards := s.filterShards(byDatabase(database)) s.mu.RUnlock() return s.walkShards(shards, func(sh *Shard) error { if sh.database != database { return nil } + if err := sh.DeleteSeriesRange(seriesKeys, min, max); err != nil { return err } // The keys we passed in may be fully deleted from the shard, if so, - // we need to remove the shard from all the meta data indexes + // we need to remove the shard from all the meta data indices. existing, err := sh.ContainsSeries(seriesKeys) if err != nil { return err } + var toDelete []string for k, exists := range existing { if !exists { - db.UnassignShard(k, sh.id) + toDelete = append(toDelete, k) } } + sh.index.DropSeries(toDelete) return nil }) } @@ -827,38 +769,49 @@ func (s *Store) WriteToShard(shardID uint64, points []models.Point) error { return sh.WritePoints(points) } -// Measurements returns a slice of sorted measurement names in the given database, -// matching the given condition. +// Measurements returns a slice of all measurements. Measurements accepts an +// optional condition expression. If cond is nil, then all measurements for the +// database will be returned. func (s *Store) Measurements(database string, cond influxql.Expr) ([]string, error) { - dbi := s.DatabaseIndex(database) - if dbi == nil { - return nil, nil - } + s.mu.RLock() + shards := s.filterShards(byDatabase(database)) + s.mu.RUnlock() - // Retrieve measurements from database index. Filter if condition specified. - var mms Measurements - if cond == nil { - mms = dbi.Measurements() - } else { - var err error - mms, _, err = dbi.MeasurementsByExpr(cond) - if err != nil { - return nil, err + var m Measurements + for _, sh := range shards { + var mms Measurements + // Retrieve measurements from database index. Filter if condition specified. + if cond == nil { + mms = sh.Measurements() + } else { + var err error + mms, _, err = sh.MeasurementsByExpr(cond) + if err != nil { + return nil, err + } } + + m = append(m, mms...) } // Sort measurements by name. - sort.Sort(mms) + sort.Sort(m) - measurements := make([]string, len(mms)) - for i, m := range mms { - measurements[i] = m.Name + measurements := make([]string, 0, len(m)) + for _, m := range m { + measurements = append(measurements, m.Name) } return measurements, nil } -// TagValues represents the tag keys and values in a measurement. +// MeasurementSeriesCounts returns the number of measurements and series in all +// the shards' indices. +func (s *Store) MeasurementSeriesCounts(database string) (measuments int, series int) { + // TODO: implement me + return 0, 0 +} + type TagValues struct { Measurement string Values []KeyValue @@ -870,11 +823,6 @@ func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, err return nil, errors.New("a condition is required") } - dbi := s.DatabaseIndex(database) - if dbi == nil { - return nil, nil - } - measurementExpr := influxql.CloneExpr(cond) measurementExpr = influxql.Reduce(influxql.RewriteExpr(measurementExpr, func(e influxql.Expr) influxql.Expr { switch e := e.(type) { @@ -890,18 +838,31 @@ func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, err return e }), nil) - mms, ok, err := dbi.MeasurementsByExpr(measurementExpr) - if err != nil { - return nil, err - } else if !ok { - mms = dbi.Measurements() - sort.Sort(mms) + // Get all measurements for the shards we're interested in. + s.mu.RLock() + shards := s.filterShards(byDatabase(database)) + s.mu.RUnlock() + + var measures Measurements + for _, sh := range shards { + mms, ok, err := sh.MeasurementsByExpr(measurementExpr) + if err != nil { + return nil, err + } else if !ok { + // TODO(edd): can we simplify this so we don't have to check the + // ok value, and we can call sh.measurements with a shard filter + // instead? + mms = sh.Measurements() + } + + measures = append(measures, mms...) } // If there are no measurements, return immediately. - if len(mms) == 0 { + if len(measures) == 0 { return nil, nil } + sort.Sort(measures) filterExpr := influxql.CloneExpr(cond) filterExpr = influxql.Reduce(influxql.RewriteExpr(filterExpr, func(e influxql.Expr) influxql.Expr { @@ -918,8 +879,8 @@ func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, err return e }), nil) - tagValues := make([]TagValues, len(mms)) - for i, mm := range mms { + tagValues := make([]TagValues, len(measures)) + for i, mm := range measures { tagValues[i].Measurement = mm.Name ids, err := mm.SeriesIDsAllOrByExpr(filterExpr) @@ -1053,31 +1014,31 @@ func relativePath(storePath, shardPath string) (string, error) { // measurementsFromSourcesOrDB returns a list of measurements from the // sources passed in or, if sources is empty, a list of all -// measurement names from the database passed in. -func measurementsFromSourcesOrDB(db *DatabaseIndex, sources ...influxql.Source) (Measurements, error) { - var measurements Measurements +// measurement names from the measurement map passed in. +func measurementsFromSourcesOrDB(measurements map[string]*Measurement, sources ...influxql.Source) (Measurements, error) { + var all Measurements if len(sources) > 0 { for _, source := range sources { if m, ok := source.(*influxql.Measurement); ok { - measurement := db.measurements[m.Name] + measurement := measurements[m.Name] if measurement == nil { continue } - measurements = append(measurements, measurement) + all = append(all, measurement) } else { return nil, errors.New("identifiers in FROM clause must be measurement names") } } } else { // No measurements specified in FROM clause so get all measurements that have series. - for _, m := range db.Measurements() { + for _, m := range measurements { if m.HasSeries() { - measurements = append(measurements, m) + all = append(all, m) } } } - sort.Sort(measurements) + sort.Sort(all) - return measurements, nil + return all, nil } diff --git a/tsdb/store_test.go b/tsdb/store_test.go index 792fa79c29..06ae2a68b6 100644 --- a/tsdb/store_test.go +++ b/tsdb/store_test.go @@ -6,6 +6,8 @@ import ( "io/ioutil" "os" "path/filepath" + "reflect" + "sort" "strings" "testing" "time" @@ -96,8 +98,6 @@ func TestStore_CreateShard(t *testing.T) { t.Fatal(err) } else if sh := s.Shard(1); sh == nil { t.Fatalf("expected shard") - } else if di := s.DatabaseIndex("db0"); di == nil { - t.Errorf("expected database index") } // Create another shard and verify that it exists. @@ -147,8 +147,6 @@ func TestStore_CreateShardSnapShot(t *testing.T) { t.Fatal(err) } else if sh := s.Shard(1); sh == nil { t.Fatalf("expected shard") - } else if di := s.DatabaseIndex("db0"); di == nil { - t.Errorf("expected database index") } dir, e := s.CreateShardSnapshot(1) @@ -160,6 +158,40 @@ func TestStore_CreateShardSnapShot(t *testing.T) { } } +func TestStore_Open(t *testing.T) { + s := NewStore() + defer s.Close() + + if err := os.MkdirAll(filepath.Join(s.Path(), "db0", "rp0", "2"), 0777); err != nil { + t.Fatal(err) + } + + if err := os.MkdirAll(filepath.Join(s.Path(), "db0", "rp2", "4"), 0777); err != nil { + t.Fatal(err) + } + + if err := os.MkdirAll(filepath.Join(s.Path(), "db1", "rp0", "1"), 0777); err != nil { + t.Fatal(err) + } + + // Store should ignore shard since it does not have a numeric name. + if err := s.Open(); err != nil { + t.Fatal(err) + } else if n := len(s.Databases()); n != 2 { + t.Fatalf("unexpected database index count: %d", n) + } else if n := s.ShardN(); n != 3 { + t.Fatalf("unexpected shard count: %d", n) + } + + expDatabases := []string{"db0", "db1"} + gotDatabases := s.Databases() + sort.Strings(gotDatabases) + + if got, exp := gotDatabases, expDatabases; !reflect.DeepEqual(got, exp) { + t.Fatalf("got %#v, expected %#v", got, exp) + } +} + // Ensure the store reports an error when it can't open a database directory. func TestStore_Open_InvalidDatabaseFile(t *testing.T) { s := NewStore() @@ -173,7 +205,7 @@ func TestStore_Open_InvalidDatabaseFile(t *testing.T) { // Store should ignore database since it's a file. if err := s.Open(); err != nil { t.Fatal(err) - } else if n := s.DatabaseIndexN(); n != 0 { + } else if n := len(s.Databases()); n != 0 { t.Fatalf("unexpected database index count: %d", n) } } @@ -190,10 +222,12 @@ func TestStore_Open_InvalidRetentionPolicy(t *testing.T) { t.Fatal(err) } - // Store should ignore database since it's a file. + // Store should ignore retention policy since it's a file, and there should + // be no indices created. if err := s.Open(); err != nil { t.Fatal(err) - } else if n := s.DatabaseIndexN(); n != 1 { + } else if n := len(s.Databases()); n != 0 { + t.Log(s.Databases()) t.Fatalf("unexpected database index count: %d", n) } } @@ -213,7 +247,7 @@ func TestStore_Open_InvalidShard(t *testing.T) { // Store should ignore shard since it does not have a numeric name. if err := s.Open(); err != nil { t.Fatal(err) - } else if n := s.DatabaseIndexN(); n != 1 { + } else if n := len(s.Databases()); n != 0 { t.Fatalf("unexpected database index count: %d", n) } else if n := s.ShardN(); n != 0 { t.Fatalf("unexpected shard count: %d", n)