influxdb/tsdb/store.go

2601 lines
71 KiB
Go

package tsdb // import "github.com/influxdata/influxdb/tsdb"
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/fs"
"maps"
"os"
"path/filepath"
"runtime"
"sort"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/estimator"
"github.com/influxdata/influxdb/pkg/estimator/hll"
"github.com/influxdata/influxdb/pkg/limiter"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxql"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
var (
// ErrShardNotFound is returned when trying to get a non existing shard.
ErrShardNotFound = fmt.Errorf("shard not found")
// ErrStoreClosed is returned when trying to use a closed Store.
ErrStoreClosed = fmt.Errorf("store is closed")
// ErrShardDeletion is returned when trying to create a shard that is being deleted
ErrShardDeletion = errors.New("shard is being deleted")
// ErrMultipleIndexTypes is returned when trying to do deletes on a database with
// multiple index types.
ErrMultipleIndexTypes = errors.New("cannot delete data. DB contains shards using both inmem and tsi1 indexes. Please convert all shards to use the same index type to delete data.")
)
// Statistics gathered by the store.
const (
statDatabaseSeries = "numSeries" // number of series in a database
statDatabaseMeasurements = "numMeasurements" // number of measurements in a database
statPointsWritten = "pointsWritten" // number of points parsed by engines successfully
statValuesWritten = "valuesWritten" // number of values parsed by engines successfully
statSeriesCreated = "seriesCreated" // number of series created since startup
)
// SeriesFileDirectory is the name of the directory containing series files for
// a database.
const SeriesFileDirectory = "_series"
// databaseState keeps track of the state of a database.
type databaseState struct{ indexTypes map[string]int }
// struct to hold the result of opening each readegr in a goroutine
type shardResponse struct {
s *Shard
err error
}
// addIndexType records that the database has a shard with the given index type.
func (d *databaseState) addIndexType(indexType string) {
if d.indexTypes == nil {
d.indexTypes = make(map[string]int)
}
d.indexTypes[indexType]++
}
// addIndexType records that the database no longer has a shard with the given index type.
func (d *databaseState) removeIndexType(indexType string) {
if d.indexTypes != nil {
d.indexTypes[indexType]--
if d.indexTypes[indexType] <= 0 {
delete(d.indexTypes, indexType)
}
}
}
// hasMultipleIndexTypes returns true if the database has multiple index types.
func (d *databaseState) hasMultipleIndexTypes() bool { return d != nil && len(d.indexTypes) > 1 }
type StoreStatistics struct {
ValuesWritten int64
PointsWritten int64
SeriesCreated int64
}
type shardErrorMap struct {
mu sync.Mutex
shardErrors map[uint64]error
}
func (se *shardErrorMap) setShardOpenError(shardID uint64, err error) {
se.mu.Lock()
defer se.mu.Unlock()
if err == nil {
delete(se.shardErrors, shardID)
} else {
// Ignore incoming error if it is from a previous open failure. We don't want to keep
// re-wrapping the same error. For safety, make sure we have an ErrPreviousShardFail in
// case we hadn't recorded it.
if !errors.Is(err, ErrPreviousShardFail{}) || !errors.Is(se.shardErrors[shardID], ErrPreviousShardFail{}) {
se.shardErrors[shardID] = &ErrPreviousShardFail{error: fmt.Errorf("opening shard previously failed with: %w", err)}
}
}
}
func (se *shardErrorMap) shardError(shardID uint64) (error, bool) {
se.mu.Lock()
defer se.mu.Unlock()
oldErr, hasErr := se.shardErrors[shardID]
return oldErr, hasErr
}
// Store manages shards and indexes for databases.
type Store struct {
mu sync.RWMutex
shards map[uint64]*Shard
databases map[string]*databaseState
sfiles map[string]*SeriesFile
SeriesFileMaxSize int64 // Determines size of series file mmap. Can be altered in tests.
path string
// shared per-database indexes, only if using "inmem".
indexes map[string]interface{}
// Maintains a set of shards that are in the process of deletion.
// This prevents new shards from being created while old ones are being deleted.
pendingShardDeletes map[uint64]struct{}
// Maintains a set of shards that failed to open
badShards shardErrorMap
// Epoch tracker helps serialize writes and deletes that may conflict. It
// is stored by shard.
epochs map[uint64]*epochTracker
// Statistics for the store
stats StoreStatistics
ingressMetrics IngressMetrics
EngineOptions EngineOptions
baseLogger *zap.Logger
Logger *zap.Logger
startupProgressMetrics interface {
AddShard()
CompletedShard()
}
closing chan struct{}
wg sync.WaitGroup
opened bool
}
// NewStore returns a new store with the given path and a default configuration.
// The returned store must be initialized by calling Open before using it.
func NewStore(path string) *Store {
logger := zap.NewNop()
return &Store{
databases: make(map[string]*databaseState),
path: path,
sfiles: make(map[string]*SeriesFile),
indexes: make(map[string]interface{}),
pendingShardDeletes: make(map[uint64]struct{}),
badShards: shardErrorMap{shardErrors: make(map[uint64]error)},
epochs: make(map[uint64]*epochTracker),
EngineOptions: NewEngineOptions(),
Logger: logger,
baseLogger: logger,
}
}
// WithLogger sets the logger for the store.
func (s *Store) WithLogger(log *zap.Logger) {
s.baseLogger = log
s.Logger = log.With(zap.String("service", "store"))
for _, sh := range s.shards {
sh.WithLogger(s.baseLogger)
}
}
func (s *Store) WithStartupMetrics(sp interface {
AddShard()
CompletedShard()
}) {
s.startupProgressMetrics = sp
}
// Statistics returns statistics for period monitoring.
func (s *Store) Statistics(tags map[string]string) []models.Statistic {
s.mu.RLock()
shards := s.shardsSlice()
s.mu.RUnlock()
// Add all the series and measurements cardinality estimations.
databases := s.Databases()
statistics := make([]models.Statistic, 0, len(databases))
for _, database := range databases {
log := s.Logger.With(logger.Database(database))
sc, err := s.SeriesCardinality(context.Background(), database)
if err != nil {
log.Info("Cannot retrieve series cardinality", zap.Error(err))
continue
}
mc, err := s.MeasurementsCardinality(context.Background(), database)
if err != nil {
log.Info("Cannot retrieve measurement cardinality", zap.Error(err))
continue
}
statistics = append(statistics, models.Statistic{
Name: "database",
Tags: models.StatisticTags{"database": database}.Merge(tags),
Values: map[string]interface{}{
statDatabaseSeries: sc,
statDatabaseMeasurements: mc,
},
})
}
// Gather all statistics for all shards.
for _, shard := range shards {
statistics = append(statistics, shard.Statistics(tags)...)
}
statistics = append(statistics, models.Statistic{
Name: "localStore",
Tags: tags,
Values: map[string]interface{}{
statPointsWritten: atomic.LoadInt64(&s.stats.PointsWritten),
statValuesWritten: atomic.LoadInt64(&s.stats.ValuesWritten),
statSeriesCreated: atomic.LoadInt64(&s.stats.SeriesCreated),
},
})
ingressTags := func(key MetricKey, tags map[string]string) map[string]string {
newTags := make(map[string]string, 2)
if s.EngineOptions.Config.IngressMetricByMeasurement {
newTags["measurement"] = key.measurement
newTags["db"] = key.db
newTags["rp"] = key.rp
}
if s.EngineOptions.Config.IngressMetricByLogin {
newTags["login"] = key.login
}
return models.StatisticTags(newTags).Merge(tags)
}
s.ingressMetrics.ForEach(func(key MetricKey, points, values, series int64) {
statistics = append(statistics, models.Statistic{
Name: "ingress",
Tags: ingressTags(key, tags),
Values: map[string]interface{}{
statPointsWritten: points,
statValuesWritten: values,
statSeriesCreated: series,
},
})
})
return statistics
}
func (s *Store) IndexBytes() int {
// Build index set to work on.
is := IndexSet{Indexes: make([]Index, 0, len(s.shardIDs()))}
s.mu.RLock()
for _, sid := range s.shardIDs() {
shard, ok := s.shards[sid]
if !ok {
continue
}
if is.SeriesFile == nil {
is.SeriesFile = shard.sfile
}
is.Indexes = append(is.Indexes, shard.index)
}
s.mu.RUnlock()
is = is.DedupeInmemIndexes()
var b int
for _, idx := range is.Indexes {
b += idx.Bytes()
}
return b
}
// Path returns the store's root path.
func (s *Store) Path() string { return s.path }
// Open initializes the store, creating all necessary directories, loading all
// shards as well as initializing periodic maintenance of them.
func (s *Store) Open() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.opened {
// Already open
return nil
}
s.closing = make(chan struct{})
s.shards = map[uint64]*Shard{}
s.Logger.Info("Using data dir", zap.String("path", s.Path()))
// Create directory.
if err := os.MkdirAll(s.path, 0777); err != nil {
return err
}
if err := s.loadShards(); err != nil {
return err
}
s.opened = true
if !s.EngineOptions.MonitorDisabled {
s.wg.Add(1)
go func() {
s.wg.Done()
s.monitorShards()
}()
}
return nil
}
// generateTrailingPath returns the last part of a shard path or WAL path
// based on the shardID, db, and rp.
func (s *Store) generateTrailingPath(shardID uint64, db, rp string) string {
return filepath.Join(db, rp, strconv.FormatUint(shardID, 10))
}
// generatePath returns the path to a shard based on its db, rp, and shardID.
func (s *Store) generatePath(shardID uint64, db, rp string) string {
return filepath.Join(s.path, s.generateTrailingPath(shardID, db, rp))
}
// generateWALPath returns the WAL path to a shard based on its db, rp, and shardID.
func (s *Store) generateWALPath(shardID uint64, db, rp string) string {
return filepath.Join(s.EngineOptions.Config.WALDir, s.generateTrailingPath(shardID, db, rp))
}
// shardLoader is an independent object that can load shards from disk in a thread-safe manner.
// It should be created with Store.newShardLoader. The result of shardLoader.Load should then
// be registered with Store.registerShard.
type shardLoader struct {
// NOTE: shardLoader should not directly reference the Store that creates it or any of its fields.
shardID uint64
db string
rp string
sfile *SeriesFile
engineOpts EngineOptions
enabled bool
logger *zap.Logger
// Shard we are working with. Could be created by the loader or given by client code.
shard *Shard
// Should be loaded even if loading failed previously?
force bool
// Path to shard on disk
path string
// Path to WAL on disk
walPath string
// loadErr indicates if Load should fail immediately with an error.
loadErr error
}
// Load loads a shard from disk in a thread-safe manner. After a call to Load,
// the result must be registered with Store.registerShard, whether or not an error
// occurred. The returned shard is guaranteed to not be nil and have the correct shard ID,
// although it will not be properly loaded if there was an error.
func (l *shardLoader) Load() *shardResponse {
// Open engine.
if l.shard == nil {
l.shard = NewShard(l.shardID, l.path, l.walPath, l.sfile, l.engineOpts)
// Set options based on caller preferences.
l.shard.EnableOnOpen = l.enabled
l.shard.CompactionDisabled = l.engineOpts.CompactionDisabled
l.shard.WithLogger(l.logger)
}
err := func() error {
// Stop and return error if previous open failed.
if l.loadErr != nil {
return l.loadErr
}
// Open the shard.
return l.shard.Open()
}()
return &shardResponse{s: l.shard, err: err}
}
type shardLoaderOption func(*shardLoader)
// withForceLoad allows forcing shard opens even if a previous load failed with an error.
func withForceLoad(force bool) shardLoaderOption {
return func(l *shardLoader) {
l.force = force
}
}
// withIndexVersion overrides the index version for a shard during loading.
func withIndexVersion(indexVersion string) shardLoaderOption {
return func(l *shardLoader) {
l.engineOpts.IndexVersion = indexVersion
}
}
// withExistingShard uses an existing Shard already registered with Store instead
// of creating a new one.
func withExistingShard(shard *Shard) shardLoaderOption {
return func(l *shardLoader) {
l.shard = shard
}
}
// newShardLoader generates a shardLoader that can be used to load a shard in a
// thread-safe manner. The result of the shardLoader.Load() must then be
// populated into s using Store.registerShard.
// s.mu must be held before calling newShardLoader. newShardLoader is not thread-safe.
// Note that any errors detected during newShardLoader will not be returned to caller until
// Load is called. This is to simplify error handling for client code.
func (s *Store) newShardLoader(shardID uint64, db, rp string, enabled bool, opts ...shardLoaderOption) *shardLoader {
l := &shardLoader{
shardID: shardID,
db: db,
rp: rp,
engineOpts: s.EngineOptions,
enabled: enabled,
logger: s.baseLogger,
path: s.generatePath(shardID, db, rp),
walPath: s.generateWALPath(shardID, db, rp),
}
for _, o := range opts {
o(l)
}
// Check for error from last load attempt.
lastErr, _ := s.badShards.shardError(shardID)
if lastErr != nil && !l.force {
l.loadErr = fmt.Errorf("not attempting to open shard %d; %w", shardID, lastErr)
return l
}
// Provide an implementation of the ShardIDSets
l.engineOpts.SeriesIDSets = shardSet{store: s, db: db}
// Retrieve cached series file or load it if not cached in s.
sfile, err := s.openSeriesFile(db)
if err != nil {
l.loadErr = fmt.Errorf("error loading series file for database %q: %w", db, err)
return l
}
l.sfile = sfile
// Retrieve cached index or create if not already cached in s.
idx, err := s.createIndexIfNotExists(db)
if err != nil {
l.loadErr = fmt.Errorf("error loading index for database %q: %w", db, err)
return l
}
l.engineOpts.InmemIndex = idx
return l
}
// loadShards loads all shards on disk. s.mu must be held before calling loadShards.
func (s *Store) loadShards() error {
// Limit the number of concurrent TSM files to be opened to the number of cores.
s.EngineOptions.OpenLimiter = limiter.NewFixed(runtime.GOMAXPROCS(0))
// Setup a shared limiter for compactions
lim := s.EngineOptions.Config.MaxConcurrentCompactions
if lim == 0 {
lim = runtime.GOMAXPROCS(0) / 2 // Default to 50% of cores for compactions
if lim < 1 {
lim = 1
}
}
// Don't allow more compactions to run than cores.
if lim > runtime.GOMAXPROCS(0) {
lim = runtime.GOMAXPROCS(0)
}
s.EngineOptions.CompactionLimiter = limiter.NewFixed(lim)
compactionSettings := []zapcore.Field{zap.Int("max_concurrent_compactions", lim)}
throughput := int(s.EngineOptions.Config.CompactThroughput)
throughputBurst := int(s.EngineOptions.Config.CompactThroughputBurst)
if throughput > 0 {
if throughputBurst < throughput {
throughputBurst = throughput
}
compactionSettings = append(
compactionSettings,
zap.Int("throughput_bytes_per_second", throughput),
zap.Int("throughput_bytes_per_second_burst", throughputBurst),
)
s.EngineOptions.CompactionThroughputLimiter = limiter.NewRate(throughput, throughputBurst)
} else {
compactionSettings = append(
compactionSettings,
zap.String("throughput_bytes_per_second", "unlimited"),
zap.String("throughput_bytes_per_second_burst", "unlimited"),
)
}
s.Logger.Info("Compaction settings", compactionSettings...)
log, logEnd := logger.NewOperation(s.Logger, "Open store", "tsdb_open")
defer logEnd()
t := limiter.NewFixed(runtime.GOMAXPROCS(0))
// Get list of shards and their db / rp.
shards, err := s.findShards(log)
if err != nil {
return fmt.Errorf("error while finding shards to load: %w", err)
}
// Setup progress metrics.
if s.startupProgressMetrics != nil {
for _, _ = range shards {
s.startupProgressMetrics.AddShard()
}
}
// Do the actual work of loading shards.
shardResC := make(chan *shardResponse, len(shards))
pendingShardCount := 0
for _, sh := range shards {
pendingShardCount++
// Override index type in EngineOptions if no TSI index found on disk.
indexVersion := s.EngineOptions.IndexVersion
if indexVersion != InmemIndexName {
if _, err := os.Stat(filepath.Join(s.generatePath(sh.id, sh.db, sh.rp), "index")); errors.Is(err, fs.ErrNotExist) {
indexVersion = InmemIndexName
}
}
// loader must be created serially for thread-safety, then they can be used in parallel manner.
loader := s.newShardLoader(sh.id, sh.db, sh.rp, false, withIndexVersion(indexVersion))
// Now perform the actual loading in parallel in separate goroutines.
go func(log *zap.Logger) {
t.Take()
defer t.Release()
start := time.Now()
res := loader.Load()
if res.err == nil {
log.Info("Opened shard", zap.String("index_version", res.s.IndexType()), zap.Duration("duration", time.Since(start)))
} else {
log.Error("Failed to open shard", zap.Error(res.err))
}
shardResC <- res
if s.startupProgressMetrics != nil {
s.startupProgressMetrics.CompletedShard()
}
}(log.With(logger.Shard(sh.id), zap.String("path", loader.path)))
}
// Register shards serially as the parallel goroutines finish opening them.
for finishedShardCount := 0; finishedShardCount < pendingShardCount; finishedShardCount++ {
res := <-shardResC
s.registerShard(res)
}
// Check and log if any databases are running multiple index types.
s.warnMixedIndexTypes()
// Enable all shards
for _, sh := range s.shards {
sh.SetEnabled(true)
if isIdle, _ := sh.IsIdle(); isIdle {
if err := sh.Free(); err != nil {
return err
}
}
}
return nil
}
// registerShard registers a shardResponse from a shardLoader.Load operation with s.
// registerShard should always be called with the result of shardLoader.Load, even if
// the shard loading failed. This makes sure errors opening shards are properly tracked.
// s.mu should be held before calling registerShard. registerShard is not thread-safe and
// and should not be used in a paralell manner.
func (s *Store) registerShard(res *shardResponse) {
if res.s == nil {
s.Logger.Error("registerShard called with nil")
return
}
if res.err != nil {
s.badShards.setShardOpenError(res.s.ID(), res.err)
return
}
// Avoid registering an already registered shard.
if s.shards[res.s.id] != res.s {
s.shards[res.s.id] = res.s
s.epochs[res.s.id] = newEpochTracker()
if _, ok := s.databases[res.s.database]; !ok {
s.databases[res.s.database] = new(databaseState)
}
s.databases[res.s.database].addIndexType(res.s.IndexType())
}
}
// warnMixedIndexTypes checks the databases listed in dbList for mixed
// index types and logs warnings if any are found. If no dbList is given, then
// all databases in s are checked.
func (s *Store) warnMixedIndexTypes(dbList ...string) {
var dbStates map[string]*databaseState
if len(dbList) == 0 {
dbStates = s.databases
} else {
dbStates = make(map[string]*databaseState)
for _, db := range dbList {
if state, ok := s.databases[db]; ok {
dbStates[db] = state
}
}
}
for db, state := range dbStates {
if state.hasMultipleIndexTypes() {
var fields []zapcore.Field
for idx, cnt := range state.indexTypes {
fields = append(fields, zap.Int(fmt.Sprintf("%s_count", idx), cnt))
}
s.Logger.Warn("Mixed shard index types", append(fields, logger.Database(db))...)
}
}
}
// Close closes the store and all associated shards. After calling Close accessing
// shards through the Store will result in ErrStoreClosed being returned.
func (s *Store) Close() error {
s.mu.Lock()
if s.opened {
close(s.closing)
}
s.mu.Unlock()
s.wg.Wait()
// No other goroutines accessing the store, so no need for a Lock.
// Close all the shards in parallel.
if err := s.walkShards(s.shardsSlice(), func(sh *Shard) error {
return sh.Close()
}); err != nil {
return err
}
s.mu.Lock()
for _, sfile := range s.sfiles {
// Close out the series files.
if err := sfile.Close(); err != nil {
s.mu.Unlock()
return err
}
}
s.databases = make(map[string]*databaseState)
s.sfiles = map[string]*SeriesFile{}
s.indexes = make(map[string]interface{})
s.pendingShardDeletes = make(map[uint64]struct{})
s.shards = nil
s.opened = false // Store may now be opened again.
s.mu.Unlock()
return nil
}
// epochsForShards returns a copy of the epoch trackers only including what is necessary
// for the provided shards. Must be called under the lock.
func (s *Store) epochsForShards(shards []*Shard) map[uint64]*epochTracker {
out := make(map[uint64]*epochTracker)
for _, sh := range shards {
out[sh.id] = s.epochs[sh.id]
}
return out
}
// openSeriesFile either returns or creates a series file for the provided
// database. It must be called under a full lock.
func (s *Store) openSeriesFile(database string) (*SeriesFile, error) {
if sfile := s.sfiles[database]; sfile != nil {
return sfile, nil
}
sfile := NewSeriesFile(filepath.Join(s.path, database, SeriesFileDirectory))
sfile.WithMaxCompactionConcurrency(s.EngineOptions.Config.SeriesFileMaxConcurrentSnapshotCompactions)
sfile.Logger = s.baseLogger
if err := sfile.Open(); err != nil {
return nil, err
}
s.sfiles[database] = sfile
return sfile, nil
}
func (s *Store) seriesFile(database string) *SeriesFile {
s.mu.RLock()
defer s.mu.RUnlock()
return s.sfiles[database]
}
// createIndexIfNotExists returns a shared index for a database, if the inmem
// index is being used. If the TSI index is being used, then this method is
// basically a no-op.
func (s *Store) createIndexIfNotExists(name string) (interface{}, error) {
if idx := s.indexes[name]; idx != nil {
return idx, nil
}
sfile, err := s.openSeriesFile(name)
if err != nil {
return nil, err
}
idx, err := NewInmemIndex(name, sfile)
if err != nil {
return nil, err
}
s.indexes[name] = idx
return idx, nil
}
// Shard returns a shard by id.
func (s *Store) Shard(id uint64) *Shard {
s.mu.RLock()
defer s.mu.RUnlock()
sh, ok := s.shards[id]
if !ok {
return nil
}
return sh
}
// ClearBadShardList will remove all shards from the badShards cache
// this will allow for lazy loading of bad shards if/when they are no
// longer in a "bad" state. This method will return any shards that
// were removed from the cache.
func (s *Store) ClearBadShardList() map[uint64]error {
badShards := s.GetBadShardList()
s.mu.Lock()
defer s.mu.Unlock()
clear(s.badShards.shardErrors)
return badShards
}
// GetBadShardList is exposed as a method for test purposes
func (s *Store) GetBadShardList() map[uint64]error {
s.badShards.mu.Lock()
defer s.badShards.mu.Unlock()
if s.badShards.shardErrors == nil {
s.Logger.Warn("badShards was nil")
s.badShards.shardErrors = make(map[uint64]error)
}
shardList := maps.Clone(s.badShards.shardErrors)
return shardList
}
type ErrPreviousShardFail struct {
error
}
func (e ErrPreviousShardFail) Unwrap() error {
return e.error
}
func (e ErrPreviousShardFail) Is(err error) bool {
_, sOk := err.(ErrPreviousShardFail)
_, pOk := err.(*ErrPreviousShardFail)
return sOk || pOk
}
func (e ErrPreviousShardFail) Error() string {
return e.error.Error()
}
func (s *Store) ReopenShard(shardID uint64, force bool) error {
sh := s.Shard(shardID)
if sh == nil {
return ErrShardNotFound
}
s.mu.Lock()
defer s.mu.Unlock()
loader := s.newShardLoader(shardID, "", "", true, withExistingShard(sh), withForceLoad(force))
res := loader.Load()
s.registerShard(res)
return res.err
}
func (s *Store) SetShardOpenErrorForTest(shardID uint64, err error) {
s.badShards.setShardOpenError(shardID, err)
}
// Shards returns a list of shards by id.
func (s *Store) Shards(ids []uint64) []*Shard {
s.mu.RLock()
defer s.mu.RUnlock()
a := make([]*Shard, 0, len(ids))
for _, id := range ids {
sh, ok := s.shards[id]
if !ok {
continue
}
a = append(a, sh)
}
return a
}
// ShardGroup returns a ShardGroup with a list of shards by id.
func (s *Store) ShardGroup(ids []uint64) ShardGroup {
return Shards(s.Shards(ids))
}
// ShardN returns the number of shards in the store.
func (s *Store) ShardN() int {
s.mu.RLock()
defer s.mu.RUnlock()
return len(s.shards)
}
// ShardDigest returns a digest of the shard with the specified ID.
func (s *Store) ShardDigest(id uint64) (io.ReadCloser, int64, error) {
sh := s.Shard(id)
if sh == nil {
return nil, 0, ErrShardNotFound
}
readCloser, size, err, _ := sh.Digest()
return readCloser, size, err
}
// CreateShard creates a shard with the given id and retention policy on a database.
func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error {
s.mu.Lock()
defer s.mu.Unlock()
select {
case <-s.closing:
return ErrStoreClosed
default:
}
// Shard already exists.
if _, ok := s.shards[shardID]; ok {
return nil
}
// Shard may be undergoing a pending deletion. While the shard can be
// recreated, it must wait for the pending delete to finish.
if _, ok := s.pendingShardDeletes[shardID]; ok {
return ErrShardDeletion
}
// Create the db and retention policy directories if they don't exist.
if err := os.MkdirAll(filepath.Join(s.path, database, retentionPolicy), 0700); err != nil {
return err
}
// Create the WAL directory.
walPath := filepath.Join(s.EngineOptions.Config.WALDir, database, retentionPolicy, fmt.Sprintf("%d", shardID))
if err := os.MkdirAll(walPath, 0700); err != nil {
return err
}
loader := s.newShardLoader(shardID, database, retentionPolicy, enabled)
res := loader.Load()
s.registerShard(res)
s.warnMixedIndexTypes(database)
return res.err
}
// CreateShardSnapShot will create a hard link to the underlying shard and return a path.
// The caller is responsible for cleaning up (removing) the file path returned.
func (s *Store) CreateShardSnapshot(id uint64, skipCacheOk bool) (string, error) {
sh := s.Shard(id)
if sh == nil {
return "", ErrShardNotFound
}
return sh.CreateSnapshot(skipCacheOk)
}
// SetShardEnabled enables or disables a shard for read and writes.
func (s *Store) SetShardEnabled(shardID uint64, enabled bool) error {
sh := s.Shard(shardID)
if sh == nil {
return ErrShardNotFound
}
sh.SetEnabled(enabled)
return nil
}
// SetShardNewReadersBlocked sets if new readers can access the shard. If blocked
// is true, the number of reader blocks is incremented and new readers will
// receive an error instead of shard access. If blocked is false, the number
// of reader blocks is decremented. If the reader blocks drops to 0, then
// new readers will be granted access to the shard.
func (s *Store) SetShardNewReadersBlocked(shardID uint64, blocked bool) error {
sh := s.Shard(shardID)
if sh == nil {
return fmt.Errorf("SetShardNewReadersBlocked: shardID=%d, blocked=%t: %w", shardID, blocked, ErrShardNotFound)
}
return sh.SetNewReadersBlocked(blocked)
}
// ShardInUse returns true if a shard is in-use (e.g. has active readers).
// SetShardNewReadersBlocked(id, true) should be called before checking
// ShardInUse to prevent race conditions where a reader could gain
// access to the shard immediately after ShardInUse is called.
func (s *Store) ShardInUse(shardID uint64) (bool, error) {
sh := s.Shard(shardID)
if sh == nil {
return false, fmt.Errorf("ShardInUse: shardID=%d: %w", shardID, ErrShardNotFound)
}
return sh.InUse()
}
// DeleteShard removes a shard from disk.
func (s *Store) DeleteShard(shardID uint64) error {
sh := s.Shard(shardID)
if sh == nil {
return ErrShardNotFound
}
// Remove the shard from Store, so it's not returned to callers requesting
// shards. Also mark that this shard is currently being deleted in a separate
// map so that we do not have to retain the global store lock while deleting
// files.
s.mu.Lock()
if _, ok := s.pendingShardDeletes[shardID]; ok {
// We are already being deleted? This is possible if delete shard
// was called twice in sequence before the shard could be removed from
// the mapping.
// This is not an error because deleting a shard twice is not an error.
s.mu.Unlock()
return nil
}
delete(s.shards, shardID)
s.pendingShardDeletes[shardID] = struct{}{}
db := sh.Database()
// Determine if the shard contained any series that are not present in any
// other shards in the database.
shards := s.filterShards(byDatabase(db))
epoch := s.epochs[shardID]
s.mu.Unlock()
// Ensure the pending deletion flag is cleared on exit.
defer func() {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.epochs, shardID)
delete(s.pendingShardDeletes, shardID)
}()
// Get the shard's local bitset of series IDs.
index, err := sh.Index()
if err != nil {
return err
}
ss := index.SeriesIDSet()
err = s.walkShards(shards, func(sh *Shard) error {
index, err := sh.Index()
if err != nil {
s.Logger.Error("cannot find shard index", zap.Uint64("shard_id", sh.ID()), zap.Error(err))
return err
}
ss.Diff(index.SeriesIDSet())
return nil
})
if err != nil {
// We couldn't get the index for a shard. Rather than deleting series which may
// exist in that shard as well as in the current shard, we stop the current deletion
return err
}
// Remove any remaining series in the set from the series file, as they don't
// exist in any of the database's remaining shards.
if ss.Cardinality() > 0 {
sfile := s.seriesFile(db)
if sfile != nil {
// If the inmem index is in use, then the series being removed from the
// series file will also need to be removed from the index.
if index.Type() == InmemIndexName {
var keyBuf []byte // Series key buffer.
var name []byte
var tagsBuf models.Tags // Buffer for tags container.
var errs []error
ss.ForEach(func(id uint64) {
skey := sfile.SeriesKey(id) // Series File series key
if skey == nil {
return
}
name, tagsBuf = ParseSeriesKeyInto(skey, tagsBuf)
keyBuf = keyBuf[:0]
keyBuf = models.AppendMakeKey(keyBuf, name, tagsBuf)
if tmpErr := index.DropSeriesGlobal(keyBuf); tmpErr != nil {
sfile.Logger.Error(
"cannot drop series",
zap.Uint64("series_id", id),
zap.String("key", string(keyBuf)),
zap.Error(tmpErr))
errs = append(errs, tmpErr)
}
})
if len(errs) != 0 {
return errors.Join(errs...)
}
}
ss.ForEach(func(id uint64) {
if err := sfile.DeleteSeriesID(id); err != nil {
sfile.Logger.Error(
"cannot delete series in shard",
zap.Uint64("series_id", id),
zap.Uint64("shard_id", shardID),
zap.Error(err))
}
})
}
}
// enter the epoch tracker
guards, gen := epoch.StartWrite()
defer epoch.EndWrite(gen)
// wait for any guards before closing the shard
for _, guard := range guards {
guard.Wait()
}
// Close the shard.
if err := sh.Close(); err != nil {
return err
}
// Remove the on-disk shard data.
if err := os.RemoveAll(sh.path); err != nil {
return err
} else if err = os.RemoveAll(sh.walPath); err != nil {
return err
} else {
// Remove index type from the database on success
s.databases[db].removeIndexType(sh.IndexType())
return nil
}
}
// DeleteDatabase will close all shards associated with a database and remove the directory and files from disk.
func (s *Store) DeleteDatabase(name string) error {
s.mu.RLock()
if _, ok := s.databases[name]; !ok {
s.mu.RUnlock()
// no files locally, so nothing to do
return nil
}
shards := s.filterShards(byDatabase(name))
epochs := s.epochsForShards(shards)
s.mu.RUnlock()
if err := s.walkShards(shards, func(sh *Shard) error {
if sh.database != name {
return nil
}
epoch := epochs[sh.id]
// enter the epoch tracker
guards, gen := epoch.StartWrite()
defer epoch.EndWrite(gen)
// wait for any guards before closing the shard
for _, guard := range guards {
guard.Wait()
}
return sh.Close()
}); err != nil {
return err
}
dbPath := filepath.Clean(filepath.Join(s.path, name))
s.mu.Lock()
defer s.mu.Unlock()
sfile := s.sfiles[name]
delete(s.sfiles, name)
// Close series file.
if sfile != nil {
if err := sfile.Close(); err != nil {
return err
}
}
// extra sanity check to make sure that even if someone named their database "../.."
// that we don't delete everything because of it, they'll just have extra files forever
if filepath.Clean(s.path) != filepath.Dir(dbPath) {
return fmt.Errorf("invalid database directory location for database '%s': %s", name, dbPath)
}
if err := os.RemoveAll(dbPath); err != nil {
return err
}
if err := os.RemoveAll(filepath.Join(s.EngineOptions.Config.WALDir, name)); err != nil {
return err
}
for _, sh := range shards {
delete(s.shards, sh.id)
delete(s.epochs, sh.id)
}
// Remove database from store list of databases
delete(s.databases, name)
// Remove shared index for database if using inmem index.
delete(s.indexes, name)
return nil
}
// DeleteRetentionPolicy will close all shards associated with the
// provided retention policy, remove the retention policy directories on
// both the DB and WAL, and remove all shard files from disk.
func (s *Store) DeleteRetentionPolicy(database, name string) error {
s.mu.RLock()
if _, ok := s.databases[database]; !ok {
s.mu.RUnlock()
// unknown database, nothing to do
return nil
}
shards := s.filterShards(ComposeShardFilter(byDatabase(database), byRetentionPolicy(name)))
s.mu.RUnlock()
// Close and delete all shards under the retention policy on the
// database.
if err := s.walkShards(shards, func(sh *Shard) error {
if sh.database != database || sh.retentionPolicy != name {
return nil
}
return sh.Close()
}); err != nil {
return err
}
// Remove the retention policy folder.
rpPath := filepath.Clean(filepath.Join(s.path, database, name))
// ensure Store's path is the grandparent of the retention policy
if filepath.Clean(s.path) != filepath.Dir(filepath.Dir(rpPath)) {
return fmt.Errorf("invalid path for database '%s', retention policy '%s': %s", database, name, rpPath)
}
// Remove the retention policy folder.
if err := os.RemoveAll(filepath.Join(s.path, database, name)); err != nil {
return err
}
// Remove the retention policy folder from the the WAL.
if err := os.RemoveAll(filepath.Join(s.EngineOptions.Config.WALDir, database, name)); err != nil {
return err
}
s.mu.Lock()
state := s.databases[database]
for _, sh := range shards {
delete(s.shards, sh.id)
state.removeIndexType(sh.IndexType())
}
s.mu.Unlock()
return nil
}
// DeleteMeasurement removes a measurement and all associated series from a database.
func (s *Store) DeleteMeasurement(database, name string) error {
s.mu.RLock()
if s.databases[database].hasMultipleIndexTypes() {
s.mu.RUnlock()
return ErrMultipleIndexTypes
}
shards := s.filterShards(byDatabase(database))
epochs := s.epochsForShards(shards)
s.mu.RUnlock()
// Limit deletes for each shard since expanding the measurement into the list
// of series keys can be very memory intensive if run concurrently.
limit := limiter.NewFixed(s.EngineOptions.Config.MaxConcurrentDeletes)
return s.walkShards(shards, func(sh *Shard) error {
limit.Take()
defer limit.Release()
// install our guard and wait for any prior deletes to finish. the
// guard ensures future deletes that could conflict wait for us.
guard := newGuard(influxql.MinTime, influxql.MaxTime, []string{name}, nil)
waiter := epochs[sh.id].WaitDelete(guard)
waiter.Wait()
defer waiter.Done()
return sh.DeleteMeasurement([]byte(name))
})
}
// filterShards returns a slice of shards where fn returns true
// for the shard. If the provided predicate is nil then all shards are returned.
// filterShards should be called under a lock.
func (s *Store) filterShards(fn func(sh *Shard) bool) []*Shard {
var shards []*Shard
if fn == nil {
shards = make([]*Shard, 0, len(s.shards))
fn = func(*Shard) bool { return true }
} else {
shards = make([]*Shard, 0)
}
for _, sh := range s.shards {
if fn(sh) {
shards = append(shards, sh)
}
}
return shards
}
type ShardPredicate = func(sh *Shard) bool
func ComposeShardFilter(fns ...ShardPredicate) ShardPredicate {
return func(sh *Shard) bool {
for _, fn := range fns {
if !fn(sh) {
return false
}
}
return true
}
}
// byDatabase provides a predicate for filterShards that matches on the name of
// the database passed in.
func byDatabase(name string) ShardPredicate {
return func(sh *Shard) bool {
return sh.database == name
}
}
// byRetentionPolicy provides a predicate for filterShards that matches on the name of
// the retention policy passed in.
func byRetentionPolicy(name string) ShardPredicate {
return func(sh *Shard) bool {
return sh.retentionPolicy == name
}
}
func byIndexType(name string) ShardPredicate {
return func(sh *Shard) bool {
return sh.IndexType() == name
}
}
// walkShards apply a function to each shard in parallel. fn must be safe for
// concurrent use. If any of the functions return an error, the first error is
// returned.
func (s *Store) walkShards(shards []*Shard, fn func(sh *Shard) error) error {
resC := make(chan shardResponse, len(shards))
var n int
for _, sh := range shards {
n++
go func(sh *Shard) {
if err := fn(sh); err != nil {
resC <- shardResponse{err: fmt.Errorf("shard %d: %s", sh.id, err)}
return
}
resC <- shardResponse{}
}(sh)
}
var err error
for i := 0; i < n; i++ {
res := <-resC
if res.err != nil {
err = res.err
}
}
close(resC)
return err
}
// ShardIDs returns a slice of all ShardIDs under management.
func (s *Store) ShardIDs() []uint64 {
s.mu.RLock()
defer s.mu.RUnlock()
return s.shardIDs()
}
func (s *Store) shardIDs() []uint64 {
a := make([]uint64, 0, len(s.shards))
for shardID := range s.shards {
a = append(a, shardID)
}
return a
}
// shardsSlice returns an ordered list of shards.
func (s *Store) shardsSlice() []*Shard {
a := make([]*Shard, 0, len(s.shards))
for _, sh := range s.shards {
a = append(a, sh)
}
sort.Sort(Shards(a))
return a
}
// Databases returns the names of all databases managed by the store.
func (s *Store) Databases() []string {
s.mu.RLock()
defer s.mu.RUnlock()
databases := make([]string, 0, len(s.databases))
for k := range s.databases {
databases = append(databases, k)
}
return databases
}
// DiskSize returns the size of all the shard files in bytes.
// This size does not include the WAL size.
func (s *Store) DiskSize() (int64, error) {
var size int64
s.mu.RLock()
allShards := s.filterShards(nil)
s.mu.RUnlock()
for _, sh := range allShards {
sz, err := sh.DiskSize()
if err != nil {
return 0, err
}
size += sz
}
return size, nil
}
// sketchesForDatabase returns merged sketches for the provided database, by
// walking each shard in the database and merging the sketches found there.
func (s *Store) sketchesForDatabase(dbName string, getSketches func(*Shard) (estimator.Sketch, estimator.Sketch, error)) (estimator.Sketch, estimator.Sketch, error) {
var (
ss estimator.Sketch // Sketch estimating number of items.
ts estimator.Sketch // Sketch estimating number of tombstoned items.
)
s.mu.RLock()
shards := s.filterShards(byDatabase(dbName))
s.mu.RUnlock()
// Never return nil sketches. In the case that db exists but no data written
// return empty sketches.
if len(shards) == 0 {
ss, ts = hll.NewDefaultPlus(), hll.NewDefaultPlus()
}
// Iterate over all shards for the database and combine all of the sketches.
for _, shard := range shards {
s, t, err := getSketches(shard)
if err != nil {
return nil, nil, err
}
if ss == nil {
ss, ts = s, t
} else if err = ss.Merge(s); err != nil {
return nil, nil, err
} else if err = ts.Merge(t); err != nil {
return nil, nil, err
}
}
return ss, ts, nil
}
// SeriesCardinality returns the exact series cardinality for the provided
// database.
//
// Cardinality is calculated exactly by unioning all shards' bitsets of series
// IDs. The result of this method cannot be combined with any other results.
func (s *Store) SeriesCardinality(ctx context.Context, database string) (int64, error) {
s.mu.RLock()
shards := s.filterShards(byDatabase(database))
s.mu.RUnlock()
var setMu sync.Mutex
others := make([]*SeriesIDSet, 0, len(shards))
err := s.walkShards(shards, func(sh *Shard) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
index, err := sh.Index()
if err != nil {
return err
}
seriesIDs := index.SeriesIDSet()
setMu.Lock()
others = append(others, seriesIDs)
setMu.Unlock()
return nil
}
})
if err != nil {
return 0, err
}
ss := NewSeriesIDSet()
ss.Merge(others...)
select {
case <-ctx.Done():
return 0, ctx.Err()
default:
return int64(ss.Cardinality()), nil
}
}
// SeriesSketches returns the sketches associated with the series data in all
// the shards in the provided database.
//
// The returned sketches can be combined with other sketches to provide an
// estimation across distributed databases.
func (s *Store) SeriesSketches(ctx context.Context, database string) (estimator.Sketch, estimator.Sketch, error) {
return s.sketchesForDatabase(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) {
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
default:
}
if sh == nil {
return nil, nil, errors.New("shard nil, can't get cardinality")
}
return sh.SeriesSketches()
})
}
// MeasurementsCardinality returns an estimation of the measurement cardinality
// for the provided database.
//
// Cardinality is calculated using a sketch-based estimation. The result of this
// method cannot be combined with any other results.
func (s *Store) MeasurementsCardinality(ctx context.Context, database string) (int64, error) {
ss, ts, err := s.MeasurementsSketches(ctx, database)
if err != nil {
return 0, err
}
mc := int64(ss.Count() - ts.Count())
if mc < 0 {
mc = 0
}
return mc, nil
}
// MeasurementsSketches returns the sketches associated with the measurement
// data in all the shards in the provided database.
//
// The returned sketches can be combined with other sketches to provide an
// estimation across distributed databases.
func (s *Store) MeasurementsSketches(ctx context.Context, database string) (estimator.Sketch, estimator.Sketch, error) {
return s.sketchesForDatabase(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) {
// every iteration, check for timeout.
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
default:
if sh == nil {
return nil, nil, errors.New("shard nil, can't get cardinality")
}
return sh.MeasurementsSketches()
}
})
}
// BackupShard will get the shard and have the engine backup since the passed in
// time to the writer.
func (s *Store) BackupShard(id uint64, since time.Time, w io.Writer) error {
shard := s.Shard(id)
if shard == nil {
return fmt.Errorf("shard %d doesn't exist on this server", id)
}
path, err := relativePath(s.path, shard.path)
if err != nil {
return err
}
return shard.Backup(w, path, since)
}
func (s *Store) ExportShard(id uint64, start time.Time, end time.Time, w io.Writer) error {
shard := s.Shard(id)
if shard == nil {
return fmt.Errorf("shard %d doesn't exist on this server", id)
}
path, err := relativePath(s.path, shard.path)
if err != nil {
return err
}
return shard.Export(w, path, start, end)
}
// RestoreShard restores a backup from r to a given shard.
// This will only overwrite files included in the backup.
func (s *Store) RestoreShard(id uint64, r io.Reader) error {
shard := s.Shard(id)
if shard == nil {
return fmt.Errorf("shard %d doesn't exist on this server", id)
}
path, err := relativePath(s.path, shard.path)
if err != nil {
return err
}
return shard.Restore(r, path)
}
// ImportShard imports the contents of r to a given shard.
// All files in the backup are added as new files which may
// cause duplicated data to occur requiring more expensive
// compactions.
func (s *Store) ImportShard(id uint64, r io.Reader) error {
shard := s.Shard(id)
if shard == nil {
return fmt.Errorf("shard %d doesn't exist on this server", id)
}
path, err := relativePath(s.path, shard.path)
if err != nil {
return err
}
return shard.Import(r, path)
}
// ShardRelativePath will return the relative path to the shard, i.e.,
// <database>/<retention>/<id>.
func (s *Store) ShardRelativePath(id uint64) (string, error) {
shard := s.Shard(id)
if shard == nil {
return "", fmt.Errorf("shard %d doesn't exist on this server", id)
}
return relativePath(s.path, shard.path)
}
// DeleteSeries loops through the local shards and deletes the series data for
// the passed in series keys.
func (s *Store) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error {
// Expand regex expressions in the FROM clause.
a, err := s.ExpandSources(sources)
if err != nil {
return err
} else if len(sources) > 0 && len(a) == 0 {
return nil
}
sources = a
// Determine deletion time range.
condition, timeRange, err := influxql.ConditionExpr(condition, nil)
if err != nil {
return err
}
var min, max int64
if !timeRange.Min.IsZero() {
min = timeRange.Min.UnixNano()
} else {
min = influxql.MinTime
}
if !timeRange.Max.IsZero() {
max = timeRange.Max.UnixNano()
} else {
max = influxql.MaxTime
}
s.mu.RLock()
if s.databases[database].hasMultipleIndexTypes() {
s.mu.RUnlock()
return ErrMultipleIndexTypes
}
sfile := s.sfiles[database]
if sfile == nil {
s.mu.RUnlock()
// No series file means nothing has been written to this DB and thus nothing to delete.
return nil
}
shardFilterFn := byDatabase(database)
if len(sources) != 0 {
var rp string
for idx, source := range sources {
if measurement, ok := source.(*influxql.Measurement); ok {
if idx == 0 {
rp = measurement.RetentionPolicy
} else if rp != measurement.RetentionPolicy {
return fmt.Errorf("mixed retention policies not supported, wanted %q got %q", rp, measurement.RetentionPolicy)
}
} else {
return fmt.Errorf("unsupported source type in delete %v", source)
}
}
if rp != "" {
shardFilterFn = ComposeShardFilter(shardFilterFn, byRetentionPolicy(rp))
}
}
shards := s.filterShards(shardFilterFn)
epochs := s.epochsForShards(shards)
s.mu.RUnlock()
// Limit deletes for each shard since expanding the measurement into the list
// of series keys can be very memory intensive if run concurrently.
limit := limiter.NewFixed(s.EngineOptions.Config.MaxConcurrentDeletes)
return s.walkShards(shards, func(sh *Shard) error {
// Determine list of measurements from sources.
// Use all measurements if no FROM clause was provided.
var names []string
if len(sources) > 0 {
for _, source := range sources {
names = append(names, source.(*influxql.Measurement).Name)
}
} else {
if err := sh.ForEachMeasurementName(func(name []byte) error {
names = append(names, string(name))
return nil
}); err != nil {
return err
}
}
sort.Strings(names)
limit.Take()
defer limit.Release()
// install our guard and wait for any prior deletes to finish. the
// guard ensures future deletes that could conflict wait for us.
waiter := epochs[sh.id].WaitDelete(newGuard(min, max, names, condition))
waiter.Wait()
defer waiter.Done()
index, err := sh.Index()
if err != nil {
return err
}
indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: sfile}
// Find matching series keys for each measurement.
for _, name := range names {
itr, err := indexSet.MeasurementSeriesByExprIterator([]byte(name), condition)
if err != nil {
return err
} else if itr == nil {
continue
}
defer itr.Close()
if err := sh.DeleteSeriesRange(NewSeriesIteratorAdapter(sfile, itr), min, max); err != nil {
return err
}
}
return nil
})
}
// ExpandSources expands sources against all local shards.
func (s *Store) ExpandSources(sources influxql.Sources) (influxql.Sources, error) {
shards := func() Shards {
s.mu.RLock()
defer s.mu.RUnlock()
return Shards(s.shardsSlice())
}()
return shards.ExpandSources(sources)
}
func (s *Store) statsTracker(db, rp, login string) StatsTracker {
var tracker StatsTracker
if s.EngineOptions.Config.IngressMetricByLogin && s.EngineOptions.Config.IngressMetricByMeasurement {
tracker.AddedMeasurementPoints = func(measurement []byte, points, values int64) {
atomic.AddInt64(&s.stats.ValuesWritten, values)
atomic.AddInt64(&s.stats.PointsWritten, points)
s.ingressMetrics.AddMetric(string(measurement), db, rp, login, points, values, 0)
}
tracker.AddedMeasurementSeries = func(measurement []byte, series int64) {
atomic.AddInt64(&s.stats.SeriesCreated, series)
s.ingressMetrics.AddMetric(string(measurement), db, rp, login, 0, 0, series)
}
} else if s.EngineOptions.Config.IngressMetricByLogin {
tracker.AddedPoints = func(points, values int64) {
atomic.AddInt64(&s.stats.ValuesWritten, values)
atomic.AddInt64(&s.stats.PointsWritten, points)
s.ingressMetrics.AddMetric("", "", "", login, points, values, 0)
}
tracker.AddedSeries = func(series int64) {
atomic.AddInt64(&s.stats.SeriesCreated, series)
s.ingressMetrics.AddMetric("", "", "", login, 0, 0, series)
}
} else if s.EngineOptions.Config.IngressMetricByMeasurement {
tracker.AddedMeasurementPoints = func(measurement []byte, points, values int64) {
atomic.AddInt64(&s.stats.ValuesWritten, values)
atomic.AddInt64(&s.stats.PointsWritten, points)
s.ingressMetrics.AddMetric(string(measurement), db, rp, "", points, values, 0)
}
tracker.AddedMeasurementSeries = func(measurement []byte, series int64) {
atomic.AddInt64(&s.stats.SeriesCreated, series)
s.ingressMetrics.AddMetric(string(measurement), db, rp, "", 0, 0, series)
}
} else {
tracker.AddedPoints = func(points, values int64) {
atomic.AddInt64(&s.stats.ValuesWritten, values)
atomic.AddInt64(&s.stats.PointsWritten, points)
}
tracker.AddedSeries = func(series int64) {
atomic.AddInt64(&s.stats.SeriesCreated, series)
}
}
return tracker
}
// 'Fake' user names for write loggging / metrics
const (
HintedHandoffUser = "_systemuser_hintedhandoff"
UnknownUser = "_systemuser_unknown" // for when user authentication is off
SelectIntoUser = "_systemuser_selectinto"
CollectdUser = "_systemuser_collectd"
OpenTsdbUser = "_systemuser_opentsdb"
GraphiteUser = "_systemuser_graphite"
UdpUser = "_systemuser_udpwriter"
MonitorUser = "_systemuser_monitor"
)
// WriteContext holds some request-scoped details about the write,
// for metrics and logging. Currently just the UserId login name.
// Eventually it could also hold a context.Context for cancellation.
type WriteContext struct {
// Could be a system UserId, e.g. HintedHandoffUser
UserId string
}
// WriteToShard writes a list of points to a shard identified by its ID.
func (s *Store) WriteToShard(writeCtx WriteContext, shardID uint64, points []models.Point) error {
s.mu.RLock()
select {
case <-s.closing:
s.mu.RUnlock()
return ErrStoreClosed
default:
}
sh := s.shards[shardID]
if sh == nil {
s.mu.RUnlock()
return ErrShardNotFound
}
epoch := s.epochs[shardID]
s.mu.RUnlock()
// enter the epoch tracker
guards, gen := epoch.StartWrite()
defer epoch.EndWrite(gen)
// wait for any guards before writing the points.
for _, guard := range guards {
if guard.Matches(points) {
guard.Wait()
}
}
// Ensure snapshot compactions are enabled since the shard might have been cold
// and disabled by the monitor.
if isIdle, _ := sh.IsIdle(); isIdle {
sh.SetCompactionsEnabled(true)
}
return sh.WritePoints(points, s.statsTracker(sh.database, sh.retentionPolicy, writeCtx.UserId))
}
// MeasurementNames returns a slice of all measurements. Measurements accepts an
// optional condition expression. If cond is nil, then all measurements for the
// database will be returned.
// retentionPolicy is only valid for tsi databases, inmem will error.
func (s *Store) MeasurementNames(ctx context.Context, auth query.FineAuthorizer, database string, retentionPolicy string, cond influxql.Expr) ([][]byte, error) {
var filterFunc func(sh *Shard) bool
if retentionPolicy == "" {
filterFunc = byDatabase(database)
} else {
// We don't support retention-policy level index checks on inmem since we only have a database-level index.
if s.EngineOptions.IndexVersion != TSI1IndexName {
return nil, fmt.Errorf("retention policy filter for measurements not supported for index %s", s.EngineOptions.IndexVersion)
}
filterFunc = ComposeShardFilter(byDatabase(database), byRetentionPolicy(retentionPolicy))
}
s.mu.RLock()
shards := s.filterShards(filterFunc)
s.mu.RUnlock()
sfile := s.seriesFile(database)
if sfile == nil {
return nil, nil
}
// Build indexset.
is := IndexSet{Indexes: make([]Index, 0, len(shards)), SeriesFile: sfile}
for _, sh := range shards {
index, err := sh.Index()
if err != nil {
return nil, err
}
is.Indexes = append(is.Indexes, index)
}
is = is.DedupeInmemIndexes()
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
return is.MeasurementNamesByExpr(auth, cond)
}
// MeasurementSeriesCounts returns the number of measurements and series in all
// the shards' indices.
func (s *Store) MeasurementSeriesCounts(database string) (measuments int, series int) {
// TODO: implement me
return 0, 0
}
type TagKeys struct {
Measurement string
Keys []string
}
type TagKeysSlice []TagKeys
func (a TagKeysSlice) Len() int { return len(a) }
func (a TagKeysSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a TagKeysSlice) Less(i, j int) bool { return a[i].Measurement < a[j].Measurement }
// TagKeys returns the tag keys in the given database, matching the condition.
func (s *Store) TagKeys(ctx context.Context, auth query.FineAuthorizer, shardIDs []uint64, cond influxql.Expr) ([]TagKeys, error) {
if len(shardIDs) == 0 {
return nil, nil
}
// take out the _name = 'mymeasurement' clause from 'FROM' clause
measurementExpr, remainingExpr, err := influxql.PartitionExpr(influxql.CloneExpr(cond), func(e influxql.Expr) (bool, error) {
switch e := e.(type) {
case *influxql.BinaryExpr:
switch e.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
tag, ok := e.LHS.(*influxql.VarRef)
if ok && tag.Val == "_name" {
return true, nil
}
}
}
return false, nil
})
if err != nil {
return nil, err
}
// take out the _tagKey = 'mykey' clause from 'WITH KEY' clause
tagKeyExpr, filterExpr, err := influxql.PartitionExpr(remainingExpr, isTagKeyClause)
if err != nil {
return nil, err
}
if err = isBadQuoteTagValueClause(filterExpr); err != nil {
return nil, err
}
// Get all the shards we're interested in.
is := IndexSet{Indexes: make([]Index, 0, len(shardIDs))}
s.mu.RLock()
for _, sid := range shardIDs {
shard, ok := s.shards[sid]
if !ok {
continue
}
if is.SeriesFile == nil {
sfile, err := shard.SeriesFile()
if err != nil {
s.mu.RUnlock()
return nil, err
}
is.SeriesFile = sfile
}
index, err := shard.Index()
if err != nil {
s.mu.RUnlock()
return nil, err
}
is.Indexes = append(is.Indexes, index)
}
s.mu.RUnlock()
// Determine list of measurements.
is = is.DedupeInmemIndexes()
names, err := is.MeasurementNamesByExpr(nil, measurementExpr)
if err != nil {
return nil, err
}
// Iterate over each measurement.
var results []TagKeys
for _, name := range names {
// Check for timeouts
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
// Build keyset over all indexes for measurement.
tagKeySet, err := is.MeasurementTagKeysByExpr(name, tagKeyExpr)
if err != nil {
return nil, err
} else if len(tagKeySet) == 0 {
continue
}
keys := make([]string, 0, len(tagKeySet))
// If no tag value filter is present then all the tag keys can be returned
// If they have authorized series associated with them.
if filterExpr == nil {
for tagKey := range tagKeySet {
// check for timeouts
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
ok, err := is.TagKeyHasAuthorizedSeries(auth, []byte(name), []byte(tagKey))
if err != nil {
return nil, err
} else if ok {
keys = append(keys, tagKey)
}
}
sort.Strings(keys)
// Add to resultset.
results = append(results, TagKeys{
Measurement: string(name),
Keys: keys,
})
continue
}
// Tag filter provided so filter keys first.
// Sort the tag keys.
for k := range tagKeySet {
keys = append(keys, k)
}
sort.Strings(keys)
// Filter against tag values, skip if no values exist.
values, err := is.MeasurementTagKeyValuesByExpr(auth, name, keys, filterExpr, true, s.Logger)
if err != nil {
return nil, err
}
// Filter final tag keys using the matching values. If a key has one or
// more matching values then it will be included in the final set.
finalKeys := keys[:0] // Use same backing array as keys to save allocation.
for i, k := range keys {
if len(values[i]) > 0 {
// Tag key k has one or more matching tag values.
finalKeys = append(finalKeys, k)
}
}
// Add to resultset.
results = append(results, TagKeys{
Measurement: string(name),
Keys: finalKeys,
})
}
return results, nil
}
type TagValues struct {
Measurement string
Values []KeyValue
}
type TagValuesSlice []TagValues
func (a TagValuesSlice) Len() int { return len(a) }
func (a TagValuesSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a TagValuesSlice) Less(i, j int) bool { return a[i].Measurement < a[j].Measurement }
// tagValues is a temporary representation of a TagValues. Rather than allocating
// KeyValues as we build up a TagValues object, We hold off allocating KeyValues
// until we have merged multiple tagValues together.
type tagValues struct {
name []byte
keys []string
values [][]string
}
// Is a slice of tagValues that can be sorted by measurement.
type tagValuesSlice []tagValues
func (a tagValuesSlice) Len() int { return len(a) }
func (a tagValuesSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a tagValuesSlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 }
func isTagKeyClause(e influxql.Expr) (bool, error) {
switch e := e.(type) {
case *influxql.BinaryExpr:
switch e.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
tag, ok := e.LHS.(*influxql.VarRef)
if ok && tag.Val == "_tagKey" {
return true, nil
}
case influxql.OR, influxql.AND:
ok1, err := isTagKeyClause(e.LHS)
if err != nil {
return false, err
}
ok2, err := isTagKeyClause(e.RHS)
if err != nil {
return false, err
}
return ok1 && ok2, nil
}
case *influxql.ParenExpr:
return isTagKeyClause(e.Expr)
}
return false, nil
}
func isBadQuoteTagValueClause(e influxql.Expr) error {
switch e := e.(type) {
case *influxql.BinaryExpr:
switch e.Op {
case influxql.EQ, influxql.NEQ:
_, lOk := e.LHS.(*influxql.VarRef)
_, rOk := e.RHS.(*influxql.VarRef)
if lOk && rOk {
return fmt.Errorf("bad WHERE clause for metaquery; one term must be a string literal tag value within single quotes: %s", e.String())
}
case influxql.OR, influxql.AND:
if err := isBadQuoteTagValueClause(e.LHS); err != nil {
return err
} else if err = isBadQuoteTagValueClause(e.RHS); err != nil {
return err
} else {
return nil
}
}
case *influxql.ParenExpr:
return isBadQuoteTagValueClause(e.Expr)
}
return nil
}
// TagValues returns the tag keys and values for the provided shards, where the
// tag values satisfy the provided condition.
func (s *Store) TagValues(ctx context.Context, auth query.FineAuthorizer, shardIDs []uint64, cond influxql.Expr) ([]TagValues, error) {
if len(shardIDs) == 0 {
return nil, nil
}
if cond == nil {
return nil, errors.New("a condition is required")
}
// take out the _name = 'mymeasurement' clause from 'FROM' clause
measurementExpr, remainingExpr, err := influxql.PartitionExpr(influxql.CloneExpr(cond), func(e influxql.Expr) (bool, error) {
switch e := e.(type) {
case *influxql.BinaryExpr:
switch e.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
tag, ok := e.LHS.(*influxql.VarRef)
if ok && tag.Val == "_name" {
return true, nil
}
}
}
return false, nil
})
if err != nil {
return nil, err
}
// take out the _tagKey = 'mykey' clause from 'WITH KEY' / 'WITH KEY IN' clause
tagKeyExpr, filterExpr, err := influxql.PartitionExpr(remainingExpr, isTagKeyClause)
if err != nil {
return nil, err
}
if err = isBadQuoteTagValueClause(filterExpr); err != nil {
return nil, err
}
// Build index set to work on.
is := IndexSet{Indexes: make([]Index, 0, len(shardIDs))}
s.mu.RLock()
for _, sid := range shardIDs {
shard, ok := s.shards[sid]
if !ok {
continue
}
if is.SeriesFile == nil {
sfile, err := shard.SeriesFile()
if err != nil {
s.mu.RUnlock()
return nil, err
}
is.SeriesFile = sfile
}
index, err := shard.Index()
if err != nil {
s.mu.RUnlock()
return nil, err
}
is.Indexes = append(is.Indexes, index)
}
s.mu.RUnlock()
is = is.DedupeInmemIndexes()
var maxMeasurements int // Hint as to lower bound on number of measurements.
// names will be sorted by MeasurementNamesByExpr.
// Authorisation can be done later on, when series may have been filtered
// out by other conditions.
names, err := is.MeasurementNamesByExpr(nil, measurementExpr)
if err != nil {
return nil, err
}
if len(names) > maxMeasurements {
maxMeasurements = len(names)
}
// Stores each list of TagValues for each measurement.
allResults := make([]tagValues, 0, len(names))
// Iterate over each matching measurement in the shard. For each
// measurement we'll get the matching tag keys (e.g., when a WITH KEYS)
// statement is used, and we'll then use those to fetch all the relevant
// values from matching series. Series may be filtered using a WHERE
// filter.
for _, name := range names {
// check for timeouts
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
// Determine a list of keys from condition.
keySet, err := is.MeasurementTagKeysByExpr(name, tagKeyExpr)
if err != nil {
return nil, err
}
if len(keySet) == 0 {
// No matching tag keys for this measurement
continue
}
result := tagValues{
name: name,
keys: make([]string, 0, len(keySet)),
}
// Add the keys to the tagValues and sort them.
for k := range keySet {
result.keys = append(result.keys, k)
}
sort.Strings(result.keys)
// get all the tag values for each key in the keyset.
// Each slice in the results contains the sorted values associated
// associated with each tag key for the measurement from the key set.
if result.values, err = is.MeasurementTagKeyValuesByExpr(auth, name, result.keys, filterExpr, true, s.Logger); err != nil {
return nil, err
}
// remove any tag keys that didn't have any authorized values
j := 0
for i := range result.keys {
if len(result.values[i]) == 0 {
continue
}
result.keys[j] = result.keys[i]
result.values[j] = result.values[i]
j++
}
result.keys = result.keys[:j]
result.values = result.values[:j]
// only include result if there are keys with values
if len(result.keys) > 0 {
allResults = append(allResults, result)
}
}
// Not sure this is necessary, should be pre-sorted
sort.Sort(tagValuesSlice(allResults))
result := make([]TagValues, 0, maxMeasurements)
for _, r := range allResults {
// check for timeouts
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
nextResult := makeTagValues(r)
if len(nextResult.Values) > 0 {
result = append(result, nextResult)
}
}
return result, nil
}
func makeTagValues(tv tagValues) TagValues {
var result TagValues
result.Measurement = string(tv.name)
// TODO(edd): will be too small likely. Find a hint?
result.Values = make([]KeyValue, 0, len(tv.values))
for ki, key := range tv.keys {
for _, value := range tv.values[ki] {
result.Values = append(result.Values, KeyValue{Key: key, Value: value})
}
}
return result
}
func (s *Store) monitorShards() {
t := time.NewTicker(10 * time.Second)
defer t.Stop()
t2 := time.NewTicker(time.Minute)
defer t2.Stop()
for {
select {
case <-s.closing:
return
case <-t.C:
s.mu.RLock()
for _, sh := range s.shards {
if isIdle, _ := sh.IsIdle(); isIdle {
if err := sh.Free(); err != nil {
s.Logger.Warn("Error while freeing cold shard resources",
zap.Error(err),
logger.Shard(sh.ID()))
}
} else {
sh.SetCompactionsEnabled(true)
}
}
s.mu.RUnlock()
case <-t2.C:
if s.EngineOptions.Config.MaxValuesPerTag == 0 {
continue
}
s.mu.RLock()
shards := s.filterShards(byIndexType(InmemIndexName))
s.mu.RUnlock()
// No inmem shards...
if len(shards) == 0 {
continue
}
var dbLock sync.Mutex
databases := make(map[string]struct{}, len(shards))
s.walkShards(shards, func(sh *Shard) error {
db := sh.database
// Only process 1 shard from each database
dbLock.Lock()
if _, ok := databases[db]; ok {
dbLock.Unlock()
return nil
}
databases[db] = struct{}{}
dbLock.Unlock()
sfile := s.seriesFile(sh.database)
if sfile == nil {
return nil
}
firstShardIndex, err := sh.Index()
if err != nil {
return err
}
index, err := sh.Index()
if err != nil {
return err
}
// inmem shards share the same index instance so just use the first one to avoid
// allocating the same measurements repeatedly
indexSet := IndexSet{Indexes: []Index{firstShardIndex}, SeriesFile: sfile}
names, err := indexSet.MeasurementNamesByExpr(nil, nil)
if err != nil {
s.Logger.Warn("Cannot retrieve measurement names",
zap.Error(err),
logger.Shard(sh.ID()),
logger.Database(db))
return nil
}
indexSet.Indexes = []Index{index}
for _, name := range names {
indexSet.ForEachMeasurementTagKey(name, func(k []byte) error {
n := sh.TagKeyCardinality(name, k)
perc := int(float64(n) / float64(s.EngineOptions.Config.MaxValuesPerTag) * 100)
if perc > 100 {
perc = 100
}
// Log at 80, 85, 90-100% levels
if perc == 80 || perc == 85 || perc >= 90 {
s.Logger.Warn("max-values-per-tag limit may be exceeded soon",
zap.String("perc", fmt.Sprintf("%d%%", perc)),
zap.Int("n", n),
zap.Int("max", s.EngineOptions.Config.MaxValuesPerTag),
logger.Database(db),
zap.ByteString("measurement", name),
zap.ByteString("tag", k))
}
return nil
})
}
return nil
})
}
}
}
// KeyValue holds a string key and a string value.
type KeyValue struct {
Key, Value string
}
// KeyValues is a sortable slice of KeyValue.
type KeyValues []KeyValue
// Len implements sort.Interface.
func (a KeyValues) Len() int { return len(a) }
// Swap implements sort.Interface.
func (a KeyValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// Less implements sort.Interface. Keys are compared before values.
func (a KeyValues) Less(i, j int) bool {
ki, kj := a[i].Key, a[j].Key
if ki == kj {
return a[i].Value < a[j].Value
}
return ki < kj
}
// decodeStorePath extracts the database and retention policy names
// from a given shard or WAL path.
func decodeStorePath(shardOrWALPath string) (database, retentionPolicy string) {
// shardOrWALPath format: /maybe/absolute/base/then/:database/:retentionPolicy/:nameOfShardOrWAL
// Discard the last part of the path (the shard name or the wal name).
path, _ := filepath.Split(filepath.Clean(shardOrWALPath))
// Extract the database and retention policy.
path, rp := filepath.Split(filepath.Clean(path))
_, db := filepath.Split(filepath.Clean(path))
return db, rp
}
// relativePath will expand out the full paths passed in and return
// the relative shard path from the store
func relativePath(storePath, shardPath string) (string, error) {
path, err := filepath.Abs(storePath)
if err != nil {
return "", fmt.Errorf("store abs path: %s", err)
}
fp, err := filepath.Abs(shardPath)
if err != nil {
return "", fmt.Errorf("file abs path: %s", err)
}
name, err := filepath.Rel(path, fp)
if err != nil {
return "", fmt.Errorf("file rel path: %s", err)
}
return name, nil
}
type shardSet struct {
store *Store
db string
}
func (s shardSet) ForEach(f func(ids *SeriesIDSet)) error {
s.store.mu.RLock()
shards := s.store.filterShards(byDatabase(s.db))
s.store.mu.RUnlock()
for _, sh := range shards {
idx, err := sh.Index()
if err != nil {
return err
}
f(idx.SeriesIDSet())
}
return nil
}
type shardInfo struct {
id uint64
db string
rp string
}
// findShards returns a list of all shards and their db / rp that are found
// in s.path.
func (s *Store) findShards(log *zap.Logger) ([]shardInfo, error) {
var shards []shardInfo
// Determine how many shards we need to open by checking the store path.
dbDirs, err := os.ReadDir(s.path)
if err != nil {
return nil, err
}
for _, db := range dbDirs {
rpDirs, err := s.getRetentionPolicyDirs(db, log)
if err != nil {
return nil, err
} else if rpDirs == nil {
continue
}
for _, rp := range rpDirs {
shardDirs, err := s.getShards(rp, db, log)
if err != nil {
return nil, err
} else if shardDirs == nil {
continue
}
for _, sh := range shardDirs {
fullPath := filepath.Join(s.path, db.Name(), rp.Name())
// Series file should not be in a retention policy but skip just in case.
if sh.Name() == SeriesFileDirectory {
log.Warn("Skipping series file in retention policy dir", zap.String("path", fullPath))
continue
}
// Shard file names are numeric shardIDs
shardID, err := strconv.ParseUint(sh.Name(), 10, 64)
if err != nil {
log.Warn("invalid shard ID found at path", zap.String("path", fullPath))
continue
}
if s.EngineOptions.ShardFilter != nil && !s.EngineOptions.ShardFilter(db.Name(), rp.Name(), shardID) {
log.Info("skipping shard", zap.String("path", fullPath), logger.Shard(shardID))
continue
}
shards = append(shards, shardInfo{id: shardID, db: db.Name(), rp: rp.Name()})
}
}
}
return shards, nil
}
func (s *Store) getRetentionPolicyDirs(db os.DirEntry, log *zap.Logger) ([]os.DirEntry, error) {
dbPath := filepath.Join(s.path, db.Name())
if !db.IsDir() {
log.Info("Skipping database dir", zap.String("name", db.Name()), zap.String("reason", "not a directory"))
return nil, nil
}
if s.EngineOptions.DatabaseFilter != nil && !s.EngineOptions.DatabaseFilter(db.Name()) {
log.Info("Skipping database dir", logger.Database(db.Name()), zap.String("reason", "failed database filter"))
return nil, nil
}
// Load each retention policy within the database directory.
rpDirs, err := os.ReadDir(dbPath)
if err != nil {
return nil, err
}
return rpDirs, nil
}
func (s *Store) getShards(rpDir os.DirEntry, dbDir os.DirEntry, log *zap.Logger) ([]os.DirEntry, error) {
rpPath := filepath.Join(s.path, dbDir.Name(), rpDir.Name())
if !rpDir.IsDir() {
log.Info("Skipping retention policy dir", zap.String("name", rpDir.Name()), zap.String("reason", "not a directory"))
return nil, nil
}
// The .series directory is not a retention policy.
if rpDir.Name() == SeriesFileDirectory {
return nil, nil
}
if s.EngineOptions.RetentionPolicyFilter != nil && !s.EngineOptions.RetentionPolicyFilter(dbDir.Name(), rpDir.Name()) {
log.Info("Skipping retention policy dir", logger.RetentionPolicy(rpDir.Name()), zap.String("reason", "failed retention policy filter"))
return nil, nil
}
shardDirs, err := os.ReadDir(rpPath)
if err != nil {
return nil, err
}
return shardDirs, nil
}