From d7c5dd0a3e2bdcaa669d2d7ece8bd76ef07b075a Mon Sep 17 00:00:00 2001
From: Jason Wilder
Date: Thu, 13 Apr 2017 23:28:35 -0600
Subject: [PATCH] Reduce wal sync goroutine churn

Under high write load, the sync goroutine would start up and end very
frequently. Starting a new goroutine so frequently adds a small amount
of latency, which causes writes to take longer and sometimes time out.

This changes the goroutine to loop until there are no more waiters,
which reduces the churn and latency.
---
 tsdb/engine/tsm1/wal.go | 80 +++++++++++++++++++++++++++--------------
 1 file changed, 53 insertions(+), 27 deletions(-)

diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go
index 34e3b0f91e..e81bea41d6 100644
--- a/tsdb/engine/tsm1/wal.go
+++ b/tsdb/engine/tsm1/wal.go
@@ -233,10 +233,10 @@ func (l *WAL) Open() error {
 	return nil
 }
 
-// sync will schedule an fsync to the current wal segment and notify any
+// scheduleSync will schedule an fsync to the current wal segment and notify any
 // waiting gorutines. If an fsync is already scheduled, subsequent calls will
 // not schedule a new fsync and will be handle by the existing scheduled fsync.
-func (l *WAL) sync() {
+func (l *WAL) scheduleSync() {
 	// If we're not the first to sync, then another goroutine is fsyncing the wal for us.
 	if !atomic.CompareAndSwapUint64(&l.syncCount, 0, 1) {
 		return
@@ -244,26 +244,50 @@ func (l *WAL) sync() {
 
 	// Fsync the wal and notify all pending waiters
 	go func() {
-		t := time.NewTimer(l.syncDelay)
-		select {
-		case <-t.C:
-			if len(l.syncWaiters) > 0 {
-				l.mu.Lock()
-				err := l.currentSegmentWriter.sync()
-				for len(l.syncWaiters) > 0 {
-					errC := <-l.syncWaiters
-					errC <- err
-				}
-				l.mu.Unlock()
-			}
-		case <-l.closing:
-			t.Stop()
-		}
+		defer atomic.StoreUint64(&l.syncCount, 0)
+		var timerCh <-chan time.Time
 
-		atomic.StoreUint64(&l.syncCount, 0)
+		// time.NewTicker requires a > 0 delay, since 0 indicates no delay, use a closed
+		// channel which will always be ready to read from.
+		if l.syncDelay == 0 {
+			// Create a RW chan and close it
+			timerChrw := make(chan time.Time)
+			close(timerChrw)
+			// Convert it to a read-only
+			timerCh = timerChrw
+		} else {
+			t := time.NewTicker(l.syncDelay)
+			defer t.Stop()
+			timerCh = t.C
+		}
+		for {
+			select {
+			case <-timerCh:
+				l.mu.Lock()
+				if len(l.syncWaiters) == 0 {
+					l.mu.Unlock()
+					return
+				}
+
+				l.sync()
+				l.mu.Unlock()
+			case <-l.closing:
+				return
+			}
+		}
 	}()
 }
 
+// sync fsyncs the current wal segments and notifies any waiters. Callers must ensure
+// a write lock on the WAL is obtained before calling sync.
+func (l *WAL) sync() {
+	err := l.currentSegmentWriter.sync()
+	for len(l.syncWaiters) > 0 {
+		errC := <-l.syncWaiters
+		errC <- err
+	}
+}
+
 // WritePoints writes the given points to the WAL. It returns the WAL segment ID to
 // which the points were written. If an error is returned the segment ID should
 // be ignored.
@@ -354,8 +378,8 @@ func (l *WAL) writeToLog(entry WALEntry) (int, error) {
 	// limit how many concurrent encodings can be in flight. Since we can only
 	// write one at a time to disk, a slow disk can cause the allocations below
 	// to increase quickly. If we're backed up, wait until others have completed.
-	l.limiter.Take()
-	defer l.limiter.Release()
+	//l.limiter.Take()
+	//defer l.limiter.Release()
 
 	// encode and compress the entry while we're not locked
 	bytes := getBuf(walEncodeBufSize)
@@ -369,7 +393,6 @@ func (l *WAL) writeToLog(entry WALEntry) (int, error) {
 	encBuf := getBuf(snappy.MaxEncodedLen(len(b)))
 	defer putBuf(encBuf)
 	compressed := snappy.Encode(encBuf, b)
-
 	syncErr := make(chan error)
 
 	segID, err := func() (int, error) {
@@ -393,16 +416,17 @@ func (l *WAL) writeToLog(entry WALEntry) (int, error) {
 			return -1, fmt.Errorf("error writing WAL entry: %v", err)
 		}
 
-		// Update stats for current segment size
-		atomic.StoreInt64(&l.stats.CurrentBytes, int64(l.currentSegmentWriter.size))
-
-		l.lastWriteTime = time.Now()
-
 		select {
 		case l.syncWaiters <- syncErr:
 		default:
 			return -1, fmt.Errorf("error syncing wal")
 		}
+		l.scheduleSync()
+
+		// Update stats for current segment size
+		atomic.StoreInt64(&l.stats.CurrentBytes, int64(l.currentSegmentWriter.size))
+
+		l.lastWriteTime = time.Now()
 
 		return l.currentSegmentID, nil
 
@@ -412,7 +436,6 @@ func (l *WAL) writeToLog(entry WALEntry) (int, error) {
 	}
 
 	// schedule an fsync and wait for it to complete
-	l.sync()
 	return segID, <-syncErr
 }
 
@@ -492,6 +515,7 @@ func (l *WAL) Close() error {
 	close(l.closing)
 
 	if l.currentSegmentWriter != nil {
+		l.sync()
 		l.currentSegmentWriter.close()
 		l.currentSegmentWriter = nil
 	}
@@ -514,6 +538,8 @@ func segmentFileNames(dir string) ([]string, error) {
 func (l *WAL) newSegmentFile() error {
 	l.currentSegmentID++
 	if l.currentSegmentWriter != nil {
+		l.sync()
+
 		if err := l.currentSegmentWriter.close(); err != nil {
 			return err
 		}
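
For context, the standalone sketch below distills the pattern this patch introduces: a CAS guard so that at most one background sync goroutine runs, a pre-closed channel standing in for the timer when the sync delay is zero (time.NewTicker panics on a non-positive interval), and a loop that keeps fsyncing and draining waiters until none remain. It is a minimal illustration only; demoWAL and its fields are assumed stand-ins and not the real tsm1 WAL implementation.

// demo_wal_sync.go: minimal sketch of the scheduling pattern from the patch.
// demoWAL and everything in it are illustrative, not the real tsm1 types.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type demoWAL struct {
	mu          sync.Mutex
	syncCount   uint64
	syncDelay   time.Duration
	syncWaiters chan chan error
	closing     chan struct{}
}

// scheduleSync starts at most one background goroutine (guarded by a CAS on
// syncCount) that keeps syncing and notifying waiters until none remain,
// instead of spawning a new goroutine for every write.
func (l *demoWAL) scheduleSync() {
	if !atomic.CompareAndSwapUint64(&l.syncCount, 0, 1) {
		return // another goroutine is already syncing on our behalf
	}

	go func() {
		defer atomic.StoreUint64(&l.syncCount, 0)

		var timerCh <-chan time.Time
		if l.syncDelay == 0 {
			// time.NewTicker panics on a non-positive interval, so a zero
			// delay uses a pre-closed channel that is always ready to read.
			closed := make(chan time.Time)
			close(closed)
			timerCh = closed
		} else {
			t := time.NewTicker(l.syncDelay)
			defer t.Stop()
			timerCh = t.C
		}

		for {
			select {
			case <-timerCh:
				l.mu.Lock()
				if len(l.syncWaiters) == 0 {
					l.mu.Unlock()
					return // no waiters left: let the goroutine exit
				}
				l.sync()
				l.mu.Unlock()
			case <-l.closing:
				return
			}
		}
	}()
}

// sync stands in for the segment fsync; callers must hold l.mu.
func (l *demoWAL) sync() {
	for len(l.syncWaiters) > 0 {
		errC := <-l.syncWaiters
		errC <- nil // a real WAL would send the fsync error here
	}
}

func main() {
	l := &demoWAL{
		syncWaiters: make(chan chan error, 1024),
		closing:     make(chan struct{}),
	}

	// Simulate a write: register a waiter, schedule the sync, wait for the ack.
	errC := make(chan error, 1)
	l.syncWaiters <- errC
	l.scheduleSync()
	fmt.Println("sync result:", <-errC)
}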