Fix compaction and multi-write bugs.

* Fix a bug in range locking when one interval completely covers, or is entirely contained within, another (see the sketch below).
* Fix a bug where a full compaction could run while the index was actively being written to (see the sketch after the engine diff).
pull/4308/head
Paul Dix 2015-09-30 16:38:55 -04:00
parent 2366baaf0b
commit a2b139e006
5 changed files with 280 additions and 41 deletions
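As a reference for the first fix, here is a minimal, stand-alone sketch of the overlap predicate in plain Go. The interval type and names are illustrative only, not the engine's actual rangeLock/WriteLock types; the conditions mirror the ones added in the write lock diff further down.

package main

import "fmt"

// interval models a locked [min, max] time range, analogous to rangeLock.
type interval struct {
	min, max int64
}

// overlaps reports whether two closed intervals intersect. The last two
// cases are the ones the fix adds: one interval completely covering the
// other, or sitting completely inside it.
func (r interval) overlaps(l interval) bool {
	if l.min >= r.min && l.min <= r.max { // l starts inside r
		return true
	} else if l.max >= r.min && l.max <= r.max { // l ends inside r
		return true
	} else if l.min <= r.min && l.max >= r.max { // l completely covers r
		return true
	} else if l.min >= r.min && l.max <= r.max { // l sits completely inside r
		return true
	}
	return false
}

func main() {
	locked := interval{min: 2, max: 10}
	fmt.Println(locked.overlaps(interval{min: 1, max: 11}))  // true: covers the locked range
	fmt.Println(locked.overlaps(interval{min: 4, max: 8}))   // true: contained in the locked range
	fmt.Println(locked.overlaps(interval{min: 11, max: 20})) // false: disjoint
}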

View File

@@ -71,7 +71,7 @@ var _ tsdb.Engine = &Engine{}
// Engine represents a storage engine with compressed blocks.
type Engine struct {
writeLock *writeLock
writeLock *WriteLock
metaLock sync.Mutex
path string
logger *log.Logger
@@ -120,7 +120,7 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine
e := &Engine{
path: path,
writeLock: &writeLock{},
writeLock: &WriteLock{},
logger: log.New(os.Stderr, "[pd1] ", log.LstdFlags),
// TODO: this is the function where we can inject a check against the in memory collisions
@@ -149,10 +149,28 @@ func (e *Engine) PerformMaintenance() {
e.WAL.flush(f)
}()
return
} else if e.shouldCompact() {
e.logger.Println("compacting for maintenance")
go e.Compact(true)
}
// don't do a full compaction if the WAL received writes in the time window
if time.Since(e.WAL.LastWriteTime()) < e.IndexCompactionFullAge {
return
}
e.filesLock.RLock()
running := e.compactionRunning
e.filesLock.RUnlock()
if running {
return
}
// do a full compaction if all the index files are older than the compaction time
for _, f := range e.copyFilesCollection() {
if time.Since(f.modTime) < e.IndexCompactionFullAge {
return
}
}
go e.Compact(true)
}
// Format returns the format type of this engine
@@ -204,6 +222,8 @@ func (e *Engine) Open() error {
return err
}
e.lastCompactionTime = time.Now()
return nil
}
@@ -454,15 +474,12 @@ func (e *Engine) Compact(fullCompaction bool) error {
break
}
var s string
if fullCompaction {
s = "FULL "
}
e.logger.Printf("Starting %scompaction in partition %s of %d files", s, e.path, len(files))
st := time.Now()
// mark the compaction as running
e.filesLock.Lock()
if e.compactionRunning {
e.filesLock.Unlock()
return nil
}
e.compactionRunning = true
e.filesLock.Unlock()
defer func() {
@@ -474,11 +491,19 @@ func (e *Engine) Compact(fullCompaction bool) error {
e.filesLock.Unlock()
}()
var s string
if fullCompaction {
s = "FULL "
}
fileName := e.nextFileName()
e.logger.Printf("Starting %scompaction in partition %s of %d files to new file %s", s, e.path, len(files), fileName)
st := time.Now()
positions := make([]uint32, len(files))
ids := make([]uint64, len(files))
// initialize for writing
f, err := os.OpenFile(e.nextFileName(), os.O_CREATE|os.O_RDWR, 0666)
f, err := os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
return err
}
@@ -931,6 +956,12 @@ func (e *Engine) rewriteFile(oldDF *dataFile, valuesByID map[uint64]Values) erro
return err
}
if oldDF == nil {
e.logger.Printf("writing new index file %s", f.Name())
} else {
e.logger.Printf("rewriting index file %s with %s", oldDF.f.Name(), f.Name())
}
// write the magic number
if _, err := f.Write(u32tob(magicNumber)); err != nil {
f.Close()
@@ -1509,7 +1540,7 @@ func (d *dataFile) StartingPositionForID(id uint64) uint32 {
func (d *dataFile) block(pos uint32) (id uint64, t int64, block []byte) {
defer func() {
if r := recover(); r != nil {
fmt.Println("FUCK: ", d.f.Name(), pos, id, t)
panic(fmt.Sprintf("panic decoding file: %s at position %d for id %d at time %d", d.f.Name(), pos, id, t))
}
}()
if pos < d.indexPosition() {

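As a reference for the second fix, a rough stand-alone sketch of the gate added for full compactions. The helper name and signature are hypothetical; in the actual change the check lives inline in PerformMaintenance above, using the WAL's LastWriteTime, the data files' modTime values, the compactionRunning flag, and IndexCompactionFullAge.

package main

import (
	"fmt"
	"time"
)

// shouldRunFullCompaction restates the new gating logic with illustrative
// parameters: the WAL's last write time, the mod times of the index files,
// whether a compaction is already running, and the full-compaction age.
func shouldRunFullCompaction(lastWALWrite time.Time, fileModTimes []time.Time, compactionRunning bool, fullAge time.Duration) bool {
	// skip if the WAL received writes inside the window, i.e. the index
	// is still being actively written to
	if time.Since(lastWALWrite) < fullAge {
		return false
	}
	// skip if another compaction is already in flight
	if compactionRunning {
		return false
	}
	// only compact once every index file has been idle for the full window
	for _, mt := range fileModTimes {
		if time.Since(mt) < fullAge {
			return false
		}
	}
	return true
}

func main() {
	files := []time.Time{time.Now().Add(-2 * time.Hour)}
	ok := shouldRunFullCompaction(time.Now().Add(-90*time.Minute), files, false, time.Hour)
	fmt.Println(ok) // true: the WAL is idle and every file is older than the one-hour window
}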
View File

@@ -894,6 +894,73 @@ func TestEngine_RewritingOldBlocks(t *testing.T) {
}
}
func TestEngine_WriteIntoCompactedFile(t *testing.T) {
e := OpenDefaultEngine()
defer e.Cleanup()
fields := []string{"value"}
e.MaxPointsPerBlock = 3
e.RotateFileSize = 10
p1 := parsePoint("cpu,host=A value=1.1 1000000000")
p2 := parsePoint("cpu,host=A value=1.2 2000000000")
p3 := parsePoint("cpu,host=A value=1.3 3000000000")
p4 := parsePoint("cpu,host=A value=1.5 4000000000")
p5 := parsePoint("cpu,host=A value=1.6 2500000000")
if err := e.WritePoints([]models.Point{p1, p2}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
if err := e.WritePoints([]models.Point{p3}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
if err := e.Compact(true); err != nil {
t.Fatalf("error compacting: %s", err.Error)
}
if err := e.WritePoints([]models.Point{p4}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
if err := e.Compact(true); err != nil {
t.Fatalf("error compacting: %s", err.Error)
}
if err := e.WritePoints([]models.Point{p5}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
if count := e.DataFileCount(); count != 1 {
t.Fatalf("execpted 1 data file but got %d", count)
}
tx, _ := e.Begin(false)
defer tx.Rollback()
c := tx.Cursor("cpu,host=A", fields, nil, true)
k, _ := c.SeekTo(0)
if k != 1000000000 {
t.Fatalf("wrong time: %d", k)
}
k, _ = c.Next()
if k != 2000000000 {
t.Fatalf("wrong time: %d", k)
}
k, _ = c.Next()
if k != 2500000000 {
t.Fatalf("wrong time: %d", k)
}
k, _ = c.Next()
if k != 3000000000 {
t.Fatalf("wrong time: %d", k)
}
k, _ = c.Next()
if k != 4000000000 {
t.Fatalf("wrong time: %d", k)
}
}
// Engine represents a test wrapper for pd1.Engine.
type Engine struct {
*pd1.Engine

View File

@@ -542,7 +542,7 @@ func (l *Log) flush(flush flushType) error {
} else if flush == startupFlush {
ftype = "startup"
}
l.logger.Printf("%s flush of %d keys with %d values of %d bytes\n", ftype, len(l.flushCache), valueCount, flushSize)
l.logger.Printf("%s flush of %s with %d keys and %d total values of %d bytes\n", ftype, l.path, len(l.flushCache), valueCount, flushSize)
}
startTime := time.Now()
@@ -550,7 +550,7 @@ func (l *Log) flush(flush flushType) error {
return err
}
if l.LoggingEnabled {
l.logger.Printf("flush to index took %s\n", time.Since(startTime))
l.logger.Printf("%s flush to index took %s\n", l.path, time.Since(startTime))
}
l.cacheLock.Lock()

View File

@@ -1,15 +1,14 @@
package pd1
import (
"reflect"
"sync"
)
// writeLock is a lock that enables locking of ranges between a
// min and max value. We use this so that flushes from the WAL
// can occur concurrently along with compactions.
type writeLock struct {
mu sync.Mutex
type WriteLock struct {
rangesLock sync.Mutex
ranges []*rangeLock
}
@@ -19,34 +18,41 @@ type writeLock struct {
// an overlapping range will have to wait until the previous
// lock is released. A corresponding call to UnlockRange should
// be deferred.
func (w *writeLock) LockRange(min, max int64) {
w.mu.Lock()
defer w.mu.Unlock()
func (w *WriteLock) LockRange(min, max int64) {
r := &rangeLock{min: min, max: max}
ranges := w.currentlyLockedRanges()
for {
ranges := w.currentlyLockedRanges()
// ensure there are no currently locked ranges that overlap
for _, rr := range ranges {
if rr.overlaps(r) {
// wait until it gets unlocked
rr.mu.Lock()
// release the lock so the object can get GC'd
rr.mu.Unlock()
// ensure there are no currently locked ranges that overlap
for _, rr := range ranges {
if rr.overlaps(r) {
// wait until it gets unlocked
rr.mu.Lock()
// release the lock so the object can get GC'd
rr.mu.Unlock()
}
}
// ensure that no one else got a lock on the range while we
// were waiting
w.rangesLock.Lock()
if len(w.ranges) == 0 || reflect.DeepEqual(ranges, w.ranges) {
// and lock the range
r.mu.Lock()
// now that we know the range is free, add it to the locks
w.ranges = append(w.ranges, r)
w.rangesLock.Unlock()
return
}
// try again
w.rangesLock.Unlock()
}
// and lock the range
r.mu.Lock()
// now that we know the range is free, add it to the locks
w.rangesLock.Lock()
w.ranges = append(w.ranges, r)
w.rangesLock.Unlock()
}
// UnlockRange will release a previously locked range.
func (w *writeLock) UnlockRange(min, max int64) {
func (w *WriteLock) UnlockRange(min, max int64) {
w.rangesLock.Lock()
defer w.rangesLock.Unlock()
@@ -62,7 +68,7 @@ func (w *writeLock) UnlockRange(min, max int64) {
w.ranges = a
}
func (w *writeLock) currentlyLockedRanges() []*rangeLock {
func (w *WriteLock) currentlyLockedRanges() []*rangeLock {
w.rangesLock.Lock()
defer w.rangesLock.Unlock()
a := make([]*rangeLock, len(w.ranges))
@@ -81,6 +87,10 @@ func (r *rangeLock) overlaps(l *rangeLock) bool {
return true
} else if l.max >= r.min && l.max <= r.max {
return true
} else if l.min <= r.min && l.max >= r.max {
return true
} else if l.min >= r.min && l.max <= r.max {
return true
}
return false
}

View File

@@ -0,0 +1,131 @@
package pd1_test
import (
// "sync"
"testing"
"time"
"github.com/influxdb/influxdb/tsdb/engine/pd1"
)
func TestWriteLock_FullCover(t *testing.T) {
w := &pd1.WriteLock{}
w.LockRange(2, 10)
lock := make(chan bool)
timeout := time.NewTimer(10 * time.Millisecond)
go func() {
w.LockRange(1, 11)
lock <- true
}()
select {
case <-lock:
t.Fatal("able to get lock when we shouldn't")
case <-timeout.C:
// we're all good
}
}
func TestWriteLock_RightIntersect(t *testing.T) {
w := &pd1.WriteLock{}
w.LockRange(2, 10)
lock := make(chan bool)
timeout := time.NewTimer(10 * time.Millisecond)
go func() {
w.LockRange(5, 15)
lock <- true
}()
select {
case <-lock:
t.Fatal("able to get lock when we shouldn't")
case <-timeout.C:
// we're all good
}
}
func TestWriteLock_LeftIntersect(t *testing.T) {
w := &pd1.WriteLock{}
w.LockRange(1, 4)
lock := make(chan bool)
timeout := time.NewTimer(10 * time.Millisecond)
go func() {
w.LockRange(1, 11)
lock <- true
}()
select {
case <-lock:
t.Fatal("able to get lock when we shouldn't")
case <-timeout.C:
// we're all good
}
}
func TestWriteLock_Inside(t *testing.T) {
w := &pd1.WriteLock{}
w.LockRange(4, 8)
lock := make(chan bool)
timeout := time.NewTimer(10 * time.Millisecond)
go func() {
w.LockRange(1, 11)
lock <- true
}()
select {
case <-lock:
t.Fatal("able to get lock when we shouldn't")
case <-timeout.C:
// we're all good
}
}
func TestWriteLock_Same(t *testing.T) {
w := &pd1.WriteLock{}
w.LockRange(2, 10)
lock := make(chan bool)
timeout := time.NewTimer(10 * time.Millisecond)
go func() {
w.LockRange(2, 10)
lock <- true
}()
select {
case <-lock:
t.Fatal("able to get lock when we shouldn't")
case <-timeout.C:
// we're all good
}
}
// func TestWriteLock_FreeRangeWithContentionElsewhere(t *testing.T) {
// w := &pd1.WriteLock{}
// w.LockRange(2, 10)
// lock := make(chan bool)
// freeRange := make(chan bool)
// timeout := time.NewTimer(10 * time.Millisecond)
// var wg sync.WaitGroup
// wg.Add(1)
// go func() {
// wg.Done()
// w.LockRange(4, 12)
// lock <- true
// }()
// // make sure the other go func has gotten to the point of requesting the lock
// wg.Wait()
// go func() {
// w.LockRange(15, 23)
// freeRange <- true
// }()
// select {
// case <-lock:
// t.Fatal("able to get lock when we shouldn't")
// case <-timeout.C:
// t.Fatal("unable to get lock of free range when contention exists elsewhere")
// case <-freeRange:
// // we're all good
// }
// }