Add memory settings and WAL backpressure

pull/4317/merge
Paul Dix 2015-09-17 15:46:53 -04:00
parent 95f9e1619e
commit 750856836e
3 changed files with 76 additions and 41 deletions


@@ -43,6 +43,10 @@ const (
// This number multiplied by the partition count is roughly the max possible memory
// size for the in-memory WAL cache.
DefaultPartitionSizeThreshold = 50 * 1024 * 1024 // 50MB
// Default WAL settings for the PD1 WAL
DefaultFlushMemorySizeThreshold = 50 * 1024 * 1024 // 50MB
DefaultMaxMemorySizeThreshold = 200 * 1024 * 1024 // 200MB
)
type Config struct {
@@ -63,6 +67,10 @@ type Config struct {
WALFlushColdInterval toml.Duration `toml:"wal-flush-cold-interval"`
WALPartitionSizeThreshold uint64 `toml:"wal-partition-size-threshold"`
// WAL configuration options for pd1 introduced in 0.9.5
WALFlushMemorySizeThreshold int `toml:"wal-flush-memory-size-threshold"`
WALMaxMemorySizeThreshold int `toml:"wal-max-memory-size-threshold"`
// Query logging
QueryLogEnabled bool `toml:"query-log-enabled"`
}
@@ -80,6 +88,8 @@ func NewConfig() Config {
WALMaxSeriesSize: DefaultMaxSeriesSize,
WALFlushColdInterval: toml.Duration(DefaultFlushColdInterval),
WALPartitionSizeThreshold: DefaultPartitionSizeThreshold,
WALFlushMemorySizeThreshold: DefaultFlushMemorySizeThreshold,
WALMaxMemorySizeThreshold: DefaultMaxMemorySizeThreshold,
QueryLogEnabled: true,
}
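The first file threads the two new thresholds through tsdb.Config. A minimal sketch of tuning them from Go, assuming the 0.9.x import path (github.com/influxdb/influxdb/tsdb); only the field names and defaults come from the diff above, everything else is illustrative:

package main

import (
	"fmt"

	"github.com/influxdb/influxdb/tsdb"
)

func main() {
	cfg := tsdb.NewConfig()

	// Defaults from this commit: flushing starts at 50MB of cached WAL
	// data, and writes are rejected once the cache passes 200MB.
	fmt.Println(cfg.WALFlushMemorySizeThreshold) // 52428800
	fmt.Println(cfg.WALMaxMemorySizeThreshold)   // 209715200

	// Lowering both trades write throughput for a smaller heap: flushes
	// kick in earlier and backpressure engages sooner.
	cfg.WALFlushMemorySizeThreshold = 25 * 1024 * 1024
	cfg.WALMaxMemorySizeThreshold = 100 * 1024 * 1024
}

The same knobs are reachable from the config file through the wal-flush-memory-size-threshold and wal-max-memory-size-threshold keys shown in the struct tags above.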


@@ -89,7 +89,8 @@ type Engine struct {
func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine {
w := NewLog(path)
w.FlushColdInterval = time.Duration(opt.Config.WALFlushColdInterval)
w.MemorySizeThreshold = int(opt.Config.WALPartitionSizeThreshold)
w.FlushMemorySizeThreshold = opt.Config.WALFlushMemorySizeThreshold
w.MaxMemorySizeThreshold = opt.Config.WALMaxMemorySizeThreshold
w.LoggingEnabled = opt.Config.WALLoggingEnabled
e := &Engine{


@@ -72,11 +72,11 @@ type Log struct {
currentSegmentID int
currentSegmentFile *os.File
currentSegmentSize int
// cache and flush variables
cacheLock sync.RWMutex
lastWriteTime time.Time
flushRunning bool
// cache variables
cacheLock sync.RWMutex
cache map[string]Values
cacheDirtySort map[string]bool // this map should be small, only for dirty vals
flushCache map[string]Values // temporary map while flushing
@@ -99,8 +99,11 @@ type Log struct {
// SegmentSize is the file size at which a segment file will be rotated
SegmentSize int
// MemorySizeThreshold specifies when the log should be forced to be flushed.
MemorySizeThreshold int
// FlushMemorySizeThreshold specifies when the log should be forced to be flushed
FlushMemorySizeThreshold int
// MaxMemorySizeThreshold specifies the limit at which writes to the WAL should be rejected
MaxMemorySizeThreshold int
// Index is the database the series will be flushed to
Index IndexWriter
@@ -130,7 +133,8 @@ func NewLog(path string) *Log {
LogOutput: os.Stderr,
FlushColdInterval: tsdb.DefaultFlushColdInterval,
SegmentSize: DefaultSegmentSize,
MemorySizeThreshold: tsdb.DefaultPartitionSizeThreshold,
FlushMemorySizeThreshold: tsdb.DefaultFlushMemorySizeThreshold,
MaxMemorySizeThreshold: tsdb.DefaultMaxMemorySizeThreshold,
flushCheckInterval: defaultFlushCheckInterval,
logger: log.New(os.Stderr, "[pd1wal] ", log.LstdFlags),
}
@@ -140,7 +144,7 @@ func NewLog(path string) *Log {
func (l *Log) Open() error {
if l.LoggingEnabled {
l.logger.Printf("PD1 WAL starting with %d memory size threshold\n", l.MemorySizeThreshold)
l.logger.Printf("PD1 WAL starting with %d flush memory size threshold and %d max memory size threshold\n", l.FlushMemorySizeThreshold, l.MaxMemorySizeThreshold)
l.logger.Printf("PD1 WAL writing to %s\n", l.path)
}
if err := os.MkdirAll(l.path, 0777); err != nil {
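With the stock defaults, that new startup line works out to "PD1 WAL starting with 52428800 flush memory size threshold and 209715200 max memory size threshold", i.e. the 50MB and 200MB values defined in the first file.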
@@ -199,6 +203,11 @@ func (l *Log) Cursor(key string, direction tsdb.Direction) tsdb.Cursor {
}
func (l *Log) WritePoints(points []tsdb.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error {
// add everything to the cache, or return an error if we've hit our max memory
if addedToCache := l.addToCache(points, fields, series, true); !addedToCache {
return fmt.Errorf("WAL backed up flushing to index, hit max memory")
}
// make the write durable if specified
if !l.SkipDurability {
// write the points
@@ -238,9 +247,6 @@ func (l *Log) WritePoints(points []tsdb.Point, fields map[string]*tsdb.Measureme
}
}
// add everything to the cache
l.addToCache(points, fields, series)
// usually skipping the cache is only for testing purposes and this was the easiest
// way to represent the logic (to cache and then immediately flush)
if l.SkipCache {
@@ -250,10 +256,23 @@ func (l *Log) WritePoints(points []tsdb.Point, fields map[string]*tsdb.Measureme
return nil
}
func (l *Log) addToCache(points []tsdb.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) {
// addToCache will add the points, measurements, and fields to the cache and return true if successful. They will be queryable
// immediately after return and will be flushed at the next flush cycle. Before adding to the cache we check if we're over the
// max memory threshold. If we are we request a flush in a new goroutine and return false, indicating we didn't add the values
// to the cache and that writes should return a failure.
func (l *Log) addToCache(points []tsdb.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate, checkMemory bool) bool {
l.cacheLock.Lock()
defer l.cacheLock.Unlock()
// if we should check memory and we're over the threshold, mark a flush as running and kick one off in a goroutine
if checkMemory && l.memorySize > l.MaxMemorySizeThreshold {
if !l.flushRunning {
l.flushRunning = true
go l.flush(memoryFlush)
}
return false
}
for _, p := range points {
for name, value := range p.Fields() {
k := seriesFieldKey(string(p.Key()), name)
@@ -277,6 +296,8 @@ func (l *Log) addToCache(points []tsdb.Point, fields map[string]*tsdb.Measuremen
}
l.seriesToCreateCache = append(l.seriesToCreateCache, series...)
l.lastWriteTime = time.Now()
return true
}
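addToCache's new doc comment is the heart of the change: the memory check, the single-flush trigger, and the failed-write signal all happen under cacheLock, and accepted values are queryable the moment the call returns. A compressed, self-contained sketch of that pattern follows; wal, add, write, and errBackedUp are toy names standing in for the pd1 identifiers, not the actual code:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// wal is a toy stand-in for pd1's Log, reduced to the fields the
// backpressure path touches.
type wal struct {
	mu           sync.Mutex // plays the role of cacheLock
	memorySize   int
	maxSize      int
	flushRunning bool
}

var errBackedUp = errors.New("WAL backed up flushing to index, hit max memory")

// write mirrors WritePoints: if the cache refuses the data, fail the
// write before touching the segment file.
func (w *wal) write(sz int) error {
	if !w.add(sz) {
		return errBackedUp
	}
	// ... append to the current segment file for durability ...
	return nil
}

// add mirrors addToCache with checkMemory=true.
func (w *wal) add(sz int) bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.memorySize > w.maxSize {
		// Spawn at most one background flush; writers that land here
		// while it runs fail fast instead of queueing up.
		if !w.flushRunning {
			w.flushRunning = true
			go w.flush()
		}
		return false
	}
	w.memorySize += sz
	return true
}

func (w *wal) flush() {
	// ... hand the cached values to the index writer ...
	w.mu.Lock()
	w.memorySize = 0
	w.flushRunning = false
	w.mu.Unlock()
}

func main() {
	w := &wal{maxSize: 200 * 1024 * 1024}
	if err := w.write(1 << 12); err != nil {
		fmt.Println(err) // surfaced to the client as a failed write
	}
}

Note that replay (the checkMemory=false call sites below) deliberately bypasses this check: recovery must never reject its own data, so memory during startup is bounded by flushing between segment files instead.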
// readAndFlushWAL is called on open and will read the segment files in, flushing whenever
@@ -294,7 +315,7 @@ func (l *Log) readAndFlushWAL() error {
return err
}
if l.memorySize > l.MemorySizeThreshold {
if l.memorySize > l.MaxMemorySizeThreshold {
if err := l.flush(memoryFlush); err != nil {
return err
}
@@ -354,19 +375,19 @@ func (l *Log) readFileToCache(fileName string) error {
if err != nil {
return err
}
l.addToCache(points, nil, nil)
l.addToCache(points, nil, nil, false)
case fieldsEntry:
fields := make(map[string]*tsdb.MeasurementFields)
if err := json.Unmarshal(data, &fields); err != nil {
return err
}
l.addToCache(nil, fields, nil)
l.addToCache(nil, fields, nil, false)
case seriesEntry:
series := make([]*tsdb.SeriesCreate, 0)
if err := json.Unmarshal(data, &series); err != nil {
return err
}
l.addToCache(nil, nil, series)
l.addToCache(nil, nil, series, false)
}
}
}
@@ -460,20 +481,24 @@ func (l *Log) close() error {
// flush writes all wal data in memory to the index
func (l *Log) flush(flush flushType) error {
// only flush if there isn't one already running
l.writeLock.Lock()
if l.flushRunning {
l.writeLock.Unlock()
// only flush if there isn't one already running. Memory flushes are only triggered
// by writes, which will mark the flush as running, so we can ignore it.
l.cacheLock.Lock()
if l.flushRunning && flush != memoryFlush {
l.cacheLock.Unlock()
return nil
}
// only hold the lock while we rotate the segment file
// mark the flush as running and ensure that it gets marked as not running when we return
l.flushRunning = true
defer func() {
l.writeLock.Lock()
l.cacheLock.Lock()
l.flushRunning = false
l.writeLock.Unlock()
l.cacheLock.Unlock()
}()
// only hold the lock while we rotate the segment file
l.writeLock.Lock()
lastFileID := l.currentSegmentID
if err := l.newSegmentFile(); err != nil {
// there's no recovering from this, fail hard
@@ -482,9 +507,6 @@ func (l *Log) flush(flush flushType) error {
l.writeLock.Unlock()
// copy the cache items to new maps so we can empty them out
l.cacheLock.Lock()
// move over the flush cache and make a copy to write
l.flushCache = l.cache
l.cache = make(map[string]Values)
l.cacheDirtySort = make(map[string]bool)
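The reworked gate at the top of flush is the subtle part: a memory flush arrives with flushRunning already set by the writer that spawned it, so it is exempt from the duplicate check that turns away idle and cold flushes. A standalone sketch of just that gate; gatedFlusher is a made-up name for the Log's flush bookkeeping, and the elided body is summarized in comments:

package main

import "sync"

type flushType int

const (
	noFlush flushType = iota
	idleFlush   // cache went cold
	memoryFlush // a writer saw the cache over the max threshold
)

// gatedFlusher is a toy stand-in for the Log's flush state.
type gatedFlusher struct {
	cacheLock    sync.Mutex
	flushRunning bool
}

func (g *gatedFlusher) flush(kind flushType) error {
	// Only one flush runs at a time. memoryFlush is exempt from the
	// check because the writer that spawned it set flushRunning first,
	// so the flag being true is expected, not a sign of a duplicate.
	g.cacheLock.Lock()
	if g.flushRunning && kind != memoryFlush {
		g.cacheLock.Unlock()
		return nil
	}
	g.flushRunning = true
	g.cacheLock.Unlock()

	// Clear the flag on every exit path.
	defer func() {
		g.cacheLock.Lock()
		g.flushRunning = false
		g.cacheLock.Unlock()
	}()

	// ... rotate the segment file under the write lock, snapshot and
	// clear the cache, then hand the snapshot to the index writer ...
	return nil
}

func main() {
	g := &gatedFlusher{flushRunning: true} // as a writer would set it
	_ = g.flush(memoryFlush)               // allowed through the gate
	_ = g.flush(idleFlush)                 // runs now; skipped if one were still active
}

The gate also moved from writeLock to cacheLock in this commit: flushRunning is now set by addToCache under cacheLock, so flush has to read it under the same lock to avoid racing with writers.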
@@ -561,6 +583,7 @@
// triggerAutoFlush will flush and compact any partitions that have hit the thresholds for compaction
func (l *Log) triggerAutoFlush() {
//
if f := l.shouldFlush(); f != noFlush {
if err := l.flush(f); err != nil {
l.logger.Printf("error flushing wal: %s\n", err)
@@ -615,17 +638,18 @@ func (l *Log) newSegmentFile() error {
return nil
}
// shouldFlush
// shouldFlush will return the flushType specifying whether we should flush. memoryFlush
// is never returned from this function since those can only be triggered by writes
func (l *Log) shouldFlush() flushType {
l.cacheLock.RLock()
defer l.cacheLock.RUnlock()
if len(l.cache) == 0 {
if l.flushRunning {
return noFlush
}
if l.memorySize > l.MemorySizeThreshold {
return memoryFlush
if len(l.cache) == 0 {
return noFlush
}
if time.Since(l.lastWriteTime) > l.FlushColdInterval {