influxdb/tsdb/engine.go

223 lines
7.2 KiB
Go
Raw Normal View History

2015-07-22 14:53:20 +00:00
package tsdb
import (
"context"
2015-07-22 14:53:20 +00:00
"errors"
"fmt"
"io"
"os"
"regexp"
2018-06-01 18:20:44 +00:00
"runtime"
"sort"
2015-07-22 14:53:20 +00:00
"time"
"github.com/influxdata/influxdb/models"
2016-09-21 15:04:37 +00:00
"github.com/influxdata/influxdb/pkg/estimator"
"github.com/influxdata/influxdb/pkg/limiter"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxql"
"go.uber.org/zap"
2015-07-22 14:53:20 +00:00
)
var (
// ErrFormatNotFound is returned when no format can be determined from a path.
ErrFormatNotFound = errors.New("format not found")
// ErrUnknownEngineFormat is returned when the engine format is
// unknown. ErrUnknownEngineFormat is currently returned if a format
// other than tsm1 is encountered.
ErrUnknownEngineFormat = errors.New("unknown engine format")
2015-07-22 14:53:20 +00:00
)
// Engine represents a swappable storage engine for the shard.
type Engine interface {
Open() error
Close() error
2016-09-14 13:55:44 +00:00
SetEnabled(enabled bool)
SetCompactionsEnabled(enabled bool)
ScheduleFullCompaction() error
WithLogger(*zap.Logger)
2016-09-14 13:55:44 +00:00
2016-10-04 18:45:09 +00:00
LoadMetadataIndex(shardID uint64, index Index) error
2015-07-22 14:53:20 +00:00
CreateSnapshot(skipCacheOk bool) (string, error)
2016-04-29 00:29:09 +00:00
Backup(w io.Writer, basePath string, since time.Time) error
Export(w io.Writer, basePath string, start time.Time, end time.Time) error
2016-04-29 00:29:09 +00:00
Restore(r io.Reader, basePath string) error
Import(r io.Reader, basePath string) error
2018-01-05 18:39:33 +00:00
Digest() (io.ReadCloser, int64, error)
2016-04-29 00:29:09 +00:00
CreateIterator(ctx context.Context, measurement string, opt query.IteratorOptions) (query.Iterator, error)
CreateCursorIterator(ctx context.Context) (CursorIterator, error)
IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error)
WritePoints(points []models.Point, tracker StatsTracker) error
feat: series creation ingress metrics (#20700) After turning this on and testing locally, note the 'seriesCreated' metric "localStore": {"name":"localStore","tags":null,"values":{"pointsWritten":2987,"seriesCreated":58,"valuesWritten":23754}}, "ingress": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"cq","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":1,"valuesWritten":4}}, "ingress:1": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"database","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":2,"valuesWritten":4}}, "ingress:2": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"httpd","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":1,"valuesWritten":46}}, "ingress:3": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"ingress","rp":"monitor"},"values":{"pointsWritten":14,"seriesCreated":14,"valuesWritten":42}}, "ingress:4": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"localStore","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":1,"valuesWritten":6}}, "ingress:5": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"queryExecutor","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":1,"valuesWritten":10}}, "ingress:6": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"runtime","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":1,"valuesWritten":30}}, "ingress:7": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"shard","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":2,"valuesWritten":22}}, "ingress:8": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"subscriber","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":1,"valuesWritten":6}}, "ingress:9": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"tsm1_cache","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":2,"valuesWritten":18}}, "ingress:10": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"tsm1_engine","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":2,"valuesWritten":58}}, "ingress:11": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"tsm1_filestore","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":2,"valuesWritten":4}}, "ingress:12": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"tsm1_wal","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":2,"valuesWritten":8}}, "ingress:13": {"name":"ingress","tags":{"db":"_internal","login":"_systemuser_monitor","measurement":"write","rp":"monitor"},"values":{"pointsWritten":2,"seriesCreated":1,"valuesWritten":18}}, "ingress:14": {"name":"ingress","tags":{"db":"telegraf","login":"_systemuser_unknown","measurement":"cpu","rp":"autogen"},"values":{"pointsWritten":1342,"seriesCreated":13,"valuesWritten":13420}}, "ingress:15": {"name":"ingress","tags":{"db":"telegraf","login":"_systemuser_unknown","measurement":"disk","rp":"autogen"},"values":{"pointsWritten":642,"seriesCreated":6,"valuesWritten":4494}}, "ingress:16": {"name":"ingress","tags":{"db":"telegraf","login":"_systemuser_unknown","measurement":"diskio","rp":"autogen"},"values":{"pointsWritten":214,"seriesCreated":2,"valuesWritten":2354}}, "ingress:17": {"name":"ingress","tags":{"db":"telegraf","login":"_systemuser_unknown","measurement":"mem","rp":"autogen"},"values":{"pointsWritten":107,"seriesCreated":1,"valuesWritten":963}}, "ingress:18": {"name":"ingress","tags":{"db":"telegraf","login":"_systemuser_unknown","measurement":"processes","rp":"autogen"},"values":{"pointsWritten":107,"seriesCreated":1,"valuesWritten":856}}, "ingress:19": {"name":"ingress","tags":{"db":"telegraf","login":"_systemuser_unknown","measurement":"swap","rp":"autogen"},"values":{"pointsWritten":214,"seriesCreated":1,"valuesWritten":642}}, "ingress:20": {"name":"ingress","tags":{"db":"telegraf","login":"_systemuser_unknown","measurement":"system","rp":"autogen"},"values":{"pointsWritten":321,"seriesCreated":1,"valuesWritten":749}}, Closes: https://github.com/influxdata/influxdb/issues/20613
2021-02-05 18:52:43 +00:00
CreateSeriesIfNotExists(key, name []byte, tags models.Tags, tracker StatsTracker) error
CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags, tracker StatsTracker) error
DeleteSeriesRange(itr SeriesIterator, min, max int64) error
DeleteSeriesRangeWithPredicate(itr SeriesIterator, predicate func(name []byte, tags models.Tags) (int64, int64, bool)) error
2016-09-21 15:04:37 +00:00
2016-09-23 13:33:47 +00:00
MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
2016-11-29 12:26:52 +00:00
SeriesN() int64
2017-01-02 16:29:18 +00:00
MeasurementExists(name []byte) (bool, error)
2017-12-08 17:11:07 +00:00
2016-11-11 16:25:53 +00:00
MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
2017-11-29 18:20:18 +00:00
MeasurementFieldSet() *MeasurementFieldSet
MeasurementFields(measurement []byte) *MeasurementFields
2016-12-15 15:31:18 +00:00
ForEachMeasurementName(fn func(name []byte) error) error
2016-12-05 17:51:06 +00:00
DeleteMeasurement(name []byte) error
HasTagKey(name, key []byte) (bool, error)
2016-12-06 17:30:41 +00:00
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
2017-03-24 15:48:10 +00:00
TagKeyCardinality(name, key []byte) int
// Statistics will return statistics relevant to this engine.
Statistics(tags map[string]string) []models.Statistic
LastModified() time.Time
DiskSize() int64
IsIdle() (bool, string)
Free() error
io.WriterTo
2015-07-22 14:53:20 +00:00
}
// SeriesIDSets provides access to the total set of series IDs
type SeriesIDSets interface {
ForEach(f func(ids *SeriesIDSet)) error
}
2016-02-10 20:04:18 +00:00
// EngineFormat represents the format for an engine.
type EngineFormat int
const (
2016-02-10 20:04:18 +00:00
// TSM1Format is the format used by the tsm1 engine.
2015-11-04 21:06:06 +00:00
TSM1Format EngineFormat = 2
)
2015-07-22 14:53:20 +00:00
// NewEngineFunc creates a new engine.
2018-05-10 21:59:31 +00:00
type NewEngineFunc func(id uint64, i Index, path string, walPath string, sfile *SeriesFile, options EngineOptions) Engine
2015-07-22 14:53:20 +00:00
// newEngineFuncs is a lookup of engine constructors by name.
var newEngineFuncs = make(map[string]NewEngineFunc)
// RegisterEngine registers a storage engine initializer by name.
func RegisterEngine(name string, fn NewEngineFunc) {
if _, ok := newEngineFuncs[name]; ok {
panic("engine already registered: " + name)
}
newEngineFuncs[name] = fn
}
// RegisteredEngines returns the slice of currently registered engines.
func RegisteredEngines() []string {
a := make([]string, 0, len(newEngineFuncs))
2016-02-10 20:04:18 +00:00
for k := range newEngineFuncs {
a = append(a, k)
}
sort.Strings(a)
return a
}
2015-07-22 14:53:20 +00:00
// NewEngine returns an instance of an engine based on its format.
// If the path does not exist then the DefaultFormat is used.
2018-05-10 21:59:31 +00:00
func NewEngine(id uint64, i Index, path string, walPath string, sfile *SeriesFile, options EngineOptions) (Engine, error) {
2015-07-22 14:53:20 +00:00
// Create a new engine
if _, err := os.Stat(path); os.IsNotExist(err) {
2018-05-10 21:59:31 +00:00
engine := newEngineFuncs[options.EngineVersion](id, i, path, walPath, sfile, options)
if options.OnNewEngine != nil {
options.OnNewEngine(engine)
}
return engine, nil
2015-07-22 14:53:20 +00:00
}
2015-11-04 21:06:06 +00:00
// If it's a dir then it's a tsm1 engine
format := DefaultEngine
2015-11-04 21:06:06 +00:00
if fi, err := os.Stat(path); err != nil {
2015-07-22 14:53:20 +00:00
return nil, err
2015-11-04 21:06:06 +00:00
} else if !fi.Mode().IsDir() {
return nil, ErrUnknownEngineFormat
2015-11-04 21:06:06 +00:00
} else {
format = "tsm1"
2015-07-22 14:53:20 +00:00
}
// Lookup engine by format.
fn := newEngineFuncs[format]
if fn == nil {
return nil, fmt.Errorf("invalid engine format: %q", format)
}
2018-05-10 21:59:31 +00:00
engine := fn(id, i, path, walPath, sfile, options)
if options.OnNewEngine != nil {
options.OnNewEngine(engine)
}
return engine, nil
2015-07-22 14:53:20 +00:00
}
// EngineOptions represents the options used to initialize the engine.
type EngineOptions struct {
EngineVersion string
IndexVersion string
InmemIndex interface{} // shared in-memory index
// Limits the concurrent number of TSM files that can be loaded at once.
OpenLimiter limiter.Fixed
// CompactionDisabled specifies shards should not schedule compactions.
// This option is intended for offline tooling.
CompactionDisabled bool
CompactionPlannerCreator CompactionPlannerCreator
CompactionLimiter limiter.Fixed
CompactionThroughputLimiter limiter.Rate
WALEnabled bool
MonitorDisabled bool
// DatabaseFilter is a predicate controlling which databases may be opened.
// If no function is set, all databases will be opened.
DatabaseFilter func(database string) bool
// RetentionPolicyFilter is a predicate controlling which combination of database and retention policy may be opened.
// nil will allow all combinations to pass.
RetentionPolicyFilter func(database, rp string) bool
// ShardFilter is a predicate controlling which combination of database, retention policy and shard group may be opened.
// nil will allow all combinations to pass.
ShardFilter func(database, rp string, id uint64) bool
2015-08-18 20:59:54 +00:00
Config Config
SeriesIDSets SeriesIDSets
OnNewEngine func(Engine)
FileStoreObserver FileStoreObserver
2015-07-22 14:53:20 +00:00
}
2018-06-01 18:20:44 +00:00
// NewEngineOptions constructs an EngineOptions object with safe default values.
// This should only be used in tests; production environments should read from a config file.
2015-07-22 14:53:20 +00:00
func NewEngineOptions() EngineOptions {
return EngineOptions{
EngineVersion: DefaultEngine,
2016-11-16 18:57:55 +00:00
IndexVersion: DefaultIndex,
Config: NewConfig(),
WALEnabled: true,
2018-06-01 18:20:44 +00:00
OpenLimiter: limiter.NewFixed(runtime.GOMAXPROCS(0)),
2015-07-22 14:53:20 +00:00
}
}
2016-12-19 16:57:05 +00:00
// NewInmemIndex returns a new "inmem" index type.
2017-11-15 23:09:25 +00:00
var NewInmemIndex func(name string, sfile *SeriesFile) (interface{}, error)
type CompactionPlannerCreator func(cfg Config) interface{}
// FileStoreObserver is passed notifications before the file store adds or deletes files. In this way, it can
// be sure to observe every file that is added or removed even in the presence of process death.
type FileStoreObserver interface {
// FileFinishing is called before a file is renamed to it's final name.
FileFinishing(path string) error
// FileUnlinking is called before a file is unlinked.
FileUnlinking(path string) error
}