test: replace influxlogger with zaptest logger (#20589)

pull/20307/head^2
Daniel Moran 2021-02-11 10:12:39 -05:00 committed by GitHub
parent 07e62e1cfd
commit 727a7b58c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 288 additions and 261 deletions

View File

@ -15,13 +15,21 @@ import (
)
var (
log = influxlogger.New(os.Stdout)
addr string
)
func main() {
logconf := influxlogger.NewConfig()
log, err := logconf.New(os.Stdout)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "Failed to configure logger: %v", err)
os.Exit(1)
}
prog := &cli.Program{
Run: run,
Run: func() error {
return run(log)
},
Name: "telemetryd",
Opts: []cli.Opt{
{
@ -32,6 +40,7 @@ func main() {
},
},
}
v := viper.New()
cmd := cli.NewCommand(v, prog)
@ -49,8 +58,8 @@ func main() {
os.Exit(exitCode)
}
func run() error {
log := log.With(zap.String("service", "telemetryd"))
func run(log *zap.Logger) error {
log = log.With(zap.String("service", "telemetryd"))
store := telemetry.NewLogStore(log)
svc := telemetry.NewPushGateway(log, store)
// Print data as line protocol

View File

@ -3,15 +3,14 @@ package gather
import (
"context"
"net/http/httptest"
"os"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/v2"
influxlogger "github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/mock"
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
"go.uber.org/zap/zaptest"
)
func TestScheduler(t *testing.T) {
@ -19,7 +18,7 @@ func TestScheduler(t *testing.T) {
totalGatherJobs := 3
// Create top level logger
logger := influxlogger.New(os.Stdout)
logger := zaptest.NewLogger(t)
ts := httptest.NewServer(&mockHTTPHandler{
responseMap: map[string]string{
"/metrics": sampleRespSmall,

View File

@ -71,14 +71,17 @@ func (h baseHandler) panic(w http.ResponseWriter, r *http.Request, rcv interface
h.HandleHTTPError(ctx, pe, w)
}
var panicLogger *zap.Logger
var panicLogger = zap.NewNop()
var panicLoggerOnce sync.Once
// getPanicLogger returns a logger for panicHandler.
func getPanicLogger() *zap.Logger {
panicLoggerOnce.Do(func() {
panicLogger = influxlogger.New(os.Stderr)
panicLogger = panicLogger.With(zap.String("handler", "panic"))
conf := influxlogger.NewConfig()
logger, err := conf.New(os.Stderr)
if err == nil {
panicLogger = logger.With(zap.String("handler", "panic"))
}
})
return panicLogger

View File

@ -129,14 +129,17 @@ func panicMW(api *kithttp.API) func(http.Handler) http.Handler {
}
}
var panicLogger *zap.Logger
var panicLogger = zap.NewNop()
var panicLoggerOnce sync.Once
// getPanicLogger returns a logger for panicHandler.
func getPanicLogger() *zap.Logger {
panicLoggerOnce.Do(func() {
panicLogger = influxlogger.New(os.Stderr)
panicLogger = panicLogger.With(zap.String("handler", "panic"))
conf := influxlogger.NewConfig()
logger, err := conf.New(os.Stderr)
if err == nil {
panicLogger = logger.With(zap.String("handler", "panic"))
}
})
return panicLogger

View File

@ -13,12 +13,6 @@ import (
const TimeFormat = "2006-01-02T15:04:05.000000Z07:00"
func New(w io.Writer) *zap.Logger {
config := NewConfig()
l, _ := config.New(w)
return l
}
func (c *Config) New(defaultOutput io.Writer) (*zap.Logger, error) {
w := defaultOutput
format := c.Format

View File

@ -7,10 +7,10 @@ import (
"testing"
"time"
"github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
func TestEngine_ConcurrentShardSnapshots(t *testing.T) {
@ -21,7 +21,7 @@ func TestEngine_ConcurrentShardSnapshots(t *testing.T) {
tmpShard := filepath.Join(tmpDir, "shard")
tmpWal := filepath.Join(tmpDir, "wal")
sfile := NewSeriesFile(tmpDir)
sfile := NewSeriesFile(t, tmpDir)
defer sfile.Close()
opts := tsdb.NewEngineOptions()
@ -79,13 +79,15 @@ func TestEngine_ConcurrentShardSnapshots(t *testing.T) {
}
// NewSeriesFile returns a new instance of SeriesFile with a temporary file path.
func NewSeriesFile(tmpDir string) *tsdb.SeriesFile {
func NewSeriesFile(tb testing.TB, tmpDir string) *tsdb.SeriesFile {
tb.Helper()
dir, err := ioutil.TempDir(tmpDir, "tsdb-series-file-")
if err != nil {
panic(err)
}
f := tsdb.NewSeriesFile(dir)
f.Logger = logger.New(os.Stdout)
f.Logger = zaptest.NewLogger(tb)
if err := f.Open(); err != nil {
panic(err)
}

View File

@ -21,20 +21,20 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/v2/influxql/query"
"github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/pkg/deep"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
"github.com/influxdata/influxdb/v2/tsdb/index/tsi1"
"github.com/influxdata/influxql"
"go.uber.org/zap/zaptest"
)
// Ensure that deletes only sent to the WAL will clear out the data from the cache on restart
func TestEngine_DeleteWALLoadMetadata(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
e := MustOpenEngine(index)
e := MustOpenEngine(t, index)
defer e.Close()
if err := e.WritePointsString(
@ -70,7 +70,7 @@ func TestEngine_DeleteWALLoadMetadata(t *testing.T) {
func TestEngine_DeleteSeriesAfterCacheSnapshot(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
e := MustOpenEngine(index)
e := MustOpenEngine(t, index)
defer e.Close()
if err := e.WritePointsString(
@ -174,7 +174,7 @@ func seriesExist(e *Engine, m string, dims []string) (int, error) {
// Ensure that the engine can write & read shard digest files.
func TestEngine_Digest(t *testing.T) {
e := MustOpenEngine(tsi1.IndexName)
e := MustOpenEngine(t, tsi1.IndexName)
defer e.Close()
if err := e.Open(); err != nil {
@ -322,7 +322,7 @@ type span struct {
// Ensure engine handles concurrent calls to Digest().
func TestEngine_Digest_Concurrent(t *testing.T) {
e := MustOpenEngine(tsi1.IndexName)
e := MustOpenEngine(t, tsi1.IndexName)
defer e.Close()
if err := e.Open(); err != nil {
@ -748,7 +748,7 @@ func TestEngine_CreateIterator_Cache_Ascending(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
e := MustOpenEngine(index)
e := MustOpenEngine(t, index)
defer e.Close()
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
@ -805,7 +805,7 @@ func TestEngine_CreateIterator_Cache_Descending(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
e := MustOpenEngine(index)
e := MustOpenEngine(t, index)
defer e.Close()
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
@ -861,7 +861,7 @@ func TestEngine_CreateIterator_TSM_Ascending(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
e := MustOpenEngine(index)
e := MustOpenEngine(t, index)
defer e.Close()
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
@ -919,7 +919,7 @@ func TestEngine_CreateIterator_TSM_Descending(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
e := MustOpenEngine(index)
e := MustOpenEngine(t, index)
defer e.Close()
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
@ -977,7 +977,7 @@ func TestEngine_CreateIterator_Aux(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
e := MustOpenEngine(index)
e := MustOpenEngine(t, index)
defer e.Close()
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
@ -1037,7 +1037,7 @@ func TestEngine_CreateIterator_Condition(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
e := MustOpenEngine(index)
e := MustOpenEngine(t, index)
defer e.Close()
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
@ -1093,8 +1093,8 @@ func TestEngine_CreateIterator_Condition(t *testing.T) {
// Test that series id set gets updated and returned appropriately.
func TestIndex_SeriesIDSet(t *testing.T) {
test := func(index string) error {
engine := MustOpenEngine(index)
test := func(t *testing.T, index string) error {
engine := MustOpenEngine(t, index)
defer engine.Close()
// Add some series.
@ -1180,7 +1180,7 @@ func TestIndex_SeriesIDSet(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
if err := test(index); err != nil {
if err := test(t, index); err != nil {
t.Error(err)
}
})
@ -1197,7 +1197,7 @@ func TestEngine_DeleteSeries(t *testing.T) {
p2 := MustParsePointString("cpu,host=B value=1.2 2000000000")
p3 := MustParsePointString("cpu,host=A sum=1.3 3000000000")
e, err := NewEngine(index)
e, err := NewEngine(t, index)
if err != nil {
t.Fatal(err)
}
@ -1252,7 +1252,7 @@ func TestEngine_DeleteSeriesRange(t *testing.T) {
p7 := MustParsePointString("mem,host=C value=1.3 1000000000") // Should not be deleted
p8 := MustParsePointString("disk,host=C value=1.3 1000000000") // Should not be deleted
e, err := NewEngine(index)
e, err := NewEngine(t, index)
if err != nil {
t.Fatal(err)
}
@ -1362,7 +1362,7 @@ func TestEngine_DeleteSeriesRangeWithPredicate(t *testing.T) {
p7 := MustParsePointString("mem,host=C value=1.3 1000000000")
p8 := MustParsePointString("disk,host=C value=1.3 1000000000") // Should not be deleted
e, err := NewEngine(index)
e, err := NewEngine(t, index)
if err != nil {
t.Fatal(err)
}
@ -1488,7 +1488,7 @@ func TestEngine_DeleteSeriesRangeWithPredicate_Nil(t *testing.T) {
p7 := MustParsePointString("mem,host=C value=1.3 1000000000")
p8 := MustParsePointString("disk,host=C value=1.3 1000000000") // Should not be deleted
e, err := NewEngine(index)
e, err := NewEngine(t, index)
if err != nil {
t.Fatal(err)
}
@ -1574,7 +1574,7 @@ func TestEngine_DeleteSeriesRangeWithPredicate_FlushBatch(t *testing.T) {
p7 := MustParsePointString("mem,host=C value=1.3 1000000000")
p8 := MustParsePointString("disk,host=C value=1.3 1000000000") // Should not be deleted
e, err := NewEngine(index)
e, err := NewEngine(t, index)
if err != nil {
t.Fatal(err)
}
@ -1693,7 +1693,7 @@ func TestEngine_DeleteSeriesRange_OutsideTime(t *testing.T) {
// Create a few points.
p1 := MustParsePointString("cpu,host=A value=1.1 1000000000") // Should not be deleted
e, err := NewEngine(index)
e, err := NewEngine(t, index)
if err != nil {
t.Fatal(err)
}
@ -1774,7 +1774,7 @@ func TestEngine_LastModified(t *testing.T) {
p2 := MustParsePointString("cpu,host=B value=1.2 2000000000")
p3 := MustParsePointString("cpu,host=A sum=1.3 3000000000")
e, err := NewEngine(index)
e, err := NewEngine(t, index)
if err != nil {
t.Fatal(err)
}
@ -1865,7 +1865,7 @@ func TestEngine_SnapshotsDisabled(t *testing.T) {
func TestEngine_ShouldCompactCache(t *testing.T) {
nowTime := time.Now()
e, err := NewEngine(tsi1.IndexName)
e, err := NewEngine(t, tsi1.IndexName)
if err != nil {
t.Fatal(err)
}
@ -1910,7 +1910,7 @@ func TestEngine_CreateCursor_Ascending(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
e := MustOpenEngine(index)
e := MustOpenEngine(t, index)
defer e.Close()
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
@ -1970,7 +1970,7 @@ func TestEngine_CreateCursor_Descending(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
e := MustOpenEngine(index)
e := MustOpenEngine(t, index)
defer e.Close()
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
@ -2052,7 +2052,7 @@ func TestEngine_DisableEnableCompactions_Concurrent(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
e := MustOpenEngine(index)
e := MustOpenEngine(t, index)
defer e.Close()
var wg sync.WaitGroup
@ -2097,7 +2097,7 @@ func TestEngine_WritePoints_TypeConflict(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
e := MustOpenEngine(index)
e := MustOpenEngine(t, index)
defer e.Close()
if err := e.WritePointsString(
@ -2133,7 +2133,7 @@ func TestEngine_WritePoints_Reload(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
e := MustOpenEngine(index)
e := MustOpenEngine(t, index)
defer e.Close()
if err := e.WritePointsString(
@ -2176,7 +2176,7 @@ func TestEngine_Invalid_UTF8(t *testing.T) {
field := []byte{255, 110, 101, 116} // A known invalid UTF-8 string
p := MustParsePointString(fmt.Sprintf("%s,host=A %s=1.1 6000000000", name, field))
e, err := NewEngine(index)
e, err := NewEngine(t, index)
if err != nil {
t.Fatal(err)
}
@ -2207,7 +2207,7 @@ func BenchmarkEngine_WritePoints(b *testing.B) {
batchSizes := []int{10, 100, 1000, 5000, 10000}
for _, sz := range batchSizes {
for _, index := range tsdb.RegisteredIndexes() {
e := MustOpenEngine(index)
e := MustOpenEngine(b, index)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
pp := make([]models.Point, 0, sz)
for i := 0; i < sz; i++ {
@ -2233,7 +2233,7 @@ func BenchmarkEngine_WritePoints_Parallel(b *testing.B) {
batchSizes := []int{1000, 5000, 10000, 25000, 50000, 75000, 100000, 200000}
for _, sz := range batchSizes {
for _, index := range tsdb.RegisteredIndexes() {
e := MustOpenEngine(index)
e := MustOpenEngine(b, index)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
cpus := runtime.GOMAXPROCS(0)
@ -2412,7 +2412,7 @@ var benchmarkVariants = []struct {
func BenchmarkEngine_CreateIterator(b *testing.B) {
engines := make([]*benchmarkEngine, len(sizes))
for i, size := range sizes {
engines[i] = MustInitDefaultBenchmarkEngine(size.name, size.sz)
engines[i] = MustInitDefaultBenchmarkEngine(b, size.name, size.sz)
}
for _, tt := range benchmarks {
@ -2458,13 +2458,13 @@ var (
// MustInitDefaultBenchmarkEngine creates a new engine using the default index
// and fills it with points. Reuses previous engine if the same parameters
// were used.
func MustInitDefaultBenchmarkEngine(name string, pointN int) *benchmarkEngine {
func MustInitDefaultBenchmarkEngine(tb testing.TB, name string, pointN int) *benchmarkEngine {
const batchSize = 1000
if pointN%batchSize != 0 {
panic(fmt.Sprintf("point count (%d) must be a multiple of batch size (%d)", pointN, batchSize))
}
e := MustOpenEngine(tsdb.DefaultIndex)
e := MustOpenEngine(tb, tsdb.DefaultIndex)
// Initialize metadata.
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
@ -2516,7 +2516,9 @@ type Engine struct {
}
// NewEngine returns a new instance of Engine at a temporary location.
func NewEngine(index string) (*Engine, error) {
func NewEngine(tb testing.TB, index string) (*Engine, error) {
tb.Helper()
root, err := ioutil.TempDir("", "tsm1-")
if err != nil {
panic(err)
@ -2531,7 +2533,7 @@ func NewEngine(index string) (*Engine, error) {
// Setup series file.
sfile := tsdb.NewSeriesFile(filepath.Join(dbPath, tsdb.SeriesFileDirectory))
sfile.Logger = logger.New(os.Stdout)
sfile.Logger = zaptest.NewLogger(tb)
if err = sfile.Open(); err != nil {
return nil, err
}
@ -2559,8 +2561,10 @@ func NewEngine(index string) (*Engine, error) {
}
// MustOpenEngine returns a new, open instance of Engine.
func MustOpenEngine(index string) *Engine {
e, err := NewEngine(index)
func MustOpenEngine(tb testing.TB, index string) *Engine {
tb.Helper()
e, err := NewEngine(tb, index)
if err != nil {
panic(err)
}

View File

@ -12,8 +12,8 @@ import (
"testing"
"time"
"github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
"go.uber.org/zap/zaptest"
)
func TestFileStore_Read(t *testing.T) {
@ -2964,9 +2964,7 @@ func BenchmarkFileStore_Stats(b *testing.B) {
}
fs := tsm1.NewFileStore(dir)
if testing.Verbose() {
fs.WithLogger(logger.New(os.Stderr))
}
fs.WithLogger(zaptest.NewLogger(b))
if err := fs.Open(); err != nil {
b.Fatalf("opening file store %v", err)

View File

@ -1,14 +1,13 @@
package tsm1
import (
"os"
"runtime"
"testing"
"time"
"github.com/influxdata/influxdb/v2/influxql/query"
"github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxql"
"go.uber.org/zap/zaptest"
)
func BenchmarkIntegerIterator_Next(b *testing.B) {
@ -71,7 +70,7 @@ func TestFinalizerIterator(t *testing.T) {
step3 = make(chan struct{})
)
l := logger.New(os.Stderr)
l := zaptest.NewLogger(t)
done := make(chan struct{})
func() {
itr := &testFinalizerIterator{

View File

@ -2,12 +2,11 @@ package tsi1_test
import (
"bytes"
"os"
"testing"
"github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/tsdb/index/tsi1"
"go.uber.org/zap/zaptest"
)
func TestSQLIndexExporter_ExportIndex(t *testing.T) {
@ -40,7 +39,7 @@ COMMIT;
var buf bytes.Buffer
e := tsi1.NewSQLIndexExporter(&buf)
e.ShowSchema = false
e.Logger = logger.New(os.Stderr)
e.Logger = zaptest.NewLogger(t)
if err := e.ExportIndex(idx.Index); err != nil {
t.Fatal(err)
} else if err := e.Close(); err != nil {

View File

@ -12,12 +12,12 @@ import (
"github.com/influxdata/influxdb/v2/influxql/query"
"github.com/influxdata/influxdb/v2/internal"
"github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/pkg/slices"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxdb/v2/tsdb/index/tsi1"
"github.com/influxdata/influxql"
"go.uber.org/zap/zaptest"
)
// Ensure iterator can merge multiple iterators together.
@ -60,7 +60,7 @@ func TestIndexSet_MeasurementNamesByExpr(t *testing.T) {
// Setup indexes
indexes := map[string]*Index{}
for _, name := range tsdb.RegisteredIndexes() {
idx := MustOpenNewIndex(name)
idx := MustOpenNewIndex(t, name)
idx.AddSeries("cpu", map[string]string{"region": "east"})
idx.AddSeries("cpu", map[string]string{"region": "west", "secret": "foo"})
idx.AddSeries("disk", map[string]string{"secret": "foo"})
@ -140,7 +140,7 @@ func TestIndexSet_MeasurementNamesByPredicate(t *testing.T) {
// Setup indexes
indexes := map[string]*Index{}
for _, name := range tsdb.RegisteredIndexes() {
idx := MustOpenNewIndex(name)
idx := MustOpenNewIndex(t, name)
idx.AddSeries("cpu", map[string]string{"region": "east"})
idx.AddSeries("cpu", map[string]string{"region": "west", "secret": "foo"})
idx.AddSeries("disk", map[string]string{"secret": "foo"})
@ -271,7 +271,7 @@ func TestIndex_Sketches(t *testing.T) {
}
test := func(t *testing.T, index string) error {
idx := MustNewIndex(index)
idx := MustNewIndex(t, index)
if index, ok := idx.Index.(*tsi1.Index); ok {
// Override the log file max size to force a log file compaction sooner.
// This way, we will test the sketches are correct when they have been
@ -359,7 +359,9 @@ var DisableTSICache = func() EngineOption {
// everything under the same root directory so it can be cleanly removed on Close.
//
// The index will not be opened.
func MustNewIndex(index string, eopts ...EngineOption) *Index {
func MustNewIndex(tb testing.TB, index string, eopts ...EngineOption) *Index {
tb.Helper()
opts := tsdb.NewEngineOptions()
opts.IndexVersion = index
@ -386,10 +388,7 @@ func MustNewIndex(index string, eopts ...EngineOption) *Index {
if err != nil {
panic(err)
}
if testing.Verbose() {
i.WithLogger(logger.New(os.Stderr))
}
i.WithLogger(zaptest.NewLogger(tb))
idx := &Index{
Index: i,
@ -402,8 +401,10 @@ func MustNewIndex(index string, eopts ...EngineOption) *Index {
// MustOpenNewIndex will initialize a new index using the provide type and opens
// it.
func MustOpenNewIndex(index string, opts ...EngineOption) *Index {
idx := MustNewIndex(index, opts...)
func MustOpenNewIndex(tb testing.TB, index string, opts ...EngineOption) *Index {
tb.Helper()
idx := MustNewIndex(tb, index, opts...)
idx.MustOpen()
return idx
}
@ -532,7 +533,7 @@ func BenchmarkIndexSet_TagSets(b *testing.B) {
b.Run("1M series", func(b *testing.B) {
b.ReportAllocs()
for _, indexType := range tsdb.RegisteredIndexes() {
idx := MustOpenNewIndex(indexType)
idx := MustOpenNewIndex(b, indexType)
setup(idx)
name := []byte("m4")
@ -612,9 +613,9 @@ func BenchmarkIndex_ConcurrentWriteQuery(b *testing.B) {
runBenchmark := func(b *testing.B, index string, queryN int, useTSICache bool) {
var idx *Index
if !useTSICache {
idx = MustOpenNewIndex(index, DisableTSICache())
idx = MustOpenNewIndex(b, index, DisableTSICache())
} else {
idx = MustOpenNewIndex(index)
idx = MustOpenNewIndex(b, index)
}
var wg sync.WaitGroup
@ -674,7 +675,7 @@ func BenchmarkIndex_ConcurrentWriteQuery(b *testing.B) {
}
// Re-open everything
idx = MustOpenNewIndex(index)
idx = MustOpenNewIndex(b, index)
wg.Add(1)
begin = make(chan struct{})
once = sync.Once{}

View File

@ -8,9 +8,9 @@ import (
"path"
"testing"
"github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/tsdb"
"go.uber.org/zap/zaptest"
"golang.org/x/sync/errgroup"
)
@ -55,7 +55,7 @@ func TestParseSeriesKeyInto(t *testing.T) {
func TestSeriesFile_Open_WhenFileCorrupt_ShouldReturnErr(t *testing.T) {
f := NewBrokenSeriesFile([]byte{0, 0, 0, 0, 0})
defer f.Close()
f.Logger = logger.New(os.Stdout)
f.Logger = zaptest.NewLogger(t)
err := f.Open()
@ -66,7 +66,7 @@ func TestSeriesFile_Open_WhenFileCorrupt_ShouldReturnErr(t *testing.T) {
// Ensure series file contains the correct set of series.
func TestSeriesFile_Series(t *testing.T) {
sfile := MustOpenSeriesFile()
sfile := MustOpenSeriesFile(t)
defer sfile.Close()
series := []Series{
@ -100,7 +100,7 @@ func TestSeriesFile_Series(t *testing.T) {
// Ensure series file can be compacted.
func TestSeriesFileCompactor(t *testing.T) {
sfile := MustOpenSeriesFile()
sfile := MustOpenSeriesFile(t)
defer sfile.Close()
// Disable automatic compactions.
@ -141,7 +141,7 @@ func TestSeriesFileCompactor(t *testing.T) {
// Ensure series file deletions persist across compactions.
func TestSeriesFile_DeleteSeriesID(t *testing.T) {
sfile := MustOpenSeriesFile()
sfile := MustOpenSeriesFile(t)
defer sfile.Close()
ids0, err := sfile.CreateSeriesListIfNotExists([][]byte{[]byte("m1")}, []models.Tags{nil})
@ -176,7 +176,7 @@ func TestSeriesFile_DeleteSeriesID(t *testing.T) {
}
func TestSeriesFile_Compaction(t *testing.T) {
sfile := MustOpenSeriesFile()
sfile := MustOpenSeriesFile(t)
defer sfile.Close()
// Generate a bunch of keys.
@ -261,7 +261,7 @@ func BenchmarkSeriesFile_Compaction(b *testing.B) {
const n = 1000000
if cachedCompactionSeriesFile == nil {
sfile := MustOpenSeriesFile()
sfile := MustOpenSeriesFile(b)
// Generate a bunch of keys.
var ids []uint64
@ -342,9 +342,11 @@ func NewBrokenSeriesFile(content []byte) *SeriesFile {
}
// MustOpenSeriesFile returns a new, open instance of SeriesFile. Panic on error.
func MustOpenSeriesFile() *SeriesFile {
func MustOpenSeriesFile(tb testing.TB) *SeriesFile {
tb.Helper()
f := NewSeriesFile()
f.Logger = logger.New(os.Stdout)
f.Logger = zaptest.NewLogger(tb)
if err := f.Open(); err != nil {
panic(err)
}

View File

@ -13,16 +13,16 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxql"
"go.uber.org/zap/zaptest"
)
func TestShard_MapType(t *testing.T) {
var sh *TempShard
setup := func(index string) {
sh = NewTempShard(index)
sh = NewTempShard(t, index)
if err := sh.Open(); err != nil {
t.Fatal(err)
@ -155,7 +155,7 @@ _reserved,region=uswest value="foo" 0
func TestShard_MeasurementsByRegex(t *testing.T) {
var sh *TempShard
setup := func(index string) {
sh = NewTempShard(index)
sh = NewTempShard(t, index)
if err := sh.Open(); err != nil {
t.Fatal(err)
}
@ -212,7 +212,9 @@ type TempShard struct {
}
// NewTempShard returns a new instance of TempShard with temp paths.
func NewTempShard(index string) *TempShard {
func NewTempShard(tb testing.TB, index string) *TempShard {
tb.Helper()
// Create temporary path for data and WAL.
dir, err := ioutil.TempDir("", "influxdb-tsdb-")
if err != nil {
@ -221,7 +223,7 @@ func NewTempShard(index string) *TempShard {
// Create series file.
sfile := NewSeriesFile(filepath.Join(dir, "db0", SeriesFileDirectory))
sfile.Logger = logger.New(os.Stdout)
sfile.Logger = zaptest.NewLogger(tb)
if err := sfile.Open(); err != nil {
panic(err)
}

View File

@ -37,7 +37,7 @@ func TestShardWriteAndIndex(t *testing.T) {
tmpShard := filepath.Join(tmpDir, "shard")
tmpWal := filepath.Join(tmpDir, "wal")
sfile := MustOpenSeriesFile()
sfile := MustOpenSeriesFile(t)
defer sfile.Close()
opts := tsdb.NewEngineOptions()
@ -106,7 +106,7 @@ func TestShardRebuildIndex(t *testing.T) {
tmpShard := filepath.Join(tmpDir, "shard")
tmpWal := filepath.Join(tmpDir, "wal")
sfile := MustOpenSeriesFile()
sfile := MustOpenSeriesFile(t)
defer sfile.Close()
opts := tsdb.NewEngineOptions()
@ -184,7 +184,7 @@ func TestShard_Open_CorruptFieldsIndex(t *testing.T) {
tmpShard := filepath.Join(tmpDir, "shard")
tmpWal := filepath.Join(tmpDir, "wal")
sfile := MustOpenSeriesFile()
sfile := MustOpenSeriesFile(t)
defer sfile.Close()
opts := tsdb.NewEngineOptions()
@ -234,7 +234,7 @@ func TestWriteTimeTag(t *testing.T) {
tmpShard := filepath.Join(tmpDir, "shard")
tmpWal := filepath.Join(tmpDir, "wal")
sfile := MustOpenSeriesFile()
sfile := MustOpenSeriesFile(t)
defer sfile.Close()
opts := tsdb.NewEngineOptions()
@ -284,7 +284,7 @@ func TestWriteTimeField(t *testing.T) {
tmpShard := filepath.Join(tmpDir, "shard")
tmpWal := filepath.Join(tmpDir, "wal")
sfile := MustOpenSeriesFile()
sfile := MustOpenSeriesFile(t)
defer sfile.Close()
opts := tsdb.NewEngineOptions()
@ -319,7 +319,7 @@ func TestShardWriteAddNewField(t *testing.T) {
tmpShard := filepath.Join(tmpDir, "shard")
tmpWal := filepath.Join(tmpDir, "wal")
sfile := MustOpenSeriesFile()
sfile := MustOpenSeriesFile(t)
defer sfile.Close()
opts := tsdb.NewEngineOptions()
@ -371,7 +371,7 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) {
tmpShard := filepath.Join(tmpDir, "shard")
tmpWal := filepath.Join(tmpDir, "wal")
sfile := MustOpenSeriesFile()
sfile := MustOpenSeriesFile(t)
defer sfile.Close()
opts := tsdb.NewEngineOptions()
@ -459,7 +459,7 @@ func TestShard_WritePoints_FieldConflictConcurrentQuery(t *testing.T) {
tmpShard := filepath.Join(tmpDir, "shard")
tmpWal := filepath.Join(tmpDir, "wal")
sfile := MustOpenSeriesFile()
sfile := MustOpenSeriesFile(t)
defer sfile.Close()
opts := tsdb.NewEngineOptions()
@ -609,7 +609,7 @@ func TestShard_Close_RemoveIndex(t *testing.T) {
tmpShard := filepath.Join(tmpDir, "shard")
tmpWal := filepath.Join(tmpDir, "wal")
sfile := MustOpenSeriesFile()
sfile := MustOpenSeriesFile(t)
defer sfile.Close()
opts := tsdb.NewEngineOptions()
@ -649,7 +649,7 @@ func TestShard_Close_RemoveIndex(t *testing.T) {
func TestShard_CreateIterator_Ascending(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
sh := NewShard(index)
sh := NewShard(t, index)
defer sh.Close()
// Calling CreateIterator when the engine is not open will return
@ -732,8 +732,8 @@ func TestShard_CreateIterator_Descending(t *testing.T) {
var sh *Shard
var itr query.Iterator
test := func(index string) {
sh = NewShard(index)
test := func(t *testing.T, index string) {
sh = NewShard(t, index)
// Calling CreateIterator when the engine is not open will return
// ErrEngineClosed.
@ -808,7 +808,7 @@ cpu,host=serverB,region=uswest value=25 0
}
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) { test(index) })
t.Run(index, func(t *testing.T) { test(t, index) })
sh.Close()
itr.Close()
}
@ -834,8 +834,8 @@ func TestShard_CreateIterator_Series_Auth(t *testing.T) {
},
}
test := func(index string, v variant) error {
sh := MustNewOpenShard(index)
test := func(t *testing.T, index string, v variant) error {
sh := MustNewOpenShard(t, index)
defer sh.Close()
sh.MustWritePointsString(`
cpu,host=serverA,region=uswest value=100 0
@ -952,7 +952,7 @@ cpu,secret=foo value=100 0
for _, index := range tsdb.RegisteredIndexes() {
for _, example := range examples {
t.Run(index+"_"+example.name, func(t *testing.T) {
if err := test(index, example); err != nil {
if err := test(t, index, example); err != nil {
t.Fatal(err)
}
})
@ -963,8 +963,8 @@ cpu,secret=foo value=100 0
func TestShard_Disabled_WriteQuery(t *testing.T) {
var sh *Shard
test := func(index string) {
sh = NewShard(index)
test := func(t *testing.T, index string) {
sh = NewShard(t, index)
if err := sh.Open(); err != nil {
t.Fatal(err)
}
@ -1007,15 +1007,15 @@ func TestShard_Disabled_WriteQuery(t *testing.T) {
}
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) { test(index) })
t.Run(index, func(t *testing.T) { test(t, index) })
sh.Close()
}
}
func TestShard_Closed_Functions(t *testing.T) {
var sh *Shard
test := func(index string) {
sh = NewShard(index)
test := func(t *testing.T, index string) {
sh = NewShard(t, index)
if err := sh.Open(); err != nil {
t.Fatal(err)
}
@ -1040,18 +1040,18 @@ func TestShard_Closed_Functions(t *testing.T) {
}
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) { test(index) })
t.Run(index, func(t *testing.T) { test(t, index) })
}
}
func TestShard_FieldDimensions(t *testing.T) {
var sh *Shard
sfile := MustOpenSeriesFile()
sfile := MustOpenSeriesFile(t)
defer sfile.Close()
setup := func(index string) {
sh = NewShard(index)
sh = NewShard(t, index)
if err := sh.Open(); err != nil {
t.Fatal(err)
@ -1167,7 +1167,7 @@ func TestShards_FieldKeysByMeasurement(t *testing.T) {
var shards Shards
setup := func(index string) {
shards = NewShards(index, 2)
shards = NewShards(t, index, 2)
shards.MustOpen()
shards[0].MustWritePointsString(`cpu,host=serverA,region=uswest a=2.2,b=33.3,value=100 0`)
@ -1203,7 +1203,7 @@ func TestShards_FieldDimensions(t *testing.T) {
var shard1, shard2 *Shard
setup := func(index string) {
shard1 = NewShard(index)
shard1 = NewShard(t, index)
if err := shard1.Open(); err != nil {
t.Fatal(err)
}
@ -1214,7 +1214,7 @@ cpu,host=serverA,region=uswest value=50,val2=5 10
cpu,host=serverB,region=uswest value=25 0
`)
shard2 = NewShard(index)
shard2 = NewShard(t, index)
if err := shard2.Open(); err != nil {
t.Fatal(err)
}
@ -1328,7 +1328,7 @@ func TestShards_MapType(t *testing.T) {
var shard1, shard2 *Shard
setup := func(index string) {
shard1 = NewShard(index)
shard1 = NewShard(t, index)
if err := shard1.Open(); err != nil {
t.Fatal(err)
}
@ -1339,7 +1339,7 @@ cpu,host=serverA,region=uswest value=50,val2=5 10
cpu,host=serverB,region=uswest value=25 0
`)
shard2 = NewShard(index)
shard2 = NewShard(t, index)
if err := shard2.Open(); err != nil {
t.Fatal(err)
}
@ -1467,7 +1467,7 @@ func TestShards_MeasurementsByRegex(t *testing.T) {
var shard1, shard2 *Shard
setup := func(index string) {
shard1 = NewShard(index)
shard1 = NewShard(t, index)
if err := shard1.Open(); err != nil {
t.Fatal(err)
}
@ -1478,7 +1478,7 @@ cpu,host=serverA,region=uswest value=50,val2=5 10
cpu,host=serverB,region=uswest value=25 0
`)
shard2 = NewShard(index)
shard2 = NewShard(t, index)
if err := shard2.Open(); err != nil {
t.Fatal(err)
}
@ -1828,7 +1828,7 @@ func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) {
b.StopTimer()
b.ResetTimer()
sfile := MustOpenSeriesFile()
sfile := MustOpenSeriesFile(b)
defer sfile.Close()
// Run the benchmark loop.
@ -1866,7 +1866,7 @@ func benchmarkWritePointsExistingSeries(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt
}
}
sfile := MustOpenSeriesFile()
sfile := MustOpenSeriesFile(b)
defer sfile.Close()
shard, tmpDir, err := openShard(sfile)
@ -1912,7 +1912,7 @@ func benchmarkWritePointsExistingSeriesFields(b *testing.B, mCnt, tkCnt, tvCnt,
}
}
sfile := MustOpenSeriesFile()
sfile := MustOpenSeriesFile(b)
defer func() {
_ = sfile.Close()
}()
@ -1957,7 +1957,7 @@ func benchmarkWritePointsExistingSeriesEqualBatches(b *testing.B, mCnt, tkCnt, t
}
}
sfile := MustOpenSeriesFile()
sfile := MustOpenSeriesFile(b)
defer sfile.Close()
shard, tmpDir, err := openShard(sfile)
@ -2037,7 +2037,7 @@ func BenchmarkCreateIterator(b *testing.B) {
var shards Shards
for i := 1; i <= 5; i++ {
name := fmt.Sprintf("%s_shards_%d", index, i)
shards = NewShards(index, i)
shards = NewShards(b, index, i)
shards.MustOpen()
setup(index, shards)
@ -2111,13 +2111,15 @@ type Shard struct {
type Shards []*Shard
// NewShard returns a new instance of Shard with temp paths.
func NewShard(index string) *Shard {
return NewShards(index, 1)[0]
func NewShard(tb testing.TB, index string) *Shard {
tb.Helper()
return NewShards(tb, index, 1)[0]
}
// MustNewOpenShard creates and opens a shard with the provided index.
func MustNewOpenShard(index string) *Shard {
sh := NewShard(index)
func MustNewOpenShard(tb testing.TB, index string) *Shard {
tb.Helper()
sh := NewShard(tb, index)
if err := sh.Open(); err != nil {
panic(err)
}
@ -2136,14 +2138,16 @@ func (sh *Shard) Close() error {
}
// NewShards create several shards all sharing the same
func NewShards(index string, n int) Shards {
func NewShards(tb testing.TB, index string, n int) Shards {
tb.Helper()
// Create temporary path for data and WAL.
dir, err := ioutil.TempDir("", "influxdb-tsdb-")
if err != nil {
panic(err)
}
sfile := MustOpenSeriesFile()
sfile := MustOpenSeriesFile(tb)
var shards []*Shard
var idSets []*tsdb.SeriesIDSet

View File

@ -22,20 +22,20 @@ import (
"github.com/davecgh/go-spew/spew"
"github.com/influxdata/influxdb/v2/influxql/query"
"github.com/influxdata/influxdb/v2/internal"
"github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/pkg/deep"
"github.com/influxdata/influxdb/v2/pkg/slices"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxql"
"go.uber.org/zap/zaptest"
)
// Ensure the store can delete a retention policy and all shards under
// it.
func TestStore_DeleteRetentionPolicy(t *testing.T) {
test := func(index string) {
s := MustOpenStore(index)
test := func(t *testing.T, index string) {
s := MustOpenStore(t, index)
defer s.Close()
// Create a new shard and verify that it exists.
@ -86,7 +86,7 @@ func TestStore_DeleteRetentionPolicy(t *testing.T) {
}
// Reopen other shard and check it still exists.
if err := s.Reopen(); err != nil {
if err := s.Reopen(t); err != nil {
t.Error(err)
} else if sh := s.Shard(3); sh == nil {
t.Errorf("shard 3 does not exist")
@ -102,15 +102,15 @@ func TestStore_DeleteRetentionPolicy(t *testing.T) {
}
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) { test(index) })
t.Run(index, func(t *testing.T) { test(t, index) })
}
}
// Ensure the store can create a new shard.
func TestStore_CreateShard(t *testing.T) {
test := func(index string) {
s := MustOpenStore(index)
test := func(t *testing.T, index string) {
s := MustOpenStore(t, index)
defer s.Close()
// Create a new shard and verify that it exists.
@ -128,7 +128,7 @@ func TestStore_CreateShard(t *testing.T) {
}
// Reopen shard and recheck.
if err := s.Reopen(); err != nil {
if err := s.Reopen(t); err != nil {
t.Fatal(err)
} else if sh := s.Shard(1); sh == nil {
t.Fatalf("expected shard(1)")
@ -138,14 +138,14 @@ func TestStore_CreateShard(t *testing.T) {
}
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) { test(index) })
t.Run(index, func(t *testing.T) { test(t, index) })
}
}
func TestStore_CreateMixedShards(t *testing.T) {
test := func(index1 string, index2 string) {
s := MustOpenStore(index1)
test := func(t *testing.T, index1 string, index2 string) {
s := MustOpenStore(t, index1)
defer s.Close()
// Create a new shard and verify that it exists.
@ -157,7 +157,7 @@ func TestStore_CreateMixedShards(t *testing.T) {
s.EngineOptions.IndexVersion = index2
s.index = index2
if err := s.Reopen(); err != nil {
if err := s.Reopen(t); err != nil {
t.Fatal(err)
}
@ -169,7 +169,7 @@ func TestStore_CreateMixedShards(t *testing.T) {
}
// Reopen shard and recheck.
if err := s.Reopen(); err != nil {
if err := s.Reopen(t); err != nil {
t.Fatal(err)
} else if sh := s.Shard(1); sh == nil {
t.Fatalf("expected shard(1)")
@ -193,14 +193,14 @@ func TestStore_CreateMixedShards(t *testing.T) {
j := (i + 1) % len(indexes)
index1 := indexes[i]
index2 := indexes[j]
t.Run(fmt.Sprintf("%s-%s", index1, index2), func(t *testing.T) { test(index1, index2) })
t.Run(fmt.Sprintf("%s-%s", index1, index2), func(t *testing.T) { test(t, index1, index2) })
}
}
func TestStore_DropConcurrentWriteMultipleShards(t *testing.T) {
test := func(index string) {
s := MustOpenStore(index)
test := func(t *testing.T, index string) {
s := MustOpenStore(t, index)
defer s.Close()
if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
@ -255,14 +255,14 @@ func TestStore_DropConcurrentWriteMultipleShards(t *testing.T) {
}
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) { test(index) })
t.Run(index, func(t *testing.T) { test(t, index) })
}
}
func TestStore_WriteMixedShards(t *testing.T) {
test := func(index1 string, index2 string) {
s := MustOpenStore(index1)
test := func(t *testing.T, index1 string, index2 string) {
s := MustOpenStore(t, index1)
defer s.Close()
if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
@ -273,7 +273,7 @@ func TestStore_WriteMixedShards(t *testing.T) {
s.EngineOptions.IndexVersion = index2
s.index = index2
if err := s.Reopen(); err != nil {
if err := s.Reopen(t); err != nil {
t.Fatal(err)
}
@ -326,15 +326,15 @@ func TestStore_WriteMixedShards(t *testing.T) {
j := (i + 1) % len(indexes)
index1 := indexes[i]
index2 := indexes[j]
t.Run(fmt.Sprintf("%s-%s", index1, index2), func(t *testing.T) { test(index1, index2) })
t.Run(fmt.Sprintf("%s-%s", index1, index2), func(t *testing.T) { test(t, index1, index2) })
}
}
// Ensure the store does not return an error when delete from a non-existent db.
func TestStore_DeleteSeries_NonExistentDB(t *testing.T) {
test := func(index string) {
s := MustOpenStore(index)
test := func(t *testing.T, index string) {
s := MustOpenStore(t, index)
defer s.Close()
if err := s.DeleteSeries("db0", nil, nil); err != nil {
@ -343,15 +343,15 @@ func TestStore_DeleteSeries_NonExistentDB(t *testing.T) {
}
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) { test(index) })
t.Run(index, func(t *testing.T) { test(t, index) })
}
}
// Ensure the store can delete an existing shard.
func TestStore_DeleteShard(t *testing.T) {
test := func(index string) error {
s := MustOpenStore(index)
test := func(t *testing.T, index string) error {
s := MustOpenStore(t, index)
defer s.Close()
// Create a new shard and verify that it exists.
@ -383,7 +383,7 @@ func TestStore_DeleteShard(t *testing.T) {
s.MustWriteToShardString(3, "cpu,serverb=b v=1")
// Reopen the store and check all shards still exist
if err := s.Reopen(); err != nil {
if err := s.Reopen(t); err != nil {
return err
}
for i := uint64(1); i <= 3; i++ {
@ -428,7 +428,7 @@ func TestStore_DeleteShard(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
if err := test(index); err != nil {
if err := test(t, index); err != nil {
t.Error(err)
}
})
@ -438,8 +438,8 @@ func TestStore_DeleteShard(t *testing.T) {
// Ensure the store can create a snapshot to a shard.
func TestStore_CreateShardSnapShot(t *testing.T) {
test := func(index string) {
s := MustOpenStore(index)
test := func(t *testing.T, index string) {
s := MustOpenStore(t, index)
defer s.Close()
// Create a new shard and verify that it exists.
@ -459,14 +459,14 @@ func TestStore_CreateShardSnapShot(t *testing.T) {
}
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) { test(index) })
t.Run(index, func(t *testing.T) { test(t, index) })
}
}
func TestStore_Open(t *testing.T) {
test := func(index string) {
s := NewStore(index)
test := func(t *testing.T, index string) {
s := NewStore(t, index)
defer s.Close()
if err := os.MkdirAll(filepath.Join(s.Path(), "db0", "rp0", "2"), 0777); err != nil {
@ -500,15 +500,15 @@ func TestStore_Open(t *testing.T) {
}
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) { test(index) })
t.Run(index, func(t *testing.T) { test(t, index) })
}
}
// Ensure the store reports an error when it can't open a database directory.
func TestStore_Open_InvalidDatabaseFile(t *testing.T) {
test := func(index string) {
s := NewStore(index)
test := func(t *testing.T, index string) {
s := NewStore(t, index)
defer s.Close()
// Create a file instead of a directory for a database.
@ -525,15 +525,15 @@ func TestStore_Open_InvalidDatabaseFile(t *testing.T) {
}
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) { test(index) })
t.Run(index, func(t *testing.T) { test(t, index) })
}
}
// Ensure the store reports an error when it can't open a retention policy.
func TestStore_Open_InvalidRetentionPolicy(t *testing.T) {
test := func(index string) {
s := NewStore(index)
test := func(t *testing.T, index string) {
s := NewStore(t, index)
defer s.Close()
// Create an RP file instead of a directory.
@ -554,15 +554,15 @@ func TestStore_Open_InvalidRetentionPolicy(t *testing.T) {
}
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) { test(index) })
t.Run(index, func(t *testing.T) { test(t, index) })
}
}
// Ensure the store reports an error when it can't open a retention policy.
func TestStore_Open_InvalidShard(t *testing.T) {
test := func(index string) {
s := NewStore(index)
test := func(t *testing.T, index string) {
s := NewStore(t, index)
defer s.Close()
// Create a non-numeric shard file.
@ -583,15 +583,15 @@ func TestStore_Open_InvalidShard(t *testing.T) {
}
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) { test(index) })
t.Run(index, func(t *testing.T) { test(t, index) })
}
}
// Ensure shards can create iterators.
func TestShards_CreateIterator(t *testing.T) {
test := func(index string) {
s := MustOpenStore(index)
test := func(t *testing.T, index string) {
s := MustOpenStore(t, index)
defer s.Close()
// Create shard #0 with data.
@ -666,14 +666,14 @@ func TestShards_CreateIterator(t *testing.T) {
}
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) { test(index) })
t.Run(index, func(t *testing.T) { test(t, index) })
}
}
// Ensure the store can backup a shard and another store can restore it.
func TestStore_BackupRestoreShard(t *testing.T) {
test := func(index string) {
s0, s1 := MustOpenStore(index), MustOpenStore(index)
test := func(t *testing.T, index string) {
s0, s1 := MustOpenStore(t, index), MustOpenStore(t, index)
defer s0.Close()
defer s1.Close()
@ -684,7 +684,7 @@ func TestStore_BackupRestoreShard(t *testing.T) {
`cpu value=3 20`,
)
if err := s0.Reopen(); err != nil {
if err := s0.Reopen(t); err != nil {
t.Fatal(err)
}
@ -742,14 +742,14 @@ func TestStore_BackupRestoreShard(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
test(index)
test(t, index)
})
}
}
func TestStore_Shard_SeriesN(t *testing.T) {
test := func(index string) error {
s := MustOpenStore(index)
test := func(t *testing.T, index string) error {
s := MustOpenStore(t, index)
defer s.Close()
// Create shard with data.
@ -774,7 +774,7 @@ func TestStore_Shard_SeriesN(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
if err := test(index); err != nil {
if err := test(t, index); err != nil {
t.Error(err)
}
})
@ -783,8 +783,8 @@ func TestStore_Shard_SeriesN(t *testing.T) {
func TestStore_MeasurementNames_Deduplicate(t *testing.T) {
test := func(index string) {
s := MustOpenStore(index)
test := func(t *testing.T, index string) {
s := MustOpenStore(t, index)
defer s.Close()
// Create shard with data.
@ -816,7 +816,7 @@ func TestStore_MeasurementNames_Deduplicate(t *testing.T) {
}
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) { test(index) })
t.Run(index, func(t *testing.T) { test(t, index) })
}
}
@ -883,8 +883,8 @@ func TestStore_Cardinality_Tombstoning(t *testing.T) {
t.Skip("Skipping test in short, race, circleci and appveyor mode.")
}
test := func(index string) {
store := NewStore(index)
test := func(t *testing.T, index string) {
store := NewStore(t, index)
if err := store.Open(); err != nil {
panic(err)
}
@ -893,7 +893,7 @@ func TestStore_Cardinality_Tombstoning(t *testing.T) {
}
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) { test(index) })
t.Run(index, func(t *testing.T) { test(t, index) })
}
}
@ -947,8 +947,8 @@ func TestStore_Cardinality_Unique(t *testing.T) {
t.Skip("Skipping test in short, race, circleci and appveyor mode.")
}
test := func(index string) {
store := NewStore(index)
test := func(t *testing.T, index string) {
store := NewStore(t, index)
if err := store.Open(); err != nil {
panic(err)
}
@ -957,7 +957,7 @@ func TestStore_Cardinality_Unique(t *testing.T) {
}
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) { test(index) })
t.Run(index, func(t *testing.T) { test(t, index) })
}
}
@ -1027,8 +1027,8 @@ func TestStore_Cardinality_Duplicates(t *testing.T) {
t.Skip("Skipping test in short, race, circleci and appveyor mode.")
}
test := func(index string) {
store := NewStore(index)
test := func(t *testing.T, index string) {
store := NewStore(t, index)
if err := store.Open(); err != nil {
panic(err)
}
@ -1037,7 +1037,7 @@ func TestStore_Cardinality_Duplicates(t *testing.T) {
}
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) { test(index) })
t.Run(index, func(t *testing.T) { test(t, index) })
}
}
@ -1094,8 +1094,8 @@ func TestStore_Cardinality_Compactions(t *testing.T) {
t.Skip("Skipping test in short, race, circleci and appveyor mode.")
}
test := func(index string) error {
store := NewStore(index)
test := func(t *testing.T, index string) error {
store := NewStore(t, index)
if err := store.Open(); err != nil {
panic(err)
}
@ -1105,7 +1105,7 @@ func TestStore_Cardinality_Compactions(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
if err := test(index); err != nil {
if err := test(t, index); err != nil {
t.Fatal(err)
}
})
@ -1156,8 +1156,8 @@ func TestStore_Sketches(t *testing.T) {
return nil
}
test := func(index string) error {
store := MustOpenStore(index)
test := func(t *testing.T, index string) error {
store := MustOpenStore(t, index)
defer store.Close()
// Generate point data to write to the shards.
@ -1186,7 +1186,7 @@ func TestStore_Sketches(t *testing.T) {
}
// Reopen the store.
if err := store.Reopen(); err != nil {
if err := store.Reopen(t); err != nil {
return err
}
@ -1216,7 +1216,7 @@ func TestStore_Sketches(t *testing.T) {
}
// Reopen the store.
if err := store.Reopen(); err != nil {
if err := store.Reopen(t); err != nil {
return err
}
@ -1231,7 +1231,7 @@ func TestStore_Sketches(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
if err := test(index); err != nil {
if err := test(t, index); err != nil {
t.Fatal(err)
}
})
@ -1323,8 +1323,8 @@ func TestStore_TagValues(t *testing.T) {
}
var s *Store
setup := func(index string) []uint64 { // returns shard ids
s = MustOpenStore(index)
setup := func(t *testing.T, index string) []uint64 { // returns shard ids
s = MustOpenStore(t, index)
fmtStr := `cpu1%[1]d,foo=a,ignoreme=nope,host=tv%[2]d,shard=s%[3]d value=1 %[4]d
cpu1%[1]d,host=nofoo value=1 %[4]d
@ -1354,8 +1354,8 @@ func TestStore_TagValues(t *testing.T) {
for _, example := range examples {
for _, index := range tsdb.RegisteredIndexes() {
shardIDs := setup(index)
t.Run(example.Name+"_"+index, func(t *testing.T) {
shardIDs := setup(t, index)
got, err := s.TagValues(nil, shardIDs, example.Expr)
if err != nil {
t.Fatal(err)
@ -1373,8 +1373,8 @@ func TestStore_TagValues(t *testing.T) {
func TestStore_Measurements_Auth(t *testing.T) {
test := func(index string) error {
s := MustOpenStore(index)
test := func(t *testing.T, index string) error {
s := MustOpenStore(t, index)
defer s.Close()
// Create shard #0 with data.
@ -1451,7 +1451,7 @@ func TestStore_Measurements_Auth(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
if err := test(index); err != nil {
if err := test(t, index); err != nil {
t.Fatal(err)
}
})
@ -1461,8 +1461,8 @@ func TestStore_Measurements_Auth(t *testing.T) {
func TestStore_TagKeys_Auth(t *testing.T) {
test := func(index string) error {
s := MustOpenStore(index)
test := func(t *testing.T, index string) error {
s := MustOpenStore(t, index)
defer s.Close()
// Create shard #0 with data.
@ -1548,7 +1548,7 @@ func TestStore_TagKeys_Auth(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
if err := test(index); err != nil {
if err := test(t, index); err != nil {
t.Fatal(err)
}
})
@ -1558,8 +1558,8 @@ func TestStore_TagKeys_Auth(t *testing.T) {
func TestStore_TagValues_Auth(t *testing.T) {
test := func(index string) error {
s := MustOpenStore(index)
test := func(t *testing.T, index string) error {
s := MustOpenStore(t, index)
defer s.Close()
// Create shard #0 with data.
@ -1657,7 +1657,7 @@ func TestStore_TagValues_Auth(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
if err := test(index); err != nil {
if err := test(t, index); err != nil {
t.Fatal(err)
}
})
@ -1690,7 +1690,7 @@ func createTagValues(mname string, kvs map[string][]string) tsdb.TagValues {
func TestStore_MeasurementNames_ConcurrentDropShard(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
s := MustOpenStore(index)
s := MustOpenStore(t, index)
defer s.Close()
shardN := 10
@ -1775,7 +1775,7 @@ func TestStore_MeasurementNames_ConcurrentDropShard(t *testing.T) {
func TestStore_TagKeys_ConcurrentDropShard(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
s := MustOpenStore(index)
s := MustOpenStore(t, index)
defer s.Close()
shardN := 10
@ -1866,7 +1866,7 @@ func TestStore_TagKeys_ConcurrentDropShard(t *testing.T) {
func TestStore_TagValues_ConcurrentDropShard(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
s := MustOpenStore(index)
s := MustOpenStore(t, index)
defer s.Close()
shardN := 10
@ -1970,7 +1970,7 @@ func TestStore_TagValues_ConcurrentDropShard(t *testing.T) {
func BenchmarkStore_SeriesCardinality_100_Shards(b *testing.B) {
for _, index := range tsdb.RegisteredIndexes() {
store := NewStore(index)
store := NewStore(b, index)
if err := store.Open(); err != nil {
panic(err)
}
@ -2001,7 +2001,7 @@ func BenchmarkStoreOpen_200KSeries_100Shards(b *testing.B) { benchmarkStoreOpen(
func benchmarkStoreOpen(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt, shardCnt int) {
var store *Store
setup := func(index string) error {
store := MustOpenStore(index)
store := MustOpenStore(b, index)
// Generate test series (measurements + unique tag sets).
series := genTestSeries(mCnt, tkCnt, tvCnt)
@ -2073,7 +2073,7 @@ func BenchmarkStore_TagValues(b *testing.B) {
var s *Store
setup := func(shards, measurements, tagValues int, index string, useRandom bool) []uint64 { // returns shard ids
s := NewStore(index)
s := NewStore(b, index)
if err := s.Open(); err != nil {
panic(err)
}
@ -2179,7 +2179,9 @@ type Store struct {
}
// NewStore returns a new instance of Store with a temporary path.
func NewStore(index string) *Store {
func NewStore(tb testing.TB, index string) *Store {
tb.Helper()
path, err := ioutil.TempDir("", "influxdb-tsdb-")
if err != nil {
panic(err)
@ -2189,18 +2191,17 @@ func NewStore(index string) *Store {
s.EngineOptions.IndexVersion = index
s.EngineOptions.Config.WALDir = filepath.Join(path, "wal")
s.EngineOptions.Config.TraceLoggingEnabled = true
if testing.Verbose() {
s.WithLogger(logger.New(os.Stdout))
}
s.WithLogger(zaptest.NewLogger(tb))
return s
}
// MustOpenStore returns a new, open Store using the specified index,
// at a temporary path.
func MustOpenStore(index string) *Store {
s := NewStore(index)
func MustOpenStore(tb testing.TB, index string) *Store {
tb.Helper()
s := NewStore(tb, index)
if err := s.Open(); err != nil {
panic(err)
@ -2209,7 +2210,9 @@ func MustOpenStore(index string) *Store {
}
// Reopen closes and reopens the store as a new store.
func (s *Store) Reopen() error {
func (s *Store) Reopen(tb testing.TB) error {
tb.Helper()
if err := s.Store.Close(); err != nil {
return err
}
@ -2218,10 +2221,8 @@ func (s *Store) Reopen() error {
s.EngineOptions.IndexVersion = s.index
s.EngineOptions.Config.WALDir = filepath.Join(s.Path(), "wal")
s.EngineOptions.Config.TraceLoggingEnabled = true
s.WithLogger(zaptest.NewLogger(tb))
if testing.Verbose() {
s.WithLogger(logger.New(os.Stdout))
}
return s.Store.Open()
}

View File

@ -2,14 +2,13 @@ package precreator_test
import (
"context"
"os"
"testing"
"time"
"github.com/influxdata/influxdb/v2/internal"
"github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/toml"
"github.com/influxdata/influxdb/v2/v1/services/precreator"
"go.uber.org/zap/zaptest"
)
func TestShardPrecreation(t *testing.T) {
@ -25,7 +24,7 @@ func TestShardPrecreation(t *testing.T) {
return nil
}
s := NewTestService()
s := NewTestService(t)
s.MetaClient = &mc
if err := s.Open(context.Background()); err != nil {
@ -46,11 +45,13 @@ func TestShardPrecreation(t *testing.T) {
}
}
func NewTestService() *precreator.Service {
func NewTestService(tb testing.TB) *precreator.Service {
tb.Helper()
config := precreator.NewConfig()
config.CheckInterval = toml.Duration(10 * time.Millisecond)
s := precreator.NewService(config)
s.WithLogger(logger.New(os.Stderr))
s.WithLogger(zaptest.NewLogger(tb))
return s
}

View File

@ -1,7 +1,6 @@
package retention_test
import (
"bytes"
"context"
"fmt"
"reflect"
@ -10,37 +9,39 @@ import (
"time"
"github.com/influxdata/influxdb/v2/internal"
"github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/toml"
"github.com/influxdata/influxdb/v2/v1/services/meta"
"github.com/influxdata/influxdb/v2/v1/services/retention"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
)
func TestService_OpenDisabled(t *testing.T) {
// Opening a disabled service should be a no-op.
c := retention.NewConfig()
c.Enabled = false
s := NewService(c)
s := NewService(t, c)
if err := s.Open(context.Background()); err != nil {
t.Fatal(err)
}
if s.LogBuf.String() != "" {
t.Fatalf("service logged %q, didn't expect any logging", s.LogBuf.String())
if s.LogBuf.Len() > 0 {
t.Fatalf("service logged %q, didn't expect any logging", s.LogBuf.All())
}
}
func TestService_OpenClose(t *testing.T) {
// Opening a disabled service should be a no-op.
s := NewService(retention.NewConfig())
s := NewService(t, retention.NewConfig())
ctx := context.Background()
if err := s.Open(ctx); err != nil {
t.Fatal(err)
}
if s.LogBuf.String() == "" {
if s.LogBuf.Len() == 0 {
t.Fatal("service didn't log anything on open")
}
@ -117,7 +118,7 @@ func TestService_CheckShards(t *testing.T) {
config := retention.NewConfig()
config.CheckInterval = toml.Duration(10 * time.Millisecond)
s := NewService(config)
s := NewService(t, config)
s.MetaClient.DatabasesFn = func() []meta.DatabaseInfo {
return data
}
@ -233,7 +234,7 @@ func TestService_8819_repro(t *testing.T) {
func testService_8819_repro(t *testing.T) (*Service, chan error, chan struct{}) {
c := retention.NewConfig()
c.CheckInterval = toml.Duration(time.Millisecond)
s := NewService(c)
s := NewService(t, c)
errC := make(chan error, 1) // Buffer Important to prevent deadlock.
done := make(chan struct{})
@ -379,19 +380,24 @@ type Service struct {
MetaClient *internal.MetaClientMock
TSDBStore *internal.TSDBStoreMock
LogBuf bytes.Buffer
LogBuf *observer.ObservedLogs
*retention.Service
}
func NewService(c retention.Config) *Service {
func NewService(tb testing.TB, c retention.Config) *Service {
tb.Helper()
s := &Service{
MetaClient: &internal.MetaClientMock{},
TSDBStore: &internal.TSDBStoreMock{},
Service: retention.NewService(c),
}
l := logger.New(&s.LogBuf)
s.WithLogger(l)
logcore, logbuf := observer.New(zapcore.InfoLevel)
log := zap.New(logcore)
s.LogBuf = logbuf
s.WithLogger(log)
s.Service.MetaClient = s.MetaClient
s.Service.TSDBStore = s.TSDBStore