refactor: simplify how we set the top-level influxd logger (#20374)

pull/19811/head
Daniel Moran 2020-12-21 11:15:08 -08:00 committed by dubsky
parent c67ecc4641
commit ade300839e
14 changed files with 139 additions and 228 deletions

View File

@ -3,6 +3,7 @@ package launcher
import ( import (
"context" "context"
"fmt" "fmt"
"os"
"path/filepath" "path/filepath"
"strconv" "strconv"
"time" "time"
@ -12,6 +13,7 @@ import (
"github.com/influxdata/influxdb/v2/internal/fs" "github.com/influxdata/influxdb/v2/internal/fs"
"github.com/influxdata/influxdb/v2/kit/cli" "github.com/influxdata/influxdb/v2/kit/cli"
"github.com/influxdata/influxdb/v2/kit/signals" "github.com/influxdata/influxdb/v2/kit/signals"
influxlogger "github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/storage" "github.com/influxdata/influxdb/v2/storage"
"github.com/influxdata/influxdb/v2/v1/coordinator" "github.com/influxdata/influxdb/v2/v1/coordinator"
"github.com/influxdata/influxdb/v2/vault" "github.com/influxdata/influxdb/v2/vault"
@ -72,6 +74,17 @@ func cmdRunE(ctx context.Context, o *InfluxdOpts) func() error {
l := NewLauncher() l := NewLauncher()
// Create top level logger
logconf := &influxlogger.Config{
Format: "auto",
Level: o.LogLevel,
}
logger, err := logconf.New(os.Stdout)
if err != nil {
return err
}
l.log = logger
// Start the launcher and wait for it to exit on SIGINT or SIGTERM. // Start the launcher and wait for it to exit on SIGINT or SIGTERM.
runCtx := signals.WithStandardSignals(ctx) runCtx := signals.WithStandardSignals(ctx)
if err := l.run(runCtx, o); err != nil { if err := l.run(runCtx, o); err != nil {
@ -83,9 +96,7 @@ func cmdRunE(ctx context.Context, o *InfluxdOpts) func() error {
// in-progress requests. // in-progress requests.
shutdownCtx, cancel := context.WithTimeout(ctx, 2*time.Second) shutdownCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel() defer cancel()
l.Shutdown(shutdownCtx) return l.Shutdown(shutdownCtx)
return nil
} }
} }

View File

@ -11,6 +11,7 @@ import (
_ "net/http/pprof" // needed to add pprof to our binary. _ "net/http/pprof" // needed to add pprof to our binary.
"os" "os"
"path/filepath" "path/filepath"
"strings"
"sync" "sync"
"time" "time"
@ -40,7 +41,6 @@ import (
"github.com/influxdata/influxdb/v2/kv/migration" "github.com/influxdata/influxdb/v2/kv/migration"
"github.com/influxdata/influxdb/v2/kv/migration/all" "github.com/influxdata/influxdb/v2/kv/migration/all"
"github.com/influxdata/influxdb/v2/label" "github.com/influxdata/influxdb/v2/label"
influxlogger "github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/nats" "github.com/influxdata/influxdb/v2/nats"
endpointservice "github.com/influxdata/influxdb/v2/notification/endpoint/service" endpointservice "github.com/influxdata/influxdb/v2/notification/endpoint/service"
ruleservice "github.com/influxdata/influxdb/v2/notification/rule/service" ruleservice "github.com/influxdata/influxdb/v2/notification/rule/service"
@ -122,9 +122,6 @@ type Launcher struct {
log *zap.Logger log *zap.Logger
reg *prom.Registry reg *prom.Registry
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer
apibackend *http.APIBackend apibackend *http.APIBackend
} }
@ -133,15 +130,11 @@ type stoppingScheduler interface {
Stop() Stop()
} }
// NewLauncher returns a new instance of Launcher connected to standard in/out/err. // NewLauncher returns a new instance of Launcher with a no-op logger.
func NewLauncher() *Launcher { func NewLauncher() *Launcher {
l := &Launcher{ return &Launcher{
Stdin: os.Stdin, log: zap.NewNop(),
Stdout: os.Stdout,
Stderr: os.Stderr,
} }
return l
} }
// Registry returns the prometheus metrics registry. // Registry returns the prometheus metrics registry.
@ -166,9 +159,12 @@ func (m *Launcher) Engine() Engine {
} }
// Shutdown shuts down the HTTP server and waits for all services to clean up. // Shutdown shuts down the HTTP server and waits for all services to clean up.
func (m *Launcher) Shutdown(ctx context.Context) { func (m *Launcher) Shutdown(ctx context.Context) error {
var errs []string
if err := m.httpServer.Shutdown(ctx); err != nil { if err := m.httpServer.Shutdown(ctx); err != nil {
m.log.Error("Failed to close HTTP server", zap.Error(err)) m.log.Error("Failed to close HTTP server", zap.Error(err))
errs = append(errs, err.Error())
} }
m.log.Info("Stopping", zap.String("service", "task")) m.log.Info("Stopping", zap.String("service", "task"))
@ -180,28 +176,39 @@ func (m *Launcher) Shutdown(ctx context.Context) {
m.log.Info("Stopping", zap.String("service", "bolt")) m.log.Info("Stopping", zap.String("service", "bolt"))
if err := m.boltClient.Close(); err != nil { if err := m.boltClient.Close(); err != nil {
m.log.Info("Failed closing bolt", zap.Error(err)) m.log.Error("Failed closing bolt", zap.Error(err))
errs = append(errs, err.Error())
} }
m.log.Info("Stopping", zap.String("service", "query")) m.log.Info("Stopping", zap.String("service", "query"))
if err := m.queryController.Shutdown(ctx); err != nil && err != context.Canceled { if err := m.queryController.Shutdown(ctx); err != nil && err != context.Canceled {
m.log.Info("Failed closing query service", zap.Error(err)) m.log.Error("Failed closing query service", zap.Error(err))
errs = append(errs, err.Error())
} }
m.log.Info("Stopping", zap.String("service", "storage-engine")) m.log.Info("Stopping", zap.String("service", "storage-engine"))
if err := m.engine.Close(); err != nil { if err := m.engine.Close(); err != nil {
m.log.Error("Failed to close engine", zap.Error(err)) m.log.Error("Failed to close engine", zap.Error(err))
errs = append(errs, err.Error())
} }
m.wg.Wait() m.wg.Wait()
if m.jaegerTracerCloser != nil { if m.jaegerTracerCloser != nil {
if err := m.jaegerTracerCloser.Close(); err != nil { if err := m.jaegerTracerCloser.Close(); err != nil {
m.log.Warn("Failed to closer Jaeger tracer", zap.Error(err)) m.log.Error("Failed to closer Jaeger tracer", zap.Error(err))
errs = append(errs, err.Error())
} }
} }
m.log.Sync() if err := m.log.Sync(); err != nil {
errs = append(errs, err.Error())
}
if len(errs) > 0 {
return fmt.Errorf("failed to shut down server: [%s]", strings.Join(errs, ","))
}
return nil
} }
// Cancel executes the context cancel on the program. Used for testing. // Cancel executes the context cancel on the program. Used for testing.
@ -213,18 +220,6 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
ctx, m.cancel = context.WithCancel(ctx) ctx, m.cancel = context.WithCancel(ctx)
if m.log == nil {
// Create top level logger
logconf := &influxlogger.Config{
Format: "auto",
Level: opts.LogLevel,
}
m.log, err = logconf.New(m.Stdout)
if err != nil {
return err
}
}
info := platform.GetBuildInfo() info := platform.GetBuildInfo()
m.log.Info("Welcome to InfluxDB", m.log.Info("Welcome to InfluxDB",
zap.String("version", info.Version), zap.String("version", info.Version),

View File

@ -4,7 +4,6 @@ import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
nethttp "net/http" nethttp "net/http"
"os" "os"
@ -32,6 +31,7 @@ import (
"github.com/prometheus/common/expfmt" "github.com/prometheus/common/expfmt"
"github.com/spf13/viper" "github.com/spf13/viper"
"go.uber.org/zap" "go.uber.org/zap"
"go.uber.org/zap/zaptest"
) )
// TestLauncher is a test wrapper for launcher.Launcher. // TestLauncher is a test wrapper for launcher.Launcher.
@ -49,26 +49,30 @@ type TestLauncher struct {
httpClient *httpc.Client httpClient *httpc.Client
// Standard in/out/err buffers.
Stdin bytes.Buffer
Stdout bytes.Buffer
Stderr bytes.Buffer
// Flag to act as standard server: disk store, no-e2e testing flag // Flag to act as standard server: disk store, no-e2e testing flag
realServer bool realServer bool
} }
// NewTestLauncher returns a new instance of TestLauncher. // RunAndSetupNewLauncherOrFail shorcuts the most common pattern used in testing,
func NewTestLauncher(flagger feature.Flagger) *TestLauncher { // building a new TestLauncher, running it, and setting it up with an initial user.
l := &TestLauncher{Launcher: NewLauncher()} func RunAndSetupNewLauncherOrFail(ctx context.Context, tb testing.TB, setters ...OptSetter) *TestLauncher {
l.Launcher.Stdin = &l.Stdin tb.Helper()
l.Launcher.Stdout = &l.Stdout
l.Launcher.Stderr = &l.Stderr l := NewTestLauncher()
l.Launcher.flagger = flagger l.RunOrFail(tb, ctx, setters...)
if testing.Verbose() { defer func() {
l.Launcher.Stdout = io.MultiWriter(l.Launcher.Stdout, os.Stdout) // If setup fails, shut down the launcher.
l.Launcher.Stderr = io.MultiWriter(l.Launcher.Stderr, os.Stderr) if tb.Failed() {
l.Shutdown(ctx)
} }
}()
l.SetupOrFail(tb)
return l
}
// NewTestLauncher returns a new instance of TestLauncher.
func NewTestLauncher() *TestLauncher {
l := &TestLauncher{Launcher: NewLauncher()}
path, err := ioutil.TempDir("", "") path, err := ioutil.TempDir("", "")
if err != nil { if err != nil {
@ -79,33 +83,28 @@ func NewTestLauncher(flagger feature.Flagger) *TestLauncher {
} }
// NewTestLauncherServer returns a new instance of TestLauncher configured as real server (disk store, no e2e flag). // NewTestLauncherServer returns a new instance of TestLauncher configured as real server (disk store, no e2e flag).
func NewTestLauncherServer(flagger feature.Flagger) *TestLauncher { func NewTestLauncherServer() *TestLauncher {
l := NewTestLauncher(flagger) l := NewTestLauncher()
l.realServer = true l.realServer = true
return l return l
} }
// RunTestLauncherOrFail initializes and starts the server. type OptSetter = func(o *InfluxdOpts)
func RunTestLauncherOrFail(tb testing.TB, ctx context.Context, flagger feature.Flagger, setters ...OptSetter) *TestLauncher {
tb.Helper()
l := NewTestLauncher(flagger)
if err := l.Run(ctx, setters...); err != nil { func (tl *TestLauncher) SetFlagger(flagger feature.Flagger) {
tl.Launcher.flagger = flagger
}
// Run executes the program, failing the test if the launcher fails to start.
func (tl *TestLauncher) RunOrFail(tb testing.TB, ctx context.Context, setters ...OptSetter) {
if err := tl.Run(tb, ctx, setters...); err != nil {
tb.Fatal(err) tb.Fatal(err)
} }
return l
} }
// SetLogger sets the logger for the underlying program.
func (tl *TestLauncher) SetLogger(logger *zap.Logger) {
tl.Launcher.log = logger
}
type OptSetter = func(o *InfluxdOpts)
// Run executes the program with additional arguments to set paths and ports. // Run executes the program with additional arguments to set paths and ports.
// Passed arguments will overwrite/add to the default ones. // Passed arguments will overwrite/add to the default ones.
func (tl *TestLauncher) Run(ctx context.Context, setters ...OptSetter) error { func (tl *TestLauncher) Run(tb testing.TB, ctx context.Context, setters ...OptSetter) error {
opts := newOpts(viper.New()) opts := newOpts(viper.New())
if !tl.realServer { if !tl.realServer {
opts.StoreType = "memory" opts.StoreType = "memory"
@ -122,14 +121,16 @@ func (tl *TestLauncher) Run(ctx context.Context, setters ...OptSetter) error {
setter(opts) setter(opts)
} }
// Set up top-level logger to write into the test-case.
tl.Launcher.log = zaptest.NewLogger(tb, zaptest.Level(opts.LogLevel)).With(zap.String("test_name", tb.Name()))
return tl.Launcher.run(ctx, opts) return tl.Launcher.run(ctx, opts)
} }
// Shutdown stops the program and cleans up temporary paths. // Shutdown stops the program and cleans up temporary paths.
func (tl *TestLauncher) Shutdown(ctx context.Context) error { func (tl *TestLauncher) Shutdown(ctx context.Context) error {
defer os.RemoveAll(tl.Path)
tl.Cancel() tl.Cancel()
tl.Launcher.Shutdown(ctx) return tl.Launcher.Shutdown(ctx)
return os.RemoveAll(tl.Path)
} }
// ShutdownOrFail stops the program and cleans up temporary paths. Fail on error. // ShutdownOrFail stops the program and cleans up temporary paths. Fail on error.

View File

@ -18,11 +18,9 @@ import (
var ctx = context.Background() var ctx = context.Background()
func TestLauncher_Setup(t *testing.T) { func TestLauncher_Setup(t *testing.T) {
l := launcher.NewTestLauncher(nil) l := launcher.NewTestLauncher()
if err := l.Run(ctx); err != nil { l.RunOrFail(t, ctx)
t.Fatal(err) defer l.ShutdownOrFail(t, ctx)
}
defer l.Shutdown(ctx)
client, err := http.NewHTTPClient(l.URL(), "", false) client, err := http.NewHTTPClient(l.URL(), "", false)
if err != nil { if err != nil {
@ -51,8 +49,7 @@ func TestLauncher_Setup(t *testing.T) {
// This is to mimic chronograf using cookies as sessions // This is to mimic chronograf using cookies as sessions
// rather than authorizations // rather than authorizations
func TestLauncher_SetupWithUsers(t *testing.T) { func TestLauncher_SetupWithUsers(t *testing.T) {
l := launcher.RunTestLauncherOrFail(t, ctx, nil) l := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx) defer l.ShutdownOrFail(t, ctx)
r, err := nethttp.NewRequest("POST", l.URL()+"/api/v2/signin", nil) r, err := nethttp.NewRequest("POST", l.URL()+"/api/v2/signin", nil)

View File

@ -27,11 +27,11 @@ import (
var ctx = context.Background() var ctx = context.Background()
func TestLauncher_Pkger(t *testing.T) { func TestLauncher_Pkger(t *testing.T) {
l := RunTestLauncherOrFail(t, ctx, nil, func(o *InfluxdOpts) { l := RunAndSetupNewLauncherOrFail(ctx, t, func(o *InfluxdOpts) {
o.LogLevel = zap.ErrorLevel o.LogLevel = zap.ErrorLevel
}) })
l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx) defer l.ShutdownOrFail(t, ctx)
require.NoError(t, l.BucketService(t).DeleteBucket(ctx, l.Bucket.ID)) require.NoError(t, l.BucketService(t).DeleteBucket(ctx, l.Bucket.ID))
svc := l.PkgerService(t) svc := l.PkgerService(t)

View File

@ -27,16 +27,13 @@ import (
"github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/cmd/influxd/launcher" "github.com/influxdata/influxdb/v2/cmd/influxd/launcher"
phttp "github.com/influxdata/influxdb/v2/http" phttp "github.com/influxdata/influxdb/v2/http"
"github.com/influxdata/influxdb/v2/kit/feature"
"github.com/influxdata/influxdb/v2/kit/prom" "github.com/influxdata/influxdb/v2/kit/prom"
"github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/query" "github.com/influxdata/influxdb/v2/query"
"go.uber.org/zap" "go.uber.org/zap"
) )
func TestLauncher_Write_Query_FieldKey(t *testing.T) { func TestLauncher_Write_Query_FieldKey(t *testing.T) {
be := launcher.RunTestLauncherOrFail(t, ctx, nil) be := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
be.SetupOrFail(t)
defer be.ShutdownOrFail(t, ctx) defer be.ShutdownOrFail(t, ctx)
resp, err := nethttp.DefaultClient.Do( resp, err := nethttp.DefaultClient.Do(
@ -81,8 +78,7 @@ mem,server=b value=45.2`))
// and checks that the queried results contain the expected number of tables // and checks that the queried results contain the expected number of tables
// and expected number of columns. // and expected number of columns.
func TestLauncher_WriteV2_Query(t *testing.T) { func TestLauncher_WriteV2_Query(t *testing.T) {
be := launcher.RunTestLauncherOrFail(t, ctx, nil) be := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
be.SetupOrFail(t)
defer be.ShutdownOrFail(t, ctx) defer be.ShutdownOrFail(t, ctx)
// The default gateway instance inserts some values directly such that ID lookups seem to break, // The default gateway instance inserts some values directly such that ID lookups seem to break,
@ -301,8 +297,7 @@ func TestLauncher_QueryMemoryLimits(t *testing.T) {
for _, tc := range tcs { for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
l := launcher.RunTestLauncherOrFail(t, ctx, nil, tc.setOpts) l := launcher.RunAndSetupNewLauncherOrFail(ctx, t, tc.setOpts)
l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx) defer l.ShutdownOrFail(t, ctx)
const tagValue = "t0" const tagValue = "t0"
@ -339,14 +334,13 @@ func TestLauncher_QueryMemoryLimits(t *testing.T) {
func TestLauncher_QueryMemoryManager_ExceedMemory(t *testing.T) { func TestLauncher_QueryMemoryManager_ExceedMemory(t *testing.T) {
t.Skip("this test is flaky, occasionally get error: \"memory allocation limit reached\" on OK query") t.Skip("this test is flaky, occasionally get error: \"memory allocation limit reached\" on OK query")
l := launcher.RunTestLauncherOrFail(t, ctx, nil, func(o *launcher.InfluxdOpts) { l := launcher.RunAndSetupNewLauncherOrFail(ctx, t, func(o *launcher.InfluxdOpts) {
o.LogLevel = zap.ErrorLevel o.LogLevel = zap.ErrorLevel
o.ConcurrencyQuota = 1 o.ConcurrencyQuota = 1
o.InitialMemoryBytesQuotaPerQuery = 100 o.InitialMemoryBytesQuotaPerQuery = 100
o.MemoryBytesQuotaPerQuery = 50000 o.MemoryBytesQuotaPerQuery = 50000
o.MaxMemoryBytes = 200000 o.MaxMemoryBytes = 200000
}) })
l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx) defer l.ShutdownOrFail(t, ctx)
// One tag does not exceed memory. // One tag does not exceed memory.
@ -384,14 +378,13 @@ func TestLauncher_QueryMemoryManager_ExceedMemory(t *testing.T) {
func TestLauncher_QueryMemoryManager_ContextCanceled(t *testing.T) { func TestLauncher_QueryMemoryManager_ContextCanceled(t *testing.T) {
t.Skip("this test is flaky, occasionally get error: \"memory allocation limit reached\"") t.Skip("this test is flaky, occasionally get error: \"memory allocation limit reached\"")
l := launcher.RunTestLauncherOrFail(t, ctx, nil, func(o *launcher.InfluxdOpts) { l := launcher.RunAndSetupNewLauncherOrFail(ctx, t, func(o *launcher.InfluxdOpts) {
o.LogLevel = zap.ErrorLevel o.LogLevel = zap.ErrorLevel
o.ConcurrencyQuota = 1 o.ConcurrencyQuota = 1
o.InitialMemoryBytesQuotaPerQuery = 100 o.InitialMemoryBytesQuotaPerQuery = 100
o.MemoryBytesQuotaPerQuery = 50000 o.MemoryBytesQuotaPerQuery = 50000
o.MaxMemoryBytes = 200000 o.MaxMemoryBytes = 200000
}) })
l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx) defer l.ShutdownOrFail(t, ctx)
const tag = "t0" const tag = "t0"
@ -428,7 +421,7 @@ func TestLauncher_QueryMemoryManager_ContextCanceled(t *testing.T) {
func TestLauncher_QueryMemoryManager_ConcurrentQueries(t *testing.T) { func TestLauncher_QueryMemoryManager_ConcurrentQueries(t *testing.T) {
t.Skip("this test is flaky, occasionally get error: \"dial tcp 127.0.0.1:59654: connect: connection reset by peer\"") t.Skip("this test is flaky, occasionally get error: \"dial tcp 127.0.0.1:59654: connect: connection reset by peer\"")
l := launcher.RunTestLauncherOrFail(t, ctx, nil, func(o *launcher.InfluxdOpts) { l := launcher.RunAndSetupNewLauncherOrFail(ctx, t, func(o *launcher.InfluxdOpts) {
o.LogLevel = zap.ErrorLevel o.LogLevel = zap.ErrorLevel
o.QueueSize = 1024 o.QueueSize = 1024
o.ConcurrencyQuota = 1 o.ConcurrencyQuota = 1
@ -436,7 +429,6 @@ func TestLauncher_QueryMemoryManager_ConcurrentQueries(t *testing.T) {
o.MemoryBytesQuotaPerQuery = 50000 o.MemoryBytesQuotaPerQuery = 50000
o.MaxMemoryBytes = 200000 o.MaxMemoryBytes = 200000
}) })
l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx) defer l.ShutdownOrFail(t, ctx)
// One tag does not exceed memory. // One tag does not exceed memory.
@ -502,8 +494,7 @@ func TestLauncher_QueryMemoryManager_ConcurrentQueries(t *testing.T) {
} }
func TestLauncher_Query_LoadSecret_Success(t *testing.T) { func TestLauncher_Query_LoadSecret_Success(t *testing.T) {
l := launcher.RunTestLauncherOrFail(t, ctx, nil) l := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx) defer l.ShutdownOrFail(t, ctx)
const key, value = "mytoken", "secrettoken" const key, value = "mytoken", "secrettoken"
@ -552,8 +543,7 @@ from(bucket: "%s")
} }
func TestLauncher_Query_LoadSecret_Forbidden(t *testing.T) { func TestLauncher_Query_LoadSecret_Forbidden(t *testing.T) {
l := launcher.RunTestLauncherOrFail(t, ctx, nil) l := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx) defer l.ShutdownOrFail(t, ctx)
const key, value = "mytoken", "secrettoken" const key, value = "mytoken", "secrettoken"
@ -611,8 +601,7 @@ from(bucket: "%s")
// This will change once we make side effects drive execution and remove from/to concurrency in our e2e tests. // This will change once we make side effects drive execution and remove from/to concurrency in our e2e tests.
// See https://github.com/influxdata/flux/issues/1799. // See https://github.com/influxdata/flux/issues/1799.
func TestLauncher_DynamicQuery(t *testing.T) { func TestLauncher_DynamicQuery(t *testing.T) {
l := launcher.RunTestLauncherOrFail(t, ctx, nil) l := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx) defer l.ShutdownOrFail(t, ctx)
l.WritePointsOrFail(t, ` l.WritePointsOrFail(t, `
@ -686,8 +675,7 @@ stream2 |> filter(fn: (r) => contains(value: r._value, set: col)) |> group() |>
} }
func TestLauncher_Query_ExperimentalTo(t *testing.T) { func TestLauncher_Query_ExperimentalTo(t *testing.T) {
l := launcher.RunTestLauncherOrFail(t, ctx, nil) l := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx) defer l.ShutdownOrFail(t, ctx)
// Last row of data tests nil field value // Last row of data tests nil field value
@ -920,9 +908,7 @@ error2","query plan",109,110
for _, tc := range testcases { for _, tc := range testcases {
tc := tc tc := tc
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
l := launcher.RunTestLauncherOrFail(t, ctx, nil) l := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx) defer l.ShutdownOrFail(t, ctx)
l.WritePointsOrFail(t, strings.Join(tc.data, "\n")) l.WritePointsOrFail(t, strings.Join(tc.data, "\n"))
@ -2530,9 +2516,8 @@ from(bucket: v.bucket)
if tc.skip != "" { if tc.skip != "" {
t.Skip(tc.skip) t.Skip(tc.skip)
} }
l := launcher.RunTestLauncherOrFail(t, ctx, mock.NewFlagger(map[feature.Flag]interface{}{}))
l.SetupOrFail(t) l := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
defer l.ShutdownOrFail(t, ctx) defer l.ShutdownOrFail(t, ctx)
l.WritePointsOrFail(t, strings.Join(tc.data, "\n")) l.WritePointsOrFail(t, strings.Join(tc.data, "\n"))
@ -2562,8 +2547,7 @@ from(bucket: v.bucket)
} }
func TestLauncher_Query_Buckets_MultiplePages(t *testing.T) { func TestLauncher_Query_Buckets_MultiplePages(t *testing.T) {
l := launcher.RunTestLauncherOrFail(t, ctx, nil) l := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx) defer l.ShutdownOrFail(t, ctx)
// Create a large number of buckets. This is above the default // Create a large number of buckets. This is above the default

View File

@ -17,7 +17,9 @@ import (
) )
func TestStorage_WriteAndQuery(t *testing.T) { func TestStorage_WriteAndQuery(t *testing.T) {
l := launcher.RunTestLauncherOrFail(t, ctx, nil) l := launcher.NewTestLauncher()
l.RunOrFail(t, ctx)
defer l.ShutdownOrFail(t, ctx)
org1 := l.OnBoardOrFail(t, &influxdb.OnboardingRequest{ org1 := l.OnBoardOrFail(t, &influxdb.OnboardingRequest{
User: "USER-1", User: "USER-1",
@ -32,8 +34,6 @@ func TestStorage_WriteAndQuery(t *testing.T) {
Bucket: "BUCKET", Bucket: "BUCKET",
}) })
defer l.ShutdownOrFail(t, ctx)
// Execute single write against the server. // Execute single write against the server.
l.WriteOrFail(t, org1, `m,k=v1 f=100i 946684800000000000`) l.WriteOrFail(t, org1, `m,k=v1 f=100i 946684800000000000`)
l.WriteOrFail(t, org2, `m,k=v2 f=200i 946684800000000000`) l.WriteOrFail(t, org2, `m,k=v2 f=200i 946684800000000000`)
@ -54,8 +54,7 @@ func TestStorage_WriteAndQuery(t *testing.T) {
} }
func TestLauncher_WriteAndQuery(t *testing.T) { func TestLauncher_WriteAndQuery(t *testing.T) {
l := launcher.RunTestLauncherOrFail(t, ctx, nil) l := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx) defer l.ShutdownOrFail(t, ctx)
// Execute single write against the server. // Execute single write against the server.
@ -92,8 +91,7 @@ func TestLauncher_WriteAndQuery(t *testing.T) {
} }
func TestLauncher_BucketDelete(t *testing.T) { func TestLauncher_BucketDelete(t *testing.T) {
l := launcher.RunTestLauncherOrFail(t, ctx, nil) l := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx) defer l.ShutdownOrFail(t, ctx)
// Execute single write against the server. // Execute single write against the server.
@ -158,8 +156,7 @@ func TestLauncher_BucketDelete(t *testing.T) {
} }
func TestLauncher_DeleteWithPredicate(t *testing.T) { func TestLauncher_DeleteWithPredicate(t *testing.T) {
l := launcher.RunTestLauncherOrFail(t, ctx, nil) l := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx) defer l.ShutdownOrFail(t, ctx)
// Write data to server. // Write data to server.
@ -203,8 +200,7 @@ func TestLauncher_DeleteWithPredicate(t *testing.T) {
} }
func TestLauncher_UpdateRetentionPolicy(t *testing.T) { func TestLauncher_UpdateRetentionPolicy(t *testing.T) {
l := launcher.RunTestLauncherOrFail(t, ctx, nil) l := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx) defer l.ShutdownOrFail(t, ctx)
bucket, err := l.BucketService(t).FindBucket(ctx, influxdb.BucketFilter{ID: &l.Bucket.ID}) bucket, err := l.BucketService(t).FindBucket(ctx, influxdb.BucketFilter{ID: &l.Bucket.ID})

View File

@ -33,7 +33,7 @@ func TestUpgradeRealDB(t *testing.T) {
err = testutil.Unzip("testdata/v1db.zip", tmpdir) err = testutil.Unzip("testdata/v1db.zip", tmpdir)
require.Nil(t, err) require.Nil(t, err)
tl := launcher.NewTestLauncherServer(nil) tl := launcher.NewTestLauncherServer()
defer tl.ShutdownOrFail(t, ctx) defer tl.ShutdownOrFail(t, ctx)
boltPath := filepath.Join(tl.Path, bolt.DefaultFilename) boltPath := filepath.Join(tl.Path, bolt.DefaultFilename)
@ -174,7 +174,7 @@ func TestUpgradeRealDB(t *testing.T) {
require.Nil(t, err) require.Nil(t, err)
// start server // start server
err = tl.Run(ctx) err = tl.Run(t, ctx)
require.Nil(t, err) require.Nil(t, err)
respBody := mustRunQuery(t, tl, "test", "select count(avg) from stat") respBody := mustRunQuery(t, tl, "test", "select count(avg) from stat")

View File

@ -11,20 +11,22 @@ import (
"time" "time"
"github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/cmd/influxd/launcher"
icontext "github.com/influxdata/influxdb/v2/context" icontext "github.com/influxdata/influxdb/v2/context"
"github.com/influxdata/influxdb/v2/tests" "github.com/influxdata/influxdb/v2/tests"
"github.com/influxdata/influxdb/v2/tests/pipeline" "github.com/influxdata/influxdb/v2/tests/pipeline"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest"
) )
func OpenServer(t *testing.T, extra ...tests.PipelineOption) *tests.DefaultPipeline { func OpenServer(t *testing.T, extra ...launcher.OptSetter) *tests.DefaultPipeline {
t.Helper() t.Helper()
defaults := []tests.PipelineOption{ defaults := []launcher.OptSetter{
tests.WithLogger(zaptest.NewLogger(t, zaptest.Level(zapcore.ErrorLevel))), func(o *launcher.InfluxdOpts) {
o.LogLevel = zapcore.ErrorLevel
},
} }
p := tests.NewDefaultPipeline(t, append(defaults, extra...)...) p := tests.NewDefaultPipeline(t, append(defaults, extra...)...)

View File

@ -9,8 +9,8 @@ import (
"testing" "testing"
"time" "time"
"github.com/influxdata/influxdb/v2/cmd/influxd/launcher"
"github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/tests"
) )
const ( const (
@ -635,7 +635,9 @@ func TestServer_Query_Count(t *testing.T) {
// Ensure the server can limit concurrent series. // Ensure the server can limit concurrent series.
func TestServer_Query_MaxSelectSeriesN(t *testing.T) { func TestServer_Query_MaxSelectSeriesN(t *testing.T) {
t.Parallel() t.Parallel()
s := OpenServer(t, tests.WithInfluxQLMaxSelectSeriesN(3)) s := OpenServer(t, func(o *launcher.InfluxdOpts) {
o.CoordinatorConfig.MaxSelectSeriesN = 3
})
defer s.Close() defer s.Close()
test := NewTest("db0", "rp0") test := NewTest("db0", "rp0")
@ -6242,7 +6244,9 @@ func TestServer_Query_With_EmptyTags(t *testing.T) {
func TestServer_Query_ImplicitFill(t *testing.T) { func TestServer_Query_ImplicitFill(t *testing.T) {
t.Parallel() t.Parallel()
s := OpenServer(t, tests.WithInfluxQLMaxSelectBucketsN(5)) s := OpenServer(t, func(o *launcher.InfluxdOpts) {
o.CoordinatorConfig.MaxSelectBucketsN = 5
})
defer s.Close() defer s.Close()
writes := []string{ writes := []string{

View File

@ -13,6 +13,7 @@ import (
"time" "time"
"github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/cmd/influxd/launcher"
icontext "github.com/influxdata/influxdb/v2/context" icontext "github.com/influxdata/influxdb/v2/context"
"github.com/influxdata/influxdb/v2/mock" "github.com/influxdata/influxdb/v2/mock"
datagen "github.com/influxdata/influxdb/v2/pkg/data/gen" datagen "github.com/influxdata/influxdb/v2/pkg/data/gen"
@ -21,7 +22,6 @@ import (
"github.com/influxdata/influxdb/v2/tests/pipeline" "github.com/influxdata/influxdb/v2/tests/pipeline"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
) )
@ -91,9 +91,9 @@ func testSuiteFromPath(t *testing.T, path string) *TestSuite {
func validate(t *testing.T, gf *TestSuite) { func validate(t *testing.T, gf *TestSuite) {
t.Helper() t.Helper()
ctx := context.Background() ctx := context.Background()
p := tests.NewDefaultPipeline(t, p := tests.NewDefaultPipeline(t, func(o *launcher.InfluxdOpts) {
tests.WithLogger(zaptest.NewLogger(t, zaptest.Level(zapcore.ErrorLevel))), o.LogLevel = zapcore.ErrorLevel
) })
p.MustOpen() p.MustOpen()
defer p.MustClose() defer p.MustClose()
orgID := p.DefaultOrgID orgID := p.DefaultOrgID

View File

@ -88,10 +88,15 @@ func BenchmarkFluxEndToEnd(b *testing.B) {
} }
func runEndToEnd(t *testing.T, pkgs []*ast.Package) { func runEndToEnd(t *testing.T, pkgs []*ast.Package) {
l := launcher.NewTestLauncher()
flagger := newFlagger(itesting.FluxEndToEndFeatureFlags) flagger := newFlagger(itesting.FluxEndToEndFeatureFlags)
l := launcher.RunTestLauncherOrFail(t, ctx, flagger) l.SetFlagger(flagger)
l.SetupOrFail(t)
l.RunOrFail(t, ctx)
defer l.ShutdownOrFail(t, ctx) defer l.ShutdownOrFail(t, ctx)
l.SetupOrFail(t)
for _, pkg := range pkgs { for _, pkg := range pkgs {
test := func(t *testing.T, f func(t *testing.T)) { test := func(t *testing.T, f func(t *testing.T)) {
t.Run(pkg.Path, f) t.Run(pkg.Path, f)

View File

@ -6,9 +6,7 @@ import (
"github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/cmd/influxd/launcher" "github.com/influxdata/influxdb/v2/cmd/influxd/launcher"
"github.com/stretchr/testify/require"
"go.uber.org/zap" "go.uber.org/zap"
"go.uber.org/zap/zaptest"
) )
// A Pipeline is responsible for configuring launcher.TestLauncher // A Pipeline is responsible for configuring launcher.TestLauncher
@ -22,57 +20,38 @@ type Pipeline struct {
DefaultUserID influxdb.ID DefaultUserID influxdb.ID
} }
// pipelineConfig tracks the pre-configuration for a pipeline.
type pipelineConfig struct {
logger *zap.Logger
}
// NewDefaultPipeline creates a Pipeline with default // NewDefaultPipeline creates a Pipeline with default
// values. // values.
// //
// It is retained for compatibility with cloud tests. // It is retained for compatibility with cloud tests.
func NewDefaultPipeline(t *testing.T, opts ...PipelineOption) *DefaultPipeline { func NewDefaultPipeline(t *testing.T, opts ...launcher.OptSetter) *DefaultPipeline {
opts = append(WithDefaults(), opts...) setDefaultLogLevel := func(o *launcher.InfluxdOpts) {
// This is left here mainly for retro compatibility
if VeryVerbose {
o.LogLevel = zap.DebugLevel
} else {
o.LogLevel = zap.InfoLevel
}
}
// Set the default log level as the FIRST option here so users can override
// it with passed-in setters.
opts = append([]launcher.OptSetter{setDefaultLogLevel}, opts...)
return &DefaultPipeline{Pipeline: NewPipeline(t, opts...)} return &DefaultPipeline{Pipeline: NewPipeline(t, opts...)}
} }
// NewPipeline returns a pipeline with the given options applied to the configuration as appropriate. // NewPipeline returns a pipeline with the given options applied to the configuration as appropriate.
// //
// A single user, org, bucket and token are created. // A single user, org, bucket and token are created.
func NewPipeline(tb testing.TB, opts ...PipelineOption) *Pipeline { func NewPipeline(tb testing.TB, opts ...launcher.OptSetter) *Pipeline {
tb.Helper() tb.Helper()
var conf pipelineConfig tl := launcher.NewTestLauncher()
for _, o := range opts {
o.applyConfig(&conf)
}
logger := conf.logger
if logger == nil {
// This is left here mainly for retro compatibility
var logLevel zaptest.LoggerOption
if VeryVerbose {
logLevel = zaptest.Level(zap.DebugLevel)
} else {
logLevel = zaptest.Level(zap.InfoLevel)
}
logger = zaptest.NewLogger(tb, logLevel).With(zap.String("test_name", tb.Name()))
}
tl := launcher.NewTestLauncher(nil)
tl.SetLogger(logger)
p := &Pipeline{ p := &Pipeline{
Launcher: tl, Launcher: tl,
} }
err := tl.Run(context.Background(), func(o *launcher.InfluxdOpts) { tl.RunOrFail(tb, context.Background(), opts...)
for _, opt := range opts {
opt.applyOptSetter(o)
}
})
require.NoError(tb, err)
// setup default operator // setup default operator
res := p.Launcher.OnBoardOrFail(tb, &influxdb.OnboardingRequest{ res := p.Launcher.OnBoardOrFail(tb, &influxdb.OnboardingRequest{

View File

@ -1,63 +0,0 @@
package tests
import (
"github.com/influxdata/influxdb/v2/cmd/influxd/launcher"
"go.uber.org/zap"
)
// PipelineOption configures a pipeline.
type PipelineOption interface {
applyConfig(*pipelineConfig)
applyOptSetter(*launcher.InfluxdOpts)
}
type pipelineOption struct {
applyConfigFn func(*pipelineConfig)
optSetter launcher.OptSetter
}
var _ PipelineOption = pipelineOption{}
func (o pipelineOption) applyConfig(pc *pipelineConfig) {
if o.applyConfigFn != nil {
o.applyConfigFn(pc)
}
}
func (o pipelineOption) applyOptSetter(opts *launcher.InfluxdOpts) {
if o.optSetter != nil {
o.optSetter(opts)
}
}
// WithDefaults returns a slice of options for a default pipeline.
func WithDefaults() []PipelineOption {
return []PipelineOption{}
}
// WithLogger sets the logger for the pipeline itself, and the underlying launcher.
func WithLogger(logger *zap.Logger) PipelineOption {
return pipelineOption{
applyConfigFn: func(config *pipelineConfig) {
config.logger = logger
},
}
}
// WithInfluxQLMaxSelectSeriesN configures the maximum number of series returned by a select statement.
func WithInfluxQLMaxSelectSeriesN(n int) PipelineOption {
return pipelineOption{
optSetter: func(o *launcher.InfluxdOpts) {
o.CoordinatorConfig.MaxSelectSeriesN = n
},
}
}
// WithInfluxQLMaxSelectBucketsN configures the maximum number of buckets returned by a select statement.
func WithInfluxQLMaxSelectBucketsN(n int) PipelineOption {
return pipelineOption{
optSetter: func(o *launcher.InfluxdOpts) {
o.CoordinatorConfig.MaxSelectBucketsN = n
},
}
}