// Package tsm1 provides a TSDB in the Time Structured Merge tree format.
package tsm1 // import "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"

import (
	"archive/tar"
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"os"
	"path/filepath"
	"regexp"
	"runtime"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/influxdata/influxdb/v2/influxql/query"
	"github.com/influxdata/influxdb/v2/logger"
	"github.com/influxdata/influxdb/v2/models"
	"github.com/influxdata/influxdb/v2/pkg/bytesutil"
	"github.com/influxdata/influxdb/v2/pkg/estimator"
	"github.com/influxdata/influxdb/v2/pkg/file"
	"github.com/influxdata/influxdb/v2/pkg/limiter"
	"github.com/influxdata/influxdb/v2/pkg/metrics"
	"github.com/influxdata/influxdb/v2/pkg/radix"
	intar "github.com/influxdata/influxdb/v2/pkg/tar"
	"github.com/influxdata/influxdb/v2/pkg/tracing"
	"github.com/influxdata/influxdb/v2/tsdb"
	_ "github.com/influxdata/influxdb/v2/tsdb/index"
	"github.com/influxdata/influxdb/v2/tsdb/index/tsi1"
	"github.com/influxdata/influxql"
	"github.com/prometheus/client_golang/prometheus"
	"go.uber.org/zap"
)

//go:generate -command tmpl go run github.com/benbjohnson/tmpl
//go:generate tmpl -data=@iterator.gen.go.tmpldata iterator.gen.go.tmpl engine.gen.go.tmpl array_cursor.gen.go.tmpl array_cursor_iterator.gen.go.tmpl
// The file store code generation uses a custom modified tmpl
// to support adding templated data from the command line.
// This can probably be worked into the upstream tmpl
// but isn't at the moment.
//go:generate go run ../../../tools/tmpl -data=file_store.gen.go.tmpldata file_store.gen.go.tmpl=file_store.gen.go
//go:generate go run ../../../tools/tmpl -d isArray=y -data=file_store.gen.go.tmpldata file_store.gen.go.tmpl=file_store_array.gen.go
//go:generate tmpl -data=@encoding.gen.go.tmpldata encoding.gen.go.tmpl
//go:generate tmpl -data=@compact.gen.go.tmpldata compact.gen.go.tmpl
//go:generate tmpl -data=@reader.gen.go.tmpldata reader.gen.go.tmpl

func init() {
	tsdb.RegisterEngine("tsm1", NewEngine)
}

var (
	// Ensure Engine implements the interface.
	_ tsdb.Engine = &Engine{}
	// Static objects to prevent small allocs.
	timeBytes              = []byte("time")
	keyFieldSeparatorBytes = []byte(keyFieldSeparator)
	emptyBytes             = []byte{}
)

var (
	tsmGroup                   = metrics.MustRegisterGroup("tsm1")
	numberOfRefCursorsCounter  = metrics.MustRegisterCounter("cursors_ref", metrics.WithGroup(tsmGroup))
	numberOfAuxCursorsCounter  = metrics.MustRegisterCounter("cursors_aux", metrics.WithGroup(tsmGroup))
	numberOfCondCursorsCounter = metrics.MustRegisterCounter("cursors_cond", metrics.WithGroup(tsmGroup))
	planningTimer              = metrics.MustRegisterTimer("planning_time", metrics.WithGroup(tsmGroup))
)

const (
	// keyFieldSeparator separates the series key from the field name in the composite key
	// that identifies a specific field in a series.
	keyFieldSeparator = "#!~#"

	// deleteFlushThreshold is the size in bytes of a batch of series keys to delete.
	deleteFlushThreshold = 50 * 1024 * 1024
)

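// Illustration (not from the original source): a TSM composite key is the series
// key, then keyFieldSeparator, then the field name. Assuming a series "cpu,host=A"
// with a field "usage_idle", the stored key would be:
//
//	cpu,host=A#!~#usage_idle
//
// SeriesAndFieldFromCompositeKey, used throughout this file, splits such a key back
// into its series-key and field-name parts.
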
// Engine represents a storage engine with compressed blocks.
|
|
type Engine struct {
|
|
mu sync.RWMutex
|
|
|
|
index tsdb.Index

	// The following group of fields is used to track the state of level compactions
	// within the Engine. The WaitGroup is used to monitor the compaction goroutines,
	// and the 'done' channel is used to signal those goroutines to shut down. Every
	// request to disable level compactions will call 'Wait' on 'wg'; the first
	// goroutine to arrive (levelWorkers == 0 while holding the lock) will close the
	// done channel and re-assign 'nil' to the variable. Re-enabling will decrease
	// 'levelWorkers', and when it decreases to zero, level compactions will be
	// started back up again.

	wg           *sync.WaitGroup // waitgroup for active level compaction goroutines
	done         chan struct{}   // channel to signal level compactions to stop
	levelWorkers int             // Number of "workers" that expect compactions to be in a disabled state

	snapDone chan struct{}   // channel to signal snapshot compactions to stop
	snapWG   *sync.WaitGroup // waitgroup for running snapshot compactions

	id           uint64
	path         string
	sfile        *tsdb.SeriesFile
	logger       *zap.Logger // Logger to be used for important messages
	traceLogger  *zap.Logger // Logger to be used when trace-logging is on.
	traceLogging bool

	fieldset *tsdb.MeasurementFieldSet

	WAL            *WAL
	Cache          *Cache
	Compactor      *Compactor
	CompactionPlan CompactionPlanner
	FileStore      *FileStore

	MaxPointsPerBlock int

	// CacheFlushMemorySizeThreshold specifies the minimum size threshold for
	// the cache when the engine should write a snapshot to a TSM file
	CacheFlushMemorySizeThreshold uint64

	// CacheFlushWriteColdDuration specifies the length of time after which if
	// no writes have been committed to the WAL, the engine will write
	// a snapshot of the cache to a TSM file
	CacheFlushWriteColdDuration time.Duration

	// WALEnabled determines whether writes to the WAL are enabled. If this is false,
	// writes will only exist in the cache and can be lost if a snapshot has not occurred.
	WALEnabled bool

	// Invoked when creating a backup file "as new".
	formatFileName FormatFileNameFunc

	// Controls whether to enable compactions when the engine is open.
	enableCompactionsOnOpen bool

	stats *compactionMetrics

	activeCompactions *compactionCounter

	// Limiter for concurrent compactions.
	compactionLimiter limiter.Fixed

	scheduler *scheduler

	// provides access to the total set of series IDs
	seriesIDSets tsdb.SeriesIDSets

	// seriesTypeMap maps a series key to field type
	seriesTypeMap *radix.Tree

	// muDigest ensures only one goroutine can generate a digest at a time.
	muDigest sync.RWMutex
}
|
|
|
|
// NewEngine returns a new instance of Engine.
|
|
func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *tsdb.SeriesFile, opt tsdb.EngineOptions) tsdb.Engine {
|
|
etags := tsdb.EngineTags{
|
|
Path: path,
|
|
WalPath: walPath,
|
|
Id: fmt.Sprintf("%d", id),
|
|
Bucket: filepath.Base(filepath.Dir(filepath.Dir(path))), // discard shard & rp, take db
|
|
EngineVersion: opt.EngineVersion,
|
|
}
|
|
|
|
var wal *WAL
|
|
if opt.WALEnabled {
|
|
wal = NewWAL(walPath, opt.Config.WALMaxConcurrentWrites, opt.Config.WALMaxWriteDelay, etags)
|
|
wal.syncDelay = time.Duration(opt.Config.WALFsyncDelay)
|
|
}
|
|
|
|
fs := NewFileStore(path, etags)
|
|
fs.openLimiter = opt.OpenLimiter
|
|
if opt.FileStoreObserver != nil {
|
|
fs.WithObserver(opt.FileStoreObserver)
|
|
}
|
|
fs.tsmMMAPWillNeed = opt.Config.TSMWillNeed
|
|
|
|
cache := NewCache(uint64(opt.Config.CacheMaxMemorySize), etags)
|
|
|
|
c := NewCompactor()
|
|
c.Dir = path
|
|
c.FileStore = fs
|
|
c.RateLimit = opt.CompactionThroughputLimiter
|
|
|
|
var planner CompactionPlanner = NewDefaultPlanner(fs, time.Duration(opt.Config.CompactFullWriteColdDuration))
|
|
if opt.CompactionPlannerCreator != nil {
|
|
planner = opt.CompactionPlannerCreator(opt.Config).(CompactionPlanner)
|
|
planner.SetFileStore(fs)
|
|
}
|
|
|
|
stats := newEngineMetrics(etags)
|
|
activeCompactions := &compactionCounter{}
|
|
e := &Engine{
|
|
id: id,
|
|
path: path,
|
|
index: idx,
|
|
sfile: sfile,
|
|
logger: zap.NewNop(),
|
|
traceLogger: zap.NewNop(),
|
|
traceLogging: opt.Config.TraceLoggingEnabled,
|
|
|
|
WAL: wal,
|
|
Cache: cache,
|
|
|
|
FileStore: fs,
|
|
Compactor: c,
|
|
CompactionPlan: planner,
|
|
|
|
activeCompactions: activeCompactions,
|
|
scheduler: newScheduler(activeCompactions, opt.CompactionLimiter.Capacity()),
|
|
|
|
CacheFlushMemorySizeThreshold: uint64(opt.Config.CacheSnapshotMemorySize),
|
|
CacheFlushWriteColdDuration: time.Duration(opt.Config.CacheSnapshotWriteColdDuration),
|
|
enableCompactionsOnOpen: true,
|
|
WALEnabled: opt.WALEnabled,
|
|
formatFileName: DefaultFormatFileName,
|
|
stats: stats,
|
|
compactionLimiter: opt.CompactionLimiter,
|
|
seriesIDSets: opt.SeriesIDSets,
|
|
}
|
|
|
|
// Feature flag to enable per-series type checking, by default this is off and
|
|
// e.seriesTypeMap will be nil.
|
|
if os.Getenv("INFLUXDB_SERIES_TYPE_CHECK_ENABLED") != "" {
|
|
e.seriesTypeMap = radix.New()
|
|
}
|
|
|
|
if e.traceLogging {
|
|
fs.enableTraceLogging(true)
|
|
if e.WALEnabled {
|
|
e.WAL.enableTraceLogging(true)
|
|
}
|
|
}
|
|
|
|
return e
|
|
}
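
// Usage sketch (illustrative, not from the original source; the id, index, series
// file, and options are assumed to exist in the caller): callers normally obtain an
// Engine through the "tsm1" constructor registered in init above, then configure and
// open it:
//
//	eng := NewEngine(1, idx, path, walPath, sfile, opt)
//	eng.WithLogger(zap.NewNop())
//	if err := eng.Open(context.Background()); err != nil {
//		// handle error
//	}
//	defer eng.Close()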
|
|
|
|
func (e *Engine) WithFormatFileNameFunc(formatFileNameFunc FormatFileNameFunc) {
|
|
e.Compactor.WithFormatFileNameFunc(formatFileNameFunc)
|
|
e.formatFileName = formatFileNameFunc
|
|
}
|
|
|
|
func (e *Engine) WithParseFileNameFunc(parseFileNameFunc ParseFileNameFunc) {
|
|
e.FileStore.WithParseFileNameFunc(parseFileNameFunc)
|
|
e.Compactor.WithParseFileNameFunc(parseFileNameFunc)
|
|
}
|
|
|
|
// Digest returns a reader for the shard's digest.
|
|
func (e *Engine) Digest() (io.ReadCloser, int64, error) {
|
|
e.muDigest.Lock()
|
|
defer e.muDigest.Unlock()
|
|
|
|
log, logEnd := logger.NewOperation(context.TODO(), e.logger, "Engine digest", "tsm1_digest")
|
|
defer logEnd()
|
|
|
|
log.Info("Starting digest", zap.String("tsm1_path", e.path))
|
|
|
|
digestPath := filepath.Join(e.path, DigestFilename)
|
|
|
|
// Get a list of tsm file paths from the FileStore.
|
|
files := e.FileStore.Files()
|
|
tsmfiles := make([]string, 0, len(files))
|
|
for _, f := range files {
|
|
tsmfiles = append(tsmfiles, f.Path())
|
|
}
|
|
|
|
// See if there's a fresh digest cached on disk.
|
|
fresh, reason := DigestFresh(e.path, tsmfiles, e.LastModified())
|
|
if fresh {
|
|
f, err := os.Open(digestPath)
|
|
if err == nil {
|
|
fi, err := f.Stat()
|
|
if err != nil {
|
|
log.Info("Digest aborted, couldn't stat digest file", logger.Shard(e.id), zap.Error(err))
|
|
return nil, 0, err
|
|
}
|
|
|
|
log.Info("Digest is fresh", logger.Shard(e.id), zap.String("path", digestPath))
|
|
|
|
// Return the cached digest.
|
|
return f, fi.Size(), nil
|
|
}
|
|
}
|
|
|
|
log.Info("Digest stale", logger.Shard(e.id), zap.String("reason", reason))
|
|
|
|
// Either no digest existed or the existing one was stale
|
|
// so generate a new digest.
|
|
|
|
// Make sure the directory exists, in case it was deleted for some reason.
|
|
if err := os.MkdirAll(e.path, 0777); err != nil {
|
|
log.Info("Digest aborted, problem creating shard directory path", zap.Error(err))
|
|
return nil, 0, err
|
|
}
|
|
|
|
// Create a tmp file to write the digest to.
|
|
tf, err := os.Create(digestPath + ".tmp")
|
|
if err != nil {
|
|
log.Info("Digest aborted, problem creating tmp digest", zap.Error(err))
|
|
return nil, 0, err
|
|
}
|
|
|
|
// Write the new digest to the tmp file.
|
|
if err := Digest(e.path, tsmfiles, tf); err != nil {
|
|
log.Info("Digest aborted, problem writing tmp digest", zap.Error(err))
|
|
tf.Close()
|
|
os.Remove(tf.Name())
|
|
return nil, 0, err
|
|
}
|
|
|
|
// Rename the temporary digest file to the actual digest file.
|
|
if err := file.RenameFile(tf.Name(), digestPath); err != nil {
|
|
log.Info("Digest aborted, problem renaming tmp digest", zap.Error(err))
|
|
return nil, 0, err
|
|
}
|
|
|
|
// Create and return a reader for the new digest file.
|
|
f, err := os.Open(digestPath)
|
|
if err != nil {
|
|
log.Info("Digest aborted, opening new digest", zap.Error(err))
|
|
return nil, 0, err
|
|
}
|
|
|
|
fi, err := f.Stat()
|
|
if err != nil {
|
|
log.Info("Digest aborted, can't stat new digest", zap.Error(err))
|
|
f.Close()
|
|
return nil, 0, err
|
|
}
|
|
|
|
log.Info("Digest written", zap.String("tsm1_digest_path", digestPath), zap.Int64("size", fi.Size()))
|
|
|
|
return f, fi.Size(), nil
|
|
}
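
// Illustrative sketch (not from the original source): callers are expected to read
// and close the returned digest, e.g.:
//
//	rc, size, err := e.Digest()
//	if err != nil {
//		// handle error
//	}
//	defer rc.Close()
//	_ = size // length of the digest file in bytes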
|
|
|
|
// SetEnabled sets whether the engine is enabled.
|
|
func (e *Engine) SetEnabled(enabled bool) {
|
|
e.enableCompactionsOnOpen = enabled
|
|
e.SetCompactionsEnabled(enabled)
|
|
}
|
|
|
|
// SetCompactionsEnabled enables or disables compactions on the engine. When disabled,
// all running compactions are aborted and new compactions stop running.
|
|
func (e *Engine) SetCompactionsEnabled(enabled bool) {
|
|
if enabled {
|
|
e.enableSnapshotCompactions()
|
|
e.enableLevelCompactions(false)
|
|
} else {
|
|
e.disableSnapshotCompactions()
|
|
e.disableLevelCompactions(false)
|
|
}
|
|
}
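
// Illustrative sketch (not part of the original source): a maintenance task can pause
// background compactions for the duration of its work and restore them on the way out,
// mirroring what ScheduleFullCompaction does below:
//
//	e.SetCompactionsEnabled(false)
//	defer e.SetCompactionsEnabled(true)
//	// ... work that must not race with compactions ...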
|
|
|
|
// enableLevelCompactions will request that level compactions start back up again
|
|
//
|
|
// 'wait' signifies that a corresponding call to disableLevelCompactions(true) was made at some
|
|
// point, and the associated task that required disabled compactions is now complete
|
|
func (e *Engine) enableLevelCompactions(wait bool) {
|
|
// If we don't need to wait, see if we're already enabled
|
|
if !wait {
|
|
e.mu.RLock()
|
|
if e.done != nil {
|
|
e.mu.RUnlock()
|
|
return
|
|
}
|
|
e.mu.RUnlock()
|
|
}
|
|
|
|
e.mu.Lock()
|
|
if wait {
|
|
e.levelWorkers -= 1
|
|
}
|
|
if e.levelWorkers != 0 || e.done != nil {
|
|
// still waiting on more workers or already enabled
|
|
e.mu.Unlock()
|
|
return
|
|
}
|
|
|
|
// last one to enable, start things back up
|
|
e.Compactor.EnableCompactions()
|
|
e.done = make(chan struct{})
|
|
wg := new(sync.WaitGroup)
|
|
wg.Add(1)
|
|
e.wg = wg
|
|
e.mu.Unlock()
|
|
|
|
go func() { defer wg.Done(); e.compact(wg) }()
|
|
}
|
|
|
|
// disableLevelCompactions will stop level compactions before returning.
|
|
//
|
|
// If 'wait' is set to true, then a corresponding call to enableLevelCompactions(true) will be
|
|
// required before level compactions will start back up again.
|
|
func (e *Engine) disableLevelCompactions(wait bool) {
|
|
e.mu.Lock()
|
|
old := e.levelWorkers
|
|
if wait {
|
|
e.levelWorkers += 1
|
|
}
|
|
|
|
// Hold onto the current done channel so we can wait on it if necessary
|
|
waitCh := e.done
|
|
wg := e.wg
|
|
|
|
if old == 0 && e.done != nil {
|
|
		// It's possible we have closed the done channel and released the lock and another
		// goroutine has attempted to disable compactions. We're currently in the process of
		// disabling them, so check for this and wait until the original completes.
|
|
select {
|
|
case <-e.done:
|
|
e.mu.Unlock()
|
|
return
|
|
default:
|
|
}
|
|
|
|
// Prevent new compactions from starting
|
|
e.Compactor.DisableCompactions()
|
|
|
|
// Stop all background compaction goroutines
|
|
close(e.done)
|
|
e.mu.Unlock()
|
|
wg.Wait()
|
|
|
|
// Signal that all goroutines have exited.
|
|
e.mu.Lock()
|
|
e.done = nil
|
|
e.mu.Unlock()
|
|
return
|
|
}
|
|
e.mu.Unlock()
|
|
|
|
	// Compactions were already disabled.
|
|
if waitCh == nil {
|
|
return
|
|
}
|
|
|
|
// We were not the first caller to disable compactions and they were in the process
|
|
// of being disabled. Wait for them to complete before returning.
|
|
<-waitCh
|
|
wg.Wait()
|
|
}
|
|
|
|
func (e *Engine) enableSnapshotCompactions() {
|
|
// Check if already enabled under read lock
|
|
e.mu.RLock()
|
|
if e.snapDone != nil {
|
|
e.mu.RUnlock()
|
|
return
|
|
}
|
|
e.mu.RUnlock()
|
|
|
|
// Check again under write lock
|
|
e.mu.Lock()
|
|
if e.snapDone != nil {
|
|
e.mu.Unlock()
|
|
return
|
|
}
|
|
|
|
e.Compactor.EnableSnapshots()
|
|
e.snapDone = make(chan struct{})
|
|
wg := new(sync.WaitGroup)
|
|
wg.Add(1)
|
|
e.snapWG = wg
|
|
e.mu.Unlock()
|
|
|
|
go func() { defer wg.Done(); e.compactCache() }()
|
|
}
|
|
|
|
func (e *Engine) disableSnapshotCompactions() {
|
|
e.mu.Lock()
|
|
if e.snapDone == nil {
|
|
e.mu.Unlock()
|
|
return
|
|
}
|
|
|
|
// We may be in the process of stopping snapshots. See if the channel
|
|
// was closed.
|
|
select {
|
|
case <-e.snapDone:
|
|
e.mu.Unlock()
|
|
return
|
|
default:
|
|
}
|
|
|
|
// first one here, disable and wait for completion
|
|
close(e.snapDone)
|
|
e.Compactor.DisableSnapshots()
|
|
wg := e.snapWG
|
|
e.mu.Unlock()
|
|
|
|
// Wait for the snapshot goroutine to exit.
|
|
wg.Wait()
|
|
|
|
	// Signal that the goroutines have exited and everything is stopped by setting
	// snapDone to nil.
|
|
e.mu.Lock()
|
|
e.snapDone = nil
|
|
e.mu.Unlock()
|
|
|
|
// If the cache is empty, free up its resources as well.
|
|
if e.Cache.Size() == 0 {
|
|
e.Cache.Free()
|
|
}
|
|
}
|
|
|
|
// ScheduleFullCompaction will force the engine to fully compact all data stored.
// This will cancel any running compactions and snapshot any data in the cache to
// TSM files. This is an expensive operation.
|
|
func (e *Engine) ScheduleFullCompaction() error {
|
|
// Snapshot any data in the cache
|
|
if err := e.WriteSnapshot(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Cancel running compactions
|
|
e.SetCompactionsEnabled(false)
|
|
|
|
// Ensure compactions are restarted
|
|
defer e.SetCompactionsEnabled(true)
|
|
|
|
// Force the planner to only create a full plan.
|
|
e.CompactionPlan.ForceFull()
|
|
return nil
|
|
}
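
// Illustrative sketch (not from the original source): an operator-triggered full
// compaction is a single call once the engine is open:
//
//	if err := e.ScheduleFullCompaction(); err != nil {
//		// handle error
//	}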
|
|
|
|
// Path returns the path the engine was opened with.
|
|
func (e *Engine) Path() string { return e.path }
|
|
|
|
func (e *Engine) MeasurementExists(name []byte) (bool, error) {
|
|
return e.index.MeasurementExists(name)
|
|
}
|
|
|
|
func (e *Engine) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
|
|
return e.index.MeasurementNamesByRegex(re)
|
|
}
|
|
|
|
// MeasurementFieldSet returns the measurement field set.
|
|
func (e *Engine) MeasurementFieldSet() *tsdb.MeasurementFieldSet {
|
|
return e.fieldset
|
|
}
|
|
|
|
// MeasurementFields returns the measurement fields for a measurement.
|
|
func (e *Engine) MeasurementFields(measurement []byte) *tsdb.MeasurementFields {
|
|
return e.fieldset.CreateFieldsIfNotExists(measurement)
|
|
}
|
|
|
|
func (e *Engine) HasTagKey(name, key []byte) (bool, error) {
|
|
return e.index.HasTagKey(name, key)
|
|
}
|
|
|
|
func (e *Engine) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) {
|
|
return e.index.MeasurementTagKeysByExpr(name, expr)
|
|
}
|
|
|
|
func (e *Engine) TagKeyCardinality(name, key []byte) int {
|
|
return e.index.TagKeyCardinality(name, key)
|
|
}
|
|
|
|
// SeriesN returns the unique number of series in the index.
|
|
func (e *Engine) SeriesN() int64 {
|
|
return e.index.SeriesN()
|
|
}
|
|
|
|
// MeasurementsSketches returns sketches that describe the cardinality of the
|
|
// measurements in this shard and measurements that were in this shard, but have
|
|
// been tombstoned.
|
|
func (e *Engine) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
|
|
return e.index.MeasurementsSketches()
|
|
}
|
|
|
|
// SeriesSketches returns sketches that describe the cardinality of the
|
|
// series in this shard and series that were in this shard, but have
|
|
// been tombstoned.
|
|
func (e *Engine) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
|
|
return e.index.SeriesSketches()
|
|
}
|
|
|
|
// LastModified returns the time when this shard was last modified.
|
|
func (e *Engine) LastModified() time.Time {
|
|
fsTime := e.FileStore.LastModified()
|
|
|
|
if e.WALEnabled && e.WAL.LastWriteTime().After(fsTime) {
|
|
return e.WAL.LastWriteTime()
|
|
}
|
|
|
|
return fsTime
|
|
}
|
|
|
|
var globalCompactionMetrics *compactionMetrics = newAllCompactionMetrics(tsdb.EngineLabelNames())
|
|
|
|
// PrometheusCollectors returns all prometheus metrics for the tsm1 package.
|
|
func PrometheusCollectors() []prometheus.Collector {
|
|
collectors := []prometheus.Collector{
|
|
globalCompactionMetrics.Duration,
|
|
globalCompactionMetrics.Active,
|
|
globalCompactionMetrics.Failed,
|
|
globalCompactionMetrics.Queued,
|
|
}
|
|
collectors = append(collectors, FileStoreCollectors()...)
|
|
collectors = append(collectors, CacheCollectors()...)
|
|
collectors = append(collectors, WALCollectors()...)
|
|
return collectors
|
|
}
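
// Illustrative sketch (not from the original source): the hosting process is expected
// to register these collectors once with its Prometheus registry, for example:
//
//	reg := prometheus.NewRegistry()
//	reg.MustRegister(PrometheusCollectors()...)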
|
|
|
|
const (
|
|
storageNamespace = "storage"
|
|
engineSubsystem = "compactions"
|
|
level1 = "1"
|
|
level2 = "2"
|
|
level3 = "3"
|
|
levelOpt = "opt"
|
|
levelFull = "full"
|
|
levelKey = "level"
|
|
levelCache = "cache"
|
|
)
|
|
|
|
func labelForLevel(l int) prometheus.Labels {
|
|
switch l {
|
|
case 1:
|
|
return prometheus.Labels{levelKey: level1}
|
|
case 2:
|
|
return prometheus.Labels{levelKey: level2}
|
|
case 3:
|
|
return prometheus.Labels{levelKey: level3}
|
|
}
|
|
panic(fmt.Sprintf("labelForLevel: level out of range %d", l))
|
|
}
|
|
|
|
func newAllCompactionMetrics(labelNames []string) *compactionMetrics {
|
|
labelNamesWithLevel := append(labelNames, levelKey)
|
|
return &compactionMetrics{
|
|
Duration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
|
Namespace: storageNamespace,
|
|
Subsystem: engineSubsystem,
|
|
Name: "duration_seconds",
|
|
Help: "Histogram of compactions by level since startup",
|
|
// 10 minute compactions seem normal, 1h40min is high
|
|
Buckets: []float64{60, 600, 6000},
|
|
}, labelNamesWithLevel),
|
|
Active: prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
|
Namespace: storageNamespace,
|
|
Subsystem: engineSubsystem,
|
|
Name: "active",
|
|
Help: "Gauge of compactions (by level) currently running",
|
|
}, labelNamesWithLevel),
|
|
Failed: prometheus.NewCounterVec(prometheus.CounterOpts{
|
|
Namespace: storageNamespace,
|
|
Subsystem: engineSubsystem,
|
|
Name: "failed",
|
|
Help: "Counter of TSM compactions (by level) that have failed due to error",
|
|
}, labelNamesWithLevel),
|
|
Queued: prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
|
Namespace: storageNamespace,
|
|
Subsystem: engineSubsystem,
|
|
Name: "queued",
|
|
Help: "Counter of TSM compactions (by level) that are currently queued",
|
|
}, labelNamesWithLevel),
|
|
}
|
|
}
|
|
|
|
type compactionCounter struct {
|
|
l1 int64
|
|
l2 int64
|
|
l3 int64
|
|
full int64
|
|
optimize int64
|
|
}
|
|
|
|
func (c *compactionCounter) countForLevel(l int) *int64 {
|
|
switch l {
|
|
case 1:
|
|
return &c.l1
|
|
case 2:
|
|
return &c.l2
|
|
case 3:
|
|
return &c.l3
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// compactionMetrics holds compaction statistics across all instantiated engines.
|
|
type compactionMetrics struct {
|
|
Duration prometheus.ObserverVec
|
|
Active *prometheus.GaugeVec
|
|
Queued *prometheus.GaugeVec
|
|
Failed *prometheus.CounterVec
|
|
}
|
|
|
|
func newEngineMetrics(tags tsdb.EngineTags) *compactionMetrics {
|
|
engineLabels := tags.GetLabels()
|
|
return &compactionMetrics{
|
|
Duration: globalCompactionMetrics.Duration.MustCurryWith(engineLabels),
|
|
Active: globalCompactionMetrics.Active.MustCurryWith(engineLabels),
|
|
Failed: globalCompactionMetrics.Failed.MustCurryWith(engineLabels),
|
|
Queued: globalCompactionMetrics.Queued.MustCurryWith(engineLabels),
|
|
}
|
|
}
|
|
|
|
// DiskSize returns the total size in bytes of all TSM and WAL segments on disk.
|
|
func (e *Engine) DiskSize() int64 {
|
|
var walDiskSizeBytes int64
|
|
if e.WALEnabled {
|
|
walDiskSizeBytes = e.WAL.DiskSizeBytes()
|
|
}
|
|
return e.FileStore.DiskSizeBytes() + walDiskSizeBytes
|
|
}
|
|
|
|
// Open opens and initializes the engine.
|
|
func (e *Engine) Open(ctx context.Context) error {
|
|
if err := os.MkdirAll(e.path, 0777); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := e.cleanup(); err != nil {
|
|
return err
|
|
}
|
|
|
|
fields, err := tsdb.NewMeasurementFieldSet(filepath.Join(e.path, "fields.idx"), e.logger)
|
|
if err != nil {
|
|
e.logger.Warn(fmt.Sprintf("error opening fields.idx: %v. Rebuilding.", err))
|
|
}
|
|
|
|
e.mu.Lock()
|
|
e.fieldset = fields
|
|
e.mu.Unlock()
|
|
|
|
e.index.SetFieldSet(fields)
|
|
|
|
if e.WALEnabled {
|
|
if err := e.WAL.Open(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if err := e.FileStore.Open(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
if e.WALEnabled {
|
|
if err := e.reloadCache(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
e.Compactor.Open()
|
|
|
|
if e.enableCompactionsOnOpen {
|
|
e.SetCompactionsEnabled(true)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Close closes the engine. Subsequent calls to Close are a nop.
|
|
func (e *Engine) Close() error {
|
|
e.SetCompactionsEnabled(false)
|
|
|
|
// Lock now and close everything else down.
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
e.done = nil // Ensures that the channel will not be closed again.
|
|
|
|
var err error = nil
|
|
err = e.fieldset.Close()
|
|
if err2 := e.FileStore.Close(); err2 != nil && err == nil {
|
|
err = err2
|
|
}
|
|
if e.WALEnabled {
|
|
if err2 := e.WAL.Close(); err2 != nil && err == nil {
|
|
err = err2
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
// WithLogger sets the logger for the engine.
|
|
func (e *Engine) WithLogger(log *zap.Logger) {
|
|
e.logger = log.With(zap.String("engine", "tsm1"))
|
|
|
|
if e.traceLogging {
|
|
e.traceLogger = e.logger
|
|
}
|
|
|
|
if e.WALEnabled {
|
|
e.WAL.WithLogger(e.logger)
|
|
}
|
|
e.FileStore.WithLogger(e.logger)
|
|
}
|
|
|
|
// LoadMetadataIndex loads the shard metadata into memory.
|
|
//
|
|
// Note, it is not safe to call LoadMetadataIndex concurrently. LoadMetadataIndex
// should only be called when initialising a new Engine.
|
|
func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error {
|
|
now := time.Now()
|
|
|
|
// Save reference to index for iterator creation.
|
|
e.index = index
|
|
|
|
// If we have the cached fields index on disk, we can skip scanning all the TSM files.
|
|
if !e.fieldset.IsEmpty() {
|
|
return nil
|
|
}
|
|
|
|
keys := make([][]byte, 0, 10000)
|
|
fieldTypes := make([]influxql.DataType, 0, 10000)
|
|
|
|
if err := e.FileStore.WalkKeys(nil, func(key []byte, typ byte) error {
|
|
fieldType := BlockTypeToInfluxQLDataType(typ)
|
|
if fieldType == influxql.Unknown {
|
|
return fmt.Errorf("unknown block type: %v", typ)
|
|
}
|
|
|
|
keys = append(keys, key)
|
|
fieldTypes = append(fieldTypes, fieldType)
|
|
if len(keys) == cap(keys) {
|
|
// Send batch of keys to the index.
|
|
if err := e.addToIndexFromKey(keys, fieldTypes); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Reset buffers.
|
|
keys, fieldTypes = keys[:0], fieldTypes[:0]
|
|
}
|
|
|
|
return nil
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(keys) > 0 {
|
|
// Add remaining partial batch from FileStore.
|
|
if err := e.addToIndexFromKey(keys, fieldTypes); err != nil {
|
|
return err
|
|
}
|
|
keys, fieldTypes = keys[:0], fieldTypes[:0]
|
|
}
|
|
|
|
// load metadata from the Cache
|
|
if err := e.Cache.ApplyEntryFn(func(key []byte, entry *entry) error {
|
|
fieldType, err := entry.values.InfluxQLType()
|
|
if err != nil {
|
|
e.logger.Info("Error getting the data type of values for key", zap.ByteString("key", key), zap.Error(err))
|
|
}
|
|
|
|
keys = append(keys, key)
|
|
fieldTypes = append(fieldTypes, fieldType)
|
|
if len(keys) == cap(keys) {
|
|
// Send batch of keys to the index.
|
|
if err := e.addToIndexFromKey(keys, fieldTypes); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Reset buffers.
|
|
keys, fieldTypes = keys[:0], fieldTypes[:0]
|
|
}
|
|
return nil
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(keys) > 0 {
|
|
// Add remaining partial batch from FileStore.
|
|
if err := e.addToIndexFromKey(keys, fieldTypes); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Save the field set index so we don't have to rebuild it next time
|
|
if err := e.fieldset.WriteToFile(); err != nil {
|
|
return err
|
|
}
|
|
|
|
e.traceLogger.Info("Meta data index for shard loaded", zap.Uint64("id", shardID), zap.Duration("duration", time.Since(now)))
|
|
return nil
|
|
}
|
|
|
|
// IsIdle returns true if the cache is empty, there are no running compactions and the
|
|
// shard is fully compacted.
|
|
func (e *Engine) IsIdle() (state bool, reason string) {
|
|
c := []struct {
|
|
ActiveCompactions *int64
|
|
LogMessage string
|
|
}{
|
|
// We don't actually track cache compactions: {&e.status.CacheCompactionsActive, "not idle because of active Cache compactions"},
|
|
{&e.activeCompactions.l1, "not idle because of active Level1 compactions"},
|
|
{&e.activeCompactions.l2, "not idle because of active Level2 compactions"},
|
|
{&e.activeCompactions.l3, "not idle because of active Level3 compactions"},
|
|
{&e.activeCompactions.full, "not idle because of active Full compactions"},
|
|
{&e.activeCompactions.optimize, "not idle because of active TSM Optimization compactions"},
|
|
}
|
|
|
|
for _, compactionState := range c {
|
|
count := atomic.LoadInt64(compactionState.ActiveCompactions)
|
|
if count > 0 {
|
|
return false, compactionState.LogMessage
|
|
}
|
|
}
|
|
|
|
if cacheSize := e.Cache.Size(); cacheSize > 0 {
|
|
return false, "not idle because cache size is nonzero"
|
|
} else if c, r := e.CompactionPlan.FullyCompacted(); !c {
|
|
return false, r
|
|
} else {
|
|
return true, ""
|
|
}
|
|
}
|
|
|
|
// Free releases any resources held by the engine to free up memory or CPU.
|
|
func (e *Engine) Free() error {
|
|
e.Cache.Free()
|
|
return e.FileStore.Free()
|
|
}
|
|
|
|
// Backup writes a tar archive of any TSM files modified since the passed
// in time to the passed in writer. The basePath will be prepended to the names
// of the files in the archive. It will force a snapshot of the WAL first,
// then perform the backup with a read lock against the file store. This means
// that new TSM files will not be able to be created in this shard while the
// backup is running. For shards that are still actively getting writes, this
// could cause the WAL to back up, increasing memory usage and eventually rejecting writes.
|
|
func (e *Engine) Backup(w io.Writer, basePath string, since time.Time) error {
|
|
var err error
|
|
var path string
|
|
path, err = e.CreateSnapshot(true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// Remove the temporary snapshot dir
|
|
defer os.RemoveAll(path)
|
|
|
|
return intar.Stream(w, path, basePath, intar.SinceFilterTarFile(since))
|
|
}
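
// Illustrative sketch (not from the original source; the file path, base path, and
// time window are assumptions): streaming an incremental backup of the last 24 hours
// into a tar file:
//
//	f, err := os.Create("/tmp/shard.tar")
//	if err != nil {
//		// handle error
//	}
//	defer f.Close()
//	err = e.Backup(f, "db/rp/1", time.Now().Add(-24*time.Hour))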
|
|
|
|
func (e *Engine) timeStampFilterTarFile(start, end time.Time) func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error {
|
|
return func(fi os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error {
|
|
if !strings.HasSuffix(fi.Name(), ".tsm") {
|
|
return intar.StreamFile(fi, shardRelativePath, fullPath, tw)
|
|
}
|
|
|
|
f, err := os.Open(fullPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
r, err := NewTSMReader(f)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Grab the tombstone file if one exists.
|
|
if ts := r.TombstoneStats(); ts.TombstoneExists {
|
|
return intar.StreamFile(fi, shardRelativePath, filepath.Base(ts.Path), tw)
|
|
}
|
|
|
|
min, max := r.TimeRange()
|
|
stun := start.UnixNano()
|
|
eun := end.UnixNano()
|
|
|
|
		// The file's time range partially overlaps the requested range, so we need to filter the file.
|
|
if min >= stun && min <= eun && max > eun || // overlap to the right
|
|
max >= stun && max <= eun && min < stun || // overlap to the left
|
|
min <= stun && max >= eun { // TSM file has a range LARGER than the boundary
|
|
err := e.filterFileToBackup(r, fi, shardRelativePath, fullPath, start.UnixNano(), end.UnixNano(), tw)
|
|
if err != nil {
|
|
if err := r.Close(); err != nil {
|
|
return err
|
|
}
|
|
return err
|
|
}
|
|
|
|
}
|
|
|
|
// above is the only case where we need to keep the reader open.
|
|
if err := r.Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// the TSM file is 100% inside the range, so we can just write it without scanning each block
|
|
if min >= start.UnixNano() && max <= end.UnixNano() {
|
|
if err := intar.StreamFile(fi, shardRelativePath, fullPath, tw); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (e *Engine) Export(w io.Writer, basePath string, start time.Time, end time.Time) error {
|
|
path, err := e.CreateSnapshot(false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// Remove the temporary snapshot dir
|
|
defer os.RemoveAll(path)
|
|
|
|
return intar.Stream(w, path, basePath, e.timeStampFilterTarFile(start, end))
|
|
}
|
|
|
|
func (e *Engine) filterFileToBackup(r *TSMReader, fi os.FileInfo, shardRelativePath, fullPath string, start, end int64, tw *tar.Writer) error {
|
|
path := fullPath + ".tmp"
|
|
out, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0666)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer os.Remove(path)
|
|
|
|
w, err := NewTSMWriter(out)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer w.Close()
|
|
|
|
// implicit else: here we iterate over the blocks and only keep the ones we really want.
|
|
bi := r.BlockIterator()
|
|
|
|
for bi.Next() {
|
|
// not concerned with typ or checksum since we are just blindly writing back, with no decoding
|
|
key, minTime, maxTime, _, _, buf, err := bi.Read()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if minTime >= start && minTime <= end ||
|
|
maxTime >= start && maxTime <= end ||
|
|
minTime <= start && maxTime >= end {
|
|
err := w.WriteBlock(key, minTime, maxTime, buf)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
if err := bi.Err(); err != nil {
|
|
return err
|
|
}
|
|
|
|
err = w.WriteIndex()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// make sure the whole file is out to disk
|
|
if err := w.Flush(); err != nil {
|
|
return err
|
|
}
|
|
|
|
tmpFi, err := os.Stat(path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return intar.StreamRenameFile(tmpFi, fi.Name(), shardRelativePath, path, tw)
|
|
}
|
|
|
|
// Restore reads a tar archive generated by Backup().
|
|
// Only files that match basePath will be copied into the directory. This obtains
|
|
// a write lock so no operations can be performed while restoring.
|
|
func (e *Engine) Restore(r io.Reader, basePath string) error {
|
|
return e.overlay(r, basePath, false)
|
|
}
|
|
|
|
// Import reads a tar archive generated by Backup() and adds each
|
|
// file matching basePath as a new TSM file. This obtains
|
|
// a write lock so no operations can be performed while Importing.
|
|
// If the import is successful, a full compaction is scheduled.
|
|
func (e *Engine) Import(r io.Reader, basePath string) error {
|
|
if err := e.overlay(r, basePath, true); err != nil {
|
|
return err
|
|
}
|
|
return e.ScheduleFullCompaction()
|
|
}
|
|
|
|
// overlay reads a tar archive generated by Backup() and adds each file
|
|
// from the archive matching basePath to the shard.
|
|
// If asNew is true, each file will be installed as a new TSM file even if a
// file with the same name already exists in the shard.
|
|
func (e *Engine) overlay(r io.Reader, basePath string, asNew bool) error {
|
|
// Copy files from archive while under lock to prevent reopening.
|
|
newFiles, err := func() ([]string, error) {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
var newFiles []string
|
|
tr := tar.NewReader(r)
|
|
for {
|
|
if fileName, err := e.readFileFromBackup(tr, basePath, asNew); err == io.EOF {
|
|
break
|
|
} else if err != nil {
|
|
return nil, err
|
|
} else if fileName != "" {
|
|
newFiles = append(newFiles, fileName)
|
|
}
|
|
}
|
|
|
|
if err := file.SyncDir(e.path); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// The filestore will only handle tsm files. Other file types will be ignored.
|
|
if err := e.FileStore.Replace(nil, newFiles); err != nil {
|
|
return nil, err
|
|
}
|
|
return newFiles, nil
|
|
}()
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Load any new series keys to the index
|
|
tsmFiles := make([]TSMFile, 0, len(newFiles))
|
|
defer func() {
|
|
for _, r := range tsmFiles {
|
|
r.Close()
|
|
}
|
|
}()
|
|
|
|
ext := fmt.Sprintf(".%s", TmpTSMFileExtension)
|
|
for _, f := range newFiles {
|
|
// If asNew is true, the files created from readFileFromBackup will be new ones
|
|
// having a temp extension.
|
|
f = strings.TrimSuffix(f, ext)
|
|
if !strings.HasSuffix(f, TSMFileExtension) {
|
|
// This isn't a .tsm file.
|
|
continue
|
|
}
|
|
|
|
fd, err := os.Open(f)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
r, err := NewTSMReader(fd)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
tsmFiles = append(tsmFiles, r)
|
|
}
|
|
|
|
// Merge and dedup all the series keys across each reader to reduce
|
|
// lock contention on the index.
|
|
keys := make([][]byte, 0, 10000)
|
|
fieldTypes := make([]influxql.DataType, 0, 10000)
|
|
|
|
ki := newMergeKeyIterator(tsmFiles, nil)
|
|
for ki.Next() {
|
|
key, typ := ki.Read()
|
|
fieldType := BlockTypeToInfluxQLDataType(typ)
|
|
if fieldType == influxql.Unknown {
|
|
return fmt.Errorf("unknown block type: %v", typ)
|
|
}
|
|
|
|
keys = append(keys, key)
|
|
fieldTypes = append(fieldTypes, fieldType)
|
|
|
|
if len(keys) == cap(keys) {
|
|
// Send batch of keys to the index.
|
|
if err := e.addToIndexFromKey(keys, fieldTypes); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Reset buffers.
|
|
keys, fieldTypes = keys[:0], fieldTypes[:0]
|
|
}
|
|
}
|
|
|
|
if len(keys) > 0 {
|
|
// Add remaining partial batch.
|
|
if err := e.addToIndexFromKey(keys, fieldTypes); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return e.MeasurementFieldSet().WriteToFile()
|
|
}
|
|
|
|
// readFileFromBackup copies the next file from the archive into the shard.
|
|
// The file is skipped if it does not have a matching shardRelativePath prefix.
|
|
// If asNew is true, each file will be installed as a new TSM file even if a
// file with the same name already exists in the shard.
|
|
func (e *Engine) readFileFromBackup(tr *tar.Reader, shardRelativePath string, asNew bool) (string, error) {
|
|
// Read next archive file.
|
|
hdr, err := tr.Next()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
if !strings.HasSuffix(hdr.Name, TSMFileExtension) {
|
|
// This isn't a .tsm file.
|
|
return "", nil
|
|
}
|
|
|
|
filename := filepath.Base(filepath.FromSlash(hdr.Name))
|
|
|
|
	// If this is a directory entry (usually just `index` for tsi), create it and move on.
|
|
if hdr.Typeflag == tar.TypeDir {
|
|
if err := os.MkdirAll(filepath.Join(e.path, filename), os.FileMode(hdr.Mode).Perm()); err != nil {
|
|
return "", err
|
|
}
|
|
return "", nil
|
|
}
|
|
|
|
if asNew {
|
|
filename = e.formatFileName(e.FileStore.NextGeneration(), 1) + "." + TSMFileExtension
|
|
}
|
|
|
|
tmp := fmt.Sprintf("%s.%s", filepath.Join(e.path, filename), TmpTSMFileExtension)
|
|
// Create new file on disk.
|
|
f, err := os.OpenFile(tmp, os.O_CREATE|os.O_RDWR, 0666)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
defer f.Close()
|
|
|
|
// Copy from archive to the file.
|
|
if _, err := io.CopyN(f, tr, hdr.Size); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
// Sync to disk & close.
|
|
if err := f.Sync(); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
return tmp, nil
|
|
}
|
|
|
|
// addToIndexFromKey will pull the measurement names, series keys, and field
|
|
// names from composite keys, and add them to the database index and measurement
|
|
// fields.
|
|
func (e *Engine) addToIndexFromKey(keys [][]byte, fieldTypes []influxql.DataType) error {
|
|
var field []byte
|
|
names := make([][]byte, 0, len(keys))
|
|
tags := make([]models.Tags, 0, len(keys))
|
|
|
|
for i := 0; i < len(keys); i++ {
|
|
// Replace tsm key format with index key format.
|
|
keys[i], field = SeriesAndFieldFromCompositeKey(keys[i])
|
|
name := models.ParseName(keys[i])
|
|
mf := e.fieldset.CreateFieldsIfNotExists(name)
|
|
if err := mf.CreateFieldIfNotExists(field, fieldTypes[i]); err != nil {
|
|
return err
|
|
}
|
|
|
|
names = append(names, name)
|
|
tags = append(tags, models.ParseTags(keys[i]))
|
|
}
|
|
|
|
return e.index.CreateSeriesListIfNotExists(keys, names, tags)
|
|
}
|
|
|
|
// WritePoints writes metadata and point data into the engine.
// It returns an error if the write fails or if any field's type conflicts with the
// type already stored for its series.
|
|
func (e *Engine) WritePoints(ctx context.Context, points []models.Point) error {
|
|
values := make(map[string][]Value, len(points))
|
|
var (
|
|
keyBuf []byte
|
|
baseLen int
|
|
seriesErr error
|
|
)
|
|
|
|
for _, p := range points {
|
|
keyBuf = append(keyBuf[:0], p.Key()...)
|
|
keyBuf = append(keyBuf, keyFieldSeparator...)
|
|
baseLen = len(keyBuf)
|
|
iter := p.FieldIterator()
|
|
t := p.Time().UnixNano()
|
|
for iter.Next() {
|
|
			// Skip fields named "time", they are illegal.
|
|
if bytes.Equal(iter.FieldKey(), timeBytes) {
|
|
continue
|
|
}
|
|
|
|
keyBuf = append(keyBuf[:baseLen], iter.FieldKey()...)
|
|
|
|
if e.seriesTypeMap != nil {
|
|
// Fast-path check to see if the field for the series already exists.
|
|
if v, ok := e.seriesTypeMap.Get(keyBuf); !ok {
|
|
if typ, err := e.Type(keyBuf); err != nil {
|
|
// Field type is unknown, we can try to add it.
|
|
} else if typ != iter.Type() {
|
|
// Existing type is different from what was passed in, we need to drop
|
|
// this write and refresh the series type map.
|
|
seriesErr = tsdb.ErrFieldTypeConflict
|
|
e.seriesTypeMap.Insert(keyBuf, int(typ))
|
|
continue
|
|
}
|
|
|
|
// Doesn't exist, so try to insert
|
|
vv, ok := e.seriesTypeMap.Insert(keyBuf, int(iter.Type()))
|
|
|
|
// We didn't insert and the type that exists isn't what we tried to insert, so
|
|
// we have a conflict and must drop this field/series.
|
|
if !ok || vv != int(iter.Type()) {
|
|
seriesErr = tsdb.ErrFieldTypeConflict
|
|
continue
|
|
}
|
|
} else if v != int(iter.Type()) {
|
|
// The series already exists, but with a different type. This is also a type conflict
|
|
// and we need to drop this field/series.
|
|
seriesErr = tsdb.ErrFieldTypeConflict
|
|
continue
|
|
}
|
|
}
|
|
|
|
var v Value
|
|
switch iter.Type() {
|
|
case models.Float:
|
|
fv, err := iter.FloatValue()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
v = NewFloatValue(t, fv)
|
|
case models.Integer:
|
|
iv, err := iter.IntegerValue()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
v = NewIntegerValue(t, iv)
|
|
case models.Unsigned:
|
|
iv, err := iter.UnsignedValue()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
v = NewUnsignedValue(t, iv)
|
|
case models.String:
|
|
v = NewStringValue(t, iter.StringValue())
|
|
case models.Boolean:
|
|
bv, err := iter.BooleanValue()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
v = NewBooleanValue(t, bv)
|
|
default:
|
|
return fmt.Errorf("unknown field type for %s: %s", string(iter.FieldKey()), p.String())
|
|
}
|
|
values[string(keyBuf)] = append(values[string(keyBuf)], v)
|
|
}
|
|
}
|
|
|
|
e.mu.RLock()
|
|
defer e.mu.RUnlock()
|
|
|
|
// first try to write to the cache
|
|
if err := e.Cache.WriteMulti(values); err != nil {
|
|
return err
|
|
}
|
|
|
|
if e.WALEnabled {
|
|
if _, err := e.WAL.WriteMulti(ctx, values); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return seriesErr
|
|
}
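
// Illustrative sketch (not from the original source; the line-protocol string is an
// assumption): parsing and writing a single point through the engine:
//
//	pts, err := models.ParsePointsString("cpu,host=A usage_idle=97.1")
//	if err != nil {
//		// handle error
//	}
//	err = e.WritePoints(context.Background(), pts)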
|
|
|
|
// DeleteSeriesRange removes the values between min and max (inclusive) from all series
|
|
func (e *Engine) DeleteSeriesRange(ctx context.Context, itr tsdb.SeriesIterator, min, max int64) error {
|
|
return e.DeleteSeriesRangeWithPredicate(ctx, itr, func(name []byte, tags models.Tags) (int64, int64, bool) {
|
|
return min, max, true
|
|
})
|
|
}
|
|
|
|
// DeleteSeriesRangeWithPredicate removes the values between min and max (inclusive) from all series
|
|
// for which predicate() returns true. If predicate() is nil, then all values in range are removed.
|
|
func (e *Engine) DeleteSeriesRangeWithPredicate(
|
|
ctx context.Context,
|
|
itr tsdb.SeriesIterator,
|
|
predicate func(name []byte, tags models.Tags) (int64, int64, bool),
|
|
) error {
|
|
var disableOnce bool
|
|
|
|
// Ensure that the index does not compact away the measurement or series we're
|
|
// going to delete before we're done with them.
|
|
if tsiIndex, ok := e.index.(*tsi1.Index); ok {
|
|
tsiIndex.DisableCompactions()
|
|
defer tsiIndex.EnableCompactions()
|
|
tsiIndex.Wait()
|
|
|
|
fs, err := tsiIndex.RetainFileSet()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer fs.Release()
|
|
}
|
|
|
|
var (
|
|
sz int
|
|
min, max int64 = math.MinInt64, math.MaxInt64
|
|
|
|
// Indicator that the min/max time for the current batch has changed and
|
|
// we need to flush the current batch before appending to it.
|
|
flushBatch bool
|
|
)
|
|
|
|
// These are reversed from min/max to ensure they are different the first time through.
|
|
newMin, newMax := int64(math.MaxInt64), int64(math.MinInt64)
|
|
|
|
// There is no predicate, so setup newMin/newMax to delete the full time range.
|
|
if predicate == nil {
|
|
newMin = min
|
|
newMax = max
|
|
}
|
|
|
|
batch := make([][]byte, 0, 10000)
|
|
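	// Accumulate matching series keys and flush a delete whenever the batch grows past
	// deleteFlushThreshold bytes or the predicate's time range changes.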
for {
|
|
elem, err := itr.Next()
|
|
if err != nil {
|
|
return err
|
|
} else if elem == nil {
|
|
break
|
|
}
|
|
|
|
// See if the series should be deleted and if so, what range of time.
|
|
if predicate != nil {
|
|
var shouldDelete bool
|
|
newMin, newMax, shouldDelete = predicate(elem.Name(), elem.Tags())
|
|
if !shouldDelete {
|
|
continue
|
|
}
|
|
|
|
// If the min/max happens to change for the batch, we need to flush
|
|
// the current batch and start a new one.
|
|
flushBatch = (min != newMin || max != newMax) && len(batch) > 0
|
|
}
|
|
|
|
if elem.Expr() != nil {
|
|
if v, ok := elem.Expr().(*influxql.BooleanLiteral); !ok || !v.Val {
|
|
return errors.New("fields not supported in WHERE clause during deletion")
|
|
}
|
|
}
|
|
|
|
if !disableOnce {
|
|
			// Disable and abort running compactions so that tombstones added to existing tsm
			// files don't get removed. This would cause deleted measurements/series to
			// re-appear once the compaction completed. We only disable the level compactions
			// so that snapshotting does not stop while writing out tombstones. If it is stopped,
			// and writing tombstones takes a long time, writes can get rejected due to the cache
			// filling up.
|
|
e.disableLevelCompactions(true)
|
|
defer e.enableLevelCompactions(true)
|
|
|
|
e.sfile.DisableCompactions()
|
|
defer e.sfile.EnableCompactions()
|
|
e.sfile.Wait()
|
|
|
|
disableOnce = true
|
|
}
|
|
|
|
if sz >= deleteFlushThreshold || flushBatch {
|
|
// Delete all matching batch.
|
|
if err := e.deleteSeriesRange(ctx, batch, min, max); err != nil {
|
|
return err
|
|
}
|
|
batch = batch[:0]
|
|
sz = 0
|
|
flushBatch = false
|
|
}
|
|
|
|
// Use the new min/max time for the next iteration
|
|
min = newMin
|
|
max = newMax
|
|
|
|
key := models.MakeKey(elem.Name(), elem.Tags())
|
|
sz += len(key)
|
|
batch = append(batch, key)
|
|
}
|
|
|
|
if len(batch) > 0 {
|
|
// Delete all matching batch.
|
|
if err := e.deleteSeriesRange(ctx, batch, min, max); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// deleteSeriesRange removes the values between min and max (inclusive) from all series. This
|
|
// does not update the index or disable compactions. This should mainly be called by DeleteSeriesRange
|
|
// and not directly.
|
|
func (e *Engine) deleteSeriesRange(ctx context.Context, seriesKeys [][]byte, min, max int64) error {
|
|
if len(seriesKeys) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// Min and max time in the engine are slightly different from the query language values.
|
|
if min == influxql.MinTime {
|
|
min = math.MinInt64
|
|
}
|
|
if max == influxql.MaxTime {
|
|
max = math.MaxInt64
|
|
}
|
|
|
|
var overlapsTimeRangeMinMax bool
|
|
var overlapsTimeRangeMinMaxLock sync.Mutex
|
|
e.FileStore.Apply(ctx, func(r TSMFile) error {
|
|
if r.OverlapsTimeRange(min, max) {
|
|
overlapsTimeRangeMinMaxLock.Lock()
|
|
overlapsTimeRangeMinMax = true
|
|
overlapsTimeRangeMinMaxLock.Unlock()
|
|
}
|
|
return nil
|
|
})
|
|
|
|
if !overlapsTimeRangeMinMax && e.Cache.store.count() > 0 {
|
|
overlapsTimeRangeMinMax = true
|
|
}
|
|
|
|
if !overlapsTimeRangeMinMax {
|
|
return nil
|
|
}
|
|
|
|
// Ensure keys are sorted since lower layers require them to be.
|
|
if !bytesutil.IsSorted(seriesKeys) {
|
|
bytesutil.Sort(seriesKeys)
|
|
}
|
|
|
|
// Run the delete on each TSM file in parallel
|
|
if err := e.FileStore.Apply(ctx, func(r TSMFile) error {
|
|
// See if this TSM file contains the keys and time range
|
|
minKey, maxKey := seriesKeys[0], seriesKeys[len(seriesKeys)-1]
|
|
tsmMin, tsmMax := r.KeyRange()
|
|
|
|
tsmMin, _ = SeriesAndFieldFromCompositeKey(tsmMin)
|
|
tsmMax, _ = SeriesAndFieldFromCompositeKey(tsmMax)
|
|
|
|
overlaps := bytes.Compare(tsmMin, maxKey) <= 0 && bytes.Compare(tsmMax, minKey) >= 0
|
|
if !overlaps || !r.OverlapsTimeRange(min, max) {
|
|
return nil
|
|
}
|
|
|
|
// Delete each key we find in the file. We seek to the min key and walk from there.
|
|
batch := r.BatchDelete()
|
|
n := r.KeyCount()
|
|
var j int
|
|
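		// Walk this file's keys and the sorted seriesKeys in lockstep (a merge-style,
		// two-pointer scan); j only advances, so the delete set is scanned once per file.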
for i := r.Seek(minKey); i < n; i++ {
|
|
indexKey, _ := r.KeyAt(i)
|
|
seriesKey, _ := SeriesAndFieldFromCompositeKey(indexKey)
|
|
|
|
for j < len(seriesKeys) && bytes.Compare(seriesKeys[j], seriesKey) < 0 {
|
|
j++
|
|
}
|
|
|
|
if j >= len(seriesKeys) {
|
|
break
|
|
}
|
|
if bytes.Equal(seriesKeys[j], seriesKey) {
|
|
if err := batch.DeleteRange([][]byte{indexKey}, min, max); err != nil {
|
|
batch.Rollback()
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
return batch.Commit()
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
// find the keys in the cache and remove them
|
|
deleteKeys := make([][]byte, 0, len(seriesKeys))
|
|
|
|
	// ApplyEntryFn cannot return an error in this invocation.
|
|
_ = e.Cache.ApplyEntryFn(func(k []byte, _ *entry) error {
|
|
seriesKey, _ := SeriesAndFieldFromCompositeKey([]byte(k))
|
|
|
|
// Cache does not walk keys in sorted order, so search the sorted
|
|
// series we need to delete to see if any of the cache keys match.
|
|
i := bytesutil.SearchBytes(seriesKeys, seriesKey)
|
|
if i < len(seriesKeys) && bytes.Equal(seriesKey, seriesKeys[i]) {
|
|
// k is the measurement + tags + sep + field
|
|
deleteKeys = append(deleteKeys, k)
|
|
}
|
|
return nil
|
|
})
|
|
|
|
// Sort the series keys because ApplyEntryFn iterates over the keys randomly.
|
|
bytesutil.Sort(deleteKeys)
|
|
|
|
e.Cache.DeleteRange(deleteKeys, min, max)
|
|
|
|
// delete from the WAL
|
|
if e.WALEnabled {
|
|
if _, err := e.WAL.DeleteRange(ctx, deleteKeys, min, max); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
	// The series are deleted on disk, but the index may still say they exist.
	// Depending on the min/max time passed in, the series may or may not actually
	// exist now. To reconcile the index, we walk the series keys that still exist
	// on disk and cross out any keys that match the passed-in series. Any series
	// left in the slice at the end do not exist and can be deleted from the index.
	// Note: this is inherently racy if writes are occurring to the same measurement/series
	// being removed. A write could occur and exist in the cache at this point, but we
	// would delete it from the index.
|
|
minKey := seriesKeys[0]
|
|
|
|
	// Apply runs this func concurrently. The seriesKeys slice is mutated concurrently
	// by different goroutines setting positions to emptyBytes.
|
|
if err := e.FileStore.Apply(ctx, func(r TSMFile) error {
|
|
n := r.KeyCount()
|
|
var j int
|
|
|
|
// Start from the min deleted key that exists in this file.
|
|
for i := r.Seek(minKey); i < n; i++ {
|
|
if j >= len(seriesKeys) {
|
|
return nil
|
|
}
|
|
|
|
indexKey, _ := r.KeyAt(i)
|
|
seriesKey, _ := SeriesAndFieldFromCompositeKey(indexKey)
|
|
|
|
// Skip over any deleted keys that are less than our tsm key
|
|
cmp := bytes.Compare(seriesKeys[j], seriesKey)
|
|
for j < len(seriesKeys) && cmp < 0 {
|
|
j++
|
|
if j >= len(seriesKeys) {
|
|
return nil
|
|
}
|
|
cmp = bytes.Compare(seriesKeys[j], seriesKey)
|
|
}
|
|
|
|
// We've found a matching key, cross it out so we do not remove it from the index.
|
|
if j < len(seriesKeys) && cmp == 0 {
|
|
seriesKeys[j] = emptyBytes
|
|
j++
|
|
}
|
|
}
|
|
return nil
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
// The seriesKeys slice is mutated if they are still found in the cache.
|
|
cacheKeys := e.Cache.Keys()
|
|
for i := 0; i < len(seriesKeys); i++ {
|
|
seriesKey := seriesKeys[i]
|
|
// Already crossed out
|
|
if len(seriesKey) == 0 {
|
|
continue
|
|
}
|
|
|
|
j := bytesutil.SearchBytes(cacheKeys, seriesKey)
|
|
if j < len(cacheKeys) {
|
|
cacheSeriesKey, _ := SeriesAndFieldFromCompositeKey(cacheKeys[j])
|
|
if bytes.Equal(seriesKey, cacheSeriesKey) {
|
|
seriesKeys[i] = emptyBytes
|
|
}
|
|
}
|
|
}
|
|
|
|
// Have we deleted all values for the series? If so, we need to remove
|
|
// the series from the index.
|
|
hasDeleted := false
|
|
for _, k := range seriesKeys {
|
|
if len(k) > 0 {
|
|
hasDeleted = true
|
|
break
|
|
}
|
|
}
|
|
if hasDeleted {
|
|
buf := make([]byte, 1024) // For use when accessing series file.
|
|
ids := tsdb.NewSeriesIDSet()
|
|
measurements := make(map[string]struct{}, 1)
|
|
|
|
for _, k := range seriesKeys {
|
|
if len(k) == 0 {
|
|
continue // This key was wiped because it shouldn't be removed from index.
|
|
}
|
|
|
|
name, tags := models.ParseKeyBytes(k)
|
|
sid := e.sfile.SeriesID(name, tags, buf)
|
|
if sid == 0 {
|
|
continue
|
|
}
|
|
|
|
// See if this series was found in the cache earlier
|
|
i := bytesutil.SearchBytes(deleteKeys, k)
|
|
|
|
var hasCacheValues bool
|
|
// If there are multiple fields, they will have the same prefix. If any field
|
|
// has values, then we can't delete it from the index.
|
|
for i < len(deleteKeys) && bytes.HasPrefix(deleteKeys[i], k) {
|
|
if e.Cache.Values(deleteKeys[i]).Len() > 0 {
|
|
hasCacheValues = true
|
|
break
|
|
}
|
|
i++
|
|
}
|
|
|
|
if hasCacheValues {
|
|
continue
|
|
}
|
|
|
|
measurements[string(name)] = struct{}{}
|
|
// Remove the series from the local index.
|
|
if err := e.index.DropSeries(sid, k, false); err != nil {
|
|
return err
|
|
}
|
|
|
|
			// Add the id to the set of deleted ids.
|
|
ids.Add(sid)
|
|
}
|
|
|
|
actuallyDeleted := make([]string, 0, len(measurements))
|
|
for k := range measurements {
|
|
if dropped, err := e.index.DropMeasurementIfSeriesNotExist([]byte(k)); err != nil {
|
|
return err
|
|
} else if dropped {
|
|
if deleted, err := e.cleanupMeasurement([]byte(k)); err != nil {
|
|
return err
|
|
} else if deleted {
|
|
actuallyDeleted = append(actuallyDeleted, k)
|
|
}
|
|
}
|
|
}
|
|
if len(actuallyDeleted) > 0 {
|
|
if err := e.fieldset.Save(tsdb.MeasurementsToFieldChangeDeletions(actuallyDeleted)); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Remove any series IDs for our set that still exist in other shards.
|
|
// We cannot remove these from the series file yet.
|
|
if err := e.seriesIDSets.ForEach(func(s *tsdb.SeriesIDSet) {
|
|
ids = ids.AndNot(s)
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Remove the remaining ids from the series file as they no longer exist
|
|
// in any shard.
|
|
var err error
|
|
ids.ForEach(func(id uint64) {
|
|
if err1 := e.sfile.DeleteSeriesID(id); err1 != nil {
|
|
err = err1
|
|
return
|
|
}
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (e *Engine) cleanupMeasurement(name []byte) (deleted bool, err error) {
|
|
	// A sentinel error used to cause DeleteWithLock to not delete the measurement.
|
|
abortErr := fmt.Errorf("measurements still exist")
|
|
|
|
// Under write lock, delete the measurement if we no longer have any data stored for
|
|
// the measurement. If data exists, we can't delete the field set yet as there
|
|
// were writes to the measurement while we are deleting it.
|
|
if err := e.fieldset.DeleteWithLock(string(name), func() error {
|
|
encodedName := models.EscapeMeasurement(name)
|
|
sep := len(encodedName)
|
|
|
|
// First scan the cache to see if any series exists for this measurement.
|
|
if err := e.Cache.ApplyEntryFn(func(k []byte, _ *entry) error {
|
|
if bytes.HasPrefix(k, encodedName) && (k[sep] == ',' || k[sep] == keyFieldSeparator[0]) {
|
|
return abortErr
|
|
}
|
|
return nil
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Check the filestore.
|
|
return e.FileStore.WalkKeys(name, func(k []byte, _ byte) error {
|
|
if bytes.HasPrefix(k, encodedName) && (k[sep] == ',' || k[sep] == keyFieldSeparator[0]) {
|
|
return abortErr
|
|
}
|
|
return nil
|
|
})
|
|
|
|
}); err != nil && err != abortErr {
|
|
// Something else failed, return it
|
|
return false, err
|
|
}
|
|
|
|
return err != abortErr, nil
|
|
}
|
|
|
|
// DeleteMeasurement deletes a measurement and all related series.
|
|
func (e *Engine) DeleteMeasurement(ctx context.Context, name []byte) error {
|
|
// Attempt to find the series keys.
|
|
indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
|
|
itr, err := indexSet.MeasurementSeriesByExprIterator(name, nil)
|
|
if err != nil {
|
|
return err
|
|
} else if itr == nil {
|
|
return nil
|
|
}
|
|
defer itr.Close()
|
|
return e.DeleteSeriesRange(ctx, tsdb.NewSeriesIteratorAdapter(e.sfile, itr), math.MinInt64, math.MaxInt64)
|
|
}
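
// Illustrative sketch (not from the original source; the measurement name is an
// assumption): dropping every series of a measurement:
//
//	if err := e.DeleteMeasurement(context.Background(), []byte("cpu")); err != nil {
//		// handle error
//	}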
|
|
|
|
// ForEachMeasurementName iterates over each measurement name in the engine.
|
|
func (e *Engine) ForEachMeasurementName(fn func(name []byte) error) error {
|
|
return e.index.ForEachMeasurementName(fn)
|
|
}
|
|
|
|
func (e *Engine) CreateSeriesListIfNotExists(keys, names [][]byte, tagsSlice []models.Tags) error {
|
|
return e.index.CreateSeriesListIfNotExists(keys, names, tagsSlice)
|
|
}
|
|
|
|
func (e *Engine) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error {
|
|
return e.index.CreateSeriesIfNotExists(key, name, tags)
|
|
}
|
|
|
|
// WriteTo is not implemented.
|
|
func (e *Engine) WriteTo(w io.Writer) (n int64, err error) { panic("not implemented") }
|
|
|
|
// WriteSnapshot will snapshot the cache and write a new TSM file with its contents, releasing the snapshot when done.
|
|
func (e *Engine) WriteSnapshot() (err error) {
|
|
// Lock and grab the cache snapshot along with all the closed WAL
|
|
// filenames associated with the snapshot
|
|
|
|
started := time.Now()
|
|
log, logEnd := logger.NewOperation(context.TODO(), e.logger, "Cache snapshot", "tsm1_cache_snapshot")
|
|
defer func() {
|
|
elapsed := time.Since(started)
|
|
if err != nil && err != errCompactionsDisabled {
|
|
e.stats.Failed.With(prometheus.Labels{levelKey: levelCache}).Inc()
|
|
}
|
|
e.stats.Duration.With(prometheus.Labels{levelKey: levelCache}).Observe(elapsed.Seconds())
|
|
if err == nil {
|
|
log.Info("Snapshot for path written", zap.String("path", e.path), zap.Duration("duration", elapsed))
|
|
}
|
|
logEnd()
|
|
}()
|
|
|
|
closedFiles, snapshot, err := func() (segments []string, snapshot *Cache, err error) {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
if e.WALEnabled {
|
|
if err = e.WAL.CloseSegment(); err != nil {
|
|
return
|
|
}
|
|
|
|
segments, err = e.WAL.ClosedSegments()
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
|
|
snapshot, err = e.Cache.Snapshot()
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
return
|
|
}()
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if snapshot.Size() == 0 {
|
|
e.Cache.ClearSnapshot(true)
|
|
return nil
|
|
}
|
|
|
|
// The snapshotted cache may have duplicate points and unsorted data. We need to deduplicate
|
|
// it before writing the snapshot. This can be very expensive so it's done while we are not
|
|
// holding the engine write lock.
|
|
dedup := time.Now()
|
|
snapshot.Deduplicate()
|
|
e.traceLogger.Info("Snapshot for path deduplicated",
|
|
zap.String("path", e.path),
|
|
zap.Duration("duration", time.Since(dedup)))
|
|
|
|
return e.writeSnapshotAndCommit(log, closedFiles, snapshot)
|
|
}
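// The snapshot path above, in outline (summary added for clarity): close the
// active WAL segment and collect the names of the closed segments, snapshot
// the cache under the engine lock, deduplicate the snapshot outside the lock,
// then hand the result to writeSnapshotAndCommit, which writes the TSM files,
// swaps them into the FileStore, clears the cache snapshot, and finally
// removes the now-redundant WAL segments.
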
// CreateSnapshot will create a temp directory that holds
// temporary hardlinks to the underlying shard files.
// skipCacheOk controls whether it is permissible to fail writing out
// in-memory cache data when a previous snapshot is in progress.
func (e *Engine) CreateSnapshot(skipCacheOk bool) (string, error) {
	err := e.WriteSnapshot()
	for i := 0; i < 3 && err == ErrSnapshotInProgress; i++ {
		backoff := time.Duration(math.Pow(32, float64(i))) * time.Millisecond
		time.Sleep(backoff)
		err = e.WriteSnapshot()
	}
	if err == ErrSnapshotInProgress && skipCacheOk {
		e.logger.Warn("Snapshotter busy: proceeding without cache contents")
	} else if err != nil {
		return "", err
	}

	e.mu.RLock()
	defer e.mu.RUnlock()
	path, err := e.FileStore.CreateSnapshot()
	if err != nil {
		return "", err
	}

	// Generate a snapshot of the index.
	return path, nil
}
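// The retry loop above backs off by math.Pow(32, i) milliseconds between
// attempts, so a busy snapshotter is retried after roughly 1ms, 32ms, and
// 1024ms (about one second) before either proceeding without cache contents
// (when skipCacheOk is true) or returning the error. The worked schedule is
// added commentary, not a tuning recommendation.
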
// writeSnapshotAndCommit will write the passed cache to a new TSM file and remove the closed WAL segments.
func (e *Engine) writeSnapshotAndCommit(log *zap.Logger, closedFiles []string, snapshot *Cache) (err error) {
	defer func() {
		if err != nil {
			e.Cache.ClearSnapshot(false)
		}
	}()

	// write the new snapshot files
	newFiles, err := e.Compactor.WriteSnapshot(snapshot, e.logger)
	if err != nil {
		log.Info("Error writing snapshot from compactor", zap.Error(err))
		return err
	}

	e.mu.RLock()
	defer e.mu.RUnlock()

	// update the file store with these new files
	if err := e.FileStore.Replace(nil, newFiles); err != nil {
		log.Info("Error adding new TSM files from snapshot. Removing temp files.", zap.Error(err))

		// Remove the new snapshot files. We will try again.
		for _, file := range newFiles {
			if err := os.Remove(file); err != nil {
				log.Info("Unable to remove file", zap.String("path", file), zap.Error(err))
			}
		}
		return err
	}

	// clear the snapshot from the in-memory cache, then the old WAL files
	e.Cache.ClearSnapshot(true)

	if e.WALEnabled {
		if err := e.WAL.Remove(closedFiles); err != nil {
			log.Info("Error removing closed WAL segments", zap.Error(err))
		}
	}

	return nil
}

// compactCache continually checks if the WAL cache should be written to disk.
func (e *Engine) compactCache() {
	t := time.NewTicker(time.Second)
	defer t.Stop()
	for {
		e.mu.RLock()
		quit := e.snapDone
		e.mu.RUnlock()

		select {
		case <-quit:
			return

		case <-t.C:
			if e.ShouldCompactCache(time.Now()) {
				e.traceLogger.Info("Compacting cache", zap.String("path", e.path))
				err := e.WriteSnapshot()
				if err != nil && err != errCompactionsDisabled {
					e.logger.Info("Error writing snapshot", zap.Error(err))
				}
			}
		}
	}
}

// ShouldCompactCache returns true if the Cache is over its flush threshold
// or if the time since the cache's last write, measured against the passed-in
// time t, exceeds the write cold threshold.
func (e *Engine) ShouldCompactCache(t time.Time) bool {
	sz := e.Cache.Size()

	if sz == 0 {
		return false
	}

	if sz > e.CacheFlushMemorySizeThreshold {
		return true
	}

	return t.Sub(e.Cache.LastWriteTime()) > e.CacheFlushWriteColdDuration
}
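// Illustrative example (the numbers are hypothetical, not defaults): with
// CacheFlushMemorySizeThreshold set to 25 MB and CacheFlushWriteColdDuration
// set to 10 minutes, a 30 MB cache is snapshotted on the next ticker tick
// regardless of write activity, while a 1 MB cache is snapshotted only once no
// write has arrived for 10 minutes. An empty cache is never snapshotted.
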
func (e *Engine) compact(wg *sync.WaitGroup) {
	t := time.NewTicker(time.Second)
	defer t.Stop()

	for {
		e.mu.RLock()
		quit := e.done
		e.mu.RUnlock()

		select {
		case <-quit:
			return

		case <-t.C:

			// Find our compaction plans
			level1Groups, len1 := e.CompactionPlan.PlanLevel(1)
			level2Groups, len2 := e.CompactionPlan.PlanLevel(2)
			level3Groups, len3 := e.CompactionPlan.PlanLevel(3)
			level4Groups, len4 := e.CompactionPlan.Plan(e.LastModified())

			e.stats.Queued.With(prometheus.Labels{levelKey: levelFull}).Set(float64(len4))

			// If no full compactions are needed, see if an optimize is needed
			if len(level4Groups) == 0 {
				level4Groups, len4 = e.CompactionPlan.PlanOptimize()
				e.stats.Queued.With(prometheus.Labels{levelKey: levelOpt}).Set(float64(len4))
			}

			// Update the level plan queue stats
			// For stats, use the length needed, even if the lock was
			// not acquired
			e.stats.Queued.With(prometheus.Labels{levelKey: level1}).Set(float64(len1))
			e.stats.Queued.With(prometheus.Labels{levelKey: level2}).Set(float64(len2))
			e.stats.Queued.With(prometheus.Labels{levelKey: level3}).Set(float64(len3))

			// Set the queue depths on the scheduler
			// Use the real queue depth, dependent on acquiring
			// the file locks.
			e.scheduler.setDepth(1, len(level1Groups))
			e.scheduler.setDepth(2, len(level2Groups))
			e.scheduler.setDepth(3, len(level3Groups))
			e.scheduler.setDepth(4, len(level4Groups))

			// Find the next compaction that can run and try to kick it off
			if level, runnable := e.scheduler.next(); runnable {
				switch level {
				case 1:
					if e.compactLevel(level1Groups[0], 1, false, wg) {
						level1Groups = level1Groups[1:]
					}
				case 2:
					if e.compactLevel(level2Groups[0], 2, false, wg) {
						level2Groups = level2Groups[1:]
					}
				case 3:
					if e.compactLevel(level3Groups[0], 3, true, wg) {
						level3Groups = level3Groups[1:]
					}
				case 4:
					if e.compactFull(level4Groups[0], wg) {
						level4Groups = level4Groups[1:]
					}
				}
			}

			// Release all the plans we didn't start.
			e.CompactionPlan.Release(level1Groups)
			e.CompactionPlan.Release(level2Groups)
			e.CompactionPlan.Release(level3Groups)
			e.CompactionPlan.Release(level4Groups)
		}
	}
}
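// Summary of the dispatch above (added commentary): levels 1 and 2 run the
// level strategy with fast == false, level 3 runs it with fast == true, and
// level 4 runs the full strategy, falling back to the optimize plan when no
// full compactions are planned. Only the single level picked by the scheduler
// is started per tick; every plan that was not started is released back to the
// CompactionPlan so its files become eligible again on the next tick.
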
// compactLevel kicks off compactions using the level strategy. It returns
// true if the compaction was started.
func (e *Engine) compactLevel(grp CompactionGroup, level int, fast bool, wg *sync.WaitGroup) bool {
	s := e.levelCompactionStrategy(grp, fast, level)
	if s == nil {
		return false
	}

	if e.compactionLimiter.TryTake() {
		{
			val := atomic.AddInt64(e.activeCompactions.countForLevel(level), 1)
			e.stats.Active.With(labelForLevel(level)).Set(float64(val))
		}

		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() {
				val := atomic.AddInt64(e.activeCompactions.countForLevel(level), -1)
				e.stats.Active.With(labelForLevel(level)).Set(float64(val))
			}()

			defer e.compactionLimiter.Release()
			s.Apply()
			// Release the files in the compaction plan
			e.CompactionPlan.Release([]CompactionGroup{s.group})
		}()
		return true
	}

	// The compaction was not started; the caller releases the unused plan.
	return false
}

// compactFull kicks off full and optimize compactions. It returns
// true if the compaction was started.
func (e *Engine) compactFull(grp CompactionGroup, wg *sync.WaitGroup) bool {
	s := e.fullCompactionStrategy(grp, false)
	if s == nil {
		return false
	}

	// Try to acquire a slot from the compaction limiter.
	if e.compactionLimiter.TryTake() {
		{
			val := atomic.AddInt64(&e.activeCompactions.full, 1)
			e.stats.Active.With(prometheus.Labels{levelKey: levelFull}).Set(float64(val))
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() {
				val := atomic.AddInt64(&e.activeCompactions.full, -1)
				e.stats.Active.With(prometheus.Labels{levelKey: levelFull}).Set(float64(val))
			}()
			defer e.compactionLimiter.Release()
			s.Apply()
			// Release the files in the compaction plan
			e.CompactionPlan.Release([]CompactionGroup{s.group})
		}()
		return true
	}
	return false
}

// compactionStrategy holds the details of what to do in a compaction.
type compactionStrategy struct {
	group CompactionGroup

	fast  bool
	level int

	durationSecondsStat prometheus.Observer
	errorStat           prometheus.Counter

	logger    *zap.Logger
	compactor *Compactor
	fileStore *FileStore

	engine *Engine
}

// Apply executes the compaction strategy against its group and records the duration.
func (s *compactionStrategy) Apply() {
	start := time.Now()
	s.compactGroup()
	s.durationSecondsStat.Observe(time.Since(start).Seconds())
}

// compactGroup executes the compaction strategy against a single CompactionGroup.
func (s *compactionStrategy) compactGroup() {
	group := s.group
	log, logEnd := logger.NewOperation(context.TODO(), s.logger, "TSM compaction", "tsm1_compact_group")
	defer logEnd()

	log.Info("Beginning compaction", zap.Int("tsm1_files_n", len(group)))
	for i, f := range group {
		log.Info("Compacting file", zap.Int("tsm1_index", i), zap.String("tsm1_file", f))
	}

	var (
		err   error
		files []string
	)
	if s.fast {
		files, err = s.compactor.CompactFast(group, log)
	} else {
		files, err = s.compactor.CompactFull(group, log)
	}

	if err != nil {
		_, inProgress := err.(errCompactionInProgress)
		if err == errCompactionsDisabled || inProgress {
			log.Info("Aborted compaction", zap.Error(err))

			if _, ok := err.(errCompactionInProgress); ok {
				time.Sleep(time.Second)
			}
			return
		}

		log.Warn("Error compacting TSM files", zap.Error(err))

		// We hit a bad TSM file - rename so the next compaction can proceed.
		if _, ok := err.(errBlockRead); ok {
			path := err.(errBlockRead).file
			log.Info("Renaming a corrupt TSM file due to compaction error", zap.Error(err))
			if err := s.fileStore.ReplaceWithCallback([]string{path}, nil, nil); err != nil {
				log.Info("Error removing bad TSM file", zap.Error(err))
			} else if e := os.Rename(path, path+"."+BadTSMFileExtension); e != nil {
				log.Info("Error renaming corrupt TSM file", zap.Error(e))
			}
		}

		s.errorStat.Inc()
		time.Sleep(time.Second)
		return
	}

	if err := s.fileStore.ReplaceWithCallback(group, files, nil); err != nil {
		log.Info("Error replacing new TSM files", zap.Error(err))
		s.errorStat.Inc()
		time.Sleep(time.Second)

		// Remove the new snapshot files. We will try again.
		for _, file := range files {
			if err := os.Remove(file); err != nil {
				log.Error("Unable to remove file", zap.String("path", file), zap.Error(err))
			}
		}
		return
	}

	for i, f := range files {
		log.Info("Compacted file", zap.Int("tsm1_index", i), zap.String("tsm1_file", f))
	}
	log.Info("Finished compacting files",
		zap.Int("tsm1_files_n", len(files)))
}
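// Error handling above falls into three buckets (summary added for clarity):
// errCompactionsDisabled and errCompactionInProgress abort quietly, the latter
// after a one-second pause; an errBlockRead marks the offending file by
// renaming it with the BadTSMFileExtension suffix so later compactions can
// proceed; anything else increments the failure counter and backs off for a
// second before the next planning cycle retries.
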
// levelCompactionStrategy returns a compactionStrategy for the given level.
// It returns nil if there are no TSM files to compact.
func (e *Engine) levelCompactionStrategy(group CompactionGroup, fast bool, level int) *compactionStrategy {
	label := labelForLevel(level)
	return &compactionStrategy{
		group:     group,
		logger:    e.logger.With(zap.Int("tsm1_level", level), zap.String("tsm1_strategy", "level")),
		fileStore: e.FileStore,
		compactor: e.Compactor,
		fast:      fast,
		engine:    e,
		level:     level,

		errorStat:           e.stats.Failed.With(label),
		durationSecondsStat: e.stats.Duration.With(label),
	}
}

// fullCompactionStrategy returns a compactionStrategy for higher level generations of TSM files.
// It returns nil if there are no TSM files to compact.
func (e *Engine) fullCompactionStrategy(group CompactionGroup, optimize bool) *compactionStrategy {
	s := &compactionStrategy{
		group:     group,
		logger:    e.logger.With(zap.String("tsm1_strategy", "full"), zap.Bool("tsm1_optimize", optimize)),
		fileStore: e.FileStore,
		compactor: e.Compactor,
		fast:      optimize,
		engine:    e,
		level:     4,
	}

	plabel := prometheus.Labels{levelKey: levelFull}
	if optimize {
		plabel = prometheus.Labels{levelKey: levelOpt}
	}
	s.errorStat = e.stats.Failed.With(plabel)
	s.durationSecondsStat = e.stats.Duration.With(plabel)
	return s
}

// reloadCache reads the WAL segment files and loads them into the cache.
func (e *Engine) reloadCache() error {
	now := time.Now()
	files, err := segmentFileNames(e.WAL.Path())
	if err != nil {
		return err
	}

	limit := e.Cache.MaxSize()
	defer func() {
		e.Cache.SetMaxSize(limit)
	}()

	// Disable the max size during loading
	e.Cache.SetMaxSize(0)

	loader := NewCacheLoader(files)
	loader.WithLogger(e.logger)
	if err := loader.Load(e.Cache); err != nil {
		return err
	}

	e.traceLogger.Info("Reloaded WAL cache",
		zap.String("path", e.WAL.Path()), zap.Duration("duration", time.Since(now)))
	return nil
}

// cleanup removes all temp files and dirs that exist on disk. This should only be run at startup to avoid
// removing tmp files that are still in use.
func (e *Engine) cleanup() error {
	allfiles, err := os.ReadDir(e.path)
	if os.IsNotExist(err) {
		return nil
	} else if err != nil {
		return err
	}

	ext := fmt.Sprintf(".%s", TmpTSMFileExtension)
	for _, f := range allfiles {
		// Check to see if there are any `.tmp` directories that were left over from failed shard snapshots
		if f.IsDir() && strings.HasSuffix(f.Name(), ext) {
			if err := os.RemoveAll(filepath.Join(e.path, f.Name())); err != nil {
				return fmt.Errorf("error removing tmp snapshot directory %q: %s", f.Name(), err)
			}
		}
	}

	return e.cleanupTempTSMFiles()
}

func (e *Engine) cleanupTempTSMFiles() error {
	files, err := filepath.Glob(filepath.Join(e.path, fmt.Sprintf("*.%s", CompactionTempExtension)))
	if err != nil {
		return fmt.Errorf("error getting compaction temp files: %s", err.Error())
	}

	for _, f := range files {
		if err := os.Remove(f); err != nil {
			return fmt.Errorf("error removing temp compaction files: %v", err)
		}
	}
	return nil
}

// KeyCursor returns a KeyCursor for the given key starting at time t.
func (e *Engine) KeyCursor(ctx context.Context, key []byte, t int64, ascending bool) *KeyCursor {
	return e.FileStore.KeyCursor(ctx, key, t, ascending)
}
// CreateIterator returns an iterator for the measurement based on opt.
func (e *Engine) CreateIterator(ctx context.Context, measurement string, opt query.IteratorOptions) (query.Iterator, error) {
	if span := tracing.SpanFromContext(ctx); span != nil {
		labels := []string{"shard_id", strconv.Itoa(int(e.id)), "measurement", measurement}
		if opt.Condition != nil {
			labels = append(labels, "cond", opt.Condition.String())
		}

		span = span.StartSpan("create_iterator")
		span.SetLabels(labels...)
		ctx = tracing.NewContextWithSpan(ctx, span)

		group := metrics.NewGroup(tsmGroup)
		ctx = metrics.NewContextWithGroup(ctx, group)
		start := time.Now()

		defer group.GetTimer(planningTimer).UpdateSince(start)
	}

	if call, ok := opt.Expr.(*influxql.Call); ok {
		if opt.Interval.IsZero() {
			if call.Name == "first" || call.Name == "last" {
				refOpt := opt
				refOpt.Limit = 1
				refOpt.Ascending = call.Name == "first"
				refOpt.Ordered = true
				refOpt.Expr = call.Args[0]

				itrs, err := e.createVarRefIterator(ctx, measurement, refOpt)
				if err != nil {
					return nil, err
				}
				return newMergeFinalizerIterator(ctx, itrs, opt, e.logger)
			}
		}

		inputs, err := e.createCallIterator(ctx, measurement, call, opt)
		if err != nil {
			return nil, err
		} else if len(inputs) == 0 {
			return nil, nil
		}
		return newMergeFinalizerIterator(ctx, inputs, opt, e.logger)
	}

	itrs, err := e.createVarRefIterator(ctx, measurement, opt)
	if err != nil {
		return nil, err
	}
	return newMergeFinalizerIterator(ctx, itrs, opt, e.logger)
}

// createSeriesIterator creates an optimized series iterator if possible.
// We exclude less-common cases for now as they are not worth implementing.
func (e *Engine) createSeriesIterator(measurement string, ref *influxql.VarRef, is tsdb.IndexSet, opt query.IteratorOptions) (query.Iterator, error) {
	// Main check to see if we are trying to create a seriesKey iterator
	if ref == nil || ref.Val != "_seriesKey" || len(opt.Aux) != 0 {
		return nil, nil
	}
	// Check some other cases that we could maybe handle, but don't
	if len(opt.Dimensions) > 0 {
		return nil, nil
	}
	if opt.SLimit != 0 || opt.SOffset != 0 {
		return nil, nil
	}
	if opt.StripName {
		return nil, nil
	}
	if opt.Ordered {
		return nil, nil
	}
	// Actual creation of the iterator
	seriesCursor, err := is.MeasurementSeriesKeyByExprIterator([]byte(measurement), opt.Condition, opt.Authorizer)
	if err != nil {
		seriesCursor.Close()
		return nil, err
	}
	var seriesIterator query.Iterator
	seriesIterator = newSeriesIterator(measurement, seriesCursor)
	if opt.InterruptCh != nil {
		seriesIterator = query.NewInterruptIterator(seriesIterator, opt.InterruptCh)
	}
	return seriesIterator, nil
}
func (e *Engine) createCallIterator(ctx context.Context, measurement string, call *influxql.Call, opt query.IteratorOptions) ([]query.Iterator, error) {
	ref, _ := call.Args[0].(*influxql.VarRef)

	if exists, err := e.index.MeasurementExists([]byte(measurement)); err != nil {
		return nil, err
	} else if !exists {
		return nil, nil
	}

	// check for optimized series iteration for tsi index
	if e.index.Type() == tsdb.TSI1IndexName {
		indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
		seriesOpt := opt
		if len(opt.Dimensions) == 0 && (call.Name == "count" || call.Name == "sum_hll") {
			// no point ordering the series if we are just counting all of them
			seriesOpt.Ordered = false
		}
		seriesIterator, err := e.createSeriesIterator(measurement, ref, indexSet, seriesOpt)
		if err != nil {
			return nil, err
		}
		if seriesIterator != nil {
			callIterator, err := query.NewCallIterator(seriesIterator, opt)
			if err != nil {
				seriesIterator.Close()
				return nil, err
			}
			return []query.Iterator{callIterator}, nil
		}
	}

	// Determine tagsets for this measurement based on dimensions and filters.
	var (
		tagSets []*query.TagSet
		err     error
	)
	indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
	tagSets, err = indexSet.TagSets(e.sfile, []byte(measurement), opt)

	if err != nil {
		return nil, err
	}

	// Reverse the tag sets if we are ordering by descending.
	if !opt.Ascending {
		for _, t := range tagSets {
			t.Reverse()
		}
	}

	// Calculate tag sets and apply SLIMIT/SOFFSET.
	tagSets = query.LimitTagSets(tagSets, opt.SLimit, opt.SOffset)

	itrs := make([]query.Iterator, 0, len(tagSets))
	if err := func() error {
		for _, t := range tagSets {
			// Abort if the query was killed
			select {
			case <-opt.InterruptCh:
				query.Iterators(itrs).Close()
				return query.ErrQueryInterrupted
			default:
			}

			inputs, err := e.createTagSetIterators(ctx, ref, measurement, t, opt)
			if err != nil {
				return err
			} else if len(inputs) == 0 {
				continue
			}

			// Wrap each series in a call iterator.
			for i, input := range inputs {
				if opt.InterruptCh != nil {
					input = query.NewInterruptIterator(input, opt.InterruptCh)
				}

				itr, err := query.NewCallIterator(input, opt)
				if err != nil {
					query.Iterators(inputs).Close()
					return err
				}
				inputs[i] = itr
			}

			itr := query.NewParallelMergeIterator(inputs, opt, runtime.GOMAXPROCS(0))
			itrs = append(itrs, itr)
		}
		return nil
	}(); err != nil {
		query.Iterators(itrs).Close()
		return nil, err
	}

	return itrs, nil
}
// createVarRefIterator creates an iterator for a variable reference.
func (e *Engine) createVarRefIterator(ctx context.Context, measurement string, opt query.IteratorOptions) ([]query.Iterator, error) {
	ref, _ := opt.Expr.(*influxql.VarRef)

	if exists, err := e.index.MeasurementExists([]byte(measurement)); err != nil {
		return nil, err
	} else if !exists {
		return nil, nil
	}

	var (
		tagSets []*query.TagSet
		err     error
	)
	indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
	tagSets, err = indexSet.TagSets(e.sfile, []byte(measurement), opt)

	if err != nil {
		return nil, err
	}

	// Reverse the tag sets if we are ordering by descending.
	if !opt.Ascending {
		for _, t := range tagSets {
			t.Reverse()
		}
	}

	// Calculate tag sets and apply SLIMIT/SOFFSET.
	tagSets = query.LimitTagSets(tagSets, opt.SLimit, opt.SOffset)
	itrs := make([]query.Iterator, 0, len(tagSets))
	if err := func() error {
		for _, t := range tagSets {
			inputs, err := e.createTagSetIterators(ctx, ref, measurement, t, opt)
			if err != nil {
				return err
			} else if len(inputs) == 0 {
				continue
			}

			// If we have a LIMIT or OFFSET and the grouping of the outer query
			// is different than the current grouping, we need to perform the
			// limit on each of the individual series keys instead to improve
			// performance.
			if (opt.Limit > 0 || opt.Offset > 0) && len(opt.Dimensions) != len(opt.GroupBy) {
				for i, input := range inputs {
					inputs[i] = newLimitIterator(input, opt)
				}
			}

			itr, err := query.Iterators(inputs).Merge(opt)
			if err != nil {
				query.Iterators(inputs).Close()
				return err
			}

			// Apply a limit on the merged iterator.
			if opt.Limit > 0 || opt.Offset > 0 {
				if len(opt.Dimensions) == len(opt.GroupBy) {
					// When the final dimensions and the current grouping are
					// the same, we will only produce one series so we can use
					// the faster limit iterator.
					itr = newLimitIterator(itr, opt)
				} else {
					// When the dimensions are different than the current
					// grouping, we need to account for the possibility there
					// will be multiple series. The limit iterator in the
					// influxql package handles that scenario.
					itr = query.NewLimitIterator(itr, opt)
				}
			}
			itrs = append(itrs, itr)
		}
		return nil
	}(); err != nil {
		query.Iterators(itrs).Close()
		return nil, err
	}

	return itrs, nil
}
// createTagSetIterators creates a set of iterators for a tagset.
func (e *Engine) createTagSetIterators(ctx context.Context, ref *influxql.VarRef, name string, t *query.TagSet, opt query.IteratorOptions) ([]query.Iterator, error) {
	// Set parallelism by number of logical cpus.
	parallelism := runtime.GOMAXPROCS(0)
	if parallelism > len(t.SeriesKeys) {
		parallelism = len(t.SeriesKeys)
	}

	// Create series key groupings, with a slot per group for any returned error.
	groups := make([]struct {
		keys    []string
		filters []influxql.Expr
		itrs    []query.Iterator
		err     error
	}, parallelism)

	// Group series keys.
	n := len(t.SeriesKeys) / parallelism
	for i := 0; i < parallelism; i++ {
		group := &groups[i]

		if i < parallelism-1 {
			group.keys = t.SeriesKeys[i*n : (i+1)*n]
			group.filters = t.Filters[i*n : (i+1)*n]
		} else {
			group.keys = t.SeriesKeys[i*n:]
			group.filters = t.Filters[i*n:]
		}
	}

	// Read series groups in parallel.
	var wg sync.WaitGroup
	for i := range groups {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			groups[i].itrs, groups[i].err = e.createTagSetGroupIterators(ctx, ref, name, groups[i].keys, t, groups[i].filters, opt)
		}(i)
	}
	wg.Wait()

	// Determine total number of iterators so we can allocate only once.
	var itrN int
	for _, group := range groups {
		itrN += len(group.itrs)
	}

	// Combine all iterators together and check for errors.
	var err error
	itrs := make([]query.Iterator, 0, itrN)
	for _, group := range groups {
		if group.err != nil {
			err = group.err
		}
		itrs = append(itrs, group.itrs...)
	}

	// If an error occurred, make sure we close all created iterators.
	if err != nil {
		query.Iterators(itrs).Close()
		return nil, err
	}

	return itrs, nil
}
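// Partitioning note (worked example added for clarity): with
// n = len(t.SeriesKeys) / parallelism, every group except the last receives
// exactly n series keys and the last group absorbs the remainder. For 10
// series keys and GOMAXPROCS(0) == 4, n == 2 and the groups hold 2, 2, 2, and
// 4 keys respectively.
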
// createTagSetGroupIterators creates a set of iterators for a subset of a tagset's series.
func (e *Engine) createTagSetGroupIterators(ctx context.Context, ref *influxql.VarRef, name string, seriesKeys []string, t *query.TagSet, filters []influxql.Expr, opt query.IteratorOptions) ([]query.Iterator, error) {
	itrs := make([]query.Iterator, 0, len(seriesKeys))
	for i, seriesKey := range seriesKeys {
		var conditionFields []influxql.VarRef
		if filters[i] != nil {
			// Retrieve non-time fields from this series filter and filter out tags.
			conditionFields = influxql.ExprNames(filters[i])
		}

		itr, err := e.createVarRefSeriesIterator(ctx, ref, name, seriesKey, t, filters[i], conditionFields, opt)
		if err != nil {
			return itrs, err
		} else if itr == nil {
			continue
		}
		itrs = append(itrs, itr)

		// Abort if the query was killed
		select {
		case <-opt.InterruptCh:
			query.Iterators(itrs).Close()
			return nil, query.ErrQueryInterrupted
		default:
		}

		// Enforce series limit at creation time.
		if opt.MaxSeriesN > 0 && len(itrs) > opt.MaxSeriesN {
			query.Iterators(itrs).Close()
			return nil, fmt.Errorf("max-select-series limit exceeded: (%d/%d)", len(itrs), opt.MaxSeriesN)
		}
	}
	return itrs, nil
}
// createVarRefSeriesIterator creates an iterator for a variable reference for a series.
func (e *Engine) createVarRefSeriesIterator(ctx context.Context, ref *influxql.VarRef, name string, seriesKey string, t *query.TagSet, filter influxql.Expr, conditionFields []influxql.VarRef, opt query.IteratorOptions) (query.Iterator, error) {
	_, tfs := models.ParseKey([]byte(seriesKey))
	tags := query.NewTags(tfs.Map())

	// Create options specific for this series.
	itrOpt := opt
	itrOpt.Condition = filter

	var curCounter, auxCounter, condCounter *metrics.Counter
	if col := metrics.GroupFromContext(ctx); col != nil {
		curCounter = col.GetCounter(numberOfRefCursorsCounter)
		auxCounter = col.GetCounter(numberOfAuxCursorsCounter)
		condCounter = col.GetCounter(numberOfCondCursorsCounter)
	}

	// Build main cursor.
	var cur cursor
	if ref != nil {
		cur = e.buildCursor(ctx, name, seriesKey, tfs, ref, opt)
		// If the field doesn't exist then don't build an iterator.
		if cur == nil {
			return nil, nil
		}
		if curCounter != nil {
			curCounter.Add(1)
		}
	}

	// Build auxiliary cursors.
	// Tag values should be returned if the field doesn't exist.
	var aux []cursorAt
	if len(opt.Aux) > 0 {
		aux = make([]cursorAt, len(opt.Aux))
		for i, ref := range opt.Aux {
			// Create cursor from field if a tag wasn't requested.
			if ref.Type != influxql.Tag {
				cur := e.buildCursor(ctx, name, seriesKey, tfs, &ref, opt)
				if cur != nil {
					if auxCounter != nil {
						auxCounter.Add(1)
					}
					aux[i] = newBufCursor(cur, opt.Ascending)
					continue
				}

				// If a field was requested, use a nil cursor of the requested type.
				switch ref.Type {
				case influxql.Float, influxql.AnyField:
					aux[i] = nilFloatLiteralValueCursor
					continue
				case influxql.Integer:
					aux[i] = nilIntegerLiteralValueCursor
					continue
				case influxql.Unsigned:
					aux[i] = nilUnsignedLiteralValueCursor
					continue
				case influxql.String:
					aux[i] = nilStringLiteralValueCursor
					continue
				case influxql.Boolean:
					aux[i] = nilBooleanLiteralValueCursor
					continue
				}
			}

			// If field doesn't exist, use the tag value.
			if v := tags.Value(ref.Val); v == "" {
				// However, if the tag value is blank then return a null.
				aux[i] = nilStringLiteralValueCursor
			} else {
				aux[i] = &literalValueCursor{value: v}
			}
		}
	}

	// Remove _tagKey condition field.
	// We can't search on it because we can't join it to _tagValue based on time.
	if varRefSliceContains(conditionFields, "_tagKey") {
		conditionFields = varRefSliceRemove(conditionFields, "_tagKey")

		// Remove _tagKey conditional references from iterator.
		itrOpt.Condition = influxql.RewriteExpr(influxql.CloneExpr(itrOpt.Condition), func(expr influxql.Expr) influxql.Expr {
			switch expr := expr.(type) {
			case *influxql.BinaryExpr:
				if ref, ok := expr.LHS.(*influxql.VarRef); ok && ref.Val == "_tagKey" {
					return &influxql.BooleanLiteral{Val: true}
				}
				if ref, ok := expr.RHS.(*influxql.VarRef); ok && ref.Val == "_tagKey" {
					return &influxql.BooleanLiteral{Val: true}
				}
			}
			return expr
		})
	}

	// Build conditional field cursors.
	// If a conditional field doesn't exist then ignore the series.
	var conds []cursorAt
	if len(conditionFields) > 0 {
		conds = make([]cursorAt, len(conditionFields))
		for i, ref := range conditionFields {
			// Create cursor from field if a tag wasn't requested.
			if ref.Type != influxql.Tag {
				cur := e.buildCursor(ctx, name, seriesKey, tfs, &ref, opt)
				if cur != nil {
					if condCounter != nil {
						condCounter.Add(1)
					}
					conds[i] = newBufCursor(cur, opt.Ascending)
					continue
				}

				// If a field was requested, use a nil cursor of the requested type.
				switch ref.Type {
				case influxql.Float, influxql.AnyField:
					conds[i] = nilFloatLiteralValueCursor
					continue
				case influxql.Integer:
					conds[i] = nilIntegerLiteralValueCursor
					continue
				case influxql.Unsigned:
					conds[i] = nilUnsignedLiteralValueCursor
					continue
				case influxql.String:
					conds[i] = nilStringLiteralValueCursor
					continue
				case influxql.Boolean:
					conds[i] = nilBooleanLiteralValueCursor
					continue
				}
			}

			// If field doesn't exist, use the tag value.
			if v := tags.Value(ref.Val); v == "" {
				// However, if the tag value is blank then return a null.
				conds[i] = nilStringLiteralValueCursor
			} else {
				conds[i] = &literalValueCursor{value: v}
			}
		}
	}
	condNames := influxql.VarRefs(conditionFields).Strings()

	// Limit tags to only the dimensions selected.
	dimensions := opt.GetDimensions()
	tags = tags.Subset(dimensions)

	// If it's only auxiliary fields then it doesn't matter what type of iterator we use.
	if ref == nil {
		if opt.StripName {
			name = ""
		}
		return newFloatIterator(name, tags, itrOpt, nil, aux, conds, condNames), nil
	}

	// Remove name if requested.
	if opt.StripName {
		name = ""
	}

	switch cur := cur.(type) {
	case floatCursor:
		return newFloatIterator(name, tags, itrOpt, cur, aux, conds, condNames), nil
	case integerCursor:
		return newIntegerIterator(name, tags, itrOpt, cur, aux, conds, condNames), nil
	case unsignedCursor:
		return newUnsignedIterator(name, tags, itrOpt, cur, aux, conds, condNames), nil
	case stringCursor:
		return newStringIterator(name, tags, itrOpt, cur, aux, conds, condNames), nil
	case booleanCursor:
		return newBooleanIterator(name, tags, itrOpt, cur, aux, conds, condNames), nil
	default:
		panic("unreachable")
	}
}
// buildCursor creates an untyped cursor for a field.
func (e *Engine) buildCursor(ctx context.Context, measurement, seriesKey string, tags models.Tags, ref *influxql.VarRef, opt query.IteratorOptions) cursor {
	// Check if this is a system field cursor.
	switch ref.Val {
	case "_name":
		return &stringSliceCursor{values: []string{measurement}}
	case "_tagKey":
		return &stringSliceCursor{values: tags.Keys()}
	case "_tagValue":
		return &stringSliceCursor{values: matchTagValues(tags, opt.Condition)}
	case "_seriesKey":
		return &stringSliceCursor{values: []string{seriesKey}}
	}

	// Look up fields for measurement.
	mf := e.fieldset.FieldsByString(measurement)
	if mf == nil {
		return nil
	}

	// Check for system field for field keys.
	if ref.Val == "_fieldKey" {
		return &stringSliceCursor{values: mf.FieldKeys()}
	}

	// Find individual field.
	f := mf.Field(ref.Val)
	if f == nil {
		return nil
	}

	// Check if we need to perform a cast. Performing a cast in the
	// engine (if it is possible) is much more efficient than an automatic cast.
	if ref.Type != influxql.Unknown && ref.Type != influxql.AnyField && ref.Type != f.Type {
		switch ref.Type {
		case influxql.Float:
			switch f.Type {
			case influxql.Integer:
				cur := e.buildIntegerCursor(ctx, measurement, seriesKey, ref.Val, opt)
				return &floatCastIntegerCursor{cursor: cur}
			case influxql.Unsigned:
				cur := e.buildUnsignedCursor(ctx, measurement, seriesKey, ref.Val, opt)
				return &floatCastUnsignedCursor{cursor: cur}
			}
		case influxql.Integer:
			switch f.Type {
			case influxql.Float:
				cur := e.buildFloatCursor(ctx, measurement, seriesKey, ref.Val, opt)
				return &integerCastFloatCursor{cursor: cur}
			case influxql.Unsigned:
				cur := e.buildUnsignedCursor(ctx, measurement, seriesKey, ref.Val, opt)
				return &integerCastUnsignedCursor{cursor: cur}
			}
		case influxql.Unsigned:
			switch f.Type {
			case influxql.Float:
				cur := e.buildFloatCursor(ctx, measurement, seriesKey, ref.Val, opt)
				return &unsignedCastFloatCursor{cursor: cur}
			case influxql.Integer:
				cur := e.buildIntegerCursor(ctx, measurement, seriesKey, ref.Val, opt)
				return &unsignedCastIntegerCursor{cursor: cur}
			}
		}
		return nil
	}

	// Return appropriate cursor based on type.
	switch f.Type {
	case influxql.Float:
		return e.buildFloatCursor(ctx, measurement, seriesKey, ref.Val, opt)
	case influxql.Integer:
		return e.buildIntegerCursor(ctx, measurement, seriesKey, ref.Val, opt)
	case influxql.Unsigned:
		return e.buildUnsignedCursor(ctx, measurement, seriesKey, ref.Val, opt)
	case influxql.String:
		return e.buildStringCursor(ctx, measurement, seriesKey, ref.Val, opt)
	case influxql.Boolean:
		return e.buildBooleanCursor(ctx, measurement, seriesKey, ref.Val, opt)
	default:
		panic("unreachable")
	}
}
func matchTagValues(tags models.Tags, condition influxql.Expr) []string {
	if condition == nil {
		return tags.Values()
	}

	// Populate map with tag values.
	data := map[string]interface{}{}
	for _, tag := range tags {
		data[string(tag.Key)] = string(tag.Value)
	}

	// Match against each specific tag.
	var values []string
	for _, tag := range tags {
		data["_tagKey"] = string(tag.Key)
		if influxql.EvalBool(condition, data) {
			values = append(values, string(tag.Value))
		}
	}
	return values
}

// IteratorCost produces the cost of an iterator.
func (e *Engine) IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error) {
	// Determine if this measurement exists. If it does not, then no shards are
	// accessed to begin with.
	if exists, err := e.index.MeasurementExists([]byte(measurement)); err != nil {
		return query.IteratorCost{}, err
	} else if !exists {
		return query.IteratorCost{}, nil
	}

	// Determine all of the tag sets for this query.
	indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
	tagSets, err := indexSet.TagSets(e.sfile, []byte(measurement), opt)
	if err != nil {
		return query.IteratorCost{}, err
	}

	// Attempt to retrieve the ref from the main expression (if it exists).
	var ref *influxql.VarRef
	if opt.Expr != nil {
		if v, ok := opt.Expr.(*influxql.VarRef); ok {
			ref = v
		} else if call, ok := opt.Expr.(*influxql.Call); ok {
			if len(call.Args) > 0 {
				ref, _ = call.Args[0].(*influxql.VarRef)
			}
		}
	}

	// Count the number of series concatenated from the tag set.
	cost := query.IteratorCost{NumShards: 1}
	for _, t := range tagSets {
		cost.NumSeries += int64(len(t.SeriesKeys))
		for i, key := range t.SeriesKeys {
			// Retrieve the cost for the main expression (if it exists).
			if ref != nil {
				c := e.seriesCost(key, ref.Val, opt.StartTime, opt.EndTime)
				cost = cost.Combine(c)
			}

			// Retrieve the cost for every auxiliary field since these are also
			// iterators that we may have to look through.
			// We may want to separate these though as we are unlikely to incur
			// anywhere close to the full costs of the auxiliary iterators because
			// many of the selected values are usually skipped.
			for _, ref := range opt.Aux {
				c := e.seriesCost(key, ref.Val, opt.StartTime, opt.EndTime)
				cost = cost.Combine(c)
			}

			// Retrieve the expression names in the condition (if there is a condition).
			// We will create cursors for these too.
			if t.Filters[i] != nil {
				refs := influxql.ExprNames(t.Filters[i])
				for _, ref := range refs {
					c := e.seriesCost(key, ref.Val, opt.StartTime, opt.EndTime)
					cost = cost.Combine(c)
				}
			}
		}
	}
	return cost, nil
}
// Type returns FieldType for a series. If the series does not
// exist, ErrUnknownFieldType is returned.
func (e *Engine) Type(series []byte) (models.FieldType, error) {
	if typ, err := e.Cache.Type(series); err == nil {
		return typ, nil
	}

	typ, err := e.FileStore.Type(series)
	if err != nil {
		return 0, err
	}
	switch typ {
	case BlockFloat64:
		return models.Float, nil
	case BlockInteger:
		return models.Integer, nil
	case BlockUnsigned:
		return models.Unsigned, nil
	case BlockString:
		return models.String, nil
	case BlockBoolean:
		return models.Boolean, nil
	}
	return 0, tsdb.ErrUnknownFieldType
}

func (e *Engine) seriesCost(seriesKey, field string, tmin, tmax int64) query.IteratorCost {
	key := SeriesFieldKeyBytes(seriesKey, field)
	c := e.FileStore.Cost(key, tmin, tmax)

	// Retrieve the range of values within the cache.
	cacheValues := e.Cache.Values(key)
	c.CachedValues = int64(len(cacheValues.Include(tmin, tmax)))
	return c
}

// SeriesFieldKey combines a series key and field name for a unique string to be hashed to a numeric ID.
func SeriesFieldKey(seriesKey, field string) string {
	return seriesKey + keyFieldSeparator + field
}

func SeriesFieldKeyBytes(seriesKey, field string) []byte {
	b := make([]byte, len(seriesKey)+len(keyFieldSeparator)+len(field))
	i := copy(b, seriesKey)
	i += copy(b[i:], keyFieldSeparatorBytes)
	copy(b[i:], field)
	return b
}
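// For example (illustrative series and field names), SeriesFieldKey("cpu,host=server01", "usage_idle")
// returns "cpu,host=server01#!~#usage_idle", and SeriesFieldKeyBytes produces
// the same composite key as a byte slice without the intermediate string
// allocation.
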
var (
	blockToFieldType = [8]influxql.DataType{
		BlockFloat64:  influxql.Float,
		BlockInteger:  influxql.Integer,
		BlockBoolean:  influxql.Boolean,
		BlockString:   influxql.String,
		BlockUnsigned: influxql.Unsigned,
		5:             influxql.Unknown,
		6:             influxql.Unknown,
		7:             influxql.Unknown,
	}
)

func BlockTypeToInfluxQLDataType(typ byte) influxql.DataType { return blockToFieldType[typ&7] }
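// The &7 mask keeps the lookup within the 8-entry table so an unexpected block
// type byte cannot index out of range; entries 5 through 7 map explicitly to
// influxql.Unknown. For example, BlockTypeToInfluxQLDataType(BlockFloat64)
// returns influxql.Float while BlockTypeToInfluxQLDataType(6) returns
// influxql.Unknown.
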
// SeriesAndFieldFromCompositeKey returns the series key and the field key extracted from the composite key.
func SeriesAndFieldFromCompositeKey(key []byte) ([]byte, []byte) {
	series, field, _ := bytes.Cut(key, keyFieldSeparatorBytes)
	return series, field
}

func varRefSliceContains(a []influxql.VarRef, v string) bool {
	for _, ref := range a {
		if ref.Val == v {
			return true
		}
	}
	return false
}

func varRefSliceRemove(a []influxql.VarRef, v string) []influxql.VarRef {
	if !varRefSliceContains(a, v) {
		return a
	}

	other := make([]influxql.VarRef, 0, len(a))
	for _, ref := range a {
		if ref.Val != v {
			other = append(other, ref)
		}
	}
	return other
}
const reindexBatchSize = 10000

func (e *Engine) Reindex() error {
	keys := make([][]byte, reindexBatchSize)
	seriesKeys := make([][]byte, reindexBatchSize)
	names := make([][]byte, reindexBatchSize)
	tags := make([]models.Tags, reindexBatchSize)

	n := 0

	reindexBatch := func() error {
		if n == 0 {
			return nil
		}

		for i, key := range keys[:n] {
			seriesKeys[i], _ = SeriesAndFieldFromCompositeKey(key)
			names[i], tags[i] = models.ParseKeyBytes(seriesKeys[i])
			e.traceLogger.Debug(
				"Read series during reindexing",
				logger.Shard(e.id),
				zap.String("name", string(names[i])),
				zap.String("tags", tags[i].String()),
			)
		}

		e.logger.Debug("Reindexing data batch", logger.Shard(e.id), zap.Int("batch_size", n))
		if err := e.index.CreateSeriesListIfNotExists(seriesKeys[:n], names[:n], tags[:n]); err != nil {
			return err
		}

		n = 0
		return nil
	}
	reindexKey := func(key []byte) error {
		keys[n] = key
		n++

		if n < reindexBatchSize {
			return nil
		}
		return reindexBatch()
	}

	// Index data stored in TSM files.
	e.logger.Info("Reindexing TSM data", logger.Shard(e.id))
	if err := e.FileStore.WalkKeys(nil, func(key []byte, _ byte) error {
		return reindexKey(key)
	}); err != nil {
		return err
	}

	// Make sure all TSM data is indexed.
	if err := reindexBatch(); err != nil {
		return err
	}

	if !e.WALEnabled {
		// All done.
		return nil
	}

	// Reindex data stored in the WAL cache.
	e.logger.Info("Reindexing WAL data", logger.Shard(e.id))
	for _, key := range e.Cache.Keys() {
		if err := reindexKey(key); err != nil {
			return err
		}
	}

	// Make sure all WAL data is indexed.
	return reindexBatch()
}
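// Reindex batching, in brief (added commentary): keys are buffered until
// reindexBatchSize (10,000) entries accumulate, each batch is split back into
// series key, measurement name, and tags, and then flushed through
// CreateSeriesListIfNotExists. The trailing reindexBatch calls flush whatever
// partial batch remains after walking the TSM keys and, when the WAL is
// enabled, after walking the cache keys.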