refactor(cmd/influxd): separate CLI parsing from core launcher (#20109)

pull/20196/head
Daniel Moran 2020-11-30 09:38:27 -05:00 committed by GitHub
parent bac5515a0a
commit 81a671894d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 630 additions and 674 deletions

463
cmd/influxd/launcher/cmd.go Normal file
View File

@ -0,0 +1,463 @@
package launcher
import (
"context"
"fmt"
"path/filepath"
"strconv"
"time"
"github.com/influxdata/influxdb/v2/bolt"
"github.com/influxdata/influxdb/v2/fluxinit"
"github.com/influxdata/influxdb/v2/internal/fs"
"github.com/influxdata/influxdb/v2/kit/cli"
"github.com/influxdata/influxdb/v2/kit/signals"
"github.com/influxdata/influxdb/v2/storage"
"github.com/influxdata/influxdb/v2/v1/coordinator"
"github.com/influxdata/influxdb/v2/vault"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.uber.org/zap/zapcore"
)
const (
// Max Integer
MaxInt = 1<<uint(strconv.IntSize-1) - 1
)
// NewInfluxdCommand constructs the root of the influxd CLI, along with a `run` subcommand.
// The `run` subcommand is set as the default to execute.
func NewInfluxdCommand(ctx context.Context, v *viper.Viper) *cobra.Command {
o := newOpts(v)
prog := cli.Program{
Name: "influxd",
Run: cmdRunE(ctx, o),
}
cmd := cli.NewCommand(o.Viper, &prog)
runCmd := &cobra.Command{
Use: "run",
RunE: cmd.RunE,
Args: cobra.NoArgs,
}
for _, c := range []*cobra.Command{cmd, runCmd} {
setCmdDescriptions(c)
o.bindCliOpts(c)
}
cmd.AddCommand(runCmd)
return cmd
}
func setCmdDescriptions(cmd *cobra.Command) {
cmd.Short = "Start the influxd server"
cmd.Long = `
Start up the daemon configured with flags/env vars/config file.
The order of precedence for config options are as follows (1 highest, 3 lowest):
1. flags
2. env vars
3. config file
A config file can be provided via the INFLUXD_CONFIG_PATH env var. If a file is
not provided via an env var, influxd will look in the current directory for a
config.{json|toml|yaml|yml} file. If one does not exist, then it will continue unchanged.
`
}
func cmdRunE(ctx context.Context, o *InfluxdOpts) func() error {
return func() error {
fluxinit.FluxInit()
// exit with SIGINT and SIGTERM
ctx = signals.WithStandardSignals(ctx)
l := NewLauncher()
if err := l.run(ctx, o); err != nil {
return err
}
<-ctx.Done()
// Attempt clean shutdown.
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
l.Shutdown(ctx)
return nil
}
}
// InfluxdOpts captures all arguments for running the InfluxDB server.
type InfluxdOpts struct {
Testing bool
TestingAlwaysAllowSetup bool
LogLevel string
TracingType string
ReportingDisabled bool
AssetsPath string
BoltPath string
EnginePath string
StoreType string
SecretStore string
VaultConfig vault.Config
HttpBindAddress string
HttpTLSCert string
HttpTLSKey string
HttpTLSMinVersion string
HttpTLSStrictCiphers bool
SessionLength int // in minutes
SessionRenewDisabled bool
NoTasks bool
FeatureFlags map[string]string
// Query options.
ConcurrencyQuota int32
InitialMemoryBytesQuotaPerQuery int64
MemoryBytesQuotaPerQuery int64
MaxMemoryBytes int64
QueueSize int32
CoordinatorConfig coordinator.Config
// Storage options.
StorageConfig storage.Config
Viper *viper.Viper
}
// newOpts constructs options with default values.
func newOpts(viper *viper.Viper) *InfluxdOpts {
dir, err := fs.InfluxDir()
if err != nil {
panic(fmt.Errorf("failed to determine influx directory: %v", err))
}
return &InfluxdOpts{
Viper: viper,
StorageConfig: storage.NewConfig(),
CoordinatorConfig: coordinator.NewConfig(),
LogLevel: zapcore.InfoLevel.String(),
ReportingDisabled: false,
BoltPath: filepath.Join(dir, bolt.DefaultFilename),
EnginePath: filepath.Join(dir, "engine"),
HttpBindAddress: ":8086",
HttpTLSMinVersion: "1.2",
HttpTLSStrictCiphers: false,
SessionLength: 60, // 60 minutes
SessionRenewDisabled: false,
StoreType: BoltStore,
SecretStore: BoltStore,
NoTasks: false,
ConcurrencyQuota: 10,
InitialMemoryBytesQuotaPerQuery: 0,
MemoryBytesQuotaPerQuery: MaxInt,
MaxMemoryBytes: 0,
QueueSize: 10,
Testing: false,
TestingAlwaysAllowSetup: false,
}
}
// bindCliOpts configures a cobra command to set server options based on CLI args.
func (o *InfluxdOpts) bindCliOpts(cmd *cobra.Command) {
opts := []cli.Opt{
{
DestP: &o.LogLevel,
Flag: "log-level",
Default: o.LogLevel,
Desc: "supported log levels are debug, info, and error",
},
{
DestP: &o.TracingType,
Flag: "tracing-type",
Desc: fmt.Sprintf("supported tracing types are %s, %s", LogTracing, JaegerTracing),
},
{
DestP: &o.HttpBindAddress,
Flag: "http-bind-address",
Default: o.HttpBindAddress,
Desc: "bind address for the REST HTTP API",
},
{
DestP: &o.BoltPath,
Flag: "bolt-path",
Default: o.BoltPath,
Desc: "path to boltdb database",
},
{
DestP: &o.AssetsPath,
Flag: "assets-path",
Desc: "override default assets by serving from a specific directory (developer mode)",
},
{
DestP: &o.StoreType,
Flag: "store",
Default: o.StoreType,
Desc: "backing store for REST resources (bolt or memory)",
},
{
DestP: &o.Testing,
Flag: "e2e-testing",
Default: o.Testing,
Desc: "add /debug/flush endpoint to clear stores; used for end-to-end tests",
},
{
DestP: &o.TestingAlwaysAllowSetup,
Flag: "testing-always-allow-setup",
Default: o.TestingAlwaysAllowSetup,
Desc: "ensures the /api/v2/setup endpoint always returns true to allow onboarding",
},
{
DestP: &o.EnginePath,
Flag: "engine-path",
Default: o.EnginePath,
Desc: "path to persistent engine files",
},
{
DestP: &o.SecretStore,
Flag: "secret-store",
Default: o.SecretStore,
Desc: "data store for secrets (bolt or vault)",
},
{
DestP: &o.ReportingDisabled,
Flag: "reporting-disabled",
Default: o.ReportingDisabled,
Desc: "disable sending telemetry data to https://telemetry.influxdata.com every 8 hours",
},
{
DestP: &o.SessionLength,
Flag: "session-length",
Default: o.SessionLength,
Desc: "ttl in minutes for newly created sessions",
},
{
DestP: &o.SessionRenewDisabled,
Flag: "session-renew-disabled",
Default: o.SessionRenewDisabled,
Desc: "disables automatically extending session ttl on request",
},
{
DestP: &o.VaultConfig.Address,
Flag: "vault-addr",
Desc: "address of the Vault server expressed as a URL and port, for example: https://127.0.0.1:8200/.",
},
{
DestP: &o.VaultConfig.ClientTimeout,
Flag: "vault-client-timeout",
Desc: "timeout variable. The default value is 60s.",
},
{
DestP: &o.VaultConfig.MaxRetries,
Flag: "vault-max-retries",
Desc: "maximum number of retries when a 5xx error code is encountered. The default is 2, for three total attempts. Set this to 0 or less to disable retrying.",
},
{
DestP: &o.VaultConfig.CACert,
Flag: "vault-cacert",
Desc: "path to a PEM-encoded CA certificate file on the local disk. This file is used to verify the Vault server's SSL certificate. This environment variable takes precedence over VAULT_CAPATH.",
},
{
DestP: &o.VaultConfig.CAPath,
Flag: "vault-capath",
Desc: "path to a directory of PEM-encoded CA certificate files on the local disk. These certificates are used to verify the Vault server's SSL certificate.",
},
{
DestP: &o.VaultConfig.ClientCert,
Flag: "vault-client-cert",
Desc: "path to a PEM-encoded client certificate on the local disk. This file is used for TLS communication with the Vault server.",
},
{
DestP: &o.VaultConfig.ClientKey,
Flag: "vault-client-key",
Desc: "path to an unencrypted, PEM-encoded private key on disk which corresponds to the matching client certificate.",
},
{
DestP: &o.VaultConfig.InsecureSkipVerify,
Flag: "vault-skip-verify",
Desc: "do not verify Vault's presented certificate before communicating with it. Setting this variable is not recommended and voids Vault's security model.",
},
{
DestP: &o.VaultConfig.TLSServerName,
Flag: "vault-tls-server-name",
Desc: "name to use as the SNI host when connecting via TLS.",
},
{
DestP: &o.VaultConfig.Token,
Flag: "vault-token",
Desc: "vault authentication token",
},
{
DestP: &o.HttpTLSCert,
Flag: "tls-cert",
Desc: "TLS certificate for HTTPs",
},
{
DestP: &o.HttpTLSKey,
Flag: "tls-key",
Desc: "TLS key for HTTPs",
},
{
DestP: &o.HttpTLSMinVersion,
Flag: "tls-min-version",
Default: o.HttpTLSMinVersion,
Desc: "Minimum accepted TLS version",
},
{
DestP: &o.HttpTLSStrictCiphers,
Flag: "tls-strict-ciphers",
Default: o.HttpTLSStrictCiphers,
Desc: "Restrict accept ciphers to: ECDHE_RSA_WITH_AES_256_GCM_SHA384, ECDHE_RSA_WITH_AES_256_CBC_SHA, RSA_WITH_AES_256_GCM_SHA384, RSA_WITH_AES_256_CBC_SHA",
},
{
DestP: &o.NoTasks,
Flag: "no-tasks",
Default: o.NoTasks,
Desc: "disables the task scheduler",
},
{
DestP: &o.ConcurrencyQuota,
Flag: "query-concurrency",
Default: o.ConcurrencyQuota,
Desc: "the number of queries that are allowed to execute concurrently",
},
{
DestP: &o.InitialMemoryBytesQuotaPerQuery,
Flag: "query-initial-memory-bytes",
Default: o.InitialMemoryBytesQuotaPerQuery,
Desc: "the initial number of bytes allocated for a query when it is started. If this is unset, then query-memory-bytes will be used",
},
{
DestP: &o.MemoryBytesQuotaPerQuery,
Flag: "query-memory-bytes",
Default: o.MemoryBytesQuotaPerQuery,
Desc: "maximum number of bytes a query is allowed to use at any given time. This must be greater or equal to query-initial-memory-bytes",
},
{
DestP: &o.MaxMemoryBytes,
Flag: "query-max-memory-bytes",
Default: o.MaxMemoryBytes,
Desc: "the maximum amount of memory used for queries. If this is unset, then this number is query-concurrency * query-memory-bytes",
},
{
DestP: &o.QueueSize,
Flag: "query-queue-size",
Default: o.QueueSize,
Desc: "the number of queries that are allowed to be awaiting execution before new queries are rejected",
},
{
DestP: &o.FeatureFlags,
Flag: "feature-flags",
Desc: "feature flag overrides",
},
// storage configuration
{
DestP: &o.StorageConfig.Data.WALFsyncDelay,
Flag: "storage-wal-fsync-delay",
Desc: "The amount of time that a write will wait before fsyncing. A duration greater than 0 can be used to batch up multiple fsync calls. This is useful for slower disks or when WAL write contention is seen.",
},
{
DestP: &o.StorageConfig.Data.ValidateKeys,
Flag: "storage-validate-keys",
Desc: "Validates incoming writes to ensure keys only have valid unicode characters.",
},
{
DestP: &o.StorageConfig.Data.CacheMaxMemorySize,
Flag: "storage-cache-max-memory-size",
Desc: "The maximum size a shard's cache can reach before it starts rejecting writes.",
},
{
DestP: &o.StorageConfig.Data.CacheSnapshotMemorySize,
Flag: "storage-cache-snapshot-memory-size",
Desc: "The size at which the engine will snapshot the cache and write it to a TSM file, freeing up memory.",
},
{
DestP: &o.StorageConfig.Data.CacheSnapshotWriteColdDuration,
Flag: "storage-cache-snapshot-write-cold-duration",
Desc: "The length of time at which the engine will snapshot the cache and write it to a new TSM file if the shard hasn't received writes or deletes.",
},
{
DestP: &o.StorageConfig.Data.CompactFullWriteColdDuration,
Flag: "storage-compact-full-write-cold-duration",
Desc: "The duration at which the engine will compact all TSM files in a shard if it hasn't received a write or delete.",
},
{
DestP: &o.StorageConfig.Data.CompactThroughputBurst,
Flag: "storage-compact-throughput-burst",
Desc: "The rate limit in bytes per second that we will allow TSM compactions to write to disk.",
},
// limits
{
DestP: &o.StorageConfig.Data.MaxConcurrentCompactions,
Flag: "storage-max-concurrent-compactions",
Desc: "The maximum number of concurrent full and level compactions that can run at one time. A value of 0 results in 50% of runtime.GOMAXPROCS(0) used at runtime. Any number greater than 0 limits compactions to that value. This setting does not apply to cache snapshotting.",
},
{
DestP: &o.StorageConfig.Data.MaxIndexLogFileSize,
Flag: "storage-max-index-log-file-size",
Desc: "The threshold, in bytes, when an index write-ahead log file will compact into an index file. Lower sizes will cause log files to be compacted more quickly and result in lower heap usage at the expense of write throughput.",
},
{
DestP: &o.StorageConfig.Data.SeriesIDSetCacheSize,
Flag: "storage-series-id-set-cache-size",
Desc: "The size of the internal cache used in the TSI index to store previously calculated series results.",
},
{
DestP: &o.StorageConfig.Data.SeriesFileMaxConcurrentSnapshotCompactions,
Flag: "storage-series-file-max-concurrent-snapshot-compactions",
Desc: "The maximum number of concurrent snapshot compactions that can be running at one time across all series partitions in a database.",
},
{
DestP: &o.StorageConfig.Data.TSMWillNeed,
Flag: "storage-tsm-use-madv-willneed",
Desc: "Controls whether we hint to the kernel that we intend to page in mmap'd sections of TSM files.",
},
{
DestP: &o.StorageConfig.RetentionService.CheckInterval,
Flag: "storage-retention-check-interval",
Desc: "The interval of time when retention policy enforcement checks run.",
},
{
DestP: &o.StorageConfig.PrecreatorConfig.CheckInterval,
Flag: "storage-shard-precreator-check-interval",
Desc: "The interval of time when the check to pre-create new shards runs.",
},
{
DestP: &o.StorageConfig.PrecreatorConfig.AdvancePeriod,
Flag: "storage-shard-precreator-advance-period",
Desc: "The default period ahead of the endtime of a shard group that its successor group is created.",
},
// InfluxQL Coordinator Config
{
DestP: &o.CoordinatorConfig.MaxSelectPointN,
Flag: "influxql-max-select-point",
Desc: "The maximum number of points a SELECT can process. A value of 0 will make the maximum point count unlimited. This will only be checked every second so queries will not be aborted immediately when hitting the limit.",
},
{
DestP: &o.CoordinatorConfig.MaxSelectSeriesN,
Flag: "influxql-max-select-series",
Desc: "The maximum number of series a SELECT can run. A value of 0 will make the maximum series count unlimited.",
},
{
DestP: &o.CoordinatorConfig.MaxSelectBucketsN,
Flag: "influxql-max-select-buckets",
Desc: "The maximum number of group by time bucket a SELECT can create. A value of zero will max the maximum number of buckets unlimited.",
},
}
cli.BindOptions(o.Viper, cmd, opts)
}

View File

@ -11,7 +11,6 @@ import (
_ "net/http/pprof" // needed to add pprof to our binary.
"os"
"path/filepath"
"strconv"
"sync"
"time"
@ -25,20 +24,16 @@ import (
"github.com/influxdata/influxdb/v2/dashboards"
dashboardTransport "github.com/influxdata/influxdb/v2/dashboards/transport"
"github.com/influxdata/influxdb/v2/dbrp"
"github.com/influxdata/influxdb/v2/fluxinit"
"github.com/influxdata/influxdb/v2/gather"
"github.com/influxdata/influxdb/v2/http"
iqlcontrol "github.com/influxdata/influxdb/v2/influxql/control"
iqlquery "github.com/influxdata/influxdb/v2/influxql/query"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/internal/fs"
"github.com/influxdata/influxdb/v2/internal/resource"
"github.com/influxdata/influxdb/v2/kit/cli"
"github.com/influxdata/influxdb/v2/kit/feature"
overrideflagger "github.com/influxdata/influxdb/v2/kit/feature/override"
"github.com/influxdata/influxdb/v2/kit/metric"
"github.com/influxdata/influxdb/v2/kit/prom"
"github.com/influxdata/influxdb/v2/kit/signals"
"github.com/influxdata/influxdb/v2/kit/tracing"
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
"github.com/influxdata/influxdb/v2/kv"
@ -80,8 +75,6 @@ import (
pzap "github.com/influxdata/influxdb/v2/zap"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
jaegerconfig "github.com/uber/jaeger-client-go/config"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
@ -97,443 +90,31 @@ const (
LogTracing = "log"
// JaegerTracing enables tracing via the Jaeger client library
JaegerTracing = "jaeger"
// Max Integer
MaxInt = 1<<uint(strconv.IntSize-1) - 1
)
func NewInfluxdCommand(ctx context.Context, v *viper.Viper) *cobra.Command {
l := NewLauncher(WithViper(v))
prog := cli.Program{
Name: "influxd",
Run: cmdRunE(ctx, l),
}
assignDescs := func(cmd *cobra.Command) {
cmd.Short = "Start the influxd server (default)"
cmd.Long = `
Start up the daemon configured with flags/env vars/config file.
The order of precedence for config options are as follows (1 highest, 3 lowest):
1. flags
2. env vars
3. config file
A config file can be provided via the INFLUXD_CONFIG_PATH env var. If a file is
not provided via an env var, influxd will look in the current directory for a
config.{json|toml|yaml|yml} file. If one does not exist, then it will continue unchanged.`
}
cmd := cli.NewCommand(l.Viper, &prog)
runCmd := &cobra.Command{
Use: "run",
RunE: cmd.RunE,
Args: cobra.NoArgs,
}
for _, c := range []*cobra.Command{cmd, runCmd} {
assignDescs(c)
setLauncherCMDOpts(l, c)
}
cmd.AddCommand(runCmd)
return cmd
}
func cmdRunE(ctx context.Context, l *Launcher) func() error {
return func() error {
fluxinit.FluxInit()
// exit with SIGINT and SIGTERM
ctx = signals.WithStandardSignals(ctx)
if err := l.run(ctx); err != nil {
return err
} else if !l.Running() {
return errors.New("the daemon is already running")
}
var wg sync.WaitGroup
if !l.ReportingDisabled() {
reporter := telemetry.NewReporter(l.Log(), l.Registry())
reporter.Interval = 8 * time.Hour
wg.Add(1)
go func() {
defer wg.Done()
reporter.Report(ctx)
}()
}
<-ctx.Done()
// Attempt clean shutdown.
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
l.Shutdown(ctx)
wg.Wait()
return nil
}
}
var vaultConfig vault.Config
func setLauncherCMDOpts(l *Launcher, cmd *cobra.Command) {
cli.BindOptions(l.Viper, cmd, launcherOpts(l))
}
func launcherOpts(l *Launcher) []cli.Opt {
dir, err := fs.InfluxDir()
if err != nil {
panic(fmt.Errorf("failed to determine influx directory: %v", err))
}
return []cli.Opt{
{
DestP: &l.logLevel,
Flag: "log-level",
Default: zapcore.InfoLevel.String(),
Desc: "supported log levels are debug, info, and error",
},
{
DestP: &l.tracingType,
Flag: "tracing-type",
Default: "",
Desc: fmt.Sprintf("supported tracing types are %s, %s", LogTracing, JaegerTracing),
},
{
DestP: &l.httpBindAddress,
Flag: "http-bind-address",
Default: ":8086",
Desc: "bind address for the REST HTTP API",
},
{
DestP: &l.boltPath,
Flag: "bolt-path",
Default: filepath.Join(dir, bolt.DefaultFilename),
Desc: "path to boltdb database",
},
{
DestP: &l.assetsPath,
Flag: "assets-path",
Desc: "override default assets by serving from a specific directory (developer mode)",
},
{
DestP: &l.storeType,
Flag: "store",
Default: "bolt",
Desc: "backing store for REST resources (bolt or memory)",
},
{
DestP: &l.testing,
Flag: "e2e-testing",
Default: false,
Desc: "add /debug/flush endpoint to clear stores; used for end-to-end tests",
},
{
DestP: &l.testingAlwaysAllowSetup,
Flag: "testing-always-allow-setup",
Default: false,
Desc: "ensures the /api/v2/setup endpoint always returns true to allow onboarding",
},
{
DestP: &l.enginePath,
Flag: "engine-path",
Default: filepath.Join(dir, "engine"),
Desc: "path to persistent engine files",
},
{
DestP: &l.secretStore,
Flag: "secret-store",
Default: "bolt",
Desc: "data store for secrets (bolt or vault)",
},
{
DestP: &l.reportingDisabled,
Flag: "reporting-disabled",
Default: false,
Desc: "disable sending telemetry data to https://telemetry.influxdata.com every 8 hours",
},
{
DestP: &l.sessionLength,
Flag: "session-length",
Default: 60, // 60 minutes
Desc: "ttl in minutes for newly created sessions",
},
{
DestP: &l.sessionRenewDisabled,
Flag: "session-renew-disabled",
Default: false,
Desc: "disables automatically extending session ttl on request",
},
{
DestP: &vaultConfig.Address,
Flag: "vault-addr",
Desc: "address of the Vault server expressed as a URL and port, for example: https://127.0.0.1:8200/.",
},
{
DestP: &vaultConfig.ClientTimeout,
Flag: "vault-client-timeout",
Desc: "timeout variable. The default value is 60s.",
},
{
DestP: &vaultConfig.MaxRetries,
Flag: "vault-max-retries",
Desc: "maximum number of retries when a 5xx error code is encountered. The default is 2, for three total attempts. Set this to 0 or less to disable retrying.",
},
{
DestP: &vaultConfig.CACert,
Flag: "vault-cacert",
Desc: "path to a PEM-encoded CA certificate file on the local disk. This file is used to verify the Vault server's SSL certificate. This environment variable takes precedence over VAULT_CAPATH.",
},
{
DestP: &vaultConfig.CAPath,
Flag: "vault-capath",
Desc: "path to a directory of PEM-encoded CA certificate files on the local disk. These certificates are used to verify the Vault server's SSL certificate.",
},
{
DestP: &vaultConfig.ClientCert,
Flag: "vault-client-cert",
Desc: "path to a PEM-encoded client certificate on the local disk. This file is used for TLS communication with the Vault server.",
},
{
DestP: &vaultConfig.ClientKey,
Flag: "vault-client-key",
Desc: "path to an unencrypted, PEM-encoded private key on disk which corresponds to the matching client certificate.",
},
{
DestP: &vaultConfig.InsecureSkipVerify,
Flag: "vault-skip-verify",
Desc: "do not verify Vault's presented certificate before communicating with it. Setting this variable is not recommended and voids Vault's security model.",
},
{
DestP: &vaultConfig.TLSServerName,
Flag: "vault-tls-server-name",
Desc: "name to use as the SNI host when connecting via TLS.",
},
{
DestP: &vaultConfig.Token,
Flag: "vault-token",
Desc: "vault authentication token",
},
{
DestP: &l.httpTLSCert,
Flag: "tls-cert",
Default: "",
Desc: "TLS certificate for HTTPs",
},
{
DestP: &l.httpTLSKey,
Flag: "tls-key",
Default: "",
Desc: "TLS key for HTTPs",
},
{
DestP: &l.httpTLSMinVersion,
Flag: "tls-min-version",
Default: "1.2",
Desc: "Minimum accepted TLS version",
},
{
DestP: &l.httpTLSStrictCiphers,
Flag: "tls-strict-ciphers",
Default: false,
Desc: "Restrict accept ciphers to: ECDHE_RSA_WITH_AES_256_GCM_SHA384, ECDHE_RSA_WITH_AES_256_CBC_SHA, RSA_WITH_AES_256_GCM_SHA384, RSA_WITH_AES_256_CBC_SHA",
},
{
DestP: &l.noTasks,
Flag: "no-tasks",
Default: false,
Desc: "disables the task scheduler",
},
{
DestP: &l.concurrencyQuota,
Flag: "query-concurrency",
Default: 10,
Desc: "the number of queries that are allowed to execute concurrently",
},
{
DestP: &l.initialMemoryBytesQuotaPerQuery,
Flag: "query-initial-memory-bytes",
Default: 0,
Desc: "the initial number of bytes allocated for a query when it is started. If this is unset, then query-memory-bytes will be used",
},
{
DestP: &l.memoryBytesQuotaPerQuery,
Flag: "query-memory-bytes",
Default: MaxInt,
Desc: "maximum number of bytes a query is allowed to use at any given time. This must be greater or equal to query-initial-memory-bytes",
},
{
DestP: &l.maxMemoryBytes,
Flag: "query-max-memory-bytes",
Default: 0,
Desc: "the maximum amount of memory used for queries. If this is unset, then this number is query-concurrency * query-memory-bytes",
},
{
DestP: &l.queueSize,
Flag: "query-queue-size",
Default: 10,
Desc: "the number of queries that are allowed to be awaiting execution before new queries are rejected",
},
{
DestP: &l.featureFlags,
Flag: "feature-flags",
Desc: "feature flag overrides",
},
// storage configuration
{
DestP: &l.StorageConfig.Data.WALFsyncDelay,
Flag: "storage-wal-fsync-delay",
Desc: "The amount of time that a write will wait before fsyncing. A duration greater than 0 can be used to batch up multiple fsync calls. This is useful for slower disks or when WAL write contention is seen.",
},
{
DestP: &l.StorageConfig.Data.ValidateKeys,
Flag: "storage-validate-keys",
Desc: "Validates incoming writes to ensure keys only have valid unicode characters.",
},
{
DestP: &l.StorageConfig.Data.CacheMaxMemorySize,
Flag: "storage-cache-max-memory-size",
Desc: "The maximum size a shard's cache can reach before it starts rejecting writes.",
},
{
DestP: &l.StorageConfig.Data.CacheSnapshotMemorySize,
Flag: "storage-cache-snapshot-memory-size",
Desc: "The size at which the engine will snapshot the cache and write it to a TSM file, freeing up memory.",
},
{
DestP: &l.StorageConfig.Data.CacheSnapshotWriteColdDuration,
Flag: "storage-cache-snapshot-write-cold-duration",
Desc: "The length of time at which the engine will snapshot the cache and write it to a new TSM file if the shard hasn't received writes or deletes.",
},
{
DestP: &l.StorageConfig.Data.CompactFullWriteColdDuration,
Flag: "storage-compact-full-write-cold-duration",
Desc: "The duration at which the engine will compact all TSM files in a shard if it hasn't received a write or delete.",
},
{
DestP: &l.StorageConfig.Data.CompactThroughputBurst,
Flag: "storage-compact-throughput-burst",
Desc: "The rate limit in bytes per second that we will allow TSM compactions to write to disk.",
},
// limits
{
DestP: &l.StorageConfig.Data.MaxConcurrentCompactions,
Flag: "storage-max-concurrent-compactions",
Desc: "The maximum number of concurrent full and level compactions that can run at one time. A value of 0 results in 50% of runtime.GOMAXPROCS(0) used at runtime. Any number greater than 0 limits compactions to that value. This setting does not apply to cache snapshotting.",
},
{
DestP: &l.StorageConfig.Data.MaxIndexLogFileSize,
Flag: "storage-max-index-log-file-size",
Desc: "The threshold, in bytes, when an index write-ahead log file will compact into an index file. Lower sizes will cause log files to be compacted more quickly and result in lower heap usage at the expense of write throughput.",
},
{
DestP: &l.StorageConfig.Data.SeriesIDSetCacheSize,
Flag: "storage-series-id-set-cache-size",
Desc: "The size of the internal cache used in the TSI index to store previously calculated series results.",
},
{
DestP: &l.StorageConfig.Data.SeriesFileMaxConcurrentSnapshotCompactions,
Flag: "storage-series-file-max-concurrent-snapshot-compactions",
Desc: "The maximum number of concurrent snapshot compactions that can be running at one time across all series partitions in a database.",
},
{
DestP: &l.StorageConfig.Data.TSMWillNeed,
Flag: "storage-tsm-use-madv-willneed",
Desc: "Controls whether we hint to the kernel that we intend to page in mmap'd sections of TSM files.",
},
{
DestP: &l.StorageConfig.RetentionService.CheckInterval,
Flag: "storage-retention-check-interval",
Desc: "The interval of time when retention policy enforcement checks run.",
},
{
DestP: &l.StorageConfig.PrecreatorConfig.CheckInterval,
Flag: "storage-shard-precreator-check-interval",
Desc: "The interval of time when the check to pre-create new shards runs.",
},
{
DestP: &l.StorageConfig.PrecreatorConfig.AdvancePeriod,
Flag: "storage-shard-precreator-advance-period",
Desc: "The default period ahead of the endtime of a shard group that its successor group is created.",
},
// InfluxQL Coordinator Config
{
DestP: &l.CoordinatorConfig.MaxSelectPointN,
Flag: "influxql-max-select-point",
Desc: "The maximum number of points a SELECT can process. A value of 0 will make the maximum point count unlimited. This will only be checked every second so queries will not be aborted immediately when hitting the limit.",
},
{
DestP: &l.CoordinatorConfig.MaxSelectSeriesN,
Flag: "influxql-max-select-series",
Desc: "The maximum number of series a SELECT can run. A value of 0 will make the maximum series count unlimited.",
},
{
DestP: &l.CoordinatorConfig.MaxSelectBucketsN,
Flag: "influxql-max-select-buckets",
Desc: "The maximum number of group by time bucket a SELECT can create. A value of zero will max the maximum number of buckets unlimited.",
},
}
}
// Launcher represents the main program execution.
type Launcher struct {
wg sync.WaitGroup
cancel func()
running bool
wg sync.WaitGroup
cancel func()
storeType string
assetsPath string
testing bool
testingAlwaysAllowSetup bool
sessionLength int // in minutes
sessionRenewDisabled bool
logLevel string
tracingType string
reportingDisabled bool
httpBindAddress string
boltPath string
enginePath string
secretStore string
featureFlags map[string]string
flagger feature.Flagger
// Query options.
concurrencyQuota int32
initialMemoryBytesQuotaPerQuery int64
memoryBytesQuotaPerQuery int64
maxMemoryBytes int64
queueSize int32
flagger feature.Flagger
boltClient *bolt.Client
kvStore kv.SchemaStore
kvService *kv.Service
// storage engine
engine Engine
StorageConfig storage.Config
engine Engine
// InfluxQL query engine
CoordinatorConfig iqlcoordinator.Config
queryController *control.Controller
httpPort int
httpServer *nethttp.Server
httpTLSCert string
httpTLSKey string
httpTLSMinVersion string
httpTLSStrictCiphers bool
httpPort int
httpServer *nethttp.Server
natsServer *nats.Server
natsPort int
noTasks bool
scheduler stoppingScheduler
executor *executor.Executor
taskControlService taskbackend.TaskControlService
@ -542,14 +123,10 @@ type Launcher struct {
log *zap.Logger
reg *prom.Registry
opts []Option
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer
apibackend *http.APIBackend
Viper *viper.Viper
}
type stoppingScheduler interface {
@ -558,46 +135,21 @@ type stoppingScheduler interface {
}
// NewLauncher returns a new instance of Launcher connected to standard in/out/err.
func NewLauncher(opts ...Option) *Launcher {
func NewLauncher() *Launcher {
l := &Launcher{
opts: opts,
Stdin: os.Stdin,
Stdout: os.Stdout,
Stderr: os.Stderr,
StorageConfig: storage.NewConfig(),
}
for _, opt := range opts {
opt.applyInit(l)
}
if l.Viper == nil {
l.Viper = viper.New()
Stdin: os.Stdin,
Stdout: os.Stdout,
Stderr: os.Stderr,
}
return l
}
// Running returns true if the main Launcher has started running.
func (m *Launcher) Running() bool {
return m.running
}
// ReportingDisabled is true if opted out of usage stats.
func (m *Launcher) ReportingDisabled() bool {
return m.reportingDisabled
}
// Registry returns the prometheus metrics registry.
func (m *Launcher) Registry() *prom.Registry {
return m.reg
}
// Log returns the launchers logger.
func (m *Launcher) Log() *zap.Logger {
return m.log
}
// URL returns the URL to connect to the HTTP server.
func (m *Launcher) URL() string {
return fmt.Sprintf("http://127.0.0.1:%d", m.httpPort)
@ -654,35 +206,14 @@ func (m *Launcher) Shutdown(ctx context.Context) {
// Cancel executes the context cancel on the program. Used for testing.
func (m *Launcher) Cancel() { m.cancel() }
// Run executes the program with the given CLI arguments.
func (m *Launcher) Run(ctx context.Context, args ...string) error {
cmd := &cobra.Command{
Use: "run",
Short: "Start the influxd server (default)",
RunE: func(cmd *cobra.Command, args []string) error {
return m.run(ctx)
},
}
setLauncherCMDOpts(m, cmd)
cmd.SetArgs(args)
return cmd.Execute()
}
func (m *Launcher) run(ctx context.Context) (err error) {
func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
m.running = true
ctx, m.cancel = context.WithCancel(ctx)
for _, opt := range m.opts {
opt.applyConfig(m)
}
var logLevel zapcore.Level
if err := logLevel.Set(m.logLevel); err != nil {
if err := logLevel.Set(opts.LogLevel); err != nil {
return fmt.Errorf("unknown log level; supported levels are debug, info, and error")
}
@ -705,7 +236,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
zap.String("build_date", info.Date),
)
switch m.tracingType {
switch opts.TracingType {
case LogTracing:
m.log.Info("Tracing via zap logging")
tracer := pzap.NewTracer(m.log, snowflake.NewIDGenerator())
@ -728,7 +259,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
}
m.boltClient = bolt.NewClient(m.log.With(zap.String("service", "bolt")))
m.boltClient.Path = m.boltPath
m.boltClient.Path = opts.BoltPath
if err := m.boltClient.Open(ctx); err != nil {
m.log.Error("Failed opening bolt", zap.Error(err))
@ -736,24 +267,24 @@ func (m *Launcher) run(ctx context.Context) (err error) {
}
var flushers flushers
switch m.storeType {
switch opts.StoreType {
case BoltStore:
store := bolt.NewKVStore(m.log.With(zap.String("service", "kvstore-bolt")), m.boltPath)
store := bolt.NewKVStore(m.log.With(zap.String("service", "kvstore-bolt")), opts.BoltPath)
store.WithDB(m.boltClient.DB())
m.kvStore = store
if m.testing {
if opts.Testing {
flushers = append(flushers, store)
}
case MemoryStore:
store := inmem.NewKVStore()
m.kvStore = store
if m.testing {
if opts.Testing {
flushers = append(flushers, store)
}
default:
err := fmt.Errorf("unknown store type %s; expected bolt or memory", m.storeType)
err := fmt.Errorf("unknown store type %s; expected bolt or memory", opts.StoreType)
m.log.Error("Failed opening bolt", zap.Error(err))
return err
}
@ -820,20 +351,20 @@ func (m *Launcher) run(ctx context.Context) (err error) {
var secretSvc platform.SecretService = secret.NewMetricService(m.reg, secret.NewLogger(m.log.With(zap.String("service", "secret")), secret.NewService(secretStore)))
switch m.secretStore {
switch opts.SecretStore {
case "bolt":
// If it is bolt, then we already set it above.
case "vault":
// The vault secret service is configured using the standard vault environment variables.
// https://www.vaultproject.io/docs/commands/index.html#environment-variables
svc, err := vault.NewSecretService(vault.WithConfig(vaultConfig))
svc, err := vault.NewSecretService(vault.WithConfig(opts.VaultConfig))
if err != nil {
m.log.Error("Failed initializing vault secret service", zap.Error(err))
return err
}
secretSvc = svc
default:
err := fmt.Errorf("unknown secret service %q, expected \"bolt\" or \"vault\"", m.secretStore)
err := fmt.Errorf("unknown secret service %q, expected \"bolt\" or \"vault\"", opts.SecretStore)
m.log.Error("Failed setting secret service", zap.Error(err))
return err
}
@ -850,23 +381,23 @@ func (m *Launcher) run(ctx context.Context) (err error) {
return err
}
if m.testing {
if opts.Testing {
// the testing engine will write/read into a temporary directory
engine := NewTemporaryEngine(
m.StorageConfig,
opts.StorageConfig,
storage.WithMetaClient(metaClient),
)
flushers = append(flushers, engine)
m.engine = engine
} else {
// check for 2.x data / state from a prior 2.x
if err := checkForPriorVersion(ctx, m.log, m.boltPath, m.enginePath, ts.BucketService, metaClient); err != nil {
if err := checkForPriorVersion(ctx, m.log, opts.BoltPath, opts.EnginePath, ts.BucketService, metaClient); err != nil {
os.Exit(1)
}
m.engine = storage.NewEngine(
m.enginePath,
m.StorageConfig,
opts.EnginePath,
opts.StorageConfig,
storage.WithMetaClient(metaClient),
)
}
@ -899,11 +430,11 @@ func (m *Launcher) run(ctx context.Context) (err error) {
}
m.queryController, err = control.New(control.Config{
ConcurrencyQuota: m.concurrencyQuota,
InitialMemoryBytesQuotaPerQuery: m.initialMemoryBytesQuotaPerQuery,
MemoryBytesQuotaPerQuery: m.memoryBytesQuotaPerQuery,
MaxMemoryBytes: m.maxMemoryBytes,
QueueSize: m.queueSize,
ConcurrencyQuota: opts.ConcurrencyQuota,
InitialMemoryBytesQuotaPerQuery: opts.InitialMemoryBytesQuotaPerQuery,
MemoryBytesQuotaPerQuery: opts.MemoryBytesQuotaPerQuery,
MaxMemoryBytes: opts.MaxMemoryBytes,
QueueSize: opts.QueueSize,
Logger: m.log.With(zap.String("service", "storage-reads")),
ExecutorDependencies: []flux.Dependency{deps},
})
@ -940,7 +471,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
schLogger := m.log.With(zap.String("service", "task-scheduler"))
var sch stoppingScheduler = &scheduler.NoopScheduler{}
if !m.noTasks {
if !opts.NoTasks {
var (
sm *scheduler.SchedulerMetrics
err error
@ -998,9 +529,9 @@ func (m *Launcher) run(ctx context.Context) (err error) {
}
m.log.Info("Configuring InfluxQL statement executor (zeros indicate unlimited).",
zap.Int("max_select_point", m.CoordinatorConfig.MaxSelectPointN),
zap.Int("max_select_series", m.CoordinatorConfig.MaxSelectSeriesN),
zap.Int("max_select_buckets", m.CoordinatorConfig.MaxSelectBucketsN))
zap.Int("max_select_point", opts.CoordinatorConfig.MaxSelectPointN),
zap.Int("max_select_series", opts.CoordinatorConfig.MaxSelectSeriesN),
zap.Int("max_select_buckets", opts.CoordinatorConfig.MaxSelectBucketsN))
qe := iqlquery.NewExecutor(m.log, cm)
se := &iqlcoordinator.StatementExecutor{
@ -1008,9 +539,9 @@ func (m *Launcher) run(ctx context.Context) (err error) {
TSDBStore: m.engine.TSDBStore(),
ShardMapper: mapper,
DBRP: dbrpSvc,
MaxSelectPointN: m.CoordinatorConfig.MaxSelectPointN,
MaxSelectSeriesN: m.CoordinatorConfig.MaxSelectSeriesN,
MaxSelectBucketsN: m.CoordinatorConfig.MaxSelectBucketsN,
MaxSelectPointN: opts.CoordinatorConfig.MaxSelectPointN,
MaxSelectSeriesN: opts.CoordinatorConfig.MaxSelectSeriesN,
MaxSelectBucketsN: opts.CoordinatorConfig.MaxSelectBucketsN,
}
qe.StatementExecutor = se
qe.StatementNormalizer = se
@ -1085,19 +616,19 @@ func (m *Launcher) run(ctx context.Context) (err error) {
}(m.log)
m.httpServer = &nethttp.Server{
Addr: m.httpBindAddress,
Addr: opts.HttpBindAddress,
}
if m.flagger == nil {
m.flagger = feature.DefaultFlagger()
if len(m.featureFlags) > 0 {
f, err := overrideflagger.Make(m.featureFlags, feature.ByKey)
if len(opts.FeatureFlags) > 0 {
f, err := overrideflagger.Make(opts.FeatureFlags, feature.ByKey)
if err != nil {
m.log.Error("Failed to configure feature flag overrides",
zap.Error(err), zap.Any("overrides", m.featureFlags))
zap.Error(err), zap.Any("overrides", opts.FeatureFlags))
return err
}
m.log.Info("Running with feature flag overrides", zap.Any("overrides", m.featureFlags))
m.log.Info("Running with feature flag overrides", zap.Any("overrides", opts.FeatureFlags))
m.flagger = f
}
}
@ -1109,7 +640,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
ts.UserService,
ts.UserResourceMappingService,
authSvc,
session.WithSessionLength(time.Duration(m.sessionLength)*time.Minute),
session.WithSessionLength(time.Duration(opts.SessionLength)*time.Minute),
)
sessionSvc = session.NewSessionMetrics(m.reg, sessionSvc)
sessionSvc = session.NewSessionLogger(m.log.With(zap.String("service", "session")), sessionSvc)
@ -1129,7 +660,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
ts.BucketService = dbrp.NewBucketService(m.log, ts.BucketService, dbrpSvc)
var onboardOpts []tenant.OnboardServiceOptionFn
if m.testingAlwaysAllowSetup {
if opts.TestingAlwaysAllowSetup {
onboardOpts = append(onboardOpts, tenant.WithAlwaysAllowInitialUser())
}
@ -1191,10 +722,10 @@ func (m *Launcher) run(ctx context.Context) (err error) {
}
m.apibackend = &http.APIBackend{
AssetsPath: m.assetsPath,
AssetsPath: opts.AssetsPath,
HTTPErrorHandler: kithttp.ErrorHandler(0),
Logger: m.log,
SessionRenewDisabled: m.sessionRenewDisabled,
SessionRenewDisabled: opts.SessionRenewDisabled,
NewBucketService: source.NewBucketService,
NewQueryService: source.NewQueryService,
PointsWriter: &storage.LoggingPointsWriter{
@ -1393,12 +924,12 @@ func (m *Launcher) run(ctx context.Context) (err error) {
m.httpServer.Handler = http.LoggingMW(httpLogger)(m.httpServer.Handler)
}
// If we are in testing mode we allow all data to be flushed and removed.
if m.testing {
if opts.Testing {
m.httpServer.Handler = http.DebugFlush(ctx, m.httpServer.Handler, flushers)
}
}
ln, err := net.Listen("tcp", m.httpBindAddress)
ln, err := net.Listen("tcp", opts.HttpBindAddress)
if err != nil {
m.log.Error("failed http listener", zap.Error(err))
m.log.Info("Stopping")
@ -1408,9 +939,9 @@ func (m *Launcher) run(ctx context.Context) (err error) {
var cer tls.Certificate
transport := "http"
if m.httpTLSCert != "" && m.httpTLSKey != "" {
if opts.HttpTLSCert != "" && opts.HttpTLSKey != "" {
var err error
cer, err = tls.LoadX509KeyPair(m.httpTLSCert, m.httpTLSKey)
cer, err = tls.LoadX509KeyPair(opts.HttpTLSCert, opts.HttpTLSKey)
if err != nil {
m.log.Error("failed to load x509 key pair", zap.Error(err))
@ -1422,7 +953,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
// Sensible default
var tlsMinVersion uint16 = tls.VersionTLS12
switch m.httpTLSMinVersion {
switch opts.HttpTLSMinVersion {
case "1.0":
m.log.Warn("Setting the minimum version of TLS to 1.0 - this is discouraged. Please use 1.2 or 1.3")
tlsMinVersion = tls.VersionTLS10
@ -1446,7 +977,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
var cipherConfig []uint16 = nil
// TLS 1.3 does not support configuring the Cipher suites
if tlsMinVersion != tls.VersionTLS13 && m.httpTLSStrictCiphers {
if tlsMinVersion != tls.VersionTLS13 && opts.HttpTLSStrictCiphers {
cipherConfig = strictCiphers
}
@ -1465,10 +996,10 @@ func (m *Launcher) run(ctx context.Context) (err error) {
m.wg.Add(1)
go func(log *zap.Logger) {
defer m.wg.Done()
log.Info("Listening", zap.String("transport", transport), zap.String("addr", m.httpBindAddress), zap.Int("port", m.httpPort))
log.Info("Listening", zap.String("transport", transport), zap.String("addr", opts.HttpBindAddress), zap.Int("port", m.httpPort))
if cer.Certificate != nil {
if err := m.httpServer.ServeTLS(ln, m.httpTLSCert, m.httpTLSKey); err != nethttp.ErrServerClosed {
if err := m.httpServer.ServeTLS(ln, opts.HttpTLSCert, opts.HttpTLSKey); err != nethttp.ErrServerClosed {
log.Error("Failed https service", zap.Error(err))
}
} else {
@ -1479,9 +1010,24 @@ func (m *Launcher) run(ctx context.Context) (err error) {
log.Info("Stopping")
}(m.log)
if !opts.ReportingDisabled {
m.runReporter(ctx)
}
return nil
}
// runReporter configures and launches a periodic telemetry report for the server.
func (m *Launcher) runReporter(ctx context.Context) {
reporter := telemetry.NewReporter(m.log, m.reg)
reporter.Interval = 8 * time.Hour
m.wg.Add(1)
go func() {
defer m.wg.Done()
reporter.Report(ctx)
}()
}
func checkForPriorVersion(ctx context.Context, log *zap.Logger, boltPath string, enginePath string, bs platform.BucketService, metaClient *meta.Client) error {
buckets, _, err := bs.FindBuckets(ctx, platform.BucketFilter{})
if err != nil {

View File

@ -30,6 +30,8 @@ import (
"github.com/influxdata/influxdb/v2/tenant"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/spf13/viper"
"go.uber.org/zap"
)
// TestLauncher is a test wrapper for launcher.Launcher.
@ -57,8 +59,8 @@ type TestLauncher struct {
}
// NewTestLauncher returns a new instance of TestLauncher.
func NewTestLauncher(flagger feature.Flagger, opts ...Option) *TestLauncher {
l := &TestLauncher{Launcher: NewLauncher(opts...)}
func NewTestLauncher(flagger feature.Flagger) *TestLauncher {
l := &TestLauncher{Launcher: NewLauncher()}
l.Launcher.Stdin = &l.Stdin
l.Launcher.Stdout = &l.Stdout
l.Launcher.Stderr = &l.Stderr
@ -84,39 +86,49 @@ func NewTestLauncherServer(flagger feature.Flagger) *TestLauncher {
}
// RunTestLauncherOrFail initializes and starts the server.
func RunTestLauncherOrFail(tb testing.TB, ctx context.Context, flagger feature.Flagger, args ...string) *TestLauncher {
func RunTestLauncherOrFail(tb testing.TB, ctx context.Context, flagger feature.Flagger, setters ...OptSetter) *TestLauncher {
tb.Helper()
l := NewTestLauncher(flagger)
if err := l.Run(ctx, args...); err != nil {
if err := l.Run(ctx, setters...); err != nil {
tb.Fatal(err)
}
return l
}
// SetLogger sets the logger for the underlying program.
func (tl *TestLauncher) SetLogger(logger *zap.Logger) {
tl.Launcher.log = logger
}
type OptSetter = func(o *InfluxdOpts)
// Run executes the program with additional arguments to set paths and ports.
// Passed arguments will overwrite/add to the default ones.
func (tl *TestLauncher) Run(ctx context.Context, args ...string) error {
largs := make([]string, 0, len(args)+8)
func (tl *TestLauncher) Run(ctx context.Context, setters ...OptSetter) error {
opts := newOpts(viper.New())
if !tl.realServer {
largs = append(largs, "--store", "memory")
largs = append(largs, "--e2e-testing")
opts.StoreType = "memory"
opts.Testing = true
}
largs = append(largs, "--testing-always-allow-setup")
largs = append(largs, "--bolt-path", filepath.Join(tl.Path, bolt.DefaultFilename))
largs = append(largs, "--engine-path", filepath.Join(tl.Path, "engine"))
largs = append(largs, "--http-bind-address", "127.0.0.1:0")
largs = append(largs, "--log-level", "debug")
largs = append(largs, args...)
return tl.Launcher.Run(ctx, largs...)
opts.TestingAlwaysAllowSetup = true
opts.BoltPath = filepath.Join(tl.Path, bolt.DefaultFilename)
opts.EnginePath = filepath.Join(tl.Path, "engine")
opts.HttpBindAddress = "127.0.0.1:0"
opts.LogLevel = zap.DebugLevel.String()
opts.ReportingDisabled = true
for _, setter := range setters {
setter(opts)
}
return tl.Launcher.run(ctx, opts)
}
// Shutdown stops the program and cleans up temporary paths.
func (tl *TestLauncher) Shutdown(ctx context.Context) error {
if tl.running {
tl.Cancel()
tl.Launcher.Shutdown(ctx)
}
tl.Cancel()
tl.Launcher.Shutdown(ctx)
return os.RemoveAll(tl.Path)
}

View File

@ -1,64 +0,0 @@
package launcher
import (
"github.com/spf13/viper"
"go.uber.org/zap"
)
type Option interface {
applyInit(l *Launcher)
applyConfig(l *Launcher)
}
func WithLogger(log *zap.Logger) Option {
return &launcherOption{
applyConfigFn: func(l *Launcher) {
l.log = log
},
}
}
func WithViper(v *viper.Viper) Option {
return &launcherOption{
applyInitFn: func(l *Launcher) {
l.Viper = v
},
}
}
// WithInfluxQLMaxSelectSeriesN configures the maximum number of series returned by a select statement.
func WithInfluxQLMaxSelectSeriesN(n int) Option {
return &launcherOption{
applyConfigFn: func(l *Launcher) {
l.CoordinatorConfig.MaxSelectSeriesN = n
},
}
}
// WithInfluxQLMaxSelectBucketsN configures the maximum number of buckets returned by a select statement.
func WithInfluxQLMaxSelectBucketsN(n int) Option {
return &launcherOption{
applyConfigFn: func(l *Launcher) {
l.CoordinatorConfig.MaxSelectBucketsN = n
},
}
}
type launcherOption struct {
applyInitFn func(*Launcher)
applyConfigFn func(*Launcher)
}
var _ Option = launcherOption{}
func (o launcherOption) applyConfig(l *Launcher) {
if o.applyConfigFn != nil {
o.applyConfigFn(l)
}
}
func (o launcherOption) applyInit(l *Launcher) {
if o.applyInitFn != nil {
o.applyInitFn(l)
}
}

View File

@ -27,7 +27,9 @@ import (
var ctx = context.Background()
func TestLauncher_Pkger(t *testing.T) {
l := RunTestLauncherOrFail(t, ctx, nil, "--log-level", "error")
l := RunTestLauncherOrFail(t, ctx, nil, func(o *InfluxdOpts) {
o.LogLevel = zap.ErrorLevel.String()
})
l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx)
require.NoError(t, l.BucketService(t).DeleteBucket(ctx, l.Bucket.ID))

View File

@ -31,6 +31,7 @@ import (
"github.com/influxdata/influxdb/v2/kit/prom"
"github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/query"
"go.uber.org/zap"
)
func TestLauncher_Write_Query_FieldKey(t *testing.T) {
@ -245,7 +246,7 @@ func queryPoints(ctx context.Context, t *testing.T, l *launcher.TestLauncher, op
func TestLauncher_QueryMemoryLimits(t *testing.T) {
tcs := []struct {
name string
args []string
setOpts launcher.OptSetter
err bool
querySizeBytes int
// max_memory - per_query_memory * concurrency
@ -253,10 +254,10 @@ func TestLauncher_QueryMemoryLimits(t *testing.T) {
}{
{
name: "ok - initial memory bytes, memory bytes, and max memory set",
args: []string{
"--query-concurrency", "1",
"--query-initial-memory-bytes", "100",
"--query-max-memory-bytes", "1048576", // 1MB
setOpts: func(o *launcher.InfluxdOpts) {
o.ConcurrencyQuota = 1
o.InitialMemoryBytesQuotaPerQuery = 100
o.MaxMemoryBytes = 1048576 // 1MB
},
querySizeBytes: 30000,
err: false,
@ -264,10 +265,10 @@ func TestLauncher_QueryMemoryLimits(t *testing.T) {
},
{
name: "error - memory bytes and max memory set",
args: []string{
"--query-concurrency", "1",
"--query-memory-bytes", "1",
"--query-max-memory-bytes", "100",
setOpts: func(o *launcher.InfluxdOpts) {
o.ConcurrencyQuota = 1
o.MemoryBytesQuotaPerQuery = 1
o.MaxMemoryBytes = 100
},
querySizeBytes: 2,
err: true,
@ -275,10 +276,10 @@ func TestLauncher_QueryMemoryLimits(t *testing.T) {
},
{
name: "error - initial memory bytes and max memory set",
args: []string{
"--query-concurrency", "1",
"--query-initial-memory-bytes", "1",
"--query-max-memory-bytes", "100",
setOpts: func(o *launcher.InfluxdOpts) {
o.ConcurrencyQuota = 1
o.InitialMemoryBytesQuotaPerQuery = 1
o.MaxMemoryBytes = 100
},
querySizeBytes: 101,
err: true,
@ -286,11 +287,11 @@ func TestLauncher_QueryMemoryLimits(t *testing.T) {
},
{
name: "error - initial memory bytes, memory bytes, and max memory set",
args: []string{
"--query-concurrency", "1",
"--query-initial-memory-bytes", "1",
"--query-memory-bytes", "50",
"--query-max-memory-bytes", "100",
setOpts: func(o *launcher.InfluxdOpts) {
o.ConcurrencyQuota = 1
o.InitialMemoryBytesQuotaPerQuery = 1
o.MemoryBytesQuotaPerQuery = 50
o.MaxMemoryBytes = 100
},
querySizeBytes: 51,
err: true,
@ -300,7 +301,7 @@ func TestLauncher_QueryMemoryLimits(t *testing.T) {
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
l := launcher.RunTestLauncherOrFail(t, ctx, nil, tc.args...)
l := launcher.RunTestLauncherOrFail(t, ctx, nil, tc.setOpts)
l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx)
@ -338,13 +339,13 @@ func TestLauncher_QueryMemoryLimits(t *testing.T) {
func TestLauncher_QueryMemoryManager_ExceedMemory(t *testing.T) {
t.Skip("this test is flaky, occasionally get error: \"memory allocation limit reached\" on OK query")
l := launcher.RunTestLauncherOrFail(t, ctx, nil,
"--log-level", "error",
"--query-concurrency", "1",
"--query-initial-memory-bytes", "100",
"--query-memory-bytes", "50000",
"--query-max-memory-bytes", "200000",
)
l := launcher.RunTestLauncherOrFail(t, ctx, nil, func(o *launcher.InfluxdOpts) {
o.LogLevel = zap.ErrorLevel.String()
o.ConcurrencyQuota = 1
o.InitialMemoryBytesQuotaPerQuery = 100
o.MemoryBytesQuotaPerQuery = 50000
o.MaxMemoryBytes = 200000
})
l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx)
@ -383,13 +384,13 @@ func TestLauncher_QueryMemoryManager_ExceedMemory(t *testing.T) {
func TestLauncher_QueryMemoryManager_ContextCanceled(t *testing.T) {
t.Skip("this test is flaky, occasionally get error: \"memory allocation limit reached\"")
l := launcher.RunTestLauncherOrFail(t, ctx, nil,
"--log-level", "error",
"--query-concurrency", "1",
"--query-initial-memory-bytes", "100",
"--query-memory-bytes", "50000",
"--query-max-memory-bytes", "200000",
)
l := launcher.RunTestLauncherOrFail(t, ctx, nil, func(o *launcher.InfluxdOpts) {
o.LogLevel = zap.ErrorLevel.String()
o.ConcurrencyQuota = 1
o.InitialMemoryBytesQuotaPerQuery = 100
o.MemoryBytesQuotaPerQuery = 50000
o.MaxMemoryBytes = 200000
})
l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx)
@ -427,14 +428,14 @@ func TestLauncher_QueryMemoryManager_ContextCanceled(t *testing.T) {
func TestLauncher_QueryMemoryManager_ConcurrentQueries(t *testing.T) {
t.Skip("this test is flaky, occasionally get error: \"dial tcp 127.0.0.1:59654: connect: connection reset by peer\"")
l := launcher.RunTestLauncherOrFail(t, ctx, nil,
"--log-level", "error",
"--query-queue-size", "1024",
"--query-concurrency", "1",
"--query-initial-memory-bytes", "10000",
"--query-memory-bytes", "50000",
"--query-max-memory-bytes", "200000",
)
l := launcher.RunTestLauncherOrFail(t, ctx, nil, func(o *launcher.InfluxdOpts) {
o.LogLevel = zap.ErrorLevel.String()
o.QueueSize = 1024
o.ConcurrencyQuota = 1
o.InitialMemoryBytesQuotaPerQuery = 10000
o.MemoryBytesQuotaPerQuery = 50000
o.MaxMemoryBytes = 200000
})
l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx)

View File

@ -60,21 +60,18 @@ func NewPipeline(tb testing.TB, opts ...PipelineOption) *Pipeline {
logger = zaptest.NewLogger(tb, logLevel).With(zap.String("test_name", tb.Name()))
}
launcherOptions := []launcher.Option{
launcher.WithLogger(logger),
}
for _, o := range opts {
if opt := o.makeLauncherOption(); opt != nil {
launcherOptions = append(launcherOptions, opt)
}
}
tl := launcher.NewTestLauncher(nil)
tl.SetLogger(logger)
tl := launcher.NewTestLauncher(nil, launcherOptions...)
p := &Pipeline{
Launcher: tl,
}
err := tl.Run(context.Background())
err := tl.Run(context.Background(), func(o *launcher.InfluxdOpts) {
for _, opt := range opts {
opt.applyOptSetter(o)
}
})
require.NoError(tb, err)
// setup default operator

View File

@ -8,12 +8,12 @@ import (
// PipelineOption configures a pipeline.
type PipelineOption interface {
applyConfig(*pipelineConfig)
makeLauncherOption() launcher.Option
applyOptSetter(*launcher.InfluxdOpts)
}
type pipelineOption struct {
applyConfigFn func(*pipelineConfig)
makeLauncherOptionFn func() launcher.Option
applyConfigFn func(*pipelineConfig)
optSetter launcher.OptSetter
}
var _ PipelineOption = pipelineOption{}
@ -24,11 +24,10 @@ func (o pipelineOption) applyConfig(pc *pipelineConfig) {
}
}
func (o pipelineOption) makeLauncherOption() launcher.Option {
if o.makeLauncherOptionFn != nil {
return o.makeLauncherOptionFn()
func (o pipelineOption) applyOptSetter(opts *launcher.InfluxdOpts) {
if o.optSetter != nil {
o.optSetter(opts)
}
return nil
}
// WithDefaults returns a slice of options for a default pipeline.
@ -36,11 +35,11 @@ func WithDefaults() []PipelineOption {
return []PipelineOption{}
}
// WithReplicas sets the number of replicas in the pipeline.
// WithLogger sets the logger for the pipeline itself, and the underlying launcher.
func WithLogger(logger *zap.Logger) PipelineOption {
return pipelineOption{
applyConfigFn: func(pc *pipelineConfig) {
pc.logger = logger
applyConfigFn: func(config *pipelineConfig) {
config.logger = logger
},
}
}
@ -48,8 +47,8 @@ func WithLogger(logger *zap.Logger) PipelineOption {
// WithInfluxQLMaxSelectSeriesN configures the maximum number of series returned by a select statement.
func WithInfluxQLMaxSelectSeriesN(n int) PipelineOption {
return pipelineOption{
makeLauncherOptionFn: func() launcher.Option {
return launcher.WithInfluxQLMaxSelectSeriesN(n)
optSetter: func(o *launcher.InfluxdOpts) {
o.CoordinatorConfig.MaxSelectSeriesN = n
},
}
}
@ -57,8 +56,8 @@ func WithInfluxQLMaxSelectSeriesN(n int) PipelineOption {
// WithInfluxQLMaxSelectBucketsN configures the maximum number of buckets returned by a select statement.
func WithInfluxQLMaxSelectBucketsN(n int) PipelineOption {
return pipelineOption{
makeLauncherOptionFn: func() launcher.Option {
return launcher.WithInfluxQLMaxSelectBucketsN(n)
optSetter: func(o *launcher.InfluxdOpts) {
o.CoordinatorConfig.MaxSelectBucketsN = n
},
}
}