package tsdb // import "github.com/influxdata/influxdb/tsdb"

import (
    "bytes"
    "errors"
    "fmt"
    "io"
    "io/ioutil"
    "os"
    "path/filepath"
    "runtime"
    "sort"
    "strconv"
    "strings"
    "sync"
    "time"

    "github.com/influxdata/influxdb/logger"
    "github.com/influxdata/influxdb/models"
    "github.com/influxdata/influxdb/pkg/estimator"
    "github.com/influxdata/influxdb/pkg/estimator/hll"
    "github.com/influxdata/influxdb/pkg/limiter"
    "github.com/influxdata/influxdb/query"
    "github.com/influxdata/influxql"
    "go.uber.org/zap"
)

var (
    // ErrShardNotFound is returned when trying to get a non-existent shard.
    ErrShardNotFound = fmt.Errorf("shard not found")

    // ErrStoreClosed is returned when trying to use a closed Store.
    ErrStoreClosed = fmt.Errorf("store is closed")
)

// Statistics gathered by the store.
const (
    statDatabaseSeries       = "numSeries"       // number of series in a database
    statDatabaseMeasurements = "numMeasurements" // number of measurements in a database
)

// SeriesFileDirectory is the name of the directory containing series files for
// a database.
const SeriesFileDirectory = "_series"

// Store manages shards and indexes for databases.
type Store struct {
    mu                sync.RWMutex
    shards            map[uint64]*Shard
    databases         map[string]struct{}
    sfiles            map[string]*SeriesFile
    SeriesFileMaxSize int64 // Determines size of series file mmap. Can be altered in tests.
    path              string

    // Shared per-database indexes, only used when the "inmem" index is in use.
    indexes map[string]interface{}

    // Maintains a set of shards that are in the process of deletion.
    // This prevents new shards from being created while old ones are being deleted.
    pendingShardDeletes map[uint64]struct{}

    EngineOptions EngineOptions

    baseLogger *zap.Logger
    Logger     *zap.Logger

    closing chan struct{}
    wg      sync.WaitGroup
    opened  bool
}

// NewStore returns a new store with the given path and a default configuration.
// The returned store must be initialized by calling Open before using it.
func NewStore(path string) *Store {
    logger := zap.NewNop()
    return &Store{
        databases:           make(map[string]struct{}),
        path:                path,
        sfiles:              make(map[string]*SeriesFile),
        indexes:             make(map[string]interface{}),
        pendingShardDeletes: make(map[uint64]struct{}),
        EngineOptions:       NewEngineOptions(),
        Logger:              logger,
        baseLogger:          logger,
    }
}
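
// A typical lifecycle for a Store, shown as a sketch rather than code taken
// from this package (the data and WAL paths are illustrative, and setting
// Config.WALDir mirrors how the shard loader below reads it):
//
//    st := NewStore("/var/lib/influxdb/data")
//    st.EngineOptions.Config.WALDir = "/var/lib/influxdb/wal"
//    if err := st.Open(); err != nil {
//        // handle error
//    }
//    defer st.Close()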

// WithLogger sets the logger for the store.
func (s *Store) WithLogger(log *zap.Logger) {
    s.baseLogger = log
    s.Logger = log.With(zap.String("service", "store"))
    for _, sh := range s.shards {
        sh.WithLogger(s.baseLogger)
    }
}

// Statistics returns statistics for periodic monitoring.
func (s *Store) Statistics(tags map[string]string) []models.Statistic {
    s.mu.RLock()
    shards := s.shardsSlice()
    s.mu.RUnlock()

    // Add all the series and measurement cardinality estimations.
    databases := s.Databases()
    statistics := make([]models.Statistic, 0, len(databases))
    for _, database := range databases {
        sc, err := s.SeriesCardinality(database)
        if err != nil {
            s.Logger.Info("Cannot retrieve series cardinality", zap.Error(err))
            continue
        }

        mc, err := s.MeasurementsCardinality(database)
        if err != nil {
            s.Logger.Info("Cannot retrieve measurement cardinality", zap.Error(err))
            continue
        }

        statistics = append(statistics, models.Statistic{
            Name: "database",
            Tags: models.StatisticTags{"database": database}.Merge(tags),
            Values: map[string]interface{}{
                statDatabaseSeries:       sc,
                statDatabaseMeasurements: mc,
            },
        })
    }

    // Gather all statistics for all shards.
    for _, shard := range shards {
        statistics = append(statistics, shard.Statistics(tags)...)
    }
    return statistics
}

// Path returns the store's root path.
func (s *Store) Path() string { return s.path }

// Open initializes the store, creating all necessary directories, loading all
// shards, and starting periodic maintenance of them.
func (s *Store) Open() error {
    s.mu.Lock()
    defer s.mu.Unlock()

    if s.opened {
        // Already open
        return nil
    }

    s.closing = make(chan struct{})
    s.shards = map[uint64]*Shard{}

    s.Logger.Info("Using data dir", zap.String("path", s.Path()))

    // Create directory.
    if err := os.MkdirAll(s.path, 0777); err != nil {
        return err
    }

    if err := s.loadShards(); err != nil {
        return err
    }

    s.opened = true
    s.wg.Add(1)
    go s.monitorShards()

    return nil
}

func (s *Store) loadShards() error {
    // res holds the result from opening each shard in a goroutine.
    type res struct {
        s   *Shard
        err error
    }

    // Set up a shared limiter for compactions.
    lim := s.EngineOptions.Config.MaxConcurrentCompactions
    if lim == 0 {
        lim = runtime.GOMAXPROCS(0) / 2 // Default to 50% of cores for compactions.

        // On systems with more cores, cap at 4 to reduce disk utilization.
        if lim > 4 {
            lim = 4
        }

        if lim < 1 {
            lim = 1
        }
    }

    // Don't allow more compactions to run than cores.
    if lim > runtime.GOMAXPROCS(0) {
        lim = runtime.GOMAXPROCS(0)
    }

    s.EngineOptions.CompactionLimiter = limiter.NewFixed(lim)

    // Env var to disable the throughput limiter. This will be moved to a config option in 1.5.
    if os.Getenv("INFLUXDB_DATA_COMPACTION_THROUGHPUT") == "" {
        s.EngineOptions.CompactionThroughputLimiter = limiter.NewRate(48*1024*1024, 48*1024*1024)
    } else {
        s.Logger.Info("Compaction throughput limit disabled")
    }

    log, logEnd := logger.NewOperation(s.Logger, "Open store", "tsdb.open")
    defer logEnd()

    t := limiter.NewFixed(runtime.GOMAXPROCS(0))
    resC := make(chan *res)
    var n int

    // Determine how many shards we need to open by checking the store path.
    dbDirs, err := ioutil.ReadDir(s.path)
    if err != nil {
        return err
    }

    for _, db := range dbDirs {
        dbPath := filepath.Join(s.path, db.Name())
        if !db.IsDir() {
            log.Info("Skipping database dir", zap.String("name", db.Name()), zap.String("reason", "not a directory"))
            continue
        }

        // Load series file.
        sfile, err := s.openSeriesFile(db.Name())
        if err != nil {
            return err
        }

        // Retrieve database index.
        idx, err := s.createIndexIfNotExists(db.Name())
        if err != nil {
            return err
        }

        // Load each retention policy within the database directory.
        rpDirs, err := ioutil.ReadDir(dbPath)
        if err != nil {
            return err
        }

        for _, rp := range rpDirs {
            rpPath := filepath.Join(s.path, db.Name(), rp.Name())
            if !rp.IsDir() {
                log.Info("Skipping retention policy dir", zap.String("name", rp.Name()), zap.String("reason", "not a directory"))
                continue
            }

            // The "_series" directory is not a retention policy.
            if rp.Name() == SeriesFileDirectory {
                continue
            }

            shardDirs, err := ioutil.ReadDir(rpPath)
            if err != nil {
                return err
            }

            for _, sh := range shardDirs {
                n++
                go func(db, rp, sh string) {
                    t.Take()
                    defer t.Release()

                    start := time.Now()
                    path := filepath.Join(s.path, db, rp, sh)
                    walPath := filepath.Join(s.EngineOptions.Config.WALDir, db, rp, sh)

                    // Shard file names are numeric shard IDs.
                    shardID, err := strconv.ParseUint(sh, 10, 64)
                    if err != nil {
                        log.Info("invalid shard ID found at path", zap.String("path", path))
                        resC <- &res{err: fmt.Errorf("%s is not a valid ID. Skipping shard.", sh)}
                        return
                    }

                    // Copy options and assign shared index.
                    opt := s.EngineOptions
                    opt.InmemIndex = idx

                    // Provide an implementation of the SeriesIDSets interface.
                    opt.SeriesIDSets = shardSet{store: s, db: db}

                    // Existing shards should continue to use the inmem index.
                    if _, err := os.Stat(filepath.Join(path, "index")); os.IsNotExist(err) {
                        opt.IndexVersion = "inmem"
                    }

                    // Open engine.
                    shard := NewShard(shardID, path, walPath, sfile, opt)

                    // Disable compactions, writes and queries until all shards are loaded.
                    shard.EnableOnOpen = false
                    shard.WithLogger(s.baseLogger)

                    err = shard.Open()
                    if err != nil {
                        log.Info("Failed to open shard", logger.Shard(shardID), zap.Error(err))
                        resC <- &res{err: fmt.Errorf("Failed to open shard: %d: %s", shardID, err)}
                        return
                    }

                    resC <- &res{s: shard}
                    log.Info("Opened shard", zap.String("path", path), zap.Duration("duration", time.Since(start)))
                }(db.Name(), rp.Name(), sh.Name())
            }
        }
    }

    // Gather results of opening shards concurrently, keeping track of how
    // many databases we are managing.
    for i := 0; i < n; i++ {
        res := <-resC
        if res.err != nil {
            continue
        }
        s.shards[res.s.id] = res.s
        s.databases[res.s.database] = struct{}{}
    }
    close(resC)

    // Enable all shards.
    for _, sh := range s.shards {
        sh.SetEnabled(true)
        if sh.IsIdle() {
            if err := sh.Free(); err != nil {
                return err
            }
        }
    }

    return nil
}
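
// loadShards opens every shard in its own goroutine while a fixed limiter
// bounds how many run concurrently. A minimal sketch of the same pattern,
// with hypothetical jobs/doWork standing in for the shard-opening work:
//
//    t := limiter.NewFixed(runtime.GOMAXPROCS(0))
//    resC := make(chan error)
//    for _, job := range jobs {
//        go func(job string) {
//            t.Take()
//            defer t.Release()
//            resC <- doWork(job) // hypothetical worker
//        }(job)
//    }
//    for range jobs {
//        if err := <-resC; err != nil {
//            // collect or log the error
//        }
//    }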

// Close closes the store and all associated shards. After calling Close,
// accessing shards through the Store will result in ErrStoreClosed being returned.
func (s *Store) Close() error {
    s.mu.Lock()
    if s.opened {
        close(s.closing)
    }
    s.mu.Unlock()

    s.wg.Wait()
    // No other goroutines accessing the store, so no need for a Lock.

    // Close all the shards in parallel.
    if err := s.walkShards(s.shardsSlice(), func(sh *Shard) error {
        return sh.Close()
    }); err != nil {
        return err
    }

    s.mu.Lock()
    // Close out the series files.
    for _, sfile := range s.sfiles {
        if err := sfile.Close(); err != nil {
            return err
        }
    }

    s.shards = nil
    s.sfiles = map[string]*SeriesFile{}
    s.opened = false // Store may now be opened again.
    s.mu.Unlock()
    return nil
}

// openSeriesFile either returns or creates a series file for the provided
// database. It must be called under a full lock.
func (s *Store) openSeriesFile(database string) (*SeriesFile, error) {
    if sfile := s.sfiles[database]; sfile != nil {
        return sfile, nil
    }

    sfile := NewSeriesFile(filepath.Join(s.path, database, SeriesFileDirectory))
    sfile.Logger = s.baseLogger
    if err := sfile.Open(); err != nil {
        return nil, err
    }
    s.sfiles[database] = sfile
    return sfile, nil
}

// seriesFile returns the series file for the provided database, or nil if the
// database has no open series file.
func (s *Store) seriesFile(database string) *SeriesFile {
    s.mu.RLock()
    defer s.mu.RUnlock()
    return s.sfiles[database]
}

// createIndexIfNotExists returns a shared index for a database, if the inmem
// index is being used. If the TSI index is being used, then this method is
// essentially a no-op.
func (s *Store) createIndexIfNotExists(name string) (interface{}, error) {
    if idx := s.indexes[name]; idx != nil {
        return idx, nil
    }

    sfile, err := s.openSeriesFile(name)
    if err != nil {
        return nil, err
    }

    idx, err := NewInmemIndex(name, sfile)
    if err != nil {
        return nil, err
    }

    s.indexes[name] = idx
    return idx, nil
}

// Shard returns a shard by id.
func (s *Store) Shard(id uint64) *Shard {
    s.mu.RLock()
    defer s.mu.RUnlock()
    sh, ok := s.shards[id]
    if !ok {
        return nil
    }
    return sh
}

// Shards returns a list of shards by id.
func (s *Store) Shards(ids []uint64) []*Shard {
    s.mu.RLock()
    defer s.mu.RUnlock()
    a := make([]*Shard, 0, len(ids))
    for _, id := range ids {
        sh, ok := s.shards[id]
        if !ok {
            continue
        }
        a = append(a, sh)
    }
    return a
}

// ShardGroup returns a ShardGroup with a list of shards by id.
func (s *Store) ShardGroup(ids []uint64) ShardGroup {
    return Shards(s.Shards(ids))
}

// ShardN returns the number of shards in the store.
func (s *Store) ShardN() int {
    s.mu.RLock()
    defer s.mu.RUnlock()
    return len(s.shards)
}

// ShardDigest returns a digest of the shard with the specified ID.
func (s *Store) ShardDigest(id uint64) (io.ReadCloser, int64, error) {
    sh := s.Shard(id)
    if sh == nil {
        return nil, 0, ErrShardNotFound
    }

    return sh.Digest()
}

// CreateShard creates a shard with the given id and retention policy on a database.
func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error {
    s.mu.Lock()
    defer s.mu.Unlock()

    select {
    case <-s.closing:
        return ErrStoreClosed
    default:
    }

    // Shard already exists.
    if _, ok := s.shards[shardID]; ok {
        return nil
    }

    // Shard may be undergoing a pending deletion. While the shard can be
    // recreated, it must wait for the pending delete to finish.
    if _, ok := s.pendingShardDeletes[shardID]; ok {
        return fmt.Errorf("shard %d is pending deletion and cannot be created again until finished", shardID)
    }

    // Create the db and retention policy directories if they don't exist.
    if err := os.MkdirAll(filepath.Join(s.path, database, retentionPolicy), 0700); err != nil {
        return err
    }

    // Create the WAL directory.
    walPath := filepath.Join(s.EngineOptions.Config.WALDir, database, retentionPolicy, fmt.Sprintf("%d", shardID))
    if err := os.MkdirAll(walPath, 0700); err != nil {
        return err
    }

    // Retrieve database series file.
    sfile, err := s.openSeriesFile(database)
    if err != nil {
        return err
    }

    // Retrieve shared index, if needed.
    idx, err := s.createIndexIfNotExists(database)
    if err != nil {
        return err
    }

    // Copy index options and pass in shared index.
    opt := s.EngineOptions
    opt.InmemIndex = idx
    opt.SeriesIDSets = shardSet{store: s, db: database}

    path := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10))
    shard := NewShard(shardID, path, walPath, sfile, opt)
    shard.WithLogger(s.baseLogger)
    shard.EnableOnOpen = enabled

    if err := shard.Open(); err != nil {
        return err
    }

    s.shards[shardID] = shard
    s.databases[database] = struct{}{} // Ensure we are tracking any new db.

    return nil
}
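
// A sketch of creating a shard (database, retention policy and shard ID are
// illustrative). The enabled flag controls whether the shard is enabled once
// opened; see SetShardEnabled for toggling it later:
//
//    if err := st.CreateShard("db0", "autogen", 1, true); err != nil {
//        // handle error
//    }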

// CreateShardSnapshot will create a hard link to the underlying shard and return a path.
// The caller is responsible for cleaning up (removing) the file path returned.
func (s *Store) CreateShardSnapshot(id uint64) (string, error) {
    sh := s.Shard(id)
    if sh == nil {
        return "", ErrShardNotFound
    }

    return sh.CreateSnapshot()
}

// SetShardEnabled enables or disables a shard for reads and writes.
func (s *Store) SetShardEnabled(shardID uint64, enabled bool) error {
    sh := s.Shard(shardID)
    if sh == nil {
        return ErrShardNotFound
    }
    sh.SetEnabled(enabled)
    return nil
}

// DeleteShard removes a shard from disk.
func (s *Store) DeleteShard(shardID uint64) error {
    sh := s.Shard(shardID)
    if sh == nil {
        return nil
    }

    // Remove the shard from Store so it's not returned to callers requesting
    // shards. Also mark that this shard is currently being deleted in a separate
    // map so that we do not have to retain the global store lock while deleting
    // files.
    s.mu.Lock()
    if _, ok := s.pendingShardDeletes[shardID]; ok {
        // The shard is already being deleted. This can happen if DeleteShard
        // is called twice in sequence before the shard has been removed from
        // the mapping. It is not treated as an error because deleting a shard
        // twice is not an error.
        s.mu.Unlock()
        return nil
    }
    delete(s.shards, shardID)
    s.pendingShardDeletes[shardID] = struct{}{}
    s.mu.Unlock()

    // Ensure the pending deletion flag is cleared on exit.
    defer func() {
        s.mu.Lock()
        defer s.mu.Unlock()
        delete(s.pendingShardDeletes, shardID)
    }()

    // Get the shard's local bitset of series IDs.
    index, err := sh.Index()
    if err != nil {
        return err
    }

    var ss *SeriesIDSet
    if i, ok := index.(interface {
        SeriesIDSet() *SeriesIDSet
    }); ok {
        ss = i.SeriesIDSet()
    }

    db := sh.Database()
    if err := sh.Close(); err != nil {
        return err
    }

    // Determine if the shard contained any series that are not present in any
    // other shards in the database.
    shards := s.filterShards(byDatabase(db))

    s.walkShards(shards, func(sh *Shard) error {
        index, err := sh.Index()
        if err != nil {
            return err
        }

        if i, ok := index.(interface {
            SeriesIDSet() *SeriesIDSet
        }); ok {
            ss.Diff(i.SeriesIDSet())
        } else {
            return fmt.Errorf("unable to get series id set for index in shard at %s", sh.Path())
        }
        return nil
    })

    // Remove any remaining series in the set from the series file, as they don't
    // exist in any of the database's remaining shards.
    if ss.Cardinality() > 0 {
        sfile := s.seriesFile(db)
        if sfile != nil {
            ss.ForEach(func(id uint64) {
                sfile.DeleteSeriesID(id)
            })
        }
    }

    // Remove the on-disk shard data.
    if err := os.RemoveAll(sh.path); err != nil {
        return err
    }

    return os.RemoveAll(sh.walPath)
}
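
// DeleteShard relies on set difference to decide which series are exclusive
// to the deleted shard: it diffs the shard's series ID bitset against every
// other shard in the database and only purges what remains. A sketch of the
// semantics with illustrative IDs (assumes SeriesIDSet exposes Add alongside
// the Diff/Cardinality methods used above):
//
//    ss := NewSeriesIDSet() // IDs in the deleted shard: 1, 2, 3
//    ss.Add(1)
//    ss.Add(2)
//    ss.Add(3)
//    other := NewSeriesIDSet() // IDs still referenced by other shards: 2, 3
//    other.Add(2)
//    other.Add(3)
//    ss.Diff(other) // ss now contains only 1, which is safe to delete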

// DeleteDatabase will close all shards associated with a database and remove the directory and files from disk.
func (s *Store) DeleteDatabase(name string) error {
    s.mu.RLock()
    if _, ok := s.databases[name]; !ok {
        s.mu.RUnlock()
        // no files locally, so nothing to do
        return nil
    }
    shards := s.filterShards(func(sh *Shard) bool {
        return sh.database == name
    })
    s.mu.RUnlock()

    if err := s.walkShards(shards, func(sh *Shard) error {
        if sh.database != name {
            return nil
        }

        return sh.Close()
    }); err != nil {
        return err
    }

    dbPath := filepath.Clean(filepath.Join(s.path, name))

    s.mu.Lock()
    defer s.mu.Unlock()

    sfile := s.sfiles[name]
    delete(s.sfiles, name)

    // Close series file.
    if sfile != nil {
        if err := sfile.Close(); err != nil {
            return err
        }
    }

    // Extra sanity check: even if someone named their database "../.." we must
    // not delete everything because of it; they'll just have extra files forever.
    if filepath.Clean(s.path) != filepath.Dir(dbPath) {
        return fmt.Errorf("invalid database directory location for database '%s': %s", name, dbPath)
    }

    if err := os.RemoveAll(dbPath); err != nil {
        return err
    }
    if err := os.RemoveAll(filepath.Join(s.EngineOptions.Config.WALDir, name)); err != nil {
        return err
    }

    for _, sh := range shards {
        delete(s.shards, sh.id)
    }

    // Remove database from the store's list of databases.
    delete(s.databases, name)

    // Remove shared index for database if using inmem index.
    delete(s.indexes, name)

    return nil
}

// DeleteRetentionPolicy will close all shards associated with the
// provided retention policy, remove the retention policy directories on
// both the DB and WAL, and remove all shard files from disk.
func (s *Store) DeleteRetentionPolicy(database, name string) error {
    s.mu.RLock()
    if _, ok := s.databases[database]; !ok {
        s.mu.RUnlock()
        // unknown database, nothing to do
        return nil
    }
    shards := s.filterShards(func(sh *Shard) bool {
        return sh.database == database && sh.retentionPolicy == name
    })
    s.mu.RUnlock()

    // Close and delete all shards under the retention policy on the
    // database.
    if err := s.walkShards(shards, func(sh *Shard) error {
        if sh.database != database || sh.retentionPolicy != name {
            return nil
        }

        return sh.Close()
    }); err != nil {
        return err
    }

    // Determine the retention policy directory.
    rpPath := filepath.Clean(filepath.Join(s.path, database, name))

    // Ensure the Store's path is the grandparent of the retention policy.
    if filepath.Clean(s.path) != filepath.Dir(filepath.Dir(rpPath)) {
        return fmt.Errorf("invalid path for database '%s', retention policy '%s': %s", database, name, rpPath)
    }

    // Remove the retention policy folder.
    if err := os.RemoveAll(filepath.Join(s.path, database, name)); err != nil {
        return err
    }

    // Remove the retention policy folder from the WAL.
    if err := os.RemoveAll(filepath.Join(s.EngineOptions.Config.WALDir, database, name)); err != nil {
        return err
    }

    s.mu.Lock()
    for _, sh := range shards {
        delete(s.shards, sh.id)
    }
    s.mu.Unlock()
    return nil
}

// DeleteMeasurement removes a measurement and all associated series from a database.
func (s *Store) DeleteMeasurement(database, name string) error {
    s.mu.RLock()
    shards := s.filterShards(byDatabase(database))
    s.mu.RUnlock()

    // Limit to 1 delete for each shard since expanding the measurement into the list
    // of series keys can be very memory intensive if run concurrently.
    limit := limiter.NewFixed(1)

    return s.walkShards(shards, func(sh *Shard) error {
        limit.Take()
        defer limit.Release()

        return sh.DeleteMeasurement([]byte(name))
    })
}

// filterShards returns a slice of shards where fn returns true
// for the shard. If the provided predicate is nil then all shards are returned.
func (s *Store) filterShards(fn func(sh *Shard) bool) []*Shard {
    var shards []*Shard
    if fn == nil {
        shards = make([]*Shard, 0, len(s.shards))
        fn = func(*Shard) bool { return true }
    } else {
        shards = make([]*Shard, 0)
    }

    for _, sh := range s.shards {
        if fn(sh) {
            shards = append(shards, sh)
        }
    }
    return shards
}

// byDatabase provides a predicate for filterShards that matches on the name of
// the database passed in.
func byDatabase(name string) func(sh *Shard) bool {
    return func(sh *Shard) bool {
        return sh.database == name
    }
}
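
// filterShards and byDatabase compose: callers take the read lock, filter, and
// release it before doing any heavy work. A sketch of a custom predicate (this
// retention-policy filter mirrors the inline closures used by
// DeleteRetentionPolicy above; the names are illustrative):
//
//    byRetentionPolicy := func(db, rp string) func(sh *Shard) bool {
//        return func(sh *Shard) bool {
//            return sh.database == db && sh.retentionPolicy == rp
//        }
//    }
//    s.mu.RLock()
//    shards := s.filterShards(byRetentionPolicy("db0", "autogen"))
//    s.mu.RUnlock()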

// walkShards applies a function to each shard in parallel. fn must be safe for
// concurrent use. If any function returns an error, one of the encountered
// errors is returned.
func (s *Store) walkShards(shards []*Shard, fn func(sh *Shard) error) error {
    // res holds the result of running fn against a shard in a goroutine.
    type res struct {
        err error
    }

    resC := make(chan res)
    var n int

    for _, sh := range shards {
        n++

        go func(sh *Shard) {
            if err := fn(sh); err != nil {
                resC <- res{err: fmt.Errorf("shard %d: %s", sh.id, err)}
                return
            }

            resC <- res{}
        }(sh)
    }

    var err error
    for i := 0; i < n; i++ {
        res := <-resC
        if res.err != nil {
            err = res.err
        }
    }
    close(resC)
    return err
}
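
// walkShards is the store's fan-out primitive; Close, DeleteDatabase and the
// cardinality methods all funnel through it. A sketch of calling it directly
// (the closure must be safe for concurrent use):
//
//    s.mu.RLock()
//    shards := s.filterShards(byDatabase("db0"))
//    s.mu.RUnlock()
//    err := s.walkShards(shards, func(sh *Shard) error {
//        return sh.Close()
//    })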

// ShardIDs returns a slice of all ShardIDs under management.
func (s *Store) ShardIDs() []uint64 {
    s.mu.RLock()
    defer s.mu.RUnlock()
    return s.shardIDs()
}

// shardIDs returns the IDs of all shards in the store. It must be called with
// at least a read lock held.
func (s *Store) shardIDs() []uint64 {
    a := make([]uint64, 0, len(s.shards))
    for shardID := range s.shards {
        a = append(a, shardID)
    }
    return a
}

// shardsSlice returns an ordered list of shards.
func (s *Store) shardsSlice() []*Shard {
    a := make([]*Shard, 0, len(s.shards))
    for _, sh := range s.shards {
        a = append(a, sh)
    }
    sort.Sort(Shards(a))
    return a
}

// Databases returns the names of all databases managed by the store.
func (s *Store) Databases() []string {
    s.mu.RLock()
    defer s.mu.RUnlock()

    databases := make([]string, 0, len(s.databases))
    for k := range s.databases {
        databases = append(databases, k)
    }
    return databases
}

// DiskSize returns the size of all the shard files in bytes.
// This size does not include the WAL size.
func (s *Store) DiskSize() (int64, error) {
    var size int64

    s.mu.RLock()
    allShards := s.filterShards(nil)
    s.mu.RUnlock()

    for _, sh := range allShards {
        sz, err := sh.DiskSize()
        if err != nil {
            return 0, err
        }
        size += sz
    }
    return size, nil
}

// sketchesForDatabase returns merged sketches for the provided database, by
// walking each shard in the database and merging the sketches found there.
func (s *Store) sketchesForDatabase(dbName string, getSketches func(*Shard) (estimator.Sketch, estimator.Sketch, error)) (estimator.Sketch, estimator.Sketch, error) {
    var (
        ss estimator.Sketch // Sketch estimating number of items.
        ts estimator.Sketch // Sketch estimating number of tombstoned items.
    )

    s.mu.RLock()
    shards := s.filterShards(byDatabase(dbName))
    s.mu.RUnlock()

    // Never return nil sketches. If the database exists but no data has been
    // written, return empty sketches.
    if len(shards) == 0 {
        ss, ts = hll.NewDefaultPlus(), hll.NewDefaultPlus()
    }

    // Iterate over all shards for the database and combine all of the sketches.
    for _, shard := range shards {
        s, t, err := getSketches(shard)
        if err != nil {
            return nil, nil, err
        }

        if ss == nil {
            ss, ts = s, t
        } else if err = ss.Merge(s); err != nil {
            return nil, nil, err
        } else if err = ts.Merge(t); err != nil {
            return nil, nil, err
        }
    }
    return ss, ts, nil
}

// SeriesCardinality returns the exact series cardinality for the provided
// database.
//
// Cardinality is calculated exactly by unioning all shards' bitsets of series
// IDs. The result of this method cannot be combined with any other results.
func (s *Store) SeriesCardinality(database string) (int64, error) {
    s.mu.RLock()
    shards := s.filterShards(byDatabase(database))
    s.mu.RUnlock()

    var setMu sync.Mutex
    others := make([]*SeriesIDSet, 0, len(shards))

    s.walkShards(shards, func(sh *Shard) error {
        index, err := sh.Index()
        if err != nil {
            return err
        }

        if i, ok := index.(interface {
            SeriesIDSet() *SeriesIDSet
        }); ok {
            seriesIDs := i.SeriesIDSet()
            setMu.Lock()
            others = append(others, seriesIDs)
            setMu.Unlock()
        } else {
            return fmt.Errorf("unable to get series id set for index in shard at %s", sh.Path())
        }
        return nil
    })

    ss := NewSeriesIDSet()
    ss.Merge(others...)
    return int64(ss.Cardinality()), nil
}
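
// SeriesCardinality is exact (a union of per-shard series ID bitsets), while
// MeasurementsCardinality below is an estimate derived from HyperLogLog-style
// sketches, so the two numbers are computed very differently. A sketch of
// reading both for one database (the database name is illustrative):
//
//    sc, _ := st.SeriesCardinality("db0")       // exact
//    mc, _ := st.MeasurementsCardinality("db0") // estimated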

// SeriesSketches returns the sketches associated with the series data in all
// the shards in the provided database.
//
// The returned sketches can be combined with other sketches to provide an
// estimation across distributed databases.
func (s *Store) SeriesSketches(database string) (estimator.Sketch, estimator.Sketch, error) {
    return s.sketchesForDatabase(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) {
        if sh == nil {
            return nil, nil, errors.New("shard nil, can't get cardinality")
        }
        return sh.SeriesSketches()
    })
}

// MeasurementsCardinality returns an estimation of the measurement cardinality
// for the provided database.
//
// Cardinality is calculated using a sketch-based estimation. The result of this
// method cannot be combined with any other results.
func (s *Store) MeasurementsCardinality(database string) (int64, error) {
    ss, ts, err := s.sketchesForDatabase(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) {
        if sh == nil {
            return nil, nil, errors.New("shard nil, can't get cardinality")
        }
        return sh.MeasurementsSketches()
    })

    if err != nil {
        return 0, err
    }
    return int64(ss.Count() - ts.Count()), nil
}

// MeasurementsSketches returns the sketches associated with the measurement
// data in all the shards in the provided database.
//
// The returned sketches can be combined with other sketches to provide an
// estimation across distributed databases.
func (s *Store) MeasurementsSketches(database string) (estimator.Sketch, estimator.Sketch, error) {
    return s.sketchesForDatabase(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) {
        if sh == nil {
            return nil, nil, errors.New("shard nil, can't get cardinality")
        }
        return sh.MeasurementsSketches()
    })
}

// BackupShard will get the shard and have the engine back up everything
// written since the passed-in time to the writer.
func (s *Store) BackupShard(id uint64, since time.Time, w io.Writer) error {
    shard := s.Shard(id)
    if shard == nil {
        return fmt.Errorf("shard %d doesn't exist on this server", id)
    }

    path, err := relativePath(s.path, shard.path)
    if err != nil {
        return err
    }

    return shard.Backup(w, path, since)
}

// ExportShard exports the shard's data within the given time range to the writer.
func (s *Store) ExportShard(id uint64, start time.Time, end time.Time, w io.Writer) error {
    shard := s.Shard(id)
    if shard == nil {
        return fmt.Errorf("shard %d doesn't exist on this server", id)
    }

    path, err := relativePath(s.path, shard.path)
    if err != nil {
        return err
    }

    return shard.Export(w, path, start, end)
}

// RestoreShard restores a backup from r to a given shard.
// This will only overwrite files included in the backup.
func (s *Store) RestoreShard(id uint64, r io.Reader) error {
    shard := s.Shard(id)
    if shard == nil {
        return fmt.Errorf("shard %d doesn't exist on this server", id)
    }

    path, err := relativePath(s.path, shard.path)
    if err != nil {
        return err
    }

    return shard.Restore(r, path)
}

// ImportShard imports the contents of r to a given shard.
// All files in the backup are added as new files which may
// cause duplicated data to occur, requiring more expensive
// compactions.
func (s *Store) ImportShard(id uint64, r io.Reader) error {
    shard := s.Shard(id)
    if shard == nil {
        return fmt.Errorf("shard %d doesn't exist on this server", id)
    }

    path, err := relativePath(s.path, shard.path)
    if err != nil {
        return err
    }

    return shard.Import(r, path)
}

// ShardRelativePath will return the relative path to the shard, i.e.,
// <database>/<retention>/<id>.
func (s *Store) ShardRelativePath(id uint64) (string, error) {
    shard := s.Shard(id)
    if shard == nil {
        return "", fmt.Errorf("shard %d doesn't exist on this server", id)
    }
    return relativePath(s.path, shard.path)
}

// DeleteSeries loops through the local shards and deletes the series data for
// the passed-in series keys.
func (s *Store) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error {
    // Expand regex expressions in the FROM clause.
    a, err := s.ExpandSources(sources)
    if err != nil {
        return err
    } else if len(sources) > 0 && len(a) == 0 {
        return nil
    }
    sources = a

    // Determine deletion time range.
    condition, timeRange, err := influxql.ConditionExpr(condition, nil)
    if err != nil {
        return err
    }

    var min, max int64
    if !timeRange.Min.IsZero() {
        min = timeRange.Min.UnixNano()
    } else {
        min = influxql.MinTime
    }
    if !timeRange.Max.IsZero() {
        max = timeRange.Max.UnixNano()
    } else {
        max = influxql.MaxTime
    }

    s.mu.RLock()
    sfile := s.sfiles[database]
    if sfile == nil {
        s.mu.RUnlock()
        // No series file means nothing has been written to this DB and thus nothing to delete.
        return nil
    }
    shards := s.filterShards(byDatabase(database))
    s.mu.RUnlock()

    // Limit to 1 delete for each shard since expanding the measurement into the list
    // of series keys can be very memory intensive if run concurrently.
    limit := limiter.NewFixed(1)

    return s.walkShards(shards, func(sh *Shard) error {
        // Determine list of measurements from sources.
        // Use all measurements if no FROM clause was provided.
        var names []string
        if len(sources) > 0 {
            for _, source := range sources {
                names = append(names, source.(*influxql.Measurement).Name)
            }
        } else {
            if err := sh.ForEachMeasurementName(func(name []byte) error {
                names = append(names, string(name))
                return nil
            }); err != nil {
                return err
            }
        }
        sort.Strings(names)

        limit.Take()
        defer limit.Release()

        index, err := sh.Index()
        if err != nil {
            return err
        }

        indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: sfile}
        // Find matching series keys for each measurement.
        for _, name := range names {
            itr, err := indexSet.MeasurementSeriesByExprIterator([]byte(name), condition)
            if err != nil {
                return err
            } else if itr == nil {
                continue
            }
            defer itr.Close()
            if err := sh.DeleteSeriesRange(NewSeriesIteratorAdapter(sfile, itr), min, max); err != nil {
                return err
            }
        }

        return nil
    })
}
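
// The deletion range above defaults to the full time range when the condition
// carries no time bounds. For example (illustrative condition): a WHERE clause
// such as `time >= '2018-01-01T00:00:00Z'` produces a non-zero timeRange.Min,
// so min comes from the condition while max falls back to influxql.MaxTime;
// with no time predicate at all the delete spans
// [influxql.MinTime, influxql.MaxTime].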

// ExpandSources expands sources against all local shards.
func (s *Store) ExpandSources(sources influxql.Sources) (influxql.Sources, error) {
    shards := func() Shards {
        s.mu.RLock()
        defer s.mu.RUnlock()
        return Shards(s.shardsSlice())
    }()
    return shards.ExpandSources(sources)
}

// WriteToShard writes a list of points to a shard identified by its ID.
func (s *Store) WriteToShard(shardID uint64, points []models.Point) error {
    s.mu.RLock()

    select {
    case <-s.closing:
        s.mu.RUnlock()
        return ErrStoreClosed
    default:
    }

    sh := s.shards[shardID]
    if sh == nil {
        s.mu.RUnlock()
        return ErrShardNotFound
    }
    s.mu.RUnlock()

    // Ensure snapshot compactions are enabled since the shard might have been cold
    // and disabled by the monitor.
    if sh.IsIdle() {
        sh.SetCompactionsEnabled(true)
    }

    return sh.WritePoints(points)
}
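
// A sketch of writing points to a shard (the shard ID and point contents are
// illustrative, and the shard must already exist, e.g. via CreateShard; the
// models.NewPoint/NewTags/Fields constructors are assumed from the models
// package imported above):
//
//    pt, err := models.NewPoint("cpu",
//        models.NewTags(map[string]string{"host": "server01"}),
//        models.Fields{"value": 0.64}, time.Now())
//    if err == nil {
//        err = st.WriteToShard(1, []models.Point{pt})
//    }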
|
|
|
|
|
|
2016-12-05 17:51:06 +00:00
|
|
|
|
// MeasurementNames returns a slice of all measurements. Measurements accepts an
|
2016-09-01 12:40:16 +00:00
|
|
|
|
// optional condition expression. If cond is nil, then all measurements for the
|
|
|
|
|
// database will be returned.
|
2017-11-15 15:48:23 +00:00
|
|
|
|
func (s *Store) MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) {
|
2016-09-01 12:40:16 +00:00
|
|
|
|
s.mu.RLock()
|
|
|
|
|
shards := s.filterShards(byDatabase(database))
|
|
|
|
|
s.mu.RUnlock()
|
2016-07-28 22:38:08 +00:00
|
|
|
|
|
2018-01-08 18:34:04 +00:00
|
|
|
|
sfile := s.seriesFile(database)
|
|
|
|
|
if sfile == nil {
|
|
|
|
|
return nil, nil
|
2017-12-15 14:24:26 +00:00
|
|
|
|
}
|
|
|
|
|
|
2017-12-05 17:49:58 +00:00
|
|
|
|
// Build indexset.
|
2017-12-15 14:24:26 +00:00
|
|
|
|
is := IndexSet{Indexes: make([]Index, 0, len(shards)), SeriesFile: sfile}
|
2016-09-01 12:40:16 +00:00
|
|
|
|
for _, sh := range shards {
|
2017-12-15 17:54:58 +00:00
|
|
|
|
index, err := sh.Index()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
2017-04-06 18:17:29 +00:00
|
|
|
|
}
|
2017-12-15 17:54:58 +00:00
|
|
|
|
is.Indexes = append(is.Indexes, index)
|
2016-07-28 22:38:08 +00:00
|
|
|
|
}
|
2017-12-05 17:49:58 +00:00
|
|
|
|
is = is.DedupeInmemIndexes()
|
2017-12-08 17:11:07 +00:00
|
|
|
|
return is.MeasurementNamesByExpr(auth, cond)
|
2016-07-28 22:38:08 +00:00
|
|
|
|
}
|
|
|
|
|
|
2016-09-01 12:40:16 +00:00
|
|
|
|
// MeasurementSeriesCounts returns the number of measurements and series in all
|
|
|
|
|
// the shards' indices.
|
|
|
|
|
func (s *Store) MeasurementSeriesCounts(database string) (measuments int, series int) {
|
|
|
|
|
// TODO: implement me
|
|
|
|
|
return 0, 0
|
|
|
|
|
}
|
|
|
|
|
|
// TagKeys holds the tag keys for a single measurement.
type TagKeys struct {
	Measurement string
	Keys        []string
}

// TagKeysSlice is a sortable slice of TagKeys.
type TagKeysSlice []TagKeys

func (a TagKeysSlice) Len() int           { return len(a) }
func (a TagKeysSlice) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a TagKeysSlice) Less(i, j int) bool { return a[i].Measurement < a[j].Measurement }

// TagKeys returns the tag keys in the given database, matching the condition.
func (s *Store) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]TagKeys, error) {
	if len(shardIDs) == 0 {
		return nil, nil
	}

	measurementExpr := influxql.CloneExpr(cond)
	measurementExpr = influxql.Reduce(influxql.RewriteExpr(measurementExpr, func(e influxql.Expr) influxql.Expr {
		switch e := e.(type) {
		case *influxql.BinaryExpr:
			switch e.Op {
			case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
				tag, ok := e.LHS.(*influxql.VarRef)
				if !ok || tag.Val != "_name" {
					return nil
				}
			}
		}
		return e
	}), nil)

	filterExpr := influxql.CloneExpr(cond)
	filterExpr = influxql.Reduce(influxql.RewriteExpr(filterExpr, func(e influxql.Expr) influxql.Expr {
		switch e := e.(type) {
		case *influxql.BinaryExpr:
			switch e.Op {
			case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
				tag, ok := e.LHS.(*influxql.VarRef)
				if !ok || strings.HasPrefix(tag.Val, "_") {
					return nil
				}
			}
		}
		return e
	}), nil)

	// Get all the shards we're interested in.
	is := IndexSet{Indexes: make([]Index, 0, len(shardIDs))}
	s.mu.RLock()
	for _, sid := range shardIDs {
		shard, ok := s.shards[sid]
		if !ok {
			continue
		}

		if is.SeriesFile == nil {
			is.SeriesFile = shard.sfile
		}

		is.Indexes = append(is.Indexes, shard.index)
	}
	s.mu.RUnlock()

	// Determine list of measurements.
	is = is.DedupeInmemIndexes()
	names, err := is.MeasurementNamesByExpr(nil, measurementExpr)
	if err != nil {
		return nil, err
	}

	// Iterate over each measurement.
	var results []TagKeys
	for _, name := range names {

		// Build key set over all indexes for measurement.
		tagKeySet, err := is.MeasurementTagKeysByExpr(name, nil)
		if err != nil {
			return nil, err
		} else if len(tagKeySet) == 0 {
			continue
		}

		keys := make([]string, 0, len(tagKeySet))
		// If no tag value filter is present then all the tag keys can be returned,
		// provided they have authorized series associated with them.
		if filterExpr == nil {
			for tagKey := range tagKeySet {
				ok, err := is.TagKeyHasAuthorizedSeries(auth, []byte(name), []byte(tagKey))
				if err != nil {
					return nil, err
				} else if ok {
					keys = append(keys, tagKey)
				}
			}
			sort.Strings(keys)

			// Add to result set.
			results = append(results, TagKeys{
				Measurement: string(name),
				Keys:        keys,
			})

			continue
		}

		// Tag filter provided so filter keys first.

		// Sort the tag keys.
		for k := range tagKeySet {
			keys = append(keys, k)
		}
		sort.Strings(keys)

		// Filter against tag values, skip if no values exist.
		values, err := is.MeasurementTagKeyValuesByExpr(auth, name, keys, filterExpr, true)
		if err != nil {
			return nil, err
		}

		// Filter final tag keys using the matching values. If a key has one or
		// more matching values then it will be included in the final set.
		finalKeys := keys[:0] // Use same backing array as keys to save allocation.
		for i, k := range keys {
			if len(values[i]) > 0 {
				// Tag key k has one or more matching tag values.
				finalKeys = append(finalKeys, k)
			}
		}

		// Add to result set.
		results = append(results, TagKeys{
			Measurement: string(name),
			Keys:        finalKeys,
		})
	}
	return results, nil
}

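// Illustrative sketch (not part of the original file): TagKeys operates on an
// explicit set of shard IDs and an optional parsed condition. The shard IDs and
// the WHERE expression below are assumptions made for the example.
//
//	cond, err := influxql.ParseExpr(`host = 'server01'`)
//	if err != nil {
//		return err
//	}
//	tagKeys, err := store.TagKeys(nil, []uint64{1, 2}, cond)
//	if err != nil {
//		return err
//	}
//	for _, tk := range tagKeys {
//		fmt.Println(tk.Measurement, tk.Keys)
//	}
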
// TagValues holds the tag keys and values for a single measurement.
type TagValues struct {
	Measurement string
	Values      []KeyValue
}

// TagValuesSlice is a sortable slice of TagValues.
type TagValuesSlice []TagValues

func (a TagValuesSlice) Len() int           { return len(a) }
func (a TagValuesSlice) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a TagValuesSlice) Less(i, j int) bool { return a[i].Measurement < a[j].Measurement }

// tagValues is a temporary representation of a TagValues. Rather than allocating
// KeyValues as we build up a TagValues object, we hold off allocating KeyValues
// until we have merged multiple tagValues together.
type tagValues struct {
	name   []byte
	keys   []string
	values [][]string
}

// tagValuesSlice is a slice of tagValues that can be sorted by measurement.
type tagValuesSlice []tagValues

func (a tagValuesSlice) Len() int           { return len(a) }
func (a tagValuesSlice) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a tagValuesSlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 }

// TagValues returns the tag keys and values for the provided shards, where the
// tag values satisfy the provided condition.
func (s *Store) TagValues(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]TagValues, error) {
	if cond == nil {
		return nil, errors.New("a condition is required")
	}

	measurementExpr := influxql.CloneExpr(cond)
	measurementExpr = influxql.Reduce(influxql.RewriteExpr(measurementExpr, func(e influxql.Expr) influxql.Expr {
		switch e := e.(type) {
		case *influxql.BinaryExpr:
			switch e.Op {
			case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
				tag, ok := e.LHS.(*influxql.VarRef)
				if !ok || tag.Val != "_name" {
					return nil
				}
			}
		}
		return e
	}), nil)

	filterExpr := influxql.CloneExpr(cond)
	filterExpr = influxql.Reduce(influxql.RewriteExpr(filterExpr, func(e influxql.Expr) influxql.Expr {
		switch e := e.(type) {
		case *influxql.BinaryExpr:
			switch e.Op {
			case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
				tag, ok := e.LHS.(*influxql.VarRef)
				if !ok || strings.HasPrefix(tag.Val, "_") {
					return nil
				}
			}
		}
		return e
	}), nil)

	// Build index set to work on.
	is := IndexSet{Indexes: make([]Index, 0, len(shardIDs))}
	s.mu.RLock()
	for _, sid := range shardIDs {
		shard, ok := s.shards[sid]
		if !ok {
			continue
		}

		if is.SeriesFile == nil {
			is.SeriesFile = shard.sfile
		}
		is.Indexes = append(is.Indexes, shard.index)
	}
	s.mu.RUnlock()
	is = is.DedupeInmemIndexes()

	// Stores each list of TagValues for each measurement.
	var allResults []tagValues
	var maxMeasurements int // Hint as to lower bound on number of measurements.
	// names will be sorted by MeasurementNamesByExpr.
	// Authorisation can be done later on, when series may have been filtered
	// out by other conditions.
	names, err := is.MeasurementNamesByExpr(nil, measurementExpr)
	if err != nil {
		return nil, err
	}

	if len(names) > maxMeasurements {
		maxMeasurements = len(names)
	}

	if allResults == nil {
		allResults = make([]tagValues, 0, len(is.Indexes)*len(names)) // Assuming all series in all shards.
	}

	// Iterate over each matching measurement in the shard. For each
	// measurement we'll get the matching tag keys (e.g., when a WITH KEYS
	// statement is used), and we'll then use those to fetch all the relevant
	// values from matching series. Series may be filtered using a WHERE
	// filter.
	for _, name := range names {
		// Determine a list of keys from condition.
		keySet, err := is.MeasurementTagKeysByExpr(name, cond)
		if err != nil {
			return nil, err
		}

		if len(keySet) == 0 {
			// No matching tag keys for this measurement.
			continue
		}

		result := tagValues{
			name: name,
			keys: make([]string, 0, len(keySet)),
		}

		// Add the keys to the tagValues and sort them.
		for k := range keySet {
			result.keys = append(result.keys, k)
		}
		sort.Sort(sort.StringSlice(result.keys))

		// Get all the tag values for each key in the key set.
		// Each slice in the results contains the sorted values associated
		// with each tag key for the measurement from the key set.
		if result.values, err = is.MeasurementTagKeyValuesByExpr(auth, name, result.keys, filterExpr, true); err != nil {
			return nil, err
		}

		// Remove any tag keys that didn't have any authorized values.
		j := 0
		for i := range result.keys {
			if len(result.values[i]) == 0 {
				continue
			}

			result.keys[j] = result.keys[i]
			result.values[j] = result.values[i]
			j++
		}
		result.keys = result.keys[:j]
		result.values = result.values[:j]

		// Only include result if there are keys with values.
		if len(result.keys) > 0 {
			allResults = append(allResults, result)
		}
	}

	result := make([]TagValues, 0, maxMeasurements)

	// We need to sort all results by measurement name.
	if len(is.Indexes) > 1 {
		sort.Sort(tagValuesSlice(allResults))
	}

	// The next stage is to merge the tagValue results for each shard's measurements.
	var i, j int
	// Used as a temporary buffer in mergeTagValues. There can be at most len(shards)
	// instances of tagValues for a given measurement.
	idxBuf := make([][2]int, 0, len(is.Indexes))
	for i < len(allResults) {
		// Gather all occurrences of the same measurement for merging.
		for j+1 < len(allResults) && bytes.Equal(allResults[j+1].name, allResults[i].name) {
			j++
		}

		// An invariant is that there can't be more than n instances of tag
		// key value pairs for a given measurement, where n is the number of
		// shards.
		if got, exp := j-i+1, len(is.Indexes); got > exp {
			return nil, fmt.Errorf("unexpected results returned by engine. Got %d measurement sets for %d shards", got, exp)
		}

		nextResult := mergeTagValues(idxBuf, allResults[i:j+1]...)
		i = j + 1
		if len(nextResult.Values) > 0 {
			result = append(result, nextResult)
		}
	}
	return result, nil
}

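// Illustrative sketch (not part of the original file): unlike TagKeys, TagValues
// rejects a nil condition. The expression below assumes the convention used by
// SHOW TAG VALUES, where the requested key is encoded as a comparison against
// the internal "_tagKey" tag; treat that, the shard IDs and the error handling
// as assumptions made for the example.
//
//	cond, err := influxql.ParseExpr(`"_tagKey" = 'host'`)
//	if err != nil {
//		return err
//	}
//	tagValues, err := store.TagValues(nil, []uint64{1, 2}, cond)
//	if err != nil {
//		return err
//	}
//	for _, tv := range tagValues {
//		for _, kv := range tv.Values {
//			fmt.Println(tv.Measurement, kv.Key, kv.Value)
//		}
//	}
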
// mergeTagValues merges multiple sorted sets of temporary tagValues using a
// direct k-way merge whilst also removing duplicated entries. The result is a
// single TagValues value.
//
// TODO(edd): a Tournament based merge (see: Knuth's TAOCP 5.4.1) might be more
// appropriate at some point.
//
func mergeTagValues(valueIdxs [][2]int, tvs ...tagValues) TagValues {
	var result TagValues
	if len(tvs) == 0 {
		return TagValues{}
	} else if len(tvs) == 1 {
		result.Measurement = string(tvs[0].name)
		// TODO(edd): will be too small likely. Find a hint?
		result.Values = make([]KeyValue, 0, len(tvs[0].values))

		for ki, key := range tvs[0].keys {
			for _, value := range tvs[0].values[ki] {
				result.Values = append(result.Values, KeyValue{Key: key, Value: value})
			}
		}
		return result
	}

	result.Measurement = string(tvs[0].name)

	var maxSize int
	for _, tv := range tvs {
		if len(tv.values) > maxSize {
			maxSize = len(tv.values)
		}
	}
	result.Values = make([]KeyValue, 0, maxSize) // This will likely be too small but it's a start.

	// Resize and reset to the number of TagValues we're merging.
	valueIdxs = valueIdxs[:len(tvs)]
	for i := 0; i < len(valueIdxs); i++ {
		valueIdxs[i][0], valueIdxs[i][1] = 0, 0
	}

	var (
		j              int
		keyCmp, valCmp int
	)

	for {
		// Which of the provided TagValue sets currently holds the smallest element.
		// j is the candidate we're going to next pick for the result set.
		j = -1

		// Find the smallest element.
		for i := 0; i < len(tvs); i++ {
			if valueIdxs[i][0] >= len(tvs[i].keys) {
				continue // We have completely drained all tag keys and values for this shard.
			} else if len(tvs[i].values[valueIdxs[i][0]]) == 0 {
				// There are no tag values for these keys.
				valueIdxs[i][0]++
				valueIdxs[i][1] = 0
				continue
			} else if j == -1 {
				// We haven't picked a best TagValues set yet. Pick this one.
				j = i
				continue
			}

			// If this tag key is lower than the candidate's tag key.
			keyCmp = strings.Compare(tvs[i].keys[valueIdxs[i][0]], tvs[j].keys[valueIdxs[j][0]])
			if keyCmp == -1 {
				j = i
			} else if keyCmp == 0 {
				valCmp = strings.Compare(tvs[i].values[valueIdxs[i][0]][valueIdxs[i][1]], tvs[j].values[valueIdxs[j][0]][valueIdxs[j][1]])
				// Same tag key but this tag value is lower than the candidate.
				if valCmp == -1 {
					j = i
				} else if valCmp == 0 {
					// Duplicate tag key/value pair. Remove it and move onto
					// the next value for shard i.
					valueIdxs[i][1]++
					if valueIdxs[i][1] >= len(tvs[i].values[valueIdxs[i][0]]) {
						// Drained all these tag values, move onto next key.
						valueIdxs[i][0]++
						valueIdxs[i][1] = 0
					}
				}
			}
		}

		// We could have drained all of the TagValue sets and be done...
		if j == -1 {
			break
		}

		// Append the smallest KeyValue.
		result.Values = append(result.Values, KeyValue{
			Key:   string(tvs[j].keys[valueIdxs[j][0]]),
			Value: tvs[j].values[valueIdxs[j][0]][valueIdxs[j][1]],
		})
		// Increment the indexes for the chosen TagValue.
		valueIdxs[j][1]++
		if valueIdxs[j][1] >= len(tvs[j].values[valueIdxs[j][0]]) {
			// Drained all these tag values, move onto next key.
			valueIdxs[j][0]++
			valueIdxs[j][1] = 0
		}
	}
	return result
}

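// Worked example (added for clarity, with made-up data): merging two tagValues
// sets for the same measurement "cpu", one per shard,
//
//	shard A: keys=["host"]           values=[["server01", "server02"]]
//	shard B: keys=["host", "region"] values=[["server02"], ["east"]]
//
// yields a single TagValues in which the duplicate host=server02 pair appears
// only once:
//
//	{Measurement: "cpu", Values: [{host server01} {host server02} {region east}]}
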
func (s *Store) monitorShards() {
	defer s.wg.Done()
	t := time.NewTicker(10 * time.Second)
	defer t.Stop()
	t2 := time.NewTicker(time.Minute)
	defer t2.Stop()
	for {
		select {
		case <-s.closing:
			return
		case <-t.C:
			s.mu.RLock()
			for _, sh := range s.shards {
				if sh.IsIdle() {
					if err := sh.Free(); err != nil {
						s.Logger.Warn("Error while freeing cold shard resources", zap.Error(err))
					}
				} else {
					sh.SetCompactionsEnabled(true)
				}
			}
			s.mu.RUnlock()
		case <-t2.C:
			if s.EngineOptions.Config.MaxValuesPerTag == 0 {
				continue
			}

			s.mu.RLock()
			shards := s.filterShards(func(sh *Shard) bool {
				return sh.IndexType() == "inmem"
			})
			s.mu.RUnlock()

			// No inmem shards...
			if len(shards) == 0 {
				continue
			}

			var dbLock sync.Mutex
			databases := make(map[string]struct{}, len(shards))

			s.walkShards(shards, func(sh *Shard) error {
				db := sh.database

				// Only process 1 shard from each database
				dbLock.Lock()
				if _, ok := databases[db]; ok {
					dbLock.Unlock()
					return nil
				}
				databases[db] = struct{}{}
				dbLock.Unlock()

				sfile := s.seriesFile(sh.database)
				if sfile == nil {
					return nil
				}

				firstShardIndex, err := sh.Index()
				if err != nil {
					return err
				}

				index, err := sh.Index()
				if err != nil {
					return err
				}

				// inmem shards share the same index instance so just use the first one to avoid
				// allocating the same measurements repeatedly
				indexSet := IndexSet{Indexes: []Index{firstShardIndex}, SeriesFile: sfile}
				names, err := indexSet.MeasurementNamesByExpr(nil, nil)
				if err != nil {
					s.Logger.Warn("Cannot retrieve measurement names", zap.Error(err))
					return nil
				}

				indexSet.Indexes = []Index{index}
				for _, name := range names {
					indexSet.ForEachMeasurementTagKey(name, func(k []byte) error {
						n := sh.TagKeyCardinality(name, k)
						perc := int(float64(n) / float64(s.EngineOptions.Config.MaxValuesPerTag) * 100)
						if perc > 100 {
							perc = 100
						}

						// Log at 80, 85, 90-100% levels
						if perc == 80 || perc == 85 || perc >= 90 {
							s.Logger.Warn("max-values-per-tag limit may be exceeded soon",
								zap.String("perc", fmt.Sprintf("%d%%", perc)),
								zap.Int("n", n),
								zap.Int("max", s.EngineOptions.Config.MaxValuesPerTag),
								logger.Database(db),
								zap.ByteString("measurement", name),
								zap.ByteString("tag", k))
						}
						return nil
					})
				}
				return nil
			})
		}
	}
}

// KeyValue holds a string key and a string value.
type KeyValue struct {
	Key, Value string
}

// KeyValues is a sortable slice of KeyValue.
type KeyValues []KeyValue

// Len implements sort.Interface.
func (a KeyValues) Len() int { return len(a) }

// Swap implements sort.Interface.
func (a KeyValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

// Less implements sort.Interface. Keys are compared before values.
func (a KeyValues) Less(i, j int) bool {
	ki, kj := a[i].Key, a[j].Key
	if ki == kj {
		return a[i].Value < a[j].Value
	}
	return ki < kj
}

// decodeStorePath extracts the database and retention policy names
// from a given shard or WAL path.
func decodeStorePath(shardOrWALPath string) (database, retentionPolicy string) {
	// shardOrWALPath format: /maybe/absolute/base/then/:database/:retentionPolicy/:nameOfShardOrWAL

	// Discard the last part of the path (the shard name or the wal name).
	path, _ := filepath.Split(filepath.Clean(shardOrWALPath))

	// Extract the database and retention policy.
	path, rp := filepath.Split(filepath.Clean(path))
	_, db := filepath.Split(filepath.Clean(path))
	return db, rp
}

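// Worked example (illustrative paths, not taken from the original file):
//
//	decodeStorePath("/var/lib/influxdb/data/db0/rp0/1") // returns ("db0", "rp0")
//	decodeStorePath("/var/lib/influxdb/wal/db0/rp0/1")  // returns ("db0", "rp0")
//
// Only the final three path elements matter; the base directory may be anything.
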
// relativePath expands the full paths passed in and returns the shard's path
// relative to the store's path.
func relativePath(storePath, shardPath string) (string, error) {
	path, err := filepath.Abs(storePath)
	if err != nil {
		return "", fmt.Errorf("store abs path: %s", err)
	}

	fp, err := filepath.Abs(shardPath)
	if err != nil {
		return "", fmt.Errorf("file abs path: %s", err)
	}

	name, err := filepath.Rel(path, fp)
	if err != nil {
		return "", fmt.Errorf("file rel path: %s", err)
	}

	return name, nil
}

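// Worked example (illustrative paths, not taken from the original file):
//
//	relativePath("/var/lib/influxdb/data", "/var/lib/influxdb/data/db0/rp0/1")
//	// returns "db0/rp0/1", nil
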
// shardSet provides access to the series ID sets of all shards belonging to a
// single database in a Store.
type shardSet struct {
	store *Store
	db    string
}

// ForEach calls f with the SeriesIDSet of every shard index in the database
// that exposes one.
func (s shardSet) ForEach(f func(ids *SeriesIDSet)) error {
	s.store.mu.RLock()
	shards := s.store.filterShards(byDatabase(s.db))
	s.store.mu.RUnlock()

	for _, sh := range shards {
		idx, err := sh.Index()
		if err != nil {
			return err
		}

		if t, ok := idx.(interface {
			SeriesIDSet() *SeriesIDSet
		}); ok {
			f(t.SeriesIDSet())
		}
	}
	return nil
}
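
// Illustrative sketch (not part of the original file): a shardSet can be used to
// union series IDs across every shard of a database, assuming the package's
// SeriesIDSet provides NewSeriesIDSet and Merge as used here. The database name
// is an assumption made for the example.
//
//	ss := shardSet{store: s, db: "db0"}
//	all := NewSeriesIDSet()
//	err := ss.ForEach(func(ids *SeriesIDSet) {
//		all.Merge(ids)
//	})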