Fix compaction and multi-write bugs.
* Fix bug with locking when the interval completely covers or is totally inside another one. * Fix bug with full compactions running when the index is actively being written to.pull/4317/merge
parent
c1accf7e59
commit
01b5b9268e
|
@ -71,7 +71,7 @@ var _ tsdb.Engine = &Engine{}
|
||||||
|
|
||||||
// Engine represents a storage engine with compressed blocks.
|
// Engine represents a storage engine with compressed blocks.
|
||||||
type Engine struct {
|
type Engine struct {
|
||||||
writeLock *writeLock
|
writeLock *WriteLock
|
||||||
metaLock sync.Mutex
|
metaLock sync.Mutex
|
||||||
path string
|
path string
|
||||||
logger *log.Logger
|
logger *log.Logger
|
||||||
|
@ -120,7 +120,7 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine
|
||||||
|
|
||||||
e := &Engine{
|
e := &Engine{
|
||||||
path: path,
|
path: path,
|
||||||
writeLock: &writeLock{},
|
writeLock: &WriteLock{},
|
||||||
logger: log.New(os.Stderr, "[pd1] ", log.LstdFlags),
|
logger: log.New(os.Stderr, "[pd1] ", log.LstdFlags),
|
||||||
|
|
||||||
// TODO: this is the function where we can inject a check against the in memory collisions
|
// TODO: this is the function where we can inject a check against the in memory collisions
|
||||||
|
@ -149,10 +149,28 @@ func (e *Engine) PerformMaintenance() {
|
||||||
e.WAL.flush(f)
|
e.WAL.flush(f)
|
||||||
}()
|
}()
|
||||||
return
|
return
|
||||||
} else if e.shouldCompact() {
|
|
||||||
e.logger.Println("compacting for maintenance")
|
|
||||||
go e.Compact(true)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// don't do a full compaction if the WAL received writes in the time window
|
||||||
|
if time.Since(e.WAL.LastWriteTime()) < e.IndexCompactionFullAge {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
e.filesLock.RLock()
|
||||||
|
running := e.compactionRunning
|
||||||
|
e.filesLock.RUnlock()
|
||||||
|
if running {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// do a full compaction if all the index files are older than the compaction time
|
||||||
|
for _, f := range e.copyFilesCollection() {
|
||||||
|
if time.Since(f.modTime) < e.IndexCompactionFullAge {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
go e.Compact(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Format returns the format type of this engine
|
// Format returns the format type of this engine
|
||||||
|
@ -204,6 +222,8 @@ func (e *Engine) Open() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
e.lastCompactionTime = time.Now()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -454,15 +474,12 @@ func (e *Engine) Compact(fullCompaction bool) error {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
var s string
|
|
||||||
if fullCompaction {
|
|
||||||
s = "FULL "
|
|
||||||
}
|
|
||||||
e.logger.Printf("Starting %scompaction in partition %s of %d files", s, e.path, len(files))
|
|
||||||
st := time.Now()
|
|
||||||
|
|
||||||
// mark the compaction as running
|
// mark the compaction as running
|
||||||
e.filesLock.Lock()
|
e.filesLock.Lock()
|
||||||
|
if e.compactionRunning {
|
||||||
|
e.filesLock.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
e.compactionRunning = true
|
e.compactionRunning = true
|
||||||
e.filesLock.Unlock()
|
e.filesLock.Unlock()
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -474,11 +491,19 @@ func (e *Engine) Compact(fullCompaction bool) error {
|
||||||
e.filesLock.Unlock()
|
e.filesLock.Unlock()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
var s string
|
||||||
|
if fullCompaction {
|
||||||
|
s = "FULL "
|
||||||
|
}
|
||||||
|
fileName := e.nextFileName()
|
||||||
|
e.logger.Printf("Starting %scompaction in partition %s of %d files to new file %s", s, e.path, len(files), fileName)
|
||||||
|
st := time.Now()
|
||||||
|
|
||||||
positions := make([]uint32, len(files))
|
positions := make([]uint32, len(files))
|
||||||
ids := make([]uint64, len(files))
|
ids := make([]uint64, len(files))
|
||||||
|
|
||||||
// initilaize for writing
|
// initilaize for writing
|
||||||
f, err := os.OpenFile(e.nextFileName(), os.O_CREATE|os.O_RDWR, 0666)
|
f, err := os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -931,6 +956,12 @@ func (e *Engine) rewriteFile(oldDF *dataFile, valuesByID map[uint64]Values) erro
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if oldDF == nil {
|
||||||
|
e.logger.Printf("writing new index file %s", f.Name())
|
||||||
|
} else {
|
||||||
|
e.logger.Printf("rewriting index file %s with %s", oldDF.f.Name(), f.Name())
|
||||||
|
}
|
||||||
|
|
||||||
// write the magic number
|
// write the magic number
|
||||||
if _, err := f.Write(u32tob(magicNumber)); err != nil {
|
if _, err := f.Write(u32tob(magicNumber)); err != nil {
|
||||||
f.Close()
|
f.Close()
|
||||||
|
@ -1509,7 +1540,7 @@ func (d *dataFile) StartingPositionForID(id uint64) uint32 {
|
||||||
func (d *dataFile) block(pos uint32) (id uint64, t int64, block []byte) {
|
func (d *dataFile) block(pos uint32) (id uint64, t int64, block []byte) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
fmt.Println("FUCK: ", d.f.Name(), pos, id, t)
|
panic(fmt.Sprintf("panic decoding file: %s at position %d for id %d at time %d", d.f.Name(), pos, id, t))
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
if pos < d.indexPosition() {
|
if pos < d.indexPosition() {
|
||||||
|
|
|
@ -894,6 +894,73 @@ func TestEngine_RewritingOldBlocks(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestEngine_WriteIntoCompactedFile(t *testing.T) {
|
||||||
|
e := OpenDefaultEngine()
|
||||||
|
defer e.Cleanup()
|
||||||
|
|
||||||
|
fields := []string{"value"}
|
||||||
|
|
||||||
|
e.MaxPointsPerBlock = 3
|
||||||
|
e.RotateFileSize = 10
|
||||||
|
|
||||||
|
p1 := parsePoint("cpu,host=A value=1.1 1000000000")
|
||||||
|
p2 := parsePoint("cpu,host=A value=1.2 2000000000")
|
||||||
|
p3 := parsePoint("cpu,host=A value=1.3 3000000000")
|
||||||
|
p4 := parsePoint("cpu,host=A value=1.5 4000000000")
|
||||||
|
p5 := parsePoint("cpu,host=A value=1.6 2500000000")
|
||||||
|
|
||||||
|
if err := e.WritePoints([]models.Point{p1, p2}, nil, nil); err != nil {
|
||||||
|
t.Fatalf("failed to write points: %s", err.Error())
|
||||||
|
}
|
||||||
|
if err := e.WritePoints([]models.Point{p3}, nil, nil); err != nil {
|
||||||
|
t.Fatalf("failed to write points: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := e.Compact(true); err != nil {
|
||||||
|
t.Fatalf("error compacting: %s", err.Error)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := e.WritePoints([]models.Point{p4}, nil, nil); err != nil {
|
||||||
|
t.Fatalf("failed to write points: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := e.Compact(true); err != nil {
|
||||||
|
t.Fatalf("error compacting: %s", err.Error)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := e.WritePoints([]models.Point{p5}, nil, nil); err != nil {
|
||||||
|
t.Fatalf("failed to write points: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if count := e.DataFileCount(); count != 1 {
|
||||||
|
t.Fatalf("execpted 1 data file but got %d", count)
|
||||||
|
}
|
||||||
|
|
||||||
|
tx, _ := e.Begin(false)
|
||||||
|
defer tx.Rollback()
|
||||||
|
c := tx.Cursor("cpu,host=A", fields, nil, true)
|
||||||
|
k, _ := c.SeekTo(0)
|
||||||
|
if k != 1000000000 {
|
||||||
|
t.Fatalf("wrong time: %d", k)
|
||||||
|
}
|
||||||
|
k, _ = c.Next()
|
||||||
|
if k != 2000000000 {
|
||||||
|
t.Fatalf("wrong time: %d", k)
|
||||||
|
}
|
||||||
|
k, _ = c.Next()
|
||||||
|
if k != 2500000000 {
|
||||||
|
t.Fatalf("wrong time: %d", k)
|
||||||
|
}
|
||||||
|
k, _ = c.Next()
|
||||||
|
if k != 3000000000 {
|
||||||
|
t.Fatalf("wrong time: %d", k)
|
||||||
|
}
|
||||||
|
k, _ = c.Next()
|
||||||
|
if k != 4000000000 {
|
||||||
|
t.Fatalf("wrong time: %d", k)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Engine represents a test wrapper for pd1.Engine.
|
// Engine represents a test wrapper for pd1.Engine.
|
||||||
type Engine struct {
|
type Engine struct {
|
||||||
*pd1.Engine
|
*pd1.Engine
|
||||||
|
|
|
@ -542,7 +542,7 @@ func (l *Log) flush(flush flushType) error {
|
||||||
} else if flush == startupFlush {
|
} else if flush == startupFlush {
|
||||||
ftype = "startup"
|
ftype = "startup"
|
||||||
}
|
}
|
||||||
l.logger.Printf("%s flush of %d keys with %d values of %d bytes\n", ftype, len(l.flushCache), valueCount, flushSize)
|
l.logger.Printf("%s flush of %s with %d keys and %d total values of %d bytes\n", ftype, l.path, len(l.flushCache), valueCount, flushSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
|
@ -550,7 +550,7 @@ func (l *Log) flush(flush flushType) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if l.LoggingEnabled {
|
if l.LoggingEnabled {
|
||||||
l.logger.Printf("flush to index took %s\n", time.Since(startTime))
|
l.logger.Printf("%s flush to index took %s\n", l.path, time.Since(startTime))
|
||||||
}
|
}
|
||||||
|
|
||||||
l.cacheLock.Lock()
|
l.cacheLock.Lock()
|
||||||
|
|
|
@ -1,15 +1,14 @@
|
||||||
package pd1
|
package pd1
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"reflect"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
// writeLock is a lock that enables locking of ranges between a
|
// writeLock is a lock that enables locking of ranges between a
|
||||||
// min and max value. We use this so that flushes from the WAL
|
// min and max value. We use this so that flushes from the WAL
|
||||||
// can occur concurrently along with compactions.
|
// can occur concurrently along with compactions.
|
||||||
type writeLock struct {
|
type WriteLock struct {
|
||||||
mu sync.Mutex
|
|
||||||
|
|
||||||
rangesLock sync.Mutex
|
rangesLock sync.Mutex
|
||||||
ranges []*rangeLock
|
ranges []*rangeLock
|
||||||
}
|
}
|
||||||
|
@ -19,11 +18,9 @@ type writeLock struct {
|
||||||
// an overlapping range will have to wait until the previous
|
// an overlapping range will have to wait until the previous
|
||||||
// lock is released. A corresponding call to UnlockRange should
|
// lock is released. A corresponding call to UnlockRange should
|
||||||
// be deferred.
|
// be deferred.
|
||||||
func (w *writeLock) LockRange(min, max int64) {
|
func (w *WriteLock) LockRange(min, max int64) {
|
||||||
w.mu.Lock()
|
|
||||||
defer w.mu.Unlock()
|
|
||||||
|
|
||||||
r := &rangeLock{min: min, max: max}
|
r := &rangeLock{min: min, max: max}
|
||||||
|
for {
|
||||||
ranges := w.currentlyLockedRanges()
|
ranges := w.currentlyLockedRanges()
|
||||||
|
|
||||||
// ensure there are no currently locked ranges that overlap
|
// ensure there are no currently locked ranges that overlap
|
||||||
|
@ -36,17 +33,26 @@ func (w *writeLock) LockRange(min, max int64) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ensure that no one else got a lock on the range while we
|
||||||
|
// were waiting
|
||||||
|
w.rangesLock.Lock()
|
||||||
|
if len(w.ranges) == 0 || reflect.DeepEqual(ranges, w.ranges) {
|
||||||
// and lock the range
|
// and lock the range
|
||||||
r.mu.Lock()
|
r.mu.Lock()
|
||||||
|
|
||||||
// now that we know the range is free, add it to the locks
|
// now that we know the range is free, add it to the locks
|
||||||
w.rangesLock.Lock()
|
|
||||||
w.ranges = append(w.ranges, r)
|
w.ranges = append(w.ranges, r)
|
||||||
w.rangesLock.Unlock()
|
w.rangesLock.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// try again
|
||||||
|
w.rangesLock.Unlock()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnlockRange will release a previously locked range.
|
// UnlockRange will release a previously locked range.
|
||||||
func (w *writeLock) UnlockRange(min, max int64) {
|
func (w *WriteLock) UnlockRange(min, max int64) {
|
||||||
w.rangesLock.Lock()
|
w.rangesLock.Lock()
|
||||||
defer w.rangesLock.Unlock()
|
defer w.rangesLock.Unlock()
|
||||||
|
|
||||||
|
@ -62,7 +68,7 @@ func (w *writeLock) UnlockRange(min, max int64) {
|
||||||
w.ranges = a
|
w.ranges = a
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *writeLock) currentlyLockedRanges() []*rangeLock {
|
func (w *WriteLock) currentlyLockedRanges() []*rangeLock {
|
||||||
w.rangesLock.Lock()
|
w.rangesLock.Lock()
|
||||||
defer w.rangesLock.Unlock()
|
defer w.rangesLock.Unlock()
|
||||||
a := make([]*rangeLock, len(w.ranges))
|
a := make([]*rangeLock, len(w.ranges))
|
||||||
|
@ -81,6 +87,10 @@ func (r *rangeLock) overlaps(l *rangeLock) bool {
|
||||||
return true
|
return true
|
||||||
} else if l.max >= r.min && l.max <= r.max {
|
} else if l.max >= r.min && l.max <= r.max {
|
||||||
return true
|
return true
|
||||||
|
} else if l.min <= r.min && l.max >= r.max {
|
||||||
|
return true
|
||||||
|
} else if l.min >= r.min && l.max <= r.max {
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,131 @@
|
||||||
|
package pd1_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
// "sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdb/influxdb/tsdb/engine/pd1"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestWriteLock_FullCover(t *testing.T) {
|
||||||
|
w := &pd1.WriteLock{}
|
||||||
|
w.LockRange(2, 10)
|
||||||
|
|
||||||
|
lock := make(chan bool)
|
||||||
|
timeout := time.NewTimer(10 * time.Millisecond)
|
||||||
|
go func() {
|
||||||
|
w.LockRange(1, 11)
|
||||||
|
lock <- true
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case <-lock:
|
||||||
|
t.Fatal("able to get lock when we shouldn't")
|
||||||
|
case <-timeout.C:
|
||||||
|
// we're all good
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWriteLock_RightIntersect(t *testing.T) {
|
||||||
|
w := &pd1.WriteLock{}
|
||||||
|
w.LockRange(2, 10)
|
||||||
|
|
||||||
|
lock := make(chan bool)
|
||||||
|
timeout := time.NewTimer(10 * time.Millisecond)
|
||||||
|
go func() {
|
||||||
|
w.LockRange(5, 15)
|
||||||
|
lock <- true
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case <-lock:
|
||||||
|
t.Fatal("able to get lock when we shouldn't")
|
||||||
|
case <-timeout.C:
|
||||||
|
// we're all good
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWriteLock_LeftIntersect(t *testing.T) {
|
||||||
|
w := &pd1.WriteLock{}
|
||||||
|
w.LockRange(1, 4)
|
||||||
|
|
||||||
|
lock := make(chan bool)
|
||||||
|
timeout := time.NewTimer(10 * time.Millisecond)
|
||||||
|
go func() {
|
||||||
|
w.LockRange(1, 11)
|
||||||
|
lock <- true
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case <-lock:
|
||||||
|
t.Fatal("able to get lock when we shouldn't")
|
||||||
|
case <-timeout.C:
|
||||||
|
// we're all good
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWriteLock_Inside(t *testing.T) {
|
||||||
|
w := &pd1.WriteLock{}
|
||||||
|
w.LockRange(4, 8)
|
||||||
|
|
||||||
|
lock := make(chan bool)
|
||||||
|
timeout := time.NewTimer(10 * time.Millisecond)
|
||||||
|
go func() {
|
||||||
|
w.LockRange(1, 11)
|
||||||
|
lock <- true
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case <-lock:
|
||||||
|
t.Fatal("able to get lock when we shouldn't")
|
||||||
|
case <-timeout.C:
|
||||||
|
// we're all good
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWriteLock_Same(t *testing.T) {
|
||||||
|
w := &pd1.WriteLock{}
|
||||||
|
w.LockRange(2, 10)
|
||||||
|
|
||||||
|
lock := make(chan bool)
|
||||||
|
timeout := time.NewTimer(10 * time.Millisecond)
|
||||||
|
go func() {
|
||||||
|
w.LockRange(2, 10)
|
||||||
|
lock <- true
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case <-lock:
|
||||||
|
t.Fatal("able to get lock when we shouldn't")
|
||||||
|
case <-timeout.C:
|
||||||
|
// we're all good
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// func TestWriteLock_FreeRangeWithContentionElsewhere(t *testing.T) {
|
||||||
|
// w := &pd1.WriteLock{}
|
||||||
|
// w.LockRange(2, 10)
|
||||||
|
|
||||||
|
// lock := make(chan bool)
|
||||||
|
// freeRange := make(chan bool)
|
||||||
|
// timeout := time.NewTimer(10 * time.Millisecond)
|
||||||
|
// var wg sync.WaitGroup
|
||||||
|
|
||||||
|
// wg.Add(1)
|
||||||
|
// go func() {
|
||||||
|
// wg.Done()
|
||||||
|
// w.LockRange(4, 12)
|
||||||
|
// lock <- true
|
||||||
|
// }()
|
||||||
|
|
||||||
|
// // make sure the other go func has gotten to the point of requesting the lock
|
||||||
|
// wg.Wait()
|
||||||
|
// go func() {
|
||||||
|
// w.LockRange(15, 23)
|
||||||
|
// freeRange <- true
|
||||||
|
// }()
|
||||||
|
// select {
|
||||||
|
// case <-lock:
|
||||||
|
// t.Fatal("able to get lock when we shouldn't")
|
||||||
|
// case <-timeout.C:
|
||||||
|
// t.Fatal("unable to get lock of free range when contention exists elsewhere")
|
||||||
|
// case <-freeRange:
|
||||||
|
// // we're all good
|
||||||
|
// }
|
||||||
|
// }
|
Loading…
Reference in New Issue