From 8e24128c5d40c32116e618ff41969a08bee88957 Mon Sep 17 00:00:00 2001 From: Daniel Moran Date: Fri, 18 Dec 2020 10:55:29 -0800 Subject: [PATCH 1/4] build: save cache and artifacts after gotest in CI (#20370) --- .circleci/config.yml | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/.circleci/config.yml b/.circleci/config.yml index ed907243fc..f704f7c6b4 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -263,6 +263,23 @@ jobs: TESTFILES=$(go list ./... | circleci tests split --split-by=timings) echo $TESTFILES GOTRACEBACK=all GO111MODULE=on ./env gotestsum --format standard-quiet --junitfile /tmp/test-results/gotestsum.xml -- -race -count=1 $TESTFILES + - save_cache: + name: Saving GOCACHE + key: influxdb-gocache-{{ .Branch }}-{{ .Revision }} + paths: + - /tmp/go-cache + when: always + - save_cache: + name: Saving GOPATH/pkg/mod + key: influxdb-gomod-{{ checksum "go.sum" }} + paths: + - /go/pkg/mod + when: always + - store_artifacts: # Upload test summary for display in Artifacts: https://circleci.com/docs/2.0/artifacts/ + path: /tmp/test-results + destination: raw-test-output + - store_test_results: # Upload test results for display in Test Summary: https://circleci.com/docs/2.0/collect-test-data/ + path: /tmp/test-results influxql_validation: docker: From 68f4df204f9ffda0fc8d977ca0a4b721dac55926 Mon Sep 17 00:00:00 2001 From: Daniel Moran Date: Mon, 21 Dec 2020 08:44:50 -0800 Subject: [PATCH 2/4] fix(cmd/influx): improve CLI error returned on org-not-found (#20387) --- CHANGELOG.md | 1 + cmd/influx/main.go | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c049d019f..1a4dd1a797 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ 1. [20317](https://github.com/influxdata/influxdb/pull/20317): Don't ignore failures to set password during initial user onboarding. 1. [20362](https://github.com/influxdata/influxdb/pull/20362): Don't overwrite stack name/description on `influx stack update`. 1. [20355](https://github.com/influxdata/influxdb/pull/20355): Fix timeout setup for `influxd` graceful shutdown. +1. [20387](https://github.com/influxdata/influxdb/pull/20387): Improve error message shown when `influx` CLI can't find an org by name. ## v2.0.3 [2020-12-14] diff --git a/cmd/influx/main.go b/cmd/influx/main.go index f5729a9847..9ada530afe 100644 --- a/cmd/influx/main.go +++ b/cmd/influx/main.go @@ -515,7 +515,7 @@ func (o *organization) getID(orgSVC influxdb.OrganizationService) (influxdb.ID, if o.id != "" { influxOrgID, err := influxdb.IDFromString(o.id) if err != nil { - return 0, fmt.Errorf("invalid org ID '%s' provided: %w (did you pass an org name instead of an ID?)", o.id, err) + return 0, fmt.Errorf("invalid org ID '%s' provided (did you pass an org name instead of an ID?): %w", o.id, err) } return *influxOrgID, nil } @@ -525,7 +525,7 @@ func (o *organization) getID(orgSVC influxdb.OrganizationService) (influxdb.ID, Name: &name, }) if err != nil { - return 0, err + return 0, fmt.Errorf("failed to get ID for org '%s' (do you have org-level read permission?): %w", name, err) } return org.ID, nil } From 1b48c32b4349ed0f86e6c9c2f3e4d64fa491805d Mon Sep 17 00:00:00 2001 From: Daniel Moran Date: Mon, 21 Dec 2020 10:21:26 -0800 Subject: [PATCH 3/4] fix: don't duplicate text in task errors (#20380) --- CHANGELOG.md | 1 + task_errors.go | 25 ++++++++----------------- 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a4dd1a797..50392cf19c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ 1. [20362](https://github.com/influxdata/influxdb/pull/20362): Don't overwrite stack name/description on `influx stack update`. 1. [20355](https://github.com/influxdata/influxdb/pull/20355): Fix timeout setup for `influxd` graceful shutdown. 1. [20387](https://github.com/influxdata/influxdb/pull/20387): Improve error message shown when `influx` CLI can't find an org by name. +1. [20380](https://github.com/influxdata/influxdb/pull/20380): Remove duplication from task error messages ## v2.0.3 [2020-12-14] diff --git a/task_errors.go b/task_errors.go index c02998bfb0..8c498d906c 100644 --- a/task_errors.go +++ b/task_errors.go @@ -94,7 +94,7 @@ var ( func ErrFluxParseError(err error) *Error { return &Error{ Code: EInvalid, - Msg: fmt.Sprintf("could not parse Flux script; Err: %v", err), + Msg: "could not parse Flux script", Op: "taskExecutor", Err: err, } @@ -104,7 +104,7 @@ func ErrFluxParseError(err error) *Error { func ErrQueryError(err error) *Error { return &Error{ Code: EInternal, - Msg: fmt.Sprintf("unexpected error from queryd; Err: %v", err), + Msg: "unexpected error from queryd", Op: "taskExecutor", Err: err, } @@ -114,7 +114,7 @@ func ErrQueryError(err error) *Error { func ErrResultIteratorError(err error) *Error { return &Error{ Code: EInvalid, - Msg: fmt.Sprintf("Error exhausting result iterator; Err: %v", err), + Msg: "error exhausting result iterator", Op: "taskExecutor", Err: err, } @@ -123,7 +123,7 @@ func ErrResultIteratorError(err error) *Error { func ErrInternalTaskServiceError(err error) *Error { return &Error{ Code: EInternal, - Msg: fmt.Sprintf("unexpected error in tasks; Err: %v", err), + Msg: "unexpected error in tasks", Op: "task", Err: err, } @@ -133,7 +133,7 @@ func ErrInternalTaskServiceError(err error) *Error { func ErrUnexpectedTaskBucketErr(err error) *Error { return &Error{ Code: EInternal, - Msg: fmt.Sprintf("unexpected error retrieving task bucket; Err: %v", err), + Msg: "unexpected error retrieving task bucket", Op: "taskBucket", Err: err, } @@ -143,7 +143,7 @@ func ErrUnexpectedTaskBucketErr(err error) *Error { func ErrTaskTimeParse(err error) *Error { return &Error{ Code: EInternal, - Msg: fmt.Sprintf("unexpected error parsing time; Err: %v", err), + Msg: "unexpected error parsing time", Op: "taskCron", Err: err, } @@ -152,25 +152,16 @@ func ErrTaskTimeParse(err error) *Error { func ErrTaskOptionParse(err error) *Error { return &Error{ Code: EInvalid, - Msg: fmt.Sprintf("invalid options; Err: %v", err), + Msg: "invalid options", Op: "taskOptions", Err: err, } } -func ErrJsonMarshalError(err error) *Error { - return &Error{ - Code: EInvalid, - Msg: fmt.Sprintf("unable to marshal JSON; Err: %v", err), - Op: "taskScheduler", - Err: err, - } -} - func ErrRunExecutionError(err error) *Error { return &Error{ Code: EInternal, - Msg: fmt.Sprintf("could not execute task run; Err: %v", err), + Msg: "could not execute task run", Op: "taskExecutor", Err: err, } From afdcb186552839dc52f696955036c6ecfc68e02a Mon Sep 17 00:00:00 2001 From: Daniel Moran Date: Mon, 21 Dec 2020 11:15:08 -0800 Subject: [PATCH 4/4] refactor: simplify how we set the top-level influxd logger (#20374) --- cmd/influxd/launcher/cmd.go | 17 +++++- cmd/influxd/launcher/launcher.go | 51 ++++++++--------- cmd/influxd/launcher/launcher_helpers.go | 67 ++++++++++++----------- cmd/influxd/launcher/launcher_test.go | 11 ++-- cmd/influxd/launcher/pkger_test.go | 4 +- cmd/influxd/launcher/query_test.go | 42 +++++--------- cmd/influxd/launcher/storage_test.go | 18 +++--- cmd/influxd/upgrade/database_test.go | 4 +- influxql/_v1tests/server_helpers.go | 10 ++-- influxql/_v1tests/server_test.go | 10 +++- influxql/_v1validation/validation_test.go | 8 +-- query/stdlib/testing/end_to_end_test.go | 9 ++- tests/pipeline_helpers.go | 53 ++++++------------ tests/pipeline_option.go | 63 --------------------- 14 files changed, 139 insertions(+), 228 deletions(-) delete mode 100644 tests/pipeline_option.go diff --git a/cmd/influxd/launcher/cmd.go b/cmd/influxd/launcher/cmd.go index 3681d41e48..fbcb0800c9 100644 --- a/cmd/influxd/launcher/cmd.go +++ b/cmd/influxd/launcher/cmd.go @@ -3,6 +3,7 @@ package launcher import ( "context" "fmt" + "os" "path/filepath" "strconv" "time" @@ -12,6 +13,7 @@ import ( "github.com/influxdata/influxdb/v2/internal/fs" "github.com/influxdata/influxdb/v2/kit/cli" "github.com/influxdata/influxdb/v2/kit/signals" + influxlogger "github.com/influxdata/influxdb/v2/logger" "github.com/influxdata/influxdb/v2/storage" "github.com/influxdata/influxdb/v2/v1/coordinator" "github.com/influxdata/influxdb/v2/vault" @@ -72,6 +74,17 @@ func cmdRunE(ctx context.Context, o *InfluxdOpts) func() error { l := NewLauncher() + // Create top level logger + logconf := &influxlogger.Config{ + Format: "auto", + Level: o.LogLevel, + } + logger, err := logconf.New(os.Stdout) + if err != nil { + return err + } + l.log = logger + // Start the launcher and wait for it to exit on SIGINT or SIGTERM. runCtx := signals.WithStandardSignals(ctx) if err := l.run(runCtx, o); err != nil { @@ -83,9 +96,7 @@ func cmdRunE(ctx context.Context, o *InfluxdOpts) func() error { // in-progress requests. shutdownCtx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() - l.Shutdown(shutdownCtx) - - return nil + return l.Shutdown(shutdownCtx) } } diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index d6394a7f19..0373f3c2e4 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -11,6 +11,7 @@ import ( _ "net/http/pprof" // needed to add pprof to our binary. "os" "path/filepath" + "strings" "sync" "time" @@ -40,7 +41,6 @@ import ( "github.com/influxdata/influxdb/v2/kv/migration" "github.com/influxdata/influxdb/v2/kv/migration/all" "github.com/influxdata/influxdb/v2/label" - influxlogger "github.com/influxdata/influxdb/v2/logger" "github.com/influxdata/influxdb/v2/nats" endpointservice "github.com/influxdata/influxdb/v2/notification/endpoint/service" ruleservice "github.com/influxdata/influxdb/v2/notification/rule/service" @@ -122,9 +122,6 @@ type Launcher struct { log *zap.Logger reg *prom.Registry - Stdin io.Reader - Stdout io.Writer - Stderr io.Writer apibackend *http.APIBackend } @@ -133,15 +130,11 @@ type stoppingScheduler interface { Stop() } -// NewLauncher returns a new instance of Launcher connected to standard in/out/err. +// NewLauncher returns a new instance of Launcher with a no-op logger. func NewLauncher() *Launcher { - l := &Launcher{ - Stdin: os.Stdin, - Stdout: os.Stdout, - Stderr: os.Stderr, + return &Launcher{ + log: zap.NewNop(), } - - return l } // Registry returns the prometheus metrics registry. @@ -166,9 +159,12 @@ func (m *Launcher) Engine() Engine { } // Shutdown shuts down the HTTP server and waits for all services to clean up. -func (m *Launcher) Shutdown(ctx context.Context) { +func (m *Launcher) Shutdown(ctx context.Context) error { + var errs []string + if err := m.httpServer.Shutdown(ctx); err != nil { m.log.Error("Failed to close HTTP server", zap.Error(err)) + errs = append(errs, err.Error()) } m.log.Info("Stopping", zap.String("service", "task")) @@ -180,28 +176,39 @@ func (m *Launcher) Shutdown(ctx context.Context) { m.log.Info("Stopping", zap.String("service", "bolt")) if err := m.boltClient.Close(); err != nil { - m.log.Info("Failed closing bolt", zap.Error(err)) + m.log.Error("Failed closing bolt", zap.Error(err)) + errs = append(errs, err.Error()) } m.log.Info("Stopping", zap.String("service", "query")) if err := m.queryController.Shutdown(ctx); err != nil && err != context.Canceled { - m.log.Info("Failed closing query service", zap.Error(err)) + m.log.Error("Failed closing query service", zap.Error(err)) + errs = append(errs, err.Error()) } m.log.Info("Stopping", zap.String("service", "storage-engine")) if err := m.engine.Close(); err != nil { m.log.Error("Failed to close engine", zap.Error(err)) + errs = append(errs, err.Error()) } m.wg.Wait() if m.jaegerTracerCloser != nil { if err := m.jaegerTracerCloser.Close(); err != nil { - m.log.Warn("Failed to closer Jaeger tracer", zap.Error(err)) + m.log.Error("Failed to closer Jaeger tracer", zap.Error(err)) + errs = append(errs, err.Error()) } } - m.log.Sync() + if err := m.log.Sync(); err != nil { + errs = append(errs, err.Error()) + } + + if len(errs) > 0 { + return fmt.Errorf("failed to shut down server: [%s]", strings.Join(errs, ",")) + } + return nil } // Cancel executes the context cancel on the program. Used for testing. @@ -213,18 +220,6 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { ctx, m.cancel = context.WithCancel(ctx) - if m.log == nil { - // Create top level logger - logconf := &influxlogger.Config{ - Format: "auto", - Level: opts.LogLevel, - } - m.log, err = logconf.New(m.Stdout) - if err != nil { - return err - } - } - info := platform.GetBuildInfo() m.log.Info("Welcome to InfluxDB", zap.String("version", info.Version), diff --git a/cmd/influxd/launcher/launcher_helpers.go b/cmd/influxd/launcher/launcher_helpers.go index dc8ae1cc84..593ff36845 100644 --- a/cmd/influxd/launcher/launcher_helpers.go +++ b/cmd/influxd/launcher/launcher_helpers.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "fmt" - "io" "io/ioutil" nethttp "net/http" "os" @@ -32,6 +31,7 @@ import ( "github.com/prometheus/common/expfmt" "github.com/spf13/viper" "go.uber.org/zap" + "go.uber.org/zap/zaptest" ) // TestLauncher is a test wrapper for launcher.Launcher. @@ -49,26 +49,30 @@ type TestLauncher struct { httpClient *httpc.Client - // Standard in/out/err buffers. - Stdin bytes.Buffer - Stdout bytes.Buffer - Stderr bytes.Buffer - // Flag to act as standard server: disk store, no-e2e testing flag realServer bool } +// RunAndSetupNewLauncherOrFail shorcuts the most common pattern used in testing, +// building a new TestLauncher, running it, and setting it up with an initial user. +func RunAndSetupNewLauncherOrFail(ctx context.Context, tb testing.TB, setters ...OptSetter) *TestLauncher { + tb.Helper() + + l := NewTestLauncher() + l.RunOrFail(tb, ctx, setters...) + defer func() { + // If setup fails, shut down the launcher. + if tb.Failed() { + l.Shutdown(ctx) + } + }() + l.SetupOrFail(tb) + return l +} + // NewTestLauncher returns a new instance of TestLauncher. -func NewTestLauncher(flagger feature.Flagger) *TestLauncher { +func NewTestLauncher() *TestLauncher { l := &TestLauncher{Launcher: NewLauncher()} - l.Launcher.Stdin = &l.Stdin - l.Launcher.Stdout = &l.Stdout - l.Launcher.Stderr = &l.Stderr - l.Launcher.flagger = flagger - if testing.Verbose() { - l.Launcher.Stdout = io.MultiWriter(l.Launcher.Stdout, os.Stdout) - l.Launcher.Stderr = io.MultiWriter(l.Launcher.Stderr, os.Stderr) - } path, err := ioutil.TempDir("", "") if err != nil { @@ -79,33 +83,28 @@ func NewTestLauncher(flagger feature.Flagger) *TestLauncher { } // NewTestLauncherServer returns a new instance of TestLauncher configured as real server (disk store, no e2e flag). -func NewTestLauncherServer(flagger feature.Flagger) *TestLauncher { - l := NewTestLauncher(flagger) +func NewTestLauncherServer() *TestLauncher { + l := NewTestLauncher() l.realServer = true return l } -// RunTestLauncherOrFail initializes and starts the server. -func RunTestLauncherOrFail(tb testing.TB, ctx context.Context, flagger feature.Flagger, setters ...OptSetter) *TestLauncher { - tb.Helper() - l := NewTestLauncher(flagger) +type OptSetter = func(o *InfluxdOpts) - if err := l.Run(ctx, setters...); err != nil { +func (tl *TestLauncher) SetFlagger(flagger feature.Flagger) { + tl.Launcher.flagger = flagger +} + +// Run executes the program, failing the test if the launcher fails to start. +func (tl *TestLauncher) RunOrFail(tb testing.TB, ctx context.Context, setters ...OptSetter) { + if err := tl.Run(tb, ctx, setters...); err != nil { tb.Fatal(err) } - return l } -// SetLogger sets the logger for the underlying program. -func (tl *TestLauncher) SetLogger(logger *zap.Logger) { - tl.Launcher.log = logger -} - -type OptSetter = func(o *InfluxdOpts) - // Run executes the program with additional arguments to set paths and ports. // Passed arguments will overwrite/add to the default ones. -func (tl *TestLauncher) Run(ctx context.Context, setters ...OptSetter) error { +func (tl *TestLauncher) Run(tb testing.TB, ctx context.Context, setters ...OptSetter) error { opts := newOpts(viper.New()) if !tl.realServer { opts.StoreType = "memory" @@ -122,14 +121,16 @@ func (tl *TestLauncher) Run(ctx context.Context, setters ...OptSetter) error { setter(opts) } + // Set up top-level logger to write into the test-case. + tl.Launcher.log = zaptest.NewLogger(tb, zaptest.Level(opts.LogLevel)).With(zap.String("test_name", tb.Name())) return tl.Launcher.run(ctx, opts) } // Shutdown stops the program and cleans up temporary paths. func (tl *TestLauncher) Shutdown(ctx context.Context) error { + defer os.RemoveAll(tl.Path) tl.Cancel() - tl.Launcher.Shutdown(ctx) - return os.RemoveAll(tl.Path) + return tl.Launcher.Shutdown(ctx) } // ShutdownOrFail stops the program and cleans up temporary paths. Fail on error. diff --git a/cmd/influxd/launcher/launcher_test.go b/cmd/influxd/launcher/launcher_test.go index 9032c98037..e3bb964abe 100644 --- a/cmd/influxd/launcher/launcher_test.go +++ b/cmd/influxd/launcher/launcher_test.go @@ -18,11 +18,9 @@ import ( var ctx = context.Background() func TestLauncher_Setup(t *testing.T) { - l := launcher.NewTestLauncher(nil) - if err := l.Run(ctx); err != nil { - t.Fatal(err) - } - defer l.Shutdown(ctx) + l := launcher.NewTestLauncher() + l.RunOrFail(t, ctx) + defer l.ShutdownOrFail(t, ctx) client, err := http.NewHTTPClient(l.URL(), "", false) if err != nil { @@ -51,8 +49,7 @@ func TestLauncher_Setup(t *testing.T) { // This is to mimic chronograf using cookies as sessions // rather than authorizations func TestLauncher_SetupWithUsers(t *testing.T) { - l := launcher.RunTestLauncherOrFail(t, ctx, nil) - l.SetupOrFail(t) + l := launcher.RunAndSetupNewLauncherOrFail(ctx, t) defer l.ShutdownOrFail(t, ctx) r, err := nethttp.NewRequest("POST", l.URL()+"/api/v2/signin", nil) diff --git a/cmd/influxd/launcher/pkger_test.go b/cmd/influxd/launcher/pkger_test.go index 4881017f92..5e73a32b66 100644 --- a/cmd/influxd/launcher/pkger_test.go +++ b/cmd/influxd/launcher/pkger_test.go @@ -27,11 +27,11 @@ import ( var ctx = context.Background() func TestLauncher_Pkger(t *testing.T) { - l := RunTestLauncherOrFail(t, ctx, nil, func(o *InfluxdOpts) { + l := RunAndSetupNewLauncherOrFail(ctx, t, func(o *InfluxdOpts) { o.LogLevel = zap.ErrorLevel }) - l.SetupOrFail(t) defer l.ShutdownOrFail(t, ctx) + require.NoError(t, l.BucketService(t).DeleteBucket(ctx, l.Bucket.ID)) svc := l.PkgerService(t) diff --git a/cmd/influxd/launcher/query_test.go b/cmd/influxd/launcher/query_test.go index d24ea75657..251b31ad50 100644 --- a/cmd/influxd/launcher/query_test.go +++ b/cmd/influxd/launcher/query_test.go @@ -27,16 +27,13 @@ import ( "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/cmd/influxd/launcher" phttp "github.com/influxdata/influxdb/v2/http" - "github.com/influxdata/influxdb/v2/kit/feature" "github.com/influxdata/influxdb/v2/kit/prom" - "github.com/influxdata/influxdb/v2/mock" "github.com/influxdata/influxdb/v2/query" "go.uber.org/zap" ) func TestLauncher_Write_Query_FieldKey(t *testing.T) { - be := launcher.RunTestLauncherOrFail(t, ctx, nil) - be.SetupOrFail(t) + be := launcher.RunAndSetupNewLauncherOrFail(ctx, t) defer be.ShutdownOrFail(t, ctx) resp, err := nethttp.DefaultClient.Do( @@ -81,8 +78,7 @@ mem,server=b value=45.2`)) // and checks that the queried results contain the expected number of tables // and expected number of columns. func TestLauncher_WriteV2_Query(t *testing.T) { - be := launcher.RunTestLauncherOrFail(t, ctx, nil) - be.SetupOrFail(t) + be := launcher.RunAndSetupNewLauncherOrFail(ctx, t) defer be.ShutdownOrFail(t, ctx) // The default gateway instance inserts some values directly such that ID lookups seem to break, @@ -301,8 +297,7 @@ func TestLauncher_QueryMemoryLimits(t *testing.T) { for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { - l := launcher.RunTestLauncherOrFail(t, ctx, nil, tc.setOpts) - l.SetupOrFail(t) + l := launcher.RunAndSetupNewLauncherOrFail(ctx, t, tc.setOpts) defer l.ShutdownOrFail(t, ctx) const tagValue = "t0" @@ -339,14 +334,13 @@ func TestLauncher_QueryMemoryLimits(t *testing.T) { func TestLauncher_QueryMemoryManager_ExceedMemory(t *testing.T) { t.Skip("this test is flaky, occasionally get error: \"memory allocation limit reached\" on OK query") - l := launcher.RunTestLauncherOrFail(t, ctx, nil, func(o *launcher.InfluxdOpts) { + l := launcher.RunAndSetupNewLauncherOrFail(ctx, t, func(o *launcher.InfluxdOpts) { o.LogLevel = zap.ErrorLevel o.ConcurrencyQuota = 1 o.InitialMemoryBytesQuotaPerQuery = 100 o.MemoryBytesQuotaPerQuery = 50000 o.MaxMemoryBytes = 200000 }) - l.SetupOrFail(t) defer l.ShutdownOrFail(t, ctx) // One tag does not exceed memory. @@ -384,14 +378,13 @@ func TestLauncher_QueryMemoryManager_ExceedMemory(t *testing.T) { func TestLauncher_QueryMemoryManager_ContextCanceled(t *testing.T) { t.Skip("this test is flaky, occasionally get error: \"memory allocation limit reached\"") - l := launcher.RunTestLauncherOrFail(t, ctx, nil, func(o *launcher.InfluxdOpts) { + l := launcher.RunAndSetupNewLauncherOrFail(ctx, t, func(o *launcher.InfluxdOpts) { o.LogLevel = zap.ErrorLevel o.ConcurrencyQuota = 1 o.InitialMemoryBytesQuotaPerQuery = 100 o.MemoryBytesQuotaPerQuery = 50000 o.MaxMemoryBytes = 200000 }) - l.SetupOrFail(t) defer l.ShutdownOrFail(t, ctx) const tag = "t0" @@ -428,7 +421,7 @@ func TestLauncher_QueryMemoryManager_ContextCanceled(t *testing.T) { func TestLauncher_QueryMemoryManager_ConcurrentQueries(t *testing.T) { t.Skip("this test is flaky, occasionally get error: \"dial tcp 127.0.0.1:59654: connect: connection reset by peer\"") - l := launcher.RunTestLauncherOrFail(t, ctx, nil, func(o *launcher.InfluxdOpts) { + l := launcher.RunAndSetupNewLauncherOrFail(ctx, t, func(o *launcher.InfluxdOpts) { o.LogLevel = zap.ErrorLevel o.QueueSize = 1024 o.ConcurrencyQuota = 1 @@ -436,7 +429,6 @@ func TestLauncher_QueryMemoryManager_ConcurrentQueries(t *testing.T) { o.MemoryBytesQuotaPerQuery = 50000 o.MaxMemoryBytes = 200000 }) - l.SetupOrFail(t) defer l.ShutdownOrFail(t, ctx) // One tag does not exceed memory. @@ -502,8 +494,7 @@ func TestLauncher_QueryMemoryManager_ConcurrentQueries(t *testing.T) { } func TestLauncher_Query_LoadSecret_Success(t *testing.T) { - l := launcher.RunTestLauncherOrFail(t, ctx, nil) - l.SetupOrFail(t) + l := launcher.RunAndSetupNewLauncherOrFail(ctx, t) defer l.ShutdownOrFail(t, ctx) const key, value = "mytoken", "secrettoken" @@ -552,8 +543,7 @@ from(bucket: "%s") } func TestLauncher_Query_LoadSecret_Forbidden(t *testing.T) { - l := launcher.RunTestLauncherOrFail(t, ctx, nil) - l.SetupOrFail(t) + l := launcher.RunAndSetupNewLauncherOrFail(ctx, t) defer l.ShutdownOrFail(t, ctx) const key, value = "mytoken", "secrettoken" @@ -611,8 +601,7 @@ from(bucket: "%s") // This will change once we make side effects drive execution and remove from/to concurrency in our e2e tests. // See https://github.com/influxdata/flux/issues/1799. func TestLauncher_DynamicQuery(t *testing.T) { - l := launcher.RunTestLauncherOrFail(t, ctx, nil) - l.SetupOrFail(t) + l := launcher.RunAndSetupNewLauncherOrFail(ctx, t) defer l.ShutdownOrFail(t, ctx) l.WritePointsOrFail(t, ` @@ -686,8 +675,7 @@ stream2 |> filter(fn: (r) => contains(value: r._value, set: col)) |> group() |> } func TestLauncher_Query_ExperimentalTo(t *testing.T) { - l := launcher.RunTestLauncherOrFail(t, ctx, nil) - l.SetupOrFail(t) + l := launcher.RunAndSetupNewLauncherOrFail(ctx, t) defer l.ShutdownOrFail(t, ctx) // Last row of data tests nil field value @@ -920,9 +908,7 @@ error2","query plan",109,110 for _, tc := range testcases { tc := tc t.Run(tc.name, func(t *testing.T) { - l := launcher.RunTestLauncherOrFail(t, ctx, nil) - - l.SetupOrFail(t) + l := launcher.RunAndSetupNewLauncherOrFail(ctx, t) defer l.ShutdownOrFail(t, ctx) l.WritePointsOrFail(t, strings.Join(tc.data, "\n")) @@ -2530,9 +2516,8 @@ from(bucket: v.bucket) if tc.skip != "" { t.Skip(tc.skip) } - l := launcher.RunTestLauncherOrFail(t, ctx, mock.NewFlagger(map[feature.Flag]interface{}{})) - l.SetupOrFail(t) + l := launcher.RunAndSetupNewLauncherOrFail(ctx, t) defer l.ShutdownOrFail(t, ctx) l.WritePointsOrFail(t, strings.Join(tc.data, "\n")) @@ -2562,8 +2547,7 @@ from(bucket: v.bucket) } func TestLauncher_Query_Buckets_MultiplePages(t *testing.T) { - l := launcher.RunTestLauncherOrFail(t, ctx, nil) - l.SetupOrFail(t) + l := launcher.RunAndSetupNewLauncherOrFail(ctx, t) defer l.ShutdownOrFail(t, ctx) // Create a large number of buckets. This is above the default diff --git a/cmd/influxd/launcher/storage_test.go b/cmd/influxd/launcher/storage_test.go index d9913fa8c0..f028d01b8a 100644 --- a/cmd/influxd/launcher/storage_test.go +++ b/cmd/influxd/launcher/storage_test.go @@ -17,7 +17,9 @@ import ( ) func TestStorage_WriteAndQuery(t *testing.T) { - l := launcher.RunTestLauncherOrFail(t, ctx, nil) + l := launcher.NewTestLauncher() + l.RunOrFail(t, ctx) + defer l.ShutdownOrFail(t, ctx) org1 := l.OnBoardOrFail(t, &influxdb.OnboardingRequest{ User: "USER-1", @@ -32,8 +34,6 @@ func TestStorage_WriteAndQuery(t *testing.T) { Bucket: "BUCKET", }) - defer l.ShutdownOrFail(t, ctx) - // Execute single write against the server. l.WriteOrFail(t, org1, `m,k=v1 f=100i 946684800000000000`) l.WriteOrFail(t, org2, `m,k=v2 f=200i 946684800000000000`) @@ -54,8 +54,7 @@ func TestStorage_WriteAndQuery(t *testing.T) { } func TestLauncher_WriteAndQuery(t *testing.T) { - l := launcher.RunTestLauncherOrFail(t, ctx, nil) - l.SetupOrFail(t) + l := launcher.RunAndSetupNewLauncherOrFail(ctx, t) defer l.ShutdownOrFail(t, ctx) // Execute single write against the server. @@ -92,8 +91,7 @@ func TestLauncher_WriteAndQuery(t *testing.T) { } func TestLauncher_BucketDelete(t *testing.T) { - l := launcher.RunTestLauncherOrFail(t, ctx, nil) - l.SetupOrFail(t) + l := launcher.RunAndSetupNewLauncherOrFail(ctx, t) defer l.ShutdownOrFail(t, ctx) // Execute single write against the server. @@ -158,8 +156,7 @@ func TestLauncher_BucketDelete(t *testing.T) { } func TestLauncher_DeleteWithPredicate(t *testing.T) { - l := launcher.RunTestLauncherOrFail(t, ctx, nil) - l.SetupOrFail(t) + l := launcher.RunAndSetupNewLauncherOrFail(ctx, t) defer l.ShutdownOrFail(t, ctx) // Write data to server. @@ -203,8 +200,7 @@ func TestLauncher_DeleteWithPredicate(t *testing.T) { } func TestLauncher_UpdateRetentionPolicy(t *testing.T) { - l := launcher.RunTestLauncherOrFail(t, ctx, nil) - l.SetupOrFail(t) + l := launcher.RunAndSetupNewLauncherOrFail(ctx, t) defer l.ShutdownOrFail(t, ctx) bucket, err := l.BucketService(t).FindBucket(ctx, influxdb.BucketFilter{ID: &l.Bucket.ID}) diff --git a/cmd/influxd/upgrade/database_test.go b/cmd/influxd/upgrade/database_test.go index eaf18f65a0..6a3e80db0b 100644 --- a/cmd/influxd/upgrade/database_test.go +++ b/cmd/influxd/upgrade/database_test.go @@ -33,7 +33,7 @@ func TestUpgradeRealDB(t *testing.T) { err = testutil.Unzip("testdata/v1db.zip", tmpdir) require.Nil(t, err) - tl := launcher.NewTestLauncherServer(nil) + tl := launcher.NewTestLauncherServer() defer tl.ShutdownOrFail(t, ctx) boltPath := filepath.Join(tl.Path, bolt.DefaultFilename) @@ -174,7 +174,7 @@ func TestUpgradeRealDB(t *testing.T) { require.Nil(t, err) // start server - err = tl.Run(ctx) + err = tl.Run(t, ctx) require.Nil(t, err) respBody := mustRunQuery(t, tl, "test", "select count(avg) from stat") diff --git a/influxql/_v1tests/server_helpers.go b/influxql/_v1tests/server_helpers.go index 0ec0598550..93dfbb335c 100644 --- a/influxql/_v1tests/server_helpers.go +++ b/influxql/_v1tests/server_helpers.go @@ -11,20 +11,22 @@ import ( "time" "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/cmd/influxd/launcher" icontext "github.com/influxdata/influxdb/v2/context" "github.com/influxdata/influxdb/v2/tests" "github.com/influxdata/influxdb/v2/tests/pipeline" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" - "go.uber.org/zap/zaptest" ) -func OpenServer(t *testing.T, extra ...tests.PipelineOption) *tests.DefaultPipeline { +func OpenServer(t *testing.T, extra ...launcher.OptSetter) *tests.DefaultPipeline { t.Helper() - defaults := []tests.PipelineOption{ - tests.WithLogger(zaptest.NewLogger(t, zaptest.Level(zapcore.ErrorLevel))), + defaults := []launcher.OptSetter{ + func(o *launcher.InfluxdOpts) { + o.LogLevel = zapcore.ErrorLevel + }, } p := tests.NewDefaultPipeline(t, append(defaults, extra...)...) diff --git a/influxql/_v1tests/server_test.go b/influxql/_v1tests/server_test.go index 2453907d3d..b3ce2ee115 100644 --- a/influxql/_v1tests/server_test.go +++ b/influxql/_v1tests/server_test.go @@ -9,8 +9,8 @@ import ( "testing" "time" + "github.com/influxdata/influxdb/v2/cmd/influxd/launcher" "github.com/influxdata/influxdb/v2/models" - "github.com/influxdata/influxdb/v2/tests" ) const ( @@ -635,7 +635,9 @@ func TestServer_Query_Count(t *testing.T) { // Ensure the server can limit concurrent series. func TestServer_Query_MaxSelectSeriesN(t *testing.T) { t.Parallel() - s := OpenServer(t, tests.WithInfluxQLMaxSelectSeriesN(3)) + s := OpenServer(t, func(o *launcher.InfluxdOpts) { + o.CoordinatorConfig.MaxSelectSeriesN = 3 + }) defer s.Close() test := NewTest("db0", "rp0") @@ -6242,7 +6244,9 @@ func TestServer_Query_With_EmptyTags(t *testing.T) { func TestServer_Query_ImplicitFill(t *testing.T) { t.Parallel() - s := OpenServer(t, tests.WithInfluxQLMaxSelectBucketsN(5)) + s := OpenServer(t, func(o *launcher.InfluxdOpts) { + o.CoordinatorConfig.MaxSelectBucketsN = 5 + }) defer s.Close() writes := []string{ diff --git a/influxql/_v1validation/validation_test.go b/influxql/_v1validation/validation_test.go index 454045d01b..64ac46a576 100644 --- a/influxql/_v1validation/validation_test.go +++ b/influxql/_v1validation/validation_test.go @@ -13,6 +13,7 @@ import ( "time" "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/cmd/influxd/launcher" icontext "github.com/influxdata/influxdb/v2/context" "github.com/influxdata/influxdb/v2/mock" datagen "github.com/influxdata/influxdb/v2/pkg/data/gen" @@ -21,7 +22,6 @@ import ( "github.com/influxdata/influxdb/v2/tests/pipeline" "github.com/stretchr/testify/assert" "go.uber.org/zap/zapcore" - "go.uber.org/zap/zaptest" "gopkg.in/yaml.v2" ) @@ -91,9 +91,9 @@ func testSuiteFromPath(t *testing.T, path string) *TestSuite { func validate(t *testing.T, gf *TestSuite) { t.Helper() ctx := context.Background() - p := tests.NewDefaultPipeline(t, - tests.WithLogger(zaptest.NewLogger(t, zaptest.Level(zapcore.ErrorLevel))), - ) + p := tests.NewDefaultPipeline(t, func(o *launcher.InfluxdOpts) { + o.LogLevel = zapcore.ErrorLevel + }) p.MustOpen() defer p.MustClose() orgID := p.DefaultOrgID diff --git a/query/stdlib/testing/end_to_end_test.go b/query/stdlib/testing/end_to_end_test.go index 4ff0f9b30b..a1281aadbe 100644 --- a/query/stdlib/testing/end_to_end_test.go +++ b/query/stdlib/testing/end_to_end_test.go @@ -88,10 +88,15 @@ func BenchmarkFluxEndToEnd(b *testing.B) { } func runEndToEnd(t *testing.T, pkgs []*ast.Package) { + l := launcher.NewTestLauncher() + flagger := newFlagger(itesting.FluxEndToEndFeatureFlags) - l := launcher.RunTestLauncherOrFail(t, ctx, flagger) - l.SetupOrFail(t) + l.SetFlagger(flagger) + + l.RunOrFail(t, ctx) defer l.ShutdownOrFail(t, ctx) + l.SetupOrFail(t) + for _, pkg := range pkgs { test := func(t *testing.T, f func(t *testing.T)) { t.Run(pkg.Path, f) diff --git a/tests/pipeline_helpers.go b/tests/pipeline_helpers.go index c8bbea7ee4..7e5e13d907 100644 --- a/tests/pipeline_helpers.go +++ b/tests/pipeline_helpers.go @@ -6,9 +6,7 @@ import ( "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/cmd/influxd/launcher" - "github.com/stretchr/testify/require" "go.uber.org/zap" - "go.uber.org/zap/zaptest" ) // A Pipeline is responsible for configuring launcher.TestLauncher @@ -22,57 +20,38 @@ type Pipeline struct { DefaultUserID influxdb.ID } -// pipelineConfig tracks the pre-configuration for a pipeline. -type pipelineConfig struct { - logger *zap.Logger -} - // NewDefaultPipeline creates a Pipeline with default // values. // // It is retained for compatibility with cloud tests. -func NewDefaultPipeline(t *testing.T, opts ...PipelineOption) *DefaultPipeline { - opts = append(WithDefaults(), opts...) +func NewDefaultPipeline(t *testing.T, opts ...launcher.OptSetter) *DefaultPipeline { + setDefaultLogLevel := func(o *launcher.InfluxdOpts) { + // This is left here mainly for retro compatibility + if VeryVerbose { + o.LogLevel = zap.DebugLevel + } else { + o.LogLevel = zap.InfoLevel + } + + } + // Set the default log level as the FIRST option here so users can override + // it with passed-in setters. + opts = append([]launcher.OptSetter{setDefaultLogLevel}, opts...) return &DefaultPipeline{Pipeline: NewPipeline(t, opts...)} } // NewPipeline returns a pipeline with the given options applied to the configuration as appropriate. // // A single user, org, bucket and token are created. -func NewPipeline(tb testing.TB, opts ...PipelineOption) *Pipeline { +func NewPipeline(tb testing.TB, opts ...launcher.OptSetter) *Pipeline { tb.Helper() - var conf pipelineConfig - for _, o := range opts { - o.applyConfig(&conf) - } - - logger := conf.logger - if logger == nil { - // This is left here mainly for retro compatibility - var logLevel zaptest.LoggerOption - if VeryVerbose { - logLevel = zaptest.Level(zap.DebugLevel) - } else { - logLevel = zaptest.Level(zap.InfoLevel) - } - - logger = zaptest.NewLogger(tb, logLevel).With(zap.String("test_name", tb.Name())) - } - - tl := launcher.NewTestLauncher(nil) - tl.SetLogger(logger) - + tl := launcher.NewTestLauncher() p := &Pipeline{ Launcher: tl, } - err := tl.Run(context.Background(), func(o *launcher.InfluxdOpts) { - for _, opt := range opts { - opt.applyOptSetter(o) - } - }) - require.NoError(tb, err) + tl.RunOrFail(tb, context.Background(), opts...) // setup default operator res := p.Launcher.OnBoardOrFail(tb, &influxdb.OnboardingRequest{ diff --git a/tests/pipeline_option.go b/tests/pipeline_option.go deleted file mode 100644 index 7ba11d5570..0000000000 --- a/tests/pipeline_option.go +++ /dev/null @@ -1,63 +0,0 @@ -package tests - -import ( - "github.com/influxdata/influxdb/v2/cmd/influxd/launcher" - "go.uber.org/zap" -) - -// PipelineOption configures a pipeline. -type PipelineOption interface { - applyConfig(*pipelineConfig) - applyOptSetter(*launcher.InfluxdOpts) -} - -type pipelineOption struct { - applyConfigFn func(*pipelineConfig) - optSetter launcher.OptSetter -} - -var _ PipelineOption = pipelineOption{} - -func (o pipelineOption) applyConfig(pc *pipelineConfig) { - if o.applyConfigFn != nil { - o.applyConfigFn(pc) - } -} - -func (o pipelineOption) applyOptSetter(opts *launcher.InfluxdOpts) { - if o.optSetter != nil { - o.optSetter(opts) - } -} - -// WithDefaults returns a slice of options for a default pipeline. -func WithDefaults() []PipelineOption { - return []PipelineOption{} -} - -// WithLogger sets the logger for the pipeline itself, and the underlying launcher. -func WithLogger(logger *zap.Logger) PipelineOption { - return pipelineOption{ - applyConfigFn: func(config *pipelineConfig) { - config.logger = logger - }, - } -} - -// WithInfluxQLMaxSelectSeriesN configures the maximum number of series returned by a select statement. -func WithInfluxQLMaxSelectSeriesN(n int) PipelineOption { - return pipelineOption{ - optSetter: func(o *launcher.InfluxdOpts) { - o.CoordinatorConfig.MaxSelectSeriesN = n - }, - } -} - -// WithInfluxQLMaxSelectBucketsN configures the maximum number of buckets returned by a select statement. -func WithInfluxQLMaxSelectBucketsN(n int) PipelineOption { - return pipelineOption{ - optSetter: func(o *launcher.InfluxdOpts) { - o.CoordinatorConfig.MaxSelectBucketsN = n - }, - } -}