Reduce wal sync goroutine churn

Under high write load, the sync goroutine would start up and exit
very frequently. Starting a new goroutine that often adds a small
amount of latency, which causes writes to take longer and sometimes time out.

This changes the goroutine to loop until there are no more waiters, which
reduces the churn and latency.
Ref: pull/8302/head
Author: Jason Wilder
Date: 2017-04-13 23:28:35 -06:00
Parent: aa9925621b
Commit: d7c5dd0a3e
1 changed file with 53 additions and 27 deletions
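
Before reading the diff, here is the pattern from the commit message reduced to a standalone sketch. The syncer, flush, and write names below are illustrative stand-ins, not the WAL's actual types: the first writer that needs an fsync starts a single background goroutine via compare-and-swap, and that goroutine keeps draining waiters on each tick until none remain, instead of exiting after a single sync.

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

// syncer is a hypothetical stand-in for the WAL, reduced to the pieces the
// commit message talks about: a queue of waiters and one background
// goroutine that fsyncs on their behalf.
type syncer struct {
    mu      sync.Mutex
    running uint64          // 0 = no sync goroutine running, 1 = one is running
    delay   time.Duration   // batching window between fsyncs, must be > 0 here
    waiters chan chan error // callers blocked waiting for the next fsync
}

func newSyncer(delay time.Duration) *syncer {
    // The buffer must be large enough for the expected number of concurrent
    // writers; the real WAL returns an error when this queue is full.
    return &syncer{delay: delay, waiters: make(chan chan error, 1024)}
}

// flush stands in for fsync-ing the current segment and notifying every waiter.
func (s *syncer) flush() {
    var err error // a real implementation would fsync the segment file here
    for len(s.waiters) > 0 {
        errC := <-s.waiters
        errC <- err
    }
}

// schedule starts the background sync goroutine only if none is running.
// Rather than exiting after one fsync, the goroutine loops until the waiter
// queue is empty; that is the churn reduction the commit describes.
func (s *syncer) schedule() {
    if !atomic.CompareAndSwapUint64(&s.running, 0, 1) {
        return // an already-running goroutine will service our waiter
    }
    go func() {
        t := time.NewTicker(s.delay)
        defer t.Stop()
        for range t.C {
            s.mu.Lock()
            if len(s.waiters) == 0 {
                // Reset the flag under the lock so a writer that queues a
                // waiter after this check can start a fresh goroutine.
                atomic.StoreUint64(&s.running, 0)
                s.mu.Unlock()
                return
            }
            s.flush()
            s.mu.Unlock()
        }
    }()
}

// write queues a waiter, schedules a sync, and blocks until it completes.
func (s *syncer) write() error {
    errC := make(chan error)
    s.mu.Lock()
    s.waiters <- errC
    s.mu.Unlock()
    s.schedule()
    return <-errC
}

func main() {
    s := newSyncer(time.Millisecond)
    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() { defer wg.Done(); _ = s.write() }()
    }
    wg.Wait()
    fmt.Println("all writes synced")
}

The diff below applies the same idea to the WAL: sync() becomes the fsync-and-notify step taken under the write lock, and scheduleSync() owns the longer-lived goroutine.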


@@ -233,10 +233,10 @@ func (l *WAL) Open() error {
     return nil
 }
 
-// sync will schedule an fsync to the current wal segment and notify any
+// scheduleSync will schedule an fsync to the current wal segment and notify any
 // waiting gorutines. If an fsync is already scheduled, subsequent calls will
 // not schedule a new fsync and will be handle by the existing scheduled fsync.
-func (l *WAL) sync() {
+func (l *WAL) scheduleSync() {
     // If we're not the first to sync, then another goroutine is fsyncing the wal for us.
     if !atomic.CompareAndSwapUint64(&l.syncCount, 0, 1) {
         return
@@ -244,26 +244,50 @@ func (l *WAL) sync() {
 
     // Fsync the wal and notify all pending waiters
     go func() {
-        t := time.NewTimer(l.syncDelay)
-        select {
-        case <-t.C:
-            if len(l.syncWaiters) > 0 {
-                l.mu.Lock()
-                err := l.currentSegmentWriter.sync()
-                for len(l.syncWaiters) > 0 {
-                    errC := <-l.syncWaiters
-                    errC <- err
-                }
-                l.mu.Unlock()
-            }
-        case <-l.closing:
-            t.Stop()
-        }
-
-        atomic.StoreUint64(&l.syncCount, 0)
+        defer atomic.StoreUint64(&l.syncCount, 0)
+        var timerCh <-chan time.Time
+
+        // time.NewTicker requires a > 0 delay, since 0 indicates no delay, use a closed
+        // channel which will always be ready to read from.
+        if l.syncDelay == 0 {
+            // Create a RW chan and close it
+            timerChrw := make(chan time.Time)
+            close(timerChrw)
+            // Convert it to a read-only
+            timerCh = timerChrw
+        } else {
+            t := time.NewTicker(l.syncDelay)
+            defer t.Stop()
+            timerCh = t.C
+        }
+        for {
+            select {
+            case <-timerCh:
+                l.mu.Lock()
+                if len(l.syncWaiters) == 0 {
+                    l.mu.Unlock()
+                    return
+                }
+
+                l.sync()
+                l.mu.Unlock()
+            case <-l.closing:
+                return
+            }
+        }
     }()
 }
 
+// sync fsyncs the current wal segments and notifies any waiters. Callers must ensure
+// a write lock on the WAL is obtained before calling sync.
+func (l *WAL) sync() {
+    err := l.currentSegmentWriter.sync()
+    for len(l.syncWaiters) > 0 {
+        errC := <-l.syncWaiters
+        errC <- err
+    }
+}
+
 // WritePoints writes the given points to the WAL. It returns the WAL segment ID to
 // which the points were written. If an error is returned the segment ID should
 // be ignored.
@@ -354,8 +378,8 @@ func (l *WAL) writeToLog(entry WALEntry) (int, error) {
     // limit how many concurrent encodings can be in flight. Since we can only
     // write one at a time to disk, a slow disk can cause the allocations below
     // to increase quickly. If we're backed up, wait until others have completed.
-    l.limiter.Take()
-    defer l.limiter.Release()
+    //l.limiter.Take()
+    //defer l.limiter.Release()
 
     // encode and compress the entry while we're not locked
     bytes := getBuf(walEncodeBufSize)
@@ -369,7 +393,6 @@ func (l *WAL) writeToLog(entry WALEntry) (int, error) {
     encBuf := getBuf(snappy.MaxEncodedLen(len(b)))
     defer putBuf(encBuf)
     compressed := snappy.Encode(encBuf, b)
-
 
     syncErr := make(chan error)
     segID, err := func() (int, error) {
@@ -393,16 +416,17 @@ func (l *WAL) writeToLog(entry WALEntry) (int, error) {
             return -1, fmt.Errorf("error writing WAL entry: %v", err)
         }
 
-        // Update stats for current segment size
-        atomic.StoreInt64(&l.stats.CurrentBytes, int64(l.currentSegmentWriter.size))
-        l.lastWriteTime = time.Now()
-
         select {
         case l.syncWaiters <- syncErr:
         default:
            return -1, fmt.Errorf("error syncing wal")
         }
+        l.scheduleSync()
+
+        // Update stats for current segment size
+        atomic.StoreInt64(&l.stats.CurrentBytes, int64(l.currentSegmentWriter.size))
+        l.lastWriteTime = time.Now()
 
         return l.currentSegmentID, nil
@@ -412,7 +436,6 @@ func (l *WAL) writeToLog(entry WALEntry) (int, error) {
     }
 
     // schedule an fsync and wait for it to complete
-    l.sync()
     return segID, <-syncErr
 }
 
@@ -492,6 +515,7 @@ func (l *WAL) Close() error {
     close(l.closing)
 
     if l.currentSegmentWriter != nil {
+        l.sync()
         l.currentSegmentWriter.close()
         l.currentSegmentWriter = nil
     }
@@ -514,6 +538,8 @@ func segmentFileNames(dir string) ([]string, error) {
 func (l *WAL) newSegmentFile() error {
     l.currentSegmentID++
     if l.currentSegmentWriter != nil {
+        l.sync()
+
         if err := l.currentSegmentWriter.close(); err != nil {
             return err
         }
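
One detail of the new scheduleSync body worth calling out: time.NewTicker panics for a non-positive duration, so a zero syncDelay is handled by substituting a closed channel, which a receive is always immediately ready on. Below is a minimal standalone illustration of that trick; alwaysReady and tickSource are hypothetical helper names, not part of the WAL.

package main

import (
    "fmt"
    "time"
)

// alwaysReady returns a channel a select can receive from without blocking:
// a closed channel always yields its zero value (here, the zero time.Time).
func alwaysReady() <-chan time.Time {
    ch := make(chan time.Time)
    close(ch)
    return ch
}

// tickSource mirrors the choice made in scheduleSync: a real ticker for a
// positive delay, a permanently-ready channel when the delay is zero.
func tickSource(delay time.Duration) (<-chan time.Time, func()) {
    if delay == 0 {
        return alwaysReady(), func() {}
    }
    t := time.NewTicker(delay)
    return t.C, t.Stop
}

func main() {
    for _, delay := range []time.Duration{0, 5 * time.Millisecond} {
        ch, stop := tickSource(delay)
        start := time.Now()
        <-ch // with a zero delay this returns immediately
        fmt.Printf("delay=%v waited=%v\n", delay, time.Since(start).Round(time.Millisecond))
        stop()
    }
}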