diff --git a/cmd/telemetryd/main.go b/cmd/telemetryd/main.go index 59b11b0a5f..8b988f6b86 100644 --- a/cmd/telemetryd/main.go +++ b/cmd/telemetryd/main.go @@ -15,13 +15,21 @@ import ( ) var ( - log = influxlogger.New(os.Stdout) addr string ) func main() { + logconf := influxlogger.NewConfig() + log, err := logconf.New(os.Stdout) + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "Failed to configure logger: %v", err) + os.Exit(1) + } + prog := &cli.Program{ - Run: run, + Run: func() error { + return run(log) + }, Name: "telemetryd", Opts: []cli.Opt{ { @@ -32,6 +40,7 @@ func main() { }, }, } + v := viper.New() cmd := cli.NewCommand(v, prog) @@ -49,8 +58,8 @@ func main() { os.Exit(exitCode) } -func run() error { - log := log.With(zap.String("service", "telemetryd")) +func run(log *zap.Logger) error { + log = log.With(zap.String("service", "telemetryd")) store := telemetry.NewLogStore(log) svc := telemetry.NewPushGateway(log, store) // Print data as line protocol diff --git a/gather/scheduler_test.go b/gather/scheduler_test.go index 339495554b..446b8f201e 100644 --- a/gather/scheduler_test.go +++ b/gather/scheduler_test.go @@ -3,15 +3,14 @@ package gather import ( "context" "net/http/httptest" - "os" "testing" "time" "github.com/google/go-cmp/cmp" "github.com/influxdata/influxdb/v2" - influxlogger "github.com/influxdata/influxdb/v2/logger" "github.com/influxdata/influxdb/v2/mock" influxdbtesting "github.com/influxdata/influxdb/v2/testing" + "go.uber.org/zap/zaptest" ) func TestScheduler(t *testing.T) { @@ -19,7 +18,7 @@ func TestScheduler(t *testing.T) { totalGatherJobs := 3 // Create top level logger - logger := influxlogger.New(os.Stdout) + logger := zaptest.NewLogger(t) ts := httptest.NewServer(&mockHTTPHandler{ responseMap: map[string]string{ "/metrics": sampleRespSmall, diff --git a/http/legacy/router.go b/http/legacy/router.go index db6df167d4..32deff4f62 100644 --- a/http/legacy/router.go +++ b/http/legacy/router.go @@ -71,14 +71,17 @@ func (h baseHandler) panic(w http.ResponseWriter, r *http.Request, rcv interface h.HandleHTTPError(ctx, pe, w) } -var panicLogger *zap.Logger +var panicLogger = zap.NewNop() var panicLoggerOnce sync.Once // getPanicLogger returns a logger for panicHandler. func getPanicLogger() *zap.Logger { panicLoggerOnce.Do(func() { - panicLogger = influxlogger.New(os.Stderr) - panicLogger = panicLogger.With(zap.String("handler", "panic")) + conf := influxlogger.NewConfig() + logger, err := conf.New(os.Stderr) + if err == nil { + panicLogger = logger.With(zap.String("handler", "panic")) + } }) return panicLogger diff --git a/http/router.go b/http/router.go index 3c2cae18fd..7315ff0468 100644 --- a/http/router.go +++ b/http/router.go @@ -129,14 +129,17 @@ func panicMW(api *kithttp.API) func(http.Handler) http.Handler { } } -var panicLogger *zap.Logger +var panicLogger = zap.NewNop() var panicLoggerOnce sync.Once // getPanicLogger returns a logger for panicHandler. func getPanicLogger() *zap.Logger { panicLoggerOnce.Do(func() { - panicLogger = influxlogger.New(os.Stderr) - panicLogger = panicLogger.With(zap.String("handler", "panic")) + conf := influxlogger.NewConfig() + logger, err := conf.New(os.Stderr) + if err == nil { + panicLogger = logger.With(zap.String("handler", "panic")) + } }) return panicLogger diff --git a/logger/logger.go b/logger/logger.go index c90bc40f07..167fa76660 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -13,12 +13,6 @@ import ( const TimeFormat = "2006-01-02T15:04:05.000000Z07:00" -func New(w io.Writer) *zap.Logger { - config := NewConfig() - l, _ := config.New(w) - return l -} - func (c *Config) New(defaultOutput io.Writer) (*zap.Logger, error) { w := defaultOutput format := c.Format diff --git a/tsdb/engine/tsm1/engine_internal_test.go b/tsdb/engine/tsm1/engine_internal_test.go index 2ba840b529..3b22fee518 100644 --- a/tsdb/engine/tsm1/engine_internal_test.go +++ b/tsdb/engine/tsm1/engine_internal_test.go @@ -7,10 +7,10 @@ import ( "testing" "time" - "github.com/influxdata/influxdb/v2/logger" "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/tsdb" "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" ) func TestEngine_ConcurrentShardSnapshots(t *testing.T) { @@ -21,7 +21,7 @@ func TestEngine_ConcurrentShardSnapshots(t *testing.T) { tmpShard := filepath.Join(tmpDir, "shard") tmpWal := filepath.Join(tmpDir, "wal") - sfile := NewSeriesFile(tmpDir) + sfile := NewSeriesFile(t, tmpDir) defer sfile.Close() opts := tsdb.NewEngineOptions() @@ -79,13 +79,15 @@ func TestEngine_ConcurrentShardSnapshots(t *testing.T) { } // NewSeriesFile returns a new instance of SeriesFile with a temporary file path. -func NewSeriesFile(tmpDir string) *tsdb.SeriesFile { +func NewSeriesFile(tb testing.TB, tmpDir string) *tsdb.SeriesFile { + tb.Helper() + dir, err := ioutil.TempDir(tmpDir, "tsdb-series-file-") if err != nil { panic(err) } f := tsdb.NewSeriesFile(dir) - f.Logger = logger.New(os.Stdout) + f.Logger = zaptest.NewLogger(tb) if err := f.Open(); err != nil { panic(err) } diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index af5dcd19aa..28e6b0c766 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -21,20 +21,20 @@ import ( "github.com/google/go-cmp/cmp" "github.com/influxdata/influxdb/v2/influxql/query" - "github.com/influxdata/influxdb/v2/logger" "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/pkg/deep" "github.com/influxdata/influxdb/v2/tsdb" "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1" "github.com/influxdata/influxdb/v2/tsdb/index/tsi1" "github.com/influxdata/influxql" + "go.uber.org/zap/zaptest" ) // Ensure that deletes only sent to the WAL will clear out the data from the cache on restart func TestEngine_DeleteWALLoadMetadata(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { t.Run(index, func(t *testing.T) { - e := MustOpenEngine(index) + e := MustOpenEngine(t, index) defer e.Close() if err := e.WritePointsString( @@ -70,7 +70,7 @@ func TestEngine_DeleteWALLoadMetadata(t *testing.T) { func TestEngine_DeleteSeriesAfterCacheSnapshot(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { t.Run(index, func(t *testing.T) { - e := MustOpenEngine(index) + e := MustOpenEngine(t, index) defer e.Close() if err := e.WritePointsString( @@ -174,7 +174,7 @@ func seriesExist(e *Engine, m string, dims []string) (int, error) { // Ensure that the engine can write & read shard digest files. func TestEngine_Digest(t *testing.T) { - e := MustOpenEngine(tsi1.IndexName) + e := MustOpenEngine(t, tsi1.IndexName) defer e.Close() if err := e.Open(); err != nil { @@ -322,7 +322,7 @@ type span struct { // Ensure engine handles concurrent calls to Digest(). func TestEngine_Digest_Concurrent(t *testing.T) { - e := MustOpenEngine(tsi1.IndexName) + e := MustOpenEngine(t, tsi1.IndexName) defer e.Close() if err := e.Open(); err != nil { @@ -748,7 +748,7 @@ func TestEngine_CreateIterator_Cache_Ascending(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { t.Run(index, func(t *testing.T) { - e := MustOpenEngine(index) + e := MustOpenEngine(t, index) defer e.Close() e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) @@ -805,7 +805,7 @@ func TestEngine_CreateIterator_Cache_Descending(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { t.Run(index, func(t *testing.T) { - e := MustOpenEngine(index) + e := MustOpenEngine(t, index) defer e.Close() e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) @@ -861,7 +861,7 @@ func TestEngine_CreateIterator_TSM_Ascending(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { t.Run(index, func(t *testing.T) { - e := MustOpenEngine(index) + e := MustOpenEngine(t, index) defer e.Close() e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) @@ -919,7 +919,7 @@ func TestEngine_CreateIterator_TSM_Descending(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { t.Run(index, func(t *testing.T) { - e := MustOpenEngine(index) + e := MustOpenEngine(t, index) defer e.Close() e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) @@ -977,7 +977,7 @@ func TestEngine_CreateIterator_Aux(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { t.Run(index, func(t *testing.T) { - e := MustOpenEngine(index) + e := MustOpenEngine(t, index) defer e.Close() e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) @@ -1037,7 +1037,7 @@ func TestEngine_CreateIterator_Condition(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { t.Run(index, func(t *testing.T) { - e := MustOpenEngine(index) + e := MustOpenEngine(t, index) defer e.Close() e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) @@ -1093,8 +1093,8 @@ func TestEngine_CreateIterator_Condition(t *testing.T) { // Test that series id set gets updated and returned appropriately. func TestIndex_SeriesIDSet(t *testing.T) { - test := func(index string) error { - engine := MustOpenEngine(index) + test := func(t *testing.T, index string) error { + engine := MustOpenEngine(t, index) defer engine.Close() // Add some series. @@ -1180,7 +1180,7 @@ func TestIndex_SeriesIDSet(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { t.Run(index, func(t *testing.T) { - if err := test(index); err != nil { + if err := test(t, index); err != nil { t.Error(err) } }) @@ -1197,7 +1197,7 @@ func TestEngine_DeleteSeries(t *testing.T) { p2 := MustParsePointString("cpu,host=B value=1.2 2000000000") p3 := MustParsePointString("cpu,host=A sum=1.3 3000000000") - e, err := NewEngine(index) + e, err := NewEngine(t, index) if err != nil { t.Fatal(err) } @@ -1252,7 +1252,7 @@ func TestEngine_DeleteSeriesRange(t *testing.T) { p7 := MustParsePointString("mem,host=C value=1.3 1000000000") // Should not be deleted p8 := MustParsePointString("disk,host=C value=1.3 1000000000") // Should not be deleted - e, err := NewEngine(index) + e, err := NewEngine(t, index) if err != nil { t.Fatal(err) } @@ -1362,7 +1362,7 @@ func TestEngine_DeleteSeriesRangeWithPredicate(t *testing.T) { p7 := MustParsePointString("mem,host=C value=1.3 1000000000") p8 := MustParsePointString("disk,host=C value=1.3 1000000000") // Should not be deleted - e, err := NewEngine(index) + e, err := NewEngine(t, index) if err != nil { t.Fatal(err) } @@ -1488,7 +1488,7 @@ func TestEngine_DeleteSeriesRangeWithPredicate_Nil(t *testing.T) { p7 := MustParsePointString("mem,host=C value=1.3 1000000000") p8 := MustParsePointString("disk,host=C value=1.3 1000000000") // Should not be deleted - e, err := NewEngine(index) + e, err := NewEngine(t, index) if err != nil { t.Fatal(err) } @@ -1574,7 +1574,7 @@ func TestEngine_DeleteSeriesRangeWithPredicate_FlushBatch(t *testing.T) { p7 := MustParsePointString("mem,host=C value=1.3 1000000000") p8 := MustParsePointString("disk,host=C value=1.3 1000000000") // Should not be deleted - e, err := NewEngine(index) + e, err := NewEngine(t, index) if err != nil { t.Fatal(err) } @@ -1693,7 +1693,7 @@ func TestEngine_DeleteSeriesRange_OutsideTime(t *testing.T) { // Create a few points. p1 := MustParsePointString("cpu,host=A value=1.1 1000000000") // Should not be deleted - e, err := NewEngine(index) + e, err := NewEngine(t, index) if err != nil { t.Fatal(err) } @@ -1774,7 +1774,7 @@ func TestEngine_LastModified(t *testing.T) { p2 := MustParsePointString("cpu,host=B value=1.2 2000000000") p3 := MustParsePointString("cpu,host=A sum=1.3 3000000000") - e, err := NewEngine(index) + e, err := NewEngine(t, index) if err != nil { t.Fatal(err) } @@ -1865,7 +1865,7 @@ func TestEngine_SnapshotsDisabled(t *testing.T) { func TestEngine_ShouldCompactCache(t *testing.T) { nowTime := time.Now() - e, err := NewEngine(tsi1.IndexName) + e, err := NewEngine(t, tsi1.IndexName) if err != nil { t.Fatal(err) } @@ -1910,7 +1910,7 @@ func TestEngine_CreateCursor_Ascending(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { t.Run(index, func(t *testing.T) { - e := MustOpenEngine(index) + e := MustOpenEngine(t, index) defer e.Close() e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) @@ -1970,7 +1970,7 @@ func TestEngine_CreateCursor_Descending(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { t.Run(index, func(t *testing.T) { - e := MustOpenEngine(index) + e := MustOpenEngine(t, index) defer e.Close() e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) @@ -2052,7 +2052,7 @@ func TestEngine_DisableEnableCompactions_Concurrent(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { t.Run(index, func(t *testing.T) { - e := MustOpenEngine(index) + e := MustOpenEngine(t, index) defer e.Close() var wg sync.WaitGroup @@ -2097,7 +2097,7 @@ func TestEngine_WritePoints_TypeConflict(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { t.Run(index, func(t *testing.T) { - e := MustOpenEngine(index) + e := MustOpenEngine(t, index) defer e.Close() if err := e.WritePointsString( @@ -2133,7 +2133,7 @@ func TestEngine_WritePoints_Reload(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { t.Run(index, func(t *testing.T) { - e := MustOpenEngine(index) + e := MustOpenEngine(t, index) defer e.Close() if err := e.WritePointsString( @@ -2176,7 +2176,7 @@ func TestEngine_Invalid_UTF8(t *testing.T) { field := []byte{255, 110, 101, 116} // A known invalid UTF-8 string p := MustParsePointString(fmt.Sprintf("%s,host=A %s=1.1 6000000000", name, field)) - e, err := NewEngine(index) + e, err := NewEngine(t, index) if err != nil { t.Fatal(err) } @@ -2207,7 +2207,7 @@ func BenchmarkEngine_WritePoints(b *testing.B) { batchSizes := []int{10, 100, 1000, 5000, 10000} for _, sz := range batchSizes { for _, index := range tsdb.RegisteredIndexes() { - e := MustOpenEngine(index) + e := MustOpenEngine(b, index) e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) pp := make([]models.Point, 0, sz) for i := 0; i < sz; i++ { @@ -2233,7 +2233,7 @@ func BenchmarkEngine_WritePoints_Parallel(b *testing.B) { batchSizes := []int{1000, 5000, 10000, 25000, 50000, 75000, 100000, 200000} for _, sz := range batchSizes { for _, index := range tsdb.RegisteredIndexes() { - e := MustOpenEngine(index) + e := MustOpenEngine(b, index) e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) cpus := runtime.GOMAXPROCS(0) @@ -2412,7 +2412,7 @@ var benchmarkVariants = []struct { func BenchmarkEngine_CreateIterator(b *testing.B) { engines := make([]*benchmarkEngine, len(sizes)) for i, size := range sizes { - engines[i] = MustInitDefaultBenchmarkEngine(size.name, size.sz) + engines[i] = MustInitDefaultBenchmarkEngine(b, size.name, size.sz) } for _, tt := range benchmarks { @@ -2458,13 +2458,13 @@ var ( // MustInitDefaultBenchmarkEngine creates a new engine using the default index // and fills it with points. Reuses previous engine if the same parameters // were used. -func MustInitDefaultBenchmarkEngine(name string, pointN int) *benchmarkEngine { +func MustInitDefaultBenchmarkEngine(tb testing.TB, name string, pointN int) *benchmarkEngine { const batchSize = 1000 if pointN%batchSize != 0 { panic(fmt.Sprintf("point count (%d) must be a multiple of batch size (%d)", pointN, batchSize)) } - e := MustOpenEngine(tsdb.DefaultIndex) + e := MustOpenEngine(tb, tsdb.DefaultIndex) // Initialize metadata. e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) @@ -2516,7 +2516,9 @@ type Engine struct { } // NewEngine returns a new instance of Engine at a temporary location. -func NewEngine(index string) (*Engine, error) { +func NewEngine(tb testing.TB, index string) (*Engine, error) { + tb.Helper() + root, err := ioutil.TempDir("", "tsm1-") if err != nil { panic(err) @@ -2531,7 +2533,7 @@ func NewEngine(index string) (*Engine, error) { // Setup series file. sfile := tsdb.NewSeriesFile(filepath.Join(dbPath, tsdb.SeriesFileDirectory)) - sfile.Logger = logger.New(os.Stdout) + sfile.Logger = zaptest.NewLogger(tb) if err = sfile.Open(); err != nil { return nil, err } @@ -2559,8 +2561,10 @@ func NewEngine(index string) (*Engine, error) { } // MustOpenEngine returns a new, open instance of Engine. -func MustOpenEngine(index string) *Engine { - e, err := NewEngine(index) +func MustOpenEngine(tb testing.TB, index string) *Engine { + tb.Helper() + + e, err := NewEngine(tb, index) if err != nil { panic(err) } diff --git a/tsdb/engine/tsm1/file_store_test.go b/tsdb/engine/tsm1/file_store_test.go index 8ecdf9e65e..c5965d6bb2 100644 --- a/tsdb/engine/tsm1/file_store_test.go +++ b/tsdb/engine/tsm1/file_store_test.go @@ -12,8 +12,8 @@ import ( "testing" "time" - "github.com/influxdata/influxdb/v2/logger" "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1" + "go.uber.org/zap/zaptest" ) func TestFileStore_Read(t *testing.T) { @@ -2964,9 +2964,7 @@ func BenchmarkFileStore_Stats(b *testing.B) { } fs := tsm1.NewFileStore(dir) - if testing.Verbose() { - fs.WithLogger(logger.New(os.Stderr)) - } + fs.WithLogger(zaptest.NewLogger(b)) if err := fs.Open(); err != nil { b.Fatalf("opening file store %v", err) diff --git a/tsdb/engine/tsm1/iterator_test.go b/tsdb/engine/tsm1/iterator_test.go index 1487163808..342e41a420 100644 --- a/tsdb/engine/tsm1/iterator_test.go +++ b/tsdb/engine/tsm1/iterator_test.go @@ -1,14 +1,13 @@ package tsm1 import ( - "os" "runtime" "testing" "time" "github.com/influxdata/influxdb/v2/influxql/query" - "github.com/influxdata/influxdb/v2/logger" "github.com/influxdata/influxql" + "go.uber.org/zap/zaptest" ) func BenchmarkIntegerIterator_Next(b *testing.B) { @@ -71,7 +70,7 @@ func TestFinalizerIterator(t *testing.T) { step3 = make(chan struct{}) ) - l := logger.New(os.Stderr) + l := zaptest.NewLogger(t) done := make(chan struct{}) func() { itr := &testFinalizerIterator{ diff --git a/tsdb/index/tsi1/sql_index_exporter_test.go b/tsdb/index/tsi1/sql_index_exporter_test.go index 8af3214dd9..1ab15d5a58 100644 --- a/tsdb/index/tsi1/sql_index_exporter_test.go +++ b/tsdb/index/tsi1/sql_index_exporter_test.go @@ -2,12 +2,11 @@ package tsi1_test import ( "bytes" - "os" "testing" - "github.com/influxdata/influxdb/v2/logger" "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/tsdb/index/tsi1" + "go.uber.org/zap/zaptest" ) func TestSQLIndexExporter_ExportIndex(t *testing.T) { @@ -40,7 +39,7 @@ COMMIT; var buf bytes.Buffer e := tsi1.NewSQLIndexExporter(&buf) e.ShowSchema = false - e.Logger = logger.New(os.Stderr) + e.Logger = zaptest.NewLogger(t) if err := e.ExportIndex(idx.Index); err != nil { t.Fatal(err) } else if err := e.Close(); err != nil { diff --git a/tsdb/index_test.go b/tsdb/index_test.go index 6f50e64bb3..833f95ddfc 100644 --- a/tsdb/index_test.go +++ b/tsdb/index_test.go @@ -12,12 +12,12 @@ import ( "github.com/influxdata/influxdb/v2/influxql/query" "github.com/influxdata/influxdb/v2/internal" - "github.com/influxdata/influxdb/v2/logger" "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/pkg/slices" "github.com/influxdata/influxdb/v2/tsdb" "github.com/influxdata/influxdb/v2/tsdb/index/tsi1" "github.com/influxdata/influxql" + "go.uber.org/zap/zaptest" ) // Ensure iterator can merge multiple iterators together. @@ -60,7 +60,7 @@ func TestIndexSet_MeasurementNamesByExpr(t *testing.T) { // Setup indexes indexes := map[string]*Index{} for _, name := range tsdb.RegisteredIndexes() { - idx := MustOpenNewIndex(name) + idx := MustOpenNewIndex(t, name) idx.AddSeries("cpu", map[string]string{"region": "east"}) idx.AddSeries("cpu", map[string]string{"region": "west", "secret": "foo"}) idx.AddSeries("disk", map[string]string{"secret": "foo"}) @@ -140,7 +140,7 @@ func TestIndexSet_MeasurementNamesByPredicate(t *testing.T) { // Setup indexes indexes := map[string]*Index{} for _, name := range tsdb.RegisteredIndexes() { - idx := MustOpenNewIndex(name) + idx := MustOpenNewIndex(t, name) idx.AddSeries("cpu", map[string]string{"region": "east"}) idx.AddSeries("cpu", map[string]string{"region": "west", "secret": "foo"}) idx.AddSeries("disk", map[string]string{"secret": "foo"}) @@ -271,7 +271,7 @@ func TestIndex_Sketches(t *testing.T) { } test := func(t *testing.T, index string) error { - idx := MustNewIndex(index) + idx := MustNewIndex(t, index) if index, ok := idx.Index.(*tsi1.Index); ok { // Override the log file max size to force a log file compaction sooner. // This way, we will test the sketches are correct when they have been @@ -359,7 +359,9 @@ var DisableTSICache = func() EngineOption { // everything under the same root directory so it can be cleanly removed on Close. // // The index will not be opened. -func MustNewIndex(index string, eopts ...EngineOption) *Index { +func MustNewIndex(tb testing.TB, index string, eopts ...EngineOption) *Index { + tb.Helper() + opts := tsdb.NewEngineOptions() opts.IndexVersion = index @@ -386,10 +388,7 @@ func MustNewIndex(index string, eopts ...EngineOption) *Index { if err != nil { panic(err) } - - if testing.Verbose() { - i.WithLogger(logger.New(os.Stderr)) - } + i.WithLogger(zaptest.NewLogger(tb)) idx := &Index{ Index: i, @@ -402,8 +401,10 @@ func MustNewIndex(index string, eopts ...EngineOption) *Index { // MustOpenNewIndex will initialize a new index using the provide type and opens // it. -func MustOpenNewIndex(index string, opts ...EngineOption) *Index { - idx := MustNewIndex(index, opts...) +func MustOpenNewIndex(tb testing.TB, index string, opts ...EngineOption) *Index { + tb.Helper() + + idx := MustNewIndex(tb, index, opts...) idx.MustOpen() return idx } @@ -532,7 +533,7 @@ func BenchmarkIndexSet_TagSets(b *testing.B) { b.Run("1M series", func(b *testing.B) { b.ReportAllocs() for _, indexType := range tsdb.RegisteredIndexes() { - idx := MustOpenNewIndex(indexType) + idx := MustOpenNewIndex(b, indexType) setup(idx) name := []byte("m4") @@ -612,9 +613,9 @@ func BenchmarkIndex_ConcurrentWriteQuery(b *testing.B) { runBenchmark := func(b *testing.B, index string, queryN int, useTSICache bool) { var idx *Index if !useTSICache { - idx = MustOpenNewIndex(index, DisableTSICache()) + idx = MustOpenNewIndex(b, index, DisableTSICache()) } else { - idx = MustOpenNewIndex(index) + idx = MustOpenNewIndex(b, index) } var wg sync.WaitGroup @@ -674,7 +675,7 @@ func BenchmarkIndex_ConcurrentWriteQuery(b *testing.B) { } // Re-open everything - idx = MustOpenNewIndex(index) + idx = MustOpenNewIndex(b, index) wg.Add(1) begin = make(chan struct{}) once = sync.Once{} diff --git a/tsdb/series_file_test.go b/tsdb/series_file_test.go index 9f0864ee4a..032e1eb782 100644 --- a/tsdb/series_file_test.go +++ b/tsdb/series_file_test.go @@ -8,9 +8,9 @@ import ( "path" "testing" - "github.com/influxdata/influxdb/v2/logger" "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/tsdb" + "go.uber.org/zap/zaptest" "golang.org/x/sync/errgroup" ) @@ -55,7 +55,7 @@ func TestParseSeriesKeyInto(t *testing.T) { func TestSeriesFile_Open_WhenFileCorrupt_ShouldReturnErr(t *testing.T) { f := NewBrokenSeriesFile([]byte{0, 0, 0, 0, 0}) defer f.Close() - f.Logger = logger.New(os.Stdout) + f.Logger = zaptest.NewLogger(t) err := f.Open() @@ -66,7 +66,7 @@ func TestSeriesFile_Open_WhenFileCorrupt_ShouldReturnErr(t *testing.T) { // Ensure series file contains the correct set of series. func TestSeriesFile_Series(t *testing.T) { - sfile := MustOpenSeriesFile() + sfile := MustOpenSeriesFile(t) defer sfile.Close() series := []Series{ @@ -100,7 +100,7 @@ func TestSeriesFile_Series(t *testing.T) { // Ensure series file can be compacted. func TestSeriesFileCompactor(t *testing.T) { - sfile := MustOpenSeriesFile() + sfile := MustOpenSeriesFile(t) defer sfile.Close() // Disable automatic compactions. @@ -141,7 +141,7 @@ func TestSeriesFileCompactor(t *testing.T) { // Ensure series file deletions persist across compactions. func TestSeriesFile_DeleteSeriesID(t *testing.T) { - sfile := MustOpenSeriesFile() + sfile := MustOpenSeriesFile(t) defer sfile.Close() ids0, err := sfile.CreateSeriesListIfNotExists([][]byte{[]byte("m1")}, []models.Tags{nil}) @@ -176,7 +176,7 @@ func TestSeriesFile_DeleteSeriesID(t *testing.T) { } func TestSeriesFile_Compaction(t *testing.T) { - sfile := MustOpenSeriesFile() + sfile := MustOpenSeriesFile(t) defer sfile.Close() // Generate a bunch of keys. @@ -261,7 +261,7 @@ func BenchmarkSeriesFile_Compaction(b *testing.B) { const n = 1000000 if cachedCompactionSeriesFile == nil { - sfile := MustOpenSeriesFile() + sfile := MustOpenSeriesFile(b) // Generate a bunch of keys. var ids []uint64 @@ -342,9 +342,11 @@ func NewBrokenSeriesFile(content []byte) *SeriesFile { } // MustOpenSeriesFile returns a new, open instance of SeriesFile. Panic on error. -func MustOpenSeriesFile() *SeriesFile { +func MustOpenSeriesFile(tb testing.TB) *SeriesFile { + tb.Helper() + f := NewSeriesFile() - f.Logger = logger.New(os.Stdout) + f.Logger = zaptest.NewLogger(tb) if err := f.Open(); err != nil { panic(err) } diff --git a/tsdb/shard_internal_test.go b/tsdb/shard_internal_test.go index a1dac59fcd..8858371d79 100644 --- a/tsdb/shard_internal_test.go +++ b/tsdb/shard_internal_test.go @@ -13,16 +13,16 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" - "github.com/influxdata/influxdb/v2/logger" "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxql" + "go.uber.org/zap/zaptest" ) func TestShard_MapType(t *testing.T) { var sh *TempShard setup := func(index string) { - sh = NewTempShard(index) + sh = NewTempShard(t, index) if err := sh.Open(); err != nil { t.Fatal(err) @@ -155,7 +155,7 @@ _reserved,region=uswest value="foo" 0 func TestShard_MeasurementsByRegex(t *testing.T) { var sh *TempShard setup := func(index string) { - sh = NewTempShard(index) + sh = NewTempShard(t, index) if err := sh.Open(); err != nil { t.Fatal(err) } @@ -212,7 +212,9 @@ type TempShard struct { } // NewTempShard returns a new instance of TempShard with temp paths. -func NewTempShard(index string) *TempShard { +func NewTempShard(tb testing.TB, index string) *TempShard { + tb.Helper() + // Create temporary path for data and WAL. dir, err := ioutil.TempDir("", "influxdb-tsdb-") if err != nil { @@ -221,7 +223,7 @@ func NewTempShard(index string) *TempShard { // Create series file. sfile := NewSeriesFile(filepath.Join(dir, "db0", SeriesFileDirectory)) - sfile.Logger = logger.New(os.Stdout) + sfile.Logger = zaptest.NewLogger(tb) if err := sfile.Open(); err != nil { panic(err) } diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index baee72eb35..9d704aadda 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -37,7 +37,7 @@ func TestShardWriteAndIndex(t *testing.T) { tmpShard := filepath.Join(tmpDir, "shard") tmpWal := filepath.Join(tmpDir, "wal") - sfile := MustOpenSeriesFile() + sfile := MustOpenSeriesFile(t) defer sfile.Close() opts := tsdb.NewEngineOptions() @@ -106,7 +106,7 @@ func TestShardRebuildIndex(t *testing.T) { tmpShard := filepath.Join(tmpDir, "shard") tmpWal := filepath.Join(tmpDir, "wal") - sfile := MustOpenSeriesFile() + sfile := MustOpenSeriesFile(t) defer sfile.Close() opts := tsdb.NewEngineOptions() @@ -184,7 +184,7 @@ func TestShard_Open_CorruptFieldsIndex(t *testing.T) { tmpShard := filepath.Join(tmpDir, "shard") tmpWal := filepath.Join(tmpDir, "wal") - sfile := MustOpenSeriesFile() + sfile := MustOpenSeriesFile(t) defer sfile.Close() opts := tsdb.NewEngineOptions() @@ -234,7 +234,7 @@ func TestWriteTimeTag(t *testing.T) { tmpShard := filepath.Join(tmpDir, "shard") tmpWal := filepath.Join(tmpDir, "wal") - sfile := MustOpenSeriesFile() + sfile := MustOpenSeriesFile(t) defer sfile.Close() opts := tsdb.NewEngineOptions() @@ -284,7 +284,7 @@ func TestWriteTimeField(t *testing.T) { tmpShard := filepath.Join(tmpDir, "shard") tmpWal := filepath.Join(tmpDir, "wal") - sfile := MustOpenSeriesFile() + sfile := MustOpenSeriesFile(t) defer sfile.Close() opts := tsdb.NewEngineOptions() @@ -319,7 +319,7 @@ func TestShardWriteAddNewField(t *testing.T) { tmpShard := filepath.Join(tmpDir, "shard") tmpWal := filepath.Join(tmpDir, "wal") - sfile := MustOpenSeriesFile() + sfile := MustOpenSeriesFile(t) defer sfile.Close() opts := tsdb.NewEngineOptions() @@ -371,7 +371,7 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) { tmpShard := filepath.Join(tmpDir, "shard") tmpWal := filepath.Join(tmpDir, "wal") - sfile := MustOpenSeriesFile() + sfile := MustOpenSeriesFile(t) defer sfile.Close() opts := tsdb.NewEngineOptions() @@ -459,7 +459,7 @@ func TestShard_WritePoints_FieldConflictConcurrentQuery(t *testing.T) { tmpShard := filepath.Join(tmpDir, "shard") tmpWal := filepath.Join(tmpDir, "wal") - sfile := MustOpenSeriesFile() + sfile := MustOpenSeriesFile(t) defer sfile.Close() opts := tsdb.NewEngineOptions() @@ -609,7 +609,7 @@ func TestShard_Close_RemoveIndex(t *testing.T) { tmpShard := filepath.Join(tmpDir, "shard") tmpWal := filepath.Join(tmpDir, "wal") - sfile := MustOpenSeriesFile() + sfile := MustOpenSeriesFile(t) defer sfile.Close() opts := tsdb.NewEngineOptions() @@ -649,7 +649,7 @@ func TestShard_Close_RemoveIndex(t *testing.T) { func TestShard_CreateIterator_Ascending(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { t.Run(index, func(t *testing.T) { - sh := NewShard(index) + sh := NewShard(t, index) defer sh.Close() // Calling CreateIterator when the engine is not open will return @@ -732,8 +732,8 @@ func TestShard_CreateIterator_Descending(t *testing.T) { var sh *Shard var itr query.Iterator - test := func(index string) { - sh = NewShard(index) + test := func(t *testing.T, index string) { + sh = NewShard(t, index) // Calling CreateIterator when the engine is not open will return // ErrEngineClosed. @@ -808,7 +808,7 @@ cpu,host=serverB,region=uswest value=25 0 } for _, index := range tsdb.RegisteredIndexes() { - t.Run(index, func(t *testing.T) { test(index) }) + t.Run(index, func(t *testing.T) { test(t, index) }) sh.Close() itr.Close() } @@ -834,8 +834,8 @@ func TestShard_CreateIterator_Series_Auth(t *testing.T) { }, } - test := func(index string, v variant) error { - sh := MustNewOpenShard(index) + test := func(t *testing.T, index string, v variant) error { + sh := MustNewOpenShard(t, index) defer sh.Close() sh.MustWritePointsString(` cpu,host=serverA,region=uswest value=100 0 @@ -952,7 +952,7 @@ cpu,secret=foo value=100 0 for _, index := range tsdb.RegisteredIndexes() { for _, example := range examples { t.Run(index+"_"+example.name, func(t *testing.T) { - if err := test(index, example); err != nil { + if err := test(t, index, example); err != nil { t.Fatal(err) } }) @@ -963,8 +963,8 @@ cpu,secret=foo value=100 0 func TestShard_Disabled_WriteQuery(t *testing.T) { var sh *Shard - test := func(index string) { - sh = NewShard(index) + test := func(t *testing.T, index string) { + sh = NewShard(t, index) if err := sh.Open(); err != nil { t.Fatal(err) } @@ -1007,15 +1007,15 @@ func TestShard_Disabled_WriteQuery(t *testing.T) { } for _, index := range tsdb.RegisteredIndexes() { - t.Run(index, func(t *testing.T) { test(index) }) + t.Run(index, func(t *testing.T) { test(t, index) }) sh.Close() } } func TestShard_Closed_Functions(t *testing.T) { var sh *Shard - test := func(index string) { - sh = NewShard(index) + test := func(t *testing.T, index string) { + sh = NewShard(t, index) if err := sh.Open(); err != nil { t.Fatal(err) } @@ -1040,18 +1040,18 @@ func TestShard_Closed_Functions(t *testing.T) { } for _, index := range tsdb.RegisteredIndexes() { - t.Run(index, func(t *testing.T) { test(index) }) + t.Run(index, func(t *testing.T) { test(t, index) }) } } func TestShard_FieldDimensions(t *testing.T) { var sh *Shard - sfile := MustOpenSeriesFile() + sfile := MustOpenSeriesFile(t) defer sfile.Close() setup := func(index string) { - sh = NewShard(index) + sh = NewShard(t, index) if err := sh.Open(); err != nil { t.Fatal(err) @@ -1167,7 +1167,7 @@ func TestShards_FieldKeysByMeasurement(t *testing.T) { var shards Shards setup := func(index string) { - shards = NewShards(index, 2) + shards = NewShards(t, index, 2) shards.MustOpen() shards[0].MustWritePointsString(`cpu,host=serverA,region=uswest a=2.2,b=33.3,value=100 0`) @@ -1203,7 +1203,7 @@ func TestShards_FieldDimensions(t *testing.T) { var shard1, shard2 *Shard setup := func(index string) { - shard1 = NewShard(index) + shard1 = NewShard(t, index) if err := shard1.Open(); err != nil { t.Fatal(err) } @@ -1214,7 +1214,7 @@ cpu,host=serverA,region=uswest value=50,val2=5 10 cpu,host=serverB,region=uswest value=25 0 `) - shard2 = NewShard(index) + shard2 = NewShard(t, index) if err := shard2.Open(); err != nil { t.Fatal(err) } @@ -1328,7 +1328,7 @@ func TestShards_MapType(t *testing.T) { var shard1, shard2 *Shard setup := func(index string) { - shard1 = NewShard(index) + shard1 = NewShard(t, index) if err := shard1.Open(); err != nil { t.Fatal(err) } @@ -1339,7 +1339,7 @@ cpu,host=serverA,region=uswest value=50,val2=5 10 cpu,host=serverB,region=uswest value=25 0 `) - shard2 = NewShard(index) + shard2 = NewShard(t, index) if err := shard2.Open(); err != nil { t.Fatal(err) } @@ -1467,7 +1467,7 @@ func TestShards_MeasurementsByRegex(t *testing.T) { var shard1, shard2 *Shard setup := func(index string) { - shard1 = NewShard(index) + shard1 = NewShard(t, index) if err := shard1.Open(); err != nil { t.Fatal(err) } @@ -1478,7 +1478,7 @@ cpu,host=serverA,region=uswest value=50,val2=5 10 cpu,host=serverB,region=uswest value=25 0 `) - shard2 = NewShard(index) + shard2 = NewShard(t, index) if err := shard2.Open(); err != nil { t.Fatal(err) } @@ -1828,7 +1828,7 @@ func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) { b.StopTimer() b.ResetTimer() - sfile := MustOpenSeriesFile() + sfile := MustOpenSeriesFile(b) defer sfile.Close() // Run the benchmark loop. @@ -1866,7 +1866,7 @@ func benchmarkWritePointsExistingSeries(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt } } - sfile := MustOpenSeriesFile() + sfile := MustOpenSeriesFile(b) defer sfile.Close() shard, tmpDir, err := openShard(sfile) @@ -1912,7 +1912,7 @@ func benchmarkWritePointsExistingSeriesFields(b *testing.B, mCnt, tkCnt, tvCnt, } } - sfile := MustOpenSeriesFile() + sfile := MustOpenSeriesFile(b) defer func() { _ = sfile.Close() }() @@ -1957,7 +1957,7 @@ func benchmarkWritePointsExistingSeriesEqualBatches(b *testing.B, mCnt, tkCnt, t } } - sfile := MustOpenSeriesFile() + sfile := MustOpenSeriesFile(b) defer sfile.Close() shard, tmpDir, err := openShard(sfile) @@ -2037,7 +2037,7 @@ func BenchmarkCreateIterator(b *testing.B) { var shards Shards for i := 1; i <= 5; i++ { name := fmt.Sprintf("%s_shards_%d", index, i) - shards = NewShards(index, i) + shards = NewShards(b, index, i) shards.MustOpen() setup(index, shards) @@ -2111,13 +2111,15 @@ type Shard struct { type Shards []*Shard // NewShard returns a new instance of Shard with temp paths. -func NewShard(index string) *Shard { - return NewShards(index, 1)[0] +func NewShard(tb testing.TB, index string) *Shard { + tb.Helper() + return NewShards(tb, index, 1)[0] } // MustNewOpenShard creates and opens a shard with the provided index. -func MustNewOpenShard(index string) *Shard { - sh := NewShard(index) +func MustNewOpenShard(tb testing.TB, index string) *Shard { + tb.Helper() + sh := NewShard(tb, index) if err := sh.Open(); err != nil { panic(err) } @@ -2136,14 +2138,16 @@ func (sh *Shard) Close() error { } // NewShards create several shards all sharing the same -func NewShards(index string, n int) Shards { +func NewShards(tb testing.TB, index string, n int) Shards { + tb.Helper() + // Create temporary path for data and WAL. dir, err := ioutil.TempDir("", "influxdb-tsdb-") if err != nil { panic(err) } - sfile := MustOpenSeriesFile() + sfile := MustOpenSeriesFile(tb) var shards []*Shard var idSets []*tsdb.SeriesIDSet diff --git a/tsdb/store_test.go b/tsdb/store_test.go index 2ad9c5934d..afd5b5b7d2 100644 --- a/tsdb/store_test.go +++ b/tsdb/store_test.go @@ -22,20 +22,20 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/influxdata/influxdb/v2/influxql/query" "github.com/influxdata/influxdb/v2/internal" - "github.com/influxdata/influxdb/v2/logger" "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/pkg/deep" "github.com/influxdata/influxdb/v2/pkg/slices" "github.com/influxdata/influxdb/v2/tsdb" "github.com/influxdata/influxql" + "go.uber.org/zap/zaptest" ) // Ensure the store can delete a retention policy and all shards under // it. func TestStore_DeleteRetentionPolicy(t *testing.T) { - test := func(index string) { - s := MustOpenStore(index) + test := func(t *testing.T, index string) { + s := MustOpenStore(t, index) defer s.Close() // Create a new shard and verify that it exists. @@ -86,7 +86,7 @@ func TestStore_DeleteRetentionPolicy(t *testing.T) { } // Reopen other shard and check it still exists. - if err := s.Reopen(); err != nil { + if err := s.Reopen(t); err != nil { t.Error(err) } else if sh := s.Shard(3); sh == nil { t.Errorf("shard 3 does not exist") @@ -102,15 +102,15 @@ func TestStore_DeleteRetentionPolicy(t *testing.T) { } for _, index := range tsdb.RegisteredIndexes() { - t.Run(index, func(t *testing.T) { test(index) }) + t.Run(index, func(t *testing.T) { test(t, index) }) } } // Ensure the store can create a new shard. func TestStore_CreateShard(t *testing.T) { - test := func(index string) { - s := MustOpenStore(index) + test := func(t *testing.T, index string) { + s := MustOpenStore(t, index) defer s.Close() // Create a new shard and verify that it exists. @@ -128,7 +128,7 @@ func TestStore_CreateShard(t *testing.T) { } // Reopen shard and recheck. - if err := s.Reopen(); err != nil { + if err := s.Reopen(t); err != nil { t.Fatal(err) } else if sh := s.Shard(1); sh == nil { t.Fatalf("expected shard(1)") @@ -138,14 +138,14 @@ func TestStore_CreateShard(t *testing.T) { } for _, index := range tsdb.RegisteredIndexes() { - t.Run(index, func(t *testing.T) { test(index) }) + t.Run(index, func(t *testing.T) { test(t, index) }) } } func TestStore_CreateMixedShards(t *testing.T) { - test := func(index1 string, index2 string) { - s := MustOpenStore(index1) + test := func(t *testing.T, index1 string, index2 string) { + s := MustOpenStore(t, index1) defer s.Close() // Create a new shard and verify that it exists. @@ -157,7 +157,7 @@ func TestStore_CreateMixedShards(t *testing.T) { s.EngineOptions.IndexVersion = index2 s.index = index2 - if err := s.Reopen(); err != nil { + if err := s.Reopen(t); err != nil { t.Fatal(err) } @@ -169,7 +169,7 @@ func TestStore_CreateMixedShards(t *testing.T) { } // Reopen shard and recheck. - if err := s.Reopen(); err != nil { + if err := s.Reopen(t); err != nil { t.Fatal(err) } else if sh := s.Shard(1); sh == nil { t.Fatalf("expected shard(1)") @@ -193,14 +193,14 @@ func TestStore_CreateMixedShards(t *testing.T) { j := (i + 1) % len(indexes) index1 := indexes[i] index2 := indexes[j] - t.Run(fmt.Sprintf("%s-%s", index1, index2), func(t *testing.T) { test(index1, index2) }) + t.Run(fmt.Sprintf("%s-%s", index1, index2), func(t *testing.T) { test(t, index1, index2) }) } } func TestStore_DropConcurrentWriteMultipleShards(t *testing.T) { - test := func(index string) { - s := MustOpenStore(index) + test := func(t *testing.T, index string) { + s := MustOpenStore(t, index) defer s.Close() if err := s.CreateShard("db0", "rp0", 1, true); err != nil { @@ -255,14 +255,14 @@ func TestStore_DropConcurrentWriteMultipleShards(t *testing.T) { } for _, index := range tsdb.RegisteredIndexes() { - t.Run(index, func(t *testing.T) { test(index) }) + t.Run(index, func(t *testing.T) { test(t, index) }) } } func TestStore_WriteMixedShards(t *testing.T) { - test := func(index1 string, index2 string) { - s := MustOpenStore(index1) + test := func(t *testing.T, index1 string, index2 string) { + s := MustOpenStore(t, index1) defer s.Close() if err := s.CreateShard("db0", "rp0", 1, true); err != nil { @@ -273,7 +273,7 @@ func TestStore_WriteMixedShards(t *testing.T) { s.EngineOptions.IndexVersion = index2 s.index = index2 - if err := s.Reopen(); err != nil { + if err := s.Reopen(t); err != nil { t.Fatal(err) } @@ -326,15 +326,15 @@ func TestStore_WriteMixedShards(t *testing.T) { j := (i + 1) % len(indexes) index1 := indexes[i] index2 := indexes[j] - t.Run(fmt.Sprintf("%s-%s", index1, index2), func(t *testing.T) { test(index1, index2) }) + t.Run(fmt.Sprintf("%s-%s", index1, index2), func(t *testing.T) { test(t, index1, index2) }) } } // Ensure the store does not return an error when delete from a non-existent db. func TestStore_DeleteSeries_NonExistentDB(t *testing.T) { - test := func(index string) { - s := MustOpenStore(index) + test := func(t *testing.T, index string) { + s := MustOpenStore(t, index) defer s.Close() if err := s.DeleteSeries("db0", nil, nil); err != nil { @@ -343,15 +343,15 @@ func TestStore_DeleteSeries_NonExistentDB(t *testing.T) { } for _, index := range tsdb.RegisteredIndexes() { - t.Run(index, func(t *testing.T) { test(index) }) + t.Run(index, func(t *testing.T) { test(t, index) }) } } // Ensure the store can delete an existing shard. func TestStore_DeleteShard(t *testing.T) { - test := func(index string) error { - s := MustOpenStore(index) + test := func(t *testing.T, index string) error { + s := MustOpenStore(t, index) defer s.Close() // Create a new shard and verify that it exists. @@ -383,7 +383,7 @@ func TestStore_DeleteShard(t *testing.T) { s.MustWriteToShardString(3, "cpu,serverb=b v=1") // Reopen the store and check all shards still exist - if err := s.Reopen(); err != nil { + if err := s.Reopen(t); err != nil { return err } for i := uint64(1); i <= 3; i++ { @@ -428,7 +428,7 @@ func TestStore_DeleteShard(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { t.Run(index, func(t *testing.T) { - if err := test(index); err != nil { + if err := test(t, index); err != nil { t.Error(err) } }) @@ -438,8 +438,8 @@ func TestStore_DeleteShard(t *testing.T) { // Ensure the store can create a snapshot to a shard. func TestStore_CreateShardSnapShot(t *testing.T) { - test := func(index string) { - s := MustOpenStore(index) + test := func(t *testing.T, index string) { + s := MustOpenStore(t, index) defer s.Close() // Create a new shard and verify that it exists. @@ -459,14 +459,14 @@ func TestStore_CreateShardSnapShot(t *testing.T) { } for _, index := range tsdb.RegisteredIndexes() { - t.Run(index, func(t *testing.T) { test(index) }) + t.Run(index, func(t *testing.T) { test(t, index) }) } } func TestStore_Open(t *testing.T) { - test := func(index string) { - s := NewStore(index) + test := func(t *testing.T, index string) { + s := NewStore(t, index) defer s.Close() if err := os.MkdirAll(filepath.Join(s.Path(), "db0", "rp0", "2"), 0777); err != nil { @@ -500,15 +500,15 @@ func TestStore_Open(t *testing.T) { } for _, index := range tsdb.RegisteredIndexes() { - t.Run(index, func(t *testing.T) { test(index) }) + t.Run(index, func(t *testing.T) { test(t, index) }) } } // Ensure the store reports an error when it can't open a database directory. func TestStore_Open_InvalidDatabaseFile(t *testing.T) { - test := func(index string) { - s := NewStore(index) + test := func(t *testing.T, index string) { + s := NewStore(t, index) defer s.Close() // Create a file instead of a directory for a database. @@ -525,15 +525,15 @@ func TestStore_Open_InvalidDatabaseFile(t *testing.T) { } for _, index := range tsdb.RegisteredIndexes() { - t.Run(index, func(t *testing.T) { test(index) }) + t.Run(index, func(t *testing.T) { test(t, index) }) } } // Ensure the store reports an error when it can't open a retention policy. func TestStore_Open_InvalidRetentionPolicy(t *testing.T) { - test := func(index string) { - s := NewStore(index) + test := func(t *testing.T, index string) { + s := NewStore(t, index) defer s.Close() // Create an RP file instead of a directory. @@ -554,15 +554,15 @@ func TestStore_Open_InvalidRetentionPolicy(t *testing.T) { } for _, index := range tsdb.RegisteredIndexes() { - t.Run(index, func(t *testing.T) { test(index) }) + t.Run(index, func(t *testing.T) { test(t, index) }) } } // Ensure the store reports an error when it can't open a retention policy. func TestStore_Open_InvalidShard(t *testing.T) { - test := func(index string) { - s := NewStore(index) + test := func(t *testing.T, index string) { + s := NewStore(t, index) defer s.Close() // Create a non-numeric shard file. @@ -583,15 +583,15 @@ func TestStore_Open_InvalidShard(t *testing.T) { } for _, index := range tsdb.RegisteredIndexes() { - t.Run(index, func(t *testing.T) { test(index) }) + t.Run(index, func(t *testing.T) { test(t, index) }) } } // Ensure shards can create iterators. func TestShards_CreateIterator(t *testing.T) { - test := func(index string) { - s := MustOpenStore(index) + test := func(t *testing.T, index string) { + s := MustOpenStore(t, index) defer s.Close() // Create shard #0 with data. @@ -666,14 +666,14 @@ func TestShards_CreateIterator(t *testing.T) { } for _, index := range tsdb.RegisteredIndexes() { - t.Run(index, func(t *testing.T) { test(index) }) + t.Run(index, func(t *testing.T) { test(t, index) }) } } // Ensure the store can backup a shard and another store can restore it. func TestStore_BackupRestoreShard(t *testing.T) { - test := func(index string) { - s0, s1 := MustOpenStore(index), MustOpenStore(index) + test := func(t *testing.T, index string) { + s0, s1 := MustOpenStore(t, index), MustOpenStore(t, index) defer s0.Close() defer s1.Close() @@ -684,7 +684,7 @@ func TestStore_BackupRestoreShard(t *testing.T) { `cpu value=3 20`, ) - if err := s0.Reopen(); err != nil { + if err := s0.Reopen(t); err != nil { t.Fatal(err) } @@ -742,14 +742,14 @@ func TestStore_BackupRestoreShard(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { t.Run(index, func(t *testing.T) { - test(index) + test(t, index) }) } } func TestStore_Shard_SeriesN(t *testing.T) { - test := func(index string) error { - s := MustOpenStore(index) + test := func(t *testing.T, index string) error { + s := MustOpenStore(t, index) defer s.Close() // Create shard with data. @@ -774,7 +774,7 @@ func TestStore_Shard_SeriesN(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { t.Run(index, func(t *testing.T) { - if err := test(index); err != nil { + if err := test(t, index); err != nil { t.Error(err) } }) @@ -783,8 +783,8 @@ func TestStore_Shard_SeriesN(t *testing.T) { func TestStore_MeasurementNames_Deduplicate(t *testing.T) { - test := func(index string) { - s := MustOpenStore(index) + test := func(t *testing.T, index string) { + s := MustOpenStore(t, index) defer s.Close() // Create shard with data. @@ -816,7 +816,7 @@ func TestStore_MeasurementNames_Deduplicate(t *testing.T) { } for _, index := range tsdb.RegisteredIndexes() { - t.Run(index, func(t *testing.T) { test(index) }) + t.Run(index, func(t *testing.T) { test(t, index) }) } } @@ -883,8 +883,8 @@ func TestStore_Cardinality_Tombstoning(t *testing.T) { t.Skip("Skipping test in short, race, circleci and appveyor mode.") } - test := func(index string) { - store := NewStore(index) + test := func(t *testing.T, index string) { + store := NewStore(t, index) if err := store.Open(); err != nil { panic(err) } @@ -893,7 +893,7 @@ func TestStore_Cardinality_Tombstoning(t *testing.T) { } for _, index := range tsdb.RegisteredIndexes() { - t.Run(index, func(t *testing.T) { test(index) }) + t.Run(index, func(t *testing.T) { test(t, index) }) } } @@ -947,8 +947,8 @@ func TestStore_Cardinality_Unique(t *testing.T) { t.Skip("Skipping test in short, race, circleci and appveyor mode.") } - test := func(index string) { - store := NewStore(index) + test := func(t *testing.T, index string) { + store := NewStore(t, index) if err := store.Open(); err != nil { panic(err) } @@ -957,7 +957,7 @@ func TestStore_Cardinality_Unique(t *testing.T) { } for _, index := range tsdb.RegisteredIndexes() { - t.Run(index, func(t *testing.T) { test(index) }) + t.Run(index, func(t *testing.T) { test(t, index) }) } } @@ -1027,8 +1027,8 @@ func TestStore_Cardinality_Duplicates(t *testing.T) { t.Skip("Skipping test in short, race, circleci and appveyor mode.") } - test := func(index string) { - store := NewStore(index) + test := func(t *testing.T, index string) { + store := NewStore(t, index) if err := store.Open(); err != nil { panic(err) } @@ -1037,7 +1037,7 @@ func TestStore_Cardinality_Duplicates(t *testing.T) { } for _, index := range tsdb.RegisteredIndexes() { - t.Run(index, func(t *testing.T) { test(index) }) + t.Run(index, func(t *testing.T) { test(t, index) }) } } @@ -1094,8 +1094,8 @@ func TestStore_Cardinality_Compactions(t *testing.T) { t.Skip("Skipping test in short, race, circleci and appveyor mode.") } - test := func(index string) error { - store := NewStore(index) + test := func(t *testing.T, index string) error { + store := NewStore(t, index) if err := store.Open(); err != nil { panic(err) } @@ -1105,7 +1105,7 @@ func TestStore_Cardinality_Compactions(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { t.Run(index, func(t *testing.T) { - if err := test(index); err != nil { + if err := test(t, index); err != nil { t.Fatal(err) } }) @@ -1156,8 +1156,8 @@ func TestStore_Sketches(t *testing.T) { return nil } - test := func(index string) error { - store := MustOpenStore(index) + test := func(t *testing.T, index string) error { + store := MustOpenStore(t, index) defer store.Close() // Generate point data to write to the shards. @@ -1186,7 +1186,7 @@ func TestStore_Sketches(t *testing.T) { } // Reopen the store. - if err := store.Reopen(); err != nil { + if err := store.Reopen(t); err != nil { return err } @@ -1216,7 +1216,7 @@ func TestStore_Sketches(t *testing.T) { } // Reopen the store. - if err := store.Reopen(); err != nil { + if err := store.Reopen(t); err != nil { return err } @@ -1231,7 +1231,7 @@ func TestStore_Sketches(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { t.Run(index, func(t *testing.T) { - if err := test(index); err != nil { + if err := test(t, index); err != nil { t.Fatal(err) } }) @@ -1323,8 +1323,8 @@ func TestStore_TagValues(t *testing.T) { } var s *Store - setup := func(index string) []uint64 { // returns shard ids - s = MustOpenStore(index) + setup := func(t *testing.T, index string) []uint64 { // returns shard ids + s = MustOpenStore(t, index) fmtStr := `cpu1%[1]d,foo=a,ignoreme=nope,host=tv%[2]d,shard=s%[3]d value=1 %[4]d cpu1%[1]d,host=nofoo value=1 %[4]d @@ -1354,8 +1354,8 @@ func TestStore_TagValues(t *testing.T) { for _, example := range examples { for _, index := range tsdb.RegisteredIndexes() { - shardIDs := setup(index) t.Run(example.Name+"_"+index, func(t *testing.T) { + shardIDs := setup(t, index) got, err := s.TagValues(nil, shardIDs, example.Expr) if err != nil { t.Fatal(err) @@ -1373,8 +1373,8 @@ func TestStore_TagValues(t *testing.T) { func TestStore_Measurements_Auth(t *testing.T) { - test := func(index string) error { - s := MustOpenStore(index) + test := func(t *testing.T, index string) error { + s := MustOpenStore(t, index) defer s.Close() // Create shard #0 with data. @@ -1451,7 +1451,7 @@ func TestStore_Measurements_Auth(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { t.Run(index, func(t *testing.T) { - if err := test(index); err != nil { + if err := test(t, index); err != nil { t.Fatal(err) } }) @@ -1461,8 +1461,8 @@ func TestStore_Measurements_Auth(t *testing.T) { func TestStore_TagKeys_Auth(t *testing.T) { - test := func(index string) error { - s := MustOpenStore(index) + test := func(t *testing.T, index string) error { + s := MustOpenStore(t, index) defer s.Close() // Create shard #0 with data. @@ -1548,7 +1548,7 @@ func TestStore_TagKeys_Auth(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { t.Run(index, func(t *testing.T) { - if err := test(index); err != nil { + if err := test(t, index); err != nil { t.Fatal(err) } }) @@ -1558,8 +1558,8 @@ func TestStore_TagKeys_Auth(t *testing.T) { func TestStore_TagValues_Auth(t *testing.T) { - test := func(index string) error { - s := MustOpenStore(index) + test := func(t *testing.T, index string) error { + s := MustOpenStore(t, index) defer s.Close() // Create shard #0 with data. @@ -1657,7 +1657,7 @@ func TestStore_TagValues_Auth(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { t.Run(index, func(t *testing.T) { - if err := test(index); err != nil { + if err := test(t, index); err != nil { t.Fatal(err) } }) @@ -1690,7 +1690,7 @@ func createTagValues(mname string, kvs map[string][]string) tsdb.TagValues { func TestStore_MeasurementNames_ConcurrentDropShard(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { - s := MustOpenStore(index) + s := MustOpenStore(t, index) defer s.Close() shardN := 10 @@ -1775,7 +1775,7 @@ func TestStore_MeasurementNames_ConcurrentDropShard(t *testing.T) { func TestStore_TagKeys_ConcurrentDropShard(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { - s := MustOpenStore(index) + s := MustOpenStore(t, index) defer s.Close() shardN := 10 @@ -1866,7 +1866,7 @@ func TestStore_TagKeys_ConcurrentDropShard(t *testing.T) { func TestStore_TagValues_ConcurrentDropShard(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { - s := MustOpenStore(index) + s := MustOpenStore(t, index) defer s.Close() shardN := 10 @@ -1970,7 +1970,7 @@ func TestStore_TagValues_ConcurrentDropShard(t *testing.T) { func BenchmarkStore_SeriesCardinality_100_Shards(b *testing.B) { for _, index := range tsdb.RegisteredIndexes() { - store := NewStore(index) + store := NewStore(b, index) if err := store.Open(); err != nil { panic(err) } @@ -2001,7 +2001,7 @@ func BenchmarkStoreOpen_200KSeries_100Shards(b *testing.B) { benchmarkStoreOpen( func benchmarkStoreOpen(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt, shardCnt int) { var store *Store setup := func(index string) error { - store := MustOpenStore(index) + store := MustOpenStore(b, index) // Generate test series (measurements + unique tag sets). series := genTestSeries(mCnt, tkCnt, tvCnt) @@ -2073,7 +2073,7 @@ func BenchmarkStore_TagValues(b *testing.B) { var s *Store setup := func(shards, measurements, tagValues int, index string, useRandom bool) []uint64 { // returns shard ids - s := NewStore(index) + s := NewStore(b, index) if err := s.Open(); err != nil { panic(err) } @@ -2179,7 +2179,9 @@ type Store struct { } // NewStore returns a new instance of Store with a temporary path. -func NewStore(index string) *Store { +func NewStore(tb testing.TB, index string) *Store { + tb.Helper() + path, err := ioutil.TempDir("", "influxdb-tsdb-") if err != nil { panic(err) @@ -2189,18 +2191,17 @@ func NewStore(index string) *Store { s.EngineOptions.IndexVersion = index s.EngineOptions.Config.WALDir = filepath.Join(path, "wal") s.EngineOptions.Config.TraceLoggingEnabled = true - - if testing.Verbose() { - s.WithLogger(logger.New(os.Stdout)) - } + s.WithLogger(zaptest.NewLogger(tb)) return s } // MustOpenStore returns a new, open Store using the specified index, // at a temporary path. -func MustOpenStore(index string) *Store { - s := NewStore(index) +func MustOpenStore(tb testing.TB, index string) *Store { + tb.Helper() + + s := NewStore(tb, index) if err := s.Open(); err != nil { panic(err) @@ -2209,7 +2210,9 @@ func MustOpenStore(index string) *Store { } // Reopen closes and reopens the store as a new store. -func (s *Store) Reopen() error { +func (s *Store) Reopen(tb testing.TB) error { + tb.Helper() + if err := s.Store.Close(); err != nil { return err } @@ -2218,10 +2221,8 @@ func (s *Store) Reopen() error { s.EngineOptions.IndexVersion = s.index s.EngineOptions.Config.WALDir = filepath.Join(s.Path(), "wal") s.EngineOptions.Config.TraceLoggingEnabled = true + s.WithLogger(zaptest.NewLogger(tb)) - if testing.Verbose() { - s.WithLogger(logger.New(os.Stdout)) - } return s.Store.Open() } diff --git a/v1/services/precreator/service_test.go b/v1/services/precreator/service_test.go index 412f55d7b3..071d01b573 100644 --- a/v1/services/precreator/service_test.go +++ b/v1/services/precreator/service_test.go @@ -2,14 +2,13 @@ package precreator_test import ( "context" - "os" "testing" "time" "github.com/influxdata/influxdb/v2/internal" - "github.com/influxdata/influxdb/v2/logger" "github.com/influxdata/influxdb/v2/toml" "github.com/influxdata/influxdb/v2/v1/services/precreator" + "go.uber.org/zap/zaptest" ) func TestShardPrecreation(t *testing.T) { @@ -25,7 +24,7 @@ func TestShardPrecreation(t *testing.T) { return nil } - s := NewTestService() + s := NewTestService(t) s.MetaClient = &mc if err := s.Open(context.Background()); err != nil { @@ -46,11 +45,13 @@ func TestShardPrecreation(t *testing.T) { } } -func NewTestService() *precreator.Service { +func NewTestService(tb testing.TB) *precreator.Service { + tb.Helper() + config := precreator.NewConfig() config.CheckInterval = toml.Duration(10 * time.Millisecond) s := precreator.NewService(config) - s.WithLogger(logger.New(os.Stderr)) + s.WithLogger(zaptest.NewLogger(tb)) return s } diff --git a/v1/services/retention/service_test.go b/v1/services/retention/service_test.go index 4f940b8b44..3cbb7c6103 100644 --- a/v1/services/retention/service_test.go +++ b/v1/services/retention/service_test.go @@ -1,7 +1,6 @@ package retention_test import ( - "bytes" "context" "fmt" "reflect" @@ -10,37 +9,39 @@ import ( "time" "github.com/influxdata/influxdb/v2/internal" - "github.com/influxdata/influxdb/v2/logger" "github.com/influxdata/influxdb/v2/toml" "github.com/influxdata/influxdb/v2/v1/services/meta" "github.com/influxdata/influxdb/v2/v1/services/retention" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" ) func TestService_OpenDisabled(t *testing.T) { // Opening a disabled service should be a no-op. c := retention.NewConfig() c.Enabled = false - s := NewService(c) + s := NewService(t, c) if err := s.Open(context.Background()); err != nil { t.Fatal(err) } - if s.LogBuf.String() != "" { - t.Fatalf("service logged %q, didn't expect any logging", s.LogBuf.String()) + if s.LogBuf.Len() > 0 { + t.Fatalf("service logged %q, didn't expect any logging", s.LogBuf.All()) } } func TestService_OpenClose(t *testing.T) { // Opening a disabled service should be a no-op. - s := NewService(retention.NewConfig()) + s := NewService(t, retention.NewConfig()) ctx := context.Background() if err := s.Open(ctx); err != nil { t.Fatal(err) } - if s.LogBuf.String() == "" { + if s.LogBuf.Len() == 0 { t.Fatal("service didn't log anything on open") } @@ -117,7 +118,7 @@ func TestService_CheckShards(t *testing.T) { config := retention.NewConfig() config.CheckInterval = toml.Duration(10 * time.Millisecond) - s := NewService(config) + s := NewService(t, config) s.MetaClient.DatabasesFn = func() []meta.DatabaseInfo { return data } @@ -233,7 +234,7 @@ func TestService_8819_repro(t *testing.T) { func testService_8819_repro(t *testing.T) (*Service, chan error, chan struct{}) { c := retention.NewConfig() c.CheckInterval = toml.Duration(time.Millisecond) - s := NewService(c) + s := NewService(t, c) errC := make(chan error, 1) // Buffer Important to prevent deadlock. done := make(chan struct{}) @@ -379,19 +380,24 @@ type Service struct { MetaClient *internal.MetaClientMock TSDBStore *internal.TSDBStoreMock - LogBuf bytes.Buffer + LogBuf *observer.ObservedLogs *retention.Service } -func NewService(c retention.Config) *Service { +func NewService(tb testing.TB, c retention.Config) *Service { + tb.Helper() + s := &Service{ MetaClient: &internal.MetaClientMock{}, TSDBStore: &internal.TSDBStoreMock{}, Service: retention.NewService(c), } - l := logger.New(&s.LogBuf) - s.WithLogger(l) + logcore, logbuf := observer.New(zapcore.InfoLevel) + log := zap.New(logcore) + + s.LogBuf = logbuf + s.WithLogger(log) s.Service.MetaClient = s.MetaClient s.Service.TSDBStore = s.TSDBStore