pull/7913/head
Edd Robinson 2016-09-14 14:55:44 +01:00 committed by Ben Johnson
parent c535e3899a
commit 05bc4dec00
No known key found for this signature in database
GPG Key ID: 81741CD251883081
8 changed files with 123 additions and 160 deletions

View File

@ -500,15 +500,25 @@ func (s *Server) reportServer() {
dbs := s.MetaClient.Databases()
numDatabases := len(dbs)
numMeasurements := 0
numSeries := 0
var (
numMeasurements int64
numSeries int64
)
// Only needed in the case of a data node
if s.TSDBStore != nil {
for _, db := range dbs {
m, s := s.TSDBStore.MeasurementSeriesCounts(db.Name)
numMeasurements += m
numSeries += s
for _, db := range dbs {
name := db.Name
n, err := s.TSDBStore.SeriesCardinality(name)
if err != nil {
s.Logger.Printf("Unable to get series cardinality for database %s: %v", name, err)
} else {
numSeries += n
}
n, err = s.TSDBStore.MeasurementsCardinality(name)
if err != nil {
s.Logger.Printf("Unable to get measurement cardinality for database %s: %v", name, err)
} else {
numMeasurements += n
}
}

View File

@ -18,7 +18,6 @@ type Service struct {
PruneShardGroups() error
}
TSDBStore interface {
ShardIDs() []uint64
DeleteShard(shardID uint64) error
}
@ -119,16 +118,14 @@ func (s *Service) deleteShards() {
}
}
for _, id := range s.TSDBStore.ShardIDs() {
if di, ok := deletedShardIDs[id]; ok {
if err := s.TSDBStore.DeleteShard(id); err != nil {
s.logger.Info(fmt.Sprintf("failed to delete shard ID %d from database %s, retention policy %s: %s",
id, di.db, di.rp, err.Error()))
continue
}
s.logger.Info(fmt.Sprintf("shard ID %d from database %s, retention policy %s, deleted",
id, di.db, di.rp))
for id, info := range deletedShardIDs {
if err := s.TSDBStore.DeleteShard(id); err != nil {
s.logger.Printf("failed to delete shard ID %d from database %s, retention policy %s: %s",
id, info.db, info.rp, err.Error())
continue
}
s.logger.Printf("shard ID %d from database %s, retention policy %s, deleted",
id, info.db, info.rp)
}
if err := s.MetaClient.PruneShardGroups(); err != nil {
s.logger.Info(fmt.Sprintf("error pruning shard groups: %s", err))

View File

@ -28,10 +28,12 @@ var (
type Engine interface {
Open() error
Close() error
SetEnabled(enabled bool)
WithLogger(zap.Logger)
LoadMetadataIndex(shardID uint64, index *DatabaseIndex) error
CreateSnapshot() (string, error)
Backup(w io.Writer, basePath string, since time.Time) error
Restore(r io.Reader, basePath string) error
@ -39,25 +41,18 @@ type Engine interface {
WritePoints(points []models.Point) error
CreateSeries(measurment string, series *Series) (*Series, error)
Series(key string) (*Series, error)
ContainsSeries(keys []string) (map[string]bool, error)
DeleteSeries(keys []string) error
DeleteSeriesRange(keys []string, min, max int64) error
SeriesCount() (n int, err error)
Series(key string) (*Series, error)
SeriesCardinality() (n int64, err error)
CreateMeasurement(name string) (*Measurement, error)
DeleteMeasurement(name string, seriesKeys []string) error
Measurement(name string) (*Measurement, error)
Measurements() (Measurements, error)
MeasurementCardinality() (n int64, err error)
MeasurementsByExpr(expr influxql.Expr) (Measurements, bool, error)
MeasurementsByRegex(re *regexp.Regexp) (Measurements, error)
MeasurementFields(measurement string) *MeasurementFields
DeleteMeasurement(name string, seriesKeys []string) error
CreateSnapshot() (string, error)
SetEnabled(enabled bool)
// Format will return the format for the engine
Format() EngineFormat
// Statistics will return statistics relevant to this engine.
Statistics(tags map[string]string) []models.Statistic

View File

@ -282,13 +282,6 @@ func (e *Engine) disableSnapshotCompactions() {
// Path returns the path the engine was opened with.
func (e *Engine) Path() string { return e.path }
// Index returns the database index.
func (e *Engine) Index() *tsdb.DatabaseIndex {
e.mu.Lock()
defer e.mu.Unlock()
return e.index
}
func (e *Engine) Measurement(name string) (*tsdb.Measurement, error) {
return e.index.Measurement(name), nil
}
@ -297,6 +290,10 @@ func (e *Engine) Measurements() (tsdb.Measurements, error) {
return e.index.Measurements(), nil
}
func (e *Engine) MeasurementCardinality() (int64, error) {
panic("TODO: edd")
}
func (e *Engine) MeasurementsByExpr(expr influxql.Expr) (tsdb.Measurements, bool, error) {
return e.index.MeasurementsByExpr(expr)
}
@ -325,9 +322,8 @@ func (e *Engine) MeasurementFields(measurement string) *tsdb.MeasurementFields {
return m
}
// Format returns the format type of this engine.
func (e *Engine) Format() tsdb.EngineFormat {
return tsdb.TSM1Format
func (e *Engine) SeriesCardinality() (int64, error) {
panic("TODO: edd")
}
// EngineStatistics maintains statistics for the engine.
@ -735,9 +731,9 @@ func (e *Engine) WritePoints(points []models.Point) error {
return err
}
// ContainsSeries returns a map of keys indicating whether the key exists and
// containsSeries returns a map of keys indicating whether the key exists and
// has values or not.
func (e *Engine) ContainsSeries(keys []string) (map[string]bool, error) {
func (e *Engine) containsSeries(keys []string) (map[string]bool, error) {
// keyMap is used to see if a given key exists. keys
// are the measurement + tagset (minus separate & field)
keyMap := map[string]bool{}
@ -763,8 +759,8 @@ func (e *Engine) ContainsSeries(keys []string) (map[string]bool, error) {
return keyMap, nil
}
// DeleteSeries removes all series keys from the engine.
func (e *Engine) DeleteSeries(seriesKeys []string) error {
// deleteSeries removes all series keys from the engine.
func (e *Engine) deleteSeries(seriesKeys []string) error {
return e.DeleteSeriesRange(seriesKeys, math.MinInt64, math.MaxInt64)
}
@ -837,7 +833,7 @@ func (e *Engine) DeleteSeriesRange(seriesKeys []string, min, max int64) error {
// Have we deleted all points for the series? If so, we need to remove
// the series from the index.
existing, err := e.ContainsSeries(seriesKeys)
existing, err := e.containsSeries(seriesKeys)
if err != nil {
return err
}
@ -864,7 +860,7 @@ func (e *Engine) DeleteMeasurement(name string, seriesKeys []string) error {
delete(e.measurementFields, name)
e.fieldsMu.Unlock()
if err := e.DeleteSeries(seriesKeys); err != nil {
if err := e.deleteSeries(seriesKeys); err != nil {
return err
}
@ -873,23 +869,6 @@ func (e *Engine) DeleteMeasurement(name string, seriesKeys []string) error {
return nil
}
// SeriesCount returns the number of series buckets on the shard.
func (e *Engine) SeriesCount() (n int, err error) {
return e.index.SeriesN(), nil
}
// LastModified returns the time when this shard was last modified.
func (e *Engine) LastModified() time.Time {
walTime := e.WAL.LastWriteTime()
fsTime := e.FileStore.LastModified()
if walTime.After(fsTime) {
return walTime
}
return fsTime
}
func (e *Engine) CreateSeries(measurment string, series *tsdb.Series) (*tsdb.Series, error) {
return e.index.CreateSeriesIndexIfNotExists(measurment, series), nil
}

View File

@ -5,6 +5,7 @@ import (
"bytes"
"fmt"
"io/ioutil"
"math"
"math/rand"
"os"
"path/filepath"
@ -113,7 +114,7 @@ func TestEngine_DeleteWALLoadMetadata(t *testing.T) {
}
// Remove series.
if err := e.DeleteSeries([]string{"cpu,host=A"}); err != nil {
if err := e.DeleteSeriesRange([]string{"cpu,host=A"}, math.MinInt64, math.MaxInt64); err != nil {
t.Fatalf("failed to delete series: %s", err.Error())
}
@ -221,11 +222,9 @@ func TestEngine_CreateIterator_Cache_Ascending(t *testing.T) {
e := MustOpenEngine()
defer e.Close()
e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.CreateMeasurement("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si.AssignShard(1)
e.CreateSeries("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`,
@ -276,11 +275,9 @@ func TestEngine_CreateIterator_Cache_Descending(t *testing.T) {
e := MustOpenEngine()
defer e.Close()
e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.CreateMeasurement("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si.AssignShard(1)
e.CreateSeries("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`,
@ -331,11 +328,9 @@ func TestEngine_CreateIterator_TSM_Ascending(t *testing.T) {
e := MustOpenEngine()
defer e.Close()
e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.CreateMeasurement("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si.AssignShard(1)
e.CreateSeries("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`,
@ -387,11 +382,9 @@ func TestEngine_CreateIterator_TSM_Descending(t *testing.T) {
e := MustOpenEngine()
defer e.Close()
e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.CreateMeasurement("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si.AssignShard(1)
e.CreateSeries("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`,
@ -443,12 +436,10 @@ func TestEngine_CreateIterator_Aux(t *testing.T) {
e := MustOpenEngine()
defer e.Close()
e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.CreateMeasurement("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.MeasurementFields("cpu").CreateFieldIfNotExists("F", influxql.Float, false)
si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si.AssignShard(1)
e.CreateSeries("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A F=100 1000000000`,
@ -502,15 +493,14 @@ func TestEngine_CreateIterator_Condition(t *testing.T) {
e := MustOpenEngine()
defer e.Close()
e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.Index().Measurement("cpu").SetFieldName("X")
e.Index().Measurement("cpu").SetFieldName("Y")
e.CreateMeasurement("cpu")
e.MustMeasurement("cpu").SetFieldName("X")
e.MustMeasurement("cpu").SetFieldName("Y")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.MeasurementFields("cpu").CreateFieldIfNotExists("X", influxql.Float, false)
e.MeasurementFields("cpu").CreateFieldIfNotExists("Y", influxql.Float, false)
si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si.AssignShard(1)
e.CreateSeries("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A X=10 1000000000`,
@ -595,7 +585,7 @@ func TestEngine_DeleteSeries(t *testing.T) {
t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
}
if err := e.DeleteSeries([]string{"cpu,host=A"}); err != nil {
if err := e.DeleteSeriesRange([]string{"cpu,host=A"}, math.MinInt64, math.MaxInt64); err != nil {
t.Fatalf("failed to delete series: %v", err)
}
@ -760,7 +750,7 @@ func benchmarkEngine_WritePoints(b *testing.B, batchSize int) {
e := MustOpenEngine()
defer e.Close()
e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.CreateMeasurement("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
pp := make([]models.Point, 0, batchSize)
@ -902,10 +892,9 @@ func MustInitBenchmarkEngine(pointN int) *Engine {
e := MustOpenEngine()
// Initialize metadata.
e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.CreateMeasurement("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si.AssignShard(1)
e.CreateSeries("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
// Generate time ascending points with jitterred time & value.
rand := rand.New(rand.NewSource(0))
@ -1004,6 +993,16 @@ func (e *Engine) MustWriteSnapshot() {
}
}
// MustMeasurement calls Measurement on the underlying tsdb.Engine, and panics
// if it returns an error.
func (e *Engine) MustMeasurement(name string) *tsdb.Measurement {
m, err := e.Engine.Measurement(name)
if err != nil {
panic(err)
}
return m
}
// WritePointsString parses a string buffer and writes the points.
func (e *Engine) WritePointsString(buf ...string) error {
return e.WritePoints(MustParsePointsString(strings.Join(buf, "\n")))

View File

@ -5,6 +5,8 @@ import (
"errors"
"fmt"
"io"
"log"
"math"
"os"
"path/filepath"
"sort"
@ -127,8 +129,9 @@ type Shard struct {
// NewShard returns a new initialized Shard.
func NewShard(id uint64, path string, walPath string, options EngineOptions) *Shard {
db, rp := DecodeStorePath(path)
db, rp := decodeStorePath(path)
logger := zap.New(zap.NullEncoder())
s := &Shard{
id: id,
path: path,
@ -198,7 +201,7 @@ func (s *Shard) Statistics(tags map[string]string) []models.Statistic {
return nil
}
seriesN, _ := s.engine.SeriesCount()
seriesN, _ := s.engine.SeriesCardinality()
tags = s.defaultTags.Merge(tags)
statistics := []models.Statistic{{
Name: "shard",
@ -261,7 +264,7 @@ func (s *Shard) Open() error {
s.engine = e
count, err := s.engine.SeriesCount()
count, err := s.engine.SeriesCardinality()
if err != nil {
return err
}
@ -418,26 +421,9 @@ func (s *Shard) WritePoints(points []models.Point) error {
return writeError
}
// ContainsSeries determines if the shard contains the provided series keys. The
// returned map contains all the provided keys that are in the shard, and the
// value for each key will be true if the shard has values for that key.
func (s *Shard) ContainsSeries(seriesKeys []string) (map[string]bool, error) {
if err := s.ready(); err != nil {
return nil, err
}
return s.engine.ContainsSeries(seriesKeys)
}
// DeleteSeries deletes a list of series.
func (s *Shard) DeleteSeries(seriesKeys []string) error {
if err := s.ready(); err != nil {
return err
}
if err := s.engine.DeleteSeries(seriesKeys); err != nil {
return err
}
return nil
return s.DeleteSeriesRange(seriesKeys, math.MinInt64, math.MaxInt64)
}
// DeleteSeriesRange deletes all values from seriesKeys with timestamps between min and max (inclusive).
@ -585,7 +571,7 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
return nil, nil, err
}
if s.options.Config.MaxSeriesPerDatabase > 0 && cnt+1 > s.options.Config.MaxSeriesPerDatabase {
if s.options.Config.MaxSeriesPerDatabase > 0 && cnt+1 > int64(s.options.Config.MaxSeriesPerDatabase) {
atomic.AddInt64(&s.stats.WritePointsDropped, 1)
dropped++
reason = fmt.Sprintf("db %s max series limit reached: (%d/%d)", s.database, cnt, s.options.Config.MaxSeriesPerDatabase)
@ -681,12 +667,12 @@ func (s *Shard) MeasurementsByExpr(cond influxql.Expr) (Measurements, bool, erro
return s.engine.MeasurementsByExpr(cond)
}
// SeriesCount returns the number of series buckets on the shard.
func (s *Shard) SeriesCount() (int, error) {
// SeriesCardinality returns the number of series buckets on the shard.
func (s *Shard) SeriesCardinality() (int64, error) {
if err := s.ready(); err != nil {
return 0, err
}
return s.engine.SeriesCount()
return s.engine.SeriesCardinality()
}
// Series returns a series by key.

View File

@ -65,12 +65,12 @@ func TestShardWriteAndIndex(t *testing.T) {
}
validateIndex := func() {
cnt, err := sh.SeriesCount()
cnt, err := sh.SeriesCardinality()
if err != nil {
t.Fatal(err)
}
if got, exp := cnt, 1; got != exp {
if got, exp := cnt, int64(1); got != exp {
t.Fatalf("got %v series, exp %v series in index", got, exp)
}
@ -346,11 +346,11 @@ func TestShardWriteAddNewField(t *testing.T) {
t.Fatalf(err.Error())
}
cnt, err := sh.SeriesCount()
cnt, err := sh.SeriesCardinality()
if err != nil {
t.Fatal(err)
}
if got, exp := cnt, 1; got != exp {
if got, exp := cnt, int64(1); got != exp {
t.Fatalf("got %d series, exp %d series in index", got, exp)
}
@ -470,11 +470,11 @@ func TestShard_Close_RemoveIndex(t *testing.T) {
t.Fatalf(err.Error())
}
cnt, err := sh.SeriesCount()
cnt, err := sh.SeriesCardinality()
if err != nil {
t.Fatal(err)
}
if got, exp := cnt, 1; got != exp {
if got, exp := cnt, int64(1); got != exp {
t.Fatalf("got %d series, exp %d series in index", got, exp)
}
@ -482,10 +482,10 @@ func TestShard_Close_RemoveIndex(t *testing.T) {
sh.Close()
sh.Open()
if cnt, err = sh.SeriesCount(); err != nil {
if cnt, err = sh.SeriesCardinality(); err != nil {
t.Fatal(err)
}
if got, exp := cnt, 1; got != exp {
if got, exp := cnt, int64(1); got != exp {
t.Fatalf("got %d series, exp %d series in index", got, exp)
}

View File

@ -450,9 +450,16 @@ func (s *Store) DeleteMeasurement(database, name string) error {
}
// filterShards returns a slice of shards where fn returns true
// for the shard.
// for the shard. If the provided predicate is nil then all shards are returned.
func (s *Store) filterShards(fn func(sh *Shard) bool) []*Shard {
shards := make([]*Shard, 0, len(s.shards))
var shards []*Shard
if fn == nil {
shards = make([]*Shard, 0, len(s.shards))
fn = func(*Shard) bool { return true }
} else {
shards = make([]*Shard, 0)
}
for _, sh := range s.shards {
if fn(sh) {
shards = append(shards, sh)
@ -504,21 +511,6 @@ func (s *Store) walkShards(shards []*Shard, fn func(sh *Shard) error) error {
return err
}
// ShardIDs returns a slice of all ShardIDs under management, in arbitrary order.
func (s *Store) ShardIDs() []uint64 {
s.mu.RLock()
defer s.mu.RUnlock()
return s.shardIDs()
}
func (s *Store) shardIDs() []uint64 {
a := make([]uint64, 0, len(s.shards))
for shardID := range s.shards {
a = append(a, shardID)
}
return a
}
// shardsSlice returns an ordered list of shards.
func (s *Store) shardsSlice() []*Shard {
a := make([]*Shard, 0, len(s.shards))
@ -544,12 +536,14 @@ func (s *Store) Databases() []string {
// DiskSize returns the size of all the shard files in bytes.
// This size does not include the WAL size.
func (s *Store) DiskSize() (int64, error) {
s.mu.RLock()
defer s.mu.RUnlock()
var size int64
for _, shardID := range s.ShardIDs() {
shard := s.Shard(shardID)
sz, err := shard.DiskSize()
s.mu.RLock()
allShards := s.filterShards(nil)
s.mu.RUnlock()
for _, sh := range allShards {
sz, err := sh.DiskSize()
if err != nil {
return 0, err
}
@ -558,6 +552,17 @@ func (s *Store) DiskSize() (int64, error) {
return size, nil
}
// SeriesCardinality returns the series cardinality for the provided database.
func (s *Store) SeriesCardinality(database string) (int64, error) {
panic("TODO: edd")
}
// MeasurementsCardinality returns the measurement cardinality for the provided
// database.
func (s *Store) MeasurementsCardinality(database string) (int64, error) {
panic("TODO: edd")
}
// BackupShard will get the shard and have the engine backup since the passed in
// time to the writer.
func (s *Store) BackupShard(id uint64, since time.Time, w io.Writer) error {
@ -669,14 +674,6 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi
}
// delete the raw series data.
return s.deleteSeries(database, seriesKeys, min, max)
}
func (s *Store) deleteSeries(database string, seriesKeys []string, min, max int64) error {
s.mu.RLock()
shards := s.filterShards(byDatabase(database))
s.mu.RUnlock()
return s.walkShards(shards, func(sh *Shard) error {
if err := sh.DeleteSeriesRange(seriesKeys, min, max); err != nil {
return err
@ -953,9 +950,9 @@ func (e *Store) filterShowSeriesResult(limit, offset int, rows models.Rows) mode
return filteredSeries
}
// DecodeStorePath extracts the database and retention policy names
// decodeStorePath extracts the database and retention policy names
// from a given shard or WAL path.
func DecodeStorePath(shardOrWALPath string) (database, retentionPolicy string) {
func decodeStorePath(shardOrWALPath string) (database, retentionPolicy string) {
// shardOrWALPath format: /maybe/absolute/base/then/:database/:retentionPolicy/:nameOfShardOrWAL
// Discard the last part of the path (the shard name or the wal name).