diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index d7f70dd968..222d92efde 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -57,6 +57,8 @@ import ( "github.com/influxdata/influxdb/v2/query/stdlib/influxdata/influxdb" "github.com/influxdata/influxdb/v2/remotes" remotesTransport "github.com/influxdata/influxdb/v2/remotes/transport" + "github.com/influxdata/influxdb/v2/replications" + replicationTransport "github.com/influxdata/influxdb/v2/replications/transport" "github.com/influxdata/influxdb/v2/secret" "github.com/influxdata/influxdb/v2/session" "github.com/influxdata/influxdb/v2/snowflake" @@ -972,6 +974,12 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { ), ) + replicationSvc := replications.NewService() + replicationServer := replicationTransport.NewReplicationHandler( + m.log.With(zap.String("handler", "replications")), + replicationSvc, + ) + platformHandler := http.NewPlatformHandler( m.apibackend, http.WithResourceHandler(stacksHTTPServer), @@ -990,6 +998,7 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { http.WithResourceHandler(notebookServer), http.WithResourceHandler(annotationServer), http.WithResourceHandler(remotesServer), + http.WithResourceHandler(replicationServer), ) httpLogger := m.log.With(zap.String("service", "http")) diff --git a/mock/replication.go b/mock/replication.go new file mode 100644 index 0000000000..9be326c11b --- /dev/null +++ b/mock/replication.go @@ -0,0 +1,153 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/influxdata/influxdb/v2 (interfaces: ReplicationService) + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + influxdb "github.com/influxdata/influxdb/v2" + platform "github.com/influxdata/influxdb/v2/kit/platform" +) + +// MockReplicationService is a mock of ReplicationService interface. +type MockReplicationService struct { + ctrl *gomock.Controller + recorder *MockReplicationServiceMockRecorder +} + +// MockReplicationServiceMockRecorder is the mock recorder for MockReplicationService. +type MockReplicationServiceMockRecorder struct { + mock *MockReplicationService +} + +// NewMockReplicationService creates a new mock instance. +func NewMockReplicationService(ctrl *gomock.Controller) *MockReplicationService { + mock := &MockReplicationService{ctrl: ctrl} + mock.recorder = &MockReplicationServiceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockReplicationService) EXPECT() *MockReplicationServiceMockRecorder { + return m.recorder +} + +// CreateReplication mocks base method. +func (m *MockReplicationService) CreateReplication(arg0 context.Context, arg1 influxdb.CreateReplicationRequest) (*influxdb.Replication, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateReplication", arg0, arg1) + ret0, _ := ret[0].(*influxdb.Replication) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateReplication indicates an expected call of CreateReplication. +func (mr *MockReplicationServiceMockRecorder) CreateReplication(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateReplication", reflect.TypeOf((*MockReplicationService)(nil).CreateReplication), arg0, arg1) +} + +// DeleteReplication mocks base method. +func (m *MockReplicationService) DeleteReplication(arg0 context.Context, arg1 platform.ID) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteReplication", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteReplication indicates an expected call of DeleteReplication. +func (mr *MockReplicationServiceMockRecorder) DeleteReplication(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteReplication", reflect.TypeOf((*MockReplicationService)(nil).DeleteReplication), arg0, arg1) +} + +// GetReplication mocks base method. +func (m *MockReplicationService) GetReplication(arg0 context.Context, arg1 platform.ID) (*influxdb.Replication, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetReplication", arg0, arg1) + ret0, _ := ret[0].(*influxdb.Replication) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetReplication indicates an expected call of GetReplication. +func (mr *MockReplicationServiceMockRecorder) GetReplication(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetReplication", reflect.TypeOf((*MockReplicationService)(nil).GetReplication), arg0, arg1) +} + +// ListReplications mocks base method. +func (m *MockReplicationService) ListReplications(arg0 context.Context, arg1 influxdb.ReplicationListFilter) (*influxdb.Replications, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListReplications", arg0, arg1) + ret0, _ := ret[0].(*influxdb.Replications) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListReplications indicates an expected call of ListReplications. +func (mr *MockReplicationServiceMockRecorder) ListReplications(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListReplications", reflect.TypeOf((*MockReplicationService)(nil).ListReplications), arg0, arg1) +} + +// UpdateReplication mocks base method. +func (m *MockReplicationService) UpdateReplication(arg0 context.Context, arg1 platform.ID, arg2 influxdb.UpdateReplicationRequest) (*influxdb.Replication, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateReplication", arg0, arg1, arg2) + ret0, _ := ret[0].(*influxdb.Replication) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// UpdateReplication indicates an expected call of UpdateReplication. +func (mr *MockReplicationServiceMockRecorder) UpdateReplication(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateReplication", reflect.TypeOf((*MockReplicationService)(nil).UpdateReplication), arg0, arg1, arg2) +} + +// ValidateNewReplication mocks base method. +func (m *MockReplicationService) ValidateNewReplication(arg0 context.Context, arg1 influxdb.CreateReplicationRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ValidateNewReplication", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// ValidateNewReplication indicates an expected call of ValidateNewReplication. +func (mr *MockReplicationServiceMockRecorder) ValidateNewReplication(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ValidateNewReplication", reflect.TypeOf((*MockReplicationService)(nil).ValidateNewReplication), arg0, arg1) +} + +// ValidateReplication mocks base method. +func (m *MockReplicationService) ValidateReplication(arg0 context.Context, arg1 platform.ID) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ValidateReplication", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// ValidateReplication indicates an expected call of ValidateReplication. +func (mr *MockReplicationServiceMockRecorder) ValidateReplication(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ValidateReplication", reflect.TypeOf((*MockReplicationService)(nil).ValidateReplication), arg0, arg1) +} + +// ValidateUpdatedReplication mocks base method. +func (m *MockReplicationService) ValidateUpdatedReplication(arg0 context.Context, arg1 platform.ID, arg2 influxdb.UpdateReplicationRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ValidateUpdatedReplication", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// ValidateUpdatedReplication indicates an expected call of ValidateUpdatedReplication. +func (mr *MockReplicationServiceMockRecorder) ValidateUpdatedReplication(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ValidateUpdatedReplication", reflect.TypeOf((*MockReplicationService)(nil).ValidateUpdatedReplication), arg0, arg1, arg2) +} diff --git a/remotes/transport/http.go b/remotes/transport/http.go index f471559a2d..c419475bd2 100644 --- a/remotes/transport/http.go +++ b/remotes/transport/http.go @@ -182,7 +182,7 @@ func (h *RemoteConnectionHandler) handlePatchRemote(w http.ResponseWriter, r *ht h.api.Err(w, r, err) return } - h.api.Respond(w, r, http.StatusNoContent, false) + h.api.Respond(w, r, http.StatusNoContent, nil) return } diff --git a/replication.go b/replication.go new file mode 100644 index 0000000000..fbcf7655f2 --- /dev/null +++ b/replication.go @@ -0,0 +1,117 @@ +package influxdb + +import ( + "context" + "fmt" + + "github.com/influxdata/influxdb/v2/kit/platform" + "github.com/influxdata/influxdb/v2/kit/platform/errors" +) + +const ( + MinReplicationMaxQueueSizeBytes int64 = 33554430 // 32 MiB + DefaultReplicationMaxQueueSizeBytes = 2 * MinReplicationMaxQueueSizeBytes +) + +var ErrMaxQueueSizeTooSmall = errors.Error{ + Code: errors.EInvalid, + Msg: fmt.Sprintf("maxQueueSize too small, must be at least %d", MinReplicationMaxQueueSizeBytes), +} + +type ReplicationService interface { + // ListReplications returns all info about registered replications matching a filter. + ListReplications(context.Context, ReplicationListFilter) (*Replications, error) + + // CreateReplication registers a new replication stream. + CreateReplication(context.Context, CreateReplicationRequest) (*Replication, error) + + // ValidateNewReplication validates that the given settings for a replication are usable, + // without persisting the configuration. + ValidateNewReplication(context.Context, CreateReplicationRequest) error + + // GetReplication returns metadata about the replication with the given ID. + GetReplication(context.Context, platform.ID) (*Replication, error) + + // UpdateReplication updates the settings for the replication with the given ID. + UpdateReplication(context.Context, platform.ID, UpdateReplicationRequest) (*Replication, error) + + // ValidateUpdatedReplication valdiates that a replication is still usable after applying the + // given update, without persisting the new configuration. + ValidateUpdatedReplication(context.Context, platform.ID, UpdateReplicationRequest) error + + // DeleteReplication deletes all info for the replication with the given ID. + DeleteReplication(context.Context, platform.ID) error + + // ValidateReplication checks that the replication with the given ID is still usable with its + // persisted settings. + ValidateReplication(context.Context, platform.ID) error +} + +// Replication contains all info about a replication that should be returned to users. +type Replication struct { + ID platform.ID `json:"id"` + OrgID platform.ID `json:"orgID"` + Name string `json:"name"` + Description *string `json:"description,omitempty"` + RemoteID platform.ID `json:"remoteID"` + LocalBucketID platform.ID `json:"localBucketID"` + RemoteBucketID platform.ID `json:"remoteBucketID"` + MaxQueueSizeBytes int64 `json:"maxQueueSizeBytes"` + CurrentQueueSizeBytes int64 `json:"currentQueueSizeBytes"` + LatestResponseCode *int32 `json:"latestResponseCode,omitempty"` + LatestErrorMessage *string `json:"latestErrorMessage,omitempty"` +} + +// ReplicationListFilter is a selection filter for listing replications. +type ReplicationListFilter struct { + OrgID platform.ID + Name *string + RemoteID *platform.ID + LocalBucketID *platform.ID +} + +// Replications is a collection of metadata about replications. +type Replications struct { + Replications []Replication `json:"replications"` +} + +// CreateReplicationRequest contains all info needed to establish a new replication +// to a remote InfluxDB bucket. +type CreateReplicationRequest struct { + OrgID platform.ID `json:"orgID"` + Name string `json:"name"` + Description *string `json:"description,omitempty"` + RemoteID platform.ID `json:"remoteID"` + LocalBucketID platform.ID `json:"localBucketID"` + RemoteBucketID platform.ID `json:"remoteBucketID"` + MaxQueueSizeBytes int64 `json:"maxQueueSizeBytes,omitempty"` +} + +func (r *CreateReplicationRequest) OK() error { + if r.MaxQueueSizeBytes < MinReplicationMaxQueueSizeBytes { + return &ErrMaxQueueSizeTooSmall + } + + return nil +} + +// UpdateReplicationRequest contains a partial update to existing info about a replication. +type UpdateReplicationRequest struct { + Name *string `json:"name,omitempty"` + Description *string `json:"description,omitempty"` + RemoteID *platform.ID `json:"remoteID,omitempty"` + RemoteBucketID *platform.ID `json:"remoteBucketID,omitempty"` + MaxQueueSizeBytes *int64 `json:"maxQueueSizeBytes,omitempty"` +} + +func (r *UpdateReplicationRequest) OK() error { + if r.MaxQueueSizeBytes == nil { + return nil + } + + if *r.MaxQueueSizeBytes < MinReplicationMaxQueueSizeBytes { + return &ErrMaxQueueSizeTooSmall + } + + return nil +} diff --git a/replications/service.go b/replications/service.go new file mode 100644 index 0000000000..282bd7e967 --- /dev/null +++ b/replications/service.go @@ -0,0 +1,54 @@ +package replications + +import ( + "context" + + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kit/platform" + ierrors "github.com/influxdata/influxdb/v2/kit/platform/errors" +) + +var errNotImplemented = &ierrors.Error{ + Code: ierrors.ENotImplemented, + Msg: "replication APIs not yet implemented", +} + +func NewService() *service { + return &service{} +} + +type service struct{} + +var _ influxdb.ReplicationService = (*service)(nil) + +func (s service) ListReplications(ctx context.Context, filter influxdb.ReplicationListFilter) (*influxdb.Replications, error) { + return nil, errNotImplemented +} + +func (s service) CreateReplication(ctx context.Context, request influxdb.CreateReplicationRequest) (*influxdb.Replication, error) { + return nil, errNotImplemented +} + +func (s service) ValidateNewReplication(ctx context.Context, request influxdb.CreateReplicationRequest) error { + return errNotImplemented +} + +func (s service) GetReplication(ctx context.Context, id platform.ID) (*influxdb.Replication, error) { + return nil, errNotImplemented +} + +func (s service) UpdateReplication(ctx context.Context, id platform.ID, request influxdb.UpdateReplicationRequest) (*influxdb.Replication, error) { + return nil, errNotImplemented +} + +func (s service) ValidateUpdatedReplication(ctx context.Context, id platform.ID, request influxdb.UpdateReplicationRequest) error { + return errNotImplemented +} + +func (s service) DeleteReplication(ctx context.Context, id platform.ID) error { + return errNotImplemented +} + +func (s service) ValidateReplication(ctx context.Context, id platform.ID) error { + return errNotImplemented +} diff --git a/replications/transport/http.go b/replications/transport/http.go new file mode 100644 index 0000000000..a2cb18c5e8 --- /dev/null +++ b/replications/transport/http.go @@ -0,0 +1,247 @@ +package transport + +import ( + "net/http" + + "github.com/go-chi/chi" + "github.com/go-chi/chi/middleware" + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kit/feature" + "github.com/influxdata/influxdb/v2/kit/platform" + "github.com/influxdata/influxdb/v2/kit/platform/errors" + kithttp "github.com/influxdata/influxdb/v2/kit/transport/http" + "go.uber.org/zap" +) + +const ( + prefixReplications = "/api/v2/replications" +) + +var ( + errBadOrg = &errors.Error{ + Code: errors.EInvalid, + Msg: "invalid or missing org ID", + } + + errBadRemoteID = &errors.Error{ + Code: errors.EInvalid, + Msg: "invalid remote ID", + } + + errBadLocalBucketID = &errors.Error{ + Code: errors.EInvalid, + Msg: "invalid local bucket ID", + } + + errBadId = &errors.Error{ + Code: errors.EInvalid, + Msg: "replication ID is invalid", + } +) + +type ReplicationHandler struct { + chi.Router + + log *zap.Logger + api *kithttp.API + + replicationsService influxdb.ReplicationService +} + +func NewReplicationHandler(log *zap.Logger, svc influxdb.ReplicationService) *ReplicationHandler { + h := &ReplicationHandler{ + log: log, + api: kithttp.NewAPI(kithttp.WithLog(log)), + replicationsService: svc, + } + + r := chi.NewRouter() + r.Use( + middleware.Recoverer, + middleware.RequestID, + middleware.RealIP, + h.mwReplicationsFlag, + ) + + r.Route("/", func(r chi.Router) { + r.Get("/", h.handleGetReplications) + r.Post("/", h.handlePostReplication) + + r.Route("/{id}", func(r chi.Router) { + r.Get("/", h.handleGetReplication) + r.Patch("/", h.handlePatchReplication) + r.Delete("/", h.handleDeleteReplication) + r.Post("/validate", h.handleValidateReplication) + }) + }) + + h.Router = r + return h +} + +func (h *ReplicationHandler) Prefix() string { + return prefixReplications +} + +func (h *ReplicationHandler) mwReplicationsFlag(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + flags := feature.FlagsFromContext(r.Context()) + + if flagVal, ok := flags[feature.ReplicationStreamBackend().Key()]; !ok || !flagVal.(bool) { + h.api.Respond(w, r, http.StatusNotFound, nil) + return + } + + next.ServeHTTP(w, r) + }) +} + +func (h *ReplicationHandler) handleGetReplications(w http.ResponseWriter, r *http.Request) { + q := r.URL.Query() + + // orgID is required for listing replications. + orgID := q.Get("orgID") + o, err := platform.IDFromString(orgID) + if err != nil { + h.api.Err(w, r, errBadOrg) + return + } + + // name, remoteID, and localBucketID are optional additional filters. + name := q.Get("name") + remoteID := q.Get("remoteID") + localBucketID := q.Get("localBucketID") + + filters := influxdb.ReplicationListFilter{OrgID: *o} + if name != "" { + filters.Name = &name + } + if remoteID != "" { + i, err := platform.IDFromString(remoteID) + if err != nil { + h.api.Err(w, r, errBadRemoteID) + return + } + filters.RemoteID = i + } + if localBucketID != "" { + i, err := platform.IDFromString(localBucketID) + if err != nil { + h.api.Err(w, r, errBadLocalBucketID) + return + } + filters.LocalBucketID = i + } + + rs, err := h.replicationsService.ListReplications(r.Context(), filters) + if err != nil { + h.api.Err(w, r, err) + return + } + h.api.Respond(w, r, http.StatusOK, rs) +} + +func (h *ReplicationHandler) handlePostReplication(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + q := r.URL.Query() + + validate := q.Get("validate") == "true" + req := influxdb.CreateReplicationRequest{MaxQueueSizeBytes: influxdb.DefaultReplicationMaxQueueSizeBytes} + if err := h.api.DecodeJSON(r.Body, &req); err != nil { + h.api.Err(w, r, err) + return + } + + if validate { + if err := h.replicationsService.ValidateNewReplication(ctx, req); err != nil { + h.api.Err(w, r, err) + return + } + h.api.Respond(w, r, http.StatusNoContent, nil) + return + } + + replication, err := h.replicationsService.CreateReplication(ctx, req) + if err != nil { + h.api.Err(w, r, err) + return + } + h.api.Respond(w, r, http.StatusCreated, replication) +} + +func (h *ReplicationHandler) handleGetReplication(w http.ResponseWriter, r *http.Request) { + id, err := platform.IDFromString(chi.URLParam(r, "id")) + if err != nil { + h.api.Err(w, r, errBadId) + return + } + + replication, err := h.replicationsService.GetReplication(r.Context(), *id) + if err != nil { + h.api.Err(w, r, err) + return + } + h.api.Respond(w, r, http.StatusOK, replication) +} + +func (h *ReplicationHandler) handlePatchReplication(w http.ResponseWriter, r *http.Request) { + id, err := platform.IDFromString(chi.URLParam(r, "id")) + if err != nil { + h.api.Err(w, r, errBadId) + return + } + + ctx := r.Context() + q := r.URL.Query() + + validate := q.Get("validate") == "true" + var req influxdb.UpdateReplicationRequest + if err := h.api.DecodeJSON(r.Body, &req); err != nil { + h.api.Err(w, r, err) + return + } + + if validate { + if err := h.replicationsService.ValidateUpdatedReplication(ctx, *id, req); err != nil { + h.api.Err(w, r, err) + return + } + h.api.Respond(w, r, http.StatusNoContent, nil) + return + } + + replication, err := h.replicationsService.UpdateReplication(ctx, *id, req) + if err != nil { + h.api.Err(w, r, err) + return + } + h.api.Respond(w, r, http.StatusOK, replication) +} + +func (h *ReplicationHandler) handleDeleteReplication(w http.ResponseWriter, r *http.Request) { + id, err := platform.IDFromString(chi.URLParam(r, "id")) + if err != nil { + h.api.Err(w, r, errBadId) + return + } + + if err := h.replicationsService.DeleteReplication(r.Context(), *id); err != nil { + h.api.Err(w, r, err) + return + } + h.api.Respond(w, r, http.StatusNoContent, nil) +} + +func (h *ReplicationHandler) handleValidateReplication(w http.ResponseWriter, r *http.Request) { + id, err := platform.IDFromString(chi.URLParam(r, "id")) + if err != nil { + h.api.Err(w, r, errBadId) + return + } + + if err := h.replicationsService.ValidateReplication(r.Context(), *id); err != nil { + h.api.Err(w, r, err) + return + } + h.api.Respond(w, r, http.StatusNoContent, nil) +} diff --git a/replications/transport/http_test.go b/replications/transport/http_test.go new file mode 100644 index 0000000000..ce82d441f8 --- /dev/null +++ b/replications/transport/http_test.go @@ -0,0 +1,352 @@ +package transport + +import ( + "bytes" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/golang/mock/gomock" + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kit/feature" + "github.com/influxdata/influxdb/v2/kit/platform" + "github.com/influxdata/influxdb/v2/mock" + "github.com/stretchr/testify/assert" + tmock "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" +) + +var ( + orgStr = "1234123412341234" + orgID, _ = platform.IDFromString(orgStr) + remoteStr = "9876987698769876" + remoteID, _ = platform.IDFromString(remoteStr) + idStr = "4321432143214321" + id, _ = platform.IDFromString(idStr) + localBucketStr = "1111111111111111" + localBucketId, _ = platform.IDFromString(localBucketStr) + remoteBucketStr = "1234567887654321" + remoteBucketID, _ = platform.IDFromString(remoteBucketStr) + testReplication = influxdb.Replication{ + ID: *id, + OrgID: *orgID, + RemoteID: *remoteID, + LocalBucketID: *localBucketId, + RemoteBucketID: *remoteBucketID, + Name: "example", + MaxQueueSizeBytes: influxdb.DefaultReplicationMaxQueueSizeBytes, + } +) + +func TestReplicationHandler(t *testing.T) { + t.Run("get replications happy path", func(t *testing.T) { + ts, svc := newTestServer(t) + defer ts.Close() + + req := newTestRequest(t, "GET", ts.URL, nil) + + q := req.URL.Query() + q.Add("orgID", orgStr) + q.Add("name", testReplication.Name) + q.Add("remoteID", remoteStr) + q.Add("localBucketID", localBucketStr) + req.URL.RawQuery = q.Encode() + + expected := influxdb.Replications{Replications: []influxdb.Replication{testReplication}} + + svc.EXPECT(). + ListReplications(gomock.Any(), tmock.MatchedBy(func(in influxdb.ReplicationListFilter) bool { + return assert.Equal(t, *orgID, in.OrgID) && + assert.Equal(t, testReplication.Name, *in.Name) && + assert.Equal(t, testReplication.RemoteID, *in.RemoteID) && + assert.Equal(t, testReplication.LocalBucketID, *in.LocalBucketID) + })).Return(&expected, nil) + + res := doTestRequest(t, req, http.StatusOK, true) + + var got influxdb.Replications + require.NoError(t, json.NewDecoder(res.Body).Decode(&got)) + require.Equal(t, expected, got) + }) + + t.Run("create replication happy path", func(t *testing.T) { + + body := influxdb.CreateReplicationRequest{ + OrgID: testReplication.OrgID, + Name: testReplication.Name, + RemoteID: testReplication.RemoteID, + LocalBucketID: testReplication.LocalBucketID, + RemoteBucketID: testReplication.RemoteBucketID, + } + + t.Run("with explicit queue size", func(t *testing.T) { + ts, svc := newTestServer(t) + defer ts.Close() + + body := body + body.MaxQueueSizeBytes = 2 * influxdb.DefaultReplicationMaxQueueSizeBytes + + req := newTestRequest(t, "POST", ts.URL, &body) + + svc.EXPECT().CreateReplication(gomock.Any(), body).Return(&testReplication, nil) + + res := doTestRequest(t, req, http.StatusCreated, true) + + var got influxdb.Replication + require.NoError(t, json.NewDecoder(res.Body).Decode(&got)) + require.Equal(t, testReplication, got) + }) + + t.Run("with default queue size", func(t *testing.T) { + ts, svc := newTestServer(t) + defer ts.Close() + + req := newTestRequest(t, "POST", ts.URL, &body) + + expectedBody := body + expectedBody.MaxQueueSizeBytes = influxdb.DefaultReplicationMaxQueueSizeBytes + + svc.EXPECT().CreateReplication(gomock.Any(), expectedBody).Return(&testReplication, nil) + + res := doTestRequest(t, req, http.StatusCreated, true) + + var got influxdb.Replication + require.NoError(t, json.NewDecoder(res.Body).Decode(&got)) + require.Equal(t, testReplication, got) + }) + }) + + t.Run("dry-run create happy path", func(t *testing.T) { + + body := influxdb.CreateReplicationRequest{ + OrgID: testReplication.OrgID, + Name: testReplication.Name, + RemoteID: testReplication.RemoteID, + LocalBucketID: testReplication.LocalBucketID, + RemoteBucketID: testReplication.RemoteBucketID, + } + + t.Run("with explicit queue size", func(t *testing.T) { + ts, svc := newTestServer(t) + defer ts.Close() + + body := body + body.MaxQueueSizeBytes = 2 * influxdb.DefaultReplicationMaxQueueSizeBytes + + req := newTestRequest(t, "POST", ts.URL, &body) + q := req.URL.Query() + q.Add("validate", "true") + req.URL.RawQuery = q.Encode() + + svc.EXPECT().ValidateNewReplication(gomock.Any(), body).Return(nil) + + doTestRequest(t, req, http.StatusNoContent, false) + }) + + t.Run("with default queue size", func(t *testing.T) { + ts, svc := newTestServer(t) + defer ts.Close() + + req := newTestRequest(t, "POST", ts.URL, &body) + q := req.URL.Query() + q.Add("validate", "true") + req.URL.RawQuery = q.Encode() + + expectedBody := body + expectedBody.MaxQueueSizeBytes = influxdb.DefaultReplicationMaxQueueSizeBytes + + svc.EXPECT().ValidateNewReplication(gomock.Any(), expectedBody).Return(nil) + + doTestRequest(t, req, http.StatusNoContent, false) + }) + }) + + t.Run("get replication happy path", func(t *testing.T) { + ts, svc := newTestServer(t) + defer ts.Close() + + req := newTestRequest(t, "GET", ts.URL+"/"+id.String(), nil) + + svc.EXPECT().GetReplication(gomock.Any(), *id).Return(&testReplication, nil) + + res := doTestRequest(t, req, http.StatusOK, true) + + var got influxdb.Replication + require.NoError(t, json.NewDecoder(res.Body).Decode(&got)) + require.Equal(t, testReplication, got) + }) + + t.Run("delete replication happy path", func(t *testing.T) { + ts, svc := newTestServer(t) + defer ts.Close() + + req := newTestRequest(t, "DELETE", ts.URL+"/"+id.String(), nil) + + svc.EXPECT().DeleteReplication(gomock.Any(), *id).Return(nil) + + doTestRequest(t, req, http.StatusNoContent, false) + }) + + t.Run("update replication happy path", func(t *testing.T) { + ts, svc := newTestServer(t) + defer ts.Close() + + newDescription := "my cool replication" + newQueueSize := 3 * influxdb.DefaultReplicationMaxQueueSizeBytes + body := influxdb.UpdateReplicationRequest{Description: &newDescription, MaxQueueSizeBytes: &newQueueSize} + + req := newTestRequest(t, "PATCH", ts.URL+"/"+id.String(), body) + + svc.EXPECT().UpdateReplication(gomock.Any(), *id, body).Return(&testReplication, nil) + + res := doTestRequest(t, req, http.StatusOK, true) + + var got influxdb.Replication + require.NoError(t, json.NewDecoder(res.Body).Decode(&got)) + require.Equal(t, testReplication, got) + }) + + t.Run("dry-run update happy path", func(t *testing.T) { + ts, svc := newTestServer(t) + defer ts.Close() + + newDescription := "my cool replication" + newQueueSize := 3 * influxdb.DefaultReplicationMaxQueueSizeBytes + body := influxdb.UpdateReplicationRequest{Description: &newDescription, MaxQueueSizeBytes: &newQueueSize} + + req := newTestRequest(t, "PATCH", ts.URL+"/"+id.String(), body) + q := req.URL.Query() + q.Add("validate", "true") + req.URL.RawQuery = q.Encode() + + svc.EXPECT().ValidateUpdatedReplication(gomock.Any(), *id, body).Return(nil) + + doTestRequest(t, req, http.StatusNoContent, false) + }) + + t.Run("validate replication happy path", func(t *testing.T) { + ts, svc := newTestServer(t) + defer ts.Close() + + req := newTestRequest(t, "POST", ts.URL+"/"+id.String()+"/validate", nil) + + svc.EXPECT().ValidateReplication(gomock.Any(), *id).Return(nil) + + doTestRequest(t, req, http.StatusNoContent, false) + }) + + t.Run("invalid replication IDs return 400", func(t *testing.T) { + ts, _ := newTestServer(t) + defer ts.Close() + + req1 := newTestRequest(t, "GET", ts.URL+"/foo", nil) + req2 := newTestRequest(t, "PATCH", ts.URL+"/foo", &influxdb.UpdateReplicationRequest{}) + req3 := newTestRequest(t, "DELETE", ts.URL+"/foo", nil) + + for _, req := range []*http.Request{req1, req2, req3} { + t.Run(req.Method, func(t *testing.T) { + doTestRequest(t, req, http.StatusBadRequest, true) + }) + } + }) + + t.Run("invalid org ID to GET /replications returns 400", func(t *testing.T) { + ts, _ := newTestServer(t) + defer ts.Close() + + req := newTestRequest(t, "GET", ts.URL, nil) + q := req.URL.Query() + q.Add("orgID", "foo") + req.URL.RawQuery = q.Encode() + + doTestRequest(t, req, http.StatusBadRequest, true) + }) + + t.Run("invalid request bodies return 400", func(t *testing.T) { + ts, _ := newTestServer(t) + defer ts.Close() + + body := "o no not an object" + req1 := newTestRequest(t, "POST", ts.URL, &body) + req2 := newTestRequest(t, "PATCH", ts.URL+"/"+id.String(), &body) + + for _, req := range []*http.Request{req1, req2} { + t.Run(req.Method, func(t *testing.T) { + doTestRequest(t, req, http.StatusBadRequest, true) + }) + } + }) + + t.Run("too-small queue size on create is rejected", func(t *testing.T) { + ts, _ := newTestServer(t) + defer ts.Close() + + body := influxdb.CreateReplicationRequest{ + OrgID: testReplication.OrgID, + Name: testReplication.Name, + RemoteID: testReplication.RemoteID, + LocalBucketID: testReplication.LocalBucketID, + RemoteBucketID: testReplication.RemoteBucketID, + MaxQueueSizeBytes: influxdb.MinReplicationMaxQueueSizeBytes / 2, + } + + req := newTestRequest(t, "POST", ts.URL, &body) + + doTestRequest(t, req, http.StatusBadRequest, true) + }) + + t.Run("too-small queue size on update is rejected", func(t *testing.T) { + ts, _ := newTestServer(t) + defer ts.Close() + + newSize := influxdb.MinReplicationMaxQueueSizeBytes / 2 + body := influxdb.UpdateReplicationRequest{MaxQueueSizeBytes: &newSize} + + req := newTestRequest(t, "PATCH", ts.URL+"/"+id.String(), &body) + + doTestRequest(t, req, http.StatusBadRequest, true) + }) +} + +func newTestServer(t *testing.T) (*httptest.Server, *mock.MockReplicationService) { + ctrl := gomock.NewController(t) + svc := mock.NewMockReplicationService(ctrl) + server := annotatedTestServer(NewReplicationHandler(zaptest.NewLogger(t), svc)) + return httptest.NewServer(server), svc +} + +func annotatedTestServer(serv http.Handler) http.Handler { + replicationFlag := feature.MakeFlag("", feature.ReplicationStreamBackend().Key(), "", true, 0, true) + + return feature.NewHandler( + zap.NewNop(), + feature.DefaultFlagger(), + []feature.Flag{replicationFlag}, + serv, + ) +} + +func newTestRequest(t *testing.T, method, path string, body interface{}) *http.Request { + dat, err := json.Marshal(body) + require.NoError(t, err) + + req, err := http.NewRequest(method, path, bytes.NewBuffer(dat)) + require.NoError(t, err) + + req.Header.Add("Content-Type", "application/json") + + return req +} + +func doTestRequest(t *testing.T, req *http.Request, wantCode int, needJSON bool) *http.Response { + res, err := http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, wantCode, res.StatusCode) + if needJSON { + require.Equal(t, "application/json; charset=utf-8", res.Header.Get("Content-Type")) + } + return res +} diff --git a/scripts/fetch-swagger.sh b/scripts/fetch-swagger.sh index 44192c4294..6adea585c7 100755 --- a/scripts/fetch-swagger.sh +++ b/scripts/fetch-swagger.sh @@ -10,7 +10,7 @@ declare -r ROOT_DIR=$(dirname ${SCRIPT_DIR}) declare -r STATIC_DIR="$ROOT_DIR/static" # Pins the swagger that will be downloaded to a specific commit -declare -r OPENAPI_SHA=04737639fd46ce4dcb6a55b10550967c24d08620 +declare -r OPENAPI_SHA=b1c4e11654e5755f83c197e271c713147d784b8e # Don't do a shallow clone since the commit we want might be several commits # back; but do only clone the main branch.