2018-10-02 15:06:37 +00:00
|
|
|
package storage
|
|
|
|
|
|
|
|
import (
|
2018-10-05 11:43:56 +00:00
|
|
|
"bytes"
|
2018-10-03 18:34:34 +00:00
|
|
|
"context"
|
2018-10-05 11:43:56 +00:00
|
|
|
"errors"
|
2018-11-29 15:15:20 +00:00
|
|
|
"fmt"
|
2019-01-09 22:24:26 +00:00
|
|
|
"math"
|
2018-10-03 18:34:34 +00:00
|
|
|
"sync"
|
2018-10-10 14:35:05 +00:00
|
|
|
"time"
|
2018-10-02 15:06:37 +00:00
|
|
|
|
2019-01-08 00:37:16 +00:00
|
|
|
platform "github.com/influxdata/influxdb"
|
|
|
|
"github.com/influxdata/influxdb/logger"
|
|
|
|
"github.com/influxdata/influxdb/models"
|
2019-01-16 20:37:12 +00:00
|
|
|
"github.com/influxdata/influxdb/storage/wal"
|
2019-01-08 00:37:16 +00:00
|
|
|
"github.com/influxdata/influxdb/tsdb"
|
|
|
|
"github.com/influxdata/influxdb/tsdb/tsi1"
|
|
|
|
"github.com/influxdata/influxdb/tsdb/tsm1"
|
2018-10-03 18:34:34 +00:00
|
|
|
"github.com/influxdata/influxql"
|
2018-10-08 15:49:33 +00:00
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
2018-10-02 15:06:37 +00:00
|
|
|
"go.uber.org/zap"
|
|
|
|
)
|
|
|
|
|
2018-10-03 18:34:34 +00:00
|
|
|
// Static objects to prevent small allocs.

// timeBytes is the reserved key "time" as bytes; WritePoints compares incoming
// field keys and tag keys against it to reject invalid series.
var timeBytes = []byte("time")
|
|
|
|
|
2018-10-05 11:43:56 +00:00
|
|
|
// ErrEngineClosed is returned when a caller attempts to use the engine while
// it's closed. Methods detect this state via a nil Engine.closing channel.
var ErrEngineClosed = errors.New("engine is closed")
|
|
|
|
|
2018-10-02 15:06:37 +00:00
|
|
|
// Engine wires together the series file, TSI index, WAL and TSM engine that
// make up a storage engine, and manages their shared lifecycle and metrics.
type Engine struct {
	config Config
	path   string // base directory for all engine data.

	engineID *int // Not used by default.
	nodeID   *int // Not used by default.

	mu sync.RWMutex
	// closing is non-nil while the engine is open; Close closes it to signal
	// background goroutines to shut down, then sets it back to nil.
	closing chan struct{}
	index   *tsi1.Index
	sfile   *tsdb.SeriesFile
	engine  *tsm1.Engine
	wal     *wal.WAL

	// retentionEnforcer is optional; it is only set when the
	// WithRetentionEnforcer option is applied.
	retentionEnforcer *retentionEnforcer

	// defaultMetricLabels is applied to all sub-component metrics; options
	// such as WithEngineID/WithNodeID add labels here before Open.
	defaultMetricLabels prometheus.Labels

	// Tracks all goroutines started by the Engine.
	wg sync.WaitGroup

	logger *zap.Logger
}
|
|
|
|
|
2018-10-03 18:34:34 +00:00
|
|
|
// Option is a functional option that customises an Engine at construction
// time; options are applied by NewEngine after the sub-components are created.
type Option func(*Engine)
|
|
|
|
|
|
|
|
// WithTSMFilenameFormatter sets a function on the underlying tsm1.Engine to specify
|
|
|
|
// how TSM files are named.
|
2018-11-07 23:27:02 +00:00
|
|
|
func WithTSMFilenameFormatter(fn tsm1.FormatFileNameFunc) Option {
|
2018-10-03 18:34:34 +00:00
|
|
|
return func(e *Engine) {
|
|
|
|
e.engine.WithFormatFileNameFunc(fn)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-10-05 16:57:49 +00:00
|
|
|
// WithEngineID sets an engine id, which can be useful for logging when multiple
|
|
|
|
// engines are in use.
|
2018-11-07 23:27:02 +00:00
|
|
|
func WithEngineID(id int) Option {
|
2018-10-05 16:57:49 +00:00
|
|
|
return func(e *Engine) {
|
2018-10-15 18:31:02 +00:00
|
|
|
e.engineID = &id
|
2018-11-07 16:44:25 +00:00
|
|
|
e.defaultMetricLabels["engine_id"] = fmt.Sprint(*e.engineID)
|
2018-10-05 16:57:49 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// WithNodeID sets a node id on the engine, which can be useful for logging
|
|
|
|
// when a system has engines running on multiple nodes.
|
2018-11-07 23:27:02 +00:00
|
|
|
func WithNodeID(id int) Option {
|
2018-10-05 16:57:49 +00:00
|
|
|
return func(e *Engine) {
|
2018-10-15 18:31:02 +00:00
|
|
|
e.nodeID = &id
|
2018-11-07 16:44:25 +00:00
|
|
|
e.defaultMetricLabels["node_id"] = fmt.Sprint(*e.nodeID)
|
2018-10-05 16:57:49 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-10-10 11:36:02 +00:00
|
|
|
// WithRetentionEnforcer initialises a retention enforcer on the engine.
|
|
|
|
// WithRetentionEnforcer must be called after other options to ensure that all
|
2018-10-09 18:43:10 +00:00
|
|
|
// metrics are labelled correctly.
|
2018-11-07 23:27:02 +00:00
|
|
|
func WithRetentionEnforcer(finder BucketFinder) Option {
|
2018-10-08 15:49:33 +00:00
|
|
|
return func(e *Engine) {
|
2018-10-10 14:35:05 +00:00
|
|
|
e.retentionEnforcer = newRetentionEnforcer(e, finder)
|
2018-10-08 15:49:33 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-11-07 23:27:02 +00:00
|
|
|
// WithFileStoreObserver makes the engine have the provided file store observer.
|
|
|
|
func WithFileStoreObserver(obs tsm1.FileStoreObserver) Option {
|
|
|
|
return func(e *Engine) {
|
|
|
|
e.engine.WithFileStoreObserver(obs)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-11-08 17:19:27 +00:00
|
|
|
// WithCompactionPlanner makes the engine have the provided compaction planner.
|
|
|
|
func WithCompactionPlanner(planner tsm1.CompactionPlanner) Option {
|
|
|
|
return func(e *Engine) {
|
|
|
|
e.engine.WithCompactionPlanner(planner)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-10-03 18:34:34 +00:00
|
|
|
// NewEngine initialises a new storage engine, including a series file, index and
|
|
|
|
// TSM engine.
|
|
|
|
func NewEngine(path string, c Config, options ...Option) *Engine {
|
2018-10-02 15:06:37 +00:00
|
|
|
e := &Engine{
|
2018-11-07 16:44:25 +00:00
|
|
|
config: c,
|
|
|
|
path: path,
|
|
|
|
defaultMetricLabels: prometheus.Labels{},
|
|
|
|
logger: zap.NewNop(),
|
2018-10-02 15:06:37 +00:00
|
|
|
}
|
|
|
|
|
2018-11-07 18:30:48 +00:00
|
|
|
// Initialize series file.
|
2018-11-07 23:27:02 +00:00
|
|
|
e.sfile = tsdb.NewSeriesFile(c.GetSeriesFilePath(path))
|
2018-11-07 18:30:48 +00:00
|
|
|
|
2018-10-02 15:06:37 +00:00
|
|
|
// Initialise index.
|
2018-11-07 18:30:48 +00:00
|
|
|
e.index = tsi1.NewIndex(e.sfile, c.Index,
|
2018-11-07 23:27:02 +00:00
|
|
|
tsi1.WithPath(c.GetIndexPath(path)))
|
2018-11-07 18:30:48 +00:00
|
|
|
|
|
|
|
// Initialize WAL
|
2019-01-17 01:28:57 +00:00
|
|
|
e.wal = wal.NewWAL(c.GetWALPath(path))
|
|
|
|
e.wal.WithFsyncDelay(time.Duration(c.WAL.FsyncDelay))
|
|
|
|
e.wal.EnableTraceLogging(c.TraceLoggingEnabled)
|
|
|
|
e.wal.SetEnabled(c.WAL.Enabled)
|
2018-10-02 15:06:37 +00:00
|
|
|
|
|
|
|
// Initialise Engine
|
2018-11-07 23:27:02 +00:00
|
|
|
e.engine = tsm1.NewEngine(c.GetEnginePath(path), e.index, c.Engine,
|
2018-11-07 18:30:48 +00:00
|
|
|
tsm1.WithTraceLogging(c.TraceLoggingEnabled))
|
2018-10-02 15:06:37 +00:00
|
|
|
|
2018-10-03 18:34:34 +00:00
|
|
|
// Apply options.
|
|
|
|
for _, option := range options {
|
|
|
|
option(e)
|
|
|
|
}
|
2019-01-17 00:54:20 +00:00
|
|
|
|
2018-11-07 16:44:25 +00:00
|
|
|
// Set default metrics labels.
|
2018-11-09 10:16:34 +00:00
|
|
|
e.engine.SetDefaultMetricLabels(e.defaultMetricLabels)
|
|
|
|
e.sfile.SetDefaultMetricLabels(e.defaultMetricLabels)
|
2018-11-29 14:13:16 +00:00
|
|
|
e.index.SetDefaultMetricLabels(e.defaultMetricLabels)
|
2019-01-17 00:54:20 +00:00
|
|
|
if e.wal != nil {
|
|
|
|
e.wal.SetDefaultMetricLabels(e.defaultMetricLabels)
|
|
|
|
}
|
2018-11-07 16:44:25 +00:00
|
|
|
|
2018-10-02 15:06:37 +00:00
|
|
|
return e
|
|
|
|
}
|
|
|
|
|
|
|
|
// WithLogger sets the logger on the Store. It must be called before Open.
|
|
|
|
func (e *Engine) WithLogger(log *zap.Logger) {
|
2018-10-05 16:57:49 +00:00
|
|
|
fields := []zap.Field{}
|
|
|
|
if e.nodeID != nil {
|
|
|
|
fields = append(fields, zap.Int("node_id", *e.nodeID))
|
|
|
|
}
|
|
|
|
|
|
|
|
if e.engineID != nil {
|
2018-11-07 17:30:12 +00:00
|
|
|
fields = append(fields, zap.Int("engine_id", *e.engineID))
|
2018-10-05 16:57:49 +00:00
|
|
|
}
|
|
|
|
fields = append(fields, zap.String("service", "storage-engine"))
|
|
|
|
|
|
|
|
e.logger = log.With(fields...)
|
2018-10-02 15:06:37 +00:00
|
|
|
e.sfile.WithLogger(e.logger)
|
|
|
|
e.index.WithLogger(e.logger)
|
|
|
|
e.engine.WithLogger(e.logger)
|
2019-01-17 01:28:57 +00:00
|
|
|
e.wal.WithLogger(e.logger)
|
2018-10-10 11:36:02 +00:00
|
|
|
e.retentionEnforcer.WithLogger(e.logger)
|
2018-10-02 15:06:37 +00:00
|
|
|
}
|
|
|
|
|
2018-10-10 10:33:42 +00:00
|
|
|
// PrometheusCollectors returns all the prometheus collectors associated with
|
|
|
|
// the engine and its components.
|
|
|
|
func (e *Engine) PrometheusCollectors() []prometheus.Collector {
|
|
|
|
var metrics []prometheus.Collector
|
2018-12-06 15:39:06 +00:00
|
|
|
metrics = append(metrics, tsdb.PrometheusCollectors()...)
|
2018-12-07 13:48:43 +00:00
|
|
|
metrics = append(metrics, tsi1.PrometheusCollectors()...)
|
2018-12-05 16:41:00 +00:00
|
|
|
metrics = append(metrics, tsm1.PrometheusCollectors()...)
|
2019-01-17 00:54:20 +00:00
|
|
|
metrics = append(metrics, wal.PrometheusCollectors()...)
|
2018-10-10 11:36:02 +00:00
|
|
|
metrics = append(metrics, e.retentionEnforcer.PrometheusCollectors()...)
|
2018-10-10 10:33:42 +00:00
|
|
|
return metrics
|
|
|
|
}
|
|
|
|
|
2018-10-02 15:06:37 +00:00
|
|
|
// Open opens the store and all underlying resources. It returns an error if
// any of the underlying systems fail to open. Open is idempotent: calling it
// on an already-open engine is a no-op.
func (e *Engine) Open() (err error) {
	e.mu.Lock()
	defer e.mu.Unlock()

	// A non-nil closing channel means a previous Open completed successfully.
	if e.closing != nil {
		return nil // Already open
	}

	// Open the services in order and clean up if any fail.
	var oh openHelper
	oh.Open(e.sfile)
	oh.Open(e.index)
	oh.Open(e.wal)
	oh.Open(e.engine)
	if err := oh.Done(); err != nil {
		return err
	}

	// Replay any on-disk WAL segments into the TSM cache before serving.
	if err := e.reloadCache(); err != nil {
		return err
	}

	e.engine.SetCompactionsEnabled(true) // TODO(edd):is this needed?

	// Mark the engine open; Close later closes this channel to signal
	// background goroutines to stop.
	e.closing = make(chan struct{})

	// TODO(edd) background tasks will be run in priority order via a scheduler.
	// For now we will just run on an interval as we only have the retention
	// policy enforcer.
	e.runRetentionEnforcer()

	return nil
}
|
|
|
|
|
2019-01-17 01:28:57 +00:00
|
|
|
// reloadCache reads the WAL segment files and loads them into the cache.
|
|
|
|
func (e *Engine) reloadCache() error {
|
|
|
|
if !e.config.WAL.Enabled {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
now := time.Now()
|
|
|
|
files, err := wal.SegmentFileNames(e.wal.Path())
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
limit := e.engine.Cache.MaxSize()
|
|
|
|
defer func() {
|
|
|
|
e.engine.Cache.SetMaxSize(limit)
|
|
|
|
}()
|
|
|
|
|
|
|
|
// Disable the max size during loading
|
|
|
|
e.engine.Cache.SetMaxSize(0)
|
|
|
|
|
|
|
|
loader := tsm1.NewCacheLoader(files)
|
|
|
|
loader.WithLogger(e.logger)
|
|
|
|
if err := loader.Load(e.engine.Cache); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
e.logger.Info("Reloaded WAL cache", zap.String("path", e.wal.Path()), zap.Duration("duration", time.Since(now)))
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2018-10-10 14:35:05 +00:00
|
|
|
// runRetentionEnforcer runs the retention enforcer in a separate goroutine.
|
|
|
|
//
|
|
|
|
// Currently this just runs on an interval, but in the future we will add the
|
|
|
|
// ability to reschedule the retention enforcement if there are not enough
|
|
|
|
// resources available.
|
|
|
|
func (e *Engine) runRetentionEnforcer() {
|
2018-10-31 19:19:54 +00:00
|
|
|
interval := time.Duration(e.config.RetentionInterval)
|
|
|
|
|
|
|
|
if interval == 0 {
|
2018-10-10 14:35:05 +00:00
|
|
|
e.logger.Info("Retention enforcer disabled")
|
|
|
|
return // Enforcer disabled.
|
2018-10-31 19:19:54 +00:00
|
|
|
} else if interval < 0 {
|
|
|
|
e.logger.Error("Negative retention interval", logger.DurationLiteral("check_interval", interval))
|
2018-10-10 14:35:05 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2018-11-07 16:44:25 +00:00
|
|
|
if e.retentionEnforcer != nil {
|
|
|
|
// Set default metric labels on retention enforcer.
|
|
|
|
e.retentionEnforcer.metrics = newRetentionMetrics(e.defaultMetricLabels)
|
|
|
|
}
|
|
|
|
|
2018-10-29 21:42:55 +00:00
|
|
|
l := e.logger.With(zap.String("component", "retention_enforcer"), logger.DurationLiteral("check_interval", interval))
|
|
|
|
l.Info("Starting")
|
2018-10-10 14:35:05 +00:00
|
|
|
|
|
|
|
ticker := time.NewTicker(interval)
|
|
|
|
e.wg.Add(1)
|
|
|
|
go func() {
|
|
|
|
defer e.wg.Done()
|
|
|
|
for {
|
|
|
|
// It's safe to read closing without a lock because it's never
|
|
|
|
// modified if this goroutine is active.
|
|
|
|
select {
|
|
|
|
case <-e.closing:
|
2018-10-29 21:42:55 +00:00
|
|
|
l.Info("Stopping")
|
2018-10-10 14:35:05 +00:00
|
|
|
return
|
|
|
|
case <-ticker.C:
|
|
|
|
e.retentionEnforcer.run()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
}
|
|
|
|
|
2018-10-02 15:06:37 +00:00
|
|
|
// Close closes the store and all underlying resources. It returns an error if
|
|
|
|
// any of the underlying systems fail to close.
|
|
|
|
func (e *Engine) Close() error {
|
2018-10-10 14:35:05 +00:00
|
|
|
e.mu.RLock()
|
|
|
|
if e.closing == nil {
|
|
|
|
return nil // Already closed
|
2018-10-05 11:43:56 +00:00
|
|
|
}
|
2018-10-05 11:44:06 +00:00
|
|
|
|
2018-10-10 14:35:05 +00:00
|
|
|
close(e.closing)
|
|
|
|
e.mu.RUnlock()
|
|
|
|
|
|
|
|
// Wait for any other goroutines to finish.
|
|
|
|
e.wg.Wait()
|
|
|
|
|
|
|
|
e.mu.Lock()
|
|
|
|
defer e.mu.Unlock()
|
|
|
|
e.closing = nil
|
2018-10-10 10:42:37 +00:00
|
|
|
|
2019-01-17 01:28:57 +00:00
|
|
|
var ch closeHelper
|
|
|
|
ch.Close(e.engine)
|
|
|
|
ch.Close(e.wal)
|
|
|
|
ch.Close(e.index)
|
|
|
|
ch.Close(e.sfile)
|
|
|
|
return ch.Done()
|
2018-10-03 18:34:34 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (e *Engine) CreateSeriesCursor(ctx context.Context, req SeriesCursorRequest, cond influxql.Expr) (SeriesCursor, error) {
|
|
|
|
e.mu.RLock()
|
|
|
|
defer e.mu.RUnlock()
|
2018-10-10 14:35:05 +00:00
|
|
|
if e.closing == nil {
|
2018-10-05 11:43:56 +00:00
|
|
|
return nil, ErrEngineClosed
|
|
|
|
}
|
2018-10-23 07:32:11 +00:00
|
|
|
return newSeriesCursor(req, e.index, cond)
|
2018-10-03 18:34:34 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (e *Engine) CreateCursorIterator(ctx context.Context) (tsdb.CursorIterator, error) {
|
|
|
|
e.mu.RLock()
|
|
|
|
defer e.mu.RUnlock()
|
2018-10-10 14:35:05 +00:00
|
|
|
if e.closing == nil {
|
2018-10-05 11:43:56 +00:00
|
|
|
return nil, ErrEngineClosed
|
|
|
|
}
|
2018-10-03 18:34:34 +00:00
|
|
|
return e.engine.CreateCursorIterator(ctx)
|
|
|
|
}
|
|
|
|
|
|
|
|
// WritePoints writes the provided points to the engine.
//
// The Engine expects all points to have been correctly validated by the caller.
// WritePoints will however determine if there are any field type conflicts, and
// return an appropriate error in that case.
//
// Invalid series (a field key of "time", a tag key of "time", or — when key
// validation is enabled — invalid unicode in the key) are dropped rather than
// written; the first such problem is recorded in collection.Reason and
// surfaced via the returned PartialWriteError.
func (e *Engine) WritePoints(points []models.Point) error {
	collection := tsdb.NewSeriesCollection(points)

	// j is the write index for compacting surviving entries to the front of
	// the collection; dropped entries are skipped via continue.
	j := 0
	for iter := collection.Iterator(); iter.Next(); {
		tags := iter.Tags()

		// The field key is carried as a tag under tsdb.FieldKeyTagKey;
		// presumably that tag sorts first, hence the tags[0] check — a field
		// named "time" is invalid.
		if tags.Len() > 0 && bytes.Equal(tags[0].Key, tsdb.FieldKeyTagKeyBytes) && bytes.Equal(tags[0].Value, timeBytes) {
			// Field key "time" is invalid
			if collection.Reason == "" {
				collection.Reason = fmt.Sprintf("invalid field key: input field %q is invalid", timeBytes)
			}
			collection.Dropped++
			collection.DroppedKeys = append(collection.DroppedKeys, iter.Key())
			continue
		}

		// Filter out any tags with key equal to "time": they are invalid.
		if tags.Get(timeBytes) != nil {
			if collection.Reason == "" {
				collection.Reason = fmt.Sprintf("invalid tag key: input tag %q on measurement %q is invalid", timeBytes, iter.Name())
			}
			collection.Dropped++
			collection.DroppedKeys = append(collection.DroppedKeys, iter.Key())
			continue
		}

		// Drop any series with invalid unicode characters in the key.
		if e.config.ValidateKeys && !models.ValidKeyTokens(string(iter.Name()), tags) {
			if collection.Reason == "" {
				collection.Reason = fmt.Sprintf("key contains invalid unicode: %q", iter.Key())
			}
			collection.Dropped++
			collection.DroppedKeys = append(collection.DroppedKeys, iter.Key())
			continue
		}

		// Keep this entry: compact it to position j.
		collection.Copy(j, iter.Index())
		j++
	}
	// Trim the collection down to just the surviving entries.
	collection.Truncate(j)

	e.mu.RLock()
	defer e.mu.RUnlock()

	if e.closing == nil {
		return ErrEngineClosed
	}

	// Add new series to the index and series file. Check for partial writes.
	if err := e.index.CreateSeriesListIfNotExists(collection); err != nil {
		// ignore PartialWriteErrors. The collection captures it.
		// TODO(edd/jeff): should we just remove PartialWriteError from the index then?
		if _, ok := err.(tsdb.PartialWriteError); !ok {
			return err
		}
	}

	// Write the points to the cache and WAL.
	if err := e.engine.WritePoints(collection.Points); err != nil {
		return err
	}
	// Report any drops accumulated above (nil when nothing was dropped).
	return collection.PartialWriteError()
}
|
|
|
|
|
2019-01-08 15:56:18 +00:00
|
|
|
// DeleteBucket deletes an entire bucket from the storage engine.
|
|
|
|
func (e *Engine) DeleteBucket(orgID, bucketID platform.ID) error {
|
|
|
|
e.mu.RLock()
|
|
|
|
defer e.mu.RUnlock()
|
|
|
|
if e.closing == nil {
|
|
|
|
return ErrEngineClosed
|
|
|
|
}
|
|
|
|
|
|
|
|
// TODO(edd): we need to clean up how we're encoding the prefix so that we
|
|
|
|
// don't have to remember to get it right everywhere we need to touch TSM data.
|
2019-01-09 22:24:26 +00:00
|
|
|
encoded := tsdb.EncodeName(orgID, bucketID)
|
2019-01-11 19:28:46 +00:00
|
|
|
name := models.EscapeMeasurement(encoded[:])
|
2019-01-09 15:09:56 +00:00
|
|
|
|
2019-01-11 19:28:46 +00:00
|
|
|
return e.engine.DeleteBucket(name, math.MinInt64, math.MaxInt64)
|
2019-01-08 15:56:18 +00:00
|
|
|
}
|
|
|
|
|
2018-10-03 18:34:34 +00:00
|
|
|
// DeleteSeriesRangeWithPredicate deletes all series data iterated over if fn returns
|
|
|
|
// true for that series.
|
|
|
|
func (e *Engine) DeleteSeriesRangeWithPredicate(itr tsdb.SeriesIterator, fn func([]byte, models.Tags) (int64, int64, bool)) error {
|
|
|
|
e.mu.RLock()
|
|
|
|
defer e.mu.RUnlock()
|
2018-10-10 14:35:05 +00:00
|
|
|
if e.closing == nil {
|
2018-10-05 11:43:56 +00:00
|
|
|
return ErrEngineClosed
|
|
|
|
}
|
2018-10-03 18:34:34 +00:00
|
|
|
return e.engine.DeleteSeriesRangeWithPredicate(itr, fn)
|
|
|
|
}
|
|
|
|
|
|
|
|
// SeriesCardinality returns the number of series in the engine.
|
|
|
|
func (e *Engine) SeriesCardinality() int64 {
|
|
|
|
e.mu.RLock()
|
|
|
|
defer e.mu.RUnlock()
|
2018-10-10 14:35:05 +00:00
|
|
|
if e.closing == nil {
|
2018-10-05 11:43:56 +00:00
|
|
|
return 0
|
|
|
|
}
|
2018-10-03 18:34:34 +00:00
|
|
|
return e.index.SeriesN()
|
|
|
|
}
|
|
|
|
|
|
|
|
// Path returns the path of the engine's base directory. The path is fixed at
// construction time in NewEngine and never mutated afterwards in this file,
// so no locking is performed here.
func (e *Engine) Path() string {
	return e.path
}
|
|
|
|
|
|
|
|
// ApplyFnToSeriesIDSet allows the caller to apply fn to the SeriesIDSet held
|
|
|
|
// within the engine's index.
|
|
|
|
func (e *Engine) ApplyFnToSeriesIDSet(fn func(*tsdb.SeriesIDSet)) {
|
|
|
|
e.mu.RLock()
|
|
|
|
defer e.mu.RUnlock()
|
2018-10-10 14:35:05 +00:00
|
|
|
if e.closing == nil {
|
2018-10-05 11:43:56 +00:00
|
|
|
return
|
|
|
|
}
|
2018-10-03 18:34:34 +00:00
|
|
|
fn(e.index.SeriesIDSet())
|
2018-10-02 15:06:37 +00:00
|
|
|
}
|
2018-10-18 18:10:21 +00:00
|
|
|
|
|
|
|
// MeasurementCardinalityStats returns cardinality stats for all measurements,
// delegating directly to the TSI index.
//
// NOTE(review): unlike most other Engine methods this does not take e.mu or
// check e.closing before touching the index — confirm that is intentional.
func (e *Engine) MeasurementCardinalityStats() tsi1.MeasurementCardinalityStats {
	return e.index.MeasurementCardinalityStats()
}
|
|
|
|
|
|
|
|
// MeasurementStats returns the current measurement stats for the engine,
// delegating directly to the TSM engine.
//
// NOTE(review): unlike most other Engine methods this does not take e.mu or
// check e.closing before touching the TSM engine — confirm that is intentional.
func (e *Engine) MeasurementStats() (tsm1.MeasurementStats, error) {
	return e.engine.MeasurementStats()
}
|