Convert Filestore stats

pull/10616/head
Edd Robinson 2018-10-31 18:36:22 +00:00
parent f56bc0853f
commit d61b9f1645
4 changed files with 168 additions and 193 deletions

View File

@ -1,126 +0,0 @@
package tsdb
import (
"context"
"errors"
"regexp"
"runtime"
"time"
"github.com/influxdata/influxql"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/pkg/limiter"
"go.uber.org/zap"
)
// Package-level sentinel errors.
var (
// ErrUnknownEngineFormat is returned when the engine format is
// unknown. ErrUnknownEngineFormat is currently returned if a format
// other than tsm1 is encountered.
ErrUnknownEngineFormat = errors.New("unknown engine format")
)
// Engine represents a swappable storage engine for the shard.
type Engine interface {
// Lifecycle: Open initialises the engine, Close releases it.
Open() error
Close() error
// SetEnabled / SetCompactionsEnabled toggle engine activity and background
// compactions respectively; exact semantics live in the implementation.
SetEnabled(enabled bool)
SetCompactionsEnabled(enabled bool)
// ScheduleFullCompaction requests a full compaction of the engine's data.
ScheduleFullCompaction() error
// WithLogger sets the logger used by the engine.
WithLogger(*zap.Logger)
// Read/write path.
CreateCursorIterator(ctx context.Context) (CursorIterator, error)
WritePoints(points []models.Point) error
// Series management: creation, deletion by time range or predicate, count.
CreateSeriesIfNotExists(key, name []byte, tags models.Tags, typ models.FieldType) error
CreateSeriesListIfNotExists(collection *SeriesCollection) error
DeleteSeriesRange(itr SeriesIterator, min, max int64) error
DeleteSeriesRangeWithPredicate(itr SeriesIterator, predicate func(name []byte, tags models.Tags) (int64, int64, bool)) error
SeriesN() int64
// Measurement-level queries and deletion.
MeasurementExists(name []byte) (bool, error)
MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
ForEachMeasurementName(fn func(name []byte) error) error
DeleteMeasurement(name []byte) error
// Tag-key introspection.
HasTagKey(name, key []byte) (bool, error)
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
TagKeyCardinality(name, key []byte) int
// Storage accounting. IsIdle/Free presumably relate to releasing idle
// resources — confirm against the tsm1 implementation.
LastModified() time.Time
DiskSize() int64
IsIdle() bool
Free() error
}
// SeriesIDSets provides access to the total set of series IDs
type SeriesIDSets interface {
// ForEach calls f with each available SeriesIDSet; any error encountered
// while iterating is returned to the caller.
ForEach(f func(ids *SeriesIDSet)) error
}
// EngineOptions represents the options used to initialize the engine.
type EngineOptions struct {
// EngineVersion selects the engine implementation; see ErrUnknownEngineFormat.
EngineVersion string
// ShardID identifies the shard this engine belongs to — confirm at the caller.
ShardID uint64
// Limits the concurrent number of TSM files that can be loaded at once.
OpenLimiter limiter.Fixed
// CompactionDisabled specifies shards should not schedule compactions.
// This option is intended for offline tooling.
CompactionDisabled bool
// CompactionPlannerCreator optionally constructs a custom compaction planner.
CompactionPlannerCreator CompactionPlannerCreator
// CompactionLimiter bounds how many compactions may run concurrently.
CompactionLimiter limiter.Fixed
// CompactionThroughputLimiter rate-limits compaction work — presumably
// bytes per second; confirm against the tsm1 engine.
CompactionThroughputLimiter limiter.Rate
// WALEnabled controls whether a write-ahead log is used.
WALEnabled bool
// MonitorDisabled disables internal monitoring — TODO confirm exact scope.
MonitorDisabled bool
// DatabaseFilter is a predicate controlling which databases may be opened.
// If no function is set, all databases will be opened.
DatabaseFilter func(database string) bool
// RetentionPolicyFilter is a predicate controlling which combination of database and retention policy may be opened.
// nil will allow all combinations to pass.
RetentionPolicyFilter func(database, rp string) bool
// ShardFilter is a predicate controlling which combination of database, retention policy and shard group may be opened.
// nil will allow all combinations to pass.
ShardFilter func(database, rp string, id uint64) bool
// Config holds the engine configuration.
Config Config
// SeriesIDSets provides access to the total set of series IDs.
SeriesIDSets SeriesIDSets
// OnNewEngine, if set, is presumably invoked when a new Engine is created —
// confirm at the caller.
OnNewEngine func(Engine)
// FileStoreObserver is notified before the file store adds or removes files.
FileStoreObserver FileStoreObserver
}
// NewEngineOptions constructs an EngineOptions object with safe default values.
// This should only be used in tests; production environments should read from
// a config file.
func NewEngineOptions() EngineOptions {
	var opts EngineOptions
	opts.EngineVersion = DefaultEngine
	opts.Config = NewConfig()
	// Allow as many concurrent TSM file opens as there are schedulable CPUs,
	// but only one compaction at a time by default.
	opts.OpenLimiter = limiter.NewFixed(runtime.GOMAXPROCS(0))
	opts.CompactionLimiter = limiter.NewFixed(1)
	opts.WALEnabled = false
	return opts
}
// NewInmemIndex returns a new "inmem" index type.
var NewInmemIndex func(name string, sfile *SeriesFile) (interface{}, error)
// CompactionPlannerCreator constructs a compaction planner from a Config.
// The interface{} return suggests the concrete planner type is asserted by
// the caller — confirm at the call site.
type CompactionPlannerCreator func(cfg Config) interface{}
// FileStoreObserver is passed notifications before the file store adds or deletes files. In this way, it can
// be sure to observe every file that is added or removed even in the presence of process death.
type FileStoreObserver interface {
// FileFinishing is called before a file is renamed to its final name.
FileFinishing(path string) error
// FileUnlinking is called before a file is unlinked.
FileUnlinking(path string) error
}

View File

@ -221,7 +221,7 @@ func NewEngine(path string, idx *tsi1.Index, config Config, options ...EngineOpt
enableCompactionsOnOpen: true,
formatFileName: DefaultFormatFileName,
compactionLimiter: limiter.NewFixed(maxCompactions),
scheduler: newScheduler(stats, maxCompactions),
scheduler: newScheduler(maxCompactions),
}
for _, option := range options {
@ -497,7 +497,11 @@ func (e *Engine) DiskSize() int64 {
func (e *Engine) Open() error {
// Initialise metrics...
e.blockMetrics = newBlockMetrics(e.defaultMetricLabels)
e.compactionTracker = newCompactionTracker(e.blockMetrics)
// Propagate prometheus metrics down into trackers.
e.compactionTracker = newCompactionTracker(e.blockMetrics.compactionMetrics)
e.FileStore.fileTracker = newFileTracker(e.blockMetrics.fileMetrics)
e.scheduler.setCompactionTracker(e.compactionTracker)
if err := os.MkdirAll(e.path, 0777); err != nil {
@ -550,7 +554,6 @@ func (e *Engine) PrometheusCollectors() []prometheus.Collector {
var metrics []prometheus.Collector
metrics = append(metrics, e.blockMetrics.PrometheusCollectors()...)
// TODO(edd): Add Filestore metrics
// TODO(edd): Add Cache metrics
// TODO(edd): Add WAL metrics
return metrics
@ -1035,7 +1038,7 @@ func (l compactionLevel) String() string {
// *NOTE* - compactionTracker fields should not be directly modified. Doing so
// could result in the Engine exposing inaccurate metrics.
type compactionTracker struct {
metrics *blockMetrics
metrics *compactionMetrics
// Note: Compactions are levelled as follows:
// 0 Snapshots
@ -1049,8 +1052,8 @@ type compactionTracker struct {
queue [6]uint64 // Gauge of TSM compactions queues (by level).
}
func newCompactionTracker(blockMetrics *blockMetrics) *compactionTracker {
return &compactionTracker{metrics: blockMetrics}
func newCompactionTracker(metrics *compactionMetrics) *compactionTracker {
return &compactionTracker{metrics: metrics}
}
// Completed returns the total number of compactions for the provided level.
@ -1090,7 +1093,7 @@ func (t *compactionTracker) Errors(level int) uint64 { return atomic.LoadUint64(
func (t *compactionTracker) IncActive(level compactionLevel) {
atomic.AddUint64(&t.active[level], 1)
labels := t.metrics.CompactionLabels(level)
labels := t.metrics.Labels(level)
t.metrics.CompactionsActive.With(labels).Inc()
}
@ -1101,7 +1104,7 @@ func (t *compactionTracker) IncFullActive() { t.IncActive(5) }
func (t *compactionTracker) DecActive(level compactionLevel) {
atomic.AddUint64(&t.active[level], ^uint64(0))
labels := t.metrics.CompactionLabels(level)
labels := t.metrics.Labels(level)
t.metrics.CompactionsActive.With(labels).Dec()
}
@ -1113,7 +1116,7 @@ func (t *compactionTracker) Attempted(level compactionLevel, success bool, durat
if success {
atomic.AddUint64(&t.ok[level], 1)
labels := t.metrics.CompactionLabels(level)
labels := t.metrics.Labels(level)
t.metrics.CompactionDuration.With(labels).Observe(duration.Seconds())
labels["status"] = "ok"
@ -1123,7 +1126,7 @@ func (t *compactionTracker) Attempted(level compactionLevel, success bool, durat
atomic.AddUint64(&t.errors[level], 1)
labels := t.metrics.CompactionLabels(level)
labels := t.metrics.Labels(level)
labels["status"] = "error"
t.metrics.Compactions.With(labels).Inc()
}
@ -1137,7 +1140,7 @@ func (t *compactionTracker) SnapshotAttempted(success bool, duration time.Durati
func (t *compactionTracker) SetQueue(level compactionLevel, length uint64) {
atomic.StoreUint64(&t.queue[level], length)
labels := t.metrics.CompactionLabels(level)
labels := t.metrics.Labels(level)
t.metrics.CompactionQueue.With(labels).Set(float64(length))
}

View File

@ -17,7 +17,6 @@ import (
"sync/atomic"
"time"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/pkg/file"
"github.com/influxdata/platform/pkg/limiter"
"github.com/influxdata/platform/pkg/metrics"
@ -160,12 +159,6 @@ type FileStoreObserver interface {
FileUnlinking(path string) error
}
// Statistics gathered by the FileStore.
const (
statFileStoreBytes = "diskBytes"
statFileStoreCount = "numFiles"
)
var (
floatBlocksDecodedCounter = metrics.MustRegisterCounter("float_blocks_decoded", metrics.WithGroup(tsmGroup))
floatBlocksSizeCounter = metrics.MustRegisterCounter("float_blocks_size_bytes", metrics.WithGroup(tsmGroup))
@ -198,8 +191,8 @@ type FileStore struct {
traceLogger *zap.Logger // Logger to be used when trace-logging is on.
traceLogging bool
stats *FileStoreStatistics
purger *purger
fileTracker *fileTracker
purger *purger
currentTempDirID int
@ -242,7 +235,6 @@ func NewFileStore(dir string) *FileStore {
logger: logger,
traceLogger: logger,
openLimiter: limiter.NewFixed(runtime.GOMAXPROCS(0)),
stats: &FileStoreStatistics{},
purger: &purger{
files: map[string]TSMFile{},
logger: logger,
@ -290,20 +282,53 @@ func (f *FileStore) WithLogger(log *zap.Logger) {
// FileStoreStatistics keeps statistics about the file store.
type FileStoreStatistics struct {
DiskBytes int64
FileCount int64
SDiskBytes int64
SFileCount int64
}
// Statistics returns statistics for periodic monitoring.
func (f *FileStore) Statistics(tags map[string]string) []models.Statistic {
return []models.Statistic{{
Name: "tsm1_filestore",
Tags: tags,
Values: map[string]interface{}{
statFileStoreBytes: atomic.LoadInt64(&f.stats.DiskBytes),
statFileStoreCount: atomic.LoadInt64(&f.stats.FileCount),
},
}}
// fileTracker tracks file counts and sizes within the FileStore.
//
// As well as being responsible for providing atomic reads and writes to the
// statistics, fileTracker also mirrors any changes to the external prometheus
// metrics, which the Engine exposes.
//
// *NOTE* - fileTracker fields should not be directly modified. Doing so
// could result in the Engine exposing inaccurate metrics.
type fileTracker struct {
metrics *fileMetrics // prometheus metrics mirrored on every update.
diskBytes uint64 // total size of TSM and tombstone files; accessed atomically.
fileCount uint64 // number of files loaded in the FileStore; accessed atomically.
}
// newFileTracker returns a fileTracker that mirrors updates into metrics.
func newFileTracker(metrics *fileMetrics) *fileTracker {
return &fileTracker{metrics: metrics}
}
// Bytes returns the number of bytes in use on disk.
// The value is read atomically and is safe for concurrent use.
func (t *fileTracker) Bytes() uint64 { return atomic.LoadUint64(&t.diskBytes) }
// SetBytes records the absolute number of bytes in use on disk, mirroring
// the value into the prometheus DiskSize gauge.
func (t *fileTracker) SetBytes(n uint64) {
	atomic.StoreUint64(&t.diskBytes, n)
	t.metrics.DiskSize.With(t.metrics.Labels()).Set(float64(n))
}
// AddBytes increases the tracked on-disk byte count by n and adds the same
// delta to the prometheus DiskSize gauge.
func (t *fileTracker) AddBytes(n uint64) {
	atomic.AddUint64(&t.diskBytes, n)
	t.metrics.DiskSize.With(t.metrics.Labels()).Add(float64(n))
}
// SetFileCount records the number of files in the FileStore, mirroring the
// value into the prometheus Files gauge.
func (t *fileTracker) SetFileCount(n uint64) {
	atomic.StoreUint64(&t.fileCount, n)
	t.metrics.Files.With(t.metrics.Labels()).Set(float64(n))
}
// Count returns the number of TSM files currently loaded.
@ -581,10 +606,11 @@ func (f *FileStore) Open() error {
f.files = append(f.files, res.r)
// Accumulate file store size stats
atomic.AddInt64(&f.stats.DiskBytes, int64(res.r.Size()))
totalSize := uint64(res.r.Size())
for _, ts := range res.r.TombstoneFiles() {
atomic.AddInt64(&f.stats.DiskBytes, int64(ts.Size))
totalSize += uint64(ts.Size)
}
f.fileTracker.AddBytes(totalSize)
// Re-initialize the lastModified time for the file store
if res.r.LastModified() > lm {
@ -596,7 +622,7 @@ func (f *FileStore) Open() error {
close(readerC)
sort.Sort(tsmReaders(f.files))
atomic.StoreInt64(&f.stats.FileCount, int64(len(f.files)))
f.fileTracker.SetFileCount(uint64(len(f.files)))
return nil
}
@ -609,7 +635,7 @@ func (f *FileStore) Close() error {
f.lastFileStats = nil
f.files = nil
atomic.StoreInt64(&f.stats.FileCount, 0)
f.fileTracker.SetFileCount(uint64(0))
// Let other methods access this closed object while we do the actual closing.
f.mu.Unlock()
@ -624,9 +650,8 @@ func (f *FileStore) Close() error {
return nil
}
func (f *FileStore) DiskSizeBytes() int64 {
return atomic.LoadInt64(&f.stats.DiskBytes)
}
// DiskSizeBytes returns the total number of bytes consumed by the files in the FileStore.
func (f *FileStore) DiskSizeBytes() int64 { return int64(f.fileTracker.Bytes()) }
// Read returns the slice of values for the given key and the given timestamp,
// if any file matches those constraints.
@ -878,18 +903,18 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
f.lastFileStats = nil
f.files = active
sort.Sort(tsmReaders(f.files))
atomic.StoreInt64(&f.stats.FileCount, int64(len(f.files)))
f.fileTracker.SetFileCount(uint64(len(f.files)))
// Recalculate the disk size stat
var totalSize int64
var totalSize uint64
for _, file := range f.files {
totalSize += int64(file.Size())
totalSize += uint64(file.Size())
for _, ts := range file.TombstoneFiles() {
totalSize += int64(ts.Size)
totalSize += uint64(ts.Size)
}
}
atomic.StoreInt64(&f.stats.DiskBytes, totalSize)
f.fileTracker.SetBytes(totalSize)
return nil
}

View File

@ -10,20 +10,44 @@ import (
// namespace is the leading part of all published metrics for the Storage service.
const namespace = "storage"
const blockSubsystem = "block" // sub-system associated with metrics for block storage.
const compactionSubsystem = "compactions" // sub-system associated with metrics for compactions.
const fileStoreSubsystem = "tsm_files" // sub-system associated with metrics for TSM files on disk.
// blockMetrics are a set of metrics concerned with tracking data about block storage.
type blockMetrics struct {
labels prometheus.Labels // Read only.
labels prometheus.Labels
*compactionMetrics
*fileMetrics
}
// newBlockMetrics initialises the prometheus metrics for the block subsystem.
func newBlockMetrics(labels prometheus.Labels) *blockMetrics {
	m := &blockMetrics{labels: labels}
	m.compactionMetrics = newCompactionMetrics(labels)
	m.fileMetrics = newFileMetrics(labels)
	return m
}
// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
func (m *blockMetrics) PrometheusCollectors() []prometheus.Collector {
	// Combine the collectors from both embedded metric sets.
	collectors := m.compactionMetrics.PrometheusCollectors()
	return append(collectors, m.fileMetrics.PrometheusCollectors()...)
}
// compactionMetrics are a set of metrics concerned with tracking data about compactions.
type compactionMetrics struct {
labels prometheus.Labels // Read Only
Compactions *prometheus.CounterVec // Total compactions, labelled by level and status.
CompactionsActive *prometheus.GaugeVec // Currently active compactions, by level.
CompactionDuration *prometheus.HistogramVec // Seconds taken by successful compactions, by level.
CompactionQueue *prometheus.GaugeVec // Queued compactions, by level.
}
// newBlockMetrics initialises the prometheus metrics for the block subsystem.
func newBlockMetrics(labels prometheus.Labels) *blockMetrics {
// newCompactionMetrics initialises the prometheus metrics for compactions.
func newCompactionMetrics(labels prometheus.Labels) *compactionMetrics {
compactionNames := []string{"level"} // All compaction metrics have a `level` label.
for k := range labels {
compactionNames = append(compactionNames, k)
@ -32,41 +56,41 @@ func newBlockMetrics(labels prometheus.Labels) *blockMetrics {
totalCompactionsNames := append(compactionNames, "status")
sort.Strings(totalCompactionsNames)
return &blockMetrics{
return &compactionMetrics{
labels: labels,
Compactions: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: blockSubsystem,
Name: "compactions_total",
Subsystem: compactionSubsystem,
Name: "total",
Help: "Number of times cache snapshotted or TSM compaction attempted.",
}, totalCompactionsNames),
CompactionsActive: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: blockSubsystem,
Name: "compactions_active",
Subsystem: compactionSubsystem,
Name: "active",
Help: "Number of active compactions.",
}, compactionNames),
CompactionDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: blockSubsystem,
Name: "compaction_duration_seconds",
Subsystem: compactionSubsystem,
Name: "duration_seconds",
Help: "Time taken for a successful compaction or snapshot.",
// 30 buckets spaced exponentially between 5s and ~53 minutes.
Buckets: prometheus.ExponentialBuckets(5.0, 1.25, 30),
}, compactionNames),
CompactionQueue: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: blockSubsystem,
Name: "compactions_queued",
Subsystem: compactionSubsystem,
Name: "queued",
Help: "Number of queued compactions.",
}, compactionNames),
}
}
// CompactionLabels returns a copy of labels for use with compaction metrics.
func (b *blockMetrics) CompactionLabels(level compactionLevel) prometheus.Labels {
l := make(map[string]string, len(b.labels))
for k, v := range b.labels {
// Labels returns a copy of labels for use with compaction metrics.
func (m *compactionMetrics) Labels(level compactionLevel) prometheus.Labels {
l := make(map[string]string, len(m.labels))
for k, v := range m.labels {
l[k] = v
}
l["level"] = fmt.Sprint(level)
@ -74,11 +98,60 @@ func (b *blockMetrics) CompactionLabels(level compactionLevel) prometheus.Labels
}
// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
func (b *blockMetrics) PrometheusCollectors() []prometheus.Collector {
func (m *compactionMetrics) PrometheusCollectors() []prometheus.Collector {
return []prometheus.Collector{
b.Compactions,
b.CompactionsActive,
b.CompactionDuration,
b.CompactionQueue,
m.Compactions,
m.CompactionsActive,
m.CompactionDuration,
m.CompactionQueue,
}
}
// fileMetrics are a set of metrics concerned with tracking data about the
// TSM files on disk: their total count and size.
type fileMetrics struct {
labels prometheus.Labels // Read only.
DiskSize *prometheus.GaugeVec // Bytes of TSM data on disk.
Files *prometheus.GaugeVec // Number of TSM files tracked.
}
// newFileMetrics initialises the prometheus metrics for tracking files on disk.
//
// Every key in labels becomes a label name on each metric, so all file metrics
// share a consistent label set; names are sorted for deterministic ordering.
func newFileMetrics(labels prometheus.Labels) *fileMetrics {
	// Pre-size the slice; the final length is exactly len(labels).
	names := make([]string, 0, len(labels))
	for k := range labels {
		names = append(names, k)
	}
	sort.Strings(names)

	return &fileMetrics{
		labels: labels,
		DiskSize: prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: fileStoreSubsystem,
			// Fixed grammar in help text ("files using" -> "files are using").
			Help: "Number of bytes TSM files are using on disk.",
			Name: "disk_bytes",
		}, names),
		Files: prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: fileStoreSubsystem,
			// More informative than the bare "Number of files.".
			Help: "Number of TSM files on disk.",
			Name: "total",
		}, names),
	}
}
// Labels returns a copy of labels for use with file metrics. Callers may
// mutate the returned map without affecting the shared base labels.
func (m *fileMetrics) Labels() prometheus.Labels {
	out := make(prometheus.Labels, len(m.labels))
	for name, value := range m.labels {
		out[name] = value
	}
	return out
}
// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
func (m *fileMetrics) PrometheusCollectors() []prometheus.Collector {
	collectors := make([]prometheus.Collector, 0, 2)
	collectors = append(collectors, m.DiskSize, m.Files)
	return collectors
}