Initialise index in shards

pull/7913/head
Edd Robinson 2016-09-01 13:40:16 +01:00 committed by Ben Johnson
parent 57d0556174
commit 2171d9471b
No known key found for this signature in database
GPG Key ID: 81741CD251883081
6 changed files with 279 additions and 256 deletions

View File

@ -497,21 +497,16 @@ func (s *Server) startServerReporting() {
// reportServer reports usage statistics about the system.
func (s *Server) reportServer() {
dis := s.MetaClient.Databases()
numDatabases := len(dis)
dbs := s.MetaClient.Databases()
numDatabases := len(dbs)
numMeasurements := 0
numSeries := 0
// Only needed in the case of a data node
if s.TSDBStore != nil {
for _, di := range dis {
d := s.TSDBStore.DatabaseIndex(di.Name)
if d == nil {
// No data in this store for this database.
continue
}
m, s := d.MeasurementSeriesCounts()
for _, db := range dbs {
m, s := s.TSDBStore.MeasurementSeriesCounts(db.Name)
numMeasurements += m
numSeries += s
}

View File

@ -5684,7 +5684,7 @@ func TestServer_Query_DropAndRecreateMeasurement(t *testing.T) {
&Query{
name: "Drop non-existant measurement",
command: `DROP MEASUREMENT doesntexist`,
exp: `{"results":[{"statement_id":0,"error":"measurement not found: doesntexist"}]}`,
exp: `{"results":[{"statement_id":0,"error":"shard 1: measurement not found: doesntexist"}]}`,
params: url.Values{"db": []string{"db0"}},
},
}...)

View File

@ -127,11 +127,10 @@ type Shard struct {
}
// NewShard returns a new initialized Shard.
func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, options EngineOptions) *Shard {
func NewShard(id uint64, path string, walPath string, options EngineOptions) *Shard {
db, rp := DecodeStorePath(path)
logger := zap.New(zap.NullEncoder())
s := &Shard{
index: index,
id: id,
path: path,
walPath: walPath,
@ -218,6 +217,9 @@ func (s *Shard) Statistics(tags map[string]string) []models.Statistic {
statDiskBytes: atomic.LoadInt64(&s.stats.DiskBytes),
},
}}
// Add the index and engine statistics.
statistics = append(statistics, s.index.Statistics(tags)...)
statistics = append(statistics, s.engine.Statistics(tags)...)
return statistics
}
@ -282,12 +284,6 @@ func (s *Shard) Open() error {
return nil
}
// UnloadIndex removes all references to this shard from the DatabaseIndex
func (s *Shard) UnloadIndex() {
// Don't leak our shard ID and series keys in the index
s.index.RemoveShard(s.id)
}
// Close shuts down the shard's store.
func (s *Shard) Close() error {
s.mu.Lock()
@ -307,8 +303,8 @@ func (s *Shard) close() error {
close(s.closing)
}
// Don't leak our shard ID and series keys in the index
s.UnloadIndex()
// Wipe out our index.
s.index = NewDatabaseIndex(s.database)
err := s.engine.Close()
if err == nil {
@ -460,15 +456,24 @@ func (s *Shard) DeleteSeriesRange(seriesKeys []string, min, max int64) error {
}
// DeleteMeasurement deletes a measurement and all underlying series.
func (s *Shard) DeleteMeasurement(name string, seriesKeys []string) error {
func (s *Shard) DeleteMeasurement(name string) error {
if err := s.ready(); err != nil {
return err
}
if err := s.engine.DeleteMeasurement(name, seriesKeys); err != nil {
// Attempt to find the series keys.
m := s.index.Measurement(name)
if m == nil {
return influxql.ErrMeasurementNotFound(name)
}
// Remove the measurement from the engine.
if err := s.engine.DeleteMeasurement(name, m.SeriesKeys()); err != nil {
return err
}
// Remove the measurement from the index.
s.index.DropMeasurement(name)
return nil
}
@ -650,6 +655,22 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
return points, fieldsToCreate, err
}
// Measurement returns the named measurement from the index.
func (s *Shard) Measurement(name string) *Measurement {
return s.index.Measurement(name)
}
// Measurements returns a slice of all measurements from the index.
func (s *Shard) Measurements() []*Measurement {
return s.index.Measurements()
}
// MeasurementsByExpr takes an expression containing only tags and returns a
// slice of matching measurements.
func (s *Shard) MeasurementsByExpr(cond influxql.Expr) (Measurements, bool, error) {
return s.index.MeasurementsByExpr(cond)
}
// SeriesCount returns the number of series buckets on the shard.
func (s *Shard) SeriesCount() (int, error) {
if err := s.ready(); err != nil {
@ -658,6 +679,11 @@ func (s *Shard) SeriesCount() (int, error) {
return s.engine.SeriesCount()
}
// Series returns a series by key.
func (s *Shard) Series(key string) *Series {
return s.index.Series(key)
}
// WriteTo writes the shard's data to w.
func (s *Shard) WriteTo(w io.Writer) (int64, error) {
if err := s.ready(); err != nil {

View File

@ -31,11 +31,10 @@ func TestShardWriteAndIndex(t *testing.T) {
tmpShard := path.Join(tmpDir, "shard")
tmpWal := path.Join(tmpDir, "wal")
index := tsdb.NewDatabaseIndex("db")
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
// Calling WritePoints when the engine is not open will return
// ErrEngineClosed.
@ -66,15 +65,20 @@ func TestShardWriteAndIndex(t *testing.T) {
}
validateIndex := func() {
if index.SeriesN() != 1 {
t.Fatalf("series wasn't in index")
cnt, err := sh.SeriesCount()
if err != nil {
t.Fatal(err)
}
seriesTags := index.Series(string(pt.Key())).Tags
if got, exp := cnt, 1; got != exp {
t.Fatalf("got %v series, exp %v series in index", got, exp)
}
seriesTags := sh.Series(string(pt.Key())).Tags
if len(seriesTags) != len(pt.Tags()) || pt.Tags().GetString("host") != seriesTags.GetString("host") {
t.Fatalf("tags weren't properly saved to series index: %v, %v", pt.Tags(), seriesTags)
}
if !reflect.DeepEqual(index.Measurement("cpu").TagKeys(), []string{"host"}) {
if !reflect.DeepEqual(sh.Measurement("cpu").TagKeys(), []string{"host"}) {
t.Fatalf("tag key wasn't saved to measurement index")
}
}
@ -84,8 +88,7 @@ func TestShardWriteAndIndex(t *testing.T) {
// ensure the index gets loaded after closing and opening the shard
sh.Close()
index = tsdb.NewDatabaseIndex("db")
sh = tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
sh = tsdb.NewShard(1, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
}
@ -106,12 +109,11 @@ func TestMaxSeriesLimit(t *testing.T) {
tmpShard := path.Join(tmpDir, "db", "rp", "1")
tmpWal := path.Join(tmpDir, "wal")
index := tsdb.NewDatabaseIndex("db")
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.Config.MaxSeriesPerDatabase = 1000
sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
@ -212,11 +214,10 @@ func TestWriteTimeTag(t *testing.T) {
tmpShard := path.Join(tmpDir, "shard")
tmpWal := path.Join(tmpDir, "wal")
index := tsdb.NewDatabaseIndex("db")
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
}
@ -237,7 +238,7 @@ func TestWriteTimeTag(t *testing.T) {
t.Fatalf("unexpected log message: %s", strings.TrimSpace(got))
}
m := index.Measurement("cpu")
m := sh.Measurement("cpu")
if m != nil {
t.Fatal("unexpected cpu measurement")
}
@ -257,7 +258,7 @@ func TestWriteTimeTag(t *testing.T) {
t.Fatalf("unexpected log message: %s", strings.TrimSpace(got))
}
m = index.Measurement("cpu")
m = sh.Measurement("cpu")
if m == nil {
t.Fatal("expected cpu measurement")
}
@ -273,11 +274,10 @@ func TestWriteTimeField(t *testing.T) {
tmpShard := path.Join(tmpDir, "shard")
tmpWal := path.Join(tmpDir, "wal")
index := tsdb.NewDatabaseIndex("db")
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
}
@ -299,7 +299,7 @@ func TestWriteTimeField(t *testing.T) {
}
key := models.MakeKey([]byte("cpu"), nil)
series := index.Series(string(key))
series := sh.Series(string(key))
if series == nil {
t.Fatal("expected series")
} else if len(series.Tags) != 0 {
@ -313,11 +313,10 @@ func TestShardWriteAddNewField(t *testing.T) {
tmpShard := path.Join(tmpDir, "shard")
tmpWal := path.Join(tmpDir, "wal")
index := tsdb.NewDatabaseIndex("db")
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
}
@ -347,18 +346,23 @@ func TestShardWriteAddNewField(t *testing.T) {
t.Fatalf(err.Error())
}
if index.SeriesN() != 1 {
t.Fatalf("series wasn't in index")
cnt, err := sh.SeriesCount()
if err != nil {
t.Fatal(err)
}
seriesTags := index.Series(string(pt.Key())).Tags
if got, exp := cnt, 1; got != exp {
t.Fatalf("got %d series, exp %d series in index", got, exp)
}
seriesTags := sh.Series(string(pt.Key())).Tags
if len(seriesTags) != len(pt.Tags()) || pt.Tags().GetString("host") != seriesTags.GetString("host") {
t.Fatalf("tags weren't properly saved to series index: %v, %v", pt.Tags(), seriesTags)
}
if !reflect.DeepEqual(index.Measurement("cpu").TagKeys(), []string{"host"}) {
if !reflect.DeepEqual(sh.Measurement("cpu").TagKeys(), []string{"host"}) {
t.Fatalf("tag key wasn't saved to measurement index")
}
if len(index.Measurement("cpu").FieldNames()) != 2 {
if len(sh.Measurement("cpu").FieldNames()) != 2 {
t.Fatalf("field names wasn't saved to measurement index")
}
}
@ -445,11 +449,10 @@ func TestShard_Close_RemoveIndex(t *testing.T) {
tmpShard := path.Join(tmpDir, "shard")
tmpWal := path.Join(tmpDir, "wal")
index := tsdb.NewDatabaseIndex("db")
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
@ -467,16 +470,25 @@ func TestShard_Close_RemoveIndex(t *testing.T) {
t.Fatalf(err.Error())
}
if got, exp := index.SeriesN(), 1; got != exp {
t.Fatalf("series count mismatch: got %v, exp %v", got, exp)
cnt, err := sh.SeriesCount()
if err != nil {
t.Fatal(err)
}
if got, exp := cnt, 1; got != exp {
t.Fatalf("got %d series, exp %d series in index", got, exp)
}
// ensure the index gets loaded after closing and opening the shard
sh.Close()
sh.Open()
if got, exp := index.SeriesN(), 0; got != exp {
t.Fatalf("series count mismatch: got %v, exp %v", got, exp)
if cnt, err = sh.SeriesCount(); err != nil {
t.Fatal(err)
}
if got, exp := cnt, 1; got != exp {
t.Fatalf("got %d series, exp %d series in index", got, exp)
}
}
// Ensure a shard can create iterators for its underlying data.
@ -822,8 +834,6 @@ func BenchmarkWritePoints_ExistingSeries_1M(b *testing.B) {
func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) {
// Generate test series (measurements + unique tag sets).
series := genTestSeries(mCnt, tkCnt, tvCnt)
// Create index for the shard to use.
index := tsdb.NewDatabaseIndex("db")
// Generate point data to write to the shard.
points := []models.Point{}
for _, s := range series {
@ -842,7 +852,7 @@ func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) {
tmpDir, _ := ioutil.TempDir("", "shard_test")
tmpShard := path.Join(tmpDir, "shard")
tmpWal := path.Join(tmpDir, "wal")
shard := tsdb.NewShard(1, index, tmpShard, tmpWal, tsdb.NewEngineOptions())
shard := tsdb.NewShard(1, tmpShard, tmpWal, tsdb.NewEngineOptions())
shard.Open()
b.StartTimer()
@ -863,8 +873,6 @@ func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) {
func benchmarkWritePointsExistingSeries(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) {
// Generate test series (measurements + unique tag sets).
series := genTestSeries(mCnt, tkCnt, tvCnt)
// Create index for the shard to use.
index := tsdb.NewDatabaseIndex("db")
// Generate point data to write to the shard.
points := []models.Point{}
for _, s := range series {
@ -878,7 +886,7 @@ func benchmarkWritePointsExistingSeries(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt
defer os.RemoveAll(tmpDir)
tmpShard := path.Join(tmpDir, "shard")
tmpWal := path.Join(tmpDir, "wal")
shard := tsdb.NewShard(1, index, tmpShard, tmpWal, tsdb.NewEngineOptions())
shard := tsdb.NewShard(1, tmpShard, tmpWal, tsdb.NewEngineOptions())
shard.Open()
defer shard.Close()
chunkedWrite(shard, points)
@ -939,7 +947,6 @@ func NewShard() *Shard {
return &Shard{
Shard: tsdb.NewShard(0,
tsdb.NewDatabaseIndex("db"),
filepath.Join(path, "data", "db0", "rp0", "1"),
filepath.Join(path, "wal", "db0", "rp0", "1"),
opt,

View File

@ -29,10 +29,12 @@ var (
// Store manages shards and indexes for databases.
type Store struct {
mu sync.RWMutex
path string
mu sync.RWMutex
// databases keeps track of the number of databases being managed by the
// store.
databases map[string]struct{}
databaseIndexes map[string]*DatabaseIndex
path string
// shards is a map of shard IDs to the associated Shard.
shards map[uint64]*Shard
@ -53,6 +55,7 @@ func NewStore(path string) *Store {
logger := zap.New(zap.NullEncoder())
return &Store{
databases: make(map[string]struct{}),
path: path,
EngineOptions: opts,
Logger: logger,
@ -71,21 +74,15 @@ func (s *Store) WithLogger(log zap.Logger) {
// Statistics returns statistics for period monitoring.
func (s *Store) Statistics(tags map[string]string) []models.Statistic {
var statistics []models.Statistic
s.mu.RLock()
indexes := make([]models.Statistic, 0, len(s.databaseIndexes))
for _, dbi := range s.databaseIndexes {
indexes = append(indexes, dbi.Statistics(tags)...)
}
shards := s.shardsSlice()
s.mu.RUnlock()
// Gather all statistics for all shards.
var statistics []models.Statistic
for _, shard := range shards {
statistics = append(statistics, shard.Statistics(tags)...)
}
statistics = append(statistics, indexes...)
return statistics
}
@ -93,15 +90,13 @@ func (s *Store) Statistics(tags map[string]string) []models.Statistic {
func (s *Store) Path() string { return s.path }
// Open initializes the store, creating all necessary directories, loading all
// shards and indexes and initializing periodic maintenance of all shards.
// shards as well as initializing periodic maintenance of them.
func (s *Store) Open() error {
s.mu.Lock()
defer s.mu.Unlock()
s.closing = make(chan struct{})
s.shards = map[uint64]*Shard{}
s.databaseIndexes = map[string]*DatabaseIndex{}
s.Logger.Info(fmt.Sprintf("Using data dir: %v", s.Path()))
@ -110,11 +105,6 @@ func (s *Store) Open() error {
return err
}
// TODO: Start AE for Node
if err := s.loadIndexes(); err != nil {
return err
}
if err := s.loadShards(); err != nil {
return err
}
@ -124,23 +114,8 @@ func (s *Store) Open() error {
return nil
}
func (s *Store) loadIndexes() error {
dbs, err := ioutil.ReadDir(s.path)
if err != nil {
return err
}
for _, db := range dbs {
if !db.IsDir() {
s.Logger.Info(fmt.Sprintf("Skipping database dir: %s. Not a directory", db.Name()))
continue
}
s.databaseIndexes[db.Name()] = NewDatabaseIndex(db.Name())
}
return nil
}
func (s *Store) loadShards() error {
// struct to hold the result of opening each reader in a goroutine
// res holds the result from opening each shard in a goroutine
type res struct {
s *Shard
err error
@ -151,27 +126,37 @@ func (s *Store) loadShards() error {
resC := make(chan *res)
var n int
// loop through the current database indexes
for db := range s.databaseIndexes {
rps, err := ioutil.ReadDir(filepath.Join(s.path, db))
// Determine how many shards we need to open by checking the store path.
dbDirs, err := ioutil.ReadDir(s.path)
if err != nil {
return err
}
for _, db := range dbDirs {
if !db.IsDir() {
s.Logger.Printf("Not loading %s. Not a database directory.", db.Name())
continue
}
// Load each retention policy within the database directory.
rpDirs, err := ioutil.ReadDir(filepath.Join(s.path, db.Name()))
if err != nil {
return err
}
for _, rp := range rps {
// retention policies should be directories. Skip anything that is not a dir.
for _, rp := range rpDirs {
if !rp.IsDir() {
s.Logger.Info(fmt.Sprintf("Skipping retention policy dir: %s. Not a directory", rp.Name()))
continue
}
shards, err := ioutil.ReadDir(filepath.Join(s.path, db, rp.Name()))
shardDirs, err := ioutil.ReadDir(filepath.Join(s.path, db.Name(), rp.Name()))
if err != nil {
return err
}
for _, sh := range shards {
for _, sh := range shardDirs {
n++
go func(index *DatabaseIndex, db, rp, sh string) {
go func(db, rp, sh string) {
t.Take()
defer t.Release()
@ -186,7 +171,7 @@ func (s *Store) loadShards() error {
return
}
shard := NewShard(shardID, s.databaseIndexes[db], path, walPath, s.EngineOptions)
shard := NewShard(shardID, path, walPath, s.EngineOptions)
shard.WithLogger(s.baseLogger)
err = shard.Open()
@ -197,11 +182,13 @@ func (s *Store) loadShards() error {
resC <- &res{s: shard}
s.Logger.Info(fmt.Sprintf("%s opened in %s", path, time.Now().Sub(start)))
}(s.databaseIndexes[db], db, rp.Name(), sh.Name())
}(db, rp.Name(), sh.Name())
}
}
}
// Gather results of opening shards concurrently, keeping track of how
// many databases we are managing.
for i := 0; i < n; i++ {
res := <-resC
if res.err != nil {
@ -209,6 +196,7 @@ func (s *Store) loadShards() error {
continue
}
s.shards[res.s.id] = res.s
s.databases[res.s.database] = struct{}{}
}
close(resC)
return nil
@ -234,18 +222,10 @@ func (s *Store) Close() error {
s.opened = false
s.shards = nil
s.databaseIndexes = nil
return nil
}
// DatabaseIndexN returns the number of databases indices in the store.
func (s *Store) DatabaseIndexN() int {
s.mu.RLock()
defer s.mu.RUnlock()
return len(s.databaseIndexes)
}
// Shard returns a shard by id.
func (s *Store) Shard(id uint64) *Shard {
s.mu.RLock()
@ -290,31 +270,24 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, en
default:
}
// shard already exists
// Shard already exists.
if _, ok := s.shards[shardID]; ok {
return nil
}
// created the db and retention policy dirs if they don't exist
// Create the db and retention policy directories if they don't exist.
if err := os.MkdirAll(filepath.Join(s.path, database, retentionPolicy), 0700); err != nil {
return err
}
// create the WAL directory
// Create the WAL directory.
walPath := filepath.Join(s.EngineOptions.Config.WALDir, database, retentionPolicy, fmt.Sprintf("%d", shardID))
if err := os.MkdirAll(walPath, 0700); err != nil {
return err
}
// create the database index if it does not exist
db, ok := s.databaseIndexes[database]
if !ok {
db = NewDatabaseIndex(database)
s.databaseIndexes[database] = db
}
path := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10))
shard := NewShard(shardID, db, path, walPath, s.EngineOptions)
shard := NewShard(shardID, path, walPath, s.EngineOptions)
shard.WithLogger(s.baseLogger)
shard.EnableOnOpen = enabled
@ -323,6 +296,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, en
}
s.shards[shardID] = shard
s.databases[database] = struct{}{} // Ensure we are tracking any new db.
return nil
}
@ -394,9 +368,7 @@ func (s *Store) ShardIteratorCreator(id uint64, opt *influxql.SelectOptions) inf
// DeleteDatabase will close all shards associated with a database and remove the directory and files from disk.
func (s *Store) DeleteDatabase(name string) error {
s.mu.RLock()
shards := s.filterShards(func(sh *Shard) bool {
return sh.database == name
})
shards := s.filterShards(byDatabase(name))
s.mu.RUnlock()
if err := s.walkShards(shards, func(sh *Shard) error {
@ -420,7 +392,9 @@ func (s *Store) DeleteDatabase(name string) error {
for _, sh := range shards {
delete(s.shards, sh.id)
}
delete(s.databaseIndexes, name)
// Remove database from store list of databases
delete(s.databases, name)
s.mu.Unlock()
return nil
@ -448,7 +422,7 @@ func (s *Store) DeleteRetentionPolicy(database, name string) error {
return err
}
// Remove the rentention policy folder.
// Remove the retention policy folder.
if err := os.RemoveAll(filepath.Join(s.path, database, name)); err != nil {
return err
}
@ -468,41 +442,16 @@ func (s *Store) DeleteRetentionPolicy(database, name string) error {
// DeleteMeasurement removes a measurement and all associated series from a database.
func (s *Store) DeleteMeasurement(database, name string) error {
// Find the database.
s.mu.RLock()
db := s.databaseIndexes[database]
s.mu.RUnlock()
if db == nil {
return nil
}
// Find the measurement.
m := db.Measurement(name)
if m == nil {
return influxql.ErrMeasurementNotFound(name)
}
seriesKeys := m.SeriesKeys()
s.mu.RLock()
shards := s.filterShards(func(sh *Shard) bool {
return sh.database == database
})
shards := s.filterShards(byDatabase(database))
s.mu.RUnlock()
if err := s.walkShards(shards, func(sh *Shard) error {
if err := sh.DeleteMeasurement(m.Name, seriesKeys); err != nil {
return s.walkShards(shards, func(sh *Shard) error {
if err := sh.DeleteMeasurement(name); err != nil {
return err
}
return nil
}); err != nil {
return err
}
// Remove measurement from index.
db.DropMeasurement(m.Name)
return nil
})
}
// filterShards returns a slice of shards where fn returns true
@ -517,6 +466,14 @@ func (s *Store) filterShards(fn func(sh *Shard) bool) []*Shard {
return shards
}
// byDatabase provides a predicate for filterShards that matches on the name of
// the database passed in.
var byDatabase = func(name string) func(sh *Shard) bool {
return func(sh *Shard) bool {
return sh.database == name
}
}
// walkShards apply a function to each shard in parallel. If any of the
// functions return an error, the first error is returned.
func (s *Store) walkShards(shards []*Shard, fn func(sh *Shard) error) error {
@ -577,36 +534,20 @@ func (s *Store) shardsSlice() []*Shard {
return a
}
// DatabaseIndex returns the index for a database by its name.
func (s *Store) DatabaseIndex(name string) *DatabaseIndex {
s.mu.RLock()
defer s.mu.RUnlock()
return s.databaseIndexes[name]
}
// Databases returns all the databases in the indexes.
// Databases returns the names of all databases managed by the store.
func (s *Store) Databases() []string {
s.mu.RLock()
defer s.mu.RUnlock()
databases := make([]string, 0, len(s.databaseIndexes))
for db := range s.databaseIndexes {
databases = append(databases, db)
databases := make([]string, 0, len(s.databases))
for k, _ := range s.databases {
databases = append(databases, k)
}
return databases
}
// Measurement returns a measurement by name from the given database.
func (s *Store) Measurement(database, name string) *Measurement {
s.mu.RLock()
db := s.databaseIndexes[database]
s.mu.RUnlock()
if db == nil {
return nil
}
return db.Measurement(name)
}
// DiskSize returns the size of all the shard files in bytes. This size does not include the WAL size.
// DiskSize returns the size of all the shard files in bytes.
// This size does not include the WAL size.
func (s *Store) DiskSize() (int64, error) {
s.mu.RLock()
defer s.mu.RUnlock()
@ -622,7 +563,8 @@ func (s *Store) DiskSize() (int64, error) {
return size, nil
}
// BackupShard will get the shard and have the engine backup since the passed in time to the writer.
// BackupShard will get the shard and have the engine backup since the passed in
// time to the writer.
func (s *Store) BackupShard(id uint64, since time.Time, w io.Writer) error {
shard := s.Shard(id)
if shard == nil {
@ -653,7 +595,8 @@ func (s *Store) RestoreShard(id uint64, r io.Reader) error {
return shard.Restore(r, path)
}
// ShardRelativePath will return the relative path to the shard. i.e. <database>/<retention>/<id>.
// ShardRelativePath will return the relative path to the shard, i.e.,
// <database>/<retention>/<id>.
func (s *Store) ShardRelativePath(id uint64) (string, error) {
shard := s.Shard(id)
if shard == nil {
@ -662,7 +605,8 @@ func (s *Store) ShardRelativePath(id uint64) (string, error) {
return relativePath(s.path, shard.path)
}
// DeleteSeries loops through the local shards and deletes the series data and metadata for the passed in series keys.
// DeleteSeries loops through the local shards and deletes the series data for
// the passed in series keys.
func (s *Store) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error {
// Expand regex expressions in the FROM clause.
a, err := s.ExpandSources(sources)
@ -680,15 +624,21 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi
}
s.mu.RLock()
defer s.mu.RUnlock()
shards := s.filterShards(byDatabase(database))
s.mu.RUnlock()
// Find the database.
db := s.DatabaseIndex(database)
if db == nil {
return nil
mMap := make(map[string]*Measurement)
for _, shard := range shards {
shardMeasures := shard.Measurements()
for _, m := range shardMeasures {
mMap[m.Name] = m
}
}
measurements, err := measurementsFromSourcesOrDB(db, sources...)
s.mu.RLock()
defer s.mu.RUnlock()
measurements, err := measurementsFromSourcesOrDB(mMap, sources...)
if err != nil {
return err
}
@ -723,46 +673,38 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi
}
}
// delete the raw series data
if err := s.deleteSeries(database, seriesKeys, min, max); err != nil {
return err
}
return nil
// delete the raw series data.
return s.deleteSeries(database, seriesKeys, min, max)
}
func (s *Store) deleteSeries(database string, seriesKeys []string, min, max int64) error {
db := s.databaseIndexes[database]
if db == nil {
return influxql.ErrDatabaseNotFound(database)
}
s.mu.RLock()
shards := s.filterShards(func(sh *Shard) bool {
return sh.database == database
})
shards := s.filterShards(byDatabase(database))
s.mu.RUnlock()
return s.walkShards(shards, func(sh *Shard) error {
if sh.database != database {
return nil
}
if err := sh.DeleteSeriesRange(seriesKeys, min, max); err != nil {
return err
}
// The keys we passed in may be fully deleted from the shard, if so,
// we need to remove the shard from all the meta data indexes
// we need to remove the shard from all the meta data indices.
existing, err := sh.ContainsSeries(seriesKeys)
if err != nil {
return err
}
var toDelete []string
for k, exists := range existing {
if !exists {
db.UnassignShard(k, sh.id)
toDelete = append(toDelete, k)
}
}
sh.index.DropSeries(toDelete)
return nil
})
}
@ -827,38 +769,49 @@ func (s *Store) WriteToShard(shardID uint64, points []models.Point) error {
return sh.WritePoints(points)
}
// Measurements returns a slice of sorted measurement names in the given database,
// matching the given condition.
// Measurements returns a slice of all measurements. Measurements accepts an
// optional condition expression. If cond is nil, then all measurements for the
// database will be returned.
func (s *Store) Measurements(database string, cond influxql.Expr) ([]string, error) {
dbi := s.DatabaseIndex(database)
if dbi == nil {
return nil, nil
}
s.mu.RLock()
shards := s.filterShards(byDatabase(database))
s.mu.RUnlock()
// Retrieve measurements from database index. Filter if condition specified.
var mms Measurements
if cond == nil {
mms = dbi.Measurements()
} else {
var err error
mms, _, err = dbi.MeasurementsByExpr(cond)
if err != nil {
return nil, err
var m Measurements
for _, sh := range shards {
var mms Measurements
// Retrieve measurements from database index. Filter if condition specified.
if cond == nil {
mms = sh.Measurements()
} else {
var err error
mms, _, err = sh.MeasurementsByExpr(cond)
if err != nil {
return nil, err
}
}
m = append(m, mms...)
}
// Sort measurements by name.
sort.Sort(mms)
sort.Sort(m)
measurements := make([]string, len(mms))
for i, m := range mms {
measurements[i] = m.Name
measurements := make([]string, 0, len(m))
for _, m := range m {
measurements = append(measurements, m.Name)
}
return measurements, nil
}
// TagValues represents the tag keys and values in a measurement.
// MeasurementSeriesCounts returns the number of measurements and series in all
// the shards' indices.
func (s *Store) MeasurementSeriesCounts(database string) (measuments int, series int) {
// TODO: implement me
return 0, 0
}
type TagValues struct {
Measurement string
Values []KeyValue
@ -870,11 +823,6 @@ func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, err
return nil, errors.New("a condition is required")
}
dbi := s.DatabaseIndex(database)
if dbi == nil {
return nil, nil
}
measurementExpr := influxql.CloneExpr(cond)
measurementExpr = influxql.Reduce(influxql.RewriteExpr(measurementExpr, func(e influxql.Expr) influxql.Expr {
switch e := e.(type) {
@ -890,18 +838,31 @@ func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, err
return e
}), nil)
mms, ok, err := dbi.MeasurementsByExpr(measurementExpr)
if err != nil {
return nil, err
} else if !ok {
mms = dbi.Measurements()
sort.Sort(mms)
// Get all measurements for the shards we're interested in.
s.mu.RLock()
shards := s.filterShards(byDatabase(database))
s.mu.RUnlock()
var measures Measurements
for _, sh := range shards {
mms, ok, err := sh.MeasurementsByExpr(measurementExpr)
if err != nil {
return nil, err
} else if !ok {
// TODO(edd): can we simplify this so we don't have to check the
// ok value, and we can call sh.measurements with a shard filter
// instead?
mms = sh.Measurements()
}
measures = append(measures, mms...)
}
// If there are no measurements, return immediately.
if len(mms) == 0 {
if len(measures) == 0 {
return nil, nil
}
sort.Sort(measures)
filterExpr := influxql.CloneExpr(cond)
filterExpr = influxql.Reduce(influxql.RewriteExpr(filterExpr, func(e influxql.Expr) influxql.Expr {
@ -918,8 +879,8 @@ func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, err
return e
}), nil)
tagValues := make([]TagValues, len(mms))
for i, mm := range mms {
tagValues := make([]TagValues, len(measures))
for i, mm := range measures {
tagValues[i].Measurement = mm.Name
ids, err := mm.SeriesIDsAllOrByExpr(filterExpr)
@ -1053,31 +1014,31 @@ func relativePath(storePath, shardPath string) (string, error) {
// measurementsFromSourcesOrDB returns a list of measurements from the
// sources passed in or, if sources is empty, a list of all
// measurement names from the database passed in.
func measurementsFromSourcesOrDB(db *DatabaseIndex, sources ...influxql.Source) (Measurements, error) {
var measurements Measurements
// measurement names from the measurement map passed in.
func measurementsFromSourcesOrDB(measurements map[string]*Measurement, sources ...influxql.Source) (Measurements, error) {
var all Measurements
if len(sources) > 0 {
for _, source := range sources {
if m, ok := source.(*influxql.Measurement); ok {
measurement := db.measurements[m.Name]
measurement := measurements[m.Name]
if measurement == nil {
continue
}
measurements = append(measurements, measurement)
all = append(all, measurement)
} else {
return nil, errors.New("identifiers in FROM clause must be measurement names")
}
}
} else {
// No measurements specified in FROM clause so get all measurements that have series.
for _, m := range db.Measurements() {
for _, m := range measurements {
if m.HasSeries() {
measurements = append(measurements, m)
all = append(all, m)
}
}
}
sort.Sort(measurements)
sort.Sort(all)
return measurements, nil
return all, nil
}

View File

@ -6,6 +6,8 @@ import (
"io/ioutil"
"os"
"path/filepath"
"reflect"
"sort"
"strings"
"testing"
"time"
@ -96,8 +98,6 @@ func TestStore_CreateShard(t *testing.T) {
t.Fatal(err)
} else if sh := s.Shard(1); sh == nil {
t.Fatalf("expected shard")
} else if di := s.DatabaseIndex("db0"); di == nil {
t.Errorf("expected database index")
}
// Create another shard and verify that it exists.
@ -147,8 +147,6 @@ func TestStore_CreateShardSnapShot(t *testing.T) {
t.Fatal(err)
} else if sh := s.Shard(1); sh == nil {
t.Fatalf("expected shard")
} else if di := s.DatabaseIndex("db0"); di == nil {
t.Errorf("expected database index")
}
dir, e := s.CreateShardSnapshot(1)
@ -160,6 +158,40 @@ func TestStore_CreateShardSnapShot(t *testing.T) {
}
}
func TestStore_Open(t *testing.T) {
s := NewStore()
defer s.Close()
if err := os.MkdirAll(filepath.Join(s.Path(), "db0", "rp0", "2"), 0777); err != nil {
t.Fatal(err)
}
if err := os.MkdirAll(filepath.Join(s.Path(), "db0", "rp2", "4"), 0777); err != nil {
t.Fatal(err)
}
if err := os.MkdirAll(filepath.Join(s.Path(), "db1", "rp0", "1"), 0777); err != nil {
t.Fatal(err)
}
// Store should ignore shard since it does not have a numeric name.
if err := s.Open(); err != nil {
t.Fatal(err)
} else if n := len(s.Databases()); n != 2 {
t.Fatalf("unexpected database index count: %d", n)
} else if n := s.ShardN(); n != 3 {
t.Fatalf("unexpected shard count: %d", n)
}
expDatabases := []string{"db0", "db1"}
gotDatabases := s.Databases()
sort.Strings(gotDatabases)
if got, exp := gotDatabases, expDatabases; !reflect.DeepEqual(got, exp) {
t.Fatalf("got %#v, expected %#v", got, exp)
}
}
// Ensure the store reports an error when it can't open a database directory.
func TestStore_Open_InvalidDatabaseFile(t *testing.T) {
s := NewStore()
@ -173,7 +205,7 @@ func TestStore_Open_InvalidDatabaseFile(t *testing.T) {
// Store should ignore database since it's a file.
if err := s.Open(); err != nil {
t.Fatal(err)
} else if n := s.DatabaseIndexN(); n != 0 {
} else if n := len(s.Databases()); n != 0 {
t.Fatalf("unexpected database index count: %d", n)
}
}
@ -190,10 +222,12 @@ func TestStore_Open_InvalidRetentionPolicy(t *testing.T) {
t.Fatal(err)
}
// Store should ignore database since it's a file.
// Store should ignore retention policy since it's a file, and there should
// be no indices created.
if err := s.Open(); err != nil {
t.Fatal(err)
} else if n := s.DatabaseIndexN(); n != 1 {
} else if n := len(s.Databases()); n != 0 {
t.Log(s.Databases())
t.Fatalf("unexpected database index count: %d", n)
}
}
@ -213,7 +247,7 @@ func TestStore_Open_InvalidShard(t *testing.T) {
// Store should ignore shard since it does not have a numeric name.
if err := s.Open(); err != nil {
t.Fatal(err)
} else if n := s.DatabaseIndexN(); n != 1 {
} else if n := len(s.Databases()); n != 0 {
t.Fatalf("unexpected database index count: %d", n)
} else if n := s.ShardN(); n != 0 {
t.Fatalf("unexpected shard count: %d", n)