From 5a919b69d7661b510690cc78e103644b48ba7a1b Mon Sep 17 00:00:00 2001 From: William Baker Date: Mon, 13 Dec 2021 16:01:50 -0600 Subject: [PATCH] feat: enable remotes and replication streams feature (#22990) --- cmd/influxd/launcher/launcher.go | 28 +++++++++++------------- cmd/influxd/launcher/replication_test.go | 17 ++++---------- flags.yml | 8 ------- kit/feature/list.go | 16 -------------- remotes/transport/http.go | 15 ------------- remotes/transport/http_test.go | 15 +------------ replications/transport/http.go | 15 ------------- replications/transport/http_test.go | 15 +------------ 8 files changed, 19 insertions(+), 110 deletions(-) diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index 1f64c97721..69583bc12e 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -359,24 +359,22 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { ts.BucketService = replications.NewBucketService( m.log.With(zap.String("service", "replication_buckets")), ts.BucketService, replicationSvc) - if feature.ReplicationStreamBackend().Enabled(ctx, m.flagger) { - m.reg.MustRegister(replicationsMetrics.PrometheusCollectors()...) + m.reg.MustRegister(replicationsMetrics.PrometheusCollectors()...) - if err = replicationSvc.Open(ctx); err != nil { - m.log.Error("Failed to open replications service", zap.Error(err)) - return err - } - - m.closers = append(m.closers, labeledCloser{ - label: "replications", - closer: func(context.Context) error { - return replicationSvc.Close() - }, - }) - - pointsWriter = replicationSvc + if err = replicationSvc.Open(ctx); err != nil { + m.log.Error("Failed to open replications service", zap.Error(err)) + return err } + m.closers = append(m.closers, labeledCloser{ + label: "replications", + closer: func(context.Context) error { + return replicationSvc.Close() + }, + }) + + pointsWriter = replicationSvc + deps, err := influxdb.NewDependencies( storageflux.NewReader(storage2.NewStore(m.engine.TSDBStore(), m.engine.MetaClient())), m.engine, diff --git a/cmd/influxd/launcher/replication_test.go b/cmd/influxd/launcher/replication_test.go index a1b2113fb8..cb623f32e3 100644 --- a/cmd/influxd/launcher/replication_test.go +++ b/cmd/influxd/launcher/replication_test.go @@ -14,14 +14,11 @@ import ( "github.com/influxdata/influx-cli/v2/api" "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/cmd/influxd/launcher" - "github.com/influxdata/influxdb/v2/kit/feature" "github.com/stretchr/testify/require" ) func TestValidateReplication_Valid(t *testing.T) { - l := launcher.RunAndSetupNewLauncherOrFail(ctx, t, func(o *launcher.InfluxdOpts) { - o.FeatureFlags = map[string]string{feature.ReplicationStreamBackend().Key(): "true"} - }) + l := launcher.RunAndSetupNewLauncherOrFail(ctx, t) defer l.ShutdownOrFail(t, ctx) client := l.APIClient(t) @@ -84,9 +81,7 @@ func TestValidateReplication_Valid(t *testing.T) { } func TestValidateReplication_Invalid(t *testing.T) { - l := launcher.RunAndSetupNewLauncherOrFail(ctx, t, func(o *launcher.InfluxdOpts) { - o.FeatureFlags = map[string]string{feature.ReplicationStreamBackend().Key(): "true"} - }) + l := launcher.RunAndSetupNewLauncherOrFail(ctx, t) defer l.ShutdownOrFail(t, ctx) client := l.APIClient(t) @@ -200,9 +195,7 @@ func TestReplicationStreamEndToEnd(t *testing.T) { `,_result,0,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,300,f,m,v3` + "\r\n" + `,_result,1,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,400,f,m,v4` + "\r\n\r\n" - l := launcher.RunAndSetupNewLauncherOrFail(ctx, t, func(o *launcher.InfluxdOpts) { - o.FeatureFlags = map[string]string{feature.ReplicationStreamBackend().Key(): "true"} - }) + l := launcher.RunAndSetupNewLauncherOrFail(ctx, t) defer l.ShutdownOrFail(t, ctx) client := l.APIClient(t) @@ -312,9 +305,7 @@ func TestReplicationStreamEndToEnd(t *testing.T) { } func TestReplicationsLocalWriteAndShutdownBlocking(t *testing.T) { - l := launcher.RunAndSetupNewLauncherOrFail(ctx, t, func(o *launcher.InfluxdOpts) { - o.FeatureFlags = map[string]string{feature.ReplicationStreamBackend().Key(): "true"} - }) + l := launcher.RunAndSetupNewLauncherOrFail(ctx, t) client := l.APIClient(t) // Server that only returns an error will cause the remote write to retry on loop. diff --git a/flags.yml b/flags.yml index 2a4a23a85f..324bd4164e 100644 --- a/flags.yml +++ b/flags.yml @@ -90,14 +90,6 @@ expose: true lifetime: temporary -- name: Replication Stream Backend - description: Enable replication-stream APIs and underlying synchronization queues - key: replicationStreamBackend - default: false - contact: Edge Team - expose: true - lifetime: temporary - - name: New Dashboard Autorefresh description: Enables the new dashboard autorefresh controls in the UI key: newAutoRefresh diff --git a/kit/feature/list.go b/kit/feature/list.go index 23fe6058ec..19f6721563 100644 --- a/kit/feature/list.go +++ b/kit/feature/list.go @@ -156,20 +156,6 @@ func RefreshSingleCell() BoolFlag { return refreshSingleCell } -var replicationStreamBackend = MakeBoolFlag( - "Replication Stream Backend", - "replicationStreamBackend", - "Edge Team", - false, - Temporary, - true, -) - -// ReplicationStreamBackend - Enable replication-stream APIs and underlying synchronization queues -func ReplicationStreamBackend() BoolFlag { - return replicationStreamBackend -} - var newAutoRefresh = MakeBoolFlag( "New Dashboard Autorefresh", "newAutoRefresh", @@ -196,7 +182,6 @@ var all = []Flag{ timeFilterFlags, cursorAtEOF, refreshSingleCell, - replicationStreamBackend, newAutoRefresh, } @@ -212,6 +197,5 @@ var byKey = map[string]Flag{ "timeFilterFlags": timeFilterFlags, "cursorAtEOF": cursorAtEOF, "refreshSingleCell": refreshSingleCell, - "replicationStreamBackend": replicationStreamBackend, "newAutoRefresh": newAutoRefresh, } diff --git a/remotes/transport/http.go b/remotes/transport/http.go index 0b3f9a0269..1ae8df32fb 100644 --- a/remotes/transport/http.go +++ b/remotes/transport/http.go @@ -7,7 +7,6 @@ import ( "github.com/go-chi/chi" "github.com/go-chi/chi/middleware" "github.com/influxdata/influxdb/v2" - "github.com/influxdata/influxdb/v2/kit/feature" "github.com/influxdata/influxdb/v2/kit/platform" "github.com/influxdata/influxdb/v2/kit/platform/errors" kithttp "github.com/influxdata/influxdb/v2/kit/transport/http" @@ -80,7 +79,6 @@ func newRemoteConnectionHandler(log *zap.Logger, svc RemoteConnectionService) *R middleware.Recoverer, middleware.RequestID, middleware.RealIP, - h.mwRemotesFlag, // Temporary, remove when feature flag for remote connections is perma-enabled. ) r.Route("/", func(r chi.Router) { @@ -102,19 +100,6 @@ func (h *RemoteConnectionHandler) Prefix() string { return prefixRemotes } -func (h *RemoteConnectionHandler) mwRemotesFlag(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - flags := feature.FlagsFromContext(r.Context()) - - if flagVal, ok := flags[feature.ReplicationStreamBackend().Key()]; !ok || !flagVal.(bool) { - h.api.Respond(w, r, http.StatusNotFound, nil) - return - } - - next.ServeHTTP(w, r) - }) -} - func (h *RemoteConnectionHandler) handleGetRemotes(w http.ResponseWriter, r *http.Request) { q := r.URL.Query() diff --git a/remotes/transport/http_test.go b/remotes/transport/http_test.go index adc4adaa0c..9d3152d1c3 100644 --- a/remotes/transport/http_test.go +++ b/remotes/transport/http_test.go @@ -9,13 +9,11 @@ import ( "github.com/golang/mock/gomock" "github.com/influxdata/influxdb/v2" - "github.com/influxdata/influxdb/v2/kit/feature" "github.com/influxdata/influxdb/v2/kit/platform" "github.com/influxdata/influxdb/v2/remotes/mock" "github.com/stretchr/testify/assert" tmock "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "go.uber.org/zap" "go.uber.org/zap/zaptest" ) @@ -181,21 +179,10 @@ func TestRemoteConnectionHandler(t *testing.T) { func newTestServer(t *testing.T) (*httptest.Server, *mock.MockRemoteConnectionService) { ctrlr := gomock.NewController(t) svc := mock.NewMockRemoteConnectionService(ctrlr) - server := annotatedTestServer(newRemoteConnectionHandler(zaptest.NewLogger(t), svc)) + server := newRemoteConnectionHandler(zaptest.NewLogger(t), svc) return httptest.NewServer(server), svc } -func annotatedTestServer(serv http.Handler) http.Handler { - replicationFlag := feature.MakeFlag("", feature.ReplicationStreamBackend().Key(), "", true, 0, true) - - return feature.NewHandler( - zap.NewNop(), - feature.DefaultFlagger(), - []feature.Flag{replicationFlag}, - serv, - ) -} - func newTestRequest(t *testing.T, method, path string, body interface{}) *http.Request { dat, err := json.Marshal(body) require.NoError(t, err) diff --git a/replications/transport/http.go b/replications/transport/http.go index f1a09e9998..6e5e6ea347 100644 --- a/replications/transport/http.go +++ b/replications/transport/http.go @@ -7,7 +7,6 @@ import ( "github.com/go-chi/chi" "github.com/go-chi/chi/middleware" "github.com/influxdata/influxdb/v2" - "github.com/influxdata/influxdb/v2/kit/feature" "github.com/influxdata/influxdb/v2/kit/platform" "github.com/influxdata/influxdb/v2/kit/platform/errors" kithttp "github.com/influxdata/influxdb/v2/kit/transport/http" @@ -102,7 +101,6 @@ func newReplicationHandler(log *zap.Logger, svc ReplicationService) *Replication middleware.Recoverer, middleware.RequestID, middleware.RealIP, - h.mwReplicationsFlag, ) r.Route("/", func(r chi.Router) { @@ -125,19 +123,6 @@ func (h *ReplicationHandler) Prefix() string { return prefixReplications } -func (h *ReplicationHandler) mwReplicationsFlag(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - flags := feature.FlagsFromContext(r.Context()) - - if flagVal, ok := flags[feature.ReplicationStreamBackend().Key()]; !ok || !flagVal.(bool) { - h.api.Respond(w, r, http.StatusNotFound, nil) - return - } - - next.ServeHTTP(w, r) - }) -} - func (h *ReplicationHandler) handleGetReplications(w http.ResponseWriter, r *http.Request) { q := r.URL.Query() diff --git a/replications/transport/http_test.go b/replications/transport/http_test.go index 88e899c3ec..3703efbdcd 100644 --- a/replications/transport/http_test.go +++ b/replications/transport/http_test.go @@ -9,13 +9,11 @@ import ( "github.com/golang/mock/gomock" "github.com/influxdata/influxdb/v2" - "github.com/influxdata/influxdb/v2/kit/feature" "github.com/influxdata/influxdb/v2/kit/platform" "github.com/influxdata/influxdb/v2/replications/mock" "github.com/stretchr/testify/assert" tmock "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "go.uber.org/zap" "go.uber.org/zap/zaptest" ) @@ -316,21 +314,10 @@ func TestReplicationHandler(t *testing.T) { func newTestServer(t *testing.T) (*httptest.Server, *mock.MockReplicationService) { ctrl := gomock.NewController(t) svc := mock.NewMockReplicationService(ctrl) - server := annotatedTestServer(newReplicationHandler(zaptest.NewLogger(t), svc)) + server := newReplicationHandler(zaptest.NewLogger(t), svc) return httptest.NewServer(server), svc } -func annotatedTestServer(serv http.Handler) http.Handler { - replicationFlag := feature.MakeFlag("", feature.ReplicationStreamBackend().Key(), "", true, 0, true) - - return feature.NewHandler( - zap.NewNop(), - feature.DefaultFlagger(), - []feature.Flag{replicationFlag}, - serv, - ) -} - func newTestRequest(t *testing.T, method, path string, body interface{}) *http.Request { dat, err := json.Marshal(body) require.NoError(t, err)