feat: add APIs for management of replication streams (#22287)

pull/22291/head
Daniel Moran 2021-08-24 14:19:03 -04:00 committed by GitHub
parent 409256c748
commit 641c02f9a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 934 additions and 2 deletions

View File

@ -57,6 +57,8 @@ import (
"github.com/influxdata/influxdb/v2/query/stdlib/influxdata/influxdb"
"github.com/influxdata/influxdb/v2/remotes"
remotesTransport "github.com/influxdata/influxdb/v2/remotes/transport"
"github.com/influxdata/influxdb/v2/replications"
replicationTransport "github.com/influxdata/influxdb/v2/replications/transport"
"github.com/influxdata/influxdb/v2/secret"
"github.com/influxdata/influxdb/v2/session"
"github.com/influxdata/influxdb/v2/snowflake"
@ -972,6 +974,12 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
),
)
replicationSvc := replications.NewService()
replicationServer := replicationTransport.NewReplicationHandler(
m.log.With(zap.String("handler", "replications")),
replicationSvc,
)
platformHandler := http.NewPlatformHandler(
m.apibackend,
http.WithResourceHandler(stacksHTTPServer),
@ -990,6 +998,7 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
http.WithResourceHandler(notebookServer),
http.WithResourceHandler(annotationServer),
http.WithResourceHandler(remotesServer),
http.WithResourceHandler(replicationServer),
)
httpLogger := m.log.With(zap.String("service", "http"))

153
mock/replication.go Normal file
View File

@ -0,0 +1,153 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/influxdata/influxdb/v2 (interfaces: ReplicationService)
// Package mock is a generated GoMock package.
package mock
import (
context "context"
reflect "reflect"
gomock "github.com/golang/mock/gomock"
influxdb "github.com/influxdata/influxdb/v2"
platform "github.com/influxdata/influxdb/v2/kit/platform"
)
// MockReplicationService is a mock of ReplicationService interface.
type MockReplicationService struct {
ctrl *gomock.Controller
recorder *MockReplicationServiceMockRecorder
}
// MockReplicationServiceMockRecorder is the mock recorder for MockReplicationService.
type MockReplicationServiceMockRecorder struct {
mock *MockReplicationService
}
// NewMockReplicationService creates a new mock instance.
func NewMockReplicationService(ctrl *gomock.Controller) *MockReplicationService {
mock := &MockReplicationService{ctrl: ctrl}
mock.recorder = &MockReplicationServiceMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockReplicationService) EXPECT() *MockReplicationServiceMockRecorder {
return m.recorder
}
// CreateReplication mocks base method.
func (m *MockReplicationService) CreateReplication(arg0 context.Context, arg1 influxdb.CreateReplicationRequest) (*influxdb.Replication, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CreateReplication", arg0, arg1)
ret0, _ := ret[0].(*influxdb.Replication)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// CreateReplication indicates an expected call of CreateReplication.
func (mr *MockReplicationServiceMockRecorder) CreateReplication(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateReplication", reflect.TypeOf((*MockReplicationService)(nil).CreateReplication), arg0, arg1)
}
// DeleteReplication mocks base method.
func (m *MockReplicationService) DeleteReplication(arg0 context.Context, arg1 platform.ID) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "DeleteReplication", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// DeleteReplication indicates an expected call of DeleteReplication.
func (mr *MockReplicationServiceMockRecorder) DeleteReplication(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteReplication", reflect.TypeOf((*MockReplicationService)(nil).DeleteReplication), arg0, arg1)
}
// GetReplication mocks base method.
func (m *MockReplicationService) GetReplication(arg0 context.Context, arg1 platform.ID) (*influxdb.Replication, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetReplication", arg0, arg1)
ret0, _ := ret[0].(*influxdb.Replication)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetReplication indicates an expected call of GetReplication.
func (mr *MockReplicationServiceMockRecorder) GetReplication(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetReplication", reflect.TypeOf((*MockReplicationService)(nil).GetReplication), arg0, arg1)
}
// ListReplications mocks base method.
func (m *MockReplicationService) ListReplications(arg0 context.Context, arg1 influxdb.ReplicationListFilter) (*influxdb.Replications, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ListReplications", arg0, arg1)
ret0, _ := ret[0].(*influxdb.Replications)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ListReplications indicates an expected call of ListReplications.
func (mr *MockReplicationServiceMockRecorder) ListReplications(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListReplications", reflect.TypeOf((*MockReplicationService)(nil).ListReplications), arg0, arg1)
}
// UpdateReplication mocks base method.
func (m *MockReplicationService) UpdateReplication(arg0 context.Context, arg1 platform.ID, arg2 influxdb.UpdateReplicationRequest) (*influxdb.Replication, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "UpdateReplication", arg0, arg1, arg2)
ret0, _ := ret[0].(*influxdb.Replication)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// UpdateReplication indicates an expected call of UpdateReplication.
func (mr *MockReplicationServiceMockRecorder) UpdateReplication(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateReplication", reflect.TypeOf((*MockReplicationService)(nil).UpdateReplication), arg0, arg1, arg2)
}
// ValidateNewReplication mocks base method.
func (m *MockReplicationService) ValidateNewReplication(arg0 context.Context, arg1 influxdb.CreateReplicationRequest) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ValidateNewReplication", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// ValidateNewReplication indicates an expected call of ValidateNewReplication.
func (mr *MockReplicationServiceMockRecorder) ValidateNewReplication(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ValidateNewReplication", reflect.TypeOf((*MockReplicationService)(nil).ValidateNewReplication), arg0, arg1)
}
// ValidateReplication mocks base method.
func (m *MockReplicationService) ValidateReplication(arg0 context.Context, arg1 platform.ID) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ValidateReplication", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// ValidateReplication indicates an expected call of ValidateReplication.
func (mr *MockReplicationServiceMockRecorder) ValidateReplication(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ValidateReplication", reflect.TypeOf((*MockReplicationService)(nil).ValidateReplication), arg0, arg1)
}
// ValidateUpdatedReplication mocks base method.
func (m *MockReplicationService) ValidateUpdatedReplication(arg0 context.Context, arg1 platform.ID, arg2 influxdb.UpdateReplicationRequest) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ValidateUpdatedReplication", arg0, arg1, arg2)
ret0, _ := ret[0].(error)
return ret0
}
// ValidateUpdatedReplication indicates an expected call of ValidateUpdatedReplication.
func (mr *MockReplicationServiceMockRecorder) ValidateUpdatedReplication(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ValidateUpdatedReplication", reflect.TypeOf((*MockReplicationService)(nil).ValidateUpdatedReplication), arg0, arg1, arg2)
}

View File

@ -182,7 +182,7 @@ func (h *RemoteConnectionHandler) handlePatchRemote(w http.ResponseWriter, r *ht
h.api.Err(w, r, err)
return
}
h.api.Respond(w, r, http.StatusNoContent, false)
h.api.Respond(w, r, http.StatusNoContent, nil)
return
}

117
replication.go Normal file
View File

@ -0,0 +1,117 @@
package influxdb
import (
"context"
"fmt"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/kit/platform/errors"
)
const (
MinReplicationMaxQueueSizeBytes int64 = 33554430 // 32 MiB
DefaultReplicationMaxQueueSizeBytes = 2 * MinReplicationMaxQueueSizeBytes
)
var ErrMaxQueueSizeTooSmall = errors.Error{
Code: errors.EInvalid,
Msg: fmt.Sprintf("maxQueueSize too small, must be at least %d", MinReplicationMaxQueueSizeBytes),
}
type ReplicationService interface {
// ListReplications returns all info about registered replications matching a filter.
ListReplications(context.Context, ReplicationListFilter) (*Replications, error)
// CreateReplication registers a new replication stream.
CreateReplication(context.Context, CreateReplicationRequest) (*Replication, error)
// ValidateNewReplication validates that the given settings for a replication are usable,
// without persisting the configuration.
ValidateNewReplication(context.Context, CreateReplicationRequest) error
// GetReplication returns metadata about the replication with the given ID.
GetReplication(context.Context, platform.ID) (*Replication, error)
// UpdateReplication updates the settings for the replication with the given ID.
UpdateReplication(context.Context, platform.ID, UpdateReplicationRequest) (*Replication, error)
// ValidateUpdatedReplication valdiates that a replication is still usable after applying the
// given update, without persisting the new configuration.
ValidateUpdatedReplication(context.Context, platform.ID, UpdateReplicationRequest) error
// DeleteReplication deletes all info for the replication with the given ID.
DeleteReplication(context.Context, platform.ID) error
// ValidateReplication checks that the replication with the given ID is still usable with its
// persisted settings.
ValidateReplication(context.Context, platform.ID) error
}
// Replication contains all info about a replication that should be returned to users.
type Replication struct {
ID platform.ID `json:"id"`
OrgID platform.ID `json:"orgID"`
Name string `json:"name"`
Description *string `json:"description,omitempty"`
RemoteID platform.ID `json:"remoteID"`
LocalBucketID platform.ID `json:"localBucketID"`
RemoteBucketID platform.ID `json:"remoteBucketID"`
MaxQueueSizeBytes int64 `json:"maxQueueSizeBytes"`
CurrentQueueSizeBytes int64 `json:"currentQueueSizeBytes"`
LatestResponseCode *int32 `json:"latestResponseCode,omitempty"`
LatestErrorMessage *string `json:"latestErrorMessage,omitempty"`
}
// ReplicationListFilter is a selection filter for listing replications.
type ReplicationListFilter struct {
OrgID platform.ID
Name *string
RemoteID *platform.ID
LocalBucketID *platform.ID
}
// Replications is a collection of metadata about replications.
type Replications struct {
Replications []Replication `json:"replications"`
}
// CreateReplicationRequest contains all info needed to establish a new replication
// to a remote InfluxDB bucket.
type CreateReplicationRequest struct {
OrgID platform.ID `json:"orgID"`
Name string `json:"name"`
Description *string `json:"description,omitempty"`
RemoteID platform.ID `json:"remoteID"`
LocalBucketID platform.ID `json:"localBucketID"`
RemoteBucketID platform.ID `json:"remoteBucketID"`
MaxQueueSizeBytes int64 `json:"maxQueueSizeBytes,omitempty"`
}
func (r *CreateReplicationRequest) OK() error {
if r.MaxQueueSizeBytes < MinReplicationMaxQueueSizeBytes {
return &ErrMaxQueueSizeTooSmall
}
return nil
}
// UpdateReplicationRequest contains a partial update to existing info about a replication.
type UpdateReplicationRequest struct {
Name *string `json:"name,omitempty"`
Description *string `json:"description,omitempty"`
RemoteID *platform.ID `json:"remoteID,omitempty"`
RemoteBucketID *platform.ID `json:"remoteBucketID,omitempty"`
MaxQueueSizeBytes *int64 `json:"maxQueueSizeBytes,omitempty"`
}
func (r *UpdateReplicationRequest) OK() error {
if r.MaxQueueSizeBytes == nil {
return nil
}
if *r.MaxQueueSizeBytes < MinReplicationMaxQueueSizeBytes {
return &ErrMaxQueueSizeTooSmall
}
return nil
}

54
replications/service.go Normal file
View File

@ -0,0 +1,54 @@
package replications
import (
"context"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/platform"
ierrors "github.com/influxdata/influxdb/v2/kit/platform/errors"
)
var errNotImplemented = &ierrors.Error{
Code: ierrors.ENotImplemented,
Msg: "replication APIs not yet implemented",
}
func NewService() *service {
return &service{}
}
type service struct{}
var _ influxdb.ReplicationService = (*service)(nil)
func (s service) ListReplications(ctx context.Context, filter influxdb.ReplicationListFilter) (*influxdb.Replications, error) {
return nil, errNotImplemented
}
func (s service) CreateReplication(ctx context.Context, request influxdb.CreateReplicationRequest) (*influxdb.Replication, error) {
return nil, errNotImplemented
}
func (s service) ValidateNewReplication(ctx context.Context, request influxdb.CreateReplicationRequest) error {
return errNotImplemented
}
func (s service) GetReplication(ctx context.Context, id platform.ID) (*influxdb.Replication, error) {
return nil, errNotImplemented
}
func (s service) UpdateReplication(ctx context.Context, id platform.ID, request influxdb.UpdateReplicationRequest) (*influxdb.Replication, error) {
return nil, errNotImplemented
}
func (s service) ValidateUpdatedReplication(ctx context.Context, id platform.ID, request influxdb.UpdateReplicationRequest) error {
return errNotImplemented
}
func (s service) DeleteReplication(ctx context.Context, id platform.ID) error {
return errNotImplemented
}
func (s service) ValidateReplication(ctx context.Context, id platform.ID) error {
return errNotImplemented
}

View File

@ -0,0 +1,247 @@
package transport
import (
"net/http"
"github.com/go-chi/chi"
"github.com/go-chi/chi/middleware"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/feature"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/kit/platform/errors"
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
"go.uber.org/zap"
)
const (
prefixReplications = "/api/v2/replications"
)
var (
errBadOrg = &errors.Error{
Code: errors.EInvalid,
Msg: "invalid or missing org ID",
}
errBadRemoteID = &errors.Error{
Code: errors.EInvalid,
Msg: "invalid remote ID",
}
errBadLocalBucketID = &errors.Error{
Code: errors.EInvalid,
Msg: "invalid local bucket ID",
}
errBadId = &errors.Error{
Code: errors.EInvalid,
Msg: "replication ID is invalid",
}
)
type ReplicationHandler struct {
chi.Router
log *zap.Logger
api *kithttp.API
replicationsService influxdb.ReplicationService
}
func NewReplicationHandler(log *zap.Logger, svc influxdb.ReplicationService) *ReplicationHandler {
h := &ReplicationHandler{
log: log,
api: kithttp.NewAPI(kithttp.WithLog(log)),
replicationsService: svc,
}
r := chi.NewRouter()
r.Use(
middleware.Recoverer,
middleware.RequestID,
middleware.RealIP,
h.mwReplicationsFlag,
)
r.Route("/", func(r chi.Router) {
r.Get("/", h.handleGetReplications)
r.Post("/", h.handlePostReplication)
r.Route("/{id}", func(r chi.Router) {
r.Get("/", h.handleGetReplication)
r.Patch("/", h.handlePatchReplication)
r.Delete("/", h.handleDeleteReplication)
r.Post("/validate", h.handleValidateReplication)
})
})
h.Router = r
return h
}
func (h *ReplicationHandler) Prefix() string {
return prefixReplications
}
func (h *ReplicationHandler) mwReplicationsFlag(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
flags := feature.FlagsFromContext(r.Context())
if flagVal, ok := flags[feature.ReplicationStreamBackend().Key()]; !ok || !flagVal.(bool) {
h.api.Respond(w, r, http.StatusNotFound, nil)
return
}
next.ServeHTTP(w, r)
})
}
func (h *ReplicationHandler) handleGetReplications(w http.ResponseWriter, r *http.Request) {
q := r.URL.Query()
// orgID is required for listing replications.
orgID := q.Get("orgID")
o, err := platform.IDFromString(orgID)
if err != nil {
h.api.Err(w, r, errBadOrg)
return
}
// name, remoteID, and localBucketID are optional additional filters.
name := q.Get("name")
remoteID := q.Get("remoteID")
localBucketID := q.Get("localBucketID")
filters := influxdb.ReplicationListFilter{OrgID: *o}
if name != "" {
filters.Name = &name
}
if remoteID != "" {
i, err := platform.IDFromString(remoteID)
if err != nil {
h.api.Err(w, r, errBadRemoteID)
return
}
filters.RemoteID = i
}
if localBucketID != "" {
i, err := platform.IDFromString(localBucketID)
if err != nil {
h.api.Err(w, r, errBadLocalBucketID)
return
}
filters.LocalBucketID = i
}
rs, err := h.replicationsService.ListReplications(r.Context(), filters)
if err != nil {
h.api.Err(w, r, err)
return
}
h.api.Respond(w, r, http.StatusOK, rs)
}
func (h *ReplicationHandler) handlePostReplication(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
q := r.URL.Query()
validate := q.Get("validate") == "true"
req := influxdb.CreateReplicationRequest{MaxQueueSizeBytes: influxdb.DefaultReplicationMaxQueueSizeBytes}
if err := h.api.DecodeJSON(r.Body, &req); err != nil {
h.api.Err(w, r, err)
return
}
if validate {
if err := h.replicationsService.ValidateNewReplication(ctx, req); err != nil {
h.api.Err(w, r, err)
return
}
h.api.Respond(w, r, http.StatusNoContent, nil)
return
}
replication, err := h.replicationsService.CreateReplication(ctx, req)
if err != nil {
h.api.Err(w, r, err)
return
}
h.api.Respond(w, r, http.StatusCreated, replication)
}
func (h *ReplicationHandler) handleGetReplication(w http.ResponseWriter, r *http.Request) {
id, err := platform.IDFromString(chi.URLParam(r, "id"))
if err != nil {
h.api.Err(w, r, errBadId)
return
}
replication, err := h.replicationsService.GetReplication(r.Context(), *id)
if err != nil {
h.api.Err(w, r, err)
return
}
h.api.Respond(w, r, http.StatusOK, replication)
}
func (h *ReplicationHandler) handlePatchReplication(w http.ResponseWriter, r *http.Request) {
id, err := platform.IDFromString(chi.URLParam(r, "id"))
if err != nil {
h.api.Err(w, r, errBadId)
return
}
ctx := r.Context()
q := r.URL.Query()
validate := q.Get("validate") == "true"
var req influxdb.UpdateReplicationRequest
if err := h.api.DecodeJSON(r.Body, &req); err != nil {
h.api.Err(w, r, err)
return
}
if validate {
if err := h.replicationsService.ValidateUpdatedReplication(ctx, *id, req); err != nil {
h.api.Err(w, r, err)
return
}
h.api.Respond(w, r, http.StatusNoContent, nil)
return
}
replication, err := h.replicationsService.UpdateReplication(ctx, *id, req)
if err != nil {
h.api.Err(w, r, err)
return
}
h.api.Respond(w, r, http.StatusOK, replication)
}
func (h *ReplicationHandler) handleDeleteReplication(w http.ResponseWriter, r *http.Request) {
id, err := platform.IDFromString(chi.URLParam(r, "id"))
if err != nil {
h.api.Err(w, r, errBadId)
return
}
if err := h.replicationsService.DeleteReplication(r.Context(), *id); err != nil {
h.api.Err(w, r, err)
return
}
h.api.Respond(w, r, http.StatusNoContent, nil)
}
func (h *ReplicationHandler) handleValidateReplication(w http.ResponseWriter, r *http.Request) {
id, err := platform.IDFromString(chi.URLParam(r, "id"))
if err != nil {
h.api.Err(w, r, errBadId)
return
}
if err := h.replicationsService.ValidateReplication(r.Context(), *id); err != nil {
h.api.Err(w, r, err)
return
}
h.api.Respond(w, r, http.StatusNoContent, nil)
}

View File

@ -0,0 +1,352 @@
package transport
import (
"bytes"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"github.com/golang/mock/gomock"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/feature"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/mock"
"github.com/stretchr/testify/assert"
tmock "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
)
var (
orgStr = "1234123412341234"
orgID, _ = platform.IDFromString(orgStr)
remoteStr = "9876987698769876"
remoteID, _ = platform.IDFromString(remoteStr)
idStr = "4321432143214321"
id, _ = platform.IDFromString(idStr)
localBucketStr = "1111111111111111"
localBucketId, _ = platform.IDFromString(localBucketStr)
remoteBucketStr = "1234567887654321"
remoteBucketID, _ = platform.IDFromString(remoteBucketStr)
testReplication = influxdb.Replication{
ID: *id,
OrgID: *orgID,
RemoteID: *remoteID,
LocalBucketID: *localBucketId,
RemoteBucketID: *remoteBucketID,
Name: "example",
MaxQueueSizeBytes: influxdb.DefaultReplicationMaxQueueSizeBytes,
}
)
func TestReplicationHandler(t *testing.T) {
t.Run("get replications happy path", func(t *testing.T) {
ts, svc := newTestServer(t)
defer ts.Close()
req := newTestRequest(t, "GET", ts.URL, nil)
q := req.URL.Query()
q.Add("orgID", orgStr)
q.Add("name", testReplication.Name)
q.Add("remoteID", remoteStr)
q.Add("localBucketID", localBucketStr)
req.URL.RawQuery = q.Encode()
expected := influxdb.Replications{Replications: []influxdb.Replication{testReplication}}
svc.EXPECT().
ListReplications(gomock.Any(), tmock.MatchedBy(func(in influxdb.ReplicationListFilter) bool {
return assert.Equal(t, *orgID, in.OrgID) &&
assert.Equal(t, testReplication.Name, *in.Name) &&
assert.Equal(t, testReplication.RemoteID, *in.RemoteID) &&
assert.Equal(t, testReplication.LocalBucketID, *in.LocalBucketID)
})).Return(&expected, nil)
res := doTestRequest(t, req, http.StatusOK, true)
var got influxdb.Replications
require.NoError(t, json.NewDecoder(res.Body).Decode(&got))
require.Equal(t, expected, got)
})
t.Run("create replication happy path", func(t *testing.T) {
body := influxdb.CreateReplicationRequest{
OrgID: testReplication.OrgID,
Name: testReplication.Name,
RemoteID: testReplication.RemoteID,
LocalBucketID: testReplication.LocalBucketID,
RemoteBucketID: testReplication.RemoteBucketID,
}
t.Run("with explicit queue size", func(t *testing.T) {
ts, svc := newTestServer(t)
defer ts.Close()
body := body
body.MaxQueueSizeBytes = 2 * influxdb.DefaultReplicationMaxQueueSizeBytes
req := newTestRequest(t, "POST", ts.URL, &body)
svc.EXPECT().CreateReplication(gomock.Any(), body).Return(&testReplication, nil)
res := doTestRequest(t, req, http.StatusCreated, true)
var got influxdb.Replication
require.NoError(t, json.NewDecoder(res.Body).Decode(&got))
require.Equal(t, testReplication, got)
})
t.Run("with default queue size", func(t *testing.T) {
ts, svc := newTestServer(t)
defer ts.Close()
req := newTestRequest(t, "POST", ts.URL, &body)
expectedBody := body
expectedBody.MaxQueueSizeBytes = influxdb.DefaultReplicationMaxQueueSizeBytes
svc.EXPECT().CreateReplication(gomock.Any(), expectedBody).Return(&testReplication, nil)
res := doTestRequest(t, req, http.StatusCreated, true)
var got influxdb.Replication
require.NoError(t, json.NewDecoder(res.Body).Decode(&got))
require.Equal(t, testReplication, got)
})
})
t.Run("dry-run create happy path", func(t *testing.T) {
body := influxdb.CreateReplicationRequest{
OrgID: testReplication.OrgID,
Name: testReplication.Name,
RemoteID: testReplication.RemoteID,
LocalBucketID: testReplication.LocalBucketID,
RemoteBucketID: testReplication.RemoteBucketID,
}
t.Run("with explicit queue size", func(t *testing.T) {
ts, svc := newTestServer(t)
defer ts.Close()
body := body
body.MaxQueueSizeBytes = 2 * influxdb.DefaultReplicationMaxQueueSizeBytes
req := newTestRequest(t, "POST", ts.URL, &body)
q := req.URL.Query()
q.Add("validate", "true")
req.URL.RawQuery = q.Encode()
svc.EXPECT().ValidateNewReplication(gomock.Any(), body).Return(nil)
doTestRequest(t, req, http.StatusNoContent, false)
})
t.Run("with default queue size", func(t *testing.T) {
ts, svc := newTestServer(t)
defer ts.Close()
req := newTestRequest(t, "POST", ts.URL, &body)
q := req.URL.Query()
q.Add("validate", "true")
req.URL.RawQuery = q.Encode()
expectedBody := body
expectedBody.MaxQueueSizeBytes = influxdb.DefaultReplicationMaxQueueSizeBytes
svc.EXPECT().ValidateNewReplication(gomock.Any(), expectedBody).Return(nil)
doTestRequest(t, req, http.StatusNoContent, false)
})
})
t.Run("get replication happy path", func(t *testing.T) {
ts, svc := newTestServer(t)
defer ts.Close()
req := newTestRequest(t, "GET", ts.URL+"/"+id.String(), nil)
svc.EXPECT().GetReplication(gomock.Any(), *id).Return(&testReplication, nil)
res := doTestRequest(t, req, http.StatusOK, true)
var got influxdb.Replication
require.NoError(t, json.NewDecoder(res.Body).Decode(&got))
require.Equal(t, testReplication, got)
})
t.Run("delete replication happy path", func(t *testing.T) {
ts, svc := newTestServer(t)
defer ts.Close()
req := newTestRequest(t, "DELETE", ts.URL+"/"+id.String(), nil)
svc.EXPECT().DeleteReplication(gomock.Any(), *id).Return(nil)
doTestRequest(t, req, http.StatusNoContent, false)
})
t.Run("update replication happy path", func(t *testing.T) {
ts, svc := newTestServer(t)
defer ts.Close()
newDescription := "my cool replication"
newQueueSize := 3 * influxdb.DefaultReplicationMaxQueueSizeBytes
body := influxdb.UpdateReplicationRequest{Description: &newDescription, MaxQueueSizeBytes: &newQueueSize}
req := newTestRequest(t, "PATCH", ts.URL+"/"+id.String(), body)
svc.EXPECT().UpdateReplication(gomock.Any(), *id, body).Return(&testReplication, nil)
res := doTestRequest(t, req, http.StatusOK, true)
var got influxdb.Replication
require.NoError(t, json.NewDecoder(res.Body).Decode(&got))
require.Equal(t, testReplication, got)
})
t.Run("dry-run update happy path", func(t *testing.T) {
ts, svc := newTestServer(t)
defer ts.Close()
newDescription := "my cool replication"
newQueueSize := 3 * influxdb.DefaultReplicationMaxQueueSizeBytes
body := influxdb.UpdateReplicationRequest{Description: &newDescription, MaxQueueSizeBytes: &newQueueSize}
req := newTestRequest(t, "PATCH", ts.URL+"/"+id.String(), body)
q := req.URL.Query()
q.Add("validate", "true")
req.URL.RawQuery = q.Encode()
svc.EXPECT().ValidateUpdatedReplication(gomock.Any(), *id, body).Return(nil)
doTestRequest(t, req, http.StatusNoContent, false)
})
t.Run("validate replication happy path", func(t *testing.T) {
ts, svc := newTestServer(t)
defer ts.Close()
req := newTestRequest(t, "POST", ts.URL+"/"+id.String()+"/validate", nil)
svc.EXPECT().ValidateReplication(gomock.Any(), *id).Return(nil)
doTestRequest(t, req, http.StatusNoContent, false)
})
t.Run("invalid replication IDs return 400", func(t *testing.T) {
ts, _ := newTestServer(t)
defer ts.Close()
req1 := newTestRequest(t, "GET", ts.URL+"/foo", nil)
req2 := newTestRequest(t, "PATCH", ts.URL+"/foo", &influxdb.UpdateReplicationRequest{})
req3 := newTestRequest(t, "DELETE", ts.URL+"/foo", nil)
for _, req := range []*http.Request{req1, req2, req3} {
t.Run(req.Method, func(t *testing.T) {
doTestRequest(t, req, http.StatusBadRequest, true)
})
}
})
t.Run("invalid org ID to GET /replications returns 400", func(t *testing.T) {
ts, _ := newTestServer(t)
defer ts.Close()
req := newTestRequest(t, "GET", ts.URL, nil)
q := req.URL.Query()
q.Add("orgID", "foo")
req.URL.RawQuery = q.Encode()
doTestRequest(t, req, http.StatusBadRequest, true)
})
t.Run("invalid request bodies return 400", func(t *testing.T) {
ts, _ := newTestServer(t)
defer ts.Close()
body := "o no not an object"
req1 := newTestRequest(t, "POST", ts.URL, &body)
req2 := newTestRequest(t, "PATCH", ts.URL+"/"+id.String(), &body)
for _, req := range []*http.Request{req1, req2} {
t.Run(req.Method, func(t *testing.T) {
doTestRequest(t, req, http.StatusBadRequest, true)
})
}
})
t.Run("too-small queue size on create is rejected", func(t *testing.T) {
ts, _ := newTestServer(t)
defer ts.Close()
body := influxdb.CreateReplicationRequest{
OrgID: testReplication.OrgID,
Name: testReplication.Name,
RemoteID: testReplication.RemoteID,
LocalBucketID: testReplication.LocalBucketID,
RemoteBucketID: testReplication.RemoteBucketID,
MaxQueueSizeBytes: influxdb.MinReplicationMaxQueueSizeBytes / 2,
}
req := newTestRequest(t, "POST", ts.URL, &body)
doTestRequest(t, req, http.StatusBadRequest, true)
})
t.Run("too-small queue size on update is rejected", func(t *testing.T) {
ts, _ := newTestServer(t)
defer ts.Close()
newSize := influxdb.MinReplicationMaxQueueSizeBytes / 2
body := influxdb.UpdateReplicationRequest{MaxQueueSizeBytes: &newSize}
req := newTestRequest(t, "PATCH", ts.URL+"/"+id.String(), &body)
doTestRequest(t, req, http.StatusBadRequest, true)
})
}
func newTestServer(t *testing.T) (*httptest.Server, *mock.MockReplicationService) {
ctrl := gomock.NewController(t)
svc := mock.NewMockReplicationService(ctrl)
server := annotatedTestServer(NewReplicationHandler(zaptest.NewLogger(t), svc))
return httptest.NewServer(server), svc
}
func annotatedTestServer(serv http.Handler) http.Handler {
replicationFlag := feature.MakeFlag("", feature.ReplicationStreamBackend().Key(), "", true, 0, true)
return feature.NewHandler(
zap.NewNop(),
feature.DefaultFlagger(),
[]feature.Flag{replicationFlag},
serv,
)
}
func newTestRequest(t *testing.T, method, path string, body interface{}) *http.Request {
dat, err := json.Marshal(body)
require.NoError(t, err)
req, err := http.NewRequest(method, path, bytes.NewBuffer(dat))
require.NoError(t, err)
req.Header.Add("Content-Type", "application/json")
return req
}
func doTestRequest(t *testing.T, req *http.Request, wantCode int, needJSON bool) *http.Response {
res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
require.Equal(t, wantCode, res.StatusCode)
if needJSON {
require.Equal(t, "application/json; charset=utf-8", res.Header.Get("Content-Type"))
}
return res
}

View File

@ -10,7 +10,7 @@ declare -r ROOT_DIR=$(dirname ${SCRIPT_DIR})
declare -r STATIC_DIR="$ROOT_DIR/static"
# Pins the swagger that will be downloaded to a specific commit
declare -r OPENAPI_SHA=04737639fd46ce4dcb6a55b10550967c24d08620
declare -r OPENAPI_SHA=b1c4e11654e5755f83c197e271c713147d784b8e
# Don't do a shallow clone since the commit we want might be several commits
# back; but do only clone the main branch.