// Package tsm1 provides a TSDB in the Time Structured Merge tree format.
package tsm1 // import "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
import (
"archive/tar"
"bytes"
"context"
"errors"
"fmt"
"io"
"math"
"os"
"path/filepath"
"regexp"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/influxdata/influxdb/v2/influxql/query"
"github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/pkg/bytesutil"
"github.com/influxdata/influxdb/v2/pkg/estimator"
"github.com/influxdata/influxdb/v2/pkg/file"
"github.com/influxdata/influxdb/v2/pkg/limiter"
"github.com/influxdata/influxdb/v2/pkg/metrics"
"github.com/influxdata/influxdb/v2/pkg/radix"
intar "github.com/influxdata/influxdb/v2/pkg/tar"
"github.com/influxdata/influxdb/v2/pkg/tracing"
"github.com/influxdata/influxdb/v2/tsdb"
_ "github.com/influxdata/influxdb/v2/tsdb/index"
"github.com/influxdata/influxdb/v2/tsdb/index/tsi1"
"github.com/influxdata/influxql"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
//go:generate -command tmpl go run github.com/benbjohnson/tmpl
//go:generate tmpl -data=@iterator.gen.go.tmpldata iterator.gen.go.tmpl engine.gen.go.tmpl array_cursor.gen.go.tmpl array_cursor_iterator.gen.go.tmpl
// The file store generator uses a custom modified tmpl
// to support adding templated data from the command line.
// This could probably be worked into the upstream tmpl
// but isn't at the moment.
//go:generate go run ../../../tools/tmpl -data=file_store.gen.go.tmpldata file_store.gen.go.tmpl=file_store.gen.go
//go:generate go run ../../../tools/tmpl -d isArray=y -data=file_store.gen.go.tmpldata file_store.gen.go.tmpl=file_store_array.gen.go
//go:generate tmpl -data=@encoding.gen.go.tmpldata encoding.gen.go.tmpl
//go:generate tmpl -data=@compact.gen.go.tmpldata compact.gen.go.tmpl
//go:generate tmpl -data=@reader.gen.go.tmpldata reader.gen.go.tmpl
func init() {
tsdb.RegisterEngine("tsm1", NewEngine)
}
var (
// Ensure Engine implements the interface.
_ tsdb.Engine = &Engine{}
// Static objects to prevent small allocs.
timeBytes = []byte("time")
keyFieldSeparatorBytes = []byte(keyFieldSeparator)
emptyBytes = []byte{}
)
var (
tsmGroup = metrics.MustRegisterGroup("tsm1")
numberOfRefCursorsCounter = metrics.MustRegisterCounter("cursors_ref", metrics.WithGroup(tsmGroup))
numberOfAuxCursorsCounter = metrics.MustRegisterCounter("cursors_aux", metrics.WithGroup(tsmGroup))
numberOfCondCursorsCounter = metrics.MustRegisterCounter("cursors_cond", metrics.WithGroup(tsmGroup))
planningTimer = metrics.MustRegisterTimer("planning_time", metrics.WithGroup(tsmGroup))
)
const (
// keyFieldSeparator separates the series key from the field name in the composite key
// that identifies a specific field in a series.
keyFieldSeparator = "#!~#"
// deleteFlushThreshold is the size in bytes of a batch of series keys to delete.
deleteFlushThreshold = 50 * 1024 * 1024
)
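// For illustration (an editor's sketch; the series and field names are made up):
// with the separator above, the composite key for field "value" of series
// "cpu,host=serverA" is laid out as
//
//	cpu,host=serverA#!~#value
//
// SeriesAndFieldFromCompositeKey, used throughout this file, splits such a key
// back into its series-key and field-name halves.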
// Engine represents a storage engine with compressed blocks.
type Engine struct {
mu sync.RWMutex
index tsdb.Index
// The following group of fields is used to track the state of level compactions within the
// Engine. The WaitGroup is used to monitor the compaction goroutines, and the 'done' channel
// is used to signal those goroutines to shut down. Every request to disable level compactions
// will call 'Wait' on 'wg'; the first goroutine to arrive (levelWorkers == 0 while holding the
// lock) closes the done channel and re-assigns 'nil' to the variable. Re-enabling decrements
// 'levelWorkers', and when it reaches zero, level compactions are started
// back up again.
wg *sync.WaitGroup // waitgroup for active level compaction goroutines
done chan struct{} // channel to signal level compactions to stop
levelWorkers int // Number of "workers" that expect compactions to be in a disabled state
snapDone chan struct{} // channel to signal snapshot compactions to stop
snapWG *sync.WaitGroup // waitgroup for running snapshot compactions
id uint64
path string
sfile *tsdb.SeriesFile
logger *zap.Logger // Logger to be used for important messages
traceLogger *zap.Logger // Logger to be used when trace-logging is on.
traceLogging bool
fieldset *tsdb.MeasurementFieldSet
WAL *WAL
Cache *Cache
Compactor *Compactor
CompactionPlan CompactionPlanner
FileStore *FileStore
MaxPointsPerBlock int
// CacheFlushMemorySizeThreshold specifies the minimum size threshold for
// the cache when the engine should write a snapshot to a TSM file
CacheFlushMemorySizeThreshold uint64
// CacheFlushWriteColdDuration specifies the length of time after which if
// no writes have been committed to the WAL, the engine will write
// a snapshot of the cache to a TSM file
CacheFlushWriteColdDuration time.Duration
// WALEnabled determines whether writes to the WAL are enabled. If this is false,
// writes will only exist in the cache and can be lost if a snapshot has not occurred.
WALEnabled bool
// Invoked when creating a backup file "as new".
formatFileName FormatFileNameFunc
// Controls whether compactions are enabled when the engine is opened.
enableCompactionsOnOpen bool
stats *compactionMetrics
activeCompactions *compactionCounter
// Limiter for concurrent compactions.
compactionLimiter limiter.Fixed
scheduler *scheduler
// provides access to the total set of series IDs
seriesIDSets tsdb.SeriesIDSets
// seriesTypeMap maps a series key to field type
seriesTypeMap *radix.Tree
// muDigest ensures only one goroutine can generate a digest at a time.
muDigest sync.RWMutex
}
// NewEngine returns a new instance of Engine.
func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *tsdb.SeriesFile, opt tsdb.EngineOptions) tsdb.Engine {
etags := tsdb.EngineTags{
Path: path,
WalPath: walPath,
Id: fmt.Sprintf("%d", id),
Bucket: filepath.Base(filepath.Dir(filepath.Dir(path))), // discard shard & rp, take db
EngineVersion: opt.EngineVersion,
}
var wal *WAL
if opt.WALEnabled {
wal = NewWAL(walPath, opt.Config.WALMaxConcurrentWrites, opt.Config.WALMaxWriteDelay, etags)
wal.syncDelay = time.Duration(opt.Config.WALFsyncDelay)
}
fs := NewFileStore(path, etags)
fs.openLimiter = opt.OpenLimiter
if opt.FileStoreObserver != nil {
fs.WithObserver(opt.FileStoreObserver)
}
fs.tsmMMAPWillNeed = opt.Config.TSMWillNeed
cache := NewCache(uint64(opt.Config.CacheMaxMemorySize), etags)
c := NewCompactor()
c.Dir = path
c.FileStore = fs
c.RateLimit = opt.CompactionThroughputLimiter
var planner CompactionPlanner = NewDefaultPlanner(fs, time.Duration(opt.Config.CompactFullWriteColdDuration))
if opt.CompactionPlannerCreator != nil {
planner = opt.CompactionPlannerCreator(opt.Config).(CompactionPlanner)
planner.SetFileStore(fs)
}
stats := newEngineMetrics(etags)
activeCompactions := &compactionCounter{}
e := &Engine{
id: id,
path: path,
index: idx,
sfile: sfile,
logger: zap.NewNop(),
traceLogger: zap.NewNop(),
traceLogging: opt.Config.TraceLoggingEnabled,
WAL: wal,
Cache: cache,
FileStore: fs,
Compactor: c,
CompactionPlan: planner,
activeCompactions: activeCompactions,
scheduler: newScheduler(activeCompactions, opt.CompactionLimiter.Capacity()),
CacheFlushMemorySizeThreshold: uint64(opt.Config.CacheSnapshotMemorySize),
CacheFlushWriteColdDuration: time.Duration(opt.Config.CacheSnapshotWriteColdDuration),
enableCompactionsOnOpen: true,
WALEnabled: opt.WALEnabled,
formatFileName: DefaultFormatFileName,
stats: stats,
compactionLimiter: opt.CompactionLimiter,
seriesIDSets: opt.SeriesIDSets,
}
// Feature flag to enable per-series type checking. By default this is off and
// e.seriesTypeMap will be nil.
if os.Getenv("INFLUXDB_SERIES_TYPE_CHECK_ENABLED") != "" {
e.seriesTypeMap = radix.New()
}
if e.traceLogging {
fs.enableTraceLogging(true)
if e.WALEnabled {
e.WAL.enableTraceLogging(true)
}
}
return e
}
func (e *Engine) WithFormatFileNameFunc(formatFileNameFunc FormatFileNameFunc) {
e.Compactor.WithFormatFileNameFunc(formatFileNameFunc)
e.formatFileName = formatFileNameFunc
}
func (e *Engine) WithParseFileNameFunc(parseFileNameFunc ParseFileNameFunc) {
e.FileStore.WithParseFileNameFunc(parseFileNameFunc)
e.Compactor.WithParseFileNameFunc(parseFileNameFunc)
}
// Digest returns a reader for the shard's digest.
func (e *Engine) Digest() (io.ReadCloser, int64, error) {
e.muDigest.Lock()
defer e.muDigest.Unlock()
log, logEnd := logger.NewOperation(context.TODO(), e.logger, "Engine digest", "tsm1_digest")
defer logEnd()
log.Info("Starting digest", zap.String("tsm1_path", e.path))
digestPath := filepath.Join(e.path, DigestFilename)
// Get a list of tsm file paths from the FileStore.
files := e.FileStore.Files()
tsmfiles := make([]string, 0, len(files))
for _, f := range files {
tsmfiles = append(tsmfiles, f.Path())
}
// See if there's a fresh digest cached on disk.
fresh, reason := DigestFresh(e.path, tsmfiles, e.LastModified())
if fresh {
f, err := os.Open(digestPath)
if err == nil {
fi, err := f.Stat()
if err != nil {
log.Info("Digest aborted, couldn't stat digest file", logger.Shard(e.id), zap.Error(err))
return nil, 0, err
}
log.Info("Digest is fresh", logger.Shard(e.id), zap.String("path", digestPath))
// Return the cached digest.
return f, fi.Size(), nil
}
}
log.Info("Digest stale", logger.Shard(e.id), zap.String("reason", reason))
// Either no digest existed or the existing one was stale
// so generate a new digest.
// Make sure the directory exists, in case it was deleted for some reason.
if err := os.MkdirAll(e.path, 0777); err != nil {
log.Info("Digest aborted, problem creating shard directory path", zap.Error(err))
return nil, 0, err
}
// Create a tmp file to write the digest to.
tf, err := os.Create(digestPath + ".tmp")
if err != nil {
log.Info("Digest aborted, problem creating tmp digest", zap.Error(err))
return nil, 0, err
}
// Write the new digest to the tmp file.
if err := Digest(e.path, tsmfiles, tf); err != nil {
log.Info("Digest aborted, problem writing tmp digest", zap.Error(err))
tf.Close()
os.Remove(tf.Name())
return nil, 0, err
}
// Rename the temporary digest file to the actual digest file.
if err := file.RenameFile(tf.Name(), digestPath); err != nil {
log.Info("Digest aborted, problem renaming tmp digest", zap.Error(err))
return nil, 0, err
}
// Create and return a reader for the new digest file.
f, err := os.Open(digestPath)
if err != nil {
log.Info("Digest aborted, opening new digest", zap.Error(err))
return nil, 0, err
}
fi, err := f.Stat()
if err != nil {
log.Info("Digest aborted, can't stat new digest", zap.Error(err))
f.Close()
return nil, 0, err
}
log.Info("Digest written", zap.String("tsm1_digest_path", digestPath), zap.Int64("size", fi.Size()))
return f, fi.Size(), nil
}
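// A minimal caller sketch (an editor's illustration based only on the
// signature above; error handling elided): the returned ReadCloser must be
// closed by the caller.
//
//	rc, sz, err := e.Digest()
//	if err != nil {
//		return err
//	}
//	defer rc.Close()
//	// stream sz bytes from rc to the backup client...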
// SetEnabled sets whether the engine is enabled.
func (e *Engine) SetEnabled(enabled bool) {
e.enableCompactionsOnOpen = enabled
e.SetCompactionsEnabled(enabled)
}
// SetCompactionsEnabled enables or disables compactions on the engine. When disabled,
// all running compactions are aborted and new compactions are prevented from starting.
func (e *Engine) SetCompactionsEnabled(enabled bool) {
if enabled {
e.enableSnapshotCompactions()
e.enableLevelCompactions(false)
} else {
e.disableSnapshotCompactions()
e.disableLevelCompactions(false)
}
}
// enableLevelCompactions will request that level compactions start back up again
//
// 'wait' signifies that a corresponding call to disableLevelCompactions(true) was made at some
// point, and the associated task that required disabled compactions is now complete
func (e *Engine) enableLevelCompactions(wait bool) {
// If we don't need to wait, see if we're already enabled
if !wait {
e.mu.RLock()
if e.done != nil {
e.mu.RUnlock()
return
}
e.mu.RUnlock()
}
e.mu.Lock()
if wait {
e.levelWorkers -= 1
}
if e.levelWorkers != 0 || e.done != nil {
// still waiting on more workers or already enabled
e.mu.Unlock()
return
}
// last one to enable, start things back up
e.Compactor.EnableCompactions()
e.done = make(chan struct{})
wg := new(sync.WaitGroup)
wg.Add(1)
e.wg = wg
e.mu.Unlock()
go func() { defer wg.Done(); e.compact(wg) }()
}
// disableLevelCompactions will stop level compactions before returning.
//
// If 'wait' is set to true, then a corresponding call to enableLevelCompactions(true) will be
// required before level compactions will start back up again.
func (e *Engine) disableLevelCompactions(wait bool) {
e.mu.Lock()
old := e.levelWorkers
if wait {
e.levelWorkers += 1
}
// Hold onto the current done channel so we can wait on it if necessary
waitCh := e.done
wg := e.wg
if old == 0 && e.done != nil {
// It's possible we have closed the done channel and released the lock and another
// goroutine has attempted to disable compactions. We're currently in the process of
// disabling them so check for this and wait until the original completes.
select {
case <-e.done:
e.mu.Unlock()
return
default:
}
// Prevent new compactions from starting
e.Compactor.DisableCompactions()
// Stop all background compaction goroutines
close(e.done)
e.mu.Unlock()
wg.Wait()
// Signal that all goroutines have exited.
e.mu.Lock()
e.done = nil
e.mu.Unlock()
return
}
e.mu.Unlock()
// Compactions were already disabled.
if waitCh == nil {
return
}
// We were not the first caller to disable compactions and they were in the process
// of being disabled. Wait for them to complete before returning.
<-waitCh
wg.Wait()
}
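// An editor's note: the wait=true forms are used in matched pairs around work
// that must not race with level compactions, as the deletion code later in
// this file does:
//
//	e.disableLevelCompactions(true)
//	defer e.enableLevelCompactions(true)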
func (e *Engine) enableSnapshotCompactions() {
// Check if already enabled under read lock
e.mu.RLock()
if e.snapDone != nil {
e.mu.RUnlock()
return
}
e.mu.RUnlock()
// Check again under write lock
e.mu.Lock()
if e.snapDone != nil {
e.mu.Unlock()
return
}
e.Compactor.EnableSnapshots()
e.snapDone = make(chan struct{})
wg := new(sync.WaitGroup)
wg.Add(1)
e.snapWG = wg
e.mu.Unlock()
go func() { defer wg.Done(); e.compactCache() }()
}
func (e *Engine) disableSnapshotCompactions() {
e.mu.Lock()
if e.snapDone == nil {
e.mu.Unlock()
return
}
// We may be in the process of stopping snapshots. See if the channel
// was closed.
select {
case <-e.snapDone:
e.mu.Unlock()
return
default:
}
// first one here, disable and wait for completion
close(e.snapDone)
e.Compactor.DisableSnapshots()
wg := e.snapWG
e.mu.Unlock()
// Wait for the snapshot goroutine to exit.
wg.Wait()
// Signal that the goroutines have exited and everything is stopped by setting
// snapDone to nil.
e.mu.Lock()
e.snapDone = nil
e.mu.Unlock()
// If the cache is empty, free up its resources as well.
if e.Cache.Size() == 0 {
e.Cache.Free()
}
}
// ScheduleFullCompaction will force the engine to fully compact all data stored.
// This will cancel any running compactions and snapshot any data in the cache to
// TSM files. This is an expensive operation.
func (e *Engine) ScheduleFullCompaction() error {
// Snapshot any data in the cache
if err := e.WriteSnapshot(); err != nil {
return err
}
// Cancel running compactions
e.SetCompactionsEnabled(false)
// Ensure compactions are restarted
defer e.SetCompactionsEnabled(true)
// Force the planner to only create a full plan.
e.CompactionPlan.ForceFull()
return nil
}
// Path returns the path the engine was opened with.
func (e *Engine) Path() string { return e.path }
func (e *Engine) MeasurementExists(name []byte) (bool, error) {
return e.index.MeasurementExists(name)
}
func (e *Engine) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
return e.index.MeasurementNamesByRegex(re)
}
// MeasurementFieldSet returns the measurement field set.
func (e *Engine) MeasurementFieldSet() *tsdb.MeasurementFieldSet {
return e.fieldset
}
// MeasurementFields returns the measurement fields for a measurement.
func (e *Engine) MeasurementFields(measurement []byte) *tsdb.MeasurementFields {
return e.fieldset.CreateFieldsIfNotExists(measurement)
}
func (e *Engine) HasTagKey(name, key []byte) (bool, error) {
return e.index.HasTagKey(name, key)
}
func (e *Engine) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) {
return e.index.MeasurementTagKeysByExpr(name, expr)
}
func (e *Engine) TagKeyCardinality(name, key []byte) int {
return e.index.TagKeyCardinality(name, key)
}
// SeriesN returns the unique number of series in the index.
func (e *Engine) SeriesN() int64 {
return e.index.SeriesN()
}
// MeasurementsSketches returns sketches that describe the cardinality of the
// measurements in this shard and measurements that were in this shard, but have
// been tombstoned.
func (e *Engine) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
return e.index.MeasurementsSketches()
}
// SeriesSketches returns sketches that describe the cardinality of the
// series in this shard and series that were in this shard, but have
// been tombstoned.
func (e *Engine) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
return e.index.SeriesSketches()
}
// LastModified returns the time when this shard was last modified.
func (e *Engine) LastModified() time.Time {
fsTime := e.FileStore.LastModified()
if e.WALEnabled && e.WAL.LastWriteTime().After(fsTime) {
return e.WAL.LastWriteTime()
}
return fsTime
}
var globalCompactionMetrics *compactionMetrics = newAllCompactionMetrics(tsdb.EngineLabelNames())
// PrometheusCollectors returns all prometheus metrics for the tsm1 package.
func PrometheusCollectors() []prometheus.Collector {
collectors := []prometheus.Collector{
globalCompactionMetrics.Duration,
globalCompactionMetrics.Active,
globalCompactionMetrics.Failed,
globalCompactionMetrics.Queued,
}
collectors = append(collectors, FileStoreCollectors()...)
collectors = append(collectors, CacheCollectors()...)
collectors = append(collectors, WALCollectors()...)
return collectors
}
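// A hedged usage sketch (the registry variable is illustrative): callers
// typically register these collectors once at process startup.
//
//	reg := prometheus.NewRegistry()
//	reg.MustRegister(tsm1.PrometheusCollectors()...)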
const (
storageNamespace = "storage"
engineSubsystem = "compactions"
level1 = "1"
level2 = "2"
level3 = "3"
levelOpt = "opt"
levelFull = "full"
levelKey = "level"
levelCache = "cache"
)
func labelForLevel(l int) prometheus.Labels {
switch l {
case 1:
return prometheus.Labels{levelKey: level1}
case 2:
return prometheus.Labels{levelKey: level2}
case 3:
return prometheus.Labels{levelKey: level3}
}
panic(fmt.Sprintf("labelForLevel: level out of range %d", l))
}
func newAllCompactionMetrics(labelNames []string) *compactionMetrics {
labelNamesWithLevel := append(labelNames, levelKey)
return &compactionMetrics{
Duration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: storageNamespace,
Subsystem: engineSubsystem,
Name: "duration_seconds",
Help: "Histogram of compactions by level since startup",
// 10 minute compactions seem normal, 1h40min is high
Buckets: []float64{60, 600, 6000},
}, labelNamesWithLevel),
Active: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: storageNamespace,
Subsystem: engineSubsystem,
Name: "active",
Help: "Gauge of compactions (by level) currently running",
}, labelNamesWithLevel),
Failed: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: storageNamespace,
Subsystem: engineSubsystem,
Name: "failed",
Help: "Counter of TSM compactions (by level) that have failed due to error",
}, labelNamesWithLevel),
Queued: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: storageNamespace,
Subsystem: engineSubsystem,
Name: "queued",
Help: "Counter of TSM compactions (by level) that are currently queued",
}, labelNamesWithLevel),
}
}
type compactionCounter struct {
l1 int64
l2 int64
l3 int64
full int64
optimize int64
}
func (c *compactionCounter) countForLevel(l int) *int64 {
switch l {
case 1:
return &c.l1
case 2:
return &c.l2
case 3:
return &c.l3
}
return nil
}
// engineMetrics holds statistics across all instantiated engines
type compactionMetrics struct {
Duration prometheus.ObserverVec
Active *prometheus.GaugeVec
Queued *prometheus.GaugeVec
Failed *prometheus.CounterVec
}
func newEngineMetrics(tags tsdb.EngineTags) *compactionMetrics {
engineLabels := tags.GetLabels()
return &compactionMetrics{
Duration: globalCompactionMetrics.Duration.MustCurryWith(engineLabels),
Active: globalCompactionMetrics.Active.MustCurryWith(engineLabels),
Failed: globalCompactionMetrics.Failed.MustCurryWith(engineLabels),
Queued: globalCompactionMetrics.Queued.MustCurryWith(engineLabels),
}
}
// DiskSize returns the total size in bytes of all TSM and WAL segments on disk.
func (e *Engine) DiskSize() int64 {
var walDiskSizeBytes int64
if e.WALEnabled {
walDiskSizeBytes = e.WAL.DiskSizeBytes()
}
return e.FileStore.DiskSizeBytes() + walDiskSizeBytes
}
// Open opens and initializes the engine.
func (e *Engine) Open(ctx context.Context) error {
if err := os.MkdirAll(e.path, 0777); err != nil {
return err
}
if err := e.cleanup(); err != nil {
return err
}
fields, err := tsdb.NewMeasurementFieldSet(filepath.Join(e.path, "fields.idx"), e.logger)
if err != nil {
e.logger.Warn(fmt.Sprintf("error opening fields.idx: %v. Rebuilding.", err))
}
e.mu.Lock()
e.fieldset = fields
e.mu.Unlock()
e.index.SetFieldSet(fields)
if e.WALEnabled {
if err := e.WAL.Open(); err != nil {
return err
}
}
if err := e.FileStore.Open(ctx); err != nil {
return err
}
if e.WALEnabled {
if err := e.reloadCache(); err != nil {
return err
}
}
e.Compactor.Open()
if e.enableCompactionsOnOpen {
e.SetCompactionsEnabled(true)
}
return nil
}
// Close closes the engine. Subsequent calls to Close are a nop.
func (e *Engine) Close() error {
e.SetCompactionsEnabled(false)
// Lock now and close everything else down.
e.mu.Lock()
defer e.mu.Unlock()
e.done = nil // Ensures that the channel will not be closed again.
err := e.fieldset.Close()
if err2 := e.FileStore.Close(); err2 != nil && err == nil {
err = err2
}
if e.WALEnabled {
if err2 := e.WAL.Close(); err2 != nil && err == nil {
err = err2
}
}
return err
}
// WithLogger sets the logger for the engine.
func (e *Engine) WithLogger(log *zap.Logger) {
e.logger = log.With(zap.String("engine", "tsm1"))
if e.traceLogging {
e.traceLogger = e.logger
}
if e.WALEnabled {
e.WAL.WithLogger(e.logger)
}
e.FileStore.WithLogger(e.logger)
}
// LoadMetadataIndex loads the shard metadata into memory.
//
// Note, it is not safe to call LoadMetadataIndex concurrently. LoadMetadataIndex
// should only be called when initialising a new Engine.
func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error {
now := time.Now()
// Save reference to index for iterator creation.
e.index = index
// If we have the cached fields index on disk, we can skip scanning all the TSM files.
if !e.fieldset.IsEmpty() {
return nil
}
keys := make([][]byte, 0, 10000)
fieldTypes := make([]influxql.DataType, 0, 10000)
if err := e.FileStore.WalkKeys(nil, func(key []byte, typ byte) error {
fieldType := BlockTypeToInfluxQLDataType(typ)
if fieldType == influxql.Unknown {
return fmt.Errorf("unknown block type: %v", typ)
}
keys = append(keys, key)
fieldTypes = append(fieldTypes, fieldType)
if len(keys) == cap(keys) {
// Send batch of keys to the index.
if err := e.addToIndexFromKey(keys, fieldTypes); err != nil {
return err
}
// Reset buffers.
keys, fieldTypes = keys[:0], fieldTypes[:0]
}
return nil
}); err != nil {
return err
}
if len(keys) > 0 {
// Add remaining partial batch from FileStore.
if err := e.addToIndexFromKey(keys, fieldTypes); err != nil {
return err
}
keys, fieldTypes = keys[:0], fieldTypes[:0]
}
// load metadata from the Cache
if err := e.Cache.ApplyEntryFn(func(key []byte, entry *entry) error {
fieldType, err := entry.values.InfluxQLType()
if err != nil {
e.logger.Info("Error getting the data type of values for key", zap.ByteString("key", key), zap.Error(err))
}
keys = append(keys, key)
fieldTypes = append(fieldTypes, fieldType)
if len(keys) == cap(keys) {
// Send batch of keys to the index.
if err := e.addToIndexFromKey(keys, fieldTypes); err != nil {
return err
}
// Reset buffers.
keys, fieldTypes = keys[:0], fieldTypes[:0]
}
return nil
}); err != nil {
return err
}
if len(keys) > 0 {
// Add remaining partial batch from FileStore.
if err := e.addToIndexFromKey(keys, fieldTypes); err != nil {
return err
}
}
// Save the field set index so we don't have to rebuild it next time
if err := e.fieldset.WriteToFile(); err != nil {
return err
}
e.traceLogger.Info("Meta data index for shard loaded", zap.Uint64("id", shardID), zap.Duration("duration", time.Since(now)))
return nil
}
// IsIdle returns true if the cache is empty, there are no running compactions and the
// shard is fully compacted.
func (e *Engine) IsIdle() (state bool, reason string) {
c := []struct {
ActiveCompactions *int64
LogMessage string
}{
// We don't actually track cache compactions: {&e.status.CacheCompactionsActive, "not idle because of active Cache compactions"},
{&e.activeCompactions.l1, "not idle because of active Level1 compactions"},
{&e.activeCompactions.l2, "not idle because of active Level2 compactions"},
{&e.activeCompactions.l3, "not idle because of active Level3 compactions"},
{&e.activeCompactions.full, "not idle because of active Full compactions"},
{&e.activeCompactions.optimize, "not idle because of active TSM Optimization compactions"},
}
for _, compactionState := range c {
count := atomic.LoadInt64(compactionState.ActiveCompactions)
if count > 0 {
return false, compactionState.LogMessage
}
}
if cacheSize := e.Cache.Size(); cacheSize > 0 {
return false, "not idle because cache size is nonzero"
} else if c, r := e.CompactionPlan.FullyCompacted(); !c {
return false, r
} else {
return true, ""
}
}
// Free releases any resources held by the engine to free up memory or CPU.
func (e *Engine) Free() error {
e.Cache.Free()
return e.FileStore.Free()
}
// Backup writes a tar archive of any TSM files modified since the passed
// in time to the passed in writer. The basePath will be prepended to the names
// of the files in the archive. It will force a snapshot of the WAL first
// then perform the backup with a read lock against the file store. This means
// that new TSM files will not be able to be created in this shard while the
// backup is running. For shards that are still actively getting writes, this
// could cause the WAL to back up, increasing memory usage and eventually rejecting writes.
func (e *Engine) Backup(w io.Writer, basePath string, since time.Time) error {
path, err := e.CreateSnapshot(true)
if err != nil {
return err
}
// Remove the temporary snapshot dir
defer os.RemoveAll(path)
return intar.Stream(w, path, basePath, intar.SinceFilterTarFile(since))
}
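// A minimal usage sketch (an editor's illustration; the file name and base
// path are hypothetical). Passing the zero time backs up every TSM file:
//
//	f, err := os.Create("/tmp/shard.tar")
//	if err != nil {
//		return err
//	}
//	defer f.Close()
//	err = e.Backup(f, "mydb/autogen/1", time.Time{})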
func (e *Engine) timeStampFilterTarFile(start, end time.Time) func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error {
return func(fi os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error {
if !strings.HasSuffix(fi.Name(), ".tsm") {
return intar.StreamFile(fi, shardRelativePath, fullPath, tw)
}
f, err := os.Open(fullPath)
if err != nil {
return err
}
r, err := NewTSMReader(f)
if err != nil {
return err
}
// Grab the tombstone file if one exists.
if ts := r.TombstoneStats(); ts.TombstoneExists {
return intar.StreamFile(fi, shardRelativePath, filepath.Base(ts.Path), tw)
}
min, max := r.TimeRange()
stun := start.UnixNano()
eun := end.UnixNano()
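// Worked example (numbers are illustrative): for a window of stun=100,
// eun=200, a file spanning [150, 250] overlaps to the right, [50, 150]
// overlaps to the left, and [50, 250] fully contains the window; all three
// cases below require block-by-block filtering.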
// The file's time range partially overlaps [start, end], so filter it block by block.
if min >= stun && min <= eun && max > eun || // overlap to the right
max >= stun && max <= eun && min < stun || // overlap to the left
min <= stun && max >= eun { // TSM file has a range LARGER than the boundary
err := e.filterFileToBackup(r, fi, shardRelativePath, fullPath, start.UnixNano(), end.UnixNano(), tw)
if err != nil {
if err := r.Close(); err != nil {
return err
}
return err
}
}
// above is the only case where we need to keep the reader open.
if err := r.Close(); err != nil {
return err
}
// the TSM file is 100% inside the range, so we can just write it without scanning each block
if min >= start.UnixNano() && max <= end.UnixNano() {
if err := intar.StreamFile(fi, shardRelativePath, fullPath, tw); err != nil {
return err
}
}
return nil
}
}
func (e *Engine) Export(w io.Writer, basePath string, start time.Time, end time.Time) error {
path, err := e.CreateSnapshot(false)
if err != nil {
return err
}
// Remove the temporary snapshot dir
defer os.RemoveAll(path)
return intar.Stream(w, path, basePath, e.timeStampFilterTarFile(start, end))
}
func (e *Engine) filterFileToBackup(r *TSMReader, fi os.FileInfo, shardRelativePath, fullPath string, start, end int64, tw *tar.Writer) error {
path := fullPath + ".tmp"
out, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
return err
}
defer os.Remove(path)
w, err := NewTSMWriter(out)
if err != nil {
return err
}
defer w.Close()
// Iterate over the blocks and keep only the ones that overlap the requested range.
bi := r.BlockIterator()
for bi.Next() {
// not concerned with typ or checksum since we are just blindly writing back, with no decoding
key, minTime, maxTime, _, _, buf, err := bi.Read()
if err != nil {
return err
}
if minTime >= start && minTime <= end ||
maxTime >= start && maxTime <= end ||
minTime <= start && maxTime >= end {
err := w.WriteBlock(key, minTime, maxTime, buf)
if err != nil {
return err
}
}
}
if err := bi.Err(); err != nil {
return err
}
err = w.WriteIndex()
if err != nil {
return err
}
// make sure the whole file is out to disk
if err := w.Flush(); err != nil {
return err
}
tmpFi, err := os.Stat(path)
if err != nil {
return err
}
return intar.StreamRenameFile(tmpFi, fi.Name(), shardRelativePath, path, tw)
}
// Restore reads a tar archive generated by Backup().
// Only files that match basePath will be copied into the directory. This obtains
// a write lock so no operations can be performed while restoring.
func (e *Engine) Restore(r io.Reader, basePath string) error {
return e.overlay(r, basePath, false)
}
// Import reads a tar archive generated by Backup() and adds each
// file matching basePath as a new TSM file. This obtains
// a write lock so no operations can be performed while Importing.
// If the import is successful, a full compaction is scheduled.
func (e *Engine) Import(r io.Reader, basePath string) error {
if err := e.overlay(r, basePath, true); err != nil {
return err
}
return e.ScheduleFullCompaction()
}
// overlay reads a tar archive generated by Backup() and adds each file
// from the archive matching basePath to the shard.
// If asNew is true, each file will be installed as a new TSM file even if an
// existing file with the same name in the backup exists.
func (e *Engine) overlay(r io.Reader, basePath string, asNew bool) error {
// Copy files from archive while under lock to prevent reopening.
newFiles, err := func() ([]string, error) {
e.mu.Lock()
defer e.mu.Unlock()
var newFiles []string
tr := tar.NewReader(r)
for {
if fileName, err := e.readFileFromBackup(tr, basePath, asNew); err == io.EOF {
break
} else if err != nil {
return nil, err
} else if fileName != "" {
newFiles = append(newFiles, fileName)
}
}
if err := file.SyncDir(e.path); err != nil {
return nil, err
}
// The filestore will only handle tsm files. Other file types will be ignored.
if err := e.FileStore.Replace(nil, newFiles); err != nil {
return nil, err
}
return newFiles, nil
}()
if err != nil {
return err
}
// Load any new series keys to the index
tsmFiles := make([]TSMFile, 0, len(newFiles))
defer func() {
for _, r := range tsmFiles {
r.Close()
}
}()
ext := fmt.Sprintf(".%s", TmpTSMFileExtension)
for _, f := range newFiles {
// If asNew is true, the files created from readFileFromBackup will be new ones
// having a temp extension.
f = strings.TrimSuffix(f, ext)
if !strings.HasSuffix(f, TSMFileExtension) {
// This isn't a .tsm file.
continue
}
fd, err := os.Open(f)
if err != nil {
return err
}
r, err := NewTSMReader(fd)
if err != nil {
return err
}
tsmFiles = append(tsmFiles, r)
}
// Merge and dedup all the series keys across each reader to reduce
// lock contention on the index.
keys := make([][]byte, 0, 10000)
fieldTypes := make([]influxql.DataType, 0, 10000)
ki := newMergeKeyIterator(tsmFiles, nil)
for ki.Next() {
key, typ := ki.Read()
fieldType := BlockTypeToInfluxQLDataType(typ)
if fieldType == influxql.Unknown {
return fmt.Errorf("unknown block type: %v", typ)
}
keys = append(keys, key)
fieldTypes = append(fieldTypes, fieldType)
if len(keys) == cap(keys) {
// Send batch of keys to the index.
if err := e.addToIndexFromKey(keys, fieldTypes); err != nil {
return err
}
// Reset buffers.
keys, fieldTypes = keys[:0], fieldTypes[:0]
}
}
if len(keys) > 0 {
// Add remaining partial batch.
if err := e.addToIndexFromKey(keys, fieldTypes); err != nil {
return err
}
}
return e.MeasurementFieldSet().WriteToFile()
}
// readFileFromBackup copies the next file from the archive into the shard.
// The file is skipped if it does not have a matching shardRelativePath prefix.
// If asNew is true, each file will be installed as a new TSM file even if an
// existing file with the same name in the backup exists.
func (e *Engine) readFileFromBackup(tr *tar.Reader, shardRelativePath string, asNew bool) (string, error) {
// Read next archive file.
hdr, err := tr.Next()
if err != nil {
return "", err
}
if !strings.HasSuffix(hdr.Name, TSMFileExtension) {
// This isn't a .tsm file.
return "", nil
}
filename := filepath.Base(filepath.FromSlash(hdr.Name))
// If this is a directory entry (usually just `index` for tsi), create it and move on.
if hdr.Typeflag == tar.TypeDir {
if err := os.MkdirAll(filepath.Join(e.path, filename), os.FileMode(hdr.Mode).Perm()); err != nil {
return "", err
}
return "", nil
}
if asNew {
filename = e.formatFileName(e.FileStore.NextGeneration(), 1) + "." + TSMFileExtension
}
tmp := fmt.Sprintf("%s.%s", filepath.Join(e.path, filename), TmpTSMFileExtension)
// Create new file on disk.
f, err := os.OpenFile(tmp, os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
return "", err
}
defer f.Close()
// Copy from archive to the file.
if _, err := io.CopyN(f, tr, hdr.Size); err != nil {
return "", err
}
// Sync to disk & close.
if err := f.Sync(); err != nil {
return "", err
}
return tmp, nil
}
// addToIndexFromKey will pull the measurement names, series keys, and field
// names from composite keys, and add them to the database index and measurement
// fields.
func (e *Engine) addToIndexFromKey(keys [][]byte, fieldTypes []influxql.DataType) error {
var field []byte
names := make([][]byte, 0, len(keys))
tags := make([]models.Tags, 0, len(keys))
for i := 0; i < len(keys); i++ {
// Replace tsm key format with index key format.
keys[i], field = SeriesAndFieldFromCompositeKey(keys[i])
name := models.ParseName(keys[i])
mf := e.fieldset.CreateFieldsIfNotExists(name)
if err := mf.CreateFieldIfNotExists(field, fieldTypes[i]); err != nil {
return err
}
names = append(names, name)
tags = append(tags, models.ParseTags(keys[i]))
}
return e.index.CreateSeriesListIfNotExists(keys, names, tags)
}
// WritePoints writes metadata and point data into the engine.
// It returns an error if the points cannot be written to the cache or WAL, or
// if a written field's type conflicts with the type already stored for its series key.
func (e *Engine) WritePoints(ctx context.Context, points []models.Point) error {
values := make(map[string][]Value, len(points))
var (
keyBuf []byte
baseLen int
seriesErr error
)
for _, p := range points {
keyBuf = append(keyBuf[:0], p.Key()...)
keyBuf = append(keyBuf, keyFieldSeparator...)
baseLen = len(keyBuf)
iter := p.FieldIterator()
t := p.Time().UnixNano()
for iter.Next() {
// Skip fields named "time"; they are illegal.
if bytes.Equal(iter.FieldKey(), timeBytes) {
continue
}
keyBuf = append(keyBuf[:baseLen], iter.FieldKey()...)
if e.seriesTypeMap != nil {
// Fast-path check to see if the field for the series already exists.
if v, ok := e.seriesTypeMap.Get(keyBuf); !ok {
if typ, err := e.Type(keyBuf); err != nil {
// Field type is unknown, we can try to add it.
} else if typ != iter.Type() {
// Existing type is different from what was passed in, we need to drop
// this write and refresh the series type map.
seriesErr = tsdb.ErrFieldTypeConflict
e.seriesTypeMap.Insert(keyBuf, int(typ))
continue
}
// Doesn't exist, so try to insert
vv, ok := e.seriesTypeMap.Insert(keyBuf, int(iter.Type()))
// We didn't insert and the type that exists isn't what we tried to insert, so
// we have a conflict and must drop this field/series.
if !ok || vv != int(iter.Type()) {
seriesErr = tsdb.ErrFieldTypeConflict
continue
}
} else if v != int(iter.Type()) {
// The series already exists, but with a different type. This is also a type conflict
// and we need to drop this field/series.
seriesErr = tsdb.ErrFieldTypeConflict
continue
}
}
var v Value
switch iter.Type() {
case models.Float:
fv, err := iter.FloatValue()
if err != nil {
return err
}
v = NewFloatValue(t, fv)
case models.Integer:
iv, err := iter.IntegerValue()
if err != nil {
return err
}
v = NewIntegerValue(t, iv)
case models.Unsigned:
iv, err := iter.UnsignedValue()
if err != nil {
return err
}
v = NewUnsignedValue(t, iv)
case models.String:
v = NewStringValue(t, iter.StringValue())
case models.Boolean:
bv, err := iter.BooleanValue()
if err != nil {
return err
}
v = NewBooleanValue(t, bv)
default:
return fmt.Errorf("unknown field type for %s: %s", string(iter.FieldKey()), p.String())
}
values[string(keyBuf)] = append(values[string(keyBuf)], v)
}
}
e.mu.RLock()
defer e.mu.RUnlock()
// first try to write to the cache
if err := e.Cache.WriteMulti(values); err != nil {
return err
}
if e.WALEnabled {
if _, err := e.WAL.WriteMulti(ctx, values); err != nil {
return err
}
}
return seriesErr
}
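// A hedged usage sketch (points built via the models package; the line
// protocol string is illustrative):
//
//	pts, err := models.ParsePointsString("cpu,host=serverA value=0.5 1609459200000000000")
//	if err != nil {
//		return err
//	}
//	if err := e.WritePoints(ctx, pts); err != nil {
//		return err
//	}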
// DeleteSeriesRange removes the values between min and max (inclusive) from all series
func (e *Engine) DeleteSeriesRange(ctx context.Context, itr tsdb.SeriesIterator, min, max int64) error {
return e.DeleteSeriesRangeWithPredicate(ctx, itr, func(name []byte, tags models.Tags) (int64, int64, bool) {
return min, max, true
})
}
// DeleteSeriesRangeWithPredicate removes the values between min and max (inclusive) from all series
// for which predicate() returns true. If predicate() is nil, then all values in range are removed.
func (e *Engine) DeleteSeriesRangeWithPredicate(
ctx context.Context,
itr tsdb.SeriesIterator,
predicate func(name []byte, tags models.Tags) (int64, int64, bool),
) error {
var disableOnce bool
// Ensure that the index does not compact away the measurement or series we're
// going to delete before we're done with them.
if tsiIndex, ok := e.index.(*tsi1.Index); ok {
tsiIndex.DisableCompactions()
defer tsiIndex.EnableCompactions()
tsiIndex.Wait()
fs, err := tsiIndex.RetainFileSet()
if err != nil {
return err
}
defer fs.Release()
}
var (
sz int
min, max int64 = math.MinInt64, math.MaxInt64
// Indicator that the min/max time for the current batch has changed and
// we need to flush the current batch before appending to it.
flushBatch bool
)
// These are reversed from min/max to ensure they are different the first time through.
newMin, newMax := int64(math.MaxInt64), int64(math.MinInt64)
// There is no predicate, so setup newMin/newMax to delete the full time range.
if predicate == nil {
newMin = min
newMax = max
}
batch := make([][]byte, 0, 10000)
for {
elem, err := itr.Next()
if err != nil {
return err
} else if elem == nil {
break
}
// See if the series should be deleted and if so, what range of time.
if predicate != nil {
var shouldDelete bool
newMin, newMax, shouldDelete = predicate(elem.Name(), elem.Tags())
if !shouldDelete {
continue
}
// If the min/max happens to change for the batch, we need to flush
// the current batch and start a new one.
flushBatch = (min != newMin || max != newMax) && len(batch) > 0
}
if elem.Expr() != nil {
if v, ok := elem.Expr().(*influxql.BooleanLiteral); !ok || !v.Val {
return errors.New("fields not supported in WHERE clause during deletion")
}
}
if !disableOnce {
// Disable and abort running compactions so that tombstones added to existing tsm
// files don't get removed. This would cause deleted measurements/series to
// re-appear once the compaction completed. We only disable the level compactions
// so that snapshotting does not stop while writing out tombstones. If it is stopped,
// and writing tombstones takes a long time, writes can get rejected due to the cache
// filling up.
e.disableLevelCompactions(true)
defer e.enableLevelCompactions(true)
e.sfile.DisableCompactions()
defer e.sfile.EnableCompactions()
e.sfile.Wait()
disableOnce = true
}
if sz >= deleteFlushThreshold || flushBatch {
// Delete all matching batch.
if err := e.deleteSeriesRange(ctx, batch, min, max); err != nil {
return err
}
batch = batch[:0]
sz = 0
flushBatch = false
}
// Use the new min/max time for the next iteration
min = newMin
max = newMax
key := models.MakeKey(elem.Name(), elem.Tags())
sz += len(key)
batch = append(batch, key)
}
if len(batch) > 0 {
// Delete all matching batch.
if err := e.deleteSeriesRange(ctx, batch, min, max); err != nil {
return err
}
}
return nil
}
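// A hedged sketch of a predicate (the tag key and value are hypothetical):
// delete the full time range, but only for series tagged host=serverA.
//
//	err := e.DeleteSeriesRangeWithPredicate(ctx, itr,
//		func(name []byte, tags models.Tags) (int64, int64, bool) {
//			if string(tags.Get([]byte("host"))) == "serverA" {
//				return math.MinInt64, math.MaxInt64, true
//			}
//			return 0, 0, false
//		})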
// deleteSeriesRange removes the values between min and max (inclusive) from all series. This
// does not update the index or disable compactions. This should mainly be called by DeleteSeriesRange
// and not directly.
func (e *Engine) deleteSeriesRange(ctx context.Context, seriesKeys [][]byte, min, max int64) error {
if len(seriesKeys) == 0 {
return nil
}
// Min and max time in the engine are slightly different from the query language values.
if min == influxql.MinTime {
min = math.MinInt64
}
if max == influxql.MaxTime {
max = math.MaxInt64
}
var overlapsTimeRangeMinMax bool
var overlapsTimeRangeMinMaxLock sync.Mutex
e.FileStore.Apply(ctx, func(r TSMFile) error {
if r.OverlapsTimeRange(min, max) {
overlapsTimeRangeMinMaxLock.Lock()
overlapsTimeRangeMinMax = true
overlapsTimeRangeMinMaxLock.Unlock()
}
return nil
})
if !overlapsTimeRangeMinMax && e.Cache.store.count() > 0 {
overlapsTimeRangeMinMax = true
}
if !overlapsTimeRangeMinMax {
return nil
}
// Ensure keys are sorted since lower layers require them to be.
if !bytesutil.IsSorted(seriesKeys) {
bytesutil.Sort(seriesKeys)
}
// Run the delete on each TSM file in parallel
if err := e.FileStore.Apply(ctx, func(r TSMFile) error {
// See if this TSM file contains the keys and time range
minKey, maxKey := seriesKeys[0], seriesKeys[len(seriesKeys)-1]
tsmMin, tsmMax := r.KeyRange()
tsmMin, _ = SeriesAndFieldFromCompositeKey(tsmMin)
tsmMax, _ = SeriesAndFieldFromCompositeKey(tsmMax)
overlaps := bytes.Compare(tsmMin, maxKey) <= 0 && bytes.Compare(tsmMax, minKey) >= 0
if !overlaps || !r.OverlapsTimeRange(min, max) {
return nil
}
// Delete each key we find in the file. We seek to the min key and walk from there.
batch := r.BatchDelete()
n := r.KeyCount()
var j int
for i := r.Seek(minKey); i < n; i++ {
indexKey, _ := r.KeyAt(i)
seriesKey, _ := SeriesAndFieldFromCompositeKey(indexKey)
for j < len(seriesKeys) && bytes.Compare(seriesKeys[j], seriesKey) < 0 {
j++
}
if j >= len(seriesKeys) {
break
}
if bytes.Equal(seriesKeys[j], seriesKey) {
if err := batch.DeleteRange([][]byte{indexKey}, min, max); err != nil {
batch.Rollback()
return err
}
}
}
return batch.Commit()
}); err != nil {
return err
}
// find the keys in the cache and remove them
deleteKeys := make([][]byte, 0, len(seriesKeys))
// ApplySerialEntryFn cannot return an error in this invocation.
_ = e.Cache.ApplyEntryFn(func(k []byte, _ *entry) error {
seriesKey, _ := SeriesAndFieldFromCompositeKey([]byte(k))
// Cache does not walk keys in sorted order, so search the sorted
// series we need to delete to see if any of the cache keys match.
i := bytesutil.SearchBytes(seriesKeys, seriesKey)
if i < len(seriesKeys) && bytes.Equal(seriesKey, seriesKeys[i]) {
// k is the measurement + tags + sep + field
deleteKeys = append(deleteKeys, k)
}
return nil
})
// Sort the series keys because ApplyEntryFn iterates over the keys randomly.
bytesutil.Sort(deleteKeys)
e.Cache.DeleteRange(deleteKeys, min, max)
// delete from the WAL
if e.WALEnabled {
if _, err := e.WAL.DeleteRange(ctx, deleteKeys, min, max); err != nil {
return err
}
}
// The series are deleted on disk, but the index may still say they exist.
// Depending on the min,max time passed in, the series may or may not actually
// exist now. To reconcile the index, we walk the series keys that still exist
// on disk and cross out any keys that match the passed in series. Any series
// left in the slice at the end do not exist and can be deleted from the index.
// Note: this is inherently racy if writes are occurring to the same measurement/series
// being removed. A write could occur and exist in the cache at this point, but we
// would delete it from the index.
minKey := seriesKeys[0]
// Apply runs this func concurrently. The seriesKeys slice is mutated concurrently
// by different goroutines setting positions to emptyBytes.
if err := e.FileStore.Apply(ctx, func(r TSMFile) error {
n := r.KeyCount()
var j int
// Start from the min deleted key that exists in this file.
for i := r.Seek(minKey); i < n; i++ {
if j >= len(seriesKeys) {
return nil
}
indexKey, _ := r.KeyAt(i)
seriesKey, _ := SeriesAndFieldFromCompositeKey(indexKey)
// Skip over any deleted keys that are less than our tsm key
cmp := bytes.Compare(seriesKeys[j], seriesKey)
for j < len(seriesKeys) && cmp < 0 {
j++
if j >= len(seriesKeys) {
return nil
}
cmp = bytes.Compare(seriesKeys[j], seriesKey)
}
// We've found a matching key, cross it out so we do not remove it from the index.
if j < len(seriesKeys) && cmp == 0 {
seriesKeys[j] = emptyBytes
j++
}
}
return nil
}); err != nil {
return err
}
// The seriesKeys slice is mutated if they are still found in the cache.
cacheKeys := e.Cache.Keys()
for i := 0; i < len(seriesKeys); i++ {
seriesKey := seriesKeys[i]
// Already crossed out
if len(seriesKey) == 0 {
continue
}
j := bytesutil.SearchBytes(cacheKeys, seriesKey)
if j < len(cacheKeys) {
cacheSeriesKey, _ := SeriesAndFieldFromCompositeKey(cacheKeys[j])
if bytes.Equal(seriesKey, cacheSeriesKey) {
seriesKeys[i] = emptyBytes
}
}
}
// Have we deleted all values for the series? If so, we need to remove
// the series from the index.
hasDeleted := false
for _, k := range seriesKeys {
if len(k) > 0 {
hasDeleted = true
break
}
}
if hasDeleted {
buf := make([]byte, 1024) // For use when accessing series file.
ids := tsdb.NewSeriesIDSet()
measurements := make(map[string]struct{}, 1)
for _, k := range seriesKeys {
if len(k) == 0 {
continue // This key was wiped because it shouldn't be removed from index.
}
name, tags := models.ParseKeyBytes(k)
sid := e.sfile.SeriesID(name, tags, buf)
if sid == 0 {
continue
}
// See if this series was found in the cache earlier
i := bytesutil.SearchBytes(deleteKeys, k)
var hasCacheValues bool
// If there are multiple fields, they will have the same prefix. If any field
// has values, then we can't delete it from the index.
for i < len(deleteKeys) && bytes.HasPrefix(deleteKeys[i], k) {
if e.Cache.Values(deleteKeys[i]).Len() > 0 {
hasCacheValues = true
break
}
i++
}
if hasCacheValues {
continue
}
measurements[string(name)] = struct{}{}
// Remove the series from the local index.
if err := e.index.DropSeries(sid, k, false); err != nil {
return err
}
// Add the id to the set of ids to delete.
ids.Add(sid)
}
actuallyDeleted := make([]string, 0, len(measurements))
for k := range measurements {
if dropped, err := e.index.DropMeasurementIfSeriesNotExist([]byte(k)); err != nil {
return err
} else if dropped {
if deleted, err := e.cleanupMeasurement([]byte(k)); err != nil {
return err
} else if deleted {
actuallyDeleted = append(actuallyDeleted, k)
}
}
}
if len(actuallyDeleted) > 0 {
if err := e.fieldset.Save(tsdb.MeasurementsToFieldChangeDeletions(actuallyDeleted)); err != nil {
return err
}
}
// Remove any series IDs for our set that still exist in other shards.
// We cannot remove these from the series file yet.
if err := e.seriesIDSets.ForEach(func(s *tsdb.SeriesIDSet) {
ids = ids.AndNot(s)
}); err != nil {
return err
}
// Remove the remaining ids from the series file as they no longer exist
// in any shard.
var err error
ids.ForEach(func(id uint64) {
if err1 := e.sfile.DeleteSeriesID(id); err1 != nil {
err = err1
return
}
})
if err != nil {
return err
}
}
return nil
}
func (e *Engine) cleanupMeasurement(name []byte) (deleted bool, err error) {
// A sentinel error used to cause DeleteWithLock to not delete the measurement.
abortErr := fmt.Errorf("measurements still exist")
// Under write lock, delete the measurement if we no longer have any data stored for
// the measurement. If data still exists, we can't delete the field set yet because
// the measurement received writes while we were deleting it.
if err := e.fieldset.DeleteWithLock(string(name), func() error {
encodedName := models.EscapeMeasurement(name)
sep := len(encodedName)
// First scan the cache to see if any series exists for this measurement.
if err := e.Cache.ApplyEntryFn(func(k []byte, _ *entry) error {
if bytes.HasPrefix(k, encodedName) && (k[sep] == ',' || k[sep] == keyFieldSeparator[0]) {
return abortErr
}
return nil
}); err != nil {
return err
}
// Check the filestore.
return e.FileStore.WalkKeys(name, func(k []byte, _ byte) error {
if bytes.HasPrefix(k, encodedName) && (k[sep] == ',' || k[sep] == keyFieldSeparator[0]) {
return abortErr
}
return nil
})
}); err != nil && err != abortErr {
// Something else failed, return it
return false, err
}
return err != abortErr, nil
}
// DeleteMeasurement deletes a measurement and all related series.
func (e *Engine) DeleteMeasurement(ctx context.Context, name []byte) error {
// Attempt to find the series keys.
indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
itr, err := indexSet.MeasurementSeriesByExprIterator(name, nil)
if err != nil {
return err
} else if itr == nil {
return nil
}
defer itr.Close()
return e.DeleteSeriesRange(ctx, tsdb.NewSeriesIteratorAdapter(e.sfile, itr), math.MinInt64, math.MaxInt64)
}
// ForEachMeasurementName iterates over each measurement name in the engine.
func (e *Engine) ForEachMeasurementName(fn func(name []byte) error) error {
return e.index.ForEachMeasurementName(fn)
}
func (e *Engine) CreateSeriesListIfNotExists(keys, names [][]byte, tagsSlice []models.Tags) error {
return e.index.CreateSeriesListIfNotExists(keys, names, tagsSlice)
}
func (e *Engine) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error {
return e.index.CreateSeriesIfNotExists(key, name, tags)
}
// WriteTo is not implemented.
func (e *Engine) WriteTo(w io.Writer) (n int64, err error) { panic("not implemented") }
// WriteSnapshot will snapshot the cache and write a new TSM file with its contents, releasing the snapshot when done.
func (e *Engine) WriteSnapshot() (err error) {
// Lock and grab the cache snapshot along with all the closed WAL
// filenames associated with the snapshot
started := time.Now()
log, logEnd := logger.NewOperation(context.TODO(), e.logger, "Cache snapshot", "tsm1_cache_snapshot")
defer func() {
elapsed := time.Since(started)
if err != nil && err != errCompactionsDisabled {
e.stats.Failed.With(prometheus.Labels{levelKey: levelCache}).Inc()
}
e.stats.Duration.With(prometheus.Labels{levelKey: levelCache}).Observe(elapsed.Seconds())
if err == nil {
log.Info("Snapshot for path written", zap.String("path", e.path), zap.Duration("duration", elapsed))
}
logEnd()
}()
closedFiles, snapshot, err := func() (segments []string, snapshot *Cache, err error) {
e.mu.Lock()
defer e.mu.Unlock()
if e.WALEnabled {
if err = e.WAL.CloseSegment(); err != nil {
return
}
segments, err = e.WAL.ClosedSegments()
if err != nil {
return
}
}
snapshot, err = e.Cache.Snapshot()
if err != nil {
return
}
return
}()
if err != nil {
return err
}
if snapshot.Size() == 0 {
e.Cache.ClearSnapshot(true)
return nil
}
// The snapshotted cache may have duplicate points and unsorted data. We need to deduplicate
// it before writing the snapshot. This can be very expensive so it's done while we are not
// holding the engine write lock.
dedup := time.Now()
snapshot.Deduplicate()
e.traceLogger.Info("Snapshot for path deduplicated",
zap.String("path", e.path),
zap.Duration("duration", time.Since(dedup)))
return e.writeSnapshotAndCommit(log, closedFiles, snapshot)
}
// CreateSnapshot will create a temp directory that holds
// temporary hardlinks to the underlying shard files.
// skipCacheOk controls whether it is permissible to fail writing out
// in-memory cache data when a previous snapshot is in progress.
func (e *Engine) CreateSnapshot(skipCacheOk bool) (string, error) {
err := e.WriteSnapshot()
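// Retry while a snapshot is already in progress, backing off exponentially:
// 32^0=1ms, 32^1=32ms, 32^2=1024ms.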
for i := 0; i < 3 && err == ErrSnapshotInProgress; i++ {
backoff := time.Duration(math.Pow(32, float64(i))) * time.Millisecond
time.Sleep(backoff)
err = e.WriteSnapshot()
}
if err == ErrSnapshotInProgress && skipCacheOk {
e.logger.Warn("Snapshotter busy: proceeding without cache contents")
} else if err != nil {
return "", err
}
e.mu.RLock()
defer e.mu.RUnlock()
path, err := e.FileStore.CreateSnapshot()
if err != nil {
return "", err
}
// Generate a snapshot of the index.
return path, nil
}
// writeSnapshotAndCommit will write the passed cache to a new TSM file and remove the closed WAL segments.
func (e *Engine) writeSnapshotAndCommit(log *zap.Logger, closedFiles []string, snapshot *Cache) (err error) {
defer func() {
if err != nil {
e.Cache.ClearSnapshot(false)
}
}()
// write the new snapshot files
newFiles, err := e.Compactor.WriteSnapshot(snapshot, e.logger)
if err != nil {
log.Info("Error writing snapshot from compactor", zap.Error(err))
return err
}
e.mu.RLock()
defer e.mu.RUnlock()
// update the file store with these new files
if err := e.FileStore.Replace(nil, newFiles); err != nil {
log.Info("Error adding new TSM files from snapshot. Removing temp files.", zap.Error(err))
// Remove the new snapshot files. We will try again.
for _, file := range newFiles {
if err := os.Remove(file); err != nil {
log.Info("Unable to remove file", zap.String("path", file), zap.Error(err))
}
}
return err
}
// clear the snapshot from the in-memory cache, then the old WAL files
e.Cache.ClearSnapshot(true)
if e.WALEnabled {
if err := e.WAL.Remove(closedFiles); err != nil {
log.Info("Error removing closed WAL segments", zap.Error(err))
}
}
return nil
}
// compactCache continually checks if the WAL cache should be written to disk.
func (e *Engine) compactCache() {
t := time.NewTicker(time.Second)
defer t.Stop()
for {
e.mu.RLock()
quit := e.snapDone
e.mu.RUnlock()
select {
case <-quit:
return
case <-t.C:
if e.ShouldCompactCache(time.Now()) {
e.traceLogger.Info("Compacting cache", zap.String("path", e.path))
err := e.WriteSnapshot()
if err != nil && err != errCompactionsDisabled {
e.logger.Info("Error writing snapshot", zap.Error(err))
}
}
}
}
}
// ShouldCompactCache returns true if the Cache is over its flush threshold
// or if the passed in lastWriteTime is older than the write cold threshold.
func (e *Engine) ShouldCompactCache(t time.Time) bool {
sz := e.Cache.Size()
if sz == 0 {
return false
}
if sz > e.CacheFlushMemorySizeThreshold {
return true
}
return t.Sub(e.Cache.LastWriteTime()) > e.CacheFlushWriteColdDuration
}
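// An illustrative check with the shipped defaults (25MB snapshot threshold,
// 10m write-cold duration; both are configurable): a 30MB cache compacts
// immediately, and a 1KB cache last written 15 minutes ago also compacts.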
func (e *Engine) compact(wg *sync.WaitGroup) {
t := time.NewTicker(time.Second)
defer t.Stop()
for {
e.mu.RLock()
quit := e.done
e.mu.RUnlock()
select {
case <-quit:
return
case <-t.C:
// Find our compaction plans
level1Groups, len1 := e.CompactionPlan.PlanLevel(1)
level2Groups, len2 := e.CompactionPlan.PlanLevel(2)
level3Groups, len3 := e.CompactionPlan.PlanLevel(3)
level4Groups, len4 := e.CompactionPlan.Plan(e.LastModified())
e.stats.Queued.With(prometheus.Labels{levelKey: levelFull}).Set(float64(len4))
// If no full compactions are needed, see if an optimize is needed
if len(level4Groups) == 0 {
level4Groups, len4 = e.CompactionPlan.PlanOptimize()
e.stats.Queued.With(prometheus.Labels{levelKey: levelOpt}).Set(float64(len4))
}
// Update the level plan queue stats
// For stats, use the length needed, even if the lock was
// not acquired
e.stats.Queued.With(prometheus.Labels{levelKey: level1}).Set(float64(len1))
e.stats.Queued.With(prometheus.Labels{levelKey: level2}).Set(float64(len2))
e.stats.Queued.With(prometheus.Labels{levelKey: level3}).Set(float64(len3))
// Set the queue depths on the scheduler
// Use the real queue depth, dependent on acquiring
// the file locks.
e.scheduler.setDepth(1, len(level1Groups))
e.scheduler.setDepth(2, len(level2Groups))
e.scheduler.setDepth(3, len(level3Groups))
e.scheduler.setDepth(4, len(level4Groups))
// Find the next compaction that can run and try to kick it off
if level, runnable := e.scheduler.next(); runnable {
switch level {
case 1:
if e.compactLevel(level1Groups[0], 1, false, wg) {
level1Groups = level1Groups[1:]
}
case 2:
if e.compactLevel(level2Groups[0], 2, false, wg) {
level2Groups = level2Groups[1:]
}
case 3:
if e.compactLevel(level3Groups[0], 3, true, wg) {
level3Groups = level3Groups[1:]
}
case 4:
if e.compactFull(level4Groups[0], wg) {
level4Groups = level4Groups[1:]
}
}
}
// Release all the plans we didn't start.
e.CompactionPlan.Release(level1Groups)
e.CompactionPlan.Release(level2Groups)
e.CompactionPlan.Release(level3Groups)
e.CompactionPlan.Release(level4Groups)
}
}
}
// compactLevel kicks off compactions using the level strategy. It returns
// true if the compaction was started.
func (e *Engine) compactLevel(grp CompactionGroup, level int, fast bool, wg *sync.WaitGroup) bool {
s := e.levelCompactionStrategy(grp, fast, level)
if s == nil {
return false
}
if e.compactionLimiter.TryTake() {
{
val := atomic.AddInt64(e.activeCompactions.countForLevel(level), 1)
e.stats.Active.With(labelForLevel(level)).Set(float64(val))
}
wg.Add(1)
go func() {
defer wg.Done()
defer func() {
val := atomic.AddInt64(e.activeCompactions.countForLevel(level), -1)
e.stats.Active.With(labelForLevel(level)).Set(float64(val))
}()
defer e.compactionLimiter.Release()
s.Apply()
// Release the files in the compaction plan
e.CompactionPlan.Release([]CompactionGroup{s.group})
}()
return true
}
// The limiter is saturated; return false so the caller releases the unused plan.
return false
}
// compactFull kicks off full and optimize compactions using the lo priority policy. It returns
// true if the compaction was started.
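//
// Both compactFull and compactLevel follow the same concurrency pattern:
// take a slot from the limiter without blocking, bump the active gauge,
// and undo both when the goroutine exits. A minimal sketch of the pattern,
// with illustrative names rather than the engine's fields:
//
//	if limiter.TryTake() {
//		wg.Add(1)
//		go func() {
//			defer wg.Done()
//			defer limiter.Release()
//			work() // the strategy's Apply
//		}()
//	}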
func (e *Engine) compactFull(grp CompactionGroup, wg *sync.WaitGroup) bool {
s := e.fullCompactionStrategy(grp, false)
if s == nil {
return false
}
// Try the lo priority limiter; if no slot is free, the plan is skipped and released by the caller.
if e.compactionLimiter.TryTake() {
{
val := atomic.AddInt64(&e.activeCompactions.full, 1)
e.stats.Active.With(prometheus.Labels{levelKey: levelFull}).Set(float64(val))
}
wg.Add(1)
go func() {
defer wg.Done()
defer func() {
val := atomic.AddInt64(&e.activeCompactions.full, -1)
e.stats.Active.With(prometheus.Labels{levelKey: levelFull}).Set(float64(val))
}()
defer e.compactionLimiter.Release()
s.Apply()
// Release the files in the compaction plan
e.CompactionPlan.Release([]CompactionGroup{s.group})
}()
return true
}
return false
}
// compactionStrategy holds the details of what to do in a compaction.
type compactionStrategy struct {
group CompactionGroup
fast bool
level int
durationSecondsStat prometheus.Observer
errorStat prometheus.Counter
logger *zap.Logger
compactor *Compactor
fileStore *FileStore
engine *Engine
}
// Apply runs the strategy's compaction group and records how long it took.
func (s *compactionStrategy) Apply() {
start := time.Now()
s.compactGroup()
s.durationSecondsStat.Observe(time.Since(start).Seconds())
}
// compactGroup executes the compaction strategy against a single CompactionGroup.
func (s *compactionStrategy) compactGroup() {
group := s.group
log, logEnd := logger.NewOperation(context.TODO(), s.logger, "TSM compaction", "tsm1_compact_group")
defer logEnd()
log.Info("Beginning compaction", zap.Int("tsm1_files_n", len(group)))
for i, f := range group {
log.Info("Compacting file", zap.Int("tsm1_index", i), zap.String("tsm1_file", f))
}
var (
err error
files []string
)
if s.fast {
files, err = s.compactor.CompactFast(group, log)
} else {
files, err = s.compactor.CompactFull(group, log)
}
if err != nil {
_, inProgress := err.(errCompactionInProgress)
if err == errCompactionsDisabled || inProgress {
log.Info("Aborted compaction", zap.Error(err))
if _, ok := err.(errCompactionInProgress); ok {
time.Sleep(time.Second)
}
return
}
log.Warn("Error compacting TSM files", zap.Error(err))
// We hit a bad TSM file - rename so the next compaction can proceed.
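// For example, a corrupt 000001-02.tsm (hypothetical name) is renamed
// with the BadTSMFileExtension suffix, e.g. 000001-02.tsm.bad, which the
// planner ignores, so subsequent compactions can make progress.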
if _, ok := err.(errBlockRead); ok {
path := err.(errBlockRead).file
log.Info("Renaming a corrupt TSM file due to compaction error", zap.Error(err))
if err := s.fileStore.ReplaceWithCallback([]string{path}, nil, nil); err != nil {
log.Info("Error removing bad TSM file", zap.Error(err))
} else if e := os.Rename(path, path+"."+BadTSMFileExtension); e != nil {
log.Info("Error renaming corrupt TSM file", zap.Error(e))
}
}
s.errorStat.Inc()
time.Sleep(time.Second)
return
}
if err := s.fileStore.ReplaceWithCallback(group, files, nil); err != nil {
log.Info("Error replacing new TSM files", zap.Error(err))
s.errorStat.Inc()
time.Sleep(time.Second)
// Remove the newly written TSM files; the compaction will be retried.
for _, file := range files {
if err := os.Remove(file); err != nil {
log.Error("Unable to remove file", zap.String("path", file), zap.Error(err))
}
}
return
}
for i, f := range files {
log.Info("Compacted file", zap.Int("tsm1_index", i), zap.String("tsm1_file", f))
}
log.Info("Finished compacting files",
zap.Int("tsm1_files_n", len(files)))
}
// levelCompactionStrategy returns a compactionStrategy for the given level.
// It returns nil if there are no TSM files to compact.
func (e *Engine) levelCompactionStrategy(group CompactionGroup, fast bool, level int) *compactionStrategy {
label := labelForLevel(level)
return &compactionStrategy{
group: group,
logger: e.logger.With(zap.Int("tsm1_level", level), zap.String("tsm1_strategy", "level")),
fileStore: e.FileStore,
compactor: e.Compactor,
fast: fast,
engine: e,
level: level,
errorStat: e.stats.Failed.With(label),
durationSecondsStat: e.stats.Duration.With(label),
}
}
// fullCompactionStrategy returns a compactionStrategy for higher level generations of TSM files.
// It returns nil if there are no TSM files to compact.
func (e *Engine) fullCompactionStrategy(group CompactionGroup, optimize bool) *compactionStrategy {
s := &compactionStrategy{
group: group,
logger: e.logger.With(zap.String("tsm1_strategy", "full"), zap.Bool("tsm1_optimize", optimize)),
fileStore: e.FileStore,
compactor: e.Compactor,
fast: optimize,
engine: e,
level: 4,
}
plabel := prometheus.Labels{levelKey: levelFull}
if optimize {
plabel = prometheus.Labels{levelKey: levelOpt}
}
s.errorStat = e.stats.Failed.With(plabel)
s.durationSecondsStat = e.stats.Duration.With(plabel)
return s
}
// reloadCache reads the WAL segment files and loads them into the cache.
func (e *Engine) reloadCache() error {
now := time.Now()
files, err := segmentFileNames(e.WAL.Path())
if err != nil {
return err
}
limit := e.Cache.MaxSize()
defer func() {
e.Cache.SetMaxSize(limit)
}()
// Disable the max size during loading
e.Cache.SetMaxSize(0)
loader := NewCacheLoader(files)
loader.WithLogger(e.logger)
if err := loader.Load(e.Cache); err != nil {
return err
}
e.traceLogger.Info("Reloaded WAL cache",
zap.String("path", e.WAL.Path()), zap.Duration("duration", time.Since(now)))
return nil
}
// cleanup removes all temp files and dirs that exist on disk. This should only be run at startup to avoid
// removing tmp files that are still in use.
func (e *Engine) cleanup() error {
allfiles, err := os.ReadDir(e.path)
if os.IsNotExist(err) {
return nil
} else if err != nil {
return err
}
ext := fmt.Sprintf(".%s", TmpTSMFileExtension)
for _, f := range allfiles {
// Check to see if there are any `.tmp` directories that were left over from failed shard snapshots
if f.IsDir() && strings.HasSuffix(f.Name(), ext) {
if err := os.RemoveAll(filepath.Join(e.path, f.Name())); err != nil {
return fmt.Errorf("error removing tmp snapshot directory %q: %s", f.Name(), err)
}
}
}
return e.cleanupTempTSMFiles()
}
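// cleanupTempTSMFiles removes any temp files left behind by an
// interrupted compaction.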
func (e *Engine) cleanupTempTSMFiles() error {
files, err := filepath.Glob(filepath.Join(e.path, fmt.Sprintf("*.%s", CompactionTempExtension)))
if err != nil {
return fmt.Errorf("error getting compaction temp files: %s", err.Error())
}
for _, f := range files {
if err := os.Remove(f); err != nil {
return fmt.Errorf("error removing temp compaction files: %v", err)
}
}
return nil
}
// KeyCursor returns a KeyCursor for the given key starting at time t.
func (e *Engine) KeyCursor(ctx context.Context, key []byte, t int64, ascending bool) *KeyCursor {
return e.FileStore.KeyCursor(ctx, key, t, ascending)
}
// CreateIterator returns an iterator for the measurement based on opt.
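//
// As an illustration (hypothetical measurement and field), a query with
// no GROUP BY time interval such as
//
//	SELECT first(value) FROM cpu
//
// is rewritten below into an ordered var-ref iterator over "value" with
// Limit 1, which is far cheaper than evaluating the call over every point.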
func (e *Engine) CreateIterator(ctx context.Context, measurement string, opt query.IteratorOptions) (query.Iterator, error) {
if span := tracing.SpanFromContext(ctx); span != nil {
labels := []string{"shard_id", strconv.Itoa(int(e.id)), "measurement", measurement}
if opt.Condition != nil {
labels = append(labels, "cond", opt.Condition.String())
}
span = span.StartSpan("create_iterator")
span.SetLabels(labels...)
ctx = tracing.NewContextWithSpan(ctx, span)
group := metrics.NewGroup(tsmGroup)
ctx = metrics.NewContextWithGroup(ctx, group)
start := time.Now()
defer group.GetTimer(planningTimer).UpdateSince(start)
}
if call, ok := opt.Expr.(*influxql.Call); ok {
if opt.Interval.IsZero() {
if call.Name == "first" || call.Name == "last" {
refOpt := opt
refOpt.Limit = 1
refOpt.Ascending = call.Name == "first"
refOpt.Ordered = true
refOpt.Expr = call.Args[0]
itrs, err := e.createVarRefIterator(ctx, measurement, refOpt)
if err != nil {
return nil, err
}
return newMergeFinalizerIterator(ctx, itrs, opt, e.logger)
}
}
inputs, err := e.createCallIterator(ctx, measurement, call, opt)
if err != nil {
return nil, err
} else if len(inputs) == 0 {
return nil, nil
}
return newMergeFinalizerIterator(ctx, inputs, opt, e.logger)
}
itrs, err := e.createVarRefIterator(ctx, measurement, opt)
if err != nil {
return nil, err
}
return newMergeFinalizerIterator(ctx, itrs, opt, e.logger)
}
// createSeriesIterator creates an optimized series iterator if possible.
// We exclude less-common cases for now as they are not worth implementing.
func (e *Engine) createSeriesIterator(measurement string, ref *influxql.VarRef, is tsdb.IndexSet, opt query.IteratorOptions) (query.Iterator, error) {
// Main check to see if we are trying to create a seriesKey iterator
if ref == nil || ref.Val != "_seriesKey" || len(opt.Aux) != 0 {
return nil, nil
}
// Check some other cases that we could maybe handle, but don't
if len(opt.Dimensions) > 0 {
return nil, nil
}
if opt.SLimit != 0 || opt.SOffset != 0 {
return nil, nil
}
if opt.StripName {
return nil, nil
}
if opt.Ordered {
return nil, nil
}
// Actual creation of the iterator
seriesCursor, err := is.MeasurementSeriesKeyByExprIterator([]byte(measurement), opt.Condition, opt.Authorizer)
if err != nil {
// Guard against a nil cursor being returned alongside the error.
if seriesCursor != nil {
seriesCursor.Close()
}
return nil, err
}
var seriesIterator query.Iterator
seriesIterator = newSeriesIterator(measurement, seriesCursor)
if opt.InterruptCh != nil {
seriesIterator = query.NewInterruptIterator(seriesIterator, opt.InterruptCh)
}
return seriesIterator, nil
}
func (e *Engine) createCallIterator(ctx context.Context, measurement string, call *influxql.Call, opt query.IteratorOptions) ([]query.Iterator, error) {
ref, _ := call.Args[0].(*influxql.VarRef)
if exists, err := e.index.MeasurementExists([]byte(measurement)); err != nil {
return nil, err
} else if !exists {
return nil, nil
}
// check for optimized series iteration for tsi index
if e.index.Type() == tsdb.TSI1IndexName {
indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
seriesOpt := opt
if len(opt.Dimensions) == 0 && (call.Name == "count" || call.Name == "sum_hll") {
// no point ordering the series if we are just counting all of them
seriesOpt.Ordered = false
}
seriesIterator, err := e.createSeriesIterator(measurement, ref, indexSet, seriesOpt)
if err != nil {
return nil, err
}
if seriesIterator != nil {
callIterator, err := query.NewCallIterator(seriesIterator, opt)
if err != nil {
seriesIterator.Close()
return nil, err
}
return []query.Iterator{callIterator}, nil
}
}
// Determine tagsets for this measurement based on dimensions and filters.
var (
tagSets []*query.TagSet
err error
)
indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
tagSets, err = indexSet.TagSets(e.sfile, []byte(measurement), opt)
if err != nil {
return nil, err
}
// Reverse the tag sets if we are ordering by descending.
if !opt.Ascending {
for _, t := range tagSets {
t.Reverse()
}
}
// Calculate tag sets and apply SLIMIT/SOFFSET.
tagSets = query.LimitTagSets(tagSets, opt.SLimit, opt.SOffset)
itrs := make([]query.Iterator, 0, len(tagSets))
if err := func() error {
for _, t := range tagSets {
// Abort if the query was killed
select {
case <-opt.InterruptCh:
query.Iterators(itrs).Close()
return query.ErrQueryInterrupted
default:
}
inputs, err := e.createTagSetIterators(ctx, ref, measurement, t, opt)
if err != nil {
return err
} else if len(inputs) == 0 {
continue
}
// Wrap each series in a call iterator.
for i, input := range inputs {
if opt.InterruptCh != nil {
input = query.NewInterruptIterator(input, opt.InterruptCh)
}
itr, err := query.NewCallIterator(input, opt)
if err != nil {
query.Iterators(inputs).Close()
return err
}
inputs[i] = itr
}
itr := query.NewParallelMergeIterator(inputs, opt, runtime.GOMAXPROCS(0))
itrs = append(itrs, itr)
}
return nil
}(); err != nil {
query.Iterators(itrs).Close()
return nil, err
}
return itrs, nil
}
// createVarRefIterator creates an iterator for a variable reference.
func (e *Engine) createVarRefIterator(ctx context.Context, measurement string, opt query.IteratorOptions) ([]query.Iterator, error) {
ref, _ := opt.Expr.(*influxql.VarRef)
if exists, err := e.index.MeasurementExists([]byte(measurement)); err != nil {
return nil, err
} else if !exists {
return nil, nil
}
var (
tagSets []*query.TagSet
err error
)
indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
tagSets, err = indexSet.TagSets(e.sfile, []byte(measurement), opt)
if err != nil {
return nil, err
}
// Reverse the tag sets if we are ordering by descending.
if !opt.Ascending {
for _, t := range tagSets {
t.Reverse()
}
}
// Calculate tag sets and apply SLIMIT/SOFFSET.
tagSets = query.LimitTagSets(tagSets, opt.SLimit, opt.SOffset)
itrs := make([]query.Iterator, 0, len(tagSets))
if err := func() error {
for _, t := range tagSets {
inputs, err := e.createTagSetIterators(ctx, ref, measurement, t, opt)
if err != nil {
return err
} else if len(inputs) == 0 {
continue
}
// If we have a LIMIT or OFFSET and the grouping of the outer query
// is different than the current grouping, we need to perform the
// limit on each of the individual series keys instead to improve
// performance.
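// For example, with LIMIT 1 each underlying series iterator can be
// capped near the source instead of merging every series in full and
// discarding the excess afterwards.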
if (opt.Limit > 0 || opt.Offset > 0) && len(opt.Dimensions) != len(opt.GroupBy) {
for i, input := range inputs {
inputs[i] = newLimitIterator(input, opt)
}
}
itr, err := query.Iterators(inputs).Merge(opt)
if err != nil {
query.Iterators(inputs).Close()
return err
}
// Apply a limit on the merged iterator.
if opt.Limit > 0 || opt.Offset > 0 {
if len(opt.Dimensions) == len(opt.GroupBy) {
// When the final dimensions and the current grouping are
// the same, we will only produce one series so we can use
// the faster limit iterator.
itr = newLimitIterator(itr, opt)
} else {
// When the dimensions are different than the current
// grouping, we need to account for the possibility there
// will be multiple series. The limit iterator in the
// influxql package handles that scenario.
itr = query.NewLimitIterator(itr, opt)
}
}
itrs = append(itrs, itr)
}
return nil
}(); err != nil {
query.Iterators(itrs).Close()
return nil, err
}
return itrs, nil
}
// createTagSetIterators creates a set of iterators for a tagset.
func (e *Engine) createTagSetIterators(ctx context.Context, ref *influxql.VarRef, name string, t *query.TagSet, opt query.IteratorOptions) ([]query.Iterator, error) {
// Set parallelism by number of logical cpus.
parallelism := runtime.GOMAXPROCS(0)
if parallelism > len(t.SeriesKeys) {
parallelism = len(t.SeriesKeys)
}
// Create series key groupings, each carrying its own results and error.
groups := make([]struct {
keys []string
filters []influxql.Expr
itrs []query.Iterator
err error
}, parallelism)
// Group series keys.
n := len(t.SeriesKeys) / parallelism
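// For example, 10 series keys at parallelism 4 gives n = 2: the first
// three groups take 2 keys each and the final group absorbs the
// remaining 4.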
for i := 0; i < parallelism; i++ {
group := &groups[i]
if i < parallelism-1 {
group.keys = t.SeriesKeys[i*n : (i+1)*n]
group.filters = t.Filters[i*n : (i+1)*n]
} else {
group.keys = t.SeriesKeys[i*n:]
group.filters = t.Filters[i*n:]
}
}
// Read series groups in parallel.
var wg sync.WaitGroup
for i := range groups {
wg.Add(1)
go func(i int) {
defer wg.Done()
groups[i].itrs, groups[i].err = e.createTagSetGroupIterators(ctx, ref, name, groups[i].keys, t, groups[i].filters, opt)
}(i)
}
wg.Wait()
// Determine total number of iterators so we can allocate only once.
var itrN int
for _, group := range groups {
itrN += len(group.itrs)
}
// Combine all iterators together and check for errors.
var err error
itrs := make([]query.Iterator, 0, itrN)
for _, group := range groups {
if group.err != nil {
err = group.err
}
itrs = append(itrs, group.itrs...)
}
// If an error occurred, make sure we close all created iterators.
if err != nil {
query.Iterators(itrs).Close()
return nil, err
}
return itrs, nil
}
// createTagSetGroupIterators creates a set of iterators for a subset of a tagset's series.
func (e *Engine) createTagSetGroupIterators(ctx context.Context, ref *influxql.VarRef, name string, seriesKeys []string, t *query.TagSet, filters []influxql.Expr, opt query.IteratorOptions) ([]query.Iterator, error) {
itrs := make([]query.Iterator, 0, len(seriesKeys))
for i, seriesKey := range seriesKeys {
var conditionFields []influxql.VarRef
if filters[i] != nil {
// Retrieve non-time fields from this series filter and filter out tags.
conditionFields = influxql.ExprNames(filters[i])
}
itr, err := e.createVarRefSeriesIterator(ctx, ref, name, seriesKey, t, filters[i], conditionFields, opt)
if err != nil {
return itrs, err
} else if itr == nil {
continue
}
itrs = append(itrs, itr)
// Abort if the query was killed
select {
case <-opt.InterruptCh:
query.Iterators(itrs).Close()
return nil, query.ErrQueryInterrupted
default:
}
// Enforce series limit at creation time.
if opt.MaxSeriesN > 0 && len(itrs) > opt.MaxSeriesN {
query.Iterators(itrs).Close()
return nil, fmt.Errorf("max-select-series limit exceeded: (%d/%d)", len(itrs), opt.MaxSeriesN)
}
}
return itrs, nil
}
// createVarRefSeriesIterator creates an iterator for a variable reference for a series.
func (e *Engine) createVarRefSeriesIterator(ctx context.Context, ref *influxql.VarRef, name string, seriesKey string, t *query.TagSet, filter influxql.Expr, conditionFields []influxql.VarRef, opt query.IteratorOptions) (query.Iterator, error) {
_, tfs := models.ParseKey([]byte(seriesKey))
tags := query.NewTags(tfs.Map())
// Create options specific for this series.
itrOpt := opt
itrOpt.Condition = filter
var curCounter, auxCounter, condCounter *metrics.Counter
if col := metrics.GroupFromContext(ctx); col != nil {
curCounter = col.GetCounter(numberOfRefCursorsCounter)
auxCounter = col.GetCounter(numberOfAuxCursorsCounter)
condCounter = col.GetCounter(numberOfCondCursorsCounter)
}
// Build main cursor.
var cur cursor
if ref != nil {
cur = e.buildCursor(ctx, name, seriesKey, tfs, ref, opt)
// If the field doesn't exist then don't build an iterator.
if cur == nil {
return nil, nil
}
if curCounter != nil {
curCounter.Add(1)
}
}
// Build auxiliary cursors.
// Tag values should be returned if the field doesn't exist.
var aux []cursorAt
if len(opt.Aux) > 0 {
aux = make([]cursorAt, len(opt.Aux))
for i, ref := range opt.Aux {
// Create cursor from field if a tag wasn't requested.
if ref.Type != influxql.Tag {
cur := e.buildCursor(ctx, name, seriesKey, tfs, &ref, opt)
if cur != nil {
if auxCounter != nil {
auxCounter.Add(1)
}
aux[i] = newBufCursor(cur, opt.Ascending)
continue
}
// If a field was requested, use a nil cursor of the requested type.
switch ref.Type {
case influxql.Float, influxql.AnyField:
aux[i] = nilFloatLiteralValueCursor
continue
case influxql.Integer:
aux[i] = nilIntegerLiteralValueCursor
continue
case influxql.Unsigned:
aux[i] = nilUnsignedLiteralValueCursor
continue
case influxql.String:
aux[i] = nilStringLiteralValueCursor
continue
case influxql.Boolean:
aux[i] = nilBooleanLiteralValueCursor
continue
}
}
// If field doesn't exist, use the tag value.
if v := tags.Value(ref.Val); v == "" {
// However, if the tag value is blank then return a null.
aux[i] = nilStringLiteralValueCursor
} else {
aux[i] = &literalValueCursor{value: v}
}
}
}
// Remove _tagKey condition field.
// We can't search on it because we can't join it to _tagValue based on time.
if varRefSliceContains(conditionFields, "_tagKey") {
conditionFields = varRefSliceRemove(conditionFields, "_tagKey")
// Remove _tagKey conditional references from iterator.
itrOpt.Condition = influxql.RewriteExpr(influxql.CloneExpr(itrOpt.Condition), func(expr influxql.Expr) influxql.Expr {
switch expr := expr.(type) {
case *influxql.BinaryExpr:
if ref, ok := expr.LHS.(*influxql.VarRef); ok && ref.Val == "_tagKey" {
return &influxql.BooleanLiteral{Val: true}
}
if ref, ok := expr.RHS.(*influxql.VarRef); ok && ref.Val == "_tagKey" {
return &influxql.BooleanLiteral{Val: true}
}
}
return expr
})
}
// Build conditional field cursors.
// If a conditional field doesn't exist then ignore the series.
var conds []cursorAt
if len(conditionFields) > 0 {
conds = make([]cursorAt, len(conditionFields))
for i, ref := range conditionFields {
// Create cursor from field if a tag wasn't requested.
if ref.Type != influxql.Tag {
cur := e.buildCursor(ctx, name, seriesKey, tfs, &ref, opt)
if cur != nil {
if condCounter != nil {
condCounter.Add(1)
}
conds[i] = newBufCursor(cur, opt.Ascending)
continue
}
// If a field was requested, use a nil cursor of the requested type.
switch ref.Type {
case influxql.Float, influxql.AnyField:
conds[i] = nilFloatLiteralValueCursor
continue
case influxql.Integer:
conds[i] = nilIntegerLiteralValueCursor
continue
case influxql.Unsigned:
conds[i] = nilUnsignedLiteralValueCursor
continue
case influxql.String:
conds[i] = nilStringLiteralValueCursor
continue
case influxql.Boolean:
conds[i] = nilBooleanLiteralValueCursor
continue
}
}
// If field doesn't exist, use the tag value.
if v := tags.Value(ref.Val); v == "" {
// However, if the tag value is blank then return a null.
conds[i] = nilStringLiteralValueCursor
} else {
conds[i] = &literalValueCursor{value: v}
}
}
}
condNames := influxql.VarRefs(conditionFields).Strings()
// Limit tags to only the dimensions selected.
dimensions := opt.GetDimensions()
tags = tags.Subset(dimensions)
// If it's only auxiliary fields then it doesn't matter what type of iterator we use.
if ref == nil {
if opt.StripName {
name = ""
}
return newFloatIterator(name, tags, itrOpt, nil, aux, conds, condNames), nil
}
// Remove name if requested.
if opt.StripName {
name = ""
}
switch cur := cur.(type) {
case floatCursor:
return newFloatIterator(name, tags, itrOpt, cur, aux, conds, condNames), nil
case integerCursor:
return newIntegerIterator(name, tags, itrOpt, cur, aux, conds, condNames), nil
case unsignedCursor:
return newUnsignedIterator(name, tags, itrOpt, cur, aux, conds, condNames), nil
case stringCursor:
return newStringIterator(name, tags, itrOpt, cur, aux, conds, condNames), nil
case booleanCursor:
return newBooleanIterator(name, tags, itrOpt, cur, aux, conds, condNames), nil
default:
panic("unreachable")
}
}
// buildCursor creates an untyped cursor for a field.
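//
// When the requested type differs from the stored type, the cursor is
// wrapped in a casting cursor. For example (hypothetical field), a query
// such as
//
//	SELECT value::integer FROM cpu
//
// against a float field yields an integerCastFloatCursor wrapping the
// underlying float cursor.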
func (e *Engine) buildCursor(ctx context.Context, measurement, seriesKey string, tags models.Tags, ref *influxql.VarRef, opt query.IteratorOptions) cursor {
// Check if this is a system field cursor.
switch ref.Val {
case "_name":
return &stringSliceCursor{values: []string{measurement}}
case "_tagKey":
return &stringSliceCursor{values: tags.Keys()}
case "_tagValue":
return &stringSliceCursor{values: matchTagValues(tags, opt.Condition)}
case "_seriesKey":
return &stringSliceCursor{values: []string{seriesKey}}
}
// Look up fields for measurement.
mf := e.fieldset.FieldsByString(measurement)
if mf == nil {
return nil
}
// Check for system field for field keys.
if ref.Val == "_fieldKey" {
return &stringSliceCursor{values: mf.FieldKeys()}
}
// Find individual field.
f := mf.Field(ref.Val)
if f == nil {
return nil
}
// Check if we need to perform a cast. Performing a cast in the
// engine (if it is possible) is much more efficient than an automatic cast.
if ref.Type != influxql.Unknown && ref.Type != influxql.AnyField && ref.Type != f.Type {
switch ref.Type {
case influxql.Float:
switch f.Type {
case influxql.Integer:
cur := e.buildIntegerCursor(ctx, measurement, seriesKey, ref.Val, opt)
return &floatCastIntegerCursor{cursor: cur}
case influxql.Unsigned:
cur := e.buildUnsignedCursor(ctx, measurement, seriesKey, ref.Val, opt)
return &floatCastUnsignedCursor{cursor: cur}
}
case influxql.Integer:
switch f.Type {
case influxql.Float:
cur := e.buildFloatCursor(ctx, measurement, seriesKey, ref.Val, opt)
return &integerCastFloatCursor{cursor: cur}
case influxql.Unsigned:
cur := e.buildUnsignedCursor(ctx, measurement, seriesKey, ref.Val, opt)
return &integerCastUnsignedCursor{cursor: cur}
}
case influxql.Unsigned:
switch f.Type {
case influxql.Float:
cur := e.buildFloatCursor(ctx, measurement, seriesKey, ref.Val, opt)
return &unsignedCastFloatCursor{cursor: cur}
case influxql.Integer:
cur := e.buildIntegerCursor(ctx, measurement, seriesKey, ref.Val, opt)
return &unsignedCastIntegerCursor{cursor: cur}
}
}
return nil
}
// Return appropriate cursor based on type.
switch f.Type {
case influxql.Float:
return e.buildFloatCursor(ctx, measurement, seriesKey, ref.Val, opt)
case influxql.Integer:
return e.buildIntegerCursor(ctx, measurement, seriesKey, ref.Val, opt)
case influxql.Unsigned:
return e.buildUnsignedCursor(ctx, measurement, seriesKey, ref.Val, opt)
case influxql.String:
return e.buildStringCursor(ctx, measurement, seriesKey, ref.Val, opt)
case influxql.Boolean:
return e.buildBooleanCursor(ctx, measurement, seriesKey, ref.Val, opt)
default:
panic("unreachable")
}
}
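// matchTagValues returns the values of the tags whose keys satisfy the
// given condition, evaluating each key in turn as _tagKey. For example
// (hypothetical tags), with tags host=a,region=west and the condition
// _tagKey = 'host', only "a" is returned.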
func matchTagValues(tags models.Tags, condition influxql.Expr) []string {
if condition == nil {
return tags.Values()
}
// Populate map with tag values.
data := map[string]interface{}{}
for _, tag := range tags {
data[string(tag.Key)] = string(tag.Value)
}
// Match against each specific tag.
var values []string
for _, tag := range tags {
data["_tagKey"] = string(tag.Key)
if influxql.EvalBool(condition, data) {
values = append(values, string(tag.Value))
}
}
return values
}
// IteratorCost produces the cost of an iterator.
func (e *Engine) IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error) {
// Determine if this measurement exists. If it does not, then no shards are
// accessed to begin with.
if exists, err := e.index.MeasurementExists([]byte(measurement)); err != nil {
return query.IteratorCost{}, err
} else if !exists {
return query.IteratorCost{}, nil
}
// Determine all of the tag sets for this query.
indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
tagSets, err := indexSet.TagSets(e.sfile, []byte(measurement), opt)
if err != nil {
return query.IteratorCost{}, err
}
// Attempt to retrieve the ref from the main expression (if it exists).
var ref *influxql.VarRef
if opt.Expr != nil {
if v, ok := opt.Expr.(*influxql.VarRef); ok {
ref = v
} else if call, ok := opt.Expr.(*influxql.Call); ok {
if len(call.Args) > 0 {
ref, _ = call.Args[0].(*influxql.VarRef)
}
}
}
// Count the number of series concatenated from the tag set.
cost := query.IteratorCost{NumShards: 1}
for _, t := range tagSets {
cost.NumSeries += int64(len(t.SeriesKeys))
for i, key := range t.SeriesKeys {
// Retrieve the cost for the main expression (if it exists).
if ref != nil {
c := e.seriesCost(key, ref.Val, opt.StartTime, opt.EndTime)
cost = cost.Combine(c)
}
// Retrieve the cost for every auxiliary field since these are also
// iterators that we may have to look through.
// We may want to separate these though as we are unlikely to incur
// anywhere close to the full costs of the auxiliary iterators because
// many of the selected values are usually skipped.
for _, ref := range opt.Aux {
c := e.seriesCost(key, ref.Val, opt.StartTime, opt.EndTime)
cost = cost.Combine(c)
}
// Retrieve the expression names in the condition (if there is a condition).
// We will also create cursors for these too.
if t.Filters[i] != nil {
refs := influxql.ExprNames(t.Filters[i])
for _, ref := range refs {
c := e.seriesCost(key, ref.Val, opt.StartTime, opt.EndTime)
cost = cost.Combine(c)
}
}
}
}
return cost, nil
}
// Type returns FieldType for a series. If the series does not
// exist, ErrUnknownFieldType is returned.
func (e *Engine) Type(series []byte) (models.FieldType, error) {
if typ, err := e.Cache.Type(series); err == nil {
return typ, nil
}
typ, err := e.FileStore.Type(series)
if err != nil {
return 0, err
}
switch typ {
case BlockFloat64:
return models.Float, nil
case BlockInteger:
return models.Integer, nil
case BlockUnsigned:
return models.Unsigned, nil
case BlockString:
return models.String, nil
case BlockBoolean:
return models.Boolean, nil
}
return 0, tsdb.ErrUnknownFieldType
}
func (e *Engine) seriesCost(seriesKey, field string, tmin, tmax int64) query.IteratorCost {
key := SeriesFieldKeyBytes(seriesKey, field)
c := e.FileStore.Cost(key, tmin, tmax)
// Retrieve the range of values within the cache.
cacheValues := e.Cache.Values(key)
c.CachedValues = int64(len(cacheValues.Include(tmin, tmax)))
return c
}
// SeriesFieldKey combines a series key and field name into a unique string to be hashed to a numeric ID.
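//
// For example (hypothetical series), the series key "cpu,host=a" and the
// field "usage" combine with the "#!~#" separator:
//
//	key := SeriesFieldKey("cpu,host=a", "usage")
//	// key == "cpu,host=a#!~#usage"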
func SeriesFieldKey(seriesKey, field string) string {
return seriesKey + keyFieldSeparator + field
}
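// SeriesFieldKeyBytes is the []byte equivalent of SeriesFieldKey, built
// with a single allocation.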
func SeriesFieldKeyBytes(seriesKey, field string) []byte {
b := make([]byte, len(seriesKey)+len(keyFieldSeparator)+len(field))
i := copy(b, seriesKey)
i += copy(b[i:], keyFieldSeparatorBytes)
copy(b[i:], field)
return b
}
var (
blockToFieldType = [8]influxql.DataType{
BlockFloat64: influxql.Float,
BlockInteger: influxql.Integer,
BlockBoolean: influxql.Boolean,
BlockString: influxql.String,
BlockUnsigned: influxql.Unsigned,
5: influxql.Unknown,
6: influxql.Unknown,
7: influxql.Unknown,
}
)
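// BlockTypeToInfluxQLDataType maps a TSM block type to its influxql data
// type; unrecognized block types map to influxql.Unknown.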
func BlockTypeToInfluxQLDataType(typ byte) influxql.DataType { return blockToFieldType[typ&7] }
// SeriesAndFieldFromCompositeKey returns the series key and the field key extracted from the composite key.
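//
// It is the inverse of SeriesFieldKey. For example (hypothetical key):
//
//	series, field := SeriesAndFieldFromCompositeKey([]byte("cpu,host=a#!~#usage"))
//	// series == []byte("cpu,host=a"), field == []byte("usage")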
func SeriesAndFieldFromCompositeKey(key []byte) ([]byte, []byte) {
series, field, _ := bytes.Cut(key, keyFieldSeparatorBytes)
return series, field
}
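// varRefSliceContains reports whether a contains a reference named v.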
func varRefSliceContains(a []influxql.VarRef, v string) bool {
for _, ref := range a {
if ref.Val == v {
return true
}
}
return false
}
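// varRefSliceRemove returns a with all references named v removed. It
// returns a unchanged when v is absent, avoiding an allocation.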
func varRefSliceRemove(a []influxql.VarRef, v string) []influxql.VarRef {
if !varRefSliceContains(a, v) {
return a
}
other := make([]influxql.VarRef, 0, len(a))
for _, ref := range a {
if ref.Val != v {
other = append(other, ref)
}
}
return other
}
const reindexBatchSize = 10000
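// Reindex rebuilds the shard index from the series keys found in the TSM
// files and, when the WAL is enabled, in the cache. Keys are accumulated
// and flushed to the index in batches of reindexBatchSize to bound
// memory use.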
func (e *Engine) Reindex() error {
keys := make([][]byte, reindexBatchSize)
seriesKeys := make([][]byte, reindexBatchSize)
names := make([][]byte, reindexBatchSize)
tags := make([]models.Tags, reindexBatchSize)
n := 0
reindexBatch := func() error {
if n == 0 {
return nil
}
for i, key := range keys[:n] {
seriesKeys[i], _ = SeriesAndFieldFromCompositeKey(key)
names[i], tags[i] = models.ParseKeyBytes(seriesKeys[i])
e.traceLogger.Debug(
"Read series during reindexing",
logger.Shard(e.id),
zap.String("name", string(names[i])),
zap.String("tags", tags[i].String()),
)
}
e.logger.Debug("Reindexing data batch", logger.Shard(e.id), zap.Int("batch_size", n))
if err := e.index.CreateSeriesListIfNotExists(seriesKeys[:n], names[:n], tags[:n]); err != nil {
return err
}
n = 0
return nil
}
reindexKey := func(key []byte) error {
keys[n] = key
n++
if n < reindexBatchSize {
return nil
}
return reindexBatch()
}
// Index data stored in TSM files.
e.logger.Info("Reindexing TSM data", logger.Shard(e.id))
if err := e.FileStore.WalkKeys(nil, func(key []byte, _ byte) error {
return reindexKey(key)
}); err != nil {
return err
}
// Make sure all TSM data is indexed.
if err := reindexBatch(); err != nil {
return err
}
if !e.WALEnabled {
// All done.
return nil
}
// Reindex data stored in the WAL cache.
e.logger.Info("Reindexing WAL data", logger.Shard(e.id))
for _, key := range e.Cache.Keys() {
if err := reindexKey(key); err != nil {
return err
}
}
// Make sure all WAL data is indexed.
return reindexBatch()
}