Enable the WAL

pull/10616/head
Edd Robinson 2018-10-04 15:09:59 +01:00
parent 05b68ce154
commit e85999ed45
4 changed files with 137 additions and 7 deletions

View File

@ -5,6 +5,7 @@ import (
"fmt"
"time"
"github.com/influxdata/influxdb/monitor/diagnostics"
"github.com/influxdata/influxdb/toml"
)
@ -62,6 +63,14 @@ type Config struct {
Engine string `toml:"-"`
Index string `toml:"index-version"`
// General WAL configuration options
WALDir string `toml:"wal-dir"`
// WALFsyncDelay is the amount of time that a write will wait before fsyncing. A duration
// greater than 0 can be used to batch up multiple fsync calls. This is useful for slower
// disks or when WAL write contention is seen. A value of 0 fsyncs every write to the WAL.
WALFsyncDelay toml.Duration `toml:"wal-fsync-delay"`
// Enables unicode validation on series keys on write.
ValidateKeys bool `toml:"validate-keys"`
@ -148,3 +157,17 @@ func (c *Config) Validate() error {
return nil
}
// Diagnostics returns a diagnostics representation of a subset of the Config.
func (c Config) Diagnostics() (*diagnostics.Diagnostics, error) {
	// Expose only the operationally interesting knobs; the full Config is
	// intentionally not dumped here.
	fields := map[string]interface{}{
		"dir":                                c.Dir,
		"wal-dir":                            c.WALDir,
		"wal-fsync-delay":                    c.WALFsyncDelay,
		"cache-max-memory-size":              c.CacheMaxMemorySize,
		"cache-snapshot-memory-size":         c.CacheSnapshotMemorySize,
		"cache-snapshot-write-cold-duration": c.CacheSnapshotWriteColdDuration,
		"compact-full-write-cold-duration":   c.CompactFullWriteColdDuration,
		"max-concurrent-compactions":         c.MaxConcurrentCompactions,
	}
	return diagnostics.RowFromMap(fields), nil
}

View File

@ -161,6 +161,7 @@ type EngineOptions struct {
CompactionPlannerCreator CompactionPlannerCreator
CompactionLimiter limiter.Fixed
CompactionThroughputLimiter limiter.Rate
WALEnabled bool
MonitorDisabled bool
// DatabaseFilter is a predicate controlling which databases may be opened.
@ -192,6 +193,7 @@ func NewEngineOptions() EngineOptions {
Config: NewConfig(),
OpenLimiter: limiter.NewFixed(runtime.GOMAXPROCS(0)),
CompactionLimiter: limiter.NewFixed(1),
WALEnabled: false,
}
}

View File

@ -155,6 +155,7 @@ type Engine struct {
fieldset *tsdb.MeasurementFieldSet
WAL Log
Cache *Cache
Compactor *Compactor
CompactionPlan CompactionPlanner
@ -224,6 +225,7 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts
traceLogger: logger,
traceLogging: opt.Config.TraceLoggingEnabled,
WAL: NopWAL{},
Cache: cache,
FileStore: fs,
@ -240,6 +242,12 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts
seriesIDSets: opt.SeriesIDSets,
}
if opt.WALEnabled {
wal := NewWAL(walPath)
wal.syncDelay = time.Duration(opt.Config.WALFsyncDelay)
e.WAL = wal
}
// Feature flag to enable per-series type checking, by default this is off and
// e.seriesTypeMap will be nil.
if os.Getenv("INFLUXDB_SERIES_TYPE_CHECK_ENABLED") != "" {
@ -248,8 +256,10 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts
if e.traceLogging {
fs.enableTraceLogging(true)
if wal, ok := e.WAL.(*WAL); ok {
wal.enableTraceLogging(true)
}
}
return e
}
@ -598,6 +608,9 @@ func (e *Engine) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
func (e *Engine) LastModified() time.Time {
fsTime := e.FileStore.LastModified()
if e.WAL.LastWriteTime().After(fsTime) {
return e.WAL.LastWriteTime()
}
return fsTime
}
@ -678,7 +691,7 @@ func (e *Engine) Statistics(tags map[string]string) []models.Statistic {
// DiskSize returns the total size in bytes of all TSM and WAL segments on disk.
// When the WAL is disabled (NopWAL), its DiskSizeBytes contribution is zero.
func (e *Engine) DiskSize() int64 {
	// NOTE(review): the previous body declared walDiskSizeBytes with `var`
	// and then re-declared it with `:=`, which does not compile; fold the
	// two statements into a single expression.
	return e.FileStore.DiskSizeBytes() + e.WAL.DiskSizeBytes()
}
@ -703,10 +716,18 @@ func (e *Engine) Open() error {
e.index.SetFieldSet(fields)
if err := e.WAL.Open(); err != nil {
return err
}
if err := e.FileStore.Open(); err != nil {
return err
}
if err := e.reloadCache(); err != nil {
return err
}
e.Compactor.Open()
if e.enableCompactionsOnOpen {
@ -728,7 +749,7 @@ func (e *Engine) Close() error {
if err := e.FileStore.Close(); err != nil {
return err
}
return nil
return e.WAL.Close()
}
// WithLogger sets the logger for the engine.
@ -738,6 +759,11 @@ func (e *Engine) WithLogger(log *zap.Logger) {
if e.traceLogging {
e.traceLogger = e.logger
}
if wal, ok := e.WAL.(*WAL); ok {
wal.WithLogger(e.logger)
}
e.FileStore.WithLogger(e.logger)
}
@ -1314,6 +1340,11 @@ func (e *Engine) WritePoints(points []models.Point) error {
return err
}
// Then make the write durable in the cache.
if _, err := e.WAL.WriteMulti(values); err != nil {
return err
}
return seriesErr
}
@ -1522,6 +1553,9 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
e.Cache.DeleteRange(deleteKeys, min, max)
// delete from the WAL
if _, err := e.WAL.DeleteRange(deleteKeys, min, max); err != nil {
return err
}
// The series are deleted on disk, but the index may still say they exist.
// Depending on the the min,max time passed in, the series may or not actually
@ -1737,12 +1771,18 @@ func (e *Engine) WriteSnapshot() error {
closedFiles, snapshot, err := func() (segments []string, snapshot *Cache, err error) {
e.mu.Lock()
defer e.mu.Unlock()
snapshot, err = e.Cache.Snapshot()
if err != nil {
return
if err = e.WAL.CloseSegment(); err != nil {
return nil, nil, err
}
return
segments, err = e.WAL.ClosedSegments()
if err != nil {
return nil, nil, err
}
snapshot, err = e.Cache.Snapshot()
return segments, snapshot, err
}()
if err != nil {
@ -1810,6 +1850,11 @@ func (e *Engine) writeSnapshotAndCommit(log *zap.Logger, closedFiles []string, s
// clear the snapshot from the in-memory cache, then the old WAL files
e.Cache.ClearSnapshot(true)
if err := e.WAL.Remove(closedFiles); err != nil {
log.Info("Error removing closed WAL segments", zap.Error(err))
}
return nil
}
@ -2136,6 +2181,32 @@ func (e *Engine) fullCompactionStrategy(group CompactionGroup, optimize bool) *c
return s
}
// reloadCache reads the WAL segment files and loads them into the cache.
// It is called during Open to make writes that were only persisted to the
// WAL visible again after a restart.
func (e *Engine) reloadCache() error {
	now := time.Now()

	files, err := segmentFileNames(e.WAL.Path())
	if err != nil {
		return err
	}

	// Disable the cache max-size limit while replaying so a backlog of WAL
	// segments larger than the configured limit can still be loaded; the
	// deferred call restores the limit once loading finishes (or fails).
	limit := e.Cache.MaxSize()
	defer e.Cache.SetMaxSize(limit)
	e.Cache.SetMaxSize(0)

	loader := NewCacheLoader(files)
	loader.WithLogger(e.logger)
	if err := loader.Load(e.Cache); err != nil {
		return err
	}

	e.traceLogger.Info("Reloaded WAL cache",
		zap.String("path", e.WAL.Path()),
		zap.Duration("duration", time.Since(now)))
	return nil
}
// cleanup removes all temp files and dirs that exist on disk. This is should only be run at startup to avoid
// removing tmp files that are still in use.
func (e *Engine) cleanup() error {

View File

@ -24,6 +24,23 @@ import (
"go.uber.org/zap"
)
// Log describes an interface for a durable disk-based log.
//
// The TSM engine writes points through this interface; the real *WAL
// implementation persists them as segment files, while NopWAL discards
// everything when the WAL is disabled (see EngineOptions.WALEnabled).
type Log interface {
	// Open prepares the log for use; Close releases its resources.
	Open() error
	Close() error
	// Path returns the directory the log's segment files live in.
	Path() string
	// LastWriteTime reports when the log was last written to.
	LastWriteTime() time.Time
	// DiskSizeBytes reports the total on-disk size of the log's segments.
	DiskSizeBytes() int64
	// WriteMulti appends a batch of values, keyed by series key, to the log.
	WriteMulti(values map[string][]Value) (int, error)
	// DeleteRange records a delete of the given keys within [min, max].
	DeleteRange(keys [][]byte, min, max int64) (int, error)
	// CloseSegment closes the current segment so ClosedSegments can report it;
	// closed segments are removed via Remove after a snapshot is durable.
	CloseSegment() error
	ClosedSegments() ([]string, error)
	Remove(files []string) error
}
const (
// DefaultSegmentSize of 10MB is the size at which segment files will be rolled over.
DefaultSegmentSize = 10 * 1024 * 1024
@ -1236,3 +1253,20 @@ func idFromFileName(name string) (int, error) {
return int(id), err
}
// NopWAL implements the Log interface and provides a no-op WAL implementation.
// Every operation succeeds immediately without touching disk; it is used in
// place of the real WAL when the WAL is disabled.
type NopWAL struct{}

// Open is a no-op and always succeeds.
func (NopWAL) Open() error {
	return nil
}

// Close is a no-op and always succeeds.
func (NopWAL) Close() error {
	return nil
}

// Path returns the empty string; there is no backing directory.
func (NopWAL) Path() string {
	return ""
}

// LastWriteTime returns the zero time, since nothing is ever written.
func (NopWAL) LastWriteTime() time.Time {
	return time.Time{}
}

// DiskSizeBytes reports zero; no segments exist on disk.
func (NopWAL) DiskSizeBytes() int64 {
	return 0
}

// WriteMulti discards the values and reports zero bytes written.
func (NopWAL) WriteMulti(values map[string][]Value) (int, error) {
	return 0, nil
}

// DeleteRange discards the delete and reports zero bytes written.
func (NopWAL) DeleteRange(keys [][]byte, min, max int64) (int, error) {
	return 0, nil
}

// CloseSegment is a no-op and always succeeds.
func (NopWAL) CloseSegment() error {
	return nil
}

// ClosedSegments reports that no segments exist.
func (NopWAL) ClosedSegments() ([]string, error) {
	return nil, nil
}

// Remove is a no-op and always succeeds.
func (NopWAL) Remove(files []string) error {
	return nil
}