Merge pull request #927 from influxdata/bj-tsm1-stats
feat(tsdb/tsm1): Add TSM1 measurement stats.pull/10616/head
commit
cf3c70a1a0
|
@ -25,8 +25,8 @@ import (
|
|||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/platform/tsdb"
|
||||
"github.com/influxdata/platform/pkg/limiter"
|
||||
"github.com/influxdata/platform/tsdb"
|
||||
)
|
||||
|
||||
const maxTSMFileSize = uint32(2048 * 1024 * 1024) // 2GB
|
||||
|
@ -37,6 +37,9 @@ const (
|
|||
|
||||
// TSMFileExtension is the extension used for TSM files.
|
||||
TSMFileExtension = "tsm"
|
||||
|
||||
// TSSFileExtension is the extension used for TSM stats files.
|
||||
TSSFileExtension = "tss"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -1027,6 +1030,7 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K
|
|||
|
||||
// New TSM files are written to a temp file and renamed when fully completed.
|
||||
fileName := filepath.Join(c.Dir, c.formatFileName(generation, sequence)+"."+TSMFileExtension+"."+TmpTSMFileExtension)
|
||||
statsFileName := StatsFilename(fileName)
|
||||
|
||||
// Write as much as possible to this file
|
||||
err := c.write(fileName, iter, throttle)
|
||||
|
@ -1041,6 +1045,8 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K
|
|||
// file that we can drop.
|
||||
if err := os.RemoveAll(fileName); err != nil {
|
||||
return nil, err
|
||||
} else if err := os.RemoveAll(statsFileName); err != nil && !os.IsNotExist(err) {
|
||||
return nil, err
|
||||
}
|
||||
break
|
||||
} else if _, ok := err.(errCompactionInProgress); ok {
|
||||
|
@ -1052,11 +1058,15 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K
|
|||
for _, f := range files {
|
||||
if err := os.RemoveAll(f); err != nil {
|
||||
return nil, err
|
||||
} else if err := os.RemoveAll(StatsFilename(f)); err != nil && !os.IsNotExist(err) {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
// We hit an error and didn't finish the compaction. Remove the temp file and abort.
|
||||
if err := os.RemoveAll(fileName); err != nil {
|
||||
return nil, err
|
||||
} else if err := os.RemoveAll(statsFileName); err != nil && !os.IsNotExist(err) {
|
||||
return nil, err
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package tsm1_test
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"math"
|
||||
"os"
|
||||
|
@ -9,6 +10,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/influxdata/platform/tsdb"
|
||||
"github.com/influxdata/platform/tsdb/tsm1"
|
||||
)
|
||||
|
@ -142,6 +144,17 @@ func TestCompactor_CompactFull(t *testing.T) {
|
|||
t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
|
||||
}
|
||||
|
||||
stats := tsm1.NewMeasurementStats()
|
||||
if f, err := os.Open(tsm1.StatsFilename(files[0])); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if _, err := stats.ReadFrom(bufio.NewReader(f)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := f.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if diff := cmp.Diff(stats, tsm1.MeasurementStats{"cpu": 112}); diff != "" {
|
||||
t.Fatal(diff)
|
||||
}
|
||||
|
||||
expGen, expSeq, err := tsm1.DefaultParseFileName(f3)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error parsing file name: %v", err)
|
||||
|
@ -234,6 +247,17 @@ func TestCompactor_Compact_OverlappingBlocks(t *testing.T) {
|
|||
t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
|
||||
}
|
||||
|
||||
stats := tsm1.NewMeasurementStats()
|
||||
if f, err := os.Open(tsm1.StatsFilename(files[0])); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if _, err := stats.ReadFrom(bufio.NewReader(f)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := f.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if diff := cmp.Diff(stats, tsm1.MeasurementStats{"cpu": 116}); diff != "" {
|
||||
t.Fatal(diff)
|
||||
}
|
||||
|
||||
r := MustOpenTSMReader(files[0])
|
||||
|
||||
if got, exp := r.KeyCount(), 1; got != exp {
|
||||
|
@ -314,6 +338,17 @@ func TestCompactor_Compact_OverlappingBlocksMultiple(t *testing.T) {
|
|||
t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
|
||||
}
|
||||
|
||||
stats := tsm1.NewMeasurementStats()
|
||||
if f, err := os.Open(tsm1.StatsFilename(files[0])); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if _, err := stats.ReadFrom(bufio.NewReader(f)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := f.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if diff := cmp.Diff(stats, tsm1.MeasurementStats{"cpu": 202}); diff != "" {
|
||||
t.Fatal(diff)
|
||||
}
|
||||
|
||||
r := MustOpenTSMReader(files[0])
|
||||
|
||||
if got, exp := r.KeyCount(), 1; got != exp {
|
||||
|
@ -629,6 +664,17 @@ func TestCompactor_CompactFull_TombstonedSkipBlock(t *testing.T) {
|
|||
t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
|
||||
}
|
||||
|
||||
stats := tsm1.NewMeasurementStats()
|
||||
if f, err := os.Open(tsm1.StatsFilename(files[0])); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if _, err := stats.ReadFrom(bufio.NewReader(f)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := f.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if diff := cmp.Diff(stats, tsm1.MeasurementStats{"cpu": 44}); diff != "" {
|
||||
t.Fatal(diff)
|
||||
}
|
||||
|
||||
expGen, expSeq, err := tsm1.DefaultParseFileName(f3)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error parsing file name: %v", err)
|
||||
|
@ -732,6 +778,17 @@ func TestCompactor_CompactFull_TombstonedPartialBlock(t *testing.T) {
|
|||
t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
|
||||
}
|
||||
|
||||
stats := tsm1.NewMeasurementStats()
|
||||
if f, err := os.Open(tsm1.StatsFilename(files[0])); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if _, err := stats.ReadFrom(bufio.NewReader(f)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := f.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if diff := cmp.Diff(stats, tsm1.MeasurementStats{"cpu": 78}); diff != "" {
|
||||
t.Fatal(diff)
|
||||
}
|
||||
|
||||
expGen, expSeq, err := tsm1.DefaultParseFileName(f3)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error parsing file name: %v", err)
|
||||
|
|
|
@ -614,6 +614,11 @@ func (e *Engine) LastModified() time.Time {
|
|||
return fsTime
|
||||
}
|
||||
|
||||
// MeasurementStats returns the current measurement stats for the engine.
|
||||
func (e *Engine) MeasurementStats() (MeasurementStats, error) {
|
||||
return e.FileStore.MeasurementStats()
|
||||
}
|
||||
|
||||
// EngineStatistics maintains statistics for the engine.
|
||||
type EngineStatistics struct {
|
||||
CacheCompactions int64 // Counter of cache compactions that have ever run.
|
||||
|
|
|
@ -19,10 +19,10 @@ import (
|
|||
|
||||
"github.com/influxdata/influxdb/pkg/metrics"
|
||||
"github.com/influxdata/influxdb/query"
|
||||
"github.com/influxdata/platform/tsdb"
|
||||
"github.com/influxdata/platform/models"
|
||||
"github.com/influxdata/platform/pkg/file"
|
||||
"github.com/influxdata/platform/pkg/limiter"
|
||||
"github.com/influxdata/platform/tsdb"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -145,6 +145,9 @@ type TSMFile interface {
|
|||
|
||||
// Free releases any resources held by the FileStore to free up system resources.
|
||||
Free() error
|
||||
|
||||
// Stats returns the statistics for the file.
|
||||
MeasurementStats() (MeasurementStats, error)
|
||||
}
|
||||
|
||||
// Statistics gathered by the FileStore.
|
||||
|
@ -1067,6 +1070,22 @@ func (f *FileStore) CreateSnapshot() (string, error) {
|
|||
return tmpPath, nil
|
||||
}
|
||||
|
||||
// MeasurementStats returns the sum of all measurement stats within the store.
|
||||
func (f *FileStore) MeasurementStats() (MeasurementStats, error) {
|
||||
f.mu.RLock()
|
||||
defer f.mu.RUnlock()
|
||||
|
||||
stats := NewMeasurementStats()
|
||||
for _, file := range f.files {
|
||||
s, err := file.MeasurementStats()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
stats.Add(s)
|
||||
}
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
// FormatFileNameFunc is executed when generating a new TSM filename.
|
||||
// Source filenames are provided via src.
|
||||
type FormatFileNameFunc func(generation, sequence int) string
|
||||
|
|
|
@ -177,6 +177,7 @@ func (*mockTSMFile) Unref()
|
|||
func (*mockTSMFile) Stats() FileStat { panic("implement me") }
|
||||
func (*mockTSMFile) BlockIterator() *BlockIterator { panic("implement me") }
|
||||
func (*mockTSMFile) Free() error { panic("implement me") }
|
||||
func (*mockTSMFile) MeasurementStats() (MeasurementStats, error) { panic("implement me") }
|
||||
|
||||
func (*mockTSMFile) ReadFloatBlockAt(*IndexEntry, *[]FloatValue) ([]FloatValue, error) {
|
||||
panic("implement me")
|
||||
|
|
|
@ -12,9 +12,9 @@ import (
|
|||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/influxdata/platform/tsdb"
|
||||
"github.com/influxdata/platform/pkg/bytesutil"
|
||||
"github.com/influxdata/platform/pkg/file"
|
||||
"github.com/influxdata/platform/tsdb"
|
||||
)
|
||||
|
||||
// ErrFileInUse is returned when attempting to remove or close a TSM file that is still being used.
|
||||
|
@ -359,6 +359,23 @@ func (t *TSMReader) Type(key []byte) (byte, error) {
|
|||
return t.index.Type(key)
|
||||
}
|
||||
|
||||
// MeasurementStats returns the on-disk measurement stats for this file, if available.
|
||||
func (t *TSMReader) MeasurementStats() (MeasurementStats, error) {
|
||||
f, err := os.Open(StatsFilename(t.Path()))
|
||||
if os.IsNotExist(err) {
|
||||
return make(MeasurementStats), nil
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
stats := make(MeasurementStats)
|
||||
if _, err := stats.ReadFrom(f); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return stats, err
|
||||
}
|
||||
|
||||
// Close closes the TSMReader.
|
||||
func (t *TSMReader) Close() error {
|
||||
t.refsWG.Wait()
|
||||
|
@ -418,8 +435,9 @@ func (t *TSMReader) remove() error {
|
|||
}
|
||||
|
||||
if path != "" {
|
||||
err := os.RemoveAll(path)
|
||||
if err != nil {
|
||||
if err := os.RemoveAll(path); err != nil {
|
||||
return err
|
||||
} else if err := os.RemoveAll(StatsFilename(path)); err != nil && !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,221 @@
|
|||
package tsm1
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"hash/crc32"
|
||||
"io"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/influxdata/platform/pkg/binaryutil"
|
||||
)
|
||||
|
||||
const (
|
||||
// MeasurementMagicNumber is written as the first 4 bytes of a data file to
|
||||
// identify the file as a tsm1 stats file.
|
||||
MeasurementStatsMagicNumber string = "TSS1"
|
||||
|
||||
// MeasurementStatsVersion indicates the version of the TSS1 file format.
|
||||
MeasurementStatsVersion byte = 1
|
||||
)
|
||||
|
||||
// MeasurementStats represents a set of measurement sizes.
|
||||
type MeasurementStats map[string]int
|
||||
|
||||
// NewStats returns a new instance of Stats.
|
||||
func NewMeasurementStats() MeasurementStats {
|
||||
return make(MeasurementStats)
|
||||
}
|
||||
|
||||
// MeasurementNames returns a list of sorted measurement names.
|
||||
func (s MeasurementStats) MeasurementNames() []string {
|
||||
a := make([]string, 0, len(s))
|
||||
for name := range s {
|
||||
a = append(a, name)
|
||||
}
|
||||
sort.Strings(a)
|
||||
return a
|
||||
}
|
||||
|
||||
// Add adds the values of all measurements in other to s.
|
||||
func (s MeasurementStats) Add(other MeasurementStats) {
|
||||
for name, v := range other {
|
||||
s[name] += v
|
||||
}
|
||||
}
|
||||
|
||||
// Sub subtracts the values of all measurements in other from s.
|
||||
func (s MeasurementStats) Sub(other MeasurementStats) {
|
||||
for name, v := range other {
|
||||
s[name] -= v
|
||||
}
|
||||
}
|
||||
|
||||
// ReadFrom reads stats from r in a binary format. Reader must also be an io.ByteReader.
|
||||
func (s MeasurementStats) ReadFrom(r io.Reader) (n int64, err error) {
|
||||
br, ok := r.(io.ByteReader)
|
||||
if !ok {
|
||||
return 0, fmt.Errorf("tsm1.MeasurementStats.ReadFrom: ByteReader required")
|
||||
}
|
||||
|
||||
// Read & verify magic.
|
||||
magic := make([]byte, 4)
|
||||
nn, err := io.ReadFull(r, magic)
|
||||
if n += int64(nn); err != nil {
|
||||
return n, fmt.Errorf("tsm1.MeasurementStats.ReadFrom: cannot read stats magic: %s", err)
|
||||
} else if string(magic) != MeasurementStatsMagicNumber {
|
||||
return n, fmt.Errorf("tsm1.MeasurementStats.ReadFrom: invalid tsm1 stats file")
|
||||
}
|
||||
|
||||
// Read & verify version.
|
||||
version := make([]byte, 1)
|
||||
nn, err = io.ReadFull(r, version)
|
||||
if n += int64(nn); err != nil {
|
||||
return n, fmt.Errorf("tsm1.MeasurementStats.ReadFrom: cannot read stats version: %s", err)
|
||||
} else if version[0] != MeasurementStatsVersion {
|
||||
return n, fmt.Errorf("tsm1.MeasurementStats.ReadFrom: incompatible tsm1 stats version: %d", version[0])
|
||||
}
|
||||
|
||||
// Read checksum.
|
||||
checksum := make([]byte, 4)
|
||||
nn, err = io.ReadFull(r, checksum)
|
||||
if n += int64(nn); err != nil {
|
||||
return n, fmt.Errorf("tsm1.MeasurementStats.ReadFrom: cannot read checksum: %s", err)
|
||||
}
|
||||
|
||||
// Read measurement count.
|
||||
measurementN, err := binary.ReadVarint(br)
|
||||
if err != nil {
|
||||
return n, fmt.Errorf("tsm1.MeasurementStats.ReadFrom: cannot read stats measurement count: %s", err)
|
||||
}
|
||||
n += int64(binaryutil.VarintSize(measurementN))
|
||||
|
||||
// Read measurements.
|
||||
for i := int64(0); i < measurementN; i++ {
|
||||
nn64, err := s.readMeasurementFrom(r)
|
||||
if n += nn64; err != nil {
|
||||
return n, err
|
||||
}
|
||||
}
|
||||
|
||||
// Expect end-of-file.
|
||||
buf := make([]byte, 1)
|
||||
if _, err := r.Read(buf); err != io.EOF {
|
||||
return n, fmt.Errorf("tsm1.MeasurementStats.ReadFrom: file too large, expected EOF")
|
||||
}
|
||||
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// readMeasurementFrom reads a measurement stat from r in a binary format.
|
||||
func (s MeasurementStats) readMeasurementFrom(r io.Reader) (n int64, err error) {
|
||||
br, ok := r.(io.ByteReader)
|
||||
if !ok {
|
||||
return 0, fmt.Errorf("tsm1.MeasurementStats.readMeasurementFrom: ByteReader required")
|
||||
}
|
||||
|
||||
// Read measurement name length.
|
||||
nameLen, err := binary.ReadVarint(br)
|
||||
if err != nil {
|
||||
return n, fmt.Errorf("tsm1.MeasurementStats.readMeasurementFrom: cannot read stats measurement name length: %s", err)
|
||||
}
|
||||
n += int64(binaryutil.VarintSize(nameLen))
|
||||
|
||||
// Read measurement name. Use large capacity so it can usually be stack allocated.
|
||||
// Go allocates unescaped variables smaller than 64KB on the stack.
|
||||
name := make([]byte, nameLen)
|
||||
nn, err := io.ReadFull(r, name)
|
||||
if n += int64(nn); err != nil {
|
||||
return n, fmt.Errorf("tsm1.MeasurementStats.readMeasurementFrom: cannot read stats measurement name: %s", err)
|
||||
}
|
||||
|
||||
// Read size.
|
||||
sz, err := binary.ReadVarint(br)
|
||||
if err != nil {
|
||||
return n, fmt.Errorf("tsm1.MeasurementStats.readMeasurementFrom: cannot read stats measurement size: %s", err)
|
||||
}
|
||||
n += int64(binaryutil.VarintSize(sz))
|
||||
|
||||
// Insert into map.
|
||||
s[string(name)] = int(sz)
|
||||
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// WriteTo writes stats to w in a binary format.
|
||||
func (s MeasurementStats) WriteTo(w io.Writer) (n int64, err error) {
|
||||
// Write magic & version.
|
||||
nn, err := io.WriteString(w, MeasurementStatsMagicNumber)
|
||||
if n += int64(nn); err != nil {
|
||||
return n, err
|
||||
}
|
||||
nn, err = w.Write([]byte{MeasurementStatsVersion})
|
||||
if n += int64(nn); err != nil {
|
||||
return n, err
|
||||
}
|
||||
|
||||
// Write measurement count.
|
||||
var buf bytes.Buffer
|
||||
b := make([]byte, binary.MaxVarintLen64)
|
||||
if _, err = buf.Write(b[:binary.PutVarint(b, int64(len(s)))]); err != nil {
|
||||
return n, err
|
||||
}
|
||||
|
||||
// Write all measurements in sorted order.
|
||||
for _, name := range s.MeasurementNames() {
|
||||
if _, err := s.writeMeasurementTo(&buf, name, s[name]); err != nil {
|
||||
return n, err
|
||||
}
|
||||
}
|
||||
data := buf.Bytes()
|
||||
|
||||
// Compute & write checksum.
|
||||
if err := binary.Write(w, binary.BigEndian, crc32.ChecksumIEEE(data)); err != nil {
|
||||
return n, err
|
||||
}
|
||||
n += 4
|
||||
|
||||
// Write buffer.
|
||||
nn, err = w.Write(data)
|
||||
if n += int64(nn); err != nil {
|
||||
return n, err
|
||||
}
|
||||
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (s MeasurementStats) writeMeasurementTo(w io.Writer, name string, sz int) (n int64, err error) {
|
||||
// Write measurement name length.
|
||||
buf := make([]byte, binary.MaxVarintLen64)
|
||||
nn, err := w.Write(buf[:binary.PutVarint(buf, int64(len(name)))])
|
||||
if n += int64(nn); err != nil {
|
||||
return n, err
|
||||
}
|
||||
|
||||
// Write measurement name.
|
||||
nn, err = io.WriteString(w, name)
|
||||
if n += int64(nn); err != nil {
|
||||
return n, err
|
||||
}
|
||||
|
||||
// Write size.
|
||||
nn, err = w.Write(buf[:binary.PutVarint(buf, int64(sz))])
|
||||
if n += int64(nn); err != nil {
|
||||
return n, err
|
||||
}
|
||||
|
||||
return n, err
|
||||
}
|
||||
|
||||
// StatsFilename returns the path to the stats file for a given TSM file path.
|
||||
func StatsFilename(tsmPath string) string {
|
||||
if strings.HasSuffix(tsmPath, "."+TmpTSMFileExtension) {
|
||||
tsmPath = strings.TrimSuffix(tsmPath, "."+TmpTSMFileExtension)
|
||||
}
|
||||
if strings.HasSuffix(tsmPath, "."+TSMFileExtension) {
|
||||
tsmPath = strings.TrimSuffix(tsmPath, "."+TSMFileExtension)
|
||||
}
|
||||
return tsmPath + "." + TSSFileExtension
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
package tsm1_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/influxdata/platform/tsdb/tsm1"
|
||||
)
|
||||
|
||||
func TestMeasurementStats_WriteTo(t *testing.T) {
|
||||
t.Run("Empty", func(t *testing.T) {
|
||||
stats, other := tsm1.NewMeasurementStats(), tsm1.NewMeasurementStats()
|
||||
var buf bytes.Buffer
|
||||
if wn, err := stats.WriteTo(&buf); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if rn, err := other.ReadFrom(&buf); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if wn != rn {
|
||||
t.Fatalf("byte count mismatch: w=%d r=%d", wn, rn)
|
||||
} else if diff := cmp.Diff(stats, other); diff != "" {
|
||||
t.Fatal(diff)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("WithData", func(t *testing.T) {
|
||||
stats, other := tsm1.NewMeasurementStats(), tsm1.NewMeasurementStats()
|
||||
stats["cpu"] = 100
|
||||
stats["mem"] = 2000
|
||||
|
||||
var buf bytes.Buffer
|
||||
if wn, err := stats.WriteTo(&buf); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if rn, err := other.ReadFrom(&buf); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if wn != rn {
|
||||
t.Fatalf("byte count mismatch: w=%d r=%d", wn, rn)
|
||||
} else if diff := cmp.Diff(stats, other); diff != "" {
|
||||
t.Fatal(diff)
|
||||
}
|
||||
})
|
||||
}
|
|
@ -73,6 +73,8 @@ import (
|
|||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/platform/models"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -147,6 +149,9 @@ type TSMWriter interface {
|
|||
// Size returns the current size in bytes of the file.
|
||||
Size() uint32
|
||||
|
||||
// Stats returns the statistics generated by the writer.
|
||||
MeasurementStats() MeasurementStats
|
||||
|
||||
Remove() error
|
||||
}
|
||||
|
||||
|
@ -548,12 +553,19 @@ type tsmWriter struct {
|
|||
|
||||
// The bytes written count of when we last fsync'd
|
||||
lastSync int64
|
||||
|
||||
stats MeasurementStats
|
||||
}
|
||||
|
||||
// NewTSMWriter returns a new TSMWriter writing to w.
|
||||
func NewTSMWriter(w io.Writer) (TSMWriter, error) {
|
||||
index := NewIndexWriter()
|
||||
return &tsmWriter{wrapped: w, w: bufio.NewWriterSize(w, 1024*1024), index: index}, nil
|
||||
return &tsmWriter{
|
||||
wrapped: w,
|
||||
w: bufio.NewWriterSize(w, 1024*1024),
|
||||
index: index,
|
||||
stats: NewMeasurementStats(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// NewTSMWriterWithDiskBuffer returns a new TSMWriter writing to w and will use a disk
|
||||
|
@ -572,9 +584,17 @@ func NewTSMWriterWithDiskBuffer(w io.Writer) (TSMWriter, error) {
|
|||
index = NewIndexWriter()
|
||||
}
|
||||
|
||||
return &tsmWriter{wrapped: w, w: bufio.NewWriterSize(w, 1024*1024), index: index}, nil
|
||||
return &tsmWriter{
|
||||
wrapped: w,
|
||||
w: bufio.NewWriterSize(w, 1024*1024),
|
||||
index: index,
|
||||
stats: NewMeasurementStats(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// MeasurementStats returns the measurement statistics generated by the writer.
|
||||
func (t *tsmWriter) MeasurementStats() MeasurementStats { return t.stats }
|
||||
|
||||
func (t *tsmWriter) writeHeader() error {
|
||||
var buf [5]byte
|
||||
binary.BigEndian.PutUint32(buf[0:4], MagicNumber)
|
||||
|
@ -633,6 +653,10 @@ func (t *tsmWriter) Write(key []byte, values Values) error {
|
|||
// Record this block in index
|
||||
t.index.Add(key, blockType, values[0].UnixNano(), values[len(values)-1].UnixNano(), t.n, uint32(n))
|
||||
|
||||
// Add block size to measurement stats.
|
||||
name := models.ParseName(key)
|
||||
t.stats[string(name)] += n
|
||||
|
||||
// Increment file position pointer
|
||||
t.n += int64(n)
|
||||
|
||||
|
@ -685,6 +709,10 @@ func (t *tsmWriter) WriteBlock(key []byte, minTime, maxTime int64, block []byte)
|
|||
// Record this block in index
|
||||
t.index.Add(key, blockType, minTime, maxTime, t.n, uint32(n))
|
||||
|
||||
// Add block size to measurement stats.
|
||||
name := models.ParseName(key)
|
||||
t.stats[string(name)] += n
|
||||
|
||||
// Increment file position pointer (checksum + block len)
|
||||
t.n += int64(n)
|
||||
|
||||
|
@ -755,6 +783,26 @@ func (t *tsmWriter) sync() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (t *tsmWriter) writeStatsFile() error {
|
||||
fw, ok := t.wrapped.(syncer)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
f, err := os.OpenFile(StatsFilename(fw.Name()), os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
if _, err := t.stats.WriteTo(f); err != nil {
|
||||
return err
|
||||
} else if err := f.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
return f.Close()
|
||||
}
|
||||
|
||||
func (t *tsmWriter) Close() error {
|
||||
if err := t.Flush(); err != nil {
|
||||
return err
|
||||
|
@ -764,6 +812,11 @@ func (t *tsmWriter) Close() error {
|
|||
return err
|
||||
}
|
||||
|
||||
// Write stats to disk, if writer is a file.
|
||||
if err := t.writeStatsFile(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if c, ok := t.wrapped.(io.Closer); ok {
|
||||
return c.Close()
|
||||
}
|
||||
|
@ -788,7 +841,11 @@ func (t *tsmWriter) Remove() error {
|
|||
// we just want to cleanup and remove the file.
|
||||
_ = f.Close()
|
||||
|
||||
return os.Remove(f.Name())
|
||||
if err := os.Remove(f.Name()); err != nil {
|
||||
return err
|
||||
} else if err := os.Remove(StatsFilename(f.Name())); err != nil && !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package tsm1_test
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"io"
|
||||
|
@ -8,6 +9,7 @@ import (
|
|||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/influxdata/platform/tsdb/tsm1"
|
||||
)
|
||||
|
||||
|
@ -584,6 +586,35 @@ func TestTSMWriter_Write_MaxKey(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensures that a writer will properly compute stats for multiple measurements.
|
||||
func TestTSMWriter_Write_MultipleMeasurements(t *testing.T) {
|
||||
dir := MustTempDir()
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
// Write file with multiple measurements.
|
||||
f1 := MustWriteTSM(dir, 1, map[string][]tsm1.Value{
|
||||
"cpu,host=A#!~#value": {tsm1.NewValue(1, 1.1), tsm1.NewValue(2, 1.2)},
|
||||
"cpu,host=B#!~#value": {tsm1.NewValue(1, 1.1)},
|
||||
"mem,host=A#!~#value": {tsm1.NewValue(1, 1.1), tsm1.NewValue(2, 1.2)},
|
||||
"disk,host=A#!~#value": {tsm1.NewValue(1, 1.1)},
|
||||
})
|
||||
|
||||
stats := tsm1.NewMeasurementStats()
|
||||
if f, err := os.Open(tsm1.StatsFilename(f1)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if _, err := stats.ReadFrom(bufio.NewReader(f)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := f.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if diff := cmp.Diff(stats, tsm1.MeasurementStats{
|
||||
"cpu": 78,
|
||||
"mem": 44,
|
||||
"disk": 34,
|
||||
}); diff != "" {
|
||||
t.Fatal(diff)
|
||||
}
|
||||
}
|
||||
|
||||
type fakeSyncer bool
|
||||
|
||||
func (f *fakeSyncer) Sync() error {
|
||||
|
|
Loading…
Reference in New Issue