feat: annotate context with feature flags when handling flux queries (#18506)
Annotate the context with feature flags when handling flux queries in influxdb. Taking advantage of this in flux end-to-end tests. Using a custom flagger that can set overrides based on the test case that is about to be run, allowing us to enable features in the end-to-end tests.
pull/18566/head
parent
9288dc7c36
commit
5d1a759170
|
@ -858,16 +858,18 @@ func (m *Launcher) run(ctx context.Context) (err error) {
|
|||
Addr: m.httpBindAddress,
|
||||
}
|
||||
|
||||
m.flagger = feature.DefaultFlagger()
|
||||
if len(m.featureFlags) > 0 {
|
||||
f, err := overrideflagger.Make(m.featureFlags, feature.ByKey)
|
||||
if err != nil {
|
||||
m.log.Error("Failed to configure feature flag overrides",
|
||||
zap.Error(err), zap.Any("overrides", m.featureFlags))
|
||||
return err
|
||||
if m.flagger == nil {
|
||||
m.flagger = feature.DefaultFlagger()
|
||||
if len(m.featureFlags) > 0 {
|
||||
f, err := overrideflagger.Make(m.featureFlags, feature.ByKey)
|
||||
if err != nil {
|
||||
m.log.Error("Failed to configure feature flag overrides",
|
||||
zap.Error(err), zap.Any("overrides", m.featureFlags))
|
||||
return err
|
||||
}
|
||||
m.log.Info("Running with feature flag overrides", zap.Any("overrides", m.featureFlags))
|
||||
m.flagger = f
|
||||
}
|
||||
m.log.Info("Running with feature flag overrides", zap.Any("overrides", m.featureFlags))
|
||||
m.flagger = f
|
||||
}
|
||||
|
||||
var sessionSvc platform.SessionService
|
||||
|
|
|
@ -51,11 +51,12 @@ type TestLauncher struct {
|
|||
}
|
||||
|
||||
// NewTestLauncher returns a new instance of TestLauncher.
|
||||
func NewTestLauncher() *TestLauncher {
|
||||
func NewTestLauncher(flagger feature.Flagger) *TestLauncher {
|
||||
l := &TestLauncher{Launcher: NewLauncher()}
|
||||
l.Launcher.Stdin = &l.Stdin
|
||||
l.Launcher.Stdout = &l.Stdout
|
||||
l.Launcher.Stderr = &l.Stderr
|
||||
l.Launcher.flagger = flagger
|
||||
if testing.Verbose() {
|
||||
l.Launcher.Stdout = io.MultiWriter(l.Launcher.Stdout, os.Stdout)
|
||||
l.Launcher.Stderr = io.MultiWriter(l.Launcher.Stderr, os.Stderr)
|
||||
|
@ -70,9 +71,9 @@ func NewTestLauncher() *TestLauncher {
|
|||
}
|
||||
|
||||
// RunTestLauncherOrFail initializes and starts the server.
|
||||
func RunTestLauncherOrFail(tb testing.TB, ctx context.Context, args ...string) *TestLauncher {
|
||||
func RunTestLauncherOrFail(tb testing.TB, ctx context.Context, flagger feature.Flagger, args ...string) *TestLauncher {
|
||||
tb.Helper()
|
||||
l := NewTestLauncher()
|
||||
l := NewTestLauncher(flagger)
|
||||
|
||||
if err := l.Run(ctx, args...); err != nil {
|
||||
tb.Fatal(err)
|
||||
|
|
|
@ -17,7 +17,7 @@ import (
|
|||
var ctx = context.Background()
|
||||
|
||||
func TestLauncher_Setup(t *testing.T) {
|
||||
l := launcher.NewTestLauncher()
|
||||
l := launcher.NewTestLauncher(nil)
|
||||
if err := l.Run(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -45,7 +45,7 @@ func TestLauncher_Setup(t *testing.T) {
|
|||
// This is to mimic chronograf using cookies as sessions
|
||||
// rather than authorizations
|
||||
func TestLauncher_SetupWithUsers(t *testing.T) {
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx)
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx, nil)
|
||||
l.SetupOrFail(t)
|
||||
defer l.ShutdownOrFail(t, ctx)
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ import (
|
|||
var ctx = context.Background()
|
||||
|
||||
func TestLauncher_Pkger(t *testing.T) {
|
||||
l := RunTestLauncherOrFail(t, ctx, "--log-level", "error")
|
||||
l := RunTestLauncherOrFail(t, ctx, nil, "--log-level", "error")
|
||||
l.SetupOrFail(t)
|
||||
defer l.ShutdownOrFail(t, ctx)
|
||||
require.NoError(t, l.BucketService(t).DeleteBucket(ctx, l.Bucket.ID))
|
||||
|
|
|
@ -30,7 +30,7 @@ import (
|
|||
)
|
||||
|
||||
func TestLauncher_Write_Query_FieldKey(t *testing.T) {
|
||||
be := launcher.RunTestLauncherOrFail(t, ctx)
|
||||
be := launcher.RunTestLauncherOrFail(t, ctx, nil)
|
||||
be.SetupOrFail(t)
|
||||
defer be.ShutdownOrFail(t, ctx)
|
||||
|
||||
|
@ -76,7 +76,7 @@ mem,server=b value=45.2`))
|
|||
// and checks that the queried results contain the expected number of tables
|
||||
// and expected number of columns.
|
||||
func TestLauncher_WriteV2_Query(t *testing.T) {
|
||||
be := launcher.RunTestLauncherOrFail(t, ctx)
|
||||
be := launcher.RunTestLauncherOrFail(t, ctx, nil)
|
||||
be.SetupOrFail(t)
|
||||
defer be.ShutdownOrFail(t, ctx)
|
||||
|
||||
|
@ -295,7 +295,7 @@ func TestLauncher_QueryMemoryLimits(t *testing.T) {
|
|||
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx, tc.args...)
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx, nil, tc.args...)
|
||||
l.SetupOrFail(t)
|
||||
defer l.ShutdownOrFail(t, ctx)
|
||||
|
||||
|
@ -333,7 +333,7 @@ func TestLauncher_QueryMemoryLimits(t *testing.T) {
|
|||
func TestLauncher_QueryMemoryManager_ExceedMemory(t *testing.T) {
|
||||
t.Skip("this test is flaky, occasionally get error: \"memory allocation limit reached\" on OK query")
|
||||
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx,
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx, nil,
|
||||
"--log-level", "error",
|
||||
"--query-concurrency", "1",
|
||||
"--query-initial-memory-bytes", "100",
|
||||
|
@ -378,7 +378,7 @@ func TestLauncher_QueryMemoryManager_ExceedMemory(t *testing.T) {
|
|||
func TestLauncher_QueryMemoryManager_ContextCanceled(t *testing.T) {
|
||||
t.Skip("this test is flaky, occasionally get error: \"memory allocation limit reached\"")
|
||||
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx,
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx, nil,
|
||||
"--log-level", "error",
|
||||
"--query-concurrency", "1",
|
||||
"--query-initial-memory-bytes", "100",
|
||||
|
@ -422,7 +422,7 @@ func TestLauncher_QueryMemoryManager_ContextCanceled(t *testing.T) {
|
|||
func TestLauncher_QueryMemoryManager_ConcurrentQueries(t *testing.T) {
|
||||
t.Skip("this test is flaky, occasionally get error: \"dial tcp 127.0.0.1:59654: connect: connection reset by peer\"")
|
||||
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx,
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx, nil,
|
||||
"--log-level", "error",
|
||||
"--query-queue-size", "1024",
|
||||
"--query-concurrency", "1",
|
||||
|
@ -496,7 +496,7 @@ func TestLauncher_QueryMemoryManager_ConcurrentQueries(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestLauncher_Query_LoadSecret_Success(t *testing.T) {
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx)
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx, nil)
|
||||
l.SetupOrFail(t)
|
||||
defer l.ShutdownOrFail(t, ctx)
|
||||
|
||||
|
@ -546,7 +546,7 @@ from(bucket: "%s")
|
|||
}
|
||||
|
||||
func TestLauncher_Query_LoadSecret_Forbidden(t *testing.T) {
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx)
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx, nil)
|
||||
l.SetupOrFail(t)
|
||||
defer l.ShutdownOrFail(t, ctx)
|
||||
|
||||
|
@ -605,7 +605,7 @@ from(bucket: "%s")
|
|||
// This will change once we make side effects drive execution and remove from/to concurrency in our e2e tests.
|
||||
// See https://github.com/influxdata/flux/issues/1799.
|
||||
func TestLauncher_DynamicQuery(t *testing.T) {
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx)
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx, nil)
|
||||
l.SetupOrFail(t)
|
||||
defer l.ShutdownOrFail(t, ctx)
|
||||
|
||||
|
@ -680,7 +680,7 @@ stream2 |> filter(fn: (r) => contains(value: r._value, set: col)) |> group() |>
|
|||
}
|
||||
|
||||
func TestLauncher_Query_ExperimentalTo(t *testing.T) {
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx)
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx, nil)
|
||||
l.SetupOrFail(t)
|
||||
defer l.ShutdownOrFail(t, ctx)
|
||||
|
||||
|
@ -749,7 +749,7 @@ from(bucket: "%s")
|
|||
}
|
||||
|
||||
func TestLauncher_Query_PushDownWindowAggregateAndBareAggregate(t *testing.T) {
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx,
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx, nil,
|
||||
"--feature-flags", "pushDownWindowAggregateCount=true,pushDownWindowAggregateSum=true")
|
||||
l.SetupOrFail(t)
|
||||
defer l.ShutdownOrFail(t, ctx)
|
||||
|
@ -897,7 +897,7 @@ from(bucket: v.bucket)
|
|||
}
|
||||
|
||||
func TestLauncher_Query_PushDownGroupAggregate(t *testing.T) {
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx,
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx, nil,
|
||||
"--feature-flags",
|
||||
"pushDownGroupAggregateCount=true",
|
||||
"--feature-flags",
|
||||
|
|
|
@ -17,7 +17,7 @@ import (
|
|||
)
|
||||
|
||||
func TestStorage_WriteAndQuery(t *testing.T) {
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx)
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx, nil)
|
||||
|
||||
org1 := l.OnBoardOrFail(t, &influxdb.OnboardingRequest{
|
||||
User: "USER-1",
|
||||
|
@ -54,7 +54,7 @@ func TestStorage_WriteAndQuery(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestLauncher_WriteAndQuery(t *testing.T) {
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx)
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx, nil)
|
||||
l.SetupOrFail(t)
|
||||
defer l.ShutdownOrFail(t, ctx)
|
||||
|
||||
|
@ -92,7 +92,7 @@ func TestLauncher_WriteAndQuery(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestLauncher_BucketDelete(t *testing.T) {
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx)
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx, nil)
|
||||
l.SetupOrFail(t)
|
||||
defer l.ShutdownOrFail(t, ctx)
|
||||
|
||||
|
@ -158,7 +158,7 @@ func TestLauncher_BucketDelete(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestStorage_CacheSnapshot_Size(t *testing.T) {
|
||||
l := launcher.NewTestLauncher()
|
||||
l := launcher.NewTestLauncher(nil)
|
||||
l.StorageConfig.Engine.Cache.SnapshotMemorySize = 10
|
||||
l.StorageConfig.Engine.Cache.SnapshotAgeDuration = toml.Duration(time.Hour)
|
||||
defer l.ShutdownOrFail(t, ctx)
|
||||
|
@ -204,7 +204,7 @@ func TestStorage_CacheSnapshot_Size(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestStorage_CacheSnapshot_Age(t *testing.T) {
|
||||
l := launcher.NewTestLauncher()
|
||||
l := launcher.NewTestLauncher(nil)
|
||||
l.StorageConfig.Engine.Cache.SnapshotAgeDuration = toml.Duration(time.Second)
|
||||
defer l.ShutdownOrFail(t, ctx)
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
pcontext "github.com/influxdata/influxdb/v2/context"
|
||||
"github.com/influxdata/influxdb/v2/http/metric"
|
||||
"github.com/influxdata/influxdb/v2/kit/check"
|
||||
"github.com/influxdata/influxdb/v2/kit/feature"
|
||||
"github.com/influxdata/influxdb/v2/kit/tracing"
|
||||
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
|
||||
"github.com/influxdata/influxdb/v2/logger"
|
||||
|
@ -48,6 +49,7 @@ type FluxBackend struct {
|
|||
OrganizationService influxdb.OrganizationService
|
||||
ProxyQueryService query.ProxyQueryService
|
||||
FluxLanguageService influxdb.FluxLanguageService
|
||||
Flagger feature.Flagger
|
||||
}
|
||||
|
||||
// NewFluxBackend returns a new instance of FluxBackend.
|
||||
|
@ -63,6 +65,7 @@ func NewFluxBackend(log *zap.Logger, b *APIBackend) *FluxBackend {
|
|||
},
|
||||
OrganizationService: b.OrganizationService,
|
||||
FluxLanguageService: b.FluxLanguageService,
|
||||
Flagger: b.Flagger,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -83,6 +86,8 @@ type FluxHandler struct {
|
|||
FluxLanguageService influxdb.FluxLanguageService
|
||||
|
||||
EventRecorder metric.EventRecorder
|
||||
|
||||
Flagger feature.Flagger
|
||||
}
|
||||
|
||||
// Prefix provides the route prefix.
|
||||
|
@ -102,6 +107,7 @@ func NewFluxHandler(log *zap.Logger, b *FluxBackend) *FluxHandler {
|
|||
OrganizationService: b.OrganizationService,
|
||||
EventRecorder: b.QueryEventRecorder,
|
||||
FluxLanguageService: b.FluxLanguageService,
|
||||
Flagger: b.Flagger,
|
||||
}
|
||||
|
||||
// query responses can optionally be gzip encoded
|
||||
|
@ -170,6 +176,9 @@ func (h *FluxHandler) handleQuery(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
// Transform the context into one with the request's authorization.
|
||||
ctx = pcontext.SetAuthorizer(ctx, req.Request.Authorization)
|
||||
if h.Flagger != nil {
|
||||
ctx, _ = feature.Annotate(ctx, h.Flagger)
|
||||
}
|
||||
|
||||
hd, ok := req.Dialect.(HTTPDialect)
|
||||
if !ok {
|
||||
|
|
|
@ -30,6 +30,7 @@ import (
|
|||
"github.com/influxdata/influxdb/v2/query"
|
||||
"github.com/influxdata/influxdb/v2/query/fluxlang"
|
||||
"github.com/influxdata/influxdb/v2/query/mock"
|
||||
"github.com/influxdata/influxdb/v2/kit/feature"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
|
@ -341,6 +342,7 @@ func TestFluxHandler_PostQuery_Errors(t *testing.T) {
|
|||
},
|
||||
},
|
||||
FluxLanguageService: fluxlang.DefaultService,
|
||||
Flagger: feature.DefaultFlagger(),
|
||||
}
|
||||
h := NewFluxHandler(zaptest.NewLogger(t), b)
|
||||
|
||||
|
@ -501,6 +503,7 @@ func TestFluxService_Query_gzip(t *testing.T) {
|
|||
OrganizationService: orgService,
|
||||
ProxyQueryService: queryService,
|
||||
FluxLanguageService: fluxlang.DefaultService,
|
||||
Flagger: feature.DefaultFlagger(),
|
||||
}
|
||||
|
||||
fluxHandler := NewFluxHandler(zaptest.NewLogger(t), fluxBackend)
|
||||
|
@ -638,6 +641,7 @@ func benchmarkQuery(b *testing.B, disableCompression bool) {
|
|||
OrganizationService: orgService,
|
||||
ProxyQueryService: queryService,
|
||||
FluxLanguageService: fluxlang.DefaultService,
|
||||
Flagger: feature.DefaultFlagger(),
|
||||
}
|
||||
|
||||
fluxHandler := NewFluxHandler(zaptest.NewLogger(b), fluxBackend)
|
||||
|
|
|
@ -23,8 +23,58 @@ import (
|
|||
"github.com/influxdata/influxdb/v2/query"
|
||||
_ "github.com/influxdata/influxdb/v2/query/stdlib"
|
||||
itesting "github.com/influxdata/influxdb/v2/query/stdlib/testing" // Import the stdlib
|
||||
"github.com/influxdata/influxdb/v2/kit/feature"
|
||||
"github.com/influxdata/influxdb/v2/kit/feature/override"
|
||||
)
|
||||
|
||||
// Flagger for end-to-end test cases. This flagger contains a pointer to a
|
||||
// single struct instance that all the test cases will consult. It will return flags
|
||||
// based on the contents of FluxEndToEndFeatureFlags and the currently active
|
||||
// test case. This works only because tests are serialized. We can set the
|
||||
// current test case in the common flagger state, then run the test. If we were
|
||||
// to run tests in parallel we would need to create multiple users and assign
|
||||
// them different flags combinations, then run the tests under different users.
|
||||
|
||||
type Flagger struct {
|
||||
flaggerState *FlaggerState
|
||||
}
|
||||
|
||||
type FlaggerState struct {
|
||||
Path string
|
||||
Name string
|
||||
FeatureFlags itesting.PerTestFeatureFlagMap
|
||||
DefaultFlagger feature.Flagger
|
||||
}
|
||||
|
||||
func newFlagger(featureFlagMap itesting.PerTestFeatureFlagMap) Flagger {
|
||||
flaggerState := &FlaggerState{}
|
||||
flaggerState.FeatureFlags = featureFlagMap
|
||||
flaggerState.DefaultFlagger = feature.DefaultFlagger()
|
||||
return Flagger{flaggerState}
|
||||
}
|
||||
|
||||
func (f Flagger) SetActiveTestCase(path string, name string) {
|
||||
f.flaggerState.Path = path
|
||||
f.flaggerState.Name = name
|
||||
}
|
||||
|
||||
func (f Flagger) Flags(ctx context.Context, _f ...feature.Flag) (map[string]interface{}, error) {
|
||||
// If an override is set for the test case, construct an override flagger
|
||||
// and use its computed flags.
|
||||
overrides := f.flaggerState.FeatureFlags[f.flaggerState.Path][f.flaggerState.Name]
|
||||
if overrides != nil {
|
||||
f, err := override.Make( overrides, nil )
|
||||
if err != nil {
|
||||
panic("failed to construct override flagger, probably an invalid flag in FluxEndToEndFeatureFlags")
|
||||
}
|
||||
return f.Flags(ctx)
|
||||
}
|
||||
|
||||
// Otherwise use flags from a default flagger.
|
||||
return f.flaggerState.DefaultFlagger.Flags( ctx )
|
||||
}
|
||||
|
||||
|
||||
// Default context.
|
||||
var ctx = influxdbcontext.SetAuthorizer(context.Background(), mock.NewMockAuthorizer(true, nil))
|
||||
|
||||
|
@ -40,7 +90,8 @@ func BenchmarkFluxEndToEnd(b *testing.B) {
|
|||
}
|
||||
|
||||
func runEndToEnd(t *testing.T, pkgs []*ast.Package) {
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx)
|
||||
flagger := newFlagger(itesting.FluxEndToEndFeatureFlags)
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx, flagger)
|
||||
l.SetupOrFail(t)
|
||||
defer l.ShutdownOrFail(t, ctx)
|
||||
for _, pkg := range pkgs {
|
||||
|
@ -60,6 +111,8 @@ func runEndToEnd(t *testing.T, pkgs []*ast.Package) {
|
|||
if reason, ok := itesting.FluxEndToEndSkipList[pkg.Path][name]; ok {
|
||||
t.Skip(reason)
|
||||
}
|
||||
|
||||
flagger.SetActiveTestCase(pkg.Path, name)
|
||||
testFlux(t, l, file)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -149,3 +149,28 @@ var FluxEndToEndSkipList = map[string]map[string]string{
|
|||
"year": "flakey test: https://github.com/influxdata/influxdb/issues/15667",
|
||||
},
|
||||
}
|
||||
|
||||
type PerTestFeatureFlagMap = map[string]map[string]map[string]string
|
||||
|
||||
var FluxEndToEndFeatureFlags = PerTestFeatureFlagMap{
|
||||
"planner": {
|
||||
"window_count_push": {
|
||||
"pushDownWindowAggregateCount": "true",
|
||||
},
|
||||
"window_sum_push": {
|
||||
"pushDownWindowAggregateSum": "true",
|
||||
},
|
||||
"bare_count_push": {
|
||||
"pushDownWindowAggregateCount": "true",
|
||||
},
|
||||
"bare_sum_push": {
|
||||
"pushDownWindowAggregateSum": "true",
|
||||
},
|
||||
"group_count_push": {
|
||||
"pushDownGroupAggregateCount": "true",
|
||||
},
|
||||
"group_sum_push": {
|
||||
"pushDownGroupAggregateSum": "true",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue