refactor: use concrete WAL in tsm1
At the cost of some nil checks, we don't have to have an interface, defend against subtle bugs with nils in non-nil interfaces, an empty implementation, etc. Also, the tsm1 engine is losing the WAL anyway.pull/11364/head
parent
c9bb55b889
commit
95de3d52b2
|
@ -120,17 +120,15 @@ func NewEngine(path string, c Config, options ...Option) *Engine {
|
|||
tsi1.WithPath(c.GetIndexPath(path)))
|
||||
|
||||
// Initialize WAL
|
||||
var w tsm1.Log = new(tsm1.NopWAL)
|
||||
if c.WAL.Enabled {
|
||||
e.wal = wal.NewWAL(c.GetWALPath(path))
|
||||
e.wal.WithFsyncDelay(time.Duration(c.WAL.FsyncDelay))
|
||||
e.wal.EnableTraceLogging(c.TraceLoggingEnabled)
|
||||
w = e.wal
|
||||
}
|
||||
|
||||
// Initialise Engine
|
||||
e.engine = tsm1.NewEngine(c.GetEnginePath(path), e.index, c.Engine,
|
||||
tsm1.WithWAL(w),
|
||||
tsm1.WithWAL(e.wal),
|
||||
tsm1.WithTraceLogging(c.TraceLoggingEnabled))
|
||||
|
||||
// Apply options.
|
||||
|
|
|
@ -80,13 +80,7 @@ const (
|
|||
type EngineOption func(i *Engine)
|
||||
|
||||
// WithWAL sets the WAL for the Engine
|
||||
var WithWAL = func(w Log) EngineOption {
|
||||
// be defensive: it's very easy to pass in a nil WAL here
|
||||
// which will panic. Set any nil WALs to the NopWAL.
|
||||
if pwal, _ := w.(*wal.WAL); pwal == nil {
|
||||
w = NopWAL{}
|
||||
}
|
||||
|
||||
var WithWAL = func(w *wal.WAL) EngineOption {
|
||||
return func(e *Engine) {
|
||||
e.WAL = w
|
||||
}
|
||||
|
@ -134,7 +128,7 @@ type Engine struct {
|
|||
traceLogger *zap.Logger // Logger to be used when trace-logging is on.
|
||||
traceLogging bool
|
||||
|
||||
WAL Log
|
||||
WAL *wal.WAL
|
||||
Cache *Cache
|
||||
Compactor *Compactor
|
||||
CompactionPlan CompactionPlanner
|
||||
|
@ -209,7 +203,6 @@ func NewEngine(path string, idx *tsi1.Index, config Config, options ...EngineOpt
|
|||
logger: logger,
|
||||
traceLogger: logger,
|
||||
|
||||
WAL: NopWAL{},
|
||||
Cache: cache,
|
||||
|
||||
FileStore: fs,
|
||||
|
@ -483,7 +476,7 @@ func (e *Engine) SeriesN() int64 {
|
|||
func (e *Engine) LastModified() time.Time {
|
||||
fsTime := e.FileStore.LastModified()
|
||||
|
||||
if e.WAL.LastWriteTime().After(fsTime) {
|
||||
if e.WAL != nil && e.WAL.LastWriteTime().After(fsTime) {
|
||||
return e.WAL.LastWriteTime()
|
||||
}
|
||||
return fsTime
|
||||
|
@ -496,7 +489,11 @@ func (e *Engine) MeasurementStats() (MeasurementStats, error) {
|
|||
|
||||
// DiskSize returns the total size in bytes of all TSM and WAL segments on disk.
|
||||
func (e *Engine) DiskSize() int64 {
|
||||
walDiskSizeBytes := e.WAL.DiskSizeBytes()
|
||||
var walDiskSizeBytes int64
|
||||
if e.WAL != nil {
|
||||
walDiskSizeBytes = e.WAL.DiskSizeBytes()
|
||||
}
|
||||
|
||||
return e.FileStore.DiskSizeBytes() + walDiskSizeBytes
|
||||
}
|
||||
|
||||
|
@ -529,9 +526,11 @@ func (e *Engine) Open() error {
|
|||
return err
|
||||
}
|
||||
|
||||
if e.WAL != nil {
|
||||
if err := e.WAL.Open(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := e.FileStore.Open(); err != nil {
|
||||
return err
|
||||
|
@ -562,7 +561,14 @@ func (e *Engine) Close() error {
|
|||
if err := e.FileStore.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
return e.WAL.Close()
|
||||
|
||||
if e.WAL != nil {
|
||||
if err := e.WAL.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// WithLogger sets the logger for the engine.
|
||||
|
@ -573,7 +579,9 @@ func (e *Engine) WithLogger(log *zap.Logger) {
|
|||
e.traceLogger = e.logger
|
||||
}
|
||||
|
||||
if e.WAL != nil {
|
||||
e.WAL.WithLogger(e.logger)
|
||||
}
|
||||
e.FileStore.WithLogger(e.logger)
|
||||
}
|
||||
|
||||
|
@ -652,9 +660,11 @@ func (e *Engine) WritePoints(points []models.Point) error {
|
|||
}
|
||||
|
||||
// Then make the write durable in the cache.
|
||||
if e.WAL != nil {
|
||||
if _, err := e.WAL.WriteMulti(values); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -866,9 +876,11 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
|
|||
e.Cache.DeleteRange(deleteKeys, min, max)
|
||||
|
||||
// delete from the WAL
|
||||
if e.WAL != nil {
|
||||
if _, err := e.WAL.DeleteRange(deleteKeys, min, max); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// The series are deleted on disk, but the index may still say they exist.
|
||||
// Depending on the the min,max time passed in, the series may or not actually
|
||||
|
@ -1193,14 +1205,15 @@ func (e *Engine) WriteSnapshot() error {
|
|||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
if e.WAL != nil {
|
||||
if err = e.WAL.CloseSegment(); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
segments, err = e.WAL.ClosedSegments()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
snapshot, err = e.Cache.Snapshot()
|
||||
return segments, snapshot, err
|
||||
|
@ -1254,9 +1267,11 @@ func (e *Engine) writeSnapshotAndCommit(log *zap.Logger, closedFiles []string, s
|
|||
// clear the snapshot from the in-memory cache, then the old WAL files
|
||||
e.Cache.ClearSnapshot(true)
|
||||
|
||||
if e.WAL != nil {
|
||||
if err := e.WAL.Remove(closedFiles); err != nil {
|
||||
log.Info("Error removing closed WAL segments", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -1565,6 +1580,10 @@ func (e *Engine) fullCompactionStrategy(group CompactionGroup, optimize bool) *c
|
|||
|
||||
// reloadCache reads the WAL segment files and loads them into the cache.
|
||||
func (e *Engine) reloadCache() error {
|
||||
if e.WAL == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
files, err := wal.SegmentFileNames(e.WAL.Path())
|
||||
if err != nil {
|
||||
|
|
|
@ -101,9 +101,11 @@ func (e *Engine) DeleteBucket(name []byte, min, max int64) error {
|
|||
|
||||
// Delete from the cache and WAL.
|
||||
e.Cache.DeleteRange(deleteKeys, min, max)
|
||||
if e.WAL != nil {
|
||||
if _, err := e.WAL.DeleteRange(deleteKeys, min, max); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Now that all of the data is purged, we need to find if some keys are fully deleted
|
||||
// and if so, remove them from the index.
|
||||
|
|
|
@ -1,47 +0,0 @@
|
|||
package tsm1
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// TODO(jeff): this only exists temporarily while we move the WAL into storage
|
||||
|
||||
// Log describes an interface for a durable disk-based log.
|
||||
type Log interface {
|
||||
WithLogger(*zap.Logger)
|
||||
|
||||
Open() error
|
||||
Close() error
|
||||
Path() string
|
||||
|
||||
LastWriteTime() time.Time
|
||||
DiskSizeBytes() int64
|
||||
|
||||
WriteMulti(values map[string][]Value) (int, error)
|
||||
DeleteRange(keys [][]byte, min, max int64) (int, error)
|
||||
|
||||
CloseSegment() error
|
||||
ClosedSegments() ([]string, error)
|
||||
Remove(files []string) error
|
||||
}
|
||||
|
||||
// NopWAL implements the Log interface and provides a no-op WAL implementation.
|
||||
type NopWAL struct{}
|
||||
|
||||
func (w NopWAL) WithLogger(*zap.Logger) {}
|
||||
|
||||
func (w NopWAL) Open() error { return nil }
|
||||
func (w NopWAL) Close() error { return nil }
|
||||
func (w NopWAL) Path() string { return "" }
|
||||
|
||||
func (w NopWAL) LastWriteTime() time.Time { return time.Time{} }
|
||||
func (w NopWAL) DiskSizeBytes() int64 { return 0 }
|
||||
|
||||
func (w NopWAL) WriteMulti(values map[string][]Value) (int, error) { return 0, nil }
|
||||
func (w NopWAL) DeleteRange(keys [][]byte, min, max int64) (int, error) { return 0, nil }
|
||||
|
||||
func (w NopWAL) CloseSegment() error { return nil }
|
||||
func (w NopWAL) ClosedSegments() ([]string, error) { return nil, nil }
|
||||
func (w NopWAL) Remove(files []string) error { return nil }
|
Loading…
Reference in New Issue