Simplifications
parent
c1840be88d
commit
5576e7fedb
|
@ -635,35 +635,26 @@ func (f *FileStore) walkFiles(fn func(f TSMFile) error) error {
|
||||||
defer f.mu.RUnlock()
|
defer f.mu.RUnlock()
|
||||||
|
|
||||||
// struct to hold the result of opening each reader in a goroutine
|
// struct to hold the result of opening each reader in a goroutine
|
||||||
type res struct {
|
|
||||||
err error
|
|
||||||
}
|
|
||||||
|
|
||||||
resC := make(chan res)
|
|
||||||
var n int
|
|
||||||
|
|
||||||
|
errC := make(chan error, len(f.files))
|
||||||
for _, f := range f.files {
|
for _, f := range f.files {
|
||||||
n++
|
|
||||||
|
|
||||||
go func(tsm TSMFile) {
|
go func(tsm TSMFile) {
|
||||||
if err := fn(tsm); err != nil {
|
if err := fn(tsm); err != nil {
|
||||||
resC <- res{err: fmt.Errorf("file %s: %s", tsm.Path(), err)}
|
errC <- fmt.Errorf("file %s: %s", tsm.Path(), err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
resC <- res{}
|
errC <- nil
|
||||||
}(f)
|
}(f)
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
for i := 0; i < cap(errC); i++ {
|
||||||
for i := 0; i < n; i++ {
|
res := <-errC
|
||||||
res := <-resC
|
if res != nil {
|
||||||
if res.err != nil {
|
return res
|
||||||
err = res.err
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
close(resC)
|
return nil
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// locations returns the files and index blocks for a key and time. ascending indicates
|
// locations returns the files and index blocks for a key and time. ascending indicates
|
||||||
|
|
|
@ -215,7 +215,7 @@ func (t *TSMReader) applyTombstones() error {
|
||||||
}
|
}
|
||||||
batch = append(batch, ts.Key)
|
batch = append(batch, ts.Key)
|
||||||
|
|
||||||
if len(batch) > 4096 {
|
if len(batch) >= 4096 {
|
||||||
t.index.DeleteRange(batch, prev.Min, prev.Max)
|
t.index.DeleteRange(batch, prev.Min, prev.Max)
|
||||||
batch = batch[:0]
|
batch = batch[:0]
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,7 +11,10 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
const v2header = 0x1502
|
const (
|
||||||
|
v2header = 0x1502
|
||||||
|
v2headerSize = 4
|
||||||
|
)
|
||||||
|
|
||||||
type Tombstoner struct {
|
type Tombstoner struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
|
@ -116,8 +119,7 @@ func (t *Tombstoner) Walk(fn func(t Tombstone) error) error {
|
||||||
defer f.Close()
|
defer f.Close()
|
||||||
|
|
||||||
var b [4]byte
|
var b [4]byte
|
||||||
_, err = f.Read(b[:])
|
if _, err := f.Read(b[:]); err != nil {
|
||||||
if err != nil {
|
|
||||||
// Might be a zero length file which should not exist, but
|
// Might be a zero length file which should not exist, but
|
||||||
// an old bug allowed them to occur. Treat it as an empty
|
// an old bug allowed them to occur. Treat it as an empty
|
||||||
// v1 tombstone file so we don't abort loading the TSM file.
|
// v1 tombstone file so we don't abort loading the TSM file.
|
||||||
|
@ -221,7 +223,7 @@ func (t *Tombstoner) readTombstoneV1(f *os.File, fn func(t Tombstone) error) err
|
||||||
// format is binary.
|
// format is binary.
|
||||||
func (t *Tombstoner) readTombstoneV2(f *os.File, fn func(t Tombstone) error) error {
|
func (t *Tombstoner) readTombstoneV2(f *os.File, fn func(t Tombstone) error) error {
|
||||||
// Skip header, already checked earlier
|
// Skip header, already checked earlier
|
||||||
if _, err := f.Seek(4, os.SEEK_SET); err != nil {
|
if _, err := f.Seek(v2headerSize, os.SEEK_SET); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
n := int64(4)
|
n := int64(4)
|
||||||
|
|
Loading…
Reference in New Issue