refactor: hook up metrics and wal to storage engine

It turns out that LastModified and DiskSize are unused, so it was
easy to change the engine to not care about the WAL.

This hooks up metrics and starts the WAL again.
pull/11364/head
Jeff Wendling 2019-01-16 17:54:20 -07:00
parent 95de3d52b2
commit d2ddd48eea
6 changed files with 75 additions and 188 deletions

@@ -128,17 +128,20 @@ func NewEngine(path string, c Config, options ...Option) *Engine {
// Initialise Engine
e.engine = tsm1.NewEngine(c.GetEnginePath(path), e.index, c.Engine,
tsm1.WithWAL(e.wal),
tsm1.WithTraceLogging(c.TraceLoggingEnabled))
// Apply options.
for _, option := range options {
option(e)
}
// Set default metrics labels.
e.engine.SetDefaultMetricLabels(e.defaultMetricLabels)
e.sfile.SetDefaultMetricLabels(e.defaultMetricLabels)
e.index.SetDefaultMetricLabels(e.defaultMetricLabels)
if e.wal != nil {
e.wal.SetDefaultMetricLabels(e.defaultMetricLabels)
}
return e
}
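
The NewEngine wiring above uses Go's functional-options pattern: EngineOption values such as WithTraceLogging are applied in a loop over the variadic options. A minimal, self-contained sketch of that pattern with hypothetical types, not the real influxdb API:

package main

import "fmt"

type WAL struct{ path string }

type Engine struct{ wal *WAL }

// Option mutates an Engine during construction.
type Option func(*Engine)

// WithWAL attaches a WAL to the engine.
func WithWAL(w *WAL) Option {
	return func(e *Engine) { e.wal = w }
}

func NewEngine(options ...Option) *Engine {
	e := &Engine{}
	for _, option := range options {
		option(e)
	}
	return e
}

func main() {
	e := NewEngine(WithWAL(&WAL{path: "/tmp/wal"}))
	fmt.Println(e.wal != nil) // true
}
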
@@ -159,6 +162,9 @@ func (e *Engine) WithLogger(log *zap.Logger) {
e.sfile.WithLogger(e.logger)
e.index.WithLogger(e.logger)
e.engine.WithLogger(e.logger)
if e.wal != nil {
e.wal.WithLogger(e.logger)
}
e.retentionEnforcer.WithLogger(e.logger)
}
@@ -169,6 +175,7 @@ func (e *Engine) PrometheusCollectors() []prometheus.Collector {
metrics = append(metrics, tsdb.PrometheusCollectors()...)
metrics = append(metrics, tsi1.PrometheusCollectors()...)
metrics = append(metrics, tsm1.PrometheusCollectors()...)
metrics = append(metrics, wal.PrometheusCollectors()...)
metrics = append(metrics, e.retentionEnforcer.PrometheusCollectors()...)
return metrics
}
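
PrometheusCollectors aggregates collectors from the tsdb, tsi1, tsm1, and wal packages plus the retention enforcer; a caller is then expected to register them with a registry. A hedged sketch of what that registration might look like — the collectors() helper below is a stand-in for the engine method, not the real API:

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// collectors stands in for engine.PrometheusCollectors() from the diff above.
func collectors() []prometheus.Collector {
	return []prometheus.Collector{prometheus.NewGoCollector()}
}

func main() {
	reg := prometheus.NewRegistry()
	for _, c := range collectors() {
		reg.MustRegister(c)
	}
	// Expose everything the engine reports on /metrics.
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe(":8080", nil))
}
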
@@ -191,6 +198,12 @@ func (e *Engine) Open() error {
return err
}
if e.wal != nil {
if err := e.wal.Open(); err != nil {
return err
}
}
if err := e.engine.Open(); err != nil {
return err
}
@@ -266,15 +279,25 @@ func (e *Engine) Close() error {
defer e.mu.Unlock()
e.closing = nil
if err := e.sfile.Close(); err != nil {
if err := e.engine.Close(); err != nil {
return err
}
if e.wal != nil {
if err := e.wal.Close(); err != nil {
return err
}
}
if err := e.index.Close(); err != nil {
return err
}
return e.engine.Close()
if err := e.sfile.Close(); err != nil {
return err
}
return nil
}
func (e *Engine) CreateSeriesCursor(ctx context.Context, req SeriesCursorRequest, cond influxql.Expr) (SeriesCursor, error) {
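
The reordered Close above shuts the components down in roughly the reverse of the order they are opened (engine, then WAL, then index, then series file). A small sketch of that open-forward/close-backward discipline, with hypothetical components rather than the real ones:

package main

import "fmt"

type component string

func (c component) Open() error  { fmt.Println("open", string(c)); return nil }
func (c component) Close() error { fmt.Println("close", string(c)); return nil }

func main() {
	// Hypothetical stand-ins for the series file, index, WAL and tsm1 engine.
	parts := []component{"seriesfile", "index", "wal", "engine"}

	// Open front to back.
	for _, p := range parts {
		if err := p.Open(); err != nil {
			fmt.Println("open failed:", err)
			return
		}
	}

	// Close back to front, so nothing is torn down while a later-opened
	// component still depends on it.
	for i := len(parts) - 1; i >= 0; i-- {
		if err := parts[i].Close(); err != nil {
			fmt.Println("close failed:", err)
		}
	}
}
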

@@ -2,10 +2,32 @@ package wal
import (
"sort"
"sync"
"github.com/prometheus/client_golang/prometheus"
)
// The following package variables act as singletons, to be shared by all
// storage.Engine instantiations. This allows multiple WALs to be monitored
// within the same process.
var (
wms *walMetrics // main metrics
mmu sync.RWMutex
)
// PrometheusCollectors returns all the metrics associated with the tsdb package.
func PrometheusCollectors() []prometheus.Collector {
mmu.RLock()
defer mmu.RUnlock()
var collectors []prometheus.Collector
if wms != nil {
collectors = append(collectors, wms.PrometheusCollectors()...)
}
return collectors
}
// namespace is the leading part of all published metrics for the Storage service.
const namespace = "storage"
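
The wms/mmu pair above is a package-level singleton: the first WAL to open creates the metrics, later WALs reuse them, and PrometheusCollectors reads them under an RLock. A minimal sketch of the same shape, with a single hypothetical counter standing in for walMetrics:

package main

import (
	"fmt"
	"sync"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	shared *prometheus.CounterVec // lazily created by the first open()
	mu     sync.RWMutex
)

// open mimics WAL.Open: create the shared metric once, hand it to every caller.
func open() *prometheus.CounterVec {
	mu.Lock()
	defer mu.Unlock()
	if shared == nil {
		shared = prometheus.NewCounterVec(prometheus.CounterOpts{
			Namespace: "storage",
			Subsystem: "wal",
			Name:      "writes_total",
			Help:      "Example counter shared by all WAL instances.",
		}, []string{"engine_id"})
	}
	return shared
}

// collectors mimics the package-level PrometheusCollectors above.
func collectors() []prometheus.Collector {
	mu.RLock()
	defer mu.RUnlock()
	if shared == nil {
		return nil
	}
	return []prometheus.Collector{shared}
}

func main() {
	a, b := open(), open()
	a.With(prometheus.Labels{"engine_id": "0"}).Inc()
	b.With(prometheus.Labels{"engine_id": "1"}).Inc()
	fmt.Println(a == b, len(collectors())) // true 1
}
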

@@ -105,7 +105,9 @@ type WAL struct {
// SegmentSize is the file size at which a segment file will be rotated
SegmentSize int
tracker *walTracker
tracker *walTracker
defaultMetricLabels prometheus.Labels // N.B this must not be mutated after Open is called.
limiter limiter.Fixed
}
@@ -122,7 +124,6 @@ func NewWAL(path string) *WAL {
limiter: limiter.NewFixed(defaultWaitingWALWrites),
logger: logger,
traceLogger: logger,
tracker: newWALTracker(newWALMetrics(nil), nil),
}
}
@@ -148,6 +149,15 @@ func (l *WAL) WithLogger(log *zap.Logger) {
}
}
// SetDefaultMetricLabels sets the default labels for metrics on the engine.
// It must be called before the Engine is opened.
func (l *WAL) SetDefaultMetricLabels(labels prometheus.Labels) {
l.defaultMetricLabels = make(prometheus.Labels, len(labels))
for k, v := range labels {
l.defaultMetricLabels[k] = v
}
}
// Path returns the directory the log was initialized with.
func (l *WAL) Path() string {
l.mu.RLock()
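
SetDefaultMetricLabels above copies the map rather than storing the caller's reference; since Go maps are reference types, that copy is what protects the WAL from later mutation by the caller. A small sketch of the difference:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	caller := prometheus.Labels{"engine_id": "0"}

	// Copy key by key, as SetDefaultMetricLabels does above.
	copied := make(prometheus.Labels, len(caller))
	for k, v := range caller {
		copied[k] = v
	}

	caller["engine_id"] = "mutated" // the caller changes its map later...
	fmt.Println(copied["engine_id"]) // ...but the stored copy still prints "0"
}
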
@@ -160,6 +170,16 @@ func (l *WAL) Open() error {
l.mu.Lock()
defer l.mu.Unlock()
// Initialise metrics for trackers.
mmu.Lock()
if wms == nil {
wms = newWALMetrics(l.defaultMetricLabels)
}
mmu.Unlock()
// Set the shared metrics for the tracker
l.tracker = newWALTracker(wms, l.defaultMetricLabels)
l.traceLogger.Info("tsm1 WAL starting", zap.Int("segment_size", l.SegmentSize))
l.traceLogger.Info("tsm1 WAL writing", zap.String("path", l.path))

@@ -24,7 +24,6 @@ import (
"github.com/influxdata/influxdb/pkg/limiter"
"github.com/influxdata/influxdb/pkg/metrics"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/storage/wal"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/tsi1"
"github.com/influxdata/influxql"
@@ -79,13 +78,6 @@ const (
// an Engine.
type EngineOption func(i *Engine)
// WithWAL sets the WAL for the Engine
var WithWAL = func(w *wal.WAL) EngineOption {
return func(e *Engine) {
e.WAL = w
}
}
// WithTraceLogging sets if trace logging is enabled for the engine.
var WithTraceLogging = func(logging bool) EngineOption {
return func(e *Engine) {
@@ -128,7 +120,6 @@ type Engine struct {
traceLogger *zap.Logger // Logger to be used when trace-logging is on.
traceLogging bool
WAL *wal.WAL
Cache *Cache
Compactor *Compactor
CompactionPlan CompactionPlanner
@@ -472,31 +463,11 @@ func (e *Engine) SeriesN() int64 {
return e.index.SeriesN()
}
// LastModified returns the time when this shard was last modified.
func (e *Engine) LastModified() time.Time {
fsTime := e.FileStore.LastModified()
if e.WAL != nil && e.WAL.LastWriteTime().After(fsTime) {
return e.WAL.LastWriteTime()
}
return fsTime
}
// MeasurementStats returns the current measurement stats for the engine.
func (e *Engine) MeasurementStats() (MeasurementStats, error) {
return e.FileStore.MeasurementStats()
}
// DiskSize returns the total size in bytes of all TSM and WAL segments on disk.
func (e *Engine) DiskSize() int64 {
var walDiskSizeBytes int64
if e.WAL != nil {
walDiskSizeBytes = e.WAL.DiskSizeBytes()
}
return e.FileStore.DiskSizeBytes() + walDiskSizeBytes
}
func (e *Engine) initTrackers() {
mmu.Lock()
defer mmu.Unlock()
@@ -526,20 +497,10 @@ func (e *Engine) Open() error {
return err
}
if e.WAL != nil {
if err := e.WAL.Open(); err != nil {
return err
}
}
if err := e.FileStore.Open(); err != nil {
return err
}
if err := e.reloadCache(); err != nil {
return err
}
e.Compactor.Open()
if e.enableCompactionsOnOpen {
@@ -562,12 +523,6 @@ func (e *Engine) Close() error {
return err
}
if e.WAL != nil {
if err := e.WAL.Close(); err != nil {
return err
}
}
return nil
}
@@ -579,9 +534,6 @@ func (e *Engine) WithLogger(log *zap.Logger) {
e.traceLogger = e.logger
}
if e.WAL != nil {
e.WAL.WithLogger(e.logger)
}
e.FileStore.WithLogger(e.logger)
}
@@ -654,18 +606,10 @@ func (e *Engine) WritePoints(points []models.Point) error {
e.mu.RLock()
defer e.mu.RUnlock()
// first try to write to the cache
if err := e.Cache.WriteMulti(values); err != nil {
return err
}
// Then make the write durable in the WAL.
if e.WAL != nil {
if _, err := e.WAL.WriteMulti(values); err != nil {
return err
}
}
return nil
}
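
The removed block is the classic cache-then-WAL write path: the cache gets first refusal (it can reject a write that exceeds its size limit), and only then is the write made durable in the log. A self-contained sketch of that ordering with hypothetical types, not the influxdb API:

package main

import (
	"errors"
	"fmt"
)

type cache struct {
	max    int
	values []string
}

func (c *cache) writeMulti(vals []string) error {
	if len(c.values)+len(vals) > c.max {
		return errors.New("cache: over max size")
	}
	c.values = append(c.values, vals...)
	return nil
}

type wal struct{ entries []string }

func (w *wal) writeMulti(vals []string) error {
	w.entries = append(w.entries, vals...)
	return nil
}

// writePoints mirrors the removed ordering: cache first, then the WAL
// makes the write durable.
func writePoints(c *cache, w *wal, vals []string) error {
	if err := c.writeMulti(vals); err != nil {
		return err
	}
	if w != nil {
		if err := w.writeMulti(vals); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	c, w := &cache{max: 10}, &wal{}
	if err := writePoints(c, w, []string{"cpu,host=A value=1.1 1000000000"}); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(len(c.values), len(w.entries)) // 1 1
}
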
@@ -875,13 +819,6 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
e.Cache.DeleteRange(deleteKeys, min, max)
// delete from the WAL
if e.WAL != nil {
if _, err := e.WAL.DeleteRange(deleteKeys, min, max); err != nil {
return err
}
}
// The series are deleted on disk, but the index may still say they exist.
// Depending on the min,max time passed in, the series may or may not actually
// exist now. To reconcile the index, we walk the series keys that still exist
@@ -1201,24 +1138,9 @@ func (e *Engine) WriteSnapshot() error {
logEnd()
}()
closedFiles, snapshot, err := func() (segments []string, snapshot *Cache, err error) {
e.mu.Lock()
defer e.mu.Unlock()
if e.WAL != nil {
if err = e.WAL.CloseSegment(); err != nil {
return nil, nil, err
}
segments, err = e.WAL.ClosedSegments()
if err != nil {
return nil, nil, err
}
}
snapshot, err = e.Cache.Snapshot()
return segments, snapshot, err
}()
e.mu.Lock()
snapshot, err := e.Cache.Snapshot()
e.mu.Unlock()
if err != nil {
return err
}
@@ -1237,11 +1159,11 @@ func (e *Engine) WriteSnapshot() error {
zap.String("path", e.path),
zap.Duration("duration", time.Since(dedup)))
return e.writeSnapshotAndCommit(log, closedFiles, snapshot)
return e.writeSnapshotAndCommit(log, snapshot)
}
// writeSnapshotAndCommit will write the passed cache to a new TSM file and remove the closed WAL segments.
func (e *Engine) writeSnapshotAndCommit(log *zap.Logger, closedFiles []string, snapshot *Cache) (err error) {
func (e *Engine) writeSnapshotAndCommit(log *zap.Logger, snapshot *Cache) (err error) {
defer func() {
if err != nil {
e.Cache.ClearSnapshot(false)
@@ -1267,12 +1189,6 @@ func (e *Engine) writeSnapshotAndCommit(log *zap.Logger, closedFiles []string, s
// clear the snapshot from the in-memory cache, then the old WAL files
e.Cache.ClearSnapshot(true)
if e.WAL != nil {
if err := e.WAL.Remove(closedFiles); err != nil {
log.Info("Error removing closed WAL segments", zap.Error(err))
}
}
return nil
}
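
For context on the removed cleanup: the old flow only dropped closed WAL segments after the snapshot had been written out as a TSM file and cleared from the cache, so a crash in between never loses data. A sketch of that ordering with hypothetical stand-ins for the Cache and WAL:

package main

import "fmt"

type snapshotCache struct{ pending []string }

func (c *snapshotCache) Snapshot() []string { return c.pending }

func (c *snapshotCache) ClearSnapshot(success bool) {
	if success {
		c.pending = nil
	}
}

func main() {
	cache := &snapshotCache{pending: []string{"cpu,host=A"}}
	closedSegments := []string{"_00001.wal"}

	snapshot := cache.Snapshot()
	fmt.Println("writing", len(snapshot), "keys to a new TSM file")

	// Only once the TSM file is durable is it safe to drop the snapshot and
	// the WAL segments that backed it.
	cache.ClearSnapshot(true)
	fmt.Println("removing closed WAL segments:", closedSegments)
}
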
@@ -1578,36 +1494,6 @@ func (e *Engine) fullCompactionStrategy(group CompactionGroup, optimize bool) *c
return s
}
// reloadCache reads the WAL segment files and loads them into the cache.
func (e *Engine) reloadCache() error {
if e.WAL == nil {
return nil
}
now := time.Now()
files, err := wal.SegmentFileNames(e.WAL.Path())
if err != nil {
return err
}
limit := e.Cache.MaxSize()
defer func() {
e.Cache.SetMaxSize(limit)
}()
// Disable the max size during loading
e.Cache.SetMaxSize(0)
loader := NewCacheLoader(files)
loader.WithLogger(e.logger)
if err := loader.Load(e.Cache); err != nil {
return err
}
e.traceLogger.Info("Reloaded WAL cache", zap.String("path", e.WAL.Path()), zap.Duration("duration", time.Since(now)))
return nil
}
// cleanup removes all temp files and dirs that exist on disk. This should only be run at startup to avoid
// removing tmp files that are still in use.
func (e *Engine) cleanup() error {
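
The removed reloadCache shows a small pattern worth noting: temporarily lift the cache's size limit while replaying WAL segments, and restore it with a defer, so startup replay is never rejected. A sketch of that pattern with a hypothetical limited buffer, not the tsm1 Cache:

package main

import "fmt"

type buffer struct{ max int }

func (b *buffer) MaxSize() int     { return b.max }
func (b *buffer) SetMaxSize(n int) { b.max = n }

// load mirrors the removed reloadCache: disable the limit, replay, restore.
func load(b *buffer) {
	limit := b.MaxSize()
	defer b.SetMaxSize(limit) // restore the limit once loading is done

	b.SetMaxSize(0) // treat 0 as "unlimited" while replaying segments
	// ... replay WAL segment files into the buffer here ...
}

func main() {
	b := &buffer{max: 1 << 30}
	load(b)
	fmt.Println(b.MaxSize() == 1<<30) // true: limit restored
}
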

@@ -101,11 +101,6 @@ func (e *Engine) DeleteBucket(name []byte, min, max int64) error {
// Delete from the cache and WAL.
e.Cache.DeleteRange(deleteKeys, min, max)
if e.WAL != nil {
if _, err := e.WAL.DeleteRange(deleteKeys, min, max); err != nil {
return err
}
}
// Now that all of the data is purged, we need to find if some keys are fully deleted
// and if so, remove them from the index.

@@ -630,65 +630,6 @@ func TestEngine_DeleteSeriesRange_OutsideTime(t *testing.T) {
}
}
func TestEngine_LastModified(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode")
}
// Create a few points.
p1 := MustParsePointString("cpu,host=A value=1.1 1000000000")
p2 := MustParsePointString("cpu,host=B value=1.2 2000000000")
p3 := MustParsePointString("cpu,host=A sum=1.3 3000000000")
e, err := NewEngine()
if err != nil {
t.Fatal(err)
}
// mock the planner so compactions don't run during the test
e.CompactionPlan = &mockPlanner{}
e.SetEnabled(false)
if err := e.Open(); err != nil {
t.Fatal(err)
}
defer e.Close()
if err := e.writePoints(p1, p2, p3); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
lm := e.LastModified()
if lm.IsZero() {
t.Fatalf("expected non-zero time, got %v", lm.UTC())
}
e.SetEnabled(true)
// Artificial sleep added due to filesystems caching the mod time
// of files. This prevents the WAL last modified time from being
// returned and newer than the filestore's mod time.
time.Sleep(2 * time.Second) // Covers most filesystems.
if err := e.WriteSnapshot(); err != nil {
t.Fatalf("failed to snapshot: %s", err.Error())
}
lm2 := e.LastModified()
if got, exp := lm.Equal(lm2), false; exp != got {
t.Fatalf("expected time change, got %v, exp %v: %s == %s", got, exp, lm.String(), lm2.String())
}
itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A")}}
if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64); err != nil {
t.Fatalf("failed to delete series: %v", err)
}
lm3 := e.LastModified()
if got, exp := lm2.Equal(lm3), false; exp != got {
t.Fatalf("expected time change, got %v, exp %v", got, exp)
}
}
func TestEngine_SnapshotsDisabled(t *testing.T) {
sfile := MustOpenSeriesFile()
defer sfile.Close()