From 01b5b9268e6661b6f51c9c3abf7d714cb07b12f8 Mon Sep 17 00:00:00 2001
From: Paul Dix
Date: Wed, 30 Sep 2015 16:38:55 -0400
Subject: [PATCH] Fix compaction and multi-write bugs.

* Fix bug with locking when the interval completely covers or is totally inside another one.
* Fix bug with full compactions running when the index is actively being written to.
---
 tsdb/engine/pd1/pd1.go             |  59 ++++++++++---
 tsdb/engine/pd1/pd1_test.go        |  67 +++++++++++++++
 tsdb/engine/pd1/wal.go             |   4 +-
 tsdb/engine/pd1/write_lock.go      |  60 +++++++------
 tsdb/engine/pd1/write_lock_test.go | 131 +++++++++++++++++++++++++++++
 5 files changed, 280 insertions(+), 41 deletions(-)
 create mode 100644 tsdb/engine/pd1/write_lock_test.go
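
Note on the locking fix (first bullet above): the old overlap predicate only tested whether the requested range starts or ends inside the held one, which misses a request that completely covers the held range; the patch adds explicit branches for the full-cover and fully-inside cases. The standalone sketch below shows the resulting predicate with illustrative names only (it is not the engine's API); the real change is the rangeLock.overlaps hunk in write_lock.go further down.

package main

import "fmt"

// overlaps reports whether the request [reqMin, reqMax] conflicts with a
// held range [heldMin, heldMax]. The last two branches mirror the ones this
// patch adds: full cover and fully inside.
func overlaps(heldMin, heldMax, reqMin, reqMax int64) bool {
    if reqMin >= heldMin && reqMin <= heldMax { // request starts inside the held range
        return true
    } else if reqMax >= heldMin && reqMax <= heldMax { // request ends inside the held range
        return true
    } else if reqMin <= heldMin && reqMax >= heldMax { // request completely covers the held range
        return true
    } else if reqMin >= heldMin && reqMax <= heldMax { // request sits entirely inside the held range
        return true
    }
    return false
}

func main() {
    fmt.Println(overlaps(2, 10, 1, 11))  // full cover -> true
    fmt.Println(overlaps(2, 10, 4, 8))   // inside -> true
    fmt.Println(overlaps(2, 10, 11, 20)) // disjoint -> false
}
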
diff --git a/tsdb/engine/pd1/pd1.go b/tsdb/engine/pd1/pd1.go
index 37e16c6d49..5f64c6c2f5 100644
--- a/tsdb/engine/pd1/pd1.go
+++ b/tsdb/engine/pd1/pd1.go
@@ -71,7 +71,7 @@ var _ tsdb.Engine = &Engine{}
 
 // Engine represents a storage engine with compressed blocks.
 type Engine struct {
-    writeLock *writeLock
+    writeLock *WriteLock
     metaLock  sync.Mutex
     path      string
     logger    *log.Logger
@@ -120,7 +120,7 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine
 
     e := &Engine{
         path:      path,
-        writeLock: &writeLock{},
+        writeLock: &WriteLock{},
         logger:    log.New(os.Stderr, "[pd1] ", log.LstdFlags),
 
         // TODO: this is the function where we can inject a check against the in memory collisions
@@ -149,10 +149,28 @@ func (e *Engine) PerformMaintenance() {
             e.WAL.flush(f)
         }()
         return
-    } else if e.shouldCompact() {
-        e.logger.Println("compacting for maintenance")
-        go e.Compact(true)
     }
+
+    // don't do a full compaction if the WAL received writes in the time window
+    if time.Since(e.WAL.LastWriteTime()) < e.IndexCompactionFullAge {
+        return
+    }
+
+    e.filesLock.RLock()
+    running := e.compactionRunning
+    e.filesLock.RUnlock()
+    if running {
+        return
+    }
+
+    // do a full compaction if all the index files are older than the compaction time
+    for _, f := range e.copyFilesCollection() {
+        if time.Since(f.modTime) < e.IndexCompactionFullAge {
+            return
+        }
+    }
+
+    go e.Compact(true)
 }
 
 // Format returns the format type of this engine
@@ -204,6 +222,8 @@ func (e *Engine) Open() error {
         return err
     }
 
+    e.lastCompactionTime = time.Now()
+
     return nil
 }
 
@@ -454,15 +474,12 @@ func (e *Engine) Compact(fullCompaction bool) error {
         break
     }
 
-    var s string
-    if fullCompaction {
-        s = "FULL "
-    }
-    e.logger.Printf("Starting %scompaction in partition %s of %d files", s, e.path, len(files))
-    st := time.Now()
-
     // mark the compaction as running
     e.filesLock.Lock()
+    if e.compactionRunning {
+        e.filesLock.Unlock()
+        return nil
+    }
     e.compactionRunning = true
     e.filesLock.Unlock()
     defer func() {
@@ -474,11 +491,19 @@ func (e *Engine) Compact(fullCompaction bool) error {
         e.filesLock.Unlock()
     }()
 
+    var s string
+    if fullCompaction {
+        s = "FULL "
+    }
+    fileName := e.nextFileName()
+    e.logger.Printf("Starting %scompaction in partition %s of %d files to new file %s", s, e.path, len(files), fileName)
+    st := time.Now()
+
     positions := make([]uint32, len(files))
     ids := make([]uint64, len(files))
 
     // initilaize for writing
-    f, err := os.OpenFile(e.nextFileName(), os.O_CREATE|os.O_RDWR, 0666)
+    f, err := os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666)
     if err != nil {
         return err
     }
@@ -931,6 +956,12 @@ func (e *Engine) rewriteFile(oldDF *dataFile, valuesByID map[uint64]Values) erro
         return err
     }
 
+    if oldDF == nil {
+        e.logger.Printf("writing new index file %s", f.Name())
+    } else {
+        e.logger.Printf("rewriting index file %s with %s", oldDF.f.Name(), f.Name())
+    }
+
     // write the magic number
     if _, err := f.Write(u32tob(magicNumber)); err != nil {
         f.Close()
         return err
     }
@@ -1509,7 +1540,7 @@ func (d *dataFile) StartingPositionForID(id uint64) uint32 {
 func (d *dataFile) block(pos uint32) (id uint64, t int64, block []byte) {
     defer func() {
         if r := recover(); r != nil {
-            fmt.Println("FUCK: ", d.f.Name(), pos, id, t)
+            panic(fmt.Sprintf("panic decoding file: %s at position %d for id %d at time %d", d.f.Name(), pos, id, t))
         }
     }()
     if pos < d.indexPosition() {
diff --git a/tsdb/engine/pd1/pd1_test.go b/tsdb/engine/pd1/pd1_test.go
index 9eda01fdc4..494c63524f 100644
--- a/tsdb/engine/pd1/pd1_test.go
+++ b/tsdb/engine/pd1/pd1_test.go
@@ -894,6 +894,73 @@ func TestEngine_RewritingOldBlocks(t *testing.T) {
     }
 }
 
+func TestEngine_WriteIntoCompactedFile(t *testing.T) {
+    e := OpenDefaultEngine()
+    defer e.Cleanup()
+
+    fields := []string{"value"}
+
+    e.MaxPointsPerBlock = 3
+    e.RotateFileSize = 10
+
+    p1 := parsePoint("cpu,host=A value=1.1 1000000000")
+    p2 := parsePoint("cpu,host=A value=1.2 2000000000")
+    p3 := parsePoint("cpu,host=A value=1.3 3000000000")
+    p4 := parsePoint("cpu,host=A value=1.5 4000000000")
+    p5 := parsePoint("cpu,host=A value=1.6 2500000000")
+
+    if err := e.WritePoints([]models.Point{p1, p2}, nil, nil); err != nil {
+        t.Fatalf("failed to write points: %s", err.Error())
+    }
+    if err := e.WritePoints([]models.Point{p3}, nil, nil); err != nil {
+        t.Fatalf("failed to write points: %s", err.Error())
+    }
+
+    if err := e.Compact(true); err != nil {
+        t.Fatalf("error compacting: %s", err.Error())
+    }
+
+    if err := e.WritePoints([]models.Point{p4}, nil, nil); err != nil {
+        t.Fatalf("failed to write points: %s", err.Error())
+    }
+
+    if err := e.Compact(true); err != nil {
+        t.Fatalf("error compacting: %s", err.Error())
+    }
+
+    if err := e.WritePoints([]models.Point{p5}, nil, nil); err != nil {
+        t.Fatalf("failed to write points: %s", err.Error())
+    }
+
+    if count := e.DataFileCount(); count != 1 {
+        t.Fatalf("expected 1 data file but got %d", count)
+    }
+
+    tx, _ := e.Begin(false)
+    defer tx.Rollback()
+    c := tx.Cursor("cpu,host=A", fields, nil, true)
+    k, _ := c.SeekTo(0)
+    if k != 1000000000 {
+        t.Fatalf("wrong time: %d", k)
+    }
+    k, _ = c.Next()
+    if k != 2000000000 {
+        t.Fatalf("wrong time: %d", k)
+    }
+    k, _ = c.Next()
+    if k != 2500000000 {
+        t.Fatalf("wrong time: %d", k)
+    }
+    k, _ = c.Next()
+    if k != 3000000000 {
+        t.Fatalf("wrong time: %d", k)
+    }
+    k, _ = c.Next()
+    if k != 4000000000 {
+        t.Fatalf("wrong time: %d", k)
+    }
+}
+
 // Engine represents a test wrapper for pd1.Engine.
 type Engine struct {
     *pd1.Engine
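
The PerformMaintenance hunk in pd1.go above replaces the old shouldCompact() branch with three explicit guards before a full compaction is started. A condensed, self-contained sketch of that decision follows; the function name and signature are assumptions for illustration, not the engine's actual methods.

package main

import (
    "fmt"
    "time"
)

// shouldFullCompact condenses the guards added to PerformMaintenance: skip
// the full compaction if the WAL saw a write inside the window, if another
// compaction is already running, or if any index file is younger than the
// full-compaction age.
func shouldFullCompact(lastWALWrite time.Time, compactionRunning bool, fileModTimes []time.Time, fullAge time.Duration) bool {
    if time.Since(lastWALWrite) < fullAge {
        return false // the WAL is still actively receiving writes
    }
    if compactionRunning {
        return false // another compaction already owns the data files
    }
    for _, mod := range fileModTimes {
        if time.Since(mod) < fullAge {
            return false // at least one index file is too young
        }
    }
    return true
}

func main() {
    old := time.Now().Add(-2 * time.Hour)
    fmt.Println(shouldFullCompact(old, false, []time.Time{old}, time.Hour))        // true
    fmt.Println(shouldFullCompact(time.Now(), false, []time.Time{old}, time.Hour)) // false: recent WAL write
    fmt.Println(shouldFullCompact(old, true, []time.Time{old}, time.Hour))         // false: compaction already running
}
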
diff --git a/tsdb/engine/pd1/wal.go b/tsdb/engine/pd1/wal.go
index 6e51980f2b..ef4399912c 100644
--- a/tsdb/engine/pd1/wal.go
+++ b/tsdb/engine/pd1/wal.go
@@ -542,7 +542,7 @@ func (l *Log) flush(flush flushType) error {
         } else if flush == startupFlush {
             ftype = "startup"
         }
-        l.logger.Printf("%s flush of %d keys with %d values of %d bytes\n", ftype, len(l.flushCache), valueCount, flushSize)
+        l.logger.Printf("%s flush of %s with %d keys and %d total values of %d bytes\n", ftype, l.path, len(l.flushCache), valueCount, flushSize)
     }
 
     startTime := time.Now()
@@ -550,7 +550,7 @@ func (l *Log) flush(flush flushType) error {
         return err
     }
     if l.LoggingEnabled {
-        l.logger.Printf("flush to index took %s\n", time.Since(startTime))
+        l.logger.Printf("%s flush to index took %s\n", l.path, time.Since(startTime))
     }
 
     l.cacheLock.Lock()
diff --git a/tsdb/engine/pd1/write_lock.go b/tsdb/engine/pd1/write_lock.go
index 5f48fb9311..a791b663ab 100644
--- a/tsdb/engine/pd1/write_lock.go
+++ b/tsdb/engine/pd1/write_lock.go
@@ -1,15 +1,14 @@
 package pd1
 
 import (
+    "reflect"
     "sync"
 )
 
 // writeLock is a lock that enables locking of ranges between a
 // min and max value. We use this so that flushes from the WAL
 // can occur concurrently along with compactions.
-type writeLock struct {
-    mu sync.Mutex
-
+type WriteLock struct {
     rangesLock sync.Mutex
     ranges     []*rangeLock
 }
@@ -19,34 +18,41 @@ type writeLock struct {
 // an overlapping range will have to wait until the previous
 // lock is released. A corresponding call to UnlockRange should
 // be deferred.
-func (w *writeLock) LockRange(min, max int64) {
-    w.mu.Lock()
-    defer w.mu.Unlock()
-
+func (w *WriteLock) LockRange(min, max int64) {
     r := &rangeLock{min: min, max: max}
-    ranges := w.currentlyLockedRanges()
+    for {
+        ranges := w.currentlyLockedRanges()
 
-    // ensure there are no currently locked ranges that overlap
-    for _, rr := range ranges {
-        if rr.overlaps(r) {
-            // wait until it gets unlocked
-            rr.mu.Lock()
-            // release the lock so the object can get GC'd
-            rr.mu.Unlock()
+        // ensure there are no currently locked ranges that overlap
+        for _, rr := range ranges {
+            if rr.overlaps(r) {
+                // wait until it gets unlocked
+                rr.mu.Lock()
+                // release the lock so the object can get GC'd
+                rr.mu.Unlock()
+            }
         }
+
+        // ensure that no one else got a lock on the range while we
+        // were waiting
+        w.rangesLock.Lock()
+        if len(w.ranges) == 0 || reflect.DeepEqual(ranges, w.ranges) {
+            // and lock the range
+            r.mu.Lock()
+
+            // now that we know the range is free, add it to the locks
+            w.ranges = append(w.ranges, r)
+            w.rangesLock.Unlock()
+            return
+        }
+
+        // try again
+        w.rangesLock.Unlock()
     }
-
-    // and lock the range
-    r.mu.Lock()
-
-    // now that we know the range is free, add it to the locks
-    w.rangesLock.Lock()
-    w.ranges = append(w.ranges, r)
-    w.rangesLock.Unlock()
 }
 
 // UnlockRange will release a previously locked range.
-func (w *writeLock) UnlockRange(min, max int64) {
+func (w *WriteLock) UnlockRange(min, max int64) {
     w.rangesLock.Lock()
     defer w.rangesLock.Unlock()
 
@@ -62,7 +68,7 @@ func (w *writeLock) UnlockRange(min, max int64) {
     w.ranges = a
 }
 
-func (w *writeLock) currentlyLockedRanges() []*rangeLock {
+func (w *WriteLock) currentlyLockedRanges() []*rangeLock {
     w.rangesLock.Lock()
     defer w.rangesLock.Unlock()
     a := make([]*rangeLock, len(w.ranges))
@@ -81,6 +87,10 @@ func (r *rangeLock) overlaps(l *rangeLock) bool {
         return true
     } else if l.max >= r.min && l.max <= r.max {
         return true
+    } else if l.min <= r.min && l.max >= r.max {
+        return true
+    } else if l.min >= r.min && l.max <= r.max {
+        return true
     }
     return false
 }
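
For context on how the renamed WriteLock is meant to be used, here is a minimal usage sketch of a compaction holding a range while a WAL flush waits on an overlapping one. It is an illustration built on the exported API in this patch, not code taken from the engine.

package main

import (
    "fmt"
    "time"

    "github.com/influxdb/influxdb/tsdb/engine/pd1"
)

func main() {
    w := &pd1.WriteLock{}

    // A compaction takes the whole range it is rewriting.
    w.LockRange(2, 10)

    flushed := make(chan struct{})
    go func() {
        // A WAL flush whose range sits inside the held one blocks here
        // until the compaction releases its range.
        w.LockRange(4, 8)
        defer w.UnlockRange(4, 8)
        close(flushed)
    }()

    time.Sleep(20 * time.Millisecond) // give the flush goroutine time to start waiting
    w.UnlockRange(2, 10)              // compaction finishes, releasing the range
    <-flushed
    fmt.Println("flush proceeded once the compaction released its range")
}
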
diff --git a/tsdb/engine/pd1/write_lock_test.go b/tsdb/engine/pd1/write_lock_test.go
new file mode 100644
index 0000000000..4cdc88e816
--- /dev/null
+++ b/tsdb/engine/pd1/write_lock_test.go
@@ -0,0 +1,131 @@
+package pd1_test
+
+import (
+    // "sync"
+    "testing"
+    "time"
+
+    "github.com/influxdb/influxdb/tsdb/engine/pd1"
+)
+
+func TestWriteLock_FullCover(t *testing.T) {
+    w := &pd1.WriteLock{}
+    w.LockRange(2, 10)
+
+    lock := make(chan bool)
+    timeout := time.NewTimer(10 * time.Millisecond)
+    go func() {
+        w.LockRange(1, 11)
+        lock <- true
+    }()
+    select {
+    case <-lock:
+        t.Fatal("able to get lock when we shouldn't")
+    case <-timeout.C:
+        // we're all good
+    }
+}
+
+func TestWriteLock_RightIntersect(t *testing.T) {
+    w := &pd1.WriteLock{}
+    w.LockRange(2, 10)
+
+    lock := make(chan bool)
+    timeout := time.NewTimer(10 * time.Millisecond)
+    go func() {
+        w.LockRange(5, 15)
+        lock <- true
+    }()
+    select {
+    case <-lock:
+        t.Fatal("able to get lock when we shouldn't")
+    case <-timeout.C:
+        // we're all good
+    }
+}
+
+func TestWriteLock_LeftIntersect(t *testing.T) {
+    w := &pd1.WriteLock{}
+    w.LockRange(1, 4)
+
+    lock := make(chan bool)
+    timeout := time.NewTimer(10 * time.Millisecond)
+    go func() {
+        w.LockRange(1, 11)
+        lock <- true
+    }()
+    select {
+    case <-lock:
+        t.Fatal("able to get lock when we shouldn't")
+    case <-timeout.C:
+        // we're all good
+    }
+}
+
+func TestWriteLock_Inside(t *testing.T) {
+    w := &pd1.WriteLock{}
+    w.LockRange(4, 8)
+
+    lock := make(chan bool)
+    timeout := time.NewTimer(10 * time.Millisecond)
+    go func() {
+        w.LockRange(1, 11)
+        lock <- true
+    }()
+    select {
+    case <-lock:
+        t.Fatal("able to get lock when we shouldn't")
+    case <-timeout.C:
+        // we're all good
+    }
+}
+
+func TestWriteLock_Same(t *testing.T) {
+    w := &pd1.WriteLock{}
+    w.LockRange(2, 10)
+
+    lock := make(chan bool)
+    timeout := time.NewTimer(10 * time.Millisecond)
+    go func() {
+        w.LockRange(2, 10)
+        lock <- true
+    }()
+    select {
+    case <-lock:
+        t.Fatal("able to get lock when we shouldn't")
+    case <-timeout.C:
+        // we're all good
+    }
+}
+
+// func TestWriteLock_FreeRangeWithContentionElsewhere(t *testing.T) {
+// 	w := &pd1.WriteLock{}
+// 	w.LockRange(2, 10)
+
+// 	lock := make(chan bool)
+// 	freeRange := make(chan bool)
+// 	timeout := time.NewTimer(10 * time.Millisecond)
+// 	var wg sync.WaitGroup
+
+// 	wg.Add(1)
+// 	go func() {
+// 		wg.Done()
+// 		w.LockRange(4, 12)
+// 		lock <- true
+// 	}()
+
+// 	// make sure the other go func has gotten to the point of requesting the lock
+// 	wg.Wait()
+// 	go func() {
+// 		w.LockRange(15, 23)
+// 		freeRange <- true
+// 	}()
+// 	select {
+// 	case <-lock:
+// 		t.Fatal("able to get lock when we shouldn't")
+// 	case <-timeout.C:
+// 		t.Fatal("unable to get lock of free range when contention exists elsewhere")
+// 	case <-freeRange:
+// 		// we're all good
+// 	}
+// }
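
One optional follow-up on the new tests: the five blocking tests differ only in the held and requested ranges, so they could be collapsed into a single table-driven test. The sketch below is a suggestion built on the same exported API, not part of this patch.

package pd1_test

import (
    "testing"
    "time"

    "github.com/influxdb/influxdb/tsdb/engine/pd1"
)

// TestWriteLock_BlocksOnOverlap is a table-driven variant of the five
// blocking tests above: each case holds one range and expects an
// overlapping request to stay blocked.
func TestWriteLock_BlocksOnOverlap(t *testing.T) {
    cases := []struct {
        name             string
        heldMin, heldMax int64
        reqMin, reqMax   int64
    }{
        {"full cover", 2, 10, 1, 11},
        {"right intersect", 2, 10, 5, 15},
        {"left intersect", 1, 4, 1, 11},
        {"inside", 4, 8, 1, 11},
        {"same", 2, 10, 2, 10},
    }

    for _, c := range cases {
        w := &pd1.WriteLock{}
        w.LockRange(c.heldMin, c.heldMax)

        locked := make(chan struct{})
        go func(min, max int64) {
            w.LockRange(min, max)
            close(locked)
        }(c.reqMin, c.reqMax)

        select {
        case <-locked:
            t.Fatalf("%s: acquired an overlapping range while it was held", c.name)
        case <-time.After(10 * time.Millisecond):
            // still blocked, which is what we expect
        }
    }
}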