feat: disk size metrics per shard (#22912)

pull/22915/head
Sam Arnold 2021-11-22 16:53:55 -05:00 committed by GitHub
parent 0a740856c7
commit a74e05177c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 140 additions and 159 deletions

1
go.sum
View File

@ -440,7 +440,6 @@ github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/influxdata/nats-streaming-server v0.11.3-0.20201112040610-c277f7560803 h1:LpaVAM5Www2R7M0GJAxAdL3swBvmna8Pyzw6F7o+j04=
github.com/influxdata/nats-streaming-server v0.11.3-0.20201112040610-c277f7560803/go.mod h1:qgAMR6M9EokX+R5X7jUQfubwBdS1tBIl4yVJ3shhcWk=
github.com/influxdata/pkg-config v0.2.9-0.20210928145121-f721f9766b86 h1:GeX1p061dF9UjD5dWa5X4kat8IN1YwkWm7vZ0zVF20Y=
github.com/influxdata/pkg-config v0.2.9-0.20210928145121-f721f9766b86/go.mod h1:EMS7Ll0S4qkzDk53XS3Z72/egBsPInt+BeRxb0WeSwk=
github.com/influxdata/pkg-config v0.2.9 h1:OXQkn8rKk+dLibqRRDCqOvwtXZZbkt9KCkwAVikHwcA=
github.com/influxdata/pkg-config v0.2.9/go.mod h1:EMS7Ll0S4qkzDk53XS3Z72/egBsPInt+BeRxb0WeSwk=

View File

@ -43,7 +43,6 @@ type TSDBStoreMock struct {
ShardNFn func() int
ShardRelativePathFn func(id uint64) (string, error)
ShardsFn func(ids []uint64) []*tsdb.Shard
StatisticsFn func(tags map[string]string) []models.Statistic
TagKeysFn func(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
TagValuesFn func(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
WithLoggerFn func(log *zap.Logger)
@ -135,9 +134,6 @@ func (s *TSDBStoreMock) ShardRelativePath(id uint64) (string, error) {
// Shards delegates to ShardsFn, the test-provided stub.
func (s *TSDBStoreMock) Shards(ids []uint64) []*tsdb.Shard {
	return s.ShardsFn(ids)
}
// Statistics delegates to StatisticsFn, allowing tests to stub the
// store's statistics reporting.
func (s *TSDBStoreMock) Statistics(tags map[string]string) []models.Statistic {
	return s.StatisticsFn(tags)
}
// TagKeys delegates to TagKeysFn, the test-provided stub.
func (s *TSDBStoreMock) TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) {
	return s.TagKeysFn(ctx, auth, shardIDs, cond)
}

View File

@ -1,55 +0,0 @@
package models_test
import (
"reflect"
"testing"
"github.com/influxdata/influxdb/v2/models"
)
// TestTags_Merge verifies that StatisticTags.Merge overlays the argument
// map onto the base tags, with argument values winning on key collisions
// and nil inputs treated as empty.
func TestTags_Merge(t *testing.T) {
	cases := []struct {
		Base   map[string]string
		Arg    map[string]string
		Result map[string]string
	}{
		{Base: nil, Arg: nil, Result: map[string]string{}},
		{Base: nil, Arg: map[string]string{"foo": "foo"}, Result: map[string]string{"foo": "foo"}},
		{Base: map[string]string{"foo": "foo"}, Arg: nil, Result: map[string]string{"foo": "foo"}},
		{Base: map[string]string{"foo": "foo"}, Arg: map[string]string{"bar": "bar"}, Result: map[string]string{"foo": "foo", "bar": "bar"}},
		{Base: map[string]string{"foo": "foo", "bar": "bar"}, Arg: map[string]string{"zoo": "zoo"}, Result: map[string]string{"foo": "foo", "bar": "bar", "zoo": "zoo"}},
		{Base: map[string]string{"foo": "foo", "bar": "bar"}, Arg: map[string]string{"bar": "newbar"}, Result: map[string]string{"foo": "foo", "bar": "newbar"}},
	}
	for idx, tc := range cases {
		got := models.StatisticTags(tc.Base).Merge(tc.Arg)
		// Examples are reported 1-based, matching the original output.
		if exp := tc.Result; !reflect.DeepEqual(got, exp) {
			t.Errorf("[Example %d] got %#v, expected %#v", idx+1, got, exp)
		}
	}
}

View File

@ -73,7 +73,7 @@ func TestDescendingCursor_SinglePointStartTime(t *testing.T) {
t.Run("cache", func(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := NewFileStore(dir)
fs := NewFileStore(dir, EngineTags{})
const START, END = 10, 1
kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, false)
@ -96,7 +96,7 @@ func TestDescendingCursor_SinglePointStartTime(t *testing.T) {
t.Run("tsm", func(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := NewFileStore(dir)
fs := NewFileStore(dir, EngineTags{})
const START, END = 10, 1
@ -133,7 +133,7 @@ func TestDescendingCursor_SinglePointStartTime(t *testing.T) {
func TestFileStore_DuplicatePoints(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := NewFileStore(dir)
fs := NewFileStore(dir, EngineTags{})
makeVals := func(ts ...int64) []Value {
vals := make([]Value, len(ts))
@ -218,7 +218,7 @@ func (p Int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func TestFileStore_MergeBlocksLargerThat1000_SecondEntirelyContained(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := NewFileStore(dir)
fs := NewFileStore(dir, EngineTags{})
// makeVals creates count points starting at ts and incrementing by step
makeVals := func(ts, count, step int64) []Value {
@ -320,7 +320,7 @@ func (a *FloatArray) Swap(i, j int) {
func TestFileStore_MergeBlocksLargerThat1000_MultipleBlocksInEachFile(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := NewFileStore(dir)
fs := NewFileStore(dir, EngineTags{})
// makeVals creates count points starting at ts and incrementing by step
makeVals := func(ts, count, step int64, v float64) []Value {
@ -414,7 +414,7 @@ func TestFileStore_MergeBlocksLargerThat1000_MultipleBlocksInEachFile(t *testing
func TestFileStore_SeekBoundaries(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := NewFileStore(dir)
fs := NewFileStore(dir, EngineTags{})
// makeVals creates count points starting at ts and incrementing by step
makeVals := func(ts, count, step int64, v float64) []Value {

View File

@ -158,13 +158,21 @@ type Engine struct {
// NewEngine returns a new instance of Engine.
func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *tsdb.SeriesFile, opt tsdb.EngineOptions) tsdb.Engine {
etags := EngineTags{
path: path,
walPath: walPath,
id: fmt.Sprintf("%d", id),
bucket: filepath.Base(filepath.Dir(filepath.Dir(path))), // discard shard & rp, take db
engineVersion: opt.EngineVersion,
}
var wal *WAL
if opt.WALEnabled {
wal = NewWAL(walPath, opt.Config.WALMaxConcurrentWrites, opt.Config.WALMaxWriteDelay)
wal.syncDelay = time.Duration(opt.Config.WALFsyncDelay)
}
fs := NewFileStore(path)
fs := NewFileStore(path, etags)
fs.openLimiter = opt.OpenLimiter
if opt.FileStoreObserver != nil {
fs.WithObserver(opt.FileStoreObserver)
@ -184,13 +192,7 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts
planner.SetFileStore(fs)
}
stats := newEngineMetrics(engineTags{
path: path,
walPath: walPath,
id: fmt.Sprintf("%d", id),
bucket: filepath.Base(filepath.Dir(filepath.Dir(path))), // discard shard & rp, take db
engineVersion: opt.EngineVersion,
})
stats := newEngineMetrics(etags)
activeCompactions := &compactionCounter{}
e := &Engine{
id: id,
@ -597,6 +599,8 @@ func PrometheusCollectors() []prometheus.Collector {
globalCompactionMetrics.Active,
globalCompactionMetrics.Failed,
globalCompactionMetrics.Queued,
globalFileStoreMetrics.files,
globalFileStoreMetrics.size,
}
}
@ -683,11 +687,11 @@ type compactionMetrics struct {
Failed *prometheus.CounterVec
}
type engineTags struct {
// EngineTags holds the identifying labels for a shard's storage engine.
// These become the Prometheus label values for all per-shard storage
// metrics (see getLabels).
type EngineTags struct {
	// path is the shard data directory; walPath the WAL directory.
	// id is the shard ID rendered as a string, bucket the database name
	// (derived from path), and engineVersion the configured engine version.
	path, walPath, id, bucket, engineVersion string
}
func (et *engineTags) getLabels() prometheus.Labels {
func (et *EngineTags) getLabels() prometheus.Labels {
return prometheus.Labels{
"path": et.path,
"walPath": et.walPath,
@ -698,7 +702,7 @@ func (et *engineTags) getLabels() prometheus.Labels {
}
func engineLabelNames() []string {
emptyLabels := (&engineTags{}).getLabels()
emptyLabels := (&EngineTags{}).getLabels()
val := make([]string, 0, len(emptyLabels))
for k := range emptyLabels {
val = append(val, k)
@ -706,7 +710,7 @@ func engineLabelNames() []string {
return val
}
func newEngineMetrics(tags engineTags) *compactionMetrics {
func newEngineMetrics(tags EngineTags) *compactionMetrics {
engineLabels := tags.getLabels()
return &compactionMetrics{
Duration: globalCompactionMetrics.Duration.MustCurryWith(engineLabels),

View File

@ -19,11 +19,11 @@ import (
"time"
"github.com/influxdata/influxdb/v2/influxql/query"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/pkg/file"
"github.com/influxdata/influxdb/v2/pkg/limiter"
"github.com/influxdata/influxdb/v2/pkg/metrics"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
@ -148,12 +148,6 @@ type TSMFile interface {
Free() error
}
// Statistics gathered by the FileStore.
const (
statFileStoreBytes = "diskBytes"
statFileStoreCount = "numFiles"
)
var (
floatBlocksDecodedCounter = metrics.MustRegisterCounter("float_blocks_decoded", metrics.WithGroup(tsmGroup))
floatBlocksSizeCounter = metrics.MustRegisterCounter("float_blocks_size_bytes", metrics.WithGroup(tsmGroup))
@ -186,7 +180,7 @@ type FileStore struct {
traceLogger *zap.Logger // Logger to be used when trace-logging is on.
traceLogging bool
stats *FileStoreStatistics
stats *fileStoreMetrics
purger *purger
currentTempDirID int
@ -232,7 +226,7 @@ func (f FileStat) ContainsKey(key []byte) bool {
}
// NewFileStore returns a new instance of FileStore based on the given directory.
func NewFileStore(dir string) *FileStore {
func NewFileStore(dir string, tags EngineTags) *FileStore {
logger := zap.NewNop()
fs := &FileStore{
dir: dir,
@ -240,7 +234,7 @@ func NewFileStore(dir string) *FileStore {
logger: logger,
traceLogger: logger,
openLimiter: limiter.NewFixed(runtime.GOMAXPROCS(0)),
stats: &FileStoreStatistics{},
stats: newFileStoreMetrics(tags),
purger: &purger{
files: map[string]TSMFile{},
logger: logger,
@ -284,22 +278,60 @@ func (f *FileStore) WithLogger(log *zap.Logger) {
}
}
// FileStoreStatistics keeps statistics about the file store.
type FileStoreStatistics struct {
DiskBytes int64
FileCount int64
// globalFileStoreMetrics holds the process-wide file store gauge vectors;
// each FileStore binds its own label values to them via newFileStoreMetrics,
// and they are exposed through PrometheusCollectors.
var globalFileStoreMetrics = newAllFileStoreMetrics()

// namespace and filesSubsystem form the metric name prefix:
// storage_tsm_files_<name>.
const namespace = "storage"

const filesSubsystem = "tsm_files"
// allFileStoreMetrics is the set of per-shard file store gauge vectors,
// labeled by the engine tags (see engineLabelNames).
type allFileStoreMetrics struct {
	files *prometheus.GaugeVec // number of TSM files per shard
	size  *prometheus.GaugeVec // on-disk size in bytes per shard
}
// Statistics returns statistics for periodic monitoring.
func (f *FileStore) Statistics(tags map[string]string) []models.Statistic {
return []models.Statistic{{
Name: "tsm1_filestore",
Tags: tags,
Values: map[string]interface{}{
statFileStoreBytes: atomic.LoadInt64(&f.stats.DiskBytes),
statFileStoreCount: atomic.LoadInt64(&f.stats.FileCount),
},
}}
// fileStoreMetrics is a single FileStore's view of the global gauges,
// pre-bound to that store's label values.
type fileStoreMetrics struct {
	files prometheus.Gauge
	size  prometheus.Gauge
	// sizeAtomic mirrors the size gauge as an int64 so DiskSizeBytes can
	// read the current size without going through Prometheus. Accessed
	// only via sync/atomic; keep it 64-bit aligned for 32-bit platforms.
	sizeAtomic int64
}
// AddSize adds n (which may be negative) to the tracked disk size and
// publishes the new total to the size gauge.
//
// NOTE(review): two concurrent AddSize calls may publish their gauge
// values out of order; the atomic total itself stays correct — confirm
// transiently stale gauge readings are acceptable.
func (f *fileStoreMetrics) AddSize(n int64) {
	val := atomic.AddInt64(&f.sizeAtomic, n)
	f.size.Set(float64(val))
}
// SetSize replaces the tracked disk size with n and mirrors the new
// value to the size gauge.
func (f *fileStoreMetrics) SetSize(n int64) {
	atomic.StoreInt64(&f.sizeAtomic, n)
	f.size.Set(float64(n))
}
// SetFiles sets the file-count gauge to n.
func (f *fileStoreMetrics) SetFiles(n int64) {
	f.files.Set(float64(n))
}
// newAllFileStoreMetrics builds the global file store gauge vectors,
// partitioned by the full set of engine labels so each shard reports
// its own series. The vectors are registered via PrometheusCollectors.
func newAllFileStoreMetrics() *allFileStoreMetrics {
	labels := engineLabelNames()
	return &allFileStoreMetrics{
		// storage_tsm_files_total: TSM file count per shard.
		files: prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: filesSubsystem,
			Name:      "total",
			Help:      "Gauge of number of files per shard",
		}, labels),
		// storage_tsm_files_disk_bytes: on-disk TSM size per shard.
		size: prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: filesSubsystem,
			Name:      "disk_bytes",
			Help:      "Gauge of data size in bytes for each shard",
		}, labels),
	}
}
// newFileStoreMetrics binds the global file store gauge vectors to this
// engine's label values, yielding the gauges a single FileStore updates.
func newFileStoreMetrics(tags EngineTags) *fileStoreMetrics {
	labels := tags.getLabels()
	return &fileStoreMetrics{
		files: globalFileStoreMetrics.files.With(labels),
		size:  globalFileStoreMetrics.size.With(labels),
	}
}
// Count returns the number of TSM files currently loaded.
@ -598,9 +630,9 @@ func (f *FileStore) Open(ctx context.Context) error {
f.files = append(f.files, res.r)
// Accumulate file store size stats
atomic.AddInt64(&f.stats.DiskBytes, int64(res.r.Size()))
f.stats.AddSize(int64(res.r.Size()))
if ts := res.r.TombstoneStats(); ts.TombstoneExists {
atomic.AddInt64(&f.stats.DiskBytes, int64(ts.Size))
f.stats.AddSize(int64(ts.Size))
}
// Re-initialize the lastModified time for the file store
@ -622,7 +654,7 @@ func (f *FileStore) Open(ctx context.Context) error {
close(readerC)
sort.Sort(tsmReaders(f.files))
atomic.StoreInt64(&f.stats.FileCount, int64(len(f.files)))
f.stats.SetFiles(int64(len(f.files)))
return nil
}
@ -635,7 +667,8 @@ func (f *FileStore) Close() error {
f.lastFileStats = nil
f.files = nil
atomic.StoreInt64(&f.stats.FileCount, 0)
f.stats.SetFiles(0)
// Let other methods access this closed object while we do the actual closing.
f.mu.Unlock()
@ -651,7 +684,7 @@ func (f *FileStore) Close() error {
}
func (f *FileStore) DiskSizeBytes() int64 {
return atomic.LoadInt64(&f.stats.DiskBytes)
return atomic.LoadInt64(&f.stats.sizeAtomic)
}
// Read returns the slice of values for the given key and the given timestamp,
@ -917,7 +950,7 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
f.lastFileStats = nil
f.files = active
sort.Sort(tsmReaders(f.files))
atomic.StoreInt64(&f.stats.FileCount, int64(len(f.files)))
f.stats.SetFiles(int64(len(f.files)))
// Recalculate the disk size stat
var totalSize int64
@ -927,7 +960,7 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
totalSize += int64(ts.Size)
}
}
atomic.StoreInt64(&f.stats.DiskBytes, totalSize)
f.stats.SetSize(totalSize)
return nil
}

View File

@ -312,7 +312,7 @@ func TestFileStore_Array(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := tsm1.NewFileStore(dir, tsm1.EngineTags{})
files, err := newFiles(dir, tc.data...)
if err != nil {

View File

@ -18,7 +18,7 @@ import (
func TestFileStore_Read(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -55,7 +55,7 @@ func TestFileStore_Read(t *testing.T) {
func TestFileStore_SeekToAsc_FromStart(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -94,7 +94,7 @@ func TestFileStore_SeekToAsc_FromStart(t *testing.T) {
func TestFileStore_SeekToAsc_Duplicate(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -167,7 +167,7 @@ func TestFileStore_SeekToAsc_Duplicate(t *testing.T) {
func TestFileStore_SeekToAsc_BeforeStart(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -208,7 +208,7 @@ func TestFileStore_SeekToAsc_BeforeStart(t *testing.T) {
func TestFileStore_SeekToAsc_BeforeStart_OverlapFloat(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -275,7 +275,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapFloat(t *testing.T) {
func TestFileStore_SeekToAsc_BeforeStart_OverlapInteger(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -341,7 +341,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapInteger(t *testing.T) {
func TestFileStore_SeekToAsc_BeforeStart_OverlapUnsigned(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -407,7 +407,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapUnsigned(t *testing.T) {
func TestFileStore_SeekToAsc_BeforeStart_OverlapBoolean(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -473,7 +473,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapBoolean(t *testing.T) {
func TestFileStore_SeekToAsc_BeforeStart_OverlapString(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -539,7 +539,7 @@ func TestFileStore_SeekToAsc_BeforeStart_OverlapString(t *testing.T) {
func TestFileStore_SeekToAsc_OverlapMinFloat(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -619,7 +619,7 @@ func TestFileStore_SeekToAsc_OverlapMinFloat(t *testing.T) {
func TestFileStore_SeekToAsc_OverlapMinInteger(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -698,7 +698,7 @@ func TestFileStore_SeekToAsc_OverlapMinInteger(t *testing.T) {
func TestFileStore_SeekToAsc_OverlapMinUnsigned(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -777,7 +777,7 @@ func TestFileStore_SeekToAsc_OverlapMinUnsigned(t *testing.T) {
func TestFileStore_SeekToAsc_OverlapMinBoolean(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -856,7 +856,7 @@ func TestFileStore_SeekToAsc_OverlapMinBoolean(t *testing.T) {
func TestFileStore_SeekToAsc_OverlapMinString(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -933,7 +933,7 @@ func TestFileStore_SeekToAsc_OverlapMinString(t *testing.T) {
func TestFileStore_SeekToAsc_Middle(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -991,7 +991,7 @@ func TestFileStore_SeekToAsc_Middle(t *testing.T) {
func TestFileStore_SeekToAsc_End(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -1029,7 +1029,7 @@ func TestFileStore_SeekToAsc_End(t *testing.T) {
func TestFileStore_SeekToDesc_FromStart(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -1067,7 +1067,7 @@ func TestFileStore_SeekToDesc_FromStart(t *testing.T) {
func TestFileStore_SeekToDesc_Duplicate(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -1126,7 +1126,7 @@ func TestFileStore_SeekToDesc_Duplicate(t *testing.T) {
func TestFileStore_SeekToDesc_OverlapMaxFloat(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -1191,7 +1191,7 @@ func TestFileStore_SeekToDesc_OverlapMaxFloat(t *testing.T) {
func TestFileStore_SeekToDesc_OverlapMaxInteger(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -1253,7 +1253,7 @@ func TestFileStore_SeekToDesc_OverlapMaxInteger(t *testing.T) {
func TestFileStore_SeekToDesc_OverlapMaxUnsigned(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -1316,7 +1316,7 @@ func TestFileStore_SeekToDesc_OverlapMaxUnsigned(t *testing.T) {
func TestFileStore_SeekToDesc_OverlapMaxBoolean(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -1379,7 +1379,7 @@ func TestFileStore_SeekToDesc_OverlapMaxBoolean(t *testing.T) {
func TestFileStore_SeekToDesc_OverlapMaxString(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -1442,7 +1442,7 @@ func TestFileStore_SeekToDesc_OverlapMaxString(t *testing.T) {
func TestFileStore_SeekToDesc_AfterEnd(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -1480,7 +1480,7 @@ func TestFileStore_SeekToDesc_AfterEnd(t *testing.T) {
func TestFileStore_SeekToDesc_AfterEnd_OverlapFloat(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 4 files
data := []keyValues{
@ -1577,7 +1577,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapFloat(t *testing.T) {
func TestFileStore_SeekToDesc_AfterEnd_OverlapInteger(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -1654,7 +1654,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapInteger(t *testing.T) {
func TestFileStore_SeekToDesc_AfterEnd_OverlapUnsigned(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -1731,7 +1731,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapUnsigned(t *testing.T) {
func TestFileStore_SeekToDesc_AfterEnd_OverlapBoolean(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -1828,7 +1828,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapBoolean(t *testing.T) {
func TestFileStore_SeekToDesc_AfterEnd_OverlapString(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -1925,7 +1925,7 @@ func TestFileStore_SeekToDesc_AfterEnd_OverlapString(t *testing.T) {
func TestFileStore_SeekToDesc_Middle(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -2002,7 +2002,7 @@ func TestFileStore_SeekToDesc_Middle(t *testing.T) {
func TestFileStore_SeekToDesc_End(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -2040,7 +2040,7 @@ func TestFileStore_SeekToDesc_End(t *testing.T) {
func TestKeyCursor_TombstoneRange(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -2084,7 +2084,7 @@ func TestKeyCursor_TombstoneRange(t *testing.T) {
func TestKeyCursor_TombstoneRange_PartialFirst(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -2127,7 +2127,7 @@ func TestKeyCursor_TombstoneRange_PartialFirst(t *testing.T) {
func TestKeyCursor_TombstoneRange_PartialFloat(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -2171,7 +2171,7 @@ func TestKeyCursor_TombstoneRange_PartialFloat(t *testing.T) {
func TestKeyCursor_TombstoneRange_PartialInteger(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -2215,7 +2215,7 @@ func TestKeyCursor_TombstoneRange_PartialInteger(t *testing.T) {
func TestKeyCursor_TombstoneRange_PartialUnsigned(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -2259,7 +2259,7 @@ func TestKeyCursor_TombstoneRange_PartialUnsigned(t *testing.T) {
func TestKeyCursor_TombstoneRange_PartialString(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -2303,7 +2303,7 @@ func TestKeyCursor_TombstoneRange_PartialString(t *testing.T) {
func TestKeyCursor_TombstoneRange_PartialBoolean(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -2360,7 +2360,7 @@ func TestFileStore_Open(t *testing.T) {
fatal(t, "creating test files", err)
}
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
if err := fs.Open(context.Background()); err != nil {
fatal(t, "opening file store", err)
}
@ -2391,7 +2391,7 @@ func TestFileStore_Remove(t *testing.T) {
fatal(t, "creating test files", err)
}
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
if err := fs.Open(context.Background()); err != nil {
fatal(t, "opening file store", err)
}
@ -2436,7 +2436,7 @@ func TestFileStore_Replace(t *testing.T) {
replacement := fmt.Sprintf("%s.%s", files[2], tsm1.TmpTSMFileExtension)
os.Rename(files[2], replacement)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
if err := fs.Open(context.Background()); err != nil {
fatal(t, "opening file store", err)
}
@ -2523,7 +2523,7 @@ func TestFileStore_Open_Deleted(t *testing.T) {
fatal(t, "creating test files", err)
}
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
if err := fs.Open(context.Background()); err != nil {
fatal(t, "opening file store", err)
}
@ -2537,7 +2537,7 @@ func TestFileStore_Open_Deleted(t *testing.T) {
fatal(t, "deleting", err)
}
fs2 := tsm1.NewFileStore(dir)
fs2 := newTestFileStore(dir)
if err := fs2.Open(context.Background()); err != nil {
fatal(t, "opening file store", err)
}
@ -2551,7 +2551,7 @@ func TestFileStore_Open_Deleted(t *testing.T) {
func TestFileStore_Delete(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -2585,7 +2585,7 @@ func TestFileStore_Delete(t *testing.T) {
func TestFileStore_Apply(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -2635,7 +2635,7 @@ func TestFileStore_Stats(t *testing.T) {
fatal(t, "creating test files", err)
}
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
if err := fs.Open(context.Background()); err != nil {
fatal(t, "opening file store", err)
}
@ -2697,7 +2697,7 @@ func TestFileStore_Stats(t *testing.T) {
func TestFileStore_CreateSnapshot(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
// Setup 3 files
data := []keyValues{
@ -2748,6 +2748,10 @@ func TestFileStore_CreateSnapshot(t *testing.T) {
}
}
// newTestFileStore builds a FileStore rooted at dir with empty engine
// tags, for tests that do not care about per-shard metric labels.
func newTestFileStore(dir string) *tsm1.FileStore {
	return tsm1.NewFileStore(dir, tsm1.EngineTags{})
}
type mockObserver struct {
fileFinishing func(path string) error
fileUnlinking func(path string) error
@ -2788,7 +2792,7 @@ func TestFileStore_Observer(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
fs.WithObserver(m)
// Setup 3 files
@ -2962,7 +2966,7 @@ func BenchmarkFileStore_Stats(b *testing.B) {
b.Fatalf("creating benchmark files %v", err)
}
fs := tsm1.NewFileStore(dir)
fs := newTestFileStore(dir)
fs.WithLogger(zaptest.NewLogger(b))
if err := fs.Open(context.Background()); err != nil {