diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index 74bbe61892..3a3b6b3ab9 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -500,15 +500,25 @@ func (s *Server) reportServer() { dbs := s.MetaClient.Databases() numDatabases := len(dbs) - numMeasurements := 0 - numSeries := 0 + var ( + numMeasurements int64 + numSeries int64 + ) - // Only needed in the case of a data node - if s.TSDBStore != nil { - for _, db := range dbs { - m, s := s.TSDBStore.MeasurementSeriesCounts(db.Name) - numMeasurements += m - numSeries += s + for _, db := range dbs { + name := db.Name + n, err := s.TSDBStore.SeriesCardinality(name) + if err != nil { + s.Logger.Printf("Unable to get series cardinality for database %s: %v", name, err) + } else { + numSeries += n + } + + n, err = s.TSDBStore.MeasurementsCardinality(name) + if err != nil { + s.Logger.Printf("Unable to get measurement cardinality for database %s: %v", name, err) + } else { + numMeasurements += n } } diff --git a/services/retention/service.go b/services/retention/service.go index 41bffb56c2..03efdf8fa7 100644 --- a/services/retention/service.go +++ b/services/retention/service.go @@ -18,7 +18,6 @@ type Service struct { PruneShardGroups() error } TSDBStore interface { - ShardIDs() []uint64 DeleteShard(shardID uint64) error } @@ -119,16 +118,14 @@ func (s *Service) deleteShards() { } } - for _, id := range s.TSDBStore.ShardIDs() { - if di, ok := deletedShardIDs[id]; ok { - if err := s.TSDBStore.DeleteShard(id); err != nil { - s.logger.Info(fmt.Sprintf("failed to delete shard ID %d from database %s, retention policy %s: %s", - id, di.db, di.rp, err.Error())) - continue - } - s.logger.Info(fmt.Sprintf("shard ID %d from database %s, retention policy %s, deleted", - id, di.db, di.rp)) + for id, info := range deletedShardIDs { + if err := s.TSDBStore.DeleteShard(id); err != nil { + s.logger.Printf("failed to delete shard ID %d from database %s, retention policy %s: %s", + id, info.db, info.rp, err.Error()) + continue } + s.logger.Printf("shard ID %d from database %s, retention policy %s, deleted", + id, info.db, info.rp) } if err := s.MetaClient.PruneShardGroups(); err != nil { s.logger.Info(fmt.Sprintf("error pruning shard groups: %s", err)) diff --git a/tsdb/engine.go b/tsdb/engine.go index c37a51763f..2833cd2b00 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -28,10 +28,12 @@ var ( type Engine interface { Open() error Close() error - + SetEnabled(enabled bool) WithLogger(zap.Logger) + LoadMetadataIndex(shardID uint64, index *DatabaseIndex) error + CreateSnapshot() (string, error) Backup(w io.Writer, basePath string, since time.Time) error Restore(r io.Reader, basePath string) error @@ -39,25 +41,18 @@ type Engine interface { WritePoints(points []models.Point) error CreateSeries(measurment string, series *Series) (*Series, error) - Series(key string) (*Series, error) - ContainsSeries(keys []string) (map[string]bool, error) - DeleteSeries(keys []string) error DeleteSeriesRange(keys []string, min, max int64) error - SeriesCount() (n int, err error) + Series(key string) (*Series, error) + SeriesCardinality() (n int64, err error) CreateMeasurement(name string) (*Measurement, error) + DeleteMeasurement(name string, seriesKeys []string) error Measurement(name string) (*Measurement, error) Measurements() (Measurements, error) + MeasurementCardinality() (n int64, err error) MeasurementsByExpr(expr influxql.Expr) (Measurements, bool, error) MeasurementsByRegex(re *regexp.Regexp) (Measurements, error) MeasurementFields(measurement string) *MeasurementFields - DeleteMeasurement(name string, seriesKeys []string) error - - CreateSnapshot() (string, error) - SetEnabled(enabled bool) - - // Format will return the format for the engine - Format() EngineFormat // Statistics will return statistics relevant to this engine. Statistics(tags map[string]string) []models.Statistic diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index efcac07ae4..d61dd49ab5 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -282,13 +282,6 @@ func (e *Engine) disableSnapshotCompactions() { // Path returns the path the engine was opened with. func (e *Engine) Path() string { return e.path } -// Index returns the database index. -func (e *Engine) Index() *tsdb.DatabaseIndex { - e.mu.Lock() - defer e.mu.Unlock() - return e.index -} - func (e *Engine) Measurement(name string) (*tsdb.Measurement, error) { return e.index.Measurement(name), nil } @@ -297,6 +290,10 @@ func (e *Engine) Measurements() (tsdb.Measurements, error) { return e.index.Measurements(), nil } +func (e *Engine) MeasurementCardinality() (int64, error) { + panic("TODO: edd") +} + func (e *Engine) MeasurementsByExpr(expr influxql.Expr) (tsdb.Measurements, bool, error) { return e.index.MeasurementsByExpr(expr) } @@ -325,9 +322,8 @@ func (e *Engine) MeasurementFields(measurement string) *tsdb.MeasurementFields { return m } -// Format returns the format type of this engine. -func (e *Engine) Format() tsdb.EngineFormat { - return tsdb.TSM1Format +func (e *Engine) SeriesCardinality() (int64, error) { + panic("TODO: edd") } // EngineStatistics maintains statistics for the engine. @@ -735,9 +731,9 @@ func (e *Engine) WritePoints(points []models.Point) error { return err } -// ContainsSeries returns a map of keys indicating whether the key exists and +// containsSeries returns a map of keys indicating whether the key exists and // has values or not. -func (e *Engine) ContainsSeries(keys []string) (map[string]bool, error) { +func (e *Engine) containsSeries(keys []string) (map[string]bool, error) { // keyMap is used to see if a given key exists. keys // are the measurement + tagset (minus separate & field) keyMap := map[string]bool{} @@ -763,8 +759,8 @@ func (e *Engine) ContainsSeries(keys []string) (map[string]bool, error) { return keyMap, nil } -// DeleteSeries removes all series keys from the engine. -func (e *Engine) DeleteSeries(seriesKeys []string) error { +// deleteSeries removes all series keys from the engine. +func (e *Engine) deleteSeries(seriesKeys []string) error { return e.DeleteSeriesRange(seriesKeys, math.MinInt64, math.MaxInt64) } @@ -837,7 +833,7 @@ func (e *Engine) DeleteSeriesRange(seriesKeys []string, min, max int64) error { // Have we deleted all points for the series? If so, we need to remove // the series from the index. - existing, err := e.ContainsSeries(seriesKeys) + existing, err := e.containsSeries(seriesKeys) if err != nil { return err } @@ -864,7 +860,7 @@ func (e *Engine) DeleteMeasurement(name string, seriesKeys []string) error { delete(e.measurementFields, name) e.fieldsMu.Unlock() - if err := e.DeleteSeries(seriesKeys); err != nil { + if err := e.deleteSeries(seriesKeys); err != nil { return err } @@ -873,23 +869,6 @@ func (e *Engine) DeleteMeasurement(name string, seriesKeys []string) error { return nil } -// SeriesCount returns the number of series buckets on the shard. -func (e *Engine) SeriesCount() (n int, err error) { - return e.index.SeriesN(), nil -} - -// LastModified returns the time when this shard was last modified. -func (e *Engine) LastModified() time.Time { - walTime := e.WAL.LastWriteTime() - fsTime := e.FileStore.LastModified() - - if walTime.After(fsTime) { - return walTime - } - - return fsTime -} - func (e *Engine) CreateSeries(measurment string, series *tsdb.Series) (*tsdb.Series, error) { return e.index.CreateSeriesIndexIfNotExists(measurment, series), nil } diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index f571be9973..d2cc08b3f4 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -5,6 +5,7 @@ import ( "bytes" "fmt" "io/ioutil" + "math" "math/rand" "os" "path/filepath" @@ -113,7 +114,7 @@ func TestEngine_DeleteWALLoadMetadata(t *testing.T) { } // Remove series. - if err := e.DeleteSeries([]string{"cpu,host=A"}); err != nil { + if err := e.DeleteSeriesRange([]string{"cpu,host=A"}, math.MinInt64, math.MaxInt64); err != nil { t.Fatalf("failed to delete series: %s", err.Error()) } @@ -221,11 +222,9 @@ func TestEngine_CreateIterator_Cache_Ascending(t *testing.T) { e := MustOpenEngine() defer e.Close() - e.Index().CreateMeasurementIndexIfNotExists("cpu") + e.CreateMeasurement("cpu") e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false) - si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) - si.AssignShard(1) - + e.CreateSeries("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) if err := e.WritePointsString( `cpu,host=A value=1.1 1000000000`, `cpu,host=A value=1.2 2000000000`, @@ -276,11 +275,9 @@ func TestEngine_CreateIterator_Cache_Descending(t *testing.T) { e := MustOpenEngine() defer e.Close() - e.Index().CreateMeasurementIndexIfNotExists("cpu") + e.CreateMeasurement("cpu") e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false) - si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) - si.AssignShard(1) - + e.CreateSeries("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) if err := e.WritePointsString( `cpu,host=A value=1.1 1000000000`, `cpu,host=A value=1.2 2000000000`, @@ -331,11 +328,9 @@ func TestEngine_CreateIterator_TSM_Ascending(t *testing.T) { e := MustOpenEngine() defer e.Close() - e.Index().CreateMeasurementIndexIfNotExists("cpu") + e.CreateMeasurement("cpu") e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false) - si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) - si.AssignShard(1) - + e.CreateSeries("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) if err := e.WritePointsString( `cpu,host=A value=1.1 1000000000`, `cpu,host=A value=1.2 2000000000`, @@ -387,11 +382,9 @@ func TestEngine_CreateIterator_TSM_Descending(t *testing.T) { e := MustOpenEngine() defer e.Close() - e.Index().CreateMeasurementIndexIfNotExists("cpu") + e.CreateMeasurement("cpu") e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false) - si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) - si.AssignShard(1) - + e.CreateSeries("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) if err := e.WritePointsString( `cpu,host=A value=1.1 1000000000`, `cpu,host=A value=1.2 2000000000`, @@ -443,12 +436,10 @@ func TestEngine_CreateIterator_Aux(t *testing.T) { e := MustOpenEngine() defer e.Close() - e.Index().CreateMeasurementIndexIfNotExists("cpu") + e.CreateMeasurement("cpu") e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false) e.MeasurementFields("cpu").CreateFieldIfNotExists("F", influxql.Float, false) - si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) - si.AssignShard(1) - + e.CreateSeries("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) if err := e.WritePointsString( `cpu,host=A value=1.1 1000000000`, `cpu,host=A F=100 1000000000`, @@ -502,15 +493,14 @@ func TestEngine_CreateIterator_Condition(t *testing.T) { e := MustOpenEngine() defer e.Close() - e.Index().CreateMeasurementIndexIfNotExists("cpu") - e.Index().Measurement("cpu").SetFieldName("X") - e.Index().Measurement("cpu").SetFieldName("Y") + e.CreateMeasurement("cpu") + e.MustMeasurement("cpu").SetFieldName("X") + e.MustMeasurement("cpu").SetFieldName("Y") + e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false) e.MeasurementFields("cpu").CreateFieldIfNotExists("X", influxql.Float, false) e.MeasurementFields("cpu").CreateFieldIfNotExists("Y", influxql.Float, false) - si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) - si.AssignShard(1) - + e.CreateSeries("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) if err := e.WritePointsString( `cpu,host=A value=1.1 1000000000`, `cpu,host=A X=10 1000000000`, @@ -595,7 +585,7 @@ func TestEngine_DeleteSeries(t *testing.T) { t.Fatalf("series count mismatch: exp %v, got %v", exp, got) } - if err := e.DeleteSeries([]string{"cpu,host=A"}); err != nil { + if err := e.DeleteSeriesRange([]string{"cpu,host=A"}, math.MinInt64, math.MaxInt64); err != nil { t.Fatalf("failed to delete series: %v", err) } @@ -760,7 +750,7 @@ func benchmarkEngine_WritePoints(b *testing.B, batchSize int) { e := MustOpenEngine() defer e.Close() - e.Index().CreateMeasurementIndexIfNotExists("cpu") + e.CreateMeasurement("cpu") e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false) pp := make([]models.Point, 0, batchSize) @@ -902,10 +892,9 @@ func MustInitBenchmarkEngine(pointN int) *Engine { e := MustOpenEngine() // Initialize metadata. - e.Index().CreateMeasurementIndexIfNotExists("cpu") + e.CreateMeasurement("cpu") e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false) - si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) - si.AssignShard(1) + e.CreateSeries("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) // Generate time ascending points with jitterred time & value. rand := rand.New(rand.NewSource(0)) @@ -1004,6 +993,16 @@ func (e *Engine) MustWriteSnapshot() { } } +// MustMeasurement calls Measurement on the underlying tsdb.Engine, and panics +// if it returns an error. +func (e *Engine) MustMeasurement(name string) *tsdb.Measurement { + m, err := e.Engine.Measurement(name) + if err != nil { + panic(err) + } + return m +} + // WritePointsString parses a string buffer and writes the points. func (e *Engine) WritePointsString(buf ...string) error { return e.WritePoints(MustParsePointsString(strings.Join(buf, "\n"))) diff --git a/tsdb/shard.go b/tsdb/shard.go index fb087ac880..19141e3b53 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -5,6 +5,8 @@ import ( "errors" "fmt" "io" + "log" + "math" "os" "path/filepath" "sort" @@ -127,8 +129,9 @@ type Shard struct { // NewShard returns a new initialized Shard. func NewShard(id uint64, path string, walPath string, options EngineOptions) *Shard { - db, rp := DecodeStorePath(path) + db, rp := decodeStorePath(path) logger := zap.New(zap.NullEncoder()) + s := &Shard{ id: id, path: path, @@ -198,7 +201,7 @@ func (s *Shard) Statistics(tags map[string]string) []models.Statistic { return nil } - seriesN, _ := s.engine.SeriesCount() + seriesN, _ := s.engine.SeriesCardinality() tags = s.defaultTags.Merge(tags) statistics := []models.Statistic{{ Name: "shard", @@ -261,7 +264,7 @@ func (s *Shard) Open() error { s.engine = e - count, err := s.engine.SeriesCount() + count, err := s.engine.SeriesCardinality() if err != nil { return err } @@ -418,26 +421,9 @@ func (s *Shard) WritePoints(points []models.Point) error { return writeError } -// ContainsSeries determines if the shard contains the provided series keys. The -// returned map contains all the provided keys that are in the shard, and the -// value for each key will be true if the shard has values for that key. -func (s *Shard) ContainsSeries(seriesKeys []string) (map[string]bool, error) { - if err := s.ready(); err != nil { - return nil, err - } - - return s.engine.ContainsSeries(seriesKeys) -} - // DeleteSeries deletes a list of series. func (s *Shard) DeleteSeries(seriesKeys []string) error { - if err := s.ready(); err != nil { - return err - } - if err := s.engine.DeleteSeries(seriesKeys); err != nil { - return err - } - return nil + return s.DeleteSeriesRange(seriesKeys, math.MinInt64, math.MaxInt64) } // DeleteSeriesRange deletes all values from seriesKeys with timestamps between min and max (inclusive). @@ -585,7 +571,7 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point, return nil, nil, err } - if s.options.Config.MaxSeriesPerDatabase > 0 && cnt+1 > s.options.Config.MaxSeriesPerDatabase { + if s.options.Config.MaxSeriesPerDatabase > 0 && cnt+1 > int64(s.options.Config.MaxSeriesPerDatabase) { atomic.AddInt64(&s.stats.WritePointsDropped, 1) dropped++ reason = fmt.Sprintf("db %s max series limit reached: (%d/%d)", s.database, cnt, s.options.Config.MaxSeriesPerDatabase) @@ -681,12 +667,12 @@ func (s *Shard) MeasurementsByExpr(cond influxql.Expr) (Measurements, bool, erro return s.engine.MeasurementsByExpr(cond) } -// SeriesCount returns the number of series buckets on the shard. -func (s *Shard) SeriesCount() (int, error) { +// SeriesCardinality returns the number of series buckets on the shard. +func (s *Shard) SeriesCardinality() (int64, error) { if err := s.ready(); err != nil { return 0, err } - return s.engine.SeriesCount() + return s.engine.SeriesCardinality() } // Series returns a series by key. diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index 7ec41742a5..d75a86e733 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -65,12 +65,12 @@ func TestShardWriteAndIndex(t *testing.T) { } validateIndex := func() { - cnt, err := sh.SeriesCount() + cnt, err := sh.SeriesCardinality() if err != nil { t.Fatal(err) } - if got, exp := cnt, 1; got != exp { + if got, exp := cnt, int64(1); got != exp { t.Fatalf("got %v series, exp %v series in index", got, exp) } @@ -346,11 +346,11 @@ func TestShardWriteAddNewField(t *testing.T) { t.Fatalf(err.Error()) } - cnt, err := sh.SeriesCount() + cnt, err := sh.SeriesCardinality() if err != nil { t.Fatal(err) } - if got, exp := cnt, 1; got != exp { + if got, exp := cnt, int64(1); got != exp { t.Fatalf("got %d series, exp %d series in index", got, exp) } @@ -470,11 +470,11 @@ func TestShard_Close_RemoveIndex(t *testing.T) { t.Fatalf(err.Error()) } - cnt, err := sh.SeriesCount() + cnt, err := sh.SeriesCardinality() if err != nil { t.Fatal(err) } - if got, exp := cnt, 1; got != exp { + if got, exp := cnt, int64(1); got != exp { t.Fatalf("got %d series, exp %d series in index", got, exp) } @@ -482,10 +482,10 @@ func TestShard_Close_RemoveIndex(t *testing.T) { sh.Close() sh.Open() - if cnt, err = sh.SeriesCount(); err != nil { + if cnt, err = sh.SeriesCardinality(); err != nil { t.Fatal(err) } - if got, exp := cnt, 1; got != exp { + if got, exp := cnt, int64(1); got != exp { t.Fatalf("got %d series, exp %d series in index", got, exp) } diff --git a/tsdb/store.go b/tsdb/store.go index e1162b45f7..9de5022015 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -450,9 +450,16 @@ func (s *Store) DeleteMeasurement(database, name string) error { } // filterShards returns a slice of shards where fn returns true -// for the shard. +// for the shard. If the provided predicate is nil then all shards are returned. func (s *Store) filterShards(fn func(sh *Shard) bool) []*Shard { - shards := make([]*Shard, 0, len(s.shards)) + var shards []*Shard + if fn == nil { + shards = make([]*Shard, 0, len(s.shards)) + fn = func(*Shard) bool { return true } + } else { + shards = make([]*Shard, 0) + } + for _, sh := range s.shards { if fn(sh) { shards = append(shards, sh) @@ -504,21 +511,6 @@ func (s *Store) walkShards(shards []*Shard, fn func(sh *Shard) error) error { return err } -// ShardIDs returns a slice of all ShardIDs under management, in arbitrary order. -func (s *Store) ShardIDs() []uint64 { - s.mu.RLock() - defer s.mu.RUnlock() - return s.shardIDs() -} - -func (s *Store) shardIDs() []uint64 { - a := make([]uint64, 0, len(s.shards)) - for shardID := range s.shards { - a = append(a, shardID) - } - return a -} - // shardsSlice returns an ordered list of shards. func (s *Store) shardsSlice() []*Shard { a := make([]*Shard, 0, len(s.shards)) @@ -544,12 +536,14 @@ func (s *Store) Databases() []string { // DiskSize returns the size of all the shard files in bytes. // This size does not include the WAL size. func (s *Store) DiskSize() (int64, error) { - s.mu.RLock() - defer s.mu.RUnlock() var size int64 - for _, shardID := range s.ShardIDs() { - shard := s.Shard(shardID) - sz, err := shard.DiskSize() + + s.mu.RLock() + allShards := s.filterShards(nil) + s.mu.RUnlock() + + for _, sh := range allShards { + sz, err := sh.DiskSize() if err != nil { return 0, err } @@ -558,6 +552,17 @@ func (s *Store) DiskSize() (int64, error) { return size, nil } +// SeriesCardinality returns the series cardinality for the provided database. +func (s *Store) SeriesCardinality(database string) (int64, error) { + panic("TODO: edd") +} + +// MeasurementsCardinality returns the measurement cardinality for the provided +// database. +func (s *Store) MeasurementsCardinality(database string) (int64, error) { + panic("TODO: edd") +} + // BackupShard will get the shard and have the engine backup since the passed in // time to the writer. func (s *Store) BackupShard(id uint64, since time.Time, w io.Writer) error { @@ -669,14 +674,6 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi } // delete the raw series data. - return s.deleteSeries(database, seriesKeys, min, max) -} - -func (s *Store) deleteSeries(database string, seriesKeys []string, min, max int64) error { - s.mu.RLock() - shards := s.filterShards(byDatabase(database)) - s.mu.RUnlock() - return s.walkShards(shards, func(sh *Shard) error { if err := sh.DeleteSeriesRange(seriesKeys, min, max); err != nil { return err @@ -953,9 +950,9 @@ func (e *Store) filterShowSeriesResult(limit, offset int, rows models.Rows) mode return filteredSeries } -// DecodeStorePath extracts the database and retention policy names +// decodeStorePath extracts the database and retention policy names // from a given shard or WAL path. -func DecodeStorePath(shardOrWALPath string) (database, retentionPolicy string) { +func decodeStorePath(shardOrWALPath string) (database, retentionPolicy string) { // shardOrWALPath format: /maybe/absolute/base/then/:database/:retentionPolicy/:nameOfShardOrWAL // Discard the last part of the path (the shard name or the wal name).