Merge pull request #5003 from influxdb/jw-compaction

Update compaction planning
Paul Dix 2015-12-05 16:49:54 -05:00
commit 40e606cb14
9 changed files with 611 additions and 184 deletions


@ -459,7 +459,9 @@ func cmdDumpTsm1dev(opts *tsdmDumpOpts) {
}
b := make([]byte, 8)
r, err := tsm1.NewTSMReader(f)
r, err := tsm1.NewTSMReaderWithOptions(tsm1.TSMReaderOptions{
MMAPFile: f,
})
if err != nil {
println("Error opening TSM files: ", err.Error())
}


@ -21,15 +21,22 @@ import (
"time"
)
// maxCompactionSegments is the maximum number of segments that can be
// compacted at one time. A lower value would shorten
// minCompactionFileCount is the minimum number of TSM files that need to
// exist before a compaction cycle will run
const minCompactionFileCount = 5
// maxCompactionFileCount is the maximum number of TSM files that can be
// compacted at one time. A lower value would shorten
// compaction times and memory requirements, but produce more TSM files
// with lower compression ratios. A higher value increases compaction times
// and memory usage but produces more dense TSM files.
const maxCompactionSegments = 10
// and memory usage but produces more dense TSM files. This value is a cutoff
// for when to stop looking for additional files to add to the compaction set.
const maxCompactionFileCount = 20
const maxTSMFileSize = 250 * 1024 * 1024
const rolloverTSMFileSize = 5 * 1025 * 1024
const rolloverTSMFileSize = 5 * 1024 * 1024
const CompactionTempExtension = "tmp"
var errMaxFileExceeded = fmt.Errorf("max file exceeded")
@ -38,63 +45,178 @@ var (
MinTime = time.Unix(0, 0)
)
// compactionSteps are the sizes of files to roll up into before combining.
var compactionSteps = []int64{
32 * 1024 * 1024,
128 * 1024 * 1024,
512 * 1024 * 1024,
2048 * 1024 * 1024,
}
// CompactionPlanner determines what TSM files and WAL segments to include in a
// given compaction run.
type CompactionPlanner interface {
Plan() []string
}
// DefaultPlanner implements CompactionPlanner using a strategy to minimize
// the number of closed WAL segments as well as rewriting existing files to
// improve compression ratios. It prefers compacting WAL segments over TSM
// files to allow cached points to be evicted more quickly. When looking at
// TSM files, it will pull in TSM files that need to be rewritten to ensure
// points exist in only one file. Reclaiming space is lower priority while
// there are multiple WAL segments still on disk (e.g. deleting tombstones,
// combining smaller TSM files, etc.)
//
// It prioritizes WAL segments and TSM files as follows:
//
// 1) If there are more than 10 closed WAL segments, it will use the 10 oldest
// 2) If there are any TSM files that contain points that would be overwritten
// by a WAL segment, those TSM files are included
// 3) If there are fewer than 10 WAL segments and no TSM files are required to be
// re-written, any TSM files containing tombstones are included.
// 4) If there are still fewer than 10 WAL segments and no TSM files are included, any
// TSM files less than the max file size are included.
// DefaultPlanner implements CompactionPlanner using a strategy to roll up
// multiple generations of TSM files into larger files in stages. It attempts
// to minimize the number of TSM files on disk while rolling up a bounded number
// of files.
type DefaultPlanner struct {
FileStore interface {
Stats() []FileStat
}
}
// tsmGeneration represents the TSM files within a generation.
// 000001-01.tsm and 000001-02.tsm are both in generation 000001,
// each with a different sequence number.
type tsmGeneration struct {
files []FileStat
}
// size returns the total size of the generation
func (t *tsmGeneration) size() int64 {
var n int64
for _, f := range t.files {
n += int64(f.Size)
}
return n
}
// lastModified returns the most recent modification time across all files
// in the generation.
func (t *tsmGeneration) lastModified() time.Time {
var max time.Time
for _, f := range t.files {
if f.LastModified.After(max) {
max = f.LastModified
}
}
return max
}
// count returns the number of files in the generation
func (t *tsmGeneration) count() int {
return len(t.files)
}
// Plan returns a set of TSM files to rewrite
func (c *DefaultPlanner) Plan() []string {
tsmStats := c.FileStore.Stats()
// Determine the generations from all files on disk. We need to treat
// a generation conceptually as a single file even though it may be
// split across several files in sequence.
generations := c.findGenerations()
var tsmPaths []string
var hasDeletes bool
for _, tsm := range tsmStats {
if tsm.HasTombstone {
tsmPaths = append(tsmPaths, tsm.Path)
hasDeletes = true
continue
// First, find the minimum size across all generations and collect the
// set of generation numbers.
var order []int
minSize := int64(math.MaxInt64)
for gen, group := range generations {
order = append(order, gen)
if group.size() < minSize {
minSize = group.size()
}
}
sort.Ints(order)
if tsm.Size > rolloverTSMFileSize {
continue
// TODO: If we have multiple generations and they have not been modified
// after some time cutoff, add them all to the set so they all get rewritten
// into one generation.
// if len(generations) > 1 {
// cold := true
// for _, gen := range generations {
// if gen.lastModified().After(time.Now().Add(-10 * time.Minute)) {
// cold = false
// }
// }
// if cold {
// var tsmFiles []string
// for _, gen := range order {
// group := generations[gen]
// // If the generation size is less than our current roll up size,
// // include all the files in that generation.
// for _, f := range group.files {
// tsmFiles = append(tsmFiles, f.Path)
// }
// }
// sort.Strings(tsmFiles)
// return tsmFiles
// }
// }
// Default to the smallest roll up
stepSize := compactionSteps[0]
// Find the smallest roll-up size based on the sizes of all generations.
// This prioritizes rolling up many small files over a few larger ones,
// since a larger number of files on disk hurts both query performance
// and compression ratios.
for i := len(compactionSteps) - 1; i >= 0; i-- {
step := compactionSteps[i]
if minSize < step {
stepSize = step
}
tsmPaths = append(tsmPaths, tsm.Path)
}
sort.Strings(tsmPaths)
// The set of files that should be compacted
compacted := &tsmGeneration{}
var genCount int
for _, gen := range order {
group := generations[gen]
if !hasDeletes && len(tsmPaths) <= 1 {
// If the generation size is less than our current roll up size,
// include all the files in that generation.
if group.size() < stepSize {
compacted.files = append(compacted.files, group.files...)
genCount++
}
// Make sure we don't include too many files in one compaction run.
if genCount >= maxCompactionFileCount {
break
}
}
// Make sure we have enough files for a compaction run to actually produce
// something better.
if compacted.count() < minCompactionFileCount {
return nil
}
return tsmPaths
// All the files to be compacted must be compacted in order
var tsmFiles []string
for _, f := range compacted.files {
tsmFiles = append(tsmFiles, f.Path)
}
sort.Strings(tsmFiles)
// Only one file; we can't improve on that, so there is nothing to do
if len(tsmFiles) == 1 {
return nil
}
return tsmFiles
}
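A standalone sketch of the step-size selection above may help: the loop walks compactionSteps from the largest step down and keeps the smallest step that is still larger than the smallest generation, so a directory full of small generations gets rolled toward the smallest target first. The step values mirror those declared in this diff; the 3MB minimum generation size is illustrative only.

package main

import "fmt"

// compactionSteps mirrors the roll-up sizes declared above.
var compactionSteps = []int64{
	32 * 1024 * 1024,
	128 * 1024 * 1024,
	512 * 1024 * 1024,
	2048 * 1024 * 1024,
}

func main() {
	minSize := int64(3 * 1024 * 1024) // smallest generation on disk: 3MB

	// Same selection as Plan() above: the last step that is still larger
	// than minSize wins.
	stepSize := compactionSteps[0]
	for i := len(compactionSteps) - 1; i >= 0; i-- {
		if minSize < compactionSteps[i] {
			stepSize = compactionSteps[i]
		}
	}
	fmt.Printf("stepSize = %dMB\n", stepSize/(1024*1024)) // stepSize = 32MB
}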
// findGenerations groups all the TSM files by their generation based
// on their file names
func (c *DefaultPlanner) findGenerations() map[int]*tsmGeneration {
generations := map[int]*tsmGeneration{}
tsmStats := c.FileStore.Stats()
for _, f := range tsmStats {
gen, _, _ := ParseTSMFileName(f.Path)
group := generations[gen]
if group == nil {
group = &tsmGeneration{}
generations[gen] = group
}
group.files = append(group.files, f)
}
return generations
}
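As a worked example of the grouping, a simplified stand-in for tsm1.ParseTSMFileName (the parse helper below is hypothetical and skips error handling) collapses 000001-01.tsm and 000001-02.tsm into generation 1, while 000002-01.tsm stands alone:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parse is a simplified, hypothetical stand-in for tsm1.ParseTSMFileName.
func parse(name string) (gen, seq int) {
	id := name[:strings.Index(name, ".")]
	parts := strings.Split(id, "-")
	gen, _ = strconv.Atoi(parts[0])
	seq, _ = strconv.Atoi(parts[1])
	return gen, seq
}

func main() {
	files := []string{"000001-01.tsm", "000001-02.tsm", "000002-01.tsm"}
	generations := map[int][]string{}
	for _, f := range files {
		gen, _ := parse(f)
		generations[gen] = append(generations[gen], f)
	}
	fmt.Println(generations) // map[1:[000001-01.tsm 000001-02.tsm] 2:[000002-01.tsm]]
}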
// Compactor merges multiple TSM files into new files or
@ -104,19 +226,38 @@ type Compactor struct {
MaxFileSize int
FileStore interface {
NextID() int
NextGeneration() int
}
}
// WriteSnapshot will write a Cache snapshot to one or more new TSM files.
func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
iter := NewCacheKeyIterator(cache)
return c.writeNewFiles(iter)
return c.writeNewFiles(c.FileStore.NextGeneration(), 1, iter)
}
// Compact will write multiple smaller TSM files into 1 or more larger files
func (c *Compactor) Compact(tsmFiles []string) ([]string, error) {
// The new compacted files need to be added to the max generation in the
// set. We need to find that max generation as well as the max sequence
// number to ensure we write to the next unique location.
var maxGeneration, maxSequence int
for _, f := range tsmFiles {
gen, seq, err := ParseTSMFileName(f)
if err != nil {
return nil, err
}
if gen > maxGeneration {
maxGeneration = gen
maxSequence = seq
}
if gen == maxGeneration && seq > maxSequence {
maxSequence = seq
}
}
// For each TSM file, create a TSM reader
var trs []*TSMReader
for _, file := range tsmFiles {
@ -144,7 +285,7 @@ func (c *Compactor) Compact(tsmFiles []string) ([]string, error) {
return nil, err
}
return c.writeNewFiles(tsm)
return c.writeNewFiles(maxGeneration, maxSequence, tsm)
}
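To make the generation/sequence bookkeeping concrete, here is a minimal sketch of the same selection with hard-coded inputs (the file names implied below are illustrative): compacting 000001-02, 000002-01, and 000002-03 should write into generation 2, starting after sequence 3.

package main

import "fmt"

func main() {
	// (generation, sequence) pairs as ParseTSMFileName would return them.
	pairs := [][2]int{{1, 2}, {2, 1}, {2, 3}}

	// Track the highest generation, and within it the highest sequence,
	// so new output files land at the next unique name.
	var maxGeneration, maxSequence int
	for _, p := range pairs {
		gen, seq := p[0], p[1]
		if gen > maxGeneration {
			maxGeneration = gen
			maxSequence = seq
		}
		if gen == maxGeneration && seq > maxSequence {
			maxSequence = seq
		}
	}
	fmt.Println(maxGeneration, maxSequence) // 2 3
}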
// Clone will return a new compactor that can be used even if the engine is closed
@ -158,15 +299,14 @@ func (c *Compactor) Clone() *Compactor {
// writeNewFiles will write from the iterator into new TSM files, rotating
// to a new file when we've reached the max TSM file size
func (c *Compactor) writeNewFiles(iter KeyIterator) ([]string, error) {
func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([]string, error) {
// These are the new TSM files written
var files []string
for {
currentID := c.FileStore.NextID()
sequence++
// New TSM files are written to a temp file and renamed when fully completed.
fileName := filepath.Join(c.Dir, fmt.Sprintf("%07d.%s.tmp", currentID, "tsm1dev"))
fileName := filepath.Join(c.Dir, fmt.Sprintf("%09d-%09d.%s.tmp", generation, sequence, "tsm1dev"))
// Write as much as possible to this file
err := c.write(fileName, iter)
@ -187,6 +327,9 @@ func (c *Compactor) writeNewFiles(iter KeyIterator) ([]string, error) {
// We hit an error but didn't finish the compaction. Remove the temp file and abort.
if err != nil {
if err := os.Remove(fileName); err != nil {
return nil, err
}
return nil, err
}
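The file-name format is easiest to see by example. Continuing the case above (generation 2, max sequence 3), the first rotation bumps the sequence and produces a zero-padded temp name; the .tmp suffix is later dropped on rename by FileStore.Replace. The directory below is hypothetical.

package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	generation, sequence := 2, 3
	sequence++ // writeNewFiles increments before each new file

	name := filepath.Join("/tmp/shard", fmt.Sprintf("%09d-%09d.%s.tmp", generation, sequence, "tsm1dev"))
	fmt.Println(name) // /tmp/shard/000000002-000000004.tsm1dev.tmp
}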
@ -307,7 +450,7 @@ func (k *tsmKeyIterator) Next() bool {
}
// Grab the key for this reader
key := r.Key(k.pos[i])
key, entries := r.Key(k.pos[i])
k.keys[i] = key
if key != "" && key <= k.key {
@ -322,9 +465,14 @@ func (k *tsmKeyIterator) Next() bool {
if key != "" {
// Note: this could be made more efficient by grabbing chunks of values
// instead of all values for the key.
values, err := r.ReadAll(key)
if err != nil {
k.err = err
var values []Value
for _, entry := range entries {
v, err := r.ReadAt(entry, nil)
if err != nil {
k.err = err
}
values = append(values, v...)
}
if len(values) > 0 {
@ -347,7 +495,6 @@ func (k *tsmKeyIterator) Next() bool {
// Determine our current key which is the smallest key in the values map
k.key = k.currentKey()
}
return len(k.values) > 0
}


@ -3,6 +3,7 @@ package tsm1_test
import (
"fmt"
"os"
"path/filepath"
"testing"
"time"
@ -87,7 +88,7 @@ func TestCompactor_Compact(t *testing.T) {
writes := map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{a1},
}
f1 := MustWriteTSM(dir, writes)
f1 := MustWriteTSM(dir, 1, writes)
a2 := tsm1.NewValue(time.Unix(2, 0), 1.2)
b1 := tsm1.NewValue(time.Unix(1, 0), 2.1)
@ -95,7 +96,7 @@ func TestCompactor_Compact(t *testing.T) {
"cpu,host=A#!~#value": []tsm1.Value{a2},
"cpu,host=B#!~#value": []tsm1.Value{b1},
}
f2 := MustWriteTSM(dir, writes)
f2 := MustWriteTSM(dir, 2, writes)
a3 := tsm1.NewValue(time.Unix(1, 0), 1.3)
c1 := tsm1.NewValue(time.Unix(1, 0), 3.1)
@ -103,7 +104,7 @@ func TestCompactor_Compact(t *testing.T) {
"cpu,host=A#!~#value": []tsm1.Value{a3},
"cpu,host=C#!~#value": []tsm1.Value{c1},
}
f3 := MustWriteTSM(dir, writes)
f3 := MustWriteTSM(dir, 3, writes)
compactor := &tsm1.Compactor{
Dir: dir,
@ -120,6 +121,25 @@ func TestCompactor_Compact(t *testing.T) {
t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
}
expGen, expSeq, err := tsm1.ParseTSMFileName(f3)
if err != nil {
t.Fatalf("unexpected error parsing file name: %v", err)
}
expSeq = expSeq + 1
gotGen, gotSeq, err := tsm1.ParseTSMFileName(files[0])
if err != nil {
t.Fatalf("unexpected error parsing file name: %v", err)
}
if gotGen != expGen {
t.Fatalf("wrong generation for new file: got %v, exp %v", gotGen, expGen)
}
if gotSeq != expSeq {
t.Fatalf("wrong sequence for new file: got %v, exp %v", gotSeq, expSeq)
}
r := MustOpenTSMReader(files[0])
keys := r.Keys()
@ -162,7 +182,7 @@ func TestKeyIterator_TSM_Single(t *testing.T) {
"cpu,host=A#!~#value": []tsm1.Value{v1},
}
r := MustTSMReader(dir, writes)
r := MustTSMReader(dir, 1, writes)
iter, err := tsm1.NewTSMKeyIterator(r)
if err != nil {
@ -211,7 +231,7 @@ func TestKeyIterator_TSM_Duplicate(t *testing.T) {
"cpu,host=A#!~#value": []tsm1.Value{v1},
}
r := MustTSMReader(dir, writes)
r := MustTSMReader(dir, 1, writes)
iter, err := tsm1.NewTSMKeyIterator(r)
if err != nil {
@ -253,7 +273,7 @@ func TestKeyIterator_TSM_MultipleKeysDeleted(t *testing.T) {
"cpu,host=A#!~#value": []tsm1.Value{v1},
}
r1 := MustTSMReader(dir, points1)
r1 := MustTSMReader(dir, 1, points1)
r1.Delete("cpu,host=A#!~#value")
v2 := tsm1.NewValue(time.Unix(1, 0), float64(1))
@ -264,7 +284,7 @@ func TestKeyIterator_TSM_MultipleKeysDeleted(t *testing.T) {
"cpu,host=B#!~#value": []tsm1.Value{v3},
}
r2 := MustTSMReader(dir, points2)
r2 := MustTSMReader(dir, 1, points2)
r2.Delete("cpu,host=A#!~#count")
iter, err := tsm1.NewTSMKeyIterator(r1, r2)
@ -346,21 +366,21 @@ func TestKeyIterator_Cache_Single(t *testing.T) {
}
}
func TestDefaultCompactionPlanner_OnlyTSM_MaxSize(t *testing.T) {
func TestDefaultCompactionPlanner_Min5(t *testing.T) {
cp := &tsm1.DefaultPlanner{
FileStore: &fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return []tsm1.FileStat{
tsm1.FileStat{
Path: "1.tsm1",
Path: "01-01.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "2.tsm1",
Path: "02-01.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "3.tsm",
Path: "03-1.tsm1",
Size: 251 * 1024 * 1024,
},
}
@ -369,59 +389,137 @@ func TestDefaultCompactionPlanner_OnlyTSM_MaxSize(t *testing.T) {
}
tsm := cp.Plan()
if exp, got := 2, len(tsm); got != exp {
if exp, got := 0, len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
}
}
func TestDefaultCompactionPlanner_TSM_Rewrite(t *testing.T) {
func TestDefaultCompactionPlanner_CombineSequence(t *testing.T) {
data := []tsm1.FileStat{
tsm1.FileStat{
Path: "01-01.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "01-02.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "01-03.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "04-02.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "05-02.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "03-1.tsm1",
Size: 251 * 1024 * 1024,
},
}
cp := &tsm1.DefaultPlanner{
FileStore: &fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return []tsm1.FileStat{
tsm1.FileStat{
Path: "0001.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "0002.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Size: 251 * 1024 * 1024,
},
}
return data
},
},
}
expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4]}
tsm := cp.Plan()
if exp, got := 2, len(tsm); got != exp {
if exp, got := len(expFiles), len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
}
for i, p := range expFiles {
if got, exp := tsm[i], p.Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
}
}
}
func TestDefaultCompactionPlanner_Rewrite_Deletes(t *testing.T) {
func TestDefaultCompactionPlanner_SkipMaxSize(t *testing.T) {
data := []tsm1.FileStat{
tsm1.FileStat{
Path: "01-01.tsm1",
Size: 251 * 1024 * 1024,
},
tsm1.FileStat{
Path: "02-02.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "03-02.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "04-02.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "05-02.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "06-01.tsm1",
Size: 1 * 1024 * 1024,
},
}
cp := &tsm1.DefaultPlanner{
FileStore: &fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return []tsm1.FileStat{
tsm1.FileStat{
Path: "000007.tsm1",
HasTombstone: true,
},
tsm1.FileStat{
Size: 251 * 1024 * 1024,
},
}
return data
},
},
}
expFiles := []tsm1.FileStat{data[1], data[2], data[3], data[4], data[5]}
tsm := cp.Plan()
if exp, got := 1, len(tsm); got != exp {
if exp, got := len(expFiles), len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
}
for i, p := range expFiles {
if got, exp := tsm[i], p.Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
}
}
}
func TestDefaultCompactionPlanner_Limit20(t *testing.T) {
var data []tsm1.FileStat
for i := 1; i < 25; i++ {
data = append(data, tsm1.FileStat{
Path: fmt.Sprintf("%07d-01.tsm1", i),
Size: 1 * 1024 * 1024,
})
}
cp := &tsm1.DefaultPlanner{
FileStore: &fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return data
},
},
}
expFiles := data[:20]
tsm := cp.Plan()
if exp, got := len(expFiles), len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
}
for i, p := range expFiles {
if got, exp := tsm[i], p.Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
}
}
}
func assertValueEqual(t *testing.T, a, b tsm1.Value) {
@ -459,8 +557,13 @@ func MustWALSegment(dir string, entries []tsm1.WALEntry) *tsm1.WALSegmentReader
return tsm1.NewWALSegmentReader(f)
}
func MustWriteTSM(dir string, values map[string][]tsm1.Value) string {
func MustWriteTSM(dir string, gen int, values map[string][]tsm1.Value) string {
f := MustTempFile(dir)
newName := filepath.Join(filepath.Dir(f.Name()), tsmFileName(gen))
if err := os.Rename(f.Name(), newName); err != nil {
panic(fmt.Sprintf("create tsm file: %v", err))
}
w, err := tsm1.NewTSMWriter(f)
if err != nil {
panic(fmt.Sprintf("create TSM writer: %v", err))
@ -480,11 +583,11 @@ func MustWriteTSM(dir string, values map[string][]tsm1.Value) string {
panic(fmt.Sprintf("write TSM close: %v", err))
}
return f.Name()
return newName
}
func MustTSMReader(dir string, values map[string][]tsm1.Value) *tsm1.TSMReader {
return MustOpenTSMReader(MustWriteTSM(dir, values))
func MustTSMReader(dir string, gen int, values map[string][]tsm1.Value) *tsm1.TSMReader {
return MustOpenTSMReader(MustWriteTSM(dir, gen, values))
}
func MustOpenTSMReader(name string) *tsm1.TSMReader {
@ -519,6 +622,6 @@ func (w *fakeFileStore) Stats() []tsm1.FileStat {
return w.PathsFn()
}
func (w *fakeFileStore) NextID() int {
func (w *fakeFileStore) NextGeneration() int {
return 1
}


@ -146,7 +146,7 @@ type TSMIndex interface {
Keys() []string
// Key returns the key in the index at the given position.
Key(index int) string
Key(index int) (string, []*IndexEntry)
// KeyCount returns the count of unique keys in the index.
KeyCount() int
@ -154,6 +154,12 @@ type TSMIndex interface {
// Size returns the size of the current index in bytes
Size() int
// TimeRange returns the min and max time across all keys in the file.
TimeRange() (time.Time, time.Time)
// KeyRange returns the min and max keys in the file.
KeyRange() (string, string)
// Type returns the block type of the values stored for the key. Returns one of
// BlockFloat64, BlockInt64, BlockBool, BlockString. If key does not exist,
// an error is returned.
@ -292,17 +298,47 @@ func (d *directIndex) Keys() []string {
return keys
}
func (d *directIndex) Key(idx int) string {
func (d *directIndex) Key(idx int) (string, []*IndexEntry) {
if idx < 0 || idx >= len(d.blocks) {
return ""
return "", nil
}
return d.Keys()[idx]
k := d.Keys()[idx]
return k, d.blocks[k].entries
}
func (d *directIndex) KeyCount() int {
return len(d.Keys())
}
func (d *directIndex) KeyRange() (string, string) {
var min, max string
for k := range d.blocks {
if min == "" || k < min {
min = k
}
if max == "" || k > max {
max = k
}
}
return min, max
}
func (d *directIndex) TimeRange() (time.Time, time.Time) {
min, max := time.Unix(0, math.MaxInt64), time.Unix(0, math.MinInt64)
for _, entries := range d.blocks {
for _, e := range entries.entries {
if e.MinTime.Before(min) {
min = e.MinTime
}
if e.MaxTime.After(max) {
max = e.MaxTime
}
}
}
return min, max
}
func (d *directIndex) addEntries(key string, entries *indexEntries) {
existing := d.blocks[key]
if existing == nil {
@ -559,15 +595,16 @@ func (d *indirectIndex) Keys() []string {
return keys
}
func (d *indirectIndex) Key(idx int) string {
func (d *indirectIndex) Key(idx int) (string, []*IndexEntry) {
d.mu.RLock()
defer d.mu.RUnlock()
if idx < 0 || idx >= len(d.offsets) {
return ""
return "", nil
}
_, key, _ := readKey(d.b[d.offsets[idx]:])
return key
n, key, _ := readKey(d.b[d.offsets[idx]:])
_, entries, _ := readEntries(d.b[int(d.offsets[idx])+n:])
return key, entries.entries
}
func (d *indirectIndex) KeyCount() int {
@ -617,6 +654,14 @@ func (d *indirectIndex) Type(key string) (byte, error) {
return 0, fmt.Errorf("key does not exist: %v", key)
}
func (d *indirectIndex) KeyRange() (string, string) {
return d.minKey, d.maxKey
}
func (d *indirectIndex) TimeRange() (time.Time, time.Time) {
return d.minTime, d.maxTime
}
// MarshalBinary returns a byte slice encoded version of the index.
func (d *indirectIndex) MarshalBinary() ([]byte, error) {
d.mu.RLock()
@ -643,6 +688,19 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error {
for i < int32(len(b)) {
d.offsets = append(d.offsets, i)
_, key, err := readKey(b[i:])
if err != nil {
return err
}
if d.minKey == "" || key < d.minKey {
d.minKey = key
}
if d.maxKey == "" || key > d.maxKey {
d.maxKey = key
}
keyLen := int32(btou16(b[i : i+2]))
// Skip to the start of the key
i += 2
@ -650,17 +708,19 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error {
// Skip over the key
i += keyLen
// Skip over the block type
i += indexTypeSize
n, entries, err := readEntries(d.b[i:])
// Count of all the index blocks for this key
count := int32(btou16(b[i : i+2]))
minTime := entries.entries[0].MinTime
if d.minTime.IsZero() || minTime.Before(d.minTime) {
d.minTime = minTime
}
// Skip the count bytes
i += 2
maxTime := entries.entries[len(entries.entries)-1].MaxTime
if d.maxTime.IsZero() || maxTime.After(d.maxTime) {
d.maxTime = maxTime
}
// Skip over all the blocks
i += count * indexEntrySize
i += int32(n)
}
return nil
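For orientation, the skips above imply a per-key index record laid out roughly as:

	[2-byte key length][key bytes][block type (indexTypeSize bytes)][2-byte entry count][entry count × indexEntrySize bytes]

readEntries consumes everything from the block type onward and returns the number of bytes it read, which is why the new code can replace the manual type/count/entry skips with a single i += int32(n). This layout is a reading of the parsing code in this diff, not a statement of the on-disk spec.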
@ -770,7 +830,11 @@ type TSMReader struct {
// tombstoner ensures tombstoned keys are not exposed by the index.
tombstoner *Tombstoner
// size is the size of the file on disk.
size int64
// lastModified is the last time this file was modified on disk
lastModified time.Time
}
// blockAccessor abstracts a method of accessing blocks from a
@ -808,7 +872,14 @@ func NewTSMReaderWithOptions(opt TSMReaderOptions) (*TSMReader, error) {
return nil, err
}
t.size = size
if f, ok := opt.Reader.(*os.File); ok {
stat, err := f.Stat()
if err != nil {
return nil, err
}
t.lastModified = stat.ModTime()
}
t.accessor = &fileAccessor{
r: opt.Reader,
}
@ -819,6 +890,7 @@ func NewTSMReaderWithOptions(opt TSMReaderOptions) (*TSMReader, error) {
return nil, err
}
t.size = stat.Size()
t.lastModified = stat.ModTime()
t.accessor = &mmapAccessor{
f: opt.MMAPFile,
}
@ -866,7 +938,7 @@ func (t *TSMReader) Keys() []string {
return t.index.Keys()
}
func (t *TSMReader) Key(index int) string {
func (t *TSMReader) Key(index int) (string, []*IndexEntry) {
return t.index.Key(index)
}
@ -940,28 +1012,12 @@ func (t *TSMReader) Delete(key string) error {
// TimeRange returns the min and max time across all keys in the file.
func (t *TSMReader) TimeRange() (time.Time, time.Time) {
min, max := time.Unix(0, math.MaxInt64), time.Unix(0, math.MinInt64)
for _, k := range t.index.Keys() {
for _, e := range t.index.Entries(k) {
if e.MinTime.Before(min) {
min = e.MinTime
}
if e.MaxTime.After(max) {
max = e.MaxTime
}
}
}
return min, max
return t.index.TimeRange()
}
// KeyRange returns the min and max key across all keys in the file.
func (t *TSMReader) KeyRange() (string, string) {
min, max := "", ""
if t.index.KeyCount() > 0 {
min = t.index.Key(0)
max = t.index.Key(t.index.KeyCount() - 1)
}
return min, max
return t.index.KeyRange()
}
func (t *TSMReader) Entries(key string) []*IndexEntry {
@ -978,6 +1034,12 @@ func (t *TSMReader) Size() int {
return int(t.size)
}
func (t *TSMReader) LastModified() time.Time {
t.mu.RLock()
defer t.mu.RUnlock()
return t.lastModified
}
// HasTombstones returns true if there are any tombstone entries recorded.
func (t *TSMReader) HasTombstones() bool {
t.mu.RLock()
@ -985,6 +1047,21 @@ func (t *TSMReader) HasTombstones() bool {
return t.tombstoner.HasTombstones()
}
func (t *TSMReader) Stats() FileStat {
minTime, maxTime := t.index.TimeRange()
minKey, maxKey := t.index.KeyRange()
return FileStat{
Path: t.Path(),
Size: t.Size(),
LastModified: t.LastModified(),
MinTime: minTime,
MaxTime: maxTime,
MinKey: minKey,
MaxKey: maxKey,
HasTombstone: t.tombstoner.HasTombstones(),
}
}
// fileAccessor is a file IO based block accessor. It provides access to blocks
// using a file IO based approach (seek, read, etc.)
type fileAccessor struct {


@ -811,3 +811,68 @@ func TestTSMReader_MMAP_Tombstone(t *testing.T) {
t.Fatalf("key length mismatch: got %v, exp %v", got, exp)
}
}
func TestTSMReader_MMAP_Stats(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
f := MustTempFile(dir)
defer f.Close()
w, err := tsm1.NewTSMWriter(f)
if err != nil {
t.Fatalf("unexpected error creating writer: %v", err)
}
values1 := []tsm1.Value{tsm1.NewValue(time.Unix(0, 0), 1.0)}
if err := w.Write("cpu", values1); err != nil {
t.Fatalf("unexpected error writing: %v", err)
}
values2 := []tsm1.Value{tsm1.NewValue(time.Unix(1, 0), 1.0)}
if err := w.Write("mem", values2); err != nil {
t.Fatalf("unexpected error writing: %v", err)
}
if err := w.WriteIndex(); err != nil {
t.Fatalf("unexpected error writing index: %v", err)
}
if err := w.Close(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
f, err = os.Open(f.Name())
if err != nil {
t.Fatalf("unexpected error open file: %v", err)
}
r, err := tsm1.NewTSMReaderWithOptions(
tsm1.TSMReaderOptions{
MMAPFile: f,
})
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
defer r.Close()
stats := r.Stats()
if got, exp := stats.MinKey, "cpu"; got != exp {
t.Fatalf("min key mismatch: got %v, exp %v", got, exp)
}
if got, exp := stats.MaxKey, "mem"; got != exp {
t.Fatalf("max key mismatch: got %v, exp %v", got, exp)
}
if got, exp := stats.MinTime, values1[0].Time(); got != exp {
t.Fatalf("min time mismatch: got %v, exp %v", got, exp)
}
if got, exp := stats.MaxTime, values2[0].Time(); got != exp {
t.Fatalf("max time mismatch: got %v, exp %v", got, exp)
}
if got, exp := len(r.Keys()), 2; got != exp {
t.Fatalf("key length mismatch: got %v, exp %v", got, exp)
}
}


@ -527,3 +527,18 @@ func BenchmarkDecodeBlock_String_TypeSpecific(b *testing.B) {
}
}
}
func BenchmarkValues_Deduplicate(b *testing.B) {
valueCount := 1000
times := getTimes(valueCount, 60, time.Second)
values := make([]tsm1.Value, len(times))
for i, t := range times {
values[i] = tsm1.NewValue(t, fmt.Sprintf("value %d", i))
}
values = append(values, values...)
b.ResetTimer()
for i := 0; i < b.N; i++ {
tsm1.Values(values).Deduplicate()
}
}


@ -5,6 +5,7 @@ import (
"io"
"log"
"os"
"path/filepath"
"sort"
"sync"
"time"
@ -96,6 +97,10 @@ func (e *DevEngine) Open() error {
return err
}
if err := e.cleanup(); err != nil {
return err
}
if err := e.WAL.Open(); err != nil {
return err
}
@ -108,7 +113,8 @@ func (e *DevEngine) Open() error {
return err
}
go e.compact()
go e.compactCache()
go e.compactTSM()
return nil
}
@ -323,7 +329,7 @@ func (e *DevEngine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache
return nil
}
func (e *DevEngine) compact() {
func (e *DevEngine) compactCache() {
for {
if e.Cache.Size() > e.CacheFlushMemorySizeThreshold {
err := e.WriteSnapshot()
@ -331,7 +337,12 @@ func (e *DevEngine) compact() {
e.logger.Printf("error writing snapshot: %v", err)
}
}
time.Sleep(time.Second)
}
}
func (e *DevEngine) compactTSM() {
for {
tsmFiles := e.CompactionPlan.Plan()
if len(tsmFiles) == 0 {
@ -402,6 +413,20 @@ func (e *DevEngine) reloadCache() error {
return nil
}
func (e *DevEngine) cleanup() error {
files, err := filepath.Glob(filepath.Join(e.path, fmt.Sprintf("*.%s", CompactionTempExtension)))
if err != nil {
return fmt.Errorf("error getting compaction checkpoints: %s", err.Error())
}
for _, f := range files {
if err := os.Remove(f); err != nil {
return fmt.Errorf("error removing temp compaction files: %v", err)
}
}
return nil
}
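A minimal sketch of the same glob, assuming a hypothetical shard directory; it matches the temp files writeNewFiles leaves behind if a compaction is interrupted:

package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	// CompactionTempExtension is "tmp", so the pattern is "*.tmp".
	files, err := filepath.Glob(filepath.Join("/tmp/shard", "*.tmp"))
	if err != nil {
		panic(err)
	}
	fmt.Println(files) // e.g. [/tmp/shard/000000002-000000004.tsm1dev.tmp]
}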
func (e *DevEngine) KeyCursor(key string) *KeyCursor {
e.mu.RLock()
defer e.mu.RUnlock()


@ -61,23 +61,25 @@ type TSMFile interface {
// Remove deletes the file from the filesystem
Remove() error
// Stats returns summary information about the TSM file.
Stats() FileStat
}
type FileStore struct {
mu sync.RWMutex
currentFileID int
dir string
currentGeneration int
dir string
files []TSMFile
stats []FileStat
}
type FileStat struct {
Path string
HasTombstone bool
Size int
LastModified time.Time
MinTime, MaxTime time.Time
MinKey, MaxKey string
}
@ -108,19 +110,19 @@ func (f *FileStore) Count() int {
return len(f.files)
}
// CurrentID returns the max file ID + 1
func (f *FileStore) CurrentID() int {
// CurrentGeneration returns the max generation seen + 1
func (f *FileStore) CurrentGeneration() int {
f.mu.RLock()
defer f.mu.RUnlock()
return f.currentFileID
return f.currentGeneration
}
// NextID returns the max file ID + 1
func (f *FileStore) NextID() int {
// NextGeneration increments the current generation and returns it
func (f *FileStore) NextGeneration() int {
f.mu.Lock()
defer f.mu.Unlock()
f.currentFileID++
return f.currentFileID
f.currentGeneration++
return f.currentGeneration
}
func (f *FileStore) Add(files ...TSMFile) {
@ -212,13 +214,13 @@ func (f *FileStore) Open() error {
for _, fn := range files {
// Keep track of the latest generation
id, err := f.idFromFileName(fn)
generation, _, err := ParseTSMFileName(fn)
if err != nil {
return err
}
if id >= f.currentFileID {
f.currentFileID = id + 1
if generation >= f.currentGeneration {
f.currentGeneration = generation + 1
}
file, err := os.OpenFile(fn, os.O_RDONLY, 0666)
@ -292,32 +294,13 @@ func (f *FileStore) KeyCursor(key string) *KeyCursor {
func (f *FileStore) Stats() []FileStat {
f.mu.RLock()
if f.stats == nil {
f.mu.RUnlock()
f.mu.Lock()
defer f.mu.Unlock()
var paths []FileStat
for _, fd := range f.files {
minTime, maxTime := fd.TimeRange()
minKey, maxKey := fd.KeyRange()
paths = append(paths, FileStat{
Path: fd.Path(),
Size: fd.Size(),
MinTime: minTime,
MaxTime: maxTime,
MinKey: minKey,
MaxKey: maxKey,
HasTombstone: fd.HasTombstones(),
})
}
f.stats = paths
return f.stats
}
defer f.mu.RUnlock()
stats := make([]FileStat, len(f.files))
for i, fd := range f.files {
stats[i] = fd.Stats()
}
return f.stats
return stats
}
func (f *FileStore) Replace(oldFiles, newFiles []string) error {
@ -338,7 +321,9 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error {
if strings.HasSuffix(file, ".tmp") {
// The new TSM files have a tmp extension. First rename them.
newName = file[:len(file)-4]
os.Rename(file, newName)
if err := os.Rename(file, newName); err != nil {
return err
}
}
fd, err := os.Open(newName)
@ -378,22 +363,30 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error {
}
}
f.stats = nil
f.files = active
return nil
}
// idFromFileName parses the segment file ID from its name
func (f *FileStore) idFromFileName(name string) (int, error) {
parts := strings.Split(filepath.Base(name), ".")
if len(parts) != 2 {
return 0, fmt.Errorf("file %s is named incorrectly", name)
// ParseTSMFileName parses the generation and sequence from a TSM file name.
func ParseTSMFileName(name string) (int, int, error) {
base := filepath.Base(name)
idx := strings.Index(base, ".")
if idx == -1 {
return 0, 0, fmt.Errorf("file %s is named incorrectly", name)
}
id, err := strconv.ParseUint(parts[0], 10, 32)
id := base[:idx]
return int(id), err
parts := strings.Split(id, "-")
if len(parts) != 2 {
return 0, 0, fmt.Errorf("file %s is named incorrectly", name)
}
generation, err := strconv.ParseUint(parts[0], 10, 32)
if err != nil {
return 0, 0, fmt.Errorf("file %s is named incorrectly", name)
}
sequence, err := strconv.ParseUint(parts[1], 10, 32)
return int(generation), int(sequence), err
}
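For a quick sanity check, here is a self-contained copy of the parser (with the first error check added, as above) applied to a name in the new zero-padded format:

package main

import (
	"fmt"
	"path/filepath"
	"strconv"
	"strings"
)

// ParseTSMFileName is reproduced from above so this sketch runs standalone.
func ParseTSMFileName(name string) (int, int, error) {
	base := filepath.Base(name)
	idx := strings.Index(base, ".")
	if idx == -1 {
		return 0, 0, fmt.Errorf("file %s is named incorrectly", name)
	}
	parts := strings.Split(base[:idx], "-")
	if len(parts) != 2 {
		return 0, 0, fmt.Errorf("file %s is named incorrectly", name)
	}
	generation, err := strconv.ParseUint(parts[0], 10, 32)
	if err != nil {
		return 0, 0, err
	}
	sequence, err := strconv.ParseUint(parts[1], 10, 32)
	return int(generation), int(sequence), err
}

func main() {
	gen, seq, err := ParseTSMFileName("/data/shard/000000002-000000004.tsm1dev")
	fmt.Println(gen, seq, err) // 2 4 <nil>
}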
type KeyCursor struct {


@ -361,7 +361,7 @@ func TestFileStore_Open(t *testing.T) {
t.Fatalf("file count mismatch: got %v, exp %v", got, exp)
}
if got, exp := fs.CurrentID(), 4; got != exp {
if got, exp := fs.CurrentGeneration(), 4; got != exp {
t.Fatalf("current ID mismatch: got %v, exp %v", got, exp)
}
}
@ -392,7 +392,7 @@ func TestFileStore_Remove(t *testing.T) {
t.Fatalf("file count mismatch: got %v, exp %v", got, exp)
}
if got, exp := fs.CurrentID(), 4; got != exp {
if got, exp := fs.CurrentGeneration(), 4; got != exp {
t.Fatalf("current ID mismatch: got %v, exp %v", got, exp)
}
@ -402,7 +402,7 @@ func TestFileStore_Remove(t *testing.T) {
t.Fatalf("file count mismatch: got %v, exp %v", got, exp)
}
if got, exp := fs.CurrentID(), 4; got != exp {
if got, exp := fs.CurrentGeneration(), 4; got != exp {
t.Fatalf("current ID mismatch: got %v, exp %v", got, exp)
}
}
@ -563,9 +563,9 @@ func MustTempFile(dir string) *os.File {
}
func fatal(t *testing.T, msg string, err error) {
t.Fatalf("unexpected error %s: %v", msg, err)
t.Fatalf("unexpected error %v: %v", msg, err)
}
func tsmFileName(id int) string {
return fmt.Sprintf("%07d.tsm1dev", id)
return fmt.Sprintf("%09d-%09d.tsm1dev", id, 1)
}